feat: 增加d_scihub
This commit is contained in:
parent
aa95818414
commit
33a6dbf431
|
|
@ -0,0 +1,277 @@
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
from playwright.async_api import async_playwright, Page, Browser
|
||||||
|
|
||||||
|
# Initialize logging: everything goes both to log/scihub_downloader.log
# (UTF-8) and to the console via a StreamHandler.
Path("log").mkdir(parents=True, exist_ok=True)
# Log file path; the "log" directory was just created above.
LOG_PATH = Path("log") / "scihub_downloader.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
)
# Module-level logger used by every function in this file.
logger = logging.getLogger("scihub")
|
||||||
|
|
||||||
|
|
||||||
|
async def _wait_for_user_to_solve_challenge(page: Page):
|
||||||
|
logger.info("请在浏览器中完成验证(如果需要),完成后按回车继续...")
|
||||||
|
await asyncio.get_event_loop().run_in_executor(None, input)
|
||||||
|
|
||||||
|
|
||||||
|
async def _try_click_robot_button(page: Page, headless: bool) -> bool:
|
||||||
|
"""尝试点击验证按钮(可选步骤,如果找不到则直接继续)"""
|
||||||
|
selectors = ["text=/are you a robot/i", "div.ask", "div.altcha-checkbox", "text=Are you a robot"]
|
||||||
|
for sel in selectors:
|
||||||
|
try:
|
||||||
|
loc = page.locator(sel)
|
||||||
|
if await loc.count() > 0:
|
||||||
|
logger.info(f"找到验证元素,尝试点击: {sel}")
|
||||||
|
try:
|
||||||
|
await loc.first.click()
|
||||||
|
except Exception as e:
|
||||||
|
try:
|
||||||
|
await page.click(sel)
|
||||||
|
except Exception as click_err:
|
||||||
|
logger.warning(f"点击验证元素失败: {click_err}")
|
||||||
|
pass
|
||||||
|
await page.wait_for_timeout(1500)
|
||||||
|
# 等待可能的导航/重定向
|
||||||
|
try:
|
||||||
|
await page.wait_for_navigation(timeout=8000)
|
||||||
|
logger.info("点击验证后检测到导航完成")
|
||||||
|
except Exception:
|
||||||
|
await page.wait_for_timeout(500)
|
||||||
|
# 如果出现需要人工的 captcha,提示用户
|
||||||
|
if any("recaptcha" in f.url for f in page.frames):
|
||||||
|
if not headless:
|
||||||
|
await _wait_for_user_to_solve_challenge(page)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"尝试点击验证元素失败: {sel}: {e}")
|
||||||
|
logger.info("页面上未发现验证按钮,直接继续")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def _click_no_button(page: Page) -> bool:
|
||||||
|
"""尝试点击 'No' 按钮(可选步骤,如果找不到则直接继续)"""
|
||||||
|
# 精确匹配 <div class="answer" onclick="check()">No</div>
|
||||||
|
selectors = ["div.answer[onclick=\"check()\"]", "div.answer:has-text('No')", "text=No"]
|
||||||
|
for sel in selectors:
|
||||||
|
try:
|
||||||
|
loc = page.locator(sel)
|
||||||
|
if await loc.count() > 0:
|
||||||
|
logger.info(f"找到 'No' 元素,尝试点击: {sel}")
|
||||||
|
try:
|
||||||
|
await loc.first.click()
|
||||||
|
except Exception as e:
|
||||||
|
try:
|
||||||
|
await page.click(sel)
|
||||||
|
except Exception as click_err:
|
||||||
|
logger.warning(f"点击 'No' 失败: {click_err}")
|
||||||
|
pass
|
||||||
|
await page.wait_for_timeout(1200)
|
||||||
|
# 点击 No 后也可能触发重定向
|
||||||
|
try:
|
||||||
|
await page.wait_for_navigation(timeout=8000)
|
||||||
|
logger.info("点击 No 后检测到导航完成")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# 保存结果用于排查
|
||||||
|
try:
|
||||||
|
await page.screenshot(path="after_click_no.png", full_page=True)
|
||||||
|
html = await page.content()
|
||||||
|
with open("after_click_no.html", "w", encoding="utf-8") as f:
|
||||||
|
f.write(html)
|
||||||
|
logger.info("已保存 after_click_no.png / after_click_no.html")
|
||||||
|
except Exception:
|
||||||
|
logger.exception("保存点击 No 的结果失败")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"检查 'No' 按钮时出错: {sel}: {e}")
|
||||||
|
logger.info("页面上未发现 'No' 按钮")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def download_pdf_with_playwright(url: str, output: str = "paper.pdf", headless: bool = False) -> Optional[bytes]:
    """Open `url` in Chromium and try several strategies to obtain a PDF.

    Strategies, in order:
      1. sniff every network response for a ``application/pdf`` content-type;
      2. click a visible download button/link and save the download;
      3. navigate directly to any ``.pdf`` href found on the page.

    On success the PDF bytes are returned (and written to `output`); on
    failure a screenshot and the page HTML are saved for debugging and
    None is returned. The browser context and browser are always closed.

    Args:
        url: page to open (typically a sci-hub.st DOI URL).
        output: path the PDF is saved to.
        headless: run Chromium without a visible window.

    Returns:
        The PDF content as bytes, or None if every strategy failed.
    """
    async with async_playwright() as p:
        browser: Browser = await p.chromium.launch(headless=headless)
        context = await browser.new_context(viewport={"width": 1920, "height": 1080})
        page = await context.new_page()

        # Filled in by the response listener as soon as a PDF body is seen.
        pdf_content: Optional[bytes] = None

        async def on_response(response):
            # Capture the body of any response whose content-type is PDF.
            nonlocal pdf_content
            try:
                ct = response.headers.get("content-type", "")
                if "application/pdf" in ct:
                    logger.info(f"捕获到 PDF 响应: {response.url}")
                    pdf_content = await response.body()
            except Exception:
                logger.exception("处理响应时出错")

        page.on("response", on_response)

        try:
            logger.info(f"打开: {url}")
            await page.goto(url, wait_until="networkidle")
            await page.wait_for_timeout(1000)

            # Try to click the robot check and the 'No' answer (both optional).
            await _try_click_robot_button(page, headless)
            await _click_no_button(page)

            # Give the page time to load and fire a PDF response after the clicks.
            logger.info("等待页面加载和PDF响应...")
            await page.wait_for_timeout(3000)

            # Actively wait for a PDF response (the clicks may trigger one via
            # auto-load or redirect). The response object itself is discarded —
            # the on_response listener above is relied on to store the body.
            if not pdf_content:
                try:
                    await page.wait_for_response(
                        lambda r: "application/pdf" in r.headers.get("content-type", ""),
                        timeout=5000,
                    )
                    logger.info("捕获到主动等待的 PDF 响应")
                except Exception:
                    logger.info("主动等待 PDF 响应超时,继续其他方式")

            # Strategy 2: click an in-page download button/link.
            download_selectors = ["a[href*='.pdf']", "button:has-text('Download')", "a:has-text('PDF')"]
            for sel in download_selectors:
                try:
                    if await page.locator(sel).count() > 0:
                        logger.info(f"尝试点击下载元素: {sel}")
                        async with page.expect_download() as di:
                            await page.click(sel)
                        download = await di.value
                        await download.save_as(output)
                        logger.info(f"已保存 PDF: {output}")
                        # Read the saved file back so the caller gets the bytes.
                        with open(output, "rb") as f:
                            pdf_content = f.read()
                        break
                except Exception:
                    logger.exception(f"通过选择器下载失败: {sel}")

            # Strategy 3 (fallback): collect every href on the page and visit
            # the first one containing ".pdf" directly.
            if not pdf_content:
                logger.info("尝试查找页面内 PDF 链接")
                try:
                    links = await page.eval_on_selector_all("a[href]", "els => els.map(e=>e.href)")
                    candidates = [u for u in links if ".pdf" in u]
                    if candidates:
                        pdf_url = candidates[0]
                        logger.info(f"直接访问 PDF 链接: {pdf_url}")
                        resp = await page.goto(pdf_url, wait_until="networkidle")
                        if resp and resp.status == 200:
                            pdf_content = await resp.body()
                            with open(output, "wb") as f:
                                f.write(pdf_content)
                            logger.info(f"已保存 PDF: {output}")
                except Exception:
                    logger.exception("直接访问 PDF 链接失败")

            if pdf_content:
                logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
                return pdf_content
            else:
                # Nothing worked — keep a snapshot of the page for debugging.
                logger.warning("未能获取 PDF,已保存页面快照供排查")
                try:
                    await page.screenshot(path="scihub_screenshot.png", full_page=True)
                    html = await page.content()
                    with open("scihub_page.html", "w", encoding="utf-8") as f:
                        f.write(html)
                except Exception:
                    logger.exception("保存调试信息失败")
                return None
        finally:
            # Best-effort teardown; failures are logged but never raised.
            try:
                await context.close()
            except Exception:
                logger.exception("关闭 context 失败")
            try:
                await browser.close()
            except Exception:
                logger.exception("关闭 browser 失败")
