import argparse
import asyncio
import logging
from pathlib import Path
from typing import Optional

from playwright.async_api import async_playwright, Page, Browser

# Optional dependency: playwright-stealth patches additional automation
# fingerprints. Degrade gracefully when it is not installed.
try:
    from playwright_stealth import stealth_async
except ImportError:
    stealth_async = None

# Logging: mirror everything to log/scihub_downloader.log and the console.
Path("log").mkdir(parents=True, exist_ok=True)
LOG_PATH = Path("log") / "scihub_downloader.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
)
logger = logging.getLogger("scihub")

# Chromium launch flags that suppress common automation tells.
_STEALTH_ARGS = [
    "--disable-blink-features=AutomationControlled",
    "--no-first-run",
    "--no-default-browser-check",
    "--disable-infobars",
    "--disable-extensions",
    "--disable-notifications",
    "--disable-popup-blocking",
]

# Fixed desktop-Chrome user agent matching the stealth launch profile.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/131.0.0.0 Safari/537.36"
)


async def _wait_for_user_to_solve_challenge(page: Page) -> None:
    """Block until the user presses Enter (manual captcha solving in headed mode)."""
    logger.info("请在浏览器中完成验证(如果需要),完成后按回车继续...")
    # FIX: asyncio.get_event_loop() inside a running coroutine is deprecated
    # since Python 3.10; get_running_loop() is the supported replacement.
    # input() runs in the default executor so the event loop stays responsive.
    await asyncio.get_running_loop().run_in_executor(None, input)


async def _try_click_robot_button(page: Page, headless: bool) -> bool:
    """Try to click an "are you a robot" verification element.

    Optional step: if no matching element exists, continue without error.
    Returns True if an element was found and clicked, False otherwise.
    """
    selectors = ["text=/are you a robot/i", "div.ask", "div.altcha-checkbox", "text=Are you a robot"]
    for sel in selectors:
        try:
            loc = page.locator(sel)
            if await loc.count() > 0:
                logger.info(f"找到验证元素,尝试点击: {sel}")
                try:
                    await loc.first.click()
                except Exception:
                    # Locator click failed; retry with the page-level API.
                    try:
                        await page.click(sel)
                    except Exception as click_err:
                        logger.warning(f"点击验证元素失败: {click_err}")
                await page.wait_for_timeout(1500)
                # Wait for a possible navigation / redirect after the click.
                try:
                    await page.wait_for_load_state("domcontentloaded", timeout=8000)
                    logger.info("点击验证后检测到导航完成")
                except Exception:
                    await page.wait_for_timeout(500)
                # A reCAPTCHA iframe means a human is needed; prompt when headed.
                if any("recaptcha" in f.url for f in page.frames):
                    if not headless:
                        await _wait_for_user_to_solve_challenge(page)
                return True
        except Exception as e:
            logger.debug(f"尝试点击验证元素失败: {sel}: {e}")
    logger.info("页面上未发现验证按钮,直接继续")
    return False


async def _click_no_button(page: Page) -> bool:
    """Try to click a 'No' answer button on the challenge page.

    Optional step: if no matching element exists, continue without error.
    Saves a screenshot/HTML snapshot after a successful click for debugging.
    Returns True if an element was found and clicked, False otherwise.
    """
    selectors = ["div.answer[onclick=\"check()\"]", "div.answer:has-text('No')", "text=No"]
    for sel in selectors:
        try:
            loc = page.locator(sel)
            if await loc.count() > 0:
                logger.info(f"找到 'No' 元素,尝试点击: {sel}")
                try:
                    await loc.first.click()
                except Exception:
                    # Locator click failed; retry with the page-level API.
                    try:
                        await page.click(sel)
                    except Exception as click_err:
                        logger.warning(f"点击 'No' 失败: {click_err}")
                await page.wait_for_timeout(1200)
                try:
                    await page.wait_for_load_state("domcontentloaded", timeout=8000)
                    logger.info("点击 No 后检测到导航完成")
                except Exception:
                    pass
                # Best-effort debug snapshot of the post-click page.
                try:
                    await page.screenshot(path="after_click_no.png", full_page=True)
                    html = await page.content()
                    with open("after_click_no.html", "w", encoding="utf-8") as f:
                        f.write(html)
                    logger.info("已保存 after_click_no.png / after_click_no.html")
                except Exception:
                    logger.exception("保存点击 No 的结果失败")
                return True
        except Exception as e:
            logger.debug(f"检查 'No' 按钮时出错: {sel}: {e}")
    logger.info("页面上未发现 'No' 按钮")
    return False


# Title keywords identifying the various bot-detection interstitial pages.
_CHALLENGE_TITLES = ["just a moment", "cloudflare", "checking your browser", "ddos-guard", "attention required", "ddos guard"]


