# Marker text WeChat shows in place of an article removed by its publisher
# ("该内容已被发布者删除", spelled with escapes to keep the source ASCII-safe).
DELETE_MARKER = '\u8be5\u5185\u5bb9\u5df2\u88ab\u53d1\u5e03\u8005\u5220\u9664'


def fetch_url_html(url, timeout=10):
    """Download *url* and return its body decoded to text.

    A browser-like User-Agent is sent because some hosts reject the default
    urllib agent.  The charset advertised by the response headers is used when
    present, falling back to UTF-8; undecodable bytes are dropped rather than
    raising.
    """
    req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urlopen(req, timeout=timeout) as resp:
        encoding = resp.headers.get_content_charset() or 'utf-8'
        return resp.read().decode(encoding, errors='ignore')


def is_deleted_wechat_content(url, fetch_html=None):
    """Return True when the page at *url* shows WeChat's deleted-content notice.

    *fetch_html* may be injected (e.g. by tests) and defaults to
    :func:`fetch_url_html`.  Blank / non-string URLs and any fetch failure
    yield False — the check deliberately fails open so a network hiccup never
    discards a row.
    """
    if not (isinstance(url, str) and url.strip()):
        return False
    loader = fetch_url_html if fetch_html is None else fetch_html
    try:
        page = loader(url)
    except Exception:
        # Best-effort: treat unreachable pages as "not deleted".
        return False
    return isinstance(page, str) and DELETE_MARKER in page


def filter_deleted_wechat_rows(rows, fetch_html=None):
    """Drop rows whose source URL (last element) points to deleted content.

    Each distinct URL is fetched at most once; the verdict is cached so
    duplicate links across rows cost a single request.  Rows are returned in
    their original order.
    """
    verdicts = {}

    def _deleted(link):
        # Memoised wrapper around is_deleted_wechat_content().
        if link not in verdicts:
            verdicts[link] = is_deleted_wechat_content(link, fetch_html=fetch_html)
        return verdicts[link]

    kept = []
    for entry in rows:
        link = entry[-1] if entry else ''
        if not _deleted(link):
            kept.append(entry)
    return kept
class DeletedWechatContentFilterTest(unittest.TestCase):
    """Behaviour of main.filter_deleted_wechat_rows with injected fetchers."""

    def test_filters_rows_when_url_page_says_deleted(self):
        # A row whose page shows the deletion notice must be dropped.
        row_a = [1, '公众号A', '标题A', '错误表述A', '建议A', '分类A', 'https://a.test/1']
        row_b = [2, '公众号B', '标题B', '错误表述B', '建议B', '分类B', 'https://b.test/2']
        pages = {'https://a.test/1': '该内容已被发布者删除'}

        result = main.filter_deleted_wechat_rows(
            [row_a, row_b],
            fetch_html=lambda url: pages.get(url, '正常文章内容'),
        )

        self.assertEqual(result, [row_b])

    def test_keeps_rows_when_url_page_is_not_deleted(self):
        # Pages without the deletion marker keep their rows untouched.
        rows = [
            [1, '公众号A', '标题A', '错误表述A', '建议A', '分类A', 'https://a.test/1'],
        ]

        result = main.filter_deleted_wechat_rows(
            rows,
            fetch_html=lambda url: '正文还在',
        )

        self.assertEqual(result, rows)

    def test_checks_same_url_only_once(self):
        # Duplicate links must be fetched a single time (verdict is cached).
        shared_url = 'https://same.test/1'
        rows = [
            [1, '公众号A', '标题A', '错误表述A', '建议A', '分类A', shared_url],
            [2, '公众号A', '标题B', '错误表述B', '建议B', '分类B', shared_url],
        ]
        seen = []

        def record_fetch(url):
            seen.append(url)
            return '正常文章内容'

        result = main.filter_deleted_wechat_rows(rows, fetch_html=record_fetch)

        self.assertEqual(result, rows)
        self.assertEqual(seen, [shared_url])


if __name__ == '__main__':
    unittest.main()