feat:通过cloudflare 验证
This commit is contained in:
parent
54780b8ce1
commit
d7dd606f15
|
|
@ -3,16 +3,6 @@ from typing import Optional
|
||||||
import asyncio
|
import asyncio
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
from PIL import Image
|
|
||||||
import io
|
|
||||||
|
|
||||||
# 尝试导入 OpenCV 用于更好的模板匹配
|
|
||||||
try:
|
|
||||||
import cv2
|
|
||||||
HAS_CV2 = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_CV2 = False
|
|
||||||
|
|
||||||
# 尝试导入 playwright-stealth,如果没有安装则忽略
|
# 尝试导入 playwright-stealth,如果没有安装则忽略
|
||||||
try:
|
try:
|
||||||
|
|
@ -20,25 +10,46 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
stealth_async = None
|
stealth_async = None
|
||||||
|
|
||||||
|
# 隐藏自动化特征的浏览器启动参数
|
||||||
|
_STEALTH_ARGS = [
|
||||||
|
"--disable-blink-features=AutomationControlled",
|
||||||
|
"--no-first-run",
|
||||||
|
"--no-default-browser-check",
|
||||||
|
"--disable-infobars",
|
||||||
|
"--disable-extensions",
|
||||||
|
"--disable-notifications",
|
||||||
|
]
|
||||||
|
|
||||||
|
_USER_AGENT = (
|
||||||
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||||||
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||||
|
"Chrome/131.0.0.0 Safari/537.36"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool, str]:
|
async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool, str]:
|
||||||
async with async_playwright() as p:
|
async with async_playwright() as p:
|
||||||
try:
|
try:
|
||||||
browser = await p.chromium.launch(headless=False)
|
browser = await p.chromium.launch(
|
||||||
context = await browser.new_context(viewport={"width": 1920, "height": 1080})
|
headless=False,
|
||||||
|
args=_STEALTH_ARGS,
|
||||||
|
)
|
||||||
|
context = await browser.new_context(
|
||||||
|
viewport={"width": 1920, "height": 1080},
|
||||||
|
user_agent=_USER_AGENT,
|
||||||
|
locale="zh-CN",
|
||||||
|
timezone_id="Asia/Shanghai",
|
||||||
|
)
|
||||||
page = await context.new_page()
|
page = await context.new_page()
|
||||||
page.set_default_timeout(60000)
|
page.set_default_timeout(60000)
|
||||||
|
|
||||||
# 应用 stealth 模式绕过反爬虫检测
|
# 应用 stealth 模式绕过反爬虫检测
|
||||||
if stealth_async:
|
if stealth_async:
|
||||||
await stealth_async(page)
|
await stealth_async(page)
|
||||||
else:
|
else:
|
||||||
# 如果没有 stealth,手动设置一些反爬虫对抗
|
await page.add_init_script(
|
||||||
await page.add_init_script("""
|
"Object.defineProperty(navigator,'webdriver',{get:()=>false});"
|
||||||
Object.defineProperty(navigator, 'webdriver', {
|
)
|
||||||
get: () => false,
|
|
||||||
});
|
|
||||||
""")
|
|
||||||
|
|
||||||
pdf_content: Optional[bytes] = None
|
pdf_content: Optional[bytes] = None
|
||||||
|
|
||||||
|
|
@ -46,53 +57,23 @@ async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool,
|
||||||
nonlocal pdf_content
|
nonlocal pdf_content
|
||||||
if "application/pdf" in response.headers.get("content-type", ""):
|
if "application/pdf" in response.headers.get("content-type", ""):
|
||||||
try:
|
try:
|
||||||
# 确保完全读取响应体
|
|
||||||
pdf_content = await response.body()
|
pdf_content = await response.body()
|
||||||
print(f"✓ 成功捕获 PDF,大小: {len(pdf_content)} bytes")
|
print(f"✓ 成功捕获 PDF,大小: {len(pdf_content)} bytes")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"⚠ 读取 PDF 响应体失败: {e}")
|
print(f"⚠ 读取 PDF 响应体失败: {e}")
|
||||||
|
|
||||||
page.on("response", on_response)
|
page.on("response", on_response)
|
||||||
|
|
||||||
# 先用较宽松的等待条件加载页面,避免卡在 Cloudflare
|
# 加载页面
|
||||||
try:
|
try:
|
||||||
await page.goto(url, wait_until="domcontentloaded", timeout=5000)
|
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
|
||||||
except:
|
except Exception:
|
||||||
print("⚠ 页面加载超时,但继续处理...")
|
print("⚠ 页面加载超时,但继续处理...")
|
||||||
await page.wait_for_timeout(5000)
|
|
||||||
|
# 等待 Cloudflare 挑战完成
|
||||||
# 处理 Cloudflare 校验
|
await handle_cloudflare(page)
|
||||||
print("开始处理 Cloudflare 校验...")
|
|
||||||
await page.wait_for_timeout(3000)
|
|
||||||
|
|
||||||
# Cloudflare 可能需要连续点击多次,最多尝试5次
|
|
||||||
max_cloudflare_attempts = 5
|
|
||||||
|
|
||||||
for attempt in range(max_cloudflare_attempts):
|
|
||||||
# 检查是否已获取到 PDF,如果已获取则无需继续验证
|
|
||||||
if pdf_content:
|
|
||||||
print("✓ 已获取到 PDF 内容,停止验证框处理")
|
|
||||||
break
|
|
||||||
|
|
||||||
print(f"\nCloudflare 验证尝试 {attempt + 1}/{max_cloudflare_attempts}")
|
|
||||||
success = await handle_cloudflare_with_image(page)
|
|
||||||
|
|
||||||
if success:
|
|
||||||
print("✓ 成功处理一次验证框")
|
|
||||||
# 等待新验证框出现或页面刷新
|
|
||||||
await page.wait_for_timeout(2000)
|
|
||||||
# 检查是否还有验证框,如果没有则说明验证完成
|
|
||||||
# 这里简单地继续尝试,直到达到最大次数
|
|
||||||
if attempt < max_cloudflare_attempts - 1:
|
|
||||||
print(" 检查是否还有验证框...")
|
|
||||||
await page.wait_for_timeout(1000)
|
|
||||||
else:
|
|
||||||
print("⚠ 未找到验证框,可能已完成验证或验证框已消失")
|
|
||||||
break
|
|
||||||
|
|
||||||
print("✓ Cloudflare 验证处理完成")
|
|
||||||
await page.wait_for_timeout(2000)
|
await page.wait_for_timeout(2000)
|
||||||
|
|
||||||
# 如果尚未获取 PDF,继续等待响应
|
# 如果尚未获取 PDF,继续等待响应
|
||||||
if not pdf_content:
|
if not pdf_content:
|
||||||
print("等待 PDF 响应...")
|
print("等待 PDF 响应...")
|
||||||
|
|
@ -101,19 +82,17 @@ async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool,
|
||||||
lambda response: "application/pdf" in response.headers.get("content-type", ""),
|
lambda response: "application/pdf" in response.headers.get("content-type", ""),
|
||||||
timeout=15000
|
timeout=15000
|
||||||
)
|
)
|
||||||
# 确保响应体完全加载
|
|
||||||
pdf_content = await response.body()
|
pdf_content = await response.body()
|
||||||
print(f"✓ 通过 wait_for_response 获取 PDF,大小: {len(pdf_content)} bytes")
|
print(f"✓ 通过 wait_for_response 获取 PDF,大小: {len(pdf_content)} bytes")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"⚠ 等待 PDF 响应超时: {e}")
|
print(f"⚠ 等待 PDF 响应超时: {e}")
|
||||||
|
|
||||||
if pdf_content:
|
if pdf_content:
|
||||||
# 验证文件大小(PDF 通常大于 10KB)
|
|
||||||
pdf_size = len(pdf_content)
|
pdf_size = len(pdf_content)
|
||||||
if pdf_size < 10240:
|
if pdf_size < 10240:
|
||||||
await browser.close()
|
await browser.close()
|
||||||
return False, f"PDF 文件过小: {pdf_size} bytes,可能下载不完整"
|
return False, f"PDF 文件过小: {pdf_size} bytes,可能下载不完整"
|
||||||
|
|
||||||
with open(save_path, "wb") as f:
|
with open(save_path, "wb") as f:
|
||||||
f.write(pdf_content)
|
f.write(pdf_content)
|
||||||
print(f"✓ PDF 已保存到: {save_path},大小: {pdf_size} bytes")
|
print(f"✓ PDF 已保存到: {save_path},大小: {pdf_size} bytes")
|
||||||
|
|
@ -128,84 +107,89 @@ async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool,
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
await browser.close()
|
await browser.close()
|
||||||
except:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
async def handle_cloudflare_with_image(page: Page) -> bool:
|
def download_pdf_with_curl_cffi(url: str, save_path: str) -> tuple[bool, str]:
|
||||||
"""
|
"""使用 curl-cffi 伪造 Chrome TLS 指纹下载 PDF,可绕过 Cloudflare JS 挑战"""
|
||||||
使用图像识别方式处理 Cloudflare 验证框
|
try:
|
||||||
支持模板匹配和颜色识别两种方式
|
import curl_cffi.requests as cf
|
||||||
"""
|
resp = cf.get(
|
||||||
# 在尝试之前先等待2秒,让验证框完全加载
|
url,
|
||||||
await page.wait_for_timeout(2000)
|
impersonate="chrome131",
|
||||||
|
headers={
|
||||||
max_retries = 5
|
"Accept": "application/pdf,application/octet-stream,*/*",
|
||||||
|
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
||||||
for retry in range(max_retries):
|
"Referer": url,
|
||||||
print(f"图像识别方式尝试第 {retry + 1}/{max_retries} 次")
|
},
|
||||||
|
timeout=30,
|
||||||
try:
|
allow_redirects=True,
|
||||||
# 方式1: 通过模板图像识别(如果有模板文件)
|
)
|
||||||
success = await try_template_matching()
|
if resp.status_code != 200:
|
||||||
if success:
|
return False, f"http_{resp.status_code}"
|
||||||
print(" ✓ 模板匹配方式成功")
|
content = resp.content
|
||||||
await page.wait_for_timeout(5000)
|
if not content.startswith(b"%PDF") or len(content) < 10240:
|
||||||
return True
|
return False, "not_valid_pdf"
|
||||||
|
import os
|
||||||
print(f" 等待后重试... ({retry + 1}/{max_retries})")
|
os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True)
|
||||||
await page.wait_for_timeout(2000)
|
with open(save_path, "wb") as f:
|
||||||
|
f.write(content)
|
||||||
except Exception as e:
|
return True, ""
|
||||||
print(f" ✗ 图像处理异常: {e}")
|
except ImportError:
|
||||||
await page.wait_for_timeout(2000)
|
return False, "curl_cffi_not_installed"
|
||||||
|
except Exception as e:
|
||||||
return False
|
return False, str(e)
|
||||||
|
|
||||||
|
|
||||||
async def try_template_matching() -> bool:
|
_CHALLENGE_TITLES = ["just a moment", "cloudflare", "checking your browser", "ddos-guard", "ddos guard", "attention required"]
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_cloudflare(page: Page, timeout: int = 45000) -> bool:
|
||||||
"""
|
"""
|
||||||
通过模板匹配查找并点击验证框
|
处理 bot 检测安全验证(Cloudflare / DDoS-Guard 等):
|
||||||
使用 pyautogui.locateOnScreen 直接在屏幕上定位验证框模板
|
1. 通过页面标题检测是否处于挑战页面
|
||||||
|
2. 尝试点击 Turnstile 复选框(iframe 内,如果有)
|
||||||
|
3. 等待标题变化,表明挑战已自动通过
|
||||||
"""
|
"""
|
||||||
import pyautogui
|
try:
|
||||||
template_paths = [
|
title = await page.title()
|
||||||
'apps/resm/cloudflare_checkbox2.png',
|
except Exception:
|
||||||
]
|
return False
|
||||||
|
|
||||||
# pyautogui 定位的准确度阈值(0.0-1.0,越高越严格)
|
title_l = title.lower()
|
||||||
ACCURACY = 0.4
|
if not any(kw in title_l for kw in _CHALLENGE_TITLES):
|
||||||
|
return True # 无挑战
|
||||||
for template_path in template_paths:
|
|
||||||
if not os.path.exists(template_path):
|
print(f"检测到安全挑战页面(标题: {title!r}),等待 JS 自动通过...")
|
||||||
print(f" 模板文件不存在: {template_path}")
|
|
||||||
continue
|
# 尝试点击 Cloudflare Turnstile 复选框(如果存在)
|
||||||
|
for frame_sel in [
|
||||||
|
'iframe[src*="challenges.cloudflare.com"]',
|
||||||
|
'iframe[title*="cloudflare"]',
|
||||||
|
'iframe[title*="challenge"]',
|
||||||
|
]:
|
||||||
try:
|
try:
|
||||||
print(f" 尝试在屏幕上定位模板: {template_path} (confidence={ACCURACY})")
|
checkbox = page.frame_locator(frame_sel).locator('input[type="checkbox"]')
|
||||||
|
if await checkbox.count() > 0:
|
||||||
# 直接在屏幕上查找模板,使用 confidence 参数
|
await checkbox.click(timeout=3000)
|
||||||
loc = pyautogui.locateOnScreen(template_path, confidence=ACCURACY)
|
print(" ✓ 点击了验证复选框")
|
||||||
|
break
|
||||||
if loc:
|
except Exception:
|
||||||
# loc 是 (left, top, width, height) 或 (x, y, w, h)
|
pass
|
||||||
# pyautogui.center(loc) 返回中心坐标
|
|
||||||
center_x, center_y = pyautogui.center(loc)
|
try:
|
||||||
print(f" 找到验证框位置: ({center_x}, {center_y})")
|
await page.wait_for_function(
|
||||||
print(f" 模板匹配区域: {loc}")
|
"""() => {
|
||||||
pyautogui.click(center_x, center_y, clicks=1, interval=0.1)
|
const t = document.title.toLowerCase();
|
||||||
return True
|
const kws = ['just a moment', 'cloudflare', 'checking your browser',
|
||||||
else:
|
'ddos-guard', 'ddos guard', 'attention required'];
|
||||||
print(f" 未找到模板 (confidence={ACCURACY})")
|
return !kws.some(kw => t.includes(kw));
|
||||||
return False
|
}""",
|
||||||
|
timeout=timeout,
|
||||||
except Exception as e:
|
)
|
||||||
# 捕获所有异常,包括 ImageNotFoundException
|
print(f" ✓ 安全挑战已通过")
|
||||||
error_type = type(e).__name__
|
return True
|
||||||
if "ImageNotFoundException" in error_type:
|
except Exception as e:
|
||||||
print(f" 模板匹配异常: {error_type} - 屏幕上找不到模板,停止尝试")
|
print(f"⚠ 安全挑战处理超时,继续尝试获取 PDF...")
|
||||||
else:
|
return False
|
||||||
print(f" 模板匹配异常: {error_type} - {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,12 @@ from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from playwright.async_api import async_playwright, Page, Browser
|
from playwright.async_api import async_playwright, Page, Browser
|
||||||
|
|
||||||
|
# 尝试导入 playwright-stealth
|
||||||
|
try:
|
||||||
|
from playwright_stealth import stealth_async
|
||||||
|
except ImportError:
|
||||||
|
stealth_async = None
|
||||||
|
|
||||||
# 初始化日志
|
# 初始化日志
|
||||||
Path("log").mkdir(parents=True, exist_ok=True)
|
Path("log").mkdir(parents=True, exist_ok=True)
|
||||||
LOG_PATH = Path("log") / "scihub_downloader.log"
|
LOG_PATH = Path("log") / "scihub_downloader.log"
|
||||||
|
|
@ -15,6 +21,23 @@ logging.basicConfig(
|
||||||
)
|
)
|
||||||
logger = logging.getLogger("scihub")
|
logger = logging.getLogger("scihub")
|
||||||
|
|
||||||
|
# 隐藏自动化特征的浏览器启动参数
|
||||||
|
_STEALTH_ARGS = [
|
||||||
|
"--disable-blink-features=AutomationControlled",
|
||||||
|
"--no-first-run",
|
||||||
|
"--no-default-browser-check",
|
||||||
|
"--disable-infobars",
|
||||||
|
"--disable-extensions",
|
||||||
|
"--disable-notifications",
|
||||||
|
"--disable-popup-blocking",
|
||||||
|
]
|
||||||
|
|
||||||
|
_USER_AGENT = (
|
||||||
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||||||
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||||
|
"Chrome/131.0.0.0 Safari/537.36"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _wait_for_user_to_solve_challenge(page: Page):
|
async def _wait_for_user_to_solve_challenge(page: Page):
|
||||||
logger.info("请在浏览器中完成验证(如果需要),完成后按回车继续...")
|
logger.info("请在浏览器中完成验证(如果需要),完成后按回车继续...")
|
||||||
|
|
@ -40,7 +63,7 @@ async def _try_click_robot_button(page: Page, headless: bool) -> bool:
|
||||||
await page.wait_for_timeout(1500)
|
await page.wait_for_timeout(1500)
|
||||||
# 等待可能的导航/重定向
|
# 等待可能的导航/重定向
|
||||||
try:
|
try:
|
||||||
await page.wait_for_navigation(timeout=8000)
|
await page.wait_for_load_state("domcontentloaded", timeout=8000)
|
||||||
logger.info("点击验证后检测到导航完成")
|
logger.info("点击验证后检测到导航完成")
|
||||||
except Exception:
|
except Exception:
|
||||||
await page.wait_for_timeout(500)
|
await page.wait_for_timeout(500)
|
||||||
|
|
@ -57,7 +80,6 @@ async def _try_click_robot_button(page: Page, headless: bool) -> bool:
|
||||||
|
|
||||||
async def _click_no_button(page: Page) -> bool:
|
async def _click_no_button(page: Page) -> bool:
|
||||||
"""尝试点击 'No' 按钮(可选步骤,如果找不到则直接继续)"""
|
"""尝试点击 'No' 按钮(可选步骤,如果找不到则直接继续)"""
|
||||||
# 精确匹配 <div class="answer" onclick="check()">No</div>
|
|
||||||
selectors = ["div.answer[onclick=\"check()\"]", "div.answer:has-text('No')", "text=No"]
|
selectors = ["div.answer[onclick=\"check()\"]", "div.answer:has-text('No')", "text=No"]
|
||||||
for sel in selectors:
|
for sel in selectors:
|
||||||
try:
|
try:
|
||||||
|
|
@ -73,13 +95,11 @@ async def _click_no_button(page: Page) -> bool:
|
||||||
logger.warning(f"点击 'No' 失败: {click_err}")
|
logger.warning(f"点击 'No' 失败: {click_err}")
|
||||||
pass
|
pass
|
||||||
await page.wait_for_timeout(1200)
|
await page.wait_for_timeout(1200)
|
||||||
# 点击 No 后也可能触发重定向
|
|
||||||
try:
|
try:
|
||||||
await page.wait_for_navigation(timeout=8000)
|
await page.wait_for_load_state("domcontentloaded", timeout=8000)
|
||||||
logger.info("点击 No 后检测到导航完成")
|
logger.info("点击 No 后检测到导航完成")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
# 保存结果用于排查
|
|
||||||
try:
|
try:
|
||||||
await page.screenshot(path="after_click_no.png", full_page=True)
|
await page.screenshot(path="after_click_no.png", full_page=True)
|
||||||
html = await page.content()
|
html = await page.content()
|
||||||
|
|
@ -95,90 +115,153 @@ async def _click_no_button(page: Page) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# 各种 bot 检测页面的标题关键词
|
||||||
|
_CHALLENGE_TITLES = ["just a moment", "cloudflare", "checking your browser", "ddos-guard", "attention required", "ddos guard"]
|
||||||
|
|
||||||
|
|
||||||
|
async def _wait_challenge_clear(page: Page, timeout: int = 45000) -> bool:
|
||||||
|
"""等待 bot 检测(Cloudflare/DDoS-Guard 等)挑战页面自动通过"""
|
||||||
|
try:
|
||||||
|
title = await page.title()
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
title_l = title.lower()
|
||||||
|
if not any(kw in title_l for kw in _CHALLENGE_TITLES):
|
||||||
|
return True # 无挑战,直接通过
|
||||||
|
|
||||||
|
logger.info(f"检测到安全挑战页面(标题: {title!r}),等待 JS 自动通过...")
|
||||||
|
|
||||||
|
# 尝试点击 Cloudflare Turnstile 复选框(如果有)
|
||||||
|
for frame_sel in [
|
||||||
|
'iframe[src*="challenges.cloudflare.com"]',
|
||||||
|
'iframe[title*="cloudflare"]',
|
||||||
|
'iframe[title*="challenge"]',
|
||||||
|
]:
|
||||||
|
try:
|
||||||
|
checkbox = page.frame_locator(frame_sel).locator('input[type="checkbox"]')
|
||||||
|
if await checkbox.count() > 0:
|
||||||
|
await checkbox.click(timeout=3000)
|
||||||
|
logger.info("点击了验证复选框")
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
await page.wait_for_function(
|
||||||
|
"""() => {
|
||||||
|
const t = document.title.toLowerCase();
|
||||||
|
const keywords = ['just a moment', 'cloudflare', 'checking your browser',
|
||||||
|
'ddos-guard', 'ddos guard', 'attention required'];
|
||||||
|
return !keywords.some(kw => t.includes(kw));
|
||||||
|
}""",
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
logger.info(f"挑战已通过,当前标题: {await page.title()!r}")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"等待挑战超时({timeout}ms): {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
async def download_pdf_with_playwright(url: str, output: str = "paper.pdf", headless: bool = False) -> Optional[bytes]:
|
async def download_pdf_with_playwright(url: str, output: str = "paper.pdf", headless: bool = False) -> Optional[bytes]:
|
||||||
async with async_playwright() as p:
|
async with async_playwright() as p:
|
||||||
browser: Browser = await p.chromium.launch(headless=headless)
|
browser: Browser = await p.chromium.launch(
|
||||||
context = await browser.new_context(viewport={"width": 1920, "height": 1080})
|
headless=headless,
|
||||||
|
args=_STEALTH_ARGS,
|
||||||
|
)
|
||||||
|
context = await browser.new_context(
|
||||||
|
viewport={"width": 1920, "height": 1080},
|
||||||
|
user_agent=_USER_AGENT,
|
||||||
|
locale="zh-CN",
|
||||||
|
timezone_id="Asia/Shanghai",
|
||||||
|
)
|
||||||
page = await context.new_page()
|
page = await context.new_page()
|
||||||
|
|
||||||
|
# 应用 stealth 模式
|
||||||
|
if stealth_async:
|
||||||
|
await stealth_async(page)
|
||||||
|
else:
|
||||||
|
await page.add_init_script(
|
||||||
|
"Object.defineProperty(navigator,'webdriver',{get:()=>false});"
|
||||||
|
)
|
||||||
|
|
||||||
|
pdf_url: Optional[str] = None
|
||||||
pdf_content: Optional[bytes] = None
|
pdf_content: Optional[bytes] = None
|
||||||
|
|
||||||
async def on_response(response):
|
async def on_response(response):
|
||||||
nonlocal pdf_content
|
nonlocal pdf_url
|
||||||
try:
|
ct = response.headers.get("content-type", "")
|
||||||
ct = response.headers.get("content-type", "")
|
resp_url = response.url
|
||||||
if "application/pdf" in ct:
|
# 忽略 chrome-extension 等内部 URL
|
||||||
logger.info(f"捕获到 PDF 响应: {response.url}")
|
if "application/pdf" in ct and resp_url.startswith("http"):
|
||||||
pdf_content = await response.body()
|
logger.info(f"检测到 PDF 响应 URL: {resp_url}")
|
||||||
except Exception:
|
pdf_url = resp_url
|
||||||
logger.exception("处理响应时出错")
|
|
||||||
|
|
||||||
page.on("response", on_response)
|
page.on("response", on_response)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
logger.info(f"打开: {url}")
|
logger.info(f"打开: {url}")
|
||||||
await page.goto(url, wait_until="networkidle")
|
try:
|
||||||
|
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
|
||||||
|
except Exception:
|
||||||
|
logger.info("页面加载超时(可能正在处理安全验证),继续等待...")
|
||||||
|
await page.wait_for_timeout(3000)
|
||||||
|
|
||||||
|
# 等待 bot 检测挑战完成(DDoS-Guard / Cloudflare 等)
|
||||||
|
await _wait_challenge_clear(page)
|
||||||
await page.wait_for_timeout(1000)
|
await page.wait_for_timeout(1000)
|
||||||
|
|
||||||
# 尝试点击验证 & No
|
# 尝试点击验证 & No
|
||||||
await _try_click_robot_button(page, headless)
|
await _try_click_robot_button(page, headless)
|
||||||
await _click_no_button(page)
|
await _click_no_button(page)
|
||||||
|
|
||||||
# 点击后充分等待以让页面加载和触发PDF响应
|
# 等待页面加载并检测 PDF 响应 URL
|
||||||
logger.info("等待页面加载和PDF响应...")
|
logger.info("等待 PDF 响应...")
|
||||||
await page.wait_for_timeout(3000)
|
await page.wait_for_timeout(3000)
|
||||||
|
|
||||||
# 尝试主动等待PDF响应(点击后可能会自动加载或重定向触发PDF请求)
|
if not pdf_url:
|
||||||
if not pdf_content:
|
|
||||||
try:
|
try:
|
||||||
await page.wait_for_response(
|
resp = await page.wait_for_response(
|
||||||
lambda r: "application/pdf" in r.headers.get("content-type", ""),
|
lambda r: "application/pdf" in r.headers.get("content-type", "")
|
||||||
timeout=5000,
|
and r.url.startswith("http"),
|
||||||
|
timeout=8000,
|
||||||
)
|
)
|
||||||
logger.info("捕获到主动等待的 PDF 响应")
|
pdf_url = resp.url
|
||||||
|
logger.info(f"主动等待到 PDF URL: {pdf_url}")
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.info("主动等待 PDF 响应超时,继续其他方式")
|
logger.info("等待 PDF 响应超时")
|
||||||
|
|
||||||
# 尝试通过页面下载按钮
|
# 直接导航到 PDF URL 下载完整内容
|
||||||
# download_selectors = ["a[href*='.pdf']", "button:has-text('Download')", "a:has-text('PDF')"]
|
if pdf_url:
|
||||||
# for sel in download_selectors:
|
logger.info(f"直接请求 PDF: {pdf_url}")
|
||||||
# try:
|
try:
|
||||||
# if await page.locator(sel).count() > 0:
|
pdf_resp = await page.goto(pdf_url, wait_until="networkidle", timeout=30000)
|
||||||
# logger.info(f"尝试点击下载元素: {sel}")
|
if pdf_resp and pdf_resp.status == 200:
|
||||||
# async with page.expect_download() as di:
|
pdf_content = await pdf_resp.body()
|
||||||
# await page.click(sel)
|
logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
|
||||||
# download = await di.value
|
except Exception as e:
|
||||||
# await download.save_as(output)
|
logger.warning(f"直接导航下载失败: {e},尝试 fetch")
|
||||||
# logger.info(f"已保存 PDF: {output}")
|
try:
|
||||||
# with open(output, "rb") as f:
|
pdf_content = await page.evaluate(f"""
|
||||||
# pdf_content = f.read()
|
async () => {{
|
||||||
# break
|
const r = await fetch({pdf_url!r});
|
||||||
# except Exception:
|
const buf = await r.arrayBuffer();
|
||||||
# logger.exception(f"通过选择器下载失败: {sel}")
|
return Array.from(new Uint8Array(buf));
|
||||||
|
}}
|
||||||
|
""")
|
||||||
|
if pdf_content:
|
||||||
|
pdf_content = bytes(pdf_content)
|
||||||
|
except Exception as e2:
|
||||||
|
logger.warning(f"fetch 下载也失败: {e2}")
|
||||||
|
|
||||||
# 回退:查找页面内 PDF 链接并直接访问
|
if pdf_content and len(pdf_content) > 10240:
|
||||||
# if not pdf_content:
|
|
||||||
# logger.info("尝试查找页面内 PDF 链接")
|
|
||||||
# try:
|
|
||||||
# links = await page.eval_on_selector_all("a[href]", "els => els.map(e=>e.href)")
|
|
||||||
# candidates = [u for u in links if ".pdf" in u]
|
|
||||||
# if candidates:
|
|
||||||
# pdf_url = candidates[0]
|
|
||||||
# logger.info(f"直接访问 PDF 链接: {pdf_url}")
|
|
||||||
# resp = await page.goto(pdf_url, wait_until="networkidle")
|
|
||||||
# if resp and resp.status == 200:
|
|
||||||
# pdf_content = await resp.body()
|
|
||||||
# with open(output, "wb") as f:
|
|
||||||
# f.write(pdf_content)
|
|
||||||
# logger.info(f"已保存 PDF: {output}")
|
|
||||||
# except Exception:
|
|
||||||
# logger.exception("直接访问 PDF 链接失败")
|
|
||||||
|
|
||||||
if pdf_content:
|
|
||||||
logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
|
logger.info(f"下载成功,大小: {len(pdf_content)} bytes")
|
||||||
return pdf_content
|
return pdf_content
|
||||||
else:
|
else:
|
||||||
logger.warning("未能获取 PDF,已保存页面快照供排查")
|
if pdf_content:
|
||||||
|
logger.warning(f"PDF 文件过小({len(pdf_content)} bytes),可能是错误页")
|
||||||
|
logger.warning("未能获取有效 PDF,已保存页面快照供排查")
|
||||||
try:
|
try:
|
||||||
await page.screenshot(path="scihub_screenshot.png", full_page=True)
|
await page.screenshot(path="scihub_screenshot.png", full_page=True)
|
||||||
html = await page.content()
|
html = await page.content()
|
||||||
|
|
@ -198,15 +281,24 @@ async def download_pdf_with_playwright(url: str, output: str = "paper.pdf", head
|
||||||
logger.exception("关闭 browser 失败")
|
logger.exception("关闭 browser 失败")
|
||||||
|
|
||||||
|
|
||||||
|
# 按优先级排列的 sci-hub 域名(国内相对可访问)
|
||||||
|
_SCIHUB_DOMAINS = [
|
||||||
|
"sci-hub.ren",
|
||||||
|
"sci-hub.ee",
|
||||||
|
"sci-hub.st",
|
||||||
|
"sci-hub.se",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def download_paper_by_doi(doi: str, output: Optional[str] = None, headless: bool = True) -> tuple[bool, str]:
|
def download_paper_by_doi(doi: str, output: Optional[str] = None, headless: bool = True) -> tuple[bool, str]:
|
||||||
"""
|
"""
|
||||||
通过 DOI 下载论文 PDF(供 task 调用)
|
通过 DOI 下载论文 PDF(供 task 调用)
|
||||||
|
|
||||||
参数:
|
参数:
|
||||||
doi: DOI 字符串,例如 "10.1016/j.conbuildmat.2017.10.091"
|
doi: DOI 字符串,例如 "10.1016/j.conbuildmat.2017.10.091"
|
||||||
output: 输出文件路径(默认基于 DOI 生成,格式:10.1016_j.xxx.pdf)
|
output: 输出文件路径(默认基于 DOI 生成,格式:10.1016_j.xxx.pdf)
|
||||||
headless: 是否无头模式(默认 True)
|
headless: 是否无头模式(默认 True)
|
||||||
|
|
||||||
返回:
|
返回:
|
||||||
(True, "文件路径") 如果成功
|
(True, "文件路径") 如果成功
|
||||||
(False, "scihub_error_*: 错误详情") 如果失败,错误码前缀包括:
|
(False, "scihub_error_*: 错误详情") 如果失败,错误码前缀包括:
|
||||||
|
|
@ -222,33 +314,35 @@ def download_paper_by_doi(doi: str, output: Optional[str] = None, headless: bool
|
||||||
err = "scihub_error_empty_doi: DOI 为空"
|
err = "scihub_error_empty_doi: DOI 为空"
|
||||||
logger.error(err)
|
logger.error(err)
|
||||||
return False, err
|
return False, err
|
||||||
|
|
||||||
url = f"https://sci-hub.st/{doi}"
|
|
||||||
output_path = output or f"{doi.replace('/', '_')}.pdf"
|
output_path = output or f"{doi.replace('/', '_')}.pdf"
|
||||||
|
|
||||||
logger.info(f"开始下载 DOI: {doi}")
|
for domain in _SCIHUB_DOMAINS:
|
||||||
logger.info(f"目标 URL: {url}")
|
url = f"https://{domain}/{doi}"
|
||||||
logger.info(f"输出文件: {output_path}")
|
logger.info(f"尝试域名: {url}")
|
||||||
|
try:
|
||||||
try:
|
pdf_content = asyncio.run(download_pdf_with_playwright(url, output=output_path, headless=headless))
|
||||||
pdf_content = asyncio.run(download_pdf_with_playwright(url, output=output_path, headless=headless))
|
except asyncio.TimeoutError:
|
||||||
except asyncio.TimeoutError as e:
|
logger.warning(f"{domain} 超时,尝试下一个域名")
|
||||||
err = f"scihub_error_timeout: 网页加载超时(可能网络慢或网站不可用)"
|
continue
|
||||||
logger.error(err)
|
except Exception as e:
|
||||||
return False, err
|
logger.warning(f"{domain} 出错: {e},尝试下一个域名")
|
||||||
except Exception as e:
|
continue
|
||||||
err = f"scihub_error_load_failed: 加载页面时出错 - {str(e)}"
|
|
||||||
logger.exception(err)
|
if pdf_content:
|
||||||
return False, err
|
# 写入文件
|
||||||
|
import os
|
||||||
if pdf_content:
|
os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
|
||||||
logger.info(f"✓ 成功下载: {output_path} ({len(pdf_content)} bytes)")
|
with open(output_path, "wb") as f:
|
||||||
return True, output_path
|
f.write(pdf_content)
|
||||||
else:
|
logger.info(f"✓ 成功下载({domain}): {output_path} ({len(pdf_content)} bytes)")
|
||||||
# PDF 内容为空,说明所有获取方式都失败
|
return True, output_path
|
||||||
err = f"scihub_error_pdf_not_found: 无法从 Sci-Hub 获取 PDF(可能 DOI 不存在、网站不可用、或无权限访问)"
|
else:
|
||||||
logger.error(err)
|
logger.warning(f"{domain} 未获取到 PDF,尝试下一个域名")
|
||||||
return False, err
|
|
||||||
|
err = "scihub_error_pdf_not_found: 所有域名均无法获取 PDF"
|
||||||
|
logger.error(err)
|
||||||
|
return False, err
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
err = f"scihub_error_exception: 执行下载时发生异常 - {str(e)}"
|
err = f"scihub_error_exception: 执行下载时发生异常 - {str(e)}"
|
||||||
logger.exception(err)
|
logger.exception(err)
|
||||||
|
|
@ -274,4 +368,3 @@ if __name__ == "__main__":
|
||||||
else:
|
else:
|
||||||
logger.error(f"失败: {msg}")
|
logger.error(f"失败: {msg}")
|
||||||
raise SystemExit(1)
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -413,18 +413,21 @@ def download_pdf(paper_id):
|
||||||
paper.fetch_end()
|
paper.fetch_end()
|
||||||
|
|
||||||
|
|
||||||
def save_pdf_from_oa_url(paper:Paper):
|
def save_pdf_from_oa_url(paper: Paper):
|
||||||
|
from .d_oaurl import download_pdf_with_curl_cffi, download_from_url_playwright
|
||||||
|
|
||||||
|
# 策略1: 直接请求
|
||||||
try:
|
try:
|
||||||
headers = get_random_headers()
|
headers = get_random_headers()
|
||||||
res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15))
|
res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15))
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
paper.save_fail_reason("oa_url_request_error")
|
paper.save_fail_reason("oa_url_request_error")
|
||||||
return f"oa_url_request_error: {str(e)}"
|
return f"oa_url_request_error: {str(e)}"
|
||||||
|
|
||||||
if res.status_code in [200, 201, 202]:
|
if res.status_code in [200, 201, 202]:
|
||||||
# 检查是否是PDF文件:检查魔数 %PDF 或 content-type
|
# 检查是否是PDF文件:检查魔数 %PDF 或 content-type
|
||||||
is_pdf = (
|
is_pdf = (
|
||||||
res.content.startswith(b'%PDF') or
|
res.content.startswith(b'%PDF') or
|
||||||
res.headers.get("content-type", "").startswith("application/pdf") or
|
res.headers.get("content-type", "").startswith("application/pdf") or
|
||||||
res.headers.get("content-type", "") == "application/octet-stream"
|
res.headers.get("content-type", "") == "application/octet-stream"
|
||||||
)
|
)
|
||||||
|
|
@ -434,19 +437,26 @@ def save_pdf_from_oa_url(paper:Paper):
|
||||||
else:
|
else:
|
||||||
paper.save_fail_reason("oa_url_not_pdf")
|
paper.save_fail_reason("oa_url_not_pdf")
|
||||||
return "oa_url_not_pdf"
|
return "oa_url_not_pdf"
|
||||||
elif res.status_code == 403:
|
|
||||||
paper.save_fail_reason("oa_url_need_play")
|
# 策略2: curl-cffi(处理 Cloudflare JS 挑战)
|
||||||
# paper_path = paper.init_paper_path("pdf")
|
paper_path = paper.init_paper_path("pdf")
|
||||||
# is_ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path))
|
is_ok, err_msg = download_pdf_with_curl_cffi(paper.oa_url, paper_path)
|
||||||
# if is_ok:
|
if is_ok:
|
||||||
# paper.has_fulltext = True
|
paper.has_fulltext = True
|
||||||
# paper.has_fulltext_pdf = True
|
paper.has_fulltext_pdf = True
|
||||||
# paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
|
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
|
||||||
# return "success"
|
return "success"
|
||||||
# else:
|
|
||||||
# paper.save_fail_reason(f"oa_url_pdf_play_error: {err_msg}")
|
# 策略3: Playwright(最终回退)
|
||||||
# return f"oa_url_pdf_play_error: {err_msg}"
|
is_ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path))
|
||||||
return f"oa_url_pdf_oerror: {res.status_code}"
|
if is_ok:
|
||||||
|
paper.has_fulltext = True
|
||||||
|
paper.has_fulltext_pdf = True
|
||||||
|
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
|
||||||
|
return "success"
|
||||||
|
|
||||||
|
paper.save_fail_reason(f"oa_url_all_methods_failed: {err_msg}")
|
||||||
|
return f"oa_url_all_methods_failed: {err_msg}"
|
||||||
|
|
||||||
def save_pdf_from_openalex(paper:Paper):
|
def save_pdf_from_openalex(paper:Paper):
|
||||||
if cache.get("openalex_api_exceed"):
|
if cache.get("openalex_api_exceed"):
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue