paper_server/apps/resm/d_scihub.py

278 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
import asyncio
import logging
from pathlib import Path
from typing import Optional
from playwright.async_api import async_playwright, Page, Browser
# Initialize logging: mirror all messages to log/scihub_downloader.log and the console.
Path("log").mkdir(parents=True, exist_ok=True)
LOG_PATH = Path("log") / "scihub_downloader.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    # File handler is opened with UTF-8 because log messages contain Chinese text.
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
)
logger = logging.getLogger("scihub")
async def _wait_for_user_to_solve_challenge(page: Page) -> None:
    """Block until the operator presses Enter, signalling that any manual
    CAPTCHA / verification challenge shown in the browser has been solved.

    ``input()`` is blocking, so it runs in the default thread-pool executor
    to avoid stalling the event loop.
    """
    logger.info("请在浏览器中完成验证(如果需要),完成后按回车继续...")
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() here is deprecated since Python 3.10.
    await asyncio.get_running_loop().run_in_executor(None, input)
async def _try_click_robot_button(page: Page, headless: bool) -> bool:
    """Try to click an "are you a robot" verification element (best effort).

    Returns True if a matching element was found and a click was attempted,
    False if no verification element is present. If a reCAPTCHA iframe
    appears afterwards and the browser is headful, the operator is asked to
    solve it manually.
    """
    selectors = ["text=/are you a robot/i", "div.ask", "div.altcha-checkbox", "text=Are you a robot"]
    for sel in selectors:
        try:
            loc = page.locator(sel)
            if await loc.count() > 0:
                logger.info(f"找到验证元素,尝试点击: {sel}")
                try:
                    await loc.first.click()
                except Exception:
                    # Fall back to the page-level click API.
                    try:
                        await page.click(sel)
                    except Exception as click_err:
                        logger.warning(f"点击验证元素失败: {click_err}")
                await page.wait_for_timeout(1500)
                # Wait for the navigation/redirect the click may trigger.
                # BUG FIX: the original called page.wait_for_navigation(),
                # which does not exist in playwright-python — the resulting
                # AttributeError was silently swallowed. wait_for_load_state
                # is the supported equivalent.
                try:
                    await page.wait_for_load_state("load", timeout=8000)
                    logger.info("点击验证后检测到导航完成")
                except Exception:
                    await page.wait_for_timeout(500)
                # A reCAPTCHA frame means human interaction is required.
                if any("recaptcha" in f.url for f in page.frames):
                    if not headless:
                        await _wait_for_user_to_solve_challenge(page)
                return True
        except Exception as e:
            logger.debug(f"尝试点击验证元素失败: {sel}: {e}")
    logger.info("页面上未发现验证按钮,直接继续")
    return False
async def _click_no_button(page: Page) -> bool:
    """Try to click the 'No' answer button (best effort).

    Targets <div class="answer" onclick="check()">No</div> first, then
    progressively looser selectors. Saves a screenshot and the page HTML
    afterwards for debugging. Returns True if a click was attempted,
    False if no 'No' button was found.
    """
    # Exact match for <div class="answer" onclick="check()">No</div>, then fallbacks.
    selectors = ["div.answer[onclick=\"check()\"]", "div.answer:has-text('No')", "text=No"]
    for sel in selectors:
        try:
            loc = page.locator(sel)
            if await loc.count() > 0:
                logger.info(f"找到 'No' 元素,尝试点击: {sel}")
                try:
                    await loc.first.click()
                except Exception:
                    # Fall back to the page-level click API.
                    try:
                        await page.click(sel)
                    except Exception as click_err:
                        logger.warning(f"点击 'No' 失败: {click_err}")
                await page.wait_for_timeout(1200)
                # Clicking 'No' may also trigger a redirect.
                # BUG FIX: the original called page.wait_for_navigation(),
                # which does not exist in playwright-python — the resulting
                # AttributeError was silently swallowed. wait_for_load_state
                # is the supported equivalent.
                try:
                    await page.wait_for_load_state("load", timeout=8000)
                    logger.info("点击 No 后检测到导航完成")
                except Exception:
                    pass
                # Save artifacts for troubleshooting.
                try:
                    await page.screenshot(path="after_click_no.png", full_page=True)
                    html = await page.content()
                    with open("after_click_no.html", "w", encoding="utf-8") as f:
                        f.write(html)
                    logger.info("已保存 after_click_no.png / after_click_no.html")
                except Exception:
                    logger.exception("保存点击 No 的结果失败")
                return True
        except Exception as e:
            logger.debug(f"检查 'No' 按钮时出错: {sel}: {e}")
    logger.info("页面上未发现 'No' 按钮")
    return False
