feat: 添加pyautogui调用

This commit is contained in:
caoqianming 2026-02-09 15:17:02 +08:00
parent 9efc412f7d
commit 94f269626d
4 changed files with 236 additions and 2 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 492 B

196
apps/resm/d_oaurl.py Normal file
View File

@ -0,0 +1,196 @@
from playwright.async_api import async_playwright, Page, Browser
from typing import Optional
import asyncio
import sys
import os
import pyautogui
import time
from PIL import Image
import io
# Optional dependency: try to import OpenCV for better template matching.
try:
    import cv2  # noqa: F401  (imported only to detect availability)
    HAS_CV2 = True
except ImportError:
    HAS_CV2 = False

# Optional dependency: playwright-stealth. ``stealth_async`` ends up either as
# an awaitable ``stealth_async(page)`` callable or as ``None`` when the package
# is not usable, in which case callers fall back to their own evasions.
try:
    # playwright-stealth 1.x exposes a module-level coroutine function.
    from playwright_stealth import stealth_async
except ImportError:
    try:
        # NOTE(review): playwright-stealth 2.x is understood to replace the
        # module-level helpers with a ``Stealth`` class — confirm against the
        # installed version.
        from playwright_stealth import Stealth
    except ImportError:
        stealth_async = None
    else:
        async def stealth_async(page):
            """Apply stealth evasions to ``page`` through the ``Stealth`` class API."""
            await Stealth().apply_stealth_async(page)
async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool, str]:
    """Fetch a PDF from ``url`` with a headed Chromium browser and save it.

    The page is opened in a visible (non-headless) browser, Cloudflare
    verification boxes are clicked through on-screen image recognition, and
    the first response whose content type is ``application/pdf`` is written
    to ``save_path``.

    Args:
        url: Address expected to serve the PDF.
        save_path: File path the PDF bytes are written to.

    Returns:
        ``(True, "")`` when a PDF was captured and saved, otherwise
        ``(False, reason)`` where ``reason`` describes the failure.
    """

    def _is_pdf_response(response) -> bool:
        # Single definition of "this response is a PDF", shared by the
        # background listener and the explicit wait below.
        return "application/pdf" in response.headers.get("content-type", "")

    async with async_playwright() as p:
        # Initialised before the try so the finally clause can tell whether
        # there is anything to close when the launch itself fails.
        browser = None
        try:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context(viewport={"width": 1920, "height": 1080})
            page = await context.new_page()
            page.set_default_timeout(60000)

            # Apply stealth mode to get past anti-bot detection.
            if stealth_async:
                await stealth_async(page)
            else:
                # Without the stealth package, apply a minimal manual evasion.
                await page.add_init_script("""
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => false,
                    });
                """)

            pdf_content: Optional[bytes] = None

            async def on_response(response):
                nonlocal pdf_content
                if _is_pdf_response(response):
                    pdf_content = await response.body()

            page.on("response", on_response)

            # Load with a loose wait condition so we do not hang on Cloudflare.
            # Only Exception is caught: a bare except would also swallow task
            # cancellation.
            try:
                await page.goto(url, wait_until="domcontentloaded", timeout=5000)
            except Exception:
                print("⚠ 页面加载超时,但继续处理...")
            await page.wait_for_timeout(5000)

            # Handle the Cloudflare verification.
            print("开始处理 Cloudflare 校验...")
            await page.wait_for_timeout(3000)

            # Cloudflare may need several consecutive clicks; try at most 5 times.
            max_cloudflare_attempts = 5
            for attempt in range(max_cloudflare_attempts):
                # Once the PDF has been captured there is nothing left to verify.
                if pdf_content:
                    print("✓ 已获取到 PDF 内容,停止验证框处理")
                    break
                print(f"\nCloudflare 验证尝试 {attempt + 1}/{max_cloudflare_attempts}")
                success = await handle_cloudflare_with_image(page)
                if not success:
                    print("⚠ 未找到验证框,可能已完成验证或验证框已消失")
                    break
                print("✓ 成功处理一次验证框")
                # Give a follow-up verification box or a page refresh time to appear.
                await page.wait_for_timeout(2000)
                # No explicit "is there another box" check: simply keep trying
                # until the attempt budget is used up.
                if attempt < max_cloudflare_attempts - 1:
                    print(" 检查是否还有验证框...")
                    await page.wait_for_timeout(1000)

            print("✓ Cloudflare 验证处理完成")
            await page.wait_for_timeout(2000)

            # If the PDF has not been captured yet, wait for its response.
            if not pdf_content:
                print("等待 PDF 响应...")
                try:
                    response = await page.wait_for_event(
                        "response",
                        predicate=_is_pdf_response,
                        timeout=15000,
                    )
                    # The listener runs as a separate task and may not have
                    # stored the body yet, so read it from the response we
                    # were handed instead of relying on that task.
                    if not pdf_content:
                        pdf_content = await response.body()
                except Exception as e:
                    print(f"⚠ 等待 PDF 响应超时: {e}")

            if not pdf_content:
                return False, "未能获取 PDF 内容"
            with open(save_path, "wb") as f:
                f.write(pdf_content)
            return True, ""
        except Exception as e:
            print(f"异常: {e}")
            return False, str(e)
        finally:
            # Single place where the browser is closed, on every exit path.
            if browser is not None:
                try:
                    await browser.close()
                except Exception as e:
                    # Best-effort cleanup: report but never mask the result.
                    print(f"⚠ 关闭浏览器失败: {e}")
