diff --git a/apps/resm/d_oaurl.py b/apps/resm/d_oaurl.py
index 5b37fb3..0967d00 100644
--- a/apps/resm/d_oaurl.py
+++ b/apps/resm/d_oaurl.py
@@ -3,16 +3,6 @@ from typing import Optional
import asyncio
import sys
import os
-import time
-from PIL import Image
-import io
-
-# 尝试导入 OpenCV 用于更好的模板匹配
-try:
- import cv2
- HAS_CV2 = True
-except ImportError:
- HAS_CV2 = False
# 尝试导入 playwright-stealth,如果没有安装则忽略
try:
@@ -20,25 +10,46 @@ try:
except ImportError:
stealth_async = None
# Browser launch flags that suppress common automation fingerprints
# (infobars, extension noise, the AutomationControlled blink feature).
_STEALTH_ARGS = [
    "--disable-blink-features=AutomationControlled",
    "--no-first-run",
    "--no-default-browser-check",
    "--disable-infobars",
    "--disable-extensions",
    "--disable-notifications",
]

# Fixed desktop-Chrome user agent applied to every new browser context.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/131.0.0.0 Safari/537.36"
)
+
async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool, str]:
async with async_playwright() as p:
try:
- browser = await p.chromium.launch(headless=False)
- context = await browser.new_context(viewport={"width": 1920, "height": 1080})
+ browser = await p.chromium.launch(
+ headless=False,
+ args=_STEALTH_ARGS,
+ )
+ context = await browser.new_context(
+ viewport={"width": 1920, "height": 1080},
+ user_agent=_USER_AGENT,
+ locale="zh-CN",
+ timezone_id="Asia/Shanghai",
+ )
page = await context.new_page()
page.set_default_timeout(60000)
-
+
# 应用 stealth 模式绕过反爬虫检测
if stealth_async:
await stealth_async(page)
else:
- # 如果没有 stealth,手动设置一些反爬虫对抗
- await page.add_init_script("""
- Object.defineProperty(navigator, 'webdriver', {
- get: () => false,
- });
- """)
+ await page.add_init_script(
+ "Object.defineProperty(navigator,'webdriver',{get:()=>false});"
+ )
pdf_content: Optional[bytes] = None
@@ -46,53 +57,23 @@ async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool,
nonlocal pdf_content
if "application/pdf" in response.headers.get("content-type", ""):
try:
- # 确保完全读取响应体
pdf_content = await response.body()
print(f"✓ 成功捕获 PDF,大小: {len(pdf_content)} bytes")
except Exception as e:
print(f"⚠ 读取 PDF 响应体失败: {e}")
page.on("response", on_response)
-
- # 先用较宽松的等待条件加载页面,避免卡在 Cloudflare
+
+ # 加载页面
try:
- await page.goto(url, wait_until="domcontentloaded", timeout=5000)
- except:
+ await page.goto(url, wait_until="domcontentloaded", timeout=30000)
+ except Exception:
print("⚠ 页面加载超时,但继续处理...")
- await page.wait_for_timeout(5000)
-
- # 处理 Cloudflare 校验
- print("开始处理 Cloudflare 校验...")
- await page.wait_for_timeout(3000)
-
- # Cloudflare 可能需要连续点击多次,最多尝试5次
- max_cloudflare_attempts = 5
-
- for attempt in range(max_cloudflare_attempts):
- # 检查是否已获取到 PDF,如果已获取则无需继续验证
- if pdf_content:
- print("✓ 已获取到 PDF 内容,停止验证框处理")
- break
-
- print(f"\nCloudflare 验证尝试 {attempt + 1}/{max_cloudflare_attempts}")
- success = await handle_cloudflare_with_image(page)
-
- if success:
- print("✓ 成功处理一次验证框")
- # 等待新验证框出现或页面刷新
- await page.wait_for_timeout(2000)
- # 检查是否还有验证框,如果没有则说明验证完成
- # 这里简单地继续尝试,直到达到最大次数
- if attempt < max_cloudflare_attempts - 1:
- print(" 检查是否还有验证框...")
- await page.wait_for_timeout(1000)
- else:
- print("⚠ 未找到验证框,可能已完成验证或验证框已消失")
- break
-
- print("✓ Cloudflare 验证处理完成")
+
+ # 等待 Cloudflare 挑战完成
+ await handle_cloudflare(page)
await page.wait_for_timeout(2000)
-
+
# 如果尚未获取 PDF,继续等待响应
if not pdf_content:
print("等待 PDF 响应...")
@@ -101,19 +82,17 @@ async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool,
lambda response: "application/pdf" in response.headers.get("content-type", ""),
timeout=15000
)
- # 确保响应体完全加载
pdf_content = await response.body()
print(f"✓ 通过 wait_for_response 获取 PDF,大小: {len(pdf_content)} bytes")
except Exception as e:
print(f"⚠ 等待 PDF 响应超时: {e}")
-
+
if pdf_content:
- # 验证文件大小(PDF 通常大于 10KB)
pdf_size = len(pdf_content)
if pdf_size < 10240:
await browser.close()
return False, f"PDF 文件过小: {pdf_size} bytes,可能下载不完整"
-
+
with open(save_path, "wb") as f:
f.write(pdf_content)
print(f"✓ PDF 已保存到: {save_path},大小: {pdf_size} bytes")
@@ -128,84 +107,89 @@ async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool,
finally:
try:
await browser.close()
- except:
+ except Exception:
pass
-async def handle_cloudflare_with_image(page: Page) -> bool:
- """
- 使用图像识别方式处理 Cloudflare 验证框
- 支持模板匹配和颜色识别两种方式
- """
- # 在尝试之前先等待2秒,让验证框完全加载
- await page.wait_for_timeout(2000)
-
- max_retries = 5
-
- for retry in range(max_retries):
- print(f"图像识别方式尝试第 {retry + 1}/{max_retries} 次")
-
- try:
- # 方式1: 通过模板图像识别(如果有模板文件)
- success = await try_template_matching()
- if success:
- print(" ✓ 模板匹配方式成功")
- await page.wait_for_timeout(5000)
- return True
-
- print(f" 等待后重试... ({retry + 1}/{max_retries})")
- await page.wait_for_timeout(2000)
-
- except Exception as e:
- print(f" ✗ 图像处理异常: {e}")
- await page.wait_for_timeout(2000)
-
- return False
def download_pdf_with_curl_cffi(url: str, save_path: str) -> tuple[bool, str]:
    """Download a PDF by impersonating Chrome's TLS fingerprint via curl-cffi.

    Presenting a real browser's TLS/HTTP fingerprint lets the request pass
    Cloudflare's passive JS/bot checks without launching an actual browser.

    Args:
        url: Direct URL of the PDF resource.
        save_path: Destination file path; parent directories are created.

    Returns:
        (True, "") on success, otherwise (False, error_code) where error_code
        is "http_<status>", "not_valid_pdf", "curl_cffi_not_installed", or
        the stringified exception.
    """
    try:
        # Imported lazily so the module still loads when curl-cffi is absent.
        import curl_cffi.requests as cf

        resp = cf.get(
            url,
            impersonate="chrome131",
            headers={
                "Accept": "application/pdf,application/octet-stream,*/*",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Referer": url,
            },
            timeout=30,
            allow_redirects=True,
        )
        if resp.status_code != 200:
            return False, f"http_{resp.status_code}"
        content = resp.content
        # Reject challenge/error HTML and truncated downloads (< 10 KiB).
        if not content.startswith(b"%PDF") or len(content) < 10240:
            return False, "not_valid_pdf"
        # `os` comes from the module-level import; the previous function-local
        # re-import was redundant.
        os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True)
        with open(save_path, "wb") as f:
            f.write(content)
        return True, ""
    except ImportError:
        return False, "curl_cffi_not_installed"
    except Exception as e:
        return False, str(e)
