212 lines
8.1 KiB
Python
212 lines
8.1 KiB
Python
from playwright.async_api import async_playwright, Page, Browser
|
||
from typing import Optional
|
||
import asyncio
|
||
import sys
|
||
import os
|
||
import time
|
||
from PIL import Image
|
||
import io
|
||
|
||
# 尝试导入 OpenCV 用于更好的模板匹配
|
||
try:
|
||
import cv2
|
||
HAS_CV2 = True
|
||
except ImportError:
|
||
HAS_CV2 = False
|
||
|
||
# 尝试导入 playwright-stealth,如果没有安装则忽略
|
||
try:
|
||
from playwright_stealth import stealth_async
|
||
except ImportError:
|
||
stealth_async = None
|
||
|
||
|
||
async def download_from_url_playwright(url: str, save_path: str) -> tuple[bool, str]:
    """Open ``url`` in a headed Chromium browser and save the PDF it serves.

    A response listener stores the body of any response whose content type
    contains ``application/pdf``. While no PDF has been captured, the function
    calls ``handle_cloudflare_with_image`` (up to five times) to click through
    verification boxes, then waits up to 15 seconds for a PDF response.

    Args:
        url: Address of the page that is expected to end up serving a PDF.
        save_path: File path the captured PDF bytes are written to.

    Returns:
        ``(True, "")`` when a PDF of at least 10 KiB was written to
        ``save_path``; otherwise ``(False, message)`` describing the failure.
    """
    async with async_playwright() as p:
        # Bound before the try block so the cleanup in `finally` can tell
        # whether the launch itself failed (otherwise the name is unbound).
        browser: Optional[Browser] = None
        try:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context(viewport={"width": 1920, "height": 1080})
            page = await context.new_page()
            page.set_default_timeout(60000)

            # Apply stealth mode to get past anti-bot detection.
            if stealth_async:
                await stealth_async(page)
            else:
                # Without playwright-stealth, hide the webdriver flag manually.
                await page.add_init_script("""
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => false,
                    });
                """)

            pdf_content: Optional[bytes] = None

            async def on_response(response):
                """Store the body of any PDF response seen by the page."""
                nonlocal pdf_content
                if "application/pdf" in response.headers.get("content-type", ""):
                    try:
                        # Read the whole response body.
                        pdf_content = await response.body()
                        print(f"✓ 成功捕获 PDF,大小: {len(pdf_content)} bytes")
                    except Exception as e:
                        print(f"⚠ 读取 PDF 响应体失败: {e}")

            page.on("response", on_response)

            # Load with a lenient wait condition so a Cloudflare interstitial
            # does not block the flow; a failed/timed-out load is tolerated.
            try:
                await page.goto(url, wait_until="domcontentloaded", timeout=5000)
            except Exception:
                # `except Exception` (not a bare except) so task cancellation
                # and KeyboardInterrupt still propagate.
                print("⚠ 页面加载超时,但继续处理...")
                await page.wait_for_timeout(5000)

            # Handle the Cloudflare verification.
            print("开始处理 Cloudflare 校验...")
            await page.wait_for_timeout(3000)

            # Cloudflare may require several consecutive clicks; try at most 5 times.
            max_cloudflare_attempts = 5

            for attempt in range(max_cloudflare_attempts):
                # Once the PDF has been captured there is nothing left to verify.
                if pdf_content:
                    print("✓ 已获取到 PDF 内容,停止验证框处理")
                    break

                print(f"\nCloudflare 验证尝试 {attempt + 1}/{max_cloudflare_attempts}")
                success = await handle_cloudflare_with_image(page)

                if success:
                    print("✓ 成功处理一次验证框")
                    # Wait for a new verification box to appear or the page to refresh.
                    await page.wait_for_timeout(2000)
                    # There is no explicit "verification finished" check here;
                    # the loop simply keeps trying until the attempt limit.
                    if attempt < max_cloudflare_attempts - 1:
                        print(" 检查是否还有验证框...")
                        await page.wait_for_timeout(1000)
                else:
                    print("⚠ 未找到验证框,可能已完成验证或验证框已消失")
                    break

            print("✓ Cloudflare 验证处理完成")
            await page.wait_for_timeout(2000)

            # If the listener has not captured a PDF yet, wait for one explicitly.
            if not pdf_content:
                print("等待 PDF 响应...")
                try:
                    response = await page.wait_for_response(
                        lambda response: "application/pdf" in response.headers.get("content-type", ""),
                        timeout=15000
                    )
                    # Read the whole response body.
                    pdf_content = await response.body()
                    print(f"✓ 通过 wait_for_response 获取 PDF,大小: {len(pdf_content)} bytes")
                except Exception as e:
                    print(f"⚠ 等待 PDF 响应超时: {e}")

            if not pdf_content:
                return False, "未能获取 PDF 内容"

            # Sanity-check the size: anything under 10 KiB is treated as an
            # incomplete download.
            pdf_size = len(pdf_content)
            if pdf_size < 10240:
                return False, f"PDF 文件过小: {pdf_size} bytes,可能下载不完整"

            with open(save_path, "wb") as f:
                f.write(pdf_content)
            print(f"✓ PDF 已保存到: {save_path},大小: {pdf_size} bytes")
            return True, ""
        except Exception as e:
            print(f"异常: {e}")
            return False, str(e)
        finally:
            # Single cleanup point for every return path above.
            if browser is not None:
                try:
                    await browser.close()
                except Exception as close_error:
                    # Best-effort cleanup: report the failure, never raise it.
                    print(f"⚠ 关闭浏览器失败: {close_error}")
