158 lines
4.6 KiB
Python
158 lines
4.6 KiB
Python
"""source_to_md.py: 把素材转成干净 Markdown,作为后续策略阶段的输入。

用法:
    python source_to_md.py <input>                # 自动按扩展名识别
    python source_to_md.py <url>                  # http/https 走 web 抓
    python source_to_md.py file.pdf -o source.md

支持:
    .pdf     → pypdf 提取文本
    .docx    → python-docx 段落
    .pptx    → python-pptx 提取每页文字
    .txt/.md → 直读
    URL      → requests + 简易 HTML 剥离

设计原则:模型在策略阶段只看 Markdown,不读二进制 / 不爬复杂排版。
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
def from_pdf(path: Path) -> str:
    """Extract the text of a PDF as Markdown, one ``## Page N`` section per page.

    Args:
        path: Location of the PDF file.

    Returns:
        Markdown starting with ``# <file stem>``. Pages whose extracted text
        is empty are skipped, but numbering still follows the page's position
        in the document. If ``pypdf`` is not installed, the single-line string
        ``"[error] pip install pypdf"`` is returned instead.
    """
    try:
        from pypdf import PdfReader
    except ImportError:
        return "[error] pip install pypdf"

    reader = PdfReader(str(path))
    sections = [f"# {path.stem}\n"]
    for number, page in enumerate(reader.pages, 1):
        # extract_text() may yield nothing for a page; treat that as empty.
        text = (page.extract_text() or "").strip()
        if text:
            sections.append(f"\n## Page {number}\n\n{text}\n")
    # Every section already carries its own leading/trailing newline, so the
    # pieces are concatenated directly; joining with "\n" would double the
    # blank lines between sections.
    return "".join(sections)
|
|
|
|
|
|
def from_docx(path: Path) -> str:
    """Convert the paragraphs of a .docx file into Markdown.

    Args:
        path: Location of the .docx file.

    Returns:
        Markdown starting with ``# <file stem>``. Paragraphs whose style name
        contains "heading 1" / "heading 2" / "heading 3" (case-insensitive)
        become ``##`` / ``###`` / ``####`` headings; every other non-empty
        paragraph is emitted as plain text. If ``python-docx`` is not
        installed, ``"[error] pip install python-docx"`` is returned instead.
    """
    try:
        from docx import Document
    except ImportError:
        return "[error] pip install python-docx"

    # Checked in order; the first style fragment found in the name wins.
    heading_prefixes = (
        ("heading 1", "##"),
        ("heading 2", "###"),
        ("heading 3", "####"),
    )

    chunks = [f"# {path.stem}\n"]
    for paragraph in Document(str(path)).paragraphs:
        body = paragraph.text.strip()
        if not body:
            continue
        if paragraph.style:
            style_name = (paragraph.style.name or "").lower()
        else:
            style_name = ""
        prefix = next(
            (marker for fragment, marker in heading_prefixes if fragment in style_name),
            None,
        )
        if prefix is None:
            chunks.append(f"\n{body}\n")
        else:
            chunks.append(f"\n{prefix} {body}\n")
    return "".join(chunks)
|
|
|
|
|
|
def from_pptx(path: Path) -> str:
    """Convert the text of a .pptx deck into Markdown, one section per slide.

    Args:
        path: Location of the .pptx file.

    Returns:
        Markdown starting with ``# <file stem>``, followed by a ``## Slide N``
        heading for every slide (1-based) and the stripped text of each shape
        that has a non-empty text frame. If ``python-pptx`` is not installed,
        ``"[error] pip install python-pptx"`` is returned instead.
    """
    try:
        from pptx import Presentation
    except ImportError:
        return "[error] pip install python-pptx"

    deck = Presentation(str(path))
    out = [f"# {path.stem}\n"]
    for number, slide in enumerate(deck.slides, start=1):
        out.append(f"\n## Slide {number}\n")
        shape_texts = (
            shape.text_frame.text.strip()
            for shape in slide.shapes
            if shape.has_text_frame
        )
        # Shapes whose text frame is empty contribute nothing.
        out.extend(f"\n{content}\n" for content in shape_texts if content)
    return "".join(out)
|
|
|
|
|
|
def from_text(path: Path) -> str:
    """Read a .txt/.md file and return its content unchanged.

    Args:
        path: Location of the text file.

    Returns:
        The decoded file content. Bytes that are not valid UTF-8 are replaced
        with U+FFFD rather than raising.
    """
    # "utf-8-sig" decodes plain UTF-8 as usual but also drops a leading
    # byte-order mark, which would otherwise end up as an invisible U+FEFF
    # in front of the first Markdown heading.
    return path.read_text(encoding="utf-8-sig", errors="replace")
|
|
|
|
|
|
# Any HTML tag still present after block-level tags were turned into newlines.
_TAG_RE = re.compile(r"<[^>]+>")
# Runs of three or more newlines, collapsed to a single blank line.
_WS_RE = re.compile(r"\n{3,}")


def _html_to_text(markup: str) -> str:
    """Reduce script/style-free HTML to plain text with single blank lines."""
    from html import unescape

    # Block-level tags become line breaks so paragraphs stay separated.
    markup = re.sub(r"</?(p|div|br|li|h[1-6]|tr)[^>]*>", "\n", markup, flags=re.I)
    text = _TAG_RE.sub("", markup)
    # Entities are decoded only after the tags are gone, so escaped markup
    # such as "&lt;b&gt;" survives as literal text. A single unescape() pass
    # also avoids double-decoding "&amp;lt;". Non-breaking spaces become
    # ordinary spaces.
    text = unescape(text).replace("\xa0", " ")
    text = "\n".join(line.strip() for line in text.splitlines())
    return _WS_RE.sub("\n\n", text).strip()


def from_url(url: str) -> str:
    """Fetch an http(s) page and convert it into minimal Markdown.

    Args:
        url: Address of the page to download.

    Returns:
        ``# <title>``, a ``Source: <url>`` line and the page text with all
        markup removed. The URL is used as the title when the page has no
        usable ``<title>``. If ``requests`` is not installed,
        ``"[error] pip install requests"`` is returned instead.

    Raises:
        requests.RequestException: If the request fails or the server answers
            with an error status (via ``raise_for_status``).
    """
    try:
        import requests
    except ImportError:
        return "[error] pip install requests"
    from html import unescape

    resp = requests.get(
        url,
        timeout=30,
        headers={"User-Agent": "Mozilla/5.0 (compatible; ppt-source-to-md/1.0)"},
    )
    resp.raise_for_status()
    # Without a declared charset, requests assumes ISO-8859-1 for text/*,
    # which garbles UTF-8/GBK pages; use the encoding sniffed from the body.
    if "charset" not in resp.headers.get("Content-Type", "").lower():
        resp.encoding = resp.apparent_encoding

    # Minimal stripping: drop script/style elements together with their content.
    markup = re.sub(r"<script[\s\S]*?</script>", "", resp.text, flags=re.I)
    markup = re.sub(r"<style[\s\S]*?</style>", "", markup, flags=re.I)

    title_match = re.search(r"<title[^>]*>([^<]+)</title>", markup, re.I)
    # Collapse internal whitespace so a multi-line <title> stays a one-line heading.
    title = " ".join(unescape(title_match.group(1)).split()) if title_match else ""
    if not title:
        title = url

    return f"# {title}\n\nSource: {url}\n\n{_html_to_text(markup)}\n"
|
|
|
|
|
|
def dispatch(src: str) -> str:
    """Route *src* to the converter matching its kind and return the Markdown.

    Args:
        src: Either an http/https URL or a path to a local file.

    Returns:
        The converter's output, or a single-line ``"[error] ..."`` string when
        the path does not exist or its extension is not supported.
    """
    if urlparse(src).scheme in ("http", "https"):
        return from_url(src)

    path = Path(src)
    if not path.exists():
        return f"[error] not found: {src}"

    suffix = path.suffix.lower()
    converters = {
        ".pdf": from_pdf,
        ".docx": from_docx,
        ".pptx": from_pptx,
        ".txt": from_text,
        ".md": from_text,
    }
    converter = converters.get(suffix)
    if converter is None:
        return f"[error] unsupported extension: {suffix}"
    return converter(path)
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: convert one source and emit the Markdown.

    Reads the source (file path or URL) and the optional ``-o/--output``
    target from the command line. The Markdown is written to the output file
    when one is given, otherwise to stdout.

    Returns:
        Process exit status: 0 on success, 1 when the conversion reported an
        error.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("src", help="文件路径或 http(s) URL")
    parser.add_argument("-o", "--output", type=Path, default=None,
                        help="写到文件;默认打印到 stdout")
    args = parser.parse_args()

    md = dispatch(args.src)

    # Converters report failure in-band as a single-line "[error] ..." string.
    # Such a result must not be saved as if it were the converted document or
    # be announced as "[ok]"; send it to stderr and signal failure instead.
    if md.startswith("[error] ") and "\n" not in md:
        print(md, file=sys.stderr)
        return 1

    if args.output is not None:
        args.output.write_text(md, encoding="utf-8")
        print(f"[ok] {args.output} ({len(md)} chars)")
    else:
        sys.stdout.write(md)
    return 0


if __name__ == "__main__":
    sys.exit(main())
|