fix: prevent summary analysis stalls

This commit is contained in:
caoqianming 2026-06-18 11:33:51 +08:00
parent d88589dd68
commit 15edaaa3b6
4 changed files with 262 additions and 20 deletions

View File

@ -0,0 +1,117 @@
# Analysis Stall Fix Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Prevent summary analysis from appearing stuck by bounding URL-check latency, reporting progress, and preventing duplicate analysis threads.
**Architecture:** Keep matching behavior in `mycode/main.py`, add optional progress callbacks, and use `ThreadPoolExecutor` only for independent URL checks. `AnaThread` converts callbacks into existing Qt log signals, while `MainWindow` owns button state and thread lifecycle.
**Tech Stack:** Python 3.8, `concurrent.futures`, pandas, PySide6, unittest
---
### Task 1: Concurrent Deleted-Article Checks
**Files:**
- Modify: `tests/test_main.py`
- Modify: `mycode/main.py`
- [ ] **Step 1: Write failing tests**
Add tests proving duplicate URLs are fetched once, request failures retain rows,
and the progress callback reaches `(total, total)`.
- [ ] **Step 2: Verify tests fail**
Run:
```powershell
.\runtime\python.exe -m unittest tests.test_main.DeletedWechatContentFilterTest -v
```
Expected: failure because `filter_deleted_wechat_rows` does not accept
`progress_callback` or bounded concurrency options.
- [ ] **Step 3: Implement bounded concurrency**
Use `ThreadPoolExecutor(max_workers=8)` and `as_completed`. Submit one task per
unique non-empty URL, preserve original row order, retain rows for failed
requests, and invoke `progress_callback(completed, total)` after each completed
URL.
- [ ] **Step 4: Verify tests pass**
Run the Task 1 unittest command and expect all deleted-content tests to pass.
### Task 2: Rule-Scan Progress
**Files:**
- Modify: `tests/test_main.py`
- Modify: `mycode/main.py`
- [ ] **Step 1: Write a failing test**
Add a focused test using small injected data frames to prove `ana_wechat`
reports final rule progress without network access.
- [ ] **Step 2: Verify the test fails**
Run the new test directly and expect failure because `ana_wechat` has no
progress callback or injectable data frames.
- [ ] **Step 3: Implement progress callbacks**
Add optional `progress_callback`, `rules_df`, and `articles_df` parameters.
Report `(completed_rules, total_rules)` after each rule and pass a separate URL
progress callback to `filter_deleted_wechat_rows`. Preserve default production
behavior when parameters are omitted.
- [ ] **Step 4: Verify the test passes**
Run the focused test and the full `tests.test_main` module.
### Task 3: Qt Lifecycle and User Feedback
**Files:**
- Modify: `start.py`
- [ ] **Step 1: Wire progress messages**
In `AnaThread`, emit periodic messages for rule scanning and article-link
checking, including completed and total counts.
- [ ] **Step 2: Prevent duplicate runs**
In `MainWindow.start_ana`, return early when the current analysis thread is
running, disable `bAna` before starting, connect `finished` to a cleanup method,
and restore the button in cleanup.
- [ ] **Step 3: Verify syntax and imports**
Run:
```powershell
.\runtime\python.exe -m py_compile start.py mycode\main.py tests\test_main.py
```
Expected: exit code 0.
### Task 4: Final Verification
**Files:**
- Verify: `tests/test_main.py`
- Verify: `start.py`
- Verify: `mycode/main.py`
- [ ] **Step 1: Run all tests**
```powershell
.\runtime\python.exe -m unittest discover -s tests -v
```
Expected: all tests pass.
- [ ] **Step 2: Review the diff**
Confirm the diff contains only the planned analysis behavior, tests, and
documentation, and does not touch `col.bat` or `mycode/main2.py`.

View File

@ -9,6 +9,7 @@ from urllib.request import Request, urlopen
from datetime import datetime
import numpy as np
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
wechat_dir = os.path.join(BASE_DIR, 'article')
web_dir = os.path.join(BASE_DIR, 'web_dir')
@ -60,7 +61,7 @@ def should_skip_error_phrase(error_phrase, content, exemption_rules=None):
DELETE_MARKER = '\u8be5\u5185\u5bb9\u5df2\u88ab\u53d1\u5e03\u8005\u5220\u9664'
def fetch_url_html(url, timeout=10):
def fetch_url_html(url, timeout=3):
request = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urlopen(request, timeout=timeout) as response:
charset = response.headers.get_content_charset() or 'utf-8'
@ -79,19 +80,34 @@ def is_deleted_wechat_content(url, fetch_html=None):
return isinstance(html, str) and DELETE_MARKER in html
def filter_deleted_wechat_rows(rows, fetch_html=None):
def filter_deleted_wechat_rows(
rows, fetch_html=None, progress_callback=None, max_workers=8):
urls = list(dict.fromkeys(
row[-1] for row in rows
if row and isinstance(row[-1], str) and row[-1].strip()
))
checked_urls = {}
filtered_rows = []
for row in rows:
url = row[-1] if row else ''
if url not in checked_urls:
checked_urls[url] = is_deleted_wechat_content(url, fetch_html=fetch_html)
if checked_urls[url]:
continue
filtered_rows.append(row)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
is_deleted_wechat_content,
url,
fetch_html
): url
for url in urls
}
total = len(futures)
for completed, future in enumerate(as_completed(futures), start=1):
url = futures[future]
checked_urls[url] = future.result()
if progress_callback is not None:
progress_callback(completed, total)
return filtered_rows
return [
row for row in rows
if not row or not checked_urls.get(row[-1], False)
]
def fix_url_scheme(url, default_scheme='http'):
@ -570,18 +586,28 @@ def make_wechat_articles_full():
df.to_csv(output_path)
def ana_wechat():
articles_full_path = os.path.join(wechat_dir, 'articles_full.csv')
if not os.path.exists(articles_full_path):
make_wechat_articles_full()
df = pd.read_csv(articles_full_path)
def ana_wechat(
progress_callback=None,
url_progress_callback=None,
rules_df=None,
articles_df=None,
fetch_html=None):
if articles_df is None:
articles_full_path = os.path.join(wechat_dir, 'articles_full.csv')
if not os.path.exists(articles_full_path):
make_wechat_articles_full()
df = pd.read_csv(articles_full_path)
else:
df = articles_df.copy()
df['content'] = df['content'].fillna('')
active_rules_df = df_s if rules_df is None else rules_df
output_data = []
index = 1
for ind, row in df_s.iterrows():
total_rules = len(active_rules_df)
for completed_rules, (_, row) in enumerate(
active_rules_df.iterrows(), start=1):
mask = df['content'].str.contains(row['错误表述'], regex=False)
result = df[mask]
@ -601,9 +627,15 @@ def ana_wechat():
output_data.append(output_row)
index += 1
print(f'找到公众号问题{index}---{row2["nickname"]}')
if progress_callback is not None:
progress_callback(completed_rules, total_rules)
# output_data.insert(0, ['序号', '信源名称', '文章标题', '错误表述', '建议修改词语', '错误分类', '原文链接'])
return filter_deleted_wechat_rows(output_data)
return filter_deleted_wechat_rows(
output_data,
fetch_html=fetch_html,
progress_callback=url_progress_callback
)
def find_title(text):