|
||
|
||
|
||
async def handle_cloudflare_with_image(page: Page) -> bool:
    """Try to click a Cloudflare verification box located by template matching.

    Waits two seconds, then calls ``try_template_matching`` up to five times,
    pausing two seconds after every unsuccessful or failing attempt.

    Args:
        page: Playwright page, used here only for its timed waits.

    Returns:
        True as soon as one attempt reports a click (after a further
        five-second wait); False when every attempt fails.
    """
    total_attempts = 5
    pause_ms = 2000

    # Give the verification box time to finish loading before the first try.
    await page.wait_for_timeout(pause_ms)

    attempt_no = 0
    while attempt_no < total_attempts:
        attempt_no += 1
        print(f"图像识别方式尝试第 {attempt_no}/{total_attempts} 次")

        try:
            # Template-image recognition (needs the template file on disk).
            clicked = await try_template_matching()
            if not clicked:
                print(f" 等待后重试... ({attempt_no}/{total_attempts})")
                await page.wait_for_timeout(pause_ms)
                continue

            print(" ✓ 模板匹配方式成功")
            await page.wait_for_timeout(5000)
            return True
        except Exception as exc:
            print(f" ✗ 图像处理异常: {exc}")
            await page.wait_for_timeout(pause_ms)

    return False
|
||
|
||
|
||
async def try_template_matching(
    template_paths: Optional[list[str]] = None,
    confidence: float = 0.4,
) -> bool:
    """Locate a verification-box template on screen and click its centre.

    Each existing template file is searched for with
    ``pyautogui.locateOnScreen``; the first one found is clicked once. A
    template that is not found, or whose lookup raises, no longer aborts the
    whole search: the next template is tried instead.

    Args:
        template_paths: Template image files to try, in order. Defaults to
            ``['apps/resm/cloudflare_checkbox2.png']``.
        confidence: Match threshold passed to ``locateOnScreen``
            (0.0-1.0, higher is stricter).

    Returns:
        True if a template was located and clicked, False otherwise
        (including when none of the template files exist).
    """
    if template_paths is None:
        template_paths = [
            'apps/resm/cloudflare_checkbox2.png',
        ]

    available = []
    for template_path in template_paths:
        if os.path.exists(template_path):
            available.append(template_path)
        else:
            print(f" 模板文件不存在: {template_path}")

    if not available:
        return False

    # Imported only once there is something to match, so a missing template
    # does not require the GUI dependency to be importable.
    import pyautogui

    for template_path in available:
        try:
            print(f" 尝试在屏幕上定位模板: {template_path} (confidence={confidence})")

            # Search the screen directly for the template.
            loc = pyautogui.locateOnScreen(template_path, confidence=confidence)

            if loc:
                # `loc` is a (left, top, width, height) box;
                # pyautogui.center(loc) gives its centre coordinates.
                center_x, center_y = pyautogui.center(loc)
                print(f" 找到验证框位置: ({center_x}, {center_y})")
                print(f" 模板匹配区域: {loc}")
                pyautogui.click(center_x, center_y, clicks=1, interval=0.1)
                return True

            print(f" 未找到模板 (confidence={confidence})")
        except Exception as e:
            # Catches everything, including ImageNotFoundException, which is
            # matched by name so it needs no extra import.
            error_type = type(e).__name__
            if "ImageNotFoundException" in error_type:
                print(f" 模板匹配异常: {error_type} - 屏幕上找不到模板,停止尝试")
            else:
                print(f" 模板匹配异常: {error_type} - {e}")

    return False
|