zcbot/tools/web_fetch.py

107 lines
3.1 KiB
Python

"""Web Fetch: 抓取任意 URL 并返回 markdown 文本。"""
from __future__ import annotations
import ipaddress
import re
import socket
import html2text
import httpx
from .base import Tool
# Address ranges the fetcher must never contact (checked by _check_ssrf).
_SSRF_BLOCKED = set(
    map(
        ipaddress.ip_network,
        (
            "127.0.0.0/8",     # IPv4 loopback
            "10.0.0.0/8",      # RFC 1918 private
            "172.16.0.0/12",   # RFC 1918 private
            "192.168.0.0/16",  # RFC 1918 private
            "169.254.0.0/16",  # IPv4 link-local
            "0.0.0.0/8",       # "this network"
            "::1/128",         # IPv6 loopback
            "fc00::/7",        # IPv6 unique local
            "fe80::/10",       # IPv6 link-local
        ),
    )
)
# Maximum number of characters of converted text returned to the caller;
# anything beyond this is cut off and replaced by a truncation notice.
_MAX_CHARS = 8_000

# Timeout, in seconds, passed to httpx for each request.
_TIMEOUT = 15.0

# Browser-like User-Agent header sent with every request.
_UA = " ".join(
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/131.0.0.0 Safari/537.36",
    )
)
def _build_converter() -> html2text.HTML2Text:
    """Create the HTML-to-markdown converter used for fetched pages."""
    converter = html2text.HTML2Text()
    converter.body_width = 0              # 0 disables hard line wrapping
    converter.ignore_images = True        # leave images out of the output
    converter.ignore_links = False        # keep hyperlinks in the output
    converter.skip_internal_links = True  # skip same-page anchor links
    return converter


# Module-level converter instance shared by all fetches.
_h2t = _build_converter()
def _check_ssrf(url: str) -> str | None:
    """Check that *url* does not target an internal or private address.

    The host is taken from the URL; if it is not an IP literal it is resolved
    with ``socket.getaddrinfo`` and *every* returned address is checked
    against ``_SSRF_BLOCKED``. IPv4-mapped IPv6 addresses
    (``::ffff:a.b.c.d``) are unwrapped first so they cannot be used to sneak
    a blocked IPv4 address past the IPv4 networks in the blocklist.

    Args:
        url: The URL the caller intends to fetch.

    Returns:
        ``None`` when the URL looks safe to fetch, otherwise a human-readable
        error message describing why it was rejected.

    Note:
        The name is resolved here and again, independently, by the HTTP
        client, so this check alone does not protect against a DNS answer
        that changes between the two lookups.
    """
    import urllib.parse

    # urlparse raises ValueError for malformed bracketed hosts ("http://[::1").
    try:
        parsed = urllib.parse.urlparse(url)
        host = parsed.hostname
    except ValueError:
        return f"invalid URL: {url!r}"
    if not host:
        return f"invalid URL: no host in {url!r}"
    if parsed.scheme not in ("http", "https"):
        return f"unsupported URL scheme: {parsed.scheme!r}"

    try:
        # Fast path: the host is already an IP literal, no DNS needed.
        addresses = [ipaddress.ip_address(host)]
    except ValueError:
        try:
            infos = socket.getaddrinfo(
                host, None, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
            )
            # sockaddr[0] may carry an IPv6 scope suffix ("fe80::1%eth0");
            # strip it so ip_address() accepts it on every Python version.
            addresses = [
                ipaddress.ip_address(info[4][0].split("%", 1)[0])
                for info in infos
            ]
        except (OSError, IndexError, ValueError):
            return f"cannot resolve host: {host!r}"
        if not addresses:
            return f"cannot resolve host: {host!r}"

    for ip in addresses:
        # An IPv6Address is never "in" an IPv4Network, so compare the
        # embedded IPv4 address when the literal is IPv4-mapped.
        mapped = getattr(ip, "ipv4_mapped", None)
        effective = ip if mapped is None else mapped
        if any(effective in net for net in _SSRF_BLOCKED):
            return f"blocked internal/private host: {host} ({ip})"
    return None
class WebFetchTool(Tool):
    """Tool that downloads a URL and returns its content as markdown text.

    HTML pages are converted to markdown with the module-level ``_h2t``
    converter; ``text/plain`` responses are returned as-is. Every failure is
    reported as a string starting with ``"[Error] "`` rather than raised.
    """

    name = "web_fetch"
    description = (
        "Fetch a web page and return its content as markdown text. "
        "Use this to read the full content of a URL found in search results or referenced by the user. "
        f"Results are truncated to {_MAX_CHARS} characters."
    )
    parameters = {
        "type": "object",
        "properties": {
            "url": {"type": "string", "description": "The URL to fetch"},
        },
        "required": ["url"],
    }

    def execute(self, url: str) -> str:
        """Fetch *url* and return its text content, truncated to ``_MAX_CHARS``.

        Redirects are followed manually so that every hop, not only the
        initial URL, is validated by ``_check_ssrf``; otherwise a public URL
        could redirect the request to an internal address.

        Args:
            url: The URL to fetch.

        Returns:
            The page content as markdown/plain text, or an ``"[Error] ..."``
            message if the URL is rejected, the request fails, the server
            answers with status >= 400, or the content type is unsupported.
        """
        from urllib.parse import urljoin

        redirect_codes = (301, 302, 303, 307, 308)
        max_redirects = 20

        try:
            # One client for all hops so cookies set during a redirect chain
            # are still sent on the following request.
            with httpx.Client(
                headers={"User-Agent": _UA},
                timeout=_TIMEOUT,
                follow_redirects=False,
            ) as client:
                for _ in range(max_redirects + 1):
                    err = _check_ssrf(url)
                    if err:
                        return f"[Error] {err}"
                    resp = client.get(url)
                    location = resp.headers.get("location")
                    if resp.status_code not in redirect_codes or not location:
                        break
                    # Location may be relative; resolve against the current URL.
                    url = urljoin(str(resp.url), location)
                else:
                    return f"[Error] too many redirects (more than {max_redirects})"
        except httpx.TimeoutException:
            return f"[Error] request timed out after {_TIMEOUT:.0f}s"
        except httpx.HTTPError as e:
            return f"[Error] request failed: {e}"
        except (httpx.InvalidURL, ValueError) as e:
            # httpx.InvalidURL is not an HTTPError subclass; without this a
            # malformed URL would escape as an exception.
            return f"[Error] invalid URL: {e}"

        if resp.status_code >= 400:
            return f"[Error] HTTP {resp.status_code}"

        content_type = resp.headers.get("content-type", "")
        media_type = content_type.lower()  # media types are case-insensitive
        is_html = "text/html" in media_type
        if not is_html and "text/plain" not in media_type:
            return f"[Error] unsupported content type: {content_type} — only HTML/text pages are supported"

        if is_html:
            # NOTE(review): _h2t is a shared module-level converter; confirm
            # it carries no parser state over from one document to the next.
            try:
                text = _h2t.handle(resp.text)
            except Exception as e:
                return f"[Error] failed to convert HTML to text: {e}"
        else:
            # Plain text is not HTML: return it unconverted so its line
            # structure is preserved.
            text = resp.text

        # Collapse runs of three or more newlines into a single blank line.
        text = re.sub(r"\n{3,}", "\n\n", text).strip()
        if len(text) > _MAX_CHARS:
            text = text[:_MAX_CHARS] + f"\n\n...(truncated, {len(text) - _MAX_CHARS} more chars)"
        return text