View File

@ -79,6 +79,12 @@ def gen_doc(w1, w2):
class AnaThread(QThread):
update_signal = Signal(object)
def report_progress(self, stage, completed, total):
if completed == 1 or completed == total or completed % 10 == 0:
self.update_signal.emit({
'msg': f'{stage}: {completed}/{total}'
})
def ana(self):
now = datetime.datetime.now()
self.update_signal.emit({'msg': '对比开始...'})
@ -87,7 +93,14 @@ class AnaThread(QThread):
make_wechat_articles_full()
self.update_signal.emit({'msg': "公众号爬取内容组装完毕!"})
self.update_signal.emit({'msg': '开始对比分析所有内容...'})
wechat_results = ana_wechat()
wechat_results = ana_wechat(
progress_callback=lambda completed, total: self.report_progress(
'正在扫描公众号规则', completed, total
),
url_progress_callback=lambda completed, total: self.report_progress(
'正在检查公众号原文', completed, total
)
)
web_results = ana_web()
try:
# 生成汇总表
@ -277,10 +290,19 @@ class MainWindow(QMainWindow):
self.web_thread.start()
def start_ana(self):
if self.ana_thread is not None and self.ana_thread.isRunning():
self.update_log({'msg': '汇总分析正在进行,请勿重复启动'})
return
self.ui.bAna.setEnabled(False)
self.ana_thread = AnaThread()
self.ana_thread.update_signal.connect(self.update_log)
self.ana_thread.finished.connect(self.finish_ana)
self.ana_thread.start()
def finish_ana(self):
self.ui.bAna.setEnabled(True)
self.ana_thread = None
def cbma_cal(self, year):
try:
now_year = int(year)

View File

@ -105,6 +105,77 @@ class DeletedWechatContentFilterTest(unittest.TestCase):
self.assertEqual(filtered_rows, rows)
self.assertEqual(calls, ['https://same.test/1'])
def test_keeps_rows_when_fetch_fails(self):
rows = [
[1, '公众号A', '标题A', '错误表述A', '建议A', '分类A', 'https://a.test/1'],
]
def fetch_html(url):
raise TimeoutError(url)
filtered_rows = main.filter_deleted_wechat_rows(
rows,
fetch_html=fetch_html
)
self.assertEqual(filtered_rows, rows)
def test_reports_url_check_progress(self):
rows = [
[1, '公众号A', '标题A', '错误表述A', '建议A', '分类A', 'https://a.test/1'],
[2, '公众号B', '标题B', '错误表述B', '建议B', '分类B', 'https://b.test/2'],
[3, '公众号A', '标题C', '错误表述C', '建议C', '分类C', 'https://a.test/1'],
]
progress = []
main.filter_deleted_wechat_rows(
rows,
fetch_html=lambda url: '正常文章内容',
progress_callback=lambda completed, total: progress.append(
(completed, total)
)
)
self.assertEqual(progress[-1], (2, 2))
self.assertEqual(len(progress), 2)
class WechatAnalysisProgressTest(unittest.TestCase):
def test_reports_rule_scan_progress(self):
rules_df = pd.DataFrame([
{
'错误表述': '错误A',
'建议修改词语': '修改A',
'错误分类': '分类A'
},
{
'错误表述': '错误B',
'建议修改词语': '修改B',
'错误分类': '分类B'
},
])
articles_df = pd.DataFrame([
{
'nickname': '公众号A',
'title': '标题A',
'content': '这里包含错误A',
'content_url': 'https://a.test/1'
}
])
progress = []
rows = main.ana_wechat(
rules_df=rules_df,
articles_df=articles_df,
progress_callback=lambda completed, total: progress.append(
(completed, total)
),
fetch_html=lambda url: '正常文章内容'
)
self.assertEqual(len(rows), 1)
self.assertEqual(progress[-1], (2, 2))
if __name__ == '__main__':
unittest.main()