async def _wait_challenge_clear(page: Page, timeout: int = 45000) -> bool:
    """Wait for a bot-detection challenge (Cloudflare / DDoS-Guard etc.) to pass.

    Returns True when no challenge is present or it clears within *timeout*
    milliseconds; False when the title cannot be read or the wait times out.
    """
    try:
        title = await page.title()
    except Exception:
        return False
    title_l = title.lower()
    if not any(kw in title_l for kw in _CHALLENGE_TITLES):
        return True  # No challenge page detected.
    logger.info(f"检测到安全挑战页面(标题: {title!r}),等待 JS 自动通过...")
    # Try clicking a Cloudflare Turnstile checkbox, if one is embedded.
    for frame_sel in [
        'iframe[src*="challenges.cloudflare.com"]',
        'iframe[title*="cloudflare"]',
        'iframe[title*="challenge"]',
    ]:
        try:
            checkbox = page.frame_locator(frame_sel).locator('input[type="checkbox"]')
            if await checkbox.count() > 0:
                await checkbox.click(timeout=3000)
                logger.info("点击了验证复选框")
                break
        except Exception:
            pass
    # Poll in the page until the title no longer looks like a challenge.
    # NOTE: the JS keyword list mirrors _CHALLENGE_TITLES above — keep in sync.
    try:
        await page.wait_for_function(
            """() => { const t = document.title.toLowerCase(); const keywords = ['just a moment', 'cloudflare', 'checking your browser', 'ddos-guard', 'ddos guard', 'attention required']; return !keywords.some(kw => t.includes(kw)); }""",
            timeout=timeout,
        )
        logger.info(f"挑战已通过,当前标题: {await page.title()!r}")
        return True
    except Exception as e:
        logger.warning(f"等待挑战超时({timeout}ms): {e}")
        return False


async def download_pdf_with_playwright(url: str, output: str = "paper.pdf", headless: bool = False) -> Optional[bytes]:
    """Open *url* in a stealth Chromium, navigate past challenges, and fetch the PDF.

    Parameters:
        url: Sci-Hub article page URL.
        output: kept for interface compatibility; this function does NOT write
            a file — the caller persists the returned bytes.
        headless: run the browser without a visible window.

    Returns:
        PDF bytes (> 10 KiB) on success, None otherwise. On failure a page
        screenshot and HTML dump are written for troubleshooting.
    """
    async with async_playwright() as p:
        browser: Browser = await p.chromium.launch(
            headless=headless,
            args=_STEALTH_ARGS,
        )
        context = await browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent=_USER_AGENT,
            locale="zh-CN",
            timezone_id="Asia/Shanghai",
        )
        page = await context.new_page()
        # Apply stealth patches; minimal navigator.webdriver shim as fallback.
        if stealth_async:
            await stealth_async(page)
        else:
            await page.add_init_script(
                "Object.defineProperty(navigator,'webdriver',{get:()=>false});"
            )
        pdf_url: Optional[str] = None
        pdf_content: Optional[bytes] = None

        async def on_response(response):
            # Passive sniffer: remember the first PDF response URL we see.
            nonlocal pdf_url
            ct = response.headers.get("content-type", "")
            resp_url = response.url
            # Ignore chrome-extension:// and other non-HTTP internal URLs.
            if "application/pdf" in ct and resp_url.startswith("http"):
                logger.info(f"检测到 PDF 响应 URL: {resp_url}")
                pdf_url = resp_url

        page.on("response", on_response)
        try:
            logger.info(f"打开: {url}")
            try:
                await page.goto(url, wait_until="domcontentloaded", timeout=30000)
            except Exception:
                logger.info("页面加载超时(可能正在处理安全验证),继续等待...")
                await page.wait_for_timeout(3000)
            # Let any bot-detection challenge (DDoS-Guard / Cloudflare) clear.
            await _wait_challenge_clear(page)
            await page.wait_for_timeout(1000)
            # Optional interactions: robot-check button and 'No' answer.
            await _try_click_robot_button(page, headless)
            await _click_no_button(page)
            # Give the sniffer a chance, then actively wait for a PDF response.
            logger.info("等待 PDF 响应...")
            await page.wait_for_timeout(3000)
            if not pdf_url:
                try:
                    resp = await page.wait_for_response(
                        lambda r: "application/pdf" in r.headers.get("content-type", "") and r.url.startswith("http"),
                        timeout=8000,
                    )
                    pdf_url = resp.url
                    logger.info(f"主动等待到 PDF URL: {pdf_url}")
                except Exception:
                    logger.info("等待 PDF 响应超时")
            # Navigate straight to the PDF URL to download the full body.
            if pdf_url:
                logger.info(f"直接请求 PDF: {pdf_url}")
                try:
                    pdf_resp = await page.goto(pdf_url, wait_until="networkidle", timeout=30000)
                    if pdf_resp and pdf_resp.status == 200:
                        pdf_content = await pdf_resp.body()
                        logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
                except Exception as e:
                    # Fallback: fetch inside the page context (keeps cookies).
                    logger.warning(f"直接导航下载失败: {e},尝试 fetch")
                    try:
                        pdf_content = await page.evaluate(f"""
                            async () => {{
                                const r = await fetch({pdf_url!r});
                                const buf = await r.arrayBuffer();
                                return Array.from(new Uint8Array(buf));
                            }}
                        """)
                        if pdf_content:
                            pdf_content = bytes(pdf_content)
                    except Exception as e2:
                        logger.warning(f"fetch 下载也失败: {e2}")
            # Anything under 10 KiB is almost certainly an error page, not a paper.
            if pdf_content and len(pdf_content) > 10240:
                logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
                return pdf_content
            else:
                if pdf_content:
                    logger.warning(f"PDF 文件过小({len(pdf_content)} bytes),可能是错误页")
                logger.warning("未能获取有效 PDF,已保存页面快照供排查")
                try:
                    await page.screenshot(path="scihub_screenshot.png", full_page=True)
                    html = await page.content()
                    with open("scihub_page.html", "w", encoding="utf-8") as f:
                        f.write(html)
                except Exception:
                    logger.exception("保存调试信息失败")
                return None
        finally:
            # Always release browser resources, even on unexpected errors.
            try:
                await context.close()
            except Exception:
                logger.exception("关闭 context 失败")
            try:
                await browser.close()
            except Exception:
                logger.exception("关闭 browser 失败")