-async def try_template_matching() -> bool:
# Title keywords that identify bot-detection challenge pages.
_CHALLENGE_TITLES = [
    "just a moment",
    "cloudflare",
    "checking your browser",
    "ddos-guard",
    "ddos guard",
    "attention required",
]


async def handle_cloudflare(page: Page, timeout: int = 45000) -> bool:
    """Handle bot-detection challenges (Cloudflare / DDoS-Guard, etc.).

    1. Detect a challenge page by its document title.
    2. If a Turnstile iframe is embedded, try clicking its checkbox.
    3. Wait for the title to change, which signals the challenge cleared.

    Args:
        page: Playwright page that may currently show a challenge.
        timeout: Maximum time in ms to wait for the challenge to clear.

    Returns:
        True if no challenge was present or it cleared; False otherwise.
    """
    try:
        title = await page.title()
    except Exception:
        return False

    title_l = title.lower()
    if not any(kw in title_l for kw in _CHALLENGE_TITLES):
        return True  # not a challenge page

    print(f"检测到安全挑战页面(标题: {title!r}),等待 JS 自动通过...")

    # Try to click the Cloudflare Turnstile checkbox if one is embedded.
    for frame_sel in [
        'iframe[src*="challenges.cloudflare.com"]',
        'iframe[title*="cloudflare"]',
        'iframe[title*="challenge"]',
    ]:
        try:
            checkbox = page.frame_locator(frame_sel).locator('input[type="checkbox"]')
            if await checkbox.count() > 0:
                await checkbox.click(timeout=3000)
                print(" ✓ 点击了验证复选框")
                break
        except Exception:
            pass

    try:
        # Poll until the title matches none of the challenge keywords.
        # The keyword list is passed as `arg` so it is defined only once
        # instead of being duplicated inside the JS expression.
        await page.wait_for_function(
            "kws => !kws.some(kw => document.title.toLowerCase().includes(kw))",
            arg=_CHALLENGE_TITLES,
            timeout=timeout,
        )
        print(" ✓ 安全挑战已通过")
        return True
    except Exception:
        print("⚠ 安全挑战处理超时,继续尝试获取 PDF...")
        return False