async def handle_cloudflare_with_image(page: Page) -> bool:
    """
    Try to click through a Cloudflare verification box using image recognition.

    Waits two seconds so the box can finish rendering, then makes up to five
    template-matching attempts with a pause after each unsuccessful one.

    Returns True as soon as one attempt clicks the box, False when every
    attempt fails.
    """
    # Let the verification box load completely before the first attempt.
    await page.wait_for_timeout(2000)

    total_attempts = 5
    attempt_no = 1
    while attempt_no <= total_attempts:
        print(f"图像识别方式尝试第 {attempt_no}/{total_attempts}")
        try:
            # Template image recognition (requires the template file to exist).
            clicked = await try_template_matching()
            if not clicked:
                print(f" 等待后重试... ({attempt_no}/{total_attempts})")
                await page.wait_for_timeout(2000)
            else:
                print(" ✓ 模板匹配方式成功")
                await page.wait_for_timeout(5000)
                return True
        except Exception as exc:
            print(f" ✗ 图像处理异常: {exc}")
            await page.wait_for_timeout(2000)
        attempt_no += 1

    return False
async def try_template_matching() -> bool:
    """
    Find the verification box on screen by template matching and click it.

    Each candidate template image is searched for on the screen with
    ``pyautogui.locateOnScreen``; the first one found is clicked at its
    center. A template that is missing on disk, not found on screen, or that
    raises during matching is skipped so the remaining candidates still get
    their turn.

    Returns:
        True if a template was located and clicked, False if none matched.
    """
    template_paths = [
        'apps/resm/cloudflare_checkbox2.png',
    ]
    # Match threshold for pyautogui (0.0-1.0, higher is stricter).
    confidence_threshold = 0.6

    for template_path in template_paths:
        if not os.path.exists(template_path):
            print(f" 模板文件不存在: {template_path}")
            continue
        try:
            print(f" 尝试在屏幕上定位模板: {template_path} (confidence={confidence_threshold})")
            # Search the screen for the template directly.
            loc = pyautogui.locateOnScreen(template_path, confidence=confidence_threshold)
            if not loc:
                print(f" 未找到模板 (confidence={confidence_threshold})")
                # Move on to the next candidate rather than giving up.
                continue
            # pyautogui.center(loc) gives the center point of the matched box.
            center_x, center_y = pyautogui.center(loc)
            print(f" 找到验证框位置: ({center_x}, {center_y})")
            print(f" 模板匹配区域: {loc}")
            pyautogui.click(center_x, center_y, clicks=1, interval=0.1)
            return True
        except Exception as e:
            # Catch everything, including ImageNotFoundException; the class is
            # matched by name, without referencing a specific exception type.
            error_type = type(e).__name__
            if "ImageNotFoundException" in error_type:
                print(f" 模板匹配异常: {error_type} - 屏幕上找不到模板,停止尝试")
            else:
                print(f" 模板匹配异常: {error_type} - {e}")
            continue
    return False

View File

@ -11,7 +11,10 @@ from lxml import etree
from celery import current_app from celery import current_app
from datetime import datetime, timedelta from datetime import datetime, timedelta
import random import random
from django.db.models import Q from .d_oaurl import download_from_url_playwright
import asyncio
import sys
import os
# config.email = "caoqianming@foxmail.com" # config.email = "caoqianming@foxmail.com"
config.email = "caoqianming@ctc.ac.cn" config.email = "caoqianming@ctc.ac.cn"
@ -28,6 +31,25 @@ ELSEVIER_HEADERS = {
"X-ELS-APIKey": ELSEVIER_APIKEY, "X-ELS-APIKey": ELSEVIER_APIKEY,
} }
def run_async(coro):
    """
    Run ``coro`` to completion from synchronous code and return its result.

    On Windows the coroutine runs on a dedicated ``ProactorEventLoop``, which
    is needed for asyncio subprocess support. The loop is created directly,
    so the process-wide event loop policy is left untouched, and it is
    detached from the thread before being closed so no closed loop stays
    installed as the current one. On other platforms ``asyncio.run`` is used.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever ``coro`` returns; exceptions raised by it propagate.
    """
    if sys.platform != 'win32':
        # Unix/Linux/Mac: the default runner is sufficient.
        return asyncio.run(coro)

    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        try:
            # Finalise async generators the coroutine may have left open.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None): def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}" cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
@ -368,7 +390,18 @@ def save_pdf_from_oa_url(paper:Paper):
else: else:
paper.save_fail_reason("oa_url_not_pdf") paper.save_fail_reason("oa_url_not_pdf")
return "oa_url_not_pdf" return "oa_url_not_pdf"
return f"oa_url_pdf_error: {res.status_code}" elif res.status_code == 403:
paper_path = paper.init_paper_path("pdf")
is_ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path))
if is_ok:
paper.has_fulltext = True
paper.has_fulltext_pdf = True
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
return "success"
else:
paper.save_fail_reason(f"oa_url_pdf_play_error: {err_msg}")
return f"oa_url_pdf_play_error: {err_msg}"
return f"oa_url_pdf_oerror: {res.status_code}"
def save_pdf_from_openalex(paper:Paper): def save_pdf_from_openalex(paper:Paper):
if cache.get("openalex_api_exceed"): if cache.get("openalex_api_exceed"):

View File

@ -19,3 +19,8 @@ xlwt==1.3.0
openpyxl==3.1.5 openpyxl==3.1.5
cron-descriptor==1.2.35 cron-descriptor==1.2.35
docxtpl==0.16.7 docxtpl==0.16.7
playwright==1.58.0
playwright-stealth==2.0.1
pyautogui==0.9.54
pillow>=10.0.0
opencv-python>=4.8.0