async def download_pdf_with_playwright(url: str, output: str = "paper.pdf", headless: bool = False) -> Optional[bytes]:
    """Open *url* in Chromium, click through the Sci-Hub checks and capture the PDF.

    The PDF is sniffed from network responses whose Content-Type contains
    ``application/pdf``. On success the bytes are written to *output* and
    returned; on failure a screenshot and the page HTML are saved for
    debugging and None is returned.

    Args:
        url: Sci-Hub page URL to open.
        output: Path where the captured PDF is written.
        headless: Launch Chromium headless (headful allows manual captcha solving).

    Returns:
        The PDF bytes, or None if no PDF response was captured.
    """
    async with async_playwright() as p:
        browser: Browser = await p.chromium.launch(headless=headless)
        context = await browser.new_context(viewport={"width": 1920, "height": 1080})
        page = await context.new_page()
        pdf_content: Optional[bytes] = None

        async def on_response(response):
            # Sniff every network response for a PDF payload.
            nonlocal pdf_content
            try:
                ct = response.headers.get("content-type", "")
                if "application/pdf" in ct:
                    logger.info(f"捕获到 PDF 响应: {response.url}")
                    pdf_content = await response.body()
            except Exception:
                logger.exception("处理响应时出错")

        page.on("response", on_response)
        try:
            logger.info(f"打开: {url}")
            await page.goto(url, wait_until="networkidle")
            await page.wait_for_timeout(1000)
            # Try to click through the verification and the 'No' button.
            await _try_click_robot_button(page, headless)
            await _click_no_button(page)
            # Give the page time to load and fire the PDF request.
            logger.info("等待页面加载和PDF响应...")
            await page.wait_for_timeout(3000)
            # Actively wait for a PDF response (the clicks may trigger a
            # redirect that loads the PDF).
            if not pdf_content:
                try:
                    resp = await page.wait_for_response(
                        lambda r: "application/pdf" in r.headers.get("content-type", ""),
                        timeout=5000,
                    )
                    # BUG FIX: the original discarded this response; keep its
                    # body in case the on_response handler missed it.
                    pdf_content = await resp.body()
                    logger.info("捕获到主动等待的 PDF 响应")
                except Exception:
                    logger.info("主动等待 PDF 响应超时,继续其他方式")
            if pdf_content:
                # BUG FIX: persist the bytes to *output* — the original never
                # wrote the file in the live code path, yet the caller reports
                # this path as the downloaded file.
                with open(output, "wb") as f:
                    f.write(pdf_content)
                logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
                return pdf_content
            # No PDF captured: save artifacts so the failure can be diagnosed.
            logger.warning("未能获取 PDF已保存页面快照供排查")
            try:
                await page.screenshot(path="scihub_screenshot.png", full_page=True)
                html = await page.content()
                with open("scihub_page.html", "w", encoding="utf-8") as f:
                    f.write(html)
            except Exception:
                logger.exception("保存调试信息失败")
            return None
        finally:
            # Best-effort teardown: never let cleanup mask the real outcome.
            try:
                await context.close()
            except Exception:
                logger.exception("关闭 context 失败")
            try:
                await browser.close()
            except Exception:
                logger.exception("关闭 browser 失败")
def download_paper_by_doi(doi: str, output: Optional[str] = None, headless: bool = True) -> tuple[bool, str]:
    """Download a paper PDF from Sci-Hub by DOI (entry point for tasks).

    Args:
        doi: DOI string, e.g. "10.1016/j.conbuildmat.2017.10.091".
        output: Output file path (defaults to a name derived from the DOI,
            e.g. "10.1016_j.xxx.pdf").
        headless: Run the browser headless (default True).

    Returns:
        (True, "<file path>") on success.
        (False, "scihub_error_*: details") on failure; error-code prefixes:
            - scihub_error_empty_doi: DOI is empty
            - scihub_error_timeout: page load timed out
            - scihub_error_load_failed: failed to load the page
            - scihub_error_pdf_not_found: could not obtain the PDF
            - scihub_error_exception: any other exception
    """
    try:
        doi = doi.strip()
        if not doi:
            err = "scihub_error_empty_doi: DOI 为空"
            logger.error(err)
            return False, err
        url = f"https://sci-hub.st/{doi}"
        # Replace '/' so the DOI maps to a single flat filename.
        output_path = output or f"{doi.replace('/', '_')}.pdf"
        logger.info(f"开始下载 DOI: {doi}")
        logger.info(f"目标 URL: {url}")
        logger.info(f"输出文件: {output_path}")
        try:
            pdf_content = asyncio.run(download_pdf_with_playwright(url, output=output_path, headless=headless))
        except asyncio.TimeoutError:
            # NOTE(review): Playwright raises its own TimeoutError, not
            # asyncio's, so this branch rarely fires — confirm before
            # relying on the timeout error code.
            err = "scihub_error_timeout: 网页加载超时(可能网络慢或网站不可用)"
            logger.error(err)
            return False, err
        except Exception as e:
            err = f"scihub_error_load_failed: 加载页面时出错 - {str(e)}"
            logger.exception(err)
            return False, err
        if pdf_content:
            logger.info(f"✓ 成功下载: {output_path} ({len(pdf_content)} bytes)")
            return True, output_path
        # Empty content means every capture strategy inside the helper failed.
        err = "scihub_error_pdf_not_found: 无法从 Sci-Hub 获取 PDF可能 DOI 不存在、网站不可用、或无权限访问)"
        logger.error(err)
        return False, err
    except Exception as e:
        err = f"scihub_error_exception: 执行下载时发生异常 - {str(e)}"
        logger.exception(err)
        return False, err
def _parse_args():
p = argparse.ArgumentParser(description="简化的 Sci-Hub PDF 下载器,支持 DOI")
p.add_argument("--doi", help="DOI例如 10.1016/j.conbuildmat.2017.10.091")
p.add_argument("-o", "--output", help="输出文件名(默认基于 DOI")
p.add_argument("--headless", action="store_true", help="无头模式")
return p.parse_args()
if __name__ == "__main__":
    # Standalone CLI entry point: exit 1 on missing DOI or failed download.
    cli_args = _parse_args()
    if not cli_args.doi:
        logger.error("请通过 --doi 提供 DOI")
        raise SystemExit(1)
    ok, message = download_paper_by_doi(cli_args.doi, output=cli_args.output, headless=cli_args.headless)
    if not ok:
        logger.error(f"失败: {message}")
        raise SystemExit(1)
    logger.info(f"完成: {message}")