diff --git a/apps/resm/d_scihub.py b/apps/resm/d_scihub.py
index 853a415..ef1fffa 100644
--- a/apps/resm/d_scihub.py
+++ b/apps/resm/d_scihub.py
@@ -5,6 +5,12 @@ from pathlib import Path
from typing import Optional
from playwright.async_api import async_playwright, Page, Browser
+# 尝试导入 playwright-stealth
+try:
+ from playwright_stealth import stealth_async
+except ImportError:
+ stealth_async = None
+
# 初始化日志
Path("log").mkdir(parents=True, exist_ok=True)
LOG_PATH = Path("log") / "scihub_downloader.log"
@@ -15,6 +21,23 @@ logging.basicConfig(
)
logger = logging.getLogger("scihub")
# Browser launch flags that suppress common automation fingerprints
# (infobars, extension noise, the AutomationControlled blink feature).
_STEALTH_ARGS = [
    "--disable-blink-features=AutomationControlled",
    "--no-first-run",
    "--no-default-browser-check",
    "--disable-infobars",
    "--disable-extensions",
    "--disable-notifications",
    "--disable-popup-blocking",
]

# Fixed desktop-Chrome user agent applied to every new browser context.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/131.0.0.0 Safari/537.36"
)
+
async def _wait_for_user_to_solve_challenge(page: Page):
logger.info("请在浏览器中完成验证(如果需要),完成后按回车继续...")
@@ -40,7 +63,7 @@ async def _try_click_robot_button(page: Page, headless: bool) -> bool:
await page.wait_for_timeout(1500)
# 等待可能的导航/重定向
try:
- await page.wait_for_navigation(timeout=8000)
+ await page.wait_for_load_state("domcontentloaded", timeout=8000)
logger.info("点击验证后检测到导航完成")
except Exception:
await page.wait_for_timeout(500)
@@ -57,7 +80,6 @@ async def _try_click_robot_button(page: Page, headless: bool) -> bool:
async def _click_no_button(page: Page) -> bool:
"""尝试点击 'No' 按钮(可选步骤,如果找不到则直接继续)"""
- # 精确匹配
No
selectors = ["div.answer[onclick=\"check()\"]", "div.answer:has-text('No')", "text=No"]
for sel in selectors:
try:
@@ -73,13 +95,11 @@ async def _click_no_button(page: Page) -> bool:
logger.warning(f"点击 'No' 失败: {click_err}")
pass
await page.wait_for_timeout(1200)
- # 点击 No 后也可能触发重定向
try:
- await page.wait_for_navigation(timeout=8000)
+ await page.wait_for_load_state("domcontentloaded", timeout=8000)
logger.info("点击 No 后检测到导航完成")
except Exception:
pass
- # 保存结果用于排查
try:
await page.screenshot(path="after_click_no.png", full_page=True)
html = await page.content()
@@ -95,90 +115,153 @@ async def _click_no_button(page: Page) -> bool:
return False
# Title keywords that identify bot-detection challenge pages.
_CHALLENGE_TITLES = ["just a moment", "cloudflare", "checking your browser", "ddos-guard", "attention required", "ddos guard"]


