From 15edaaa3b65b4ce324ab7e10c39957d8e75b8a0c Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 18 Jun 2026 11:33:51 +0800 Subject: [PATCH] fix: prevent summary analysis stalls --- .../plans/2026-06-18-analysis-stall.md | 117 ++++++++++++++++++ mycode/main.py | 70 ++++++++--- start.py | 24 +++- tests/test_main.py | 71 +++++++++++ 4 files changed, 262 insertions(+), 20 deletions(-) create mode 100644 docs/superpowers/plans/2026-06-18-analysis-stall.md diff --git a/docs/superpowers/plans/2026-06-18-analysis-stall.md b/docs/superpowers/plans/2026-06-18-analysis-stall.md new file mode 100644 index 0000000..72af047 --- /dev/null +++ b/docs/superpowers/plans/2026-06-18-analysis-stall.md @@ -0,0 +1,117 @@ +# Analysis Stall Fix Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Prevent summary analysis from appearing stuck by bounding URL-check latency, reporting progress, and preventing duplicate analysis threads. + +**Architecture:** Keep matching behavior in `mycode/main.py`, add optional progress callbacks, and use `ThreadPoolExecutor` only for independent URL checks. `AnaThread` converts callbacks into existing Qt log signals, while `MainWindow` owns button state and thread lifecycle. + +**Tech Stack:** Python 3.8, `concurrent.futures`, pandas, PySide6, unittest + +--- + +### Task 1: Concurrent Deleted-Article Checks + +**Files:** +- Modify: `tests/test_main.py` +- Modify: `mycode/main.py` + +- [ ] **Step 1: Write failing tests** + +Add tests proving duplicate URLs are fetched once, request failures retain rows, +and the progress callback reaches `(total, total)`. + +- [ ] **Step 2: Verify tests fail** + +Run: + +```powershell +.\runtime\python.exe -m unittest tests.test_main.DeletedWechatContentFilterTest -v +``` + +Expected: failure because `filter_deleted_wechat_rows` does not accept +`progress_callback` or bounded concurrency options. + +- [ ] **Step 3: Implement bounded concurrency** + +Use `ThreadPoolExecutor(max_workers=8)` and `as_completed`. Submit one task per +unique non-empty URL, preserve original row order, retain rows for failed +requests, and invoke `progress_callback(completed, total)` after each completed +URL. + +- [ ] **Step 4: Verify tests pass** + +Run the Task 1 unittest command and expect all deleted-content tests to pass. + +### Task 2: Rule-Scan Progress + +**Files:** +- Modify: `tests/test_main.py` +- Modify: `mycode/main.py` + +- [ ] **Step 1: Write a failing test** + +Add a focused test using small injected data frames to prove `ana_wechat` +reports final rule progress without network access. + +- [ ] **Step 2: Verify the test fails** + +Run the new test directly and expect failure because `ana_wechat` has no +progress callback or injectable data frames. + +- [ ] **Step 3: Implement progress callbacks** + +Add optional `progress_callback`, `rules_df`, and `articles_df` parameters. +Report `(completed_rules, total_rules)` after each rule and pass a separate URL +progress callback to `filter_deleted_wechat_rows`. Preserve default production +behavior when parameters are omitted. + +- [ ] **Step 4: Verify the test passes** + +Run the focused test and the full `tests.test_main` module. + +### Task 3: Qt Lifecycle and User Feedback + +**Files:** +- Modify: `start.py` + +- [ ] **Step 1: Wire progress messages** + +In `AnaThread`, emit periodic messages for rule scanning and article-link +checking, including completed and total counts. + +- [ ] **Step 2: Prevent duplicate runs** + +In `MainWindow.start_ana`, return early when the current analysis thread is +running, disable `bAna` before starting, connect `finished` to a cleanup method, +and restore the button in cleanup. + +- [ ] **Step 3: Verify syntax and imports** + +Run: + +```powershell +.\runtime\python.exe -m py_compile start.py mycode\main.py tests\test_main.py +``` + +Expected: exit code 0. + +### Task 4: Final Verification + +**Files:** +- Verify: `tests/test_main.py` +- Verify: `start.py` +- Verify: `mycode/main.py` + +- [ ] **Step 1: Run all tests** + +```powershell +.\runtime\python.exe -m unittest discover -s tests -v +``` + +Expected: all tests pass. + +- [ ] **Step 2: Review the diff** + +Confirm the diff contains only the planned analysis behavior, tests, and +documentation, and does not touch `col.bat` or `mycode/main2.py`. diff --git a/mycode/main.py b/mycode/main.py index 60ddedf..752d8a9 100644 --- a/mycode/main.py +++ b/mycode/main.py @@ -9,6 +9,7 @@ from urllib.request import Request, urlopen from datetime import datetime import numpy as np from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed wechat_dir = os.path.join(BASE_DIR, 'article') web_dir = os.path.join(BASE_DIR, 'web_dir') @@ -60,7 +61,7 @@ def should_skip_error_phrase(error_phrase, content, exemption_rules=None): DELETE_MARKER = '\u8be5\u5185\u5bb9\u5df2\u88ab\u53d1\u5e03\u8005\u5220\u9664' -def fetch_url_html(url, timeout=10): +def fetch_url_html(url, timeout=3): request = Request(url, headers={'User-Agent': 'Mozilla/5.0'}) with urlopen(request, timeout=timeout) as response: charset = response.headers.get_content_charset() or 'utf-8' @@ -79,19 +80,34 @@ def is_deleted_wechat_content(url, fetch_html=None): return isinstance(html, str) and DELETE_MARKER in html -def filter_deleted_wechat_rows(rows, fetch_html=None): +def filter_deleted_wechat_rows( + rows, fetch_html=None, progress_callback=None, max_workers=8): + urls = list(dict.fromkeys( + row[-1] for row in rows + if row and isinstance(row[-1], str) and row[-1].strip() + )) checked_urls = {} - filtered_rows = [] - for row in rows: - url = row[-1] if row else '' - if url not in checked_urls: - checked_urls[url] = is_deleted_wechat_content(url, fetch_html=fetch_html) - if checked_urls[url]: - continue - filtered_rows.append(row) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit( + is_deleted_wechat_content, + url, + fetch_html + ): url + for url in urls + } + total = len(futures) + for completed, future in enumerate(as_completed(futures), start=1): + url = futures[future] + checked_urls[url] = future.result() + if progress_callback is not None: + progress_callback(completed, total) - return filtered_rows + return [ + row for row in rows + if not row or not checked_urls.get(row[-1], False) + ] def fix_url_scheme(url, default_scheme='http'): @@ -570,18 +586,28 @@ def make_wechat_articles_full(): df.to_csv(output_path) -def ana_wechat(): - articles_full_path = os.path.join(wechat_dir, 'articles_full.csv') - if not os.path.exists(articles_full_path): - make_wechat_articles_full() - - df = pd.read_csv(articles_full_path) +def ana_wechat( + progress_callback=None, + url_progress_callback=None, + rules_df=None, + articles_df=None, + fetch_html=None): + if articles_df is None: + articles_full_path = os.path.join(wechat_dir, 'articles_full.csv') + if not os.path.exists(articles_full_path): + make_wechat_articles_full() + df = pd.read_csv(articles_full_path) + else: + df = articles_df.copy() df['content'] = df['content'].fillna('') + active_rules_df = df_s if rules_df is None else rules_df output_data = [] index = 1 - for ind, row in df_s.iterrows(): + total_rules = len(active_rules_df) + for completed_rules, (_, row) in enumerate( + active_rules_df.iterrows(), start=1): mask = df['content'].str.contains(row['错误表述'], regex=False) result = df[mask] @@ -601,9 +627,15 @@ def ana_wechat(): output_data.append(output_row) index += 1 print(f'找到公众号问题{index}---{row2["nickname"]}') + if progress_callback is not None: + progress_callback(completed_rules, total_rules) # output_data.insert(0, ['序号', '信源名称', '文章标题', '错误表述', '建议修改词语', '错误分类', '原文链接']) - return filter_deleted_wechat_rows(output_data) + return filter_deleted_wechat_rows( + output_data, + fetch_html=fetch_html, + progress_callback=url_progress_callback + ) def find_title(text): diff --git a/start.py b/start.py index 074a2e8..43670b9 100644 --- a/start.py +++ b/start.py @@ -79,6 +79,12 @@ def gen_doc(w1, w2): class AnaThread(QThread): update_signal = Signal(object) + def report_progress(self, stage, completed, total): + if completed == 1 or completed == total or completed % 10 == 0: + self.update_signal.emit({ + 'msg': f'{stage}: {completed}/{total}' + }) + def ana(self): now = datetime.datetime.now() self.update_signal.emit({'msg': '对比开始...'}) @@ -87,7 +93,14 @@ class AnaThread(QThread): make_wechat_articles_full() self.update_signal.emit({'msg': "公众号爬取内容组装完毕!"}) self.update_signal.emit({'msg': '开始对比分析所有内容...'}) - wechat_results = ana_wechat() + wechat_results = ana_wechat( + progress_callback=lambda completed, total: self.report_progress( + '正在扫描公众号规则', completed, total + ), + url_progress_callback=lambda completed, total: self.report_progress( + '正在检查公众号原文', completed, total + ) + ) web_results = ana_web() try: # 生成汇总表 @@ -277,10 +290,19 @@ class MainWindow(QMainWindow): self.web_thread.start() def start_ana(self): + if self.ana_thread is not None and self.ana_thread.isRunning(): + self.update_log({'msg': '汇总分析正在进行,请勿重复启动'}) + return + self.ui.bAna.setEnabled(False) self.ana_thread = AnaThread() self.ana_thread.update_signal.connect(self.update_log) + self.ana_thread.finished.connect(self.finish_ana) self.ana_thread.start() + def finish_ana(self): + self.ui.bAna.setEnabled(True) + self.ana_thread = None + def cbma_cal(self, year): try: now_year = int(year) diff --git a/tests/test_main.py b/tests/test_main.py index 9707bc5..ab8e36b 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -105,6 +105,77 @@ class DeletedWechatContentFilterTest(unittest.TestCase): self.assertEqual(filtered_rows, rows) self.assertEqual(calls, ['https://same.test/1']) + def test_keeps_rows_when_fetch_fails(self): + rows = [ + [1, '公众号A', '标题A', '错误表述A', '建议A', '分类A', 'https://a.test/1'], + ] + + def fetch_html(url): + raise TimeoutError(url) + + filtered_rows = main.filter_deleted_wechat_rows( + rows, + fetch_html=fetch_html + ) + + self.assertEqual(filtered_rows, rows) + + def test_reports_url_check_progress(self): + rows = [ + [1, '公众号A', '标题A', '错误表述A', '建议A', '分类A', 'https://a.test/1'], + [2, '公众号B', '标题B', '错误表述B', '建议B', '分类B', 'https://b.test/2'], + [3, '公众号A', '标题C', '错误表述C', '建议C', '分类C', 'https://a.test/1'], + ] + progress = [] + + main.filter_deleted_wechat_rows( + rows, + fetch_html=lambda url: '正常文章内容', + progress_callback=lambda completed, total: progress.append( + (completed, total) + ) + ) + + self.assertEqual(progress[-1], (2, 2)) + self.assertEqual(len(progress), 2) + + +class WechatAnalysisProgressTest(unittest.TestCase): + def test_reports_rule_scan_progress(self): + rules_df = pd.DataFrame([ + { + '错误表述': '错误A', + '建议修改词语': '修改A', + '错误分类': '分类A' + }, + { + '错误表述': '错误B', + '建议修改词语': '修改B', + '错误分类': '分类B' + }, + ]) + articles_df = pd.DataFrame([ + { + 'nickname': '公众号A', + 'title': '标题A', + 'content': '这里包含错误A', + 'content_url': 'https://a.test/1' + } + ]) + progress = [] + + rows = main.ana_wechat( + rules_df=rules_df, + articles_df=articles_df, + progress_callback=lambda completed, total: progress.append( + (completed, total) + ), + fetch_html=lambda url: '正常文章内容' + ) + + self.assertEqual(len(rows), 1) + self.assertEqual(progress[-1], (2, 2)) + if __name__ == '__main__': unittest.main()