|
||||||
|
|
||||||
|
|
||||||
|
def download_paper_by_doi(doi: str, output: Optional[str] = None, headless: bool = True) -> tuple[bool, str]:
|
||||||
|
"""
|
||||||
|
通过 DOI 下载论文 PDF(供 task 调用)
|
||||||
|
|
||||||
|
参数:
|
||||||
|
doi: DOI 字符串,例如 "10.1016/j.conbuildmat.2017.10.091"
|
||||||
|
output: 输出文件路径(默认基于 DOI 生成,格式:10.1016_j.xxx.pdf)
|
||||||
|
headless: 是否无头模式(默认 True)
|
||||||
|
|
||||||
|
返回:
|
||||||
|
(True, "文件路径") 如果成功
|
||||||
|
(False, "scihub_error_*: 错误详情") 如果失败,错误码前缀包括:
|
||||||
|
- scihub_error_empty_doi: DOI 为空
|
||||||
|
- scihub_error_timeout: 网页加载超时
|
||||||
|
- scihub_error_load_failed: 加载页面失败
|
||||||
|
- scihub_error_pdf_not_found: 无法获取 PDF
|
||||||
|
- scihub_error_exception: 其他异常
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
doi = doi.strip()
|
||||||
|
if not doi:
|
||||||
|
err = "scihub_error_empty_doi: DOI 为空"
|
||||||
|
logger.error(err)
|
||||||
|
return False, err
|
||||||
|
|
||||||
|
url = f"https://sci-hub.st/{doi}"
|
||||||
|
output_path = output or f"{doi.replace('/', '_')}.pdf"
|
||||||
|
|
||||||
|
logger.info(f"开始下载 DOI: {doi}")
|
||||||
|
logger.info(f"目标 URL: {url}")
|
||||||
|
logger.info(f"输出文件: {output_path}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
pdf_content = asyncio.run(download_pdf_with_playwright(url, output=output_path, headless=headless))
|
||||||
|
except asyncio.TimeoutError as e:
|
||||||
|
err = f"scihub_error_timeout: 网页加载超时(可能网络慢或网站不可用)"
|
||||||
|
logger.error(err)
|
||||||
|
return False, err
|
||||||
|
except Exception as e:
|
||||||
|
err = f"scihub_error_load_failed: 加载页面时出错 - {str(e)}"
|
||||||
|
logger.exception(err)
|
||||||
|
return False, err
|
||||||
|
|
||||||
|
if pdf_content:
|
||||||
|
logger.info(f"✓ 成功下载: {output_path} ({len(pdf_content)} bytes)")
|
||||||
|
return True, output_path
|
||||||
|
else:
|
||||||
|
# PDF 内容为空,说明所有获取方式都失败
|
||||||
|
err = f"scihub_error_pdf_not_found: 无法从 Sci-Hub 获取 PDF(可能 DOI 不存在、网站不可用、或无权限访问)"
|
||||||
|
logger.error(err)
|
||||||
|
return False, err
|
||||||
|
except Exception as e:
|
||||||
|
err = f"scihub_error_exception: 执行下载时发生异常 - {str(e)}"
|
||||||
|
logger.exception(err)
|
||||||
|
return False, err
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_args():
|
||||||
|
p = argparse.ArgumentParser(description="简化的 Sci-Hub PDF 下载器,支持 DOI")
|
||||||
|
p.add_argument("--doi", help="DOI,例如 10.1016/j.conbuildmat.2017.10.091")
|
||||||
|
p.add_argument("-o", "--output", help="输出文件名(默认基于 DOI)")
|
||||||
|
p.add_argument("--headless", action="store_true", help="无头模式")
|
||||||
|
return p.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # CLI entry point: a DOI is mandatory; exit non-zero on any failure.
    cli_args = _parse_args()
    if not cli_args.doi:
        logger.error("请通过 --doi 提供 DOI")
        raise SystemExit(1)
    ok, message = download_paper_by_doi(cli_args.doi, output=cli_args.output, headless=cli_args.headless)
    if not ok:
        logger.error(f"失败: {message}")
        raise SystemExit(1)
    logger.info(f"完成: {message}")
|
||||||
|
|
||||||
|
|
@ -53,15 +53,24 @@ class Paper(BaseModel):
|
||||||
os.makedirs(paper_dir, exist_ok=True)
|
os.makedirs(paper_dir, exist_ok=True)
|
||||||
return paper_dir
|
return paper_dir
|
||||||
|
|
||||||
def save_file_xml(self, content):
|
def init_paper_path(self, type: str):
    """Return the canonical file path for one of this paper's artifacts.

    Args:
        type: artifact kind, either "xml" or "pdf". (The name shadows the
            builtin but is kept for caller compatibility.)

    Returns:
        The joined path ``<save_dir>/<doi with "/" replaced by "_">.<type>``.

    Raises:
        ValueError: if `type` is neither "xml" nor "pdf".
    """
    # FIX: validate before touching the filesystem — the original called
    # init_save_dir() (which creates directories) even when it was about
    # to raise; also collapses the duplicated os.path.join branches.
    if type not in ("xml", "pdf"):
        raise ValueError("type must be xml or pdf")
    paper_dir = self.init_save_dir()
    # "/" in a DOI is not filesystem-safe.
    safe_doi = self.doi.replace("/", "_")
    return os.path.join(paper_dir, f"{safe_doi}.{type}")
|
||||||
|
|
||||||
|
def save_file_xml(self, content):
    """Write the paper's XML full text to its canonical path as UTF-8."""
    target = self.init_paper_path("xml")
    encoded = content.encode("utf-8")
    with open(target, "wb") as fh:
        fh.write(encoded)