async def _wait_challenge_clear(page: Page, timeout: int = 45000) -> bool:
    """Wait for a bot-detection (Cloudflare/DDoS-Guard) challenge to clear.

    Detects a challenge by the document title, optionally clicks a Turnstile
    checkbox, then waits until the title no longer matches any challenge
    keyword.

    Returns:
        True when no challenge is present or it cleared within `timeout` ms.
    """
    try:
        title = await page.title()
    except Exception:
        return False

    title_l = title.lower()
    if not any(kw in title_l for kw in _CHALLENGE_TITLES):
        return True  # no challenge, pass straight through

    logger.info(f"检测到安全挑战页面(标题: {title!r}),等待 JS 自动通过...")

    # Try to click the Cloudflare Turnstile checkbox if one is embedded.
    for frame_sel in [
        'iframe[src*="challenges.cloudflare.com"]',
        'iframe[title*="cloudflare"]',
        'iframe[title*="challenge"]',
    ]:
        try:
            checkbox = page.frame_locator(frame_sel).locator('input[type="checkbox"]')
            if await checkbox.count() > 0:
                await checkbox.click(timeout=3000)
                logger.info("点击了验证复选框")
                break
        except Exception:
            pass

    try:
        # Keyword list is passed as `arg` so it is defined only once instead
        # of being duplicated inside the JS expression.
        await page.wait_for_function(
            "kws => !kws.some(kw => document.title.toLowerCase().includes(kw))",
            arg=_CHALLENGE_TITLES,
            timeout=timeout,
        )
    except Exception as e:
        logger.warning(f"等待挑战超时({timeout}ms): {e}")
        return False
    try:
        # Fetch the title in its own guard: a navigation right after the
        # challenge clears must not turn a success into a reported failure
        # (previously the re-fetch lived in the same try as the wait).
        logger.info(f"挑战已通过,当前标题: {await page.title()!r}")
    except Exception:
        pass
    return True