# Sci-Hub mirror domains, ordered by expected reachability (from mainland China).
_SCIHUB_DOMAINS = [
    "sci-hub.ren",
    "sci-hub.ee",
    "sci-hub.st",
    "sci-hub.se",
]


def download_paper_by_doi(doi: str, output: Optional[str] = None, headless: bool = True) -> tuple[bool, str]:
    """Download a paper PDF by DOI (entry point for task callers).

    Tries each mirror in _SCIHUB_DOMAINS until one yields a valid PDF.

    Parameters:
        doi: DOI string, e.g. "10.1016/j.conbuildmat.2017.10.091".
        output: output file path (default derived from the DOI, e.g.
            "10.1016_j.xxx.pdf" — slashes replaced with underscores).
        headless: run the browser headless (default True).

    Returns:
        (True, file_path) on success.
        (False, "scihub_error_*: details") on failure; error-code prefixes:
        - scihub_error_empty_doi: DOI was empty
        - scihub_error_timeout: page load timed out
        - scihub_error_load_failed: page failed to load
        - scihub_error_pdf_not_found: no PDF could be retrieved
        - scihub_error_exception: any other exception
    """
    try:
        doi = doi.strip()
        if not doi:
            err = "scihub_error_empty_doi: DOI 为空"
            logger.error(err)
            return False, err
        output_path = output or f"{doi.replace('/', '_')}.pdf"
        for domain in _SCIHUB_DOMAINS:
            url = f"https://{domain}/{doi}"
            logger.info(f"尝试域名: {url}")
            try:
                pdf_content = asyncio.run(download_pdf_with_playwright(url, output=output_path, headless=headless))
            except asyncio.TimeoutError:
                logger.warning(f"{domain} 超时,尝试下一个域名")
                continue
            except Exception as e:
                logger.warning(f"{domain} 出错: {e},尝试下一个域名")
                continue
            if pdf_content:
                # Persist the PDF; create parent directories as needed
                # (pathlib for consistency with the rest of the file).
                target = Path(output_path)
                target.resolve().parent.mkdir(parents=True, exist_ok=True)
                target.write_bytes(pdf_content)
                logger.info(f"✓ 成功下载({domain}): {output_path} ({len(pdf_content)} bytes)")
                return True, output_path
            else:
                logger.warning(f"{domain} 未获取到 PDF,尝试下一个域名")
        err = "scihub_error_pdf_not_found: 所有域名均无法获取 PDF"
        logger.error(err)
        return False, err
    except Exception as e:
        err = f"scihub_error_exception: 执行下载时发生异常 - {str(e)}"
        logger.exception(err)
        return False, err


def _parse_args():
    """Parse command-line arguments for standalone invocation."""
    p = argparse.ArgumentParser(description="简化的 Sci-Hub PDF 下载器,支持 DOI")
    p.add_argument("--doi", help="DOI,例如 10.1016/j.conbuildmat.2017.10.091")
    p.add_argument("-o", "--output", help="输出文件名(默认基于 DOI)")
    p.add_argument("--headless", action="store_true", help="无头模式")
    return p.parse_args()


if __name__ == "__main__":
    args = _parse_args()
    if not args.doi:
        logger.error("请通过 --doi 提供 DOI")
        raise SystemExit(1)
    success, msg = download_paper_by_doi(args.doi, output=args.output, headless=args.headless)
    if success:
        logger.info(f"完成: {msg}")
    else:
        logger.error(f"失败: {msg}")
        raise SystemExit(1)