|
||||||
|
|
||||||
def save_file_pdf(self, content, save_obj=False):
|
def save_file_pdf(self, content, save_obj=False):
|
||||||
safe_doi = self.doi.replace("/", "_")
|
paper_file = self.init_paper_path("pdf")
|
||||||
paper_file = os.path.join(self.init_save_dir(), f"{safe_doi}.pdf")
|
|
||||||
with open(paper_file, "wb") as f:
|
with open(paper_file, "wb") as f:
|
||||||
f.write(content)
|
f.write(content)
|
||||||
if save_obj:
|
if save_obj:
|
||||||
|
|
|
||||||
|
|
@ -336,6 +336,8 @@ def download_pdf(paper_id):
|
||||||
# if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None:
|
# if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None:
|
||||||
# current_from = "openalex"
|
# current_from = "openalex"
|
||||||
# msg = save_pdf_from_openalex(paper)
|
# msg = save_pdf_from_openalex(paper)
|
||||||
|
if paper.has_fulltext_pdf is False:
|
||||||
|
msg = save_pdf_from_scihub(paper)
|
||||||
return msg, current_from
|
return msg, current_from
|
||||||
finally:
|
finally:
|
||||||
paper.fetch_end()
|
paper.fetch_end()
|
||||||
|
|
@ -404,5 +406,12 @@ def save_pdf_from_elsevier(paper:Paper):
|
||||||
return f"elsevier_status_error: {res.status_code} {res.text}"
|
return f"elsevier_status_error: {res.status_code} {res.text}"
|
||||||
|
|
||||||
def save_pdf_from_scihub(paper: Paper):
    """Fetch the paper's PDF from Sci-Hub and record the outcome on `paper`.

    On success, marks the paper as having a full-text PDF and persists the
    flag; on failure, stores the "scihub_error_*" reason on the paper.
    """
    # Imported lazily so this module does not require playwright at import time.
    from .d_scihub import download_paper_by_doi

    is_ok, err_msg = download_paper_by_doi(paper.doi, paper.init_paper_path("pdf"))
    if is_ok:
        # FIX: this assignment was duplicated in the original; set it once.
        paper.has_fulltext_pdf = True
        # NOTE(review): update_fields also lists "has_fulltext", but that
        # field is never assigned here — confirm whether it should be set
        # alongside has_fulltext_pdf or dropped from update_fields.
        paper.save(update_fields=["has_fulltext", "has_fulltext_pdf"])
    else:
        paper.save_fail_reason(err_msg)
# Example direct-download URL observed for reference:
# https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true
|
||||||
Loading…
Reference in New Issue