+
+
async def download_pdf_with_playwright(url: str, output: str = "paper.pdf", headless: bool = False) -> Optional[bytes]:
async with async_playwright() as p:
- browser: Browser = await p.chromium.launch(headless=headless)
- context = await browser.new_context(viewport={"width": 1920, "height": 1080})
+ browser: Browser = await p.chromium.launch(
+ headless=headless,
+ args=_STEALTH_ARGS,
+ )
+ context = await browser.new_context(
+ viewport={"width": 1920, "height": 1080},
+ user_agent=_USER_AGENT,
+ locale="zh-CN",
+ timezone_id="Asia/Shanghai",
+ )
page = await context.new_page()
+ # 应用 stealth 模式
+ if stealth_async:
+ await stealth_async(page)
+ else:
+ await page.add_init_script(
+ "Object.defineProperty(navigator,'webdriver',{get:()=>false});"
+ )
+
+ pdf_url: Optional[str] = None
pdf_content: Optional[bytes] = None
async def on_response(response):
- nonlocal pdf_content
- try:
- ct = response.headers.get("content-type", "")
- if "application/pdf" in ct:
- logger.info(f"捕获到 PDF 响应: {response.url}")
- pdf_content = await response.body()
- except Exception:
- logger.exception("处理响应时出错")
+ nonlocal pdf_url
+ ct = response.headers.get("content-type", "")
+ resp_url = response.url
+ # 忽略 chrome-extension 等内部 URL
+ if "application/pdf" in ct and resp_url.startswith("http"):
+ logger.info(f"检测到 PDF 响应 URL: {resp_url}")
+ pdf_url = resp_url
page.on("response", on_response)
try:
logger.info(f"打开: {url}")
- await page.goto(url, wait_until="networkidle")
+ try:
+ await page.goto(url, wait_until="domcontentloaded", timeout=30000)
+ except Exception:
+ logger.info("页面加载超时(可能正在处理安全验证),继续等待...")
+ await page.wait_for_timeout(3000)
+
+ # 等待 bot 检测挑战完成(DDoS-Guard / Cloudflare 等)
+ await _wait_challenge_clear(page)
await page.wait_for_timeout(1000)
# 尝试点击验证 & No
await _try_click_robot_button(page, headless)
await _click_no_button(page)
- # 点击后充分等待以让页面加载和触发PDF响应
- logger.info("等待页面加载和PDF响应...")
+ # 等待页面加载并检测 PDF 响应 URL
+ logger.info("等待 PDF 响应...")
await page.wait_for_timeout(3000)
-
- # 尝试主动等待PDF响应(点击后可能会自动加载或重定向触发PDF请求)
- if not pdf_content:
+
+ if not pdf_url:
try:
- await page.wait_for_response(
- lambda r: "application/pdf" in r.headers.get("content-type", ""),
- timeout=5000,
+ resp = await page.wait_for_response(
+ lambda r: "application/pdf" in r.headers.get("content-type", "")
+ and r.url.startswith("http"),
+ timeout=8000,
)
- logger.info("捕获到主动等待的 PDF 响应")
+ pdf_url = resp.url
+ logger.info(f"主动等待到 PDF URL: {pdf_url}")
except Exception:
- logger.info("主动等待 PDF 响应超时,继续其他方式")
+ logger.info("等待 PDF 响应超时")
- # 尝试通过页面下载按钮
- # download_selectors = ["a[href*='.pdf']", "button:has-text('Download')", "a:has-text('PDF')"]
- # for sel in download_selectors:
- # try:
- # if await page.locator(sel).count() > 0:
- # logger.info(f"尝试点击下载元素: {sel}")
- # async with page.expect_download() as di:
- # await page.click(sel)
- # download = await di.value
- # await download.save_as(output)
- # logger.info(f"已保存 PDF: {output}")
- # with open(output, "rb") as f:
- # pdf_content = f.read()
- # break
- # except Exception:
- # logger.exception(f"通过选择器下载失败: {sel}")
+ # 直接导航到 PDF URL 下载完整内容
+ if pdf_url:
+ logger.info(f"直接请求 PDF: {pdf_url}")
+ try:
+ pdf_resp = await page.goto(pdf_url, wait_until="networkidle", timeout=30000)
+ if pdf_resp and pdf_resp.status == 200:
+ pdf_content = await pdf_resp.body()
+ logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
+ except Exception as e:
+ logger.warning(f"直接导航下载失败: {e},尝试 fetch")
+ try:
+ pdf_content = await page.evaluate(f"""
+ async () => {{
+ const r = await fetch({pdf_url!r});
+ const buf = await r.arrayBuffer();
+ return Array.from(new Uint8Array(buf));
+ }}
+ """)
+ if pdf_content:
+ pdf_content = bytes(pdf_content)
+ except Exception as e2:
+ logger.warning(f"fetch 下载也失败: {e2}")
- # 回退:查找页面内 PDF 链接并直接访问
- # if not pdf_content:
- # logger.info("尝试查找页面内 PDF 链接")
- # try:
- # links = await page.eval_on_selector_all("a[href]", "els => els.map(e=>e.href)")
- # candidates = [u for u in links if ".pdf" in u]
- # if candidates:
- # pdf_url = candidates[0]
- # logger.info(f"直接访问 PDF 链接: {pdf_url}")
- # resp = await page.goto(pdf_url, wait_until="networkidle")
- # if resp and resp.status == 200:
- # pdf_content = await resp.body()
- # with open(output, "wb") as f:
- # f.write(pdf_content)
- # logger.info(f"已保存 PDF: {output}")
- # except Exception:
- # logger.exception("直接访问 PDF 链接失败")
-
- if pdf_content:
+ if pdf_content and len(pdf_content) > 10240:
logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
return pdf_content
else:
- logger.warning("未能获取 PDF,已保存页面快照供排查")
+ if pdf_content:
+ logger.warning(f"PDF 文件过小({len(pdf_content)} bytes),可能是错误页")
+ logger.warning("未能获取有效 PDF,已保存页面快照供排查")
try:
await page.screenshot(path="scihub_screenshot.png", full_page=True)
html = await page.content()
@@ -198,15 +281,24 @@ async def download_pdf_with_playwright(url: str, output: str = "paper.pdf", head
logger.exception("关闭 browser 失败")
# Sci-Hub mirror domains tried in priority order (ordered by reachability
# from mainland China, per the original note).
_SCIHUB_DOMAINS = [
    "sci-hub.ren",
    "sci-hub.ee",
    "sci-hub.st",
    "sci-hub.se",
]
+
+
def download_paper_by_doi(doi: str, output: Optional[str] = None, headless: bool = True) -> tuple[bool, str]:
"""
通过 DOI 下载论文 PDF(供 task 调用)
-
+
参数:
doi: DOI 字符串,例如 "10.1016/j.conbuildmat.2017.10.091"
output: 输出文件路径(默认基于 DOI 生成,格式:10.1016_j.xxx.pdf)
headless: 是否无头模式(默认 True)
-
+
返回:
(True, "文件路径") 如果成功
(False, "scihub_error_*: 错误详情") 如果失败,错误码前缀包括:
@@ -222,33 +314,35 @@ def download_paper_by_doi(doi: str, output: Optional[str] = None, headless: bool
err = "scihub_error_empty_doi: DOI 为空"
logger.error(err)
return False, err
-
- url = f"https://sci-hub.st/{doi}"
+
output_path = output or f"{doi.replace('/', '_')}.pdf"
-
- logger.info(f"开始下载 DOI: {doi}")
- logger.info(f"目标 URL: {url}")
- logger.info(f"输出文件: {output_path}")
-
- try:
- pdf_content = asyncio.run(download_pdf_with_playwright(url, output=output_path, headless=headless))
- except asyncio.TimeoutError as e:
- err = f"scihub_error_timeout: 网页加载超时(可能网络慢或网站不可用)"
- logger.error(err)
- return False, err
- except Exception as e:
- err = f"scihub_error_load_failed: 加载页面时出错 - {str(e)}"
- logger.exception(err)
- return False, err
-
- if pdf_content:
- logger.info(f"✓ 成功下载: {output_path} ({len(pdf_content)} bytes)")
- return True, output_path
- else:
- # PDF 内容为空,说明所有获取方式都失败
- err = f"scihub_error_pdf_not_found: 无法从 Sci-Hub 获取 PDF(可能 DOI 不存在、网站不可用、或无权限访问)"
- logger.error(err)
- return False, err
+
+ for domain in _SCIHUB_DOMAINS:
+ url = f"https://{domain}/{doi}"
+ logger.info(f"尝试域名: {url}")
+ try:
+ pdf_content = asyncio.run(download_pdf_with_playwright(url, output=output_path, headless=headless))
+ except asyncio.TimeoutError:
+ logger.warning(f"{domain} 超时,尝试下一个域名")
+ continue
+ except Exception as e:
+ logger.warning(f"{domain} 出错: {e},尝试下一个域名")
+ continue
+
+ if pdf_content:
+ # 写入文件
+ import os
+ os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
+ with open(output_path, "wb") as f:
+ f.write(pdf_content)
+ logger.info(f"✓ 成功下载({domain}): {output_path} ({len(pdf_content)} bytes)")
+ return True, output_path
+ else:
+ logger.warning(f"{domain} 未获取到 PDF,尝试下一个域名")
+
+ err = "scihub_error_pdf_not_found: 所有域名均无法获取 PDF"
+ logger.error(err)
+ return False, err
except Exception as e:
err = f"scihub_error_exception: 执行下载时发生异常 - {str(e)}"
logger.exception(err)
@@ -274,4 +368,3 @@ if __name__ == "__main__":
else:
logger.error(f"失败: {msg}")
raise SystemExit(1)
-
diff --git a/apps/resm/tasks.py b/apps/resm/tasks.py
index 40f769e..cf0ecdf 100644
--- a/apps/resm/tasks.py
+++ b/apps/resm/tasks.py
@@ -413,18 +413,21 @@ def download_pdf(paper_id):
paper.fetch_end()
-def save_pdf_from_oa_url(paper:Paper):
+def save_pdf_from_oa_url(paper: Paper):
+ from .d_oaurl import download_pdf_with_curl_cffi, download_from_url_playwright
+
+ # 策略1: 直接请求
try:
headers = get_random_headers()
res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15))
except requests.RequestException as e:
paper.save_fail_reason("oa_url_request_error")
return f"oa_url_request_error: {str(e)}"
-
+
if res.status_code in [200, 201, 202]:
# 检查是否是PDF文件:检查魔数 %PDF 或 content-type
is_pdf = (
- res.content.startswith(b'%PDF') or
+ res.content.startswith(b'%PDF') or
res.headers.get("content-type", "").startswith("application/pdf") or
res.headers.get("content-type", "") == "application/octet-stream"
)
@@ -434,19 +437,26 @@ def save_pdf_from_oa_url(paper:Paper):
else:
paper.save_fail_reason("oa_url_not_pdf")
return "oa_url_not_pdf"
- elif res.status_code == 403:
- paper.save_fail_reason("oa_url_need_play")
- # paper_path = paper.init_paper_path("pdf")
- # is_ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path))
- # if is_ok:
- # paper.has_fulltext = True
- # paper.has_fulltext_pdf = True
- # paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
- # return "success"
- # else:
- # paper.save_fail_reason(f"oa_url_pdf_play_error: {err_msg}")
- # return f"oa_url_pdf_play_error: {err_msg}"
- return f"oa_url_pdf_oerror: {res.status_code}"
+
+ # 策略2: curl-cffi(处理 Cloudflare JS 挑战)
+ paper_path = paper.init_paper_path("pdf")
+ is_ok, err_msg = download_pdf_with_curl_cffi(paper.oa_url, paper_path)
+ if is_ok:
+ paper.has_fulltext = True
+ paper.has_fulltext_pdf = True
+ paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
+ return "success"
+
+ # 策略3: Playwright(最终回退)
+ is_ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path))
+ if is_ok:
+ paper.has_fulltext = True
+ paper.has_fulltext_pdf = True
+ paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
+ return "success"
+
+ paper.save_fail_reason(f"oa_url_all_methods_failed: {err_msg}")
+ return f"oa_url_all_methods_failed: {err_msg}"
def save_pdf_from_openalex(paper:Paper):
if cache.get("openalex_api_exceed"):