"""Desktop front-end for crawling web sites / WeChat articles and analysing them.

The window drives three background activities:

* ``MyThread``  - spawns one scrapy subprocess per site listed in the site
  workbook, then hands sites with undersized output to a Chrome-based crawler.
* ``AnaThread`` - runs the comparison/analysis helpers and writes the summary
  workbook and the briefing document.
* ``cbma_cal``  - runs the yearly official-account statistics synchronously.
"""
import datetime
import json
import os
import subprocess
import sys
import threading
import traceback
from urllib.parse import urlparse

from PySide6.QtCore import QStringListModel, QThread, Signal
from PySide6.QtWidgets import QApplication, QMainWindow
from PySide6.QtGui import QIntValidator
import win32com.client as win32
import pandas as pd
from openpyxl import load_workbook
from docxtpl import DocxTemplate

from ui_mainwindow import Ui_MainWindow
from mycode.main import make_simple_csv_from_db, make_wechat_articles_full, ana_web, ana_wechat, output_dir, get_cbma_info_from_db_and_ana
from mycode.crawl_chrome import chrom_main_from_list

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WEB_SITES_PATH = os.path.join(BASE_DIR, 'web_sites.xlsx')
BIAO_PATH = os.path.join(BASE_DIR, 'biao.xlsx')
PYTHON_PATH = os.path.join(BASE_DIR, 'runtime/python.exe')
TEMPLATE_PATH = os.path.join(BASE_DIR, 'summary/template.xlsx')
TEMPLATE_REPORT_PATH = os.path.join(BASE_DIR, 'summary/template_report.docx')

# Error categories counted for the briefing summary, in output order.
_ERROR_CATEGORIES = ('固定表述错误', '严重表述错误', '一般文字差错', '政治差错')
# Index of the category cell inside a result row (rows are indexed with [5]).
_CATEGORY_COLUMN = 5


def fix_url_scheme(url, default_scheme='http'):
    """Return *url* with ``default_scheme://`` prepended when it has no http(s) scheme.

    Args:
        url: Address string, with or without a leading ``http://``/``https://``.
        default_scheme: Scheme to prepend when none of the two is present.

    Returns:
        The URL, unchanged if it already starts with ``http://`` or ``https://``.
    """
    if not url.startswith(('http://', 'https://')):
        url = f'{default_scheme}://{url}'
    return url


class MyApplication(QApplication):
    """QApplication that lazily creates and caches a single ``MainWindow``."""

    def __init__(self, argv):
        super(MyApplication, self).__init__(argv)
        self.main_window = None

    def createMainWindow(self):
        """Return the main window, creating it on first call."""
        if self.main_window is None:
            self.main_window = MainWindow()
        return self.main_window


def _count_error_categories(*row_groups):
    """Count how many rows fall into each known error category.

    Rows that are too short to hold a category cell are skipped instead of
    raising ``IndexError``; rows with an unknown category are ignored, exactly
    as the category ``if``/``elif`` chain used to do.

    Returns:
        dict mapping every name in ``_ERROR_CATEGORIES`` to its count.
    """
    counts = dict.fromkeys(_ERROR_CATEGORIES, 0)
    for rows in row_groups:
        for row in rows:
            if len(row) <= _CATEGORY_COLUMN:
                continue
            category = row[_CATEGORY_COLUMN]
            # Tuple membership compares with ==, so unhashable cells are safe.
            if category in _ERROR_CATEGORIES:
                counts[category] += 1
    return counts


def gen_doc(w1, w2):
    """Render the briefing document from the two result tables.

    Args:
        w1: Result rows passed to the template as ``w1`` (the caller passes
            the WeChat results).
        w2: Result rows passed to the template as ``w2`` (the caller passes
            the web-site results).

    Returns:
        Path of the saved ``.docx`` report under ``summary/``.
    """
    now = datetime.datetime.now()
    now_3 = now - datetime.timedelta(days=3)

    counts = _count_error_categories(w1, w2)
    summary = ', '.join(f'{name}{counts[name]}项' for name in _ERROR_CATEGORIES)

    context = {
        'y': now.year,
        'm': now.month,
        'd': now.day,
        'mo': now_3.month,
        'do': now_3.day,
        'su': summary,
        'w1': w1,
        'w2': w2,
    }
    output_report_path = os.path.join(BASE_DIR, f'summary/{now.year}年{now.month}月-分析结果简报.docx')
    doc = DocxTemplate(TEMPLATE_REPORT_PATH)
    doc.render(context)
    doc.save(output_report_path)
    return output_report_path


class AnaThread(QThread):
    """Background thread that runs the analysis and writes both reports.

    Progress is reported through ``update_signal`` as dicts with a ``msg``
    key; the final message also carries ``output_excel_path`` and
    ``output_report_path``.
    """

    update_signal = Signal(object)

    def ana(self):
        """Run the full analysis pipeline and write the workbook and briefing.

        Raises:
            PermissionError: re-raised after being reported on the signal
                (the handler message tells the user a file is in use).
        """
        now = datetime.datetime.now()
        self.update_signal.emit({'msg': '对比开始...'})
        self.update_signal.emit({'msg': '正在组合微信公众号爬取内容...'})
        make_simple_csv_from_db(now)
        make_wechat_articles_full()
        self.update_signal.emit({'msg': "公众号爬取内容组装完毕!"})
        self.update_signal.emit({'msg': '开始对比分析所有内容...'})
        wechat_results = ana_wechat()
        web_results = ana_web()
        try:
            # Build the summary workbook from the template.
            self.update_signal.emit({'msg': '开始生成汇总表...'})
            output_excel_path = os.path.join(BASE_DIR, f'summary/{now.year}年{now.month}月-分析结果汇总表.xlsx')
            workbook = load_workbook(TEMPLATE_PATH)
            try:
                wechat_sheet = workbook['公众号']
                web_sheet = workbook['网站']
                for row in wechat_results:
                    wechat_sheet.append(row)
                for row in web_results:
                    web_sheet.append(row)
                workbook.save(output_excel_path)
            finally:
                # Close even when save() fails (e.g. target file is open).
                workbook.close()

            # Build the briefing document.
            self.update_signal.emit({'msg': '开始生成汇总简报...'})
            output_report_path = gen_doc(wechat_results, web_results)
            self.update_signal.emit({
                'msg': '分析完毕, 请查看结果栏, 可手动校对',
                'output_excel_path': output_excel_path,
                'output_report_path': output_report_path,
            })
        except PermissionError as e:
            self.update_signal.emit({'msg': str(e)})
            self.update_signal.emit({'msg': '文件被占用请先关闭!'})
            raise

    def run(self) -> None:
        """Thread entry point: run ``ana`` and report any traceback on the signal."""
        try:
            self.ana()
        except Exception:
            # Top-level boundary of the worker thread: surface the error in the log view.
            self.update_signal.emit({'msg': traceback.format_exc()})


class MyThread(QThread):
    """Background thread that crawls every site listed in the site workbook."""

    update_signal = Signal(object)

    def __init__(self, lsize) -> None:
        """
        lsize: size threshold in KB; a site whose output file is smaller
               than this is handed to the Chrome crawler.
        """
        super().__init__()
        self.lsize = lsize
        self.processes = []
        self.running = False

    def capture_output(self, p):
        """Forward each stdout line of subprocess *p* to ``update_signal``.

        Reads until EOF so lines still buffered when the process exits are
        not lost; stops early once ``self.running`` is cleared.
        """
        for line in iter(p.stdout.readline, ''):
            if not self.running:
                break
            self.update_signal.emit({'msg': line.strip()})

    @staticmethod
    def _iter_sites(df):
        """Yield ``(group, name, url, domain, output_path)`` for each sheet row."""
        for _, row in df.iterrows():
            group = row['单位']
            name = row['主办']
            url = fix_url_scheme(row['地址'].strip())
            domain = urlparse(url).netloc.replace('www.', '')
            output = os.path.join(BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
            yield group, name, url, domain, output

    def run(self) -> None:
        """Spawn one scrapy process per site, wait, then re-crawl small outputs."""
        self.update_signal.emit({'msg': '开始进行网站爬取...'})
        # Use the absolute path so the crawl reads the same workbook the UI opens.
        df = pd.read_excel(WEB_SITES_PATH, sheet_name='Sheet1')
        self.running = True
        for group, name, url, domain, output in self._iter_sites(df):
            # -u: unbuffered, so output reaches the pipe immediately.
            cmd = [PYTHON_PATH, '-u', '-m', 'scrapy', 'crawl', 'basespider',
                   '-a', f'domain={domain}',
                   '-a', f'start_url={url}',
                   '-a', f'name={name}',
                   '-a', f'group={group}',
                   '-a', f'output={output}']
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, shell=False)
            self.processes.append(process)
            getlog_thread = threading.Thread(target=self.capture_output, args=(process,), daemon=True)
            getlog_thread.start()

        for process in self.processes:
            process.wait()

        self.update_signal.emit({'msg': '网站爬取结束,校验中...'})
        info_to_save = []
        for group, name, url, _domain, output_filename in self._iter_sites(df):
            # NOTE(review): a site with no output file at all is NOT queued for
            # Chrome; only existing-but-small files are. Confirm this is intended.
            if os.path.exists(output_filename):
                file_size = os.path.getsize(output_filename)
                if file_size < self.lsize * 1024:  # KB -> bytes
                    info_to_save.append([group, name, url])
        if info_to_save:
            self.update_signal.emit({'msg': '存在未爬取站点,正在调用Chrome继续爬取...'})
            chrom_main_from_list(info_to_save)
        self.update_signal.emit({'msg': '网站爬取完毕!'})

    def close(self):
        """Stop log forwarding, kill all spawned processes and terminate the thread."""
        self.running = False
        for process in self.processes:
            process.kill()
        # NOTE(review): QThread.terminate() is documented by Qt as dangerous;
        # kept because run() has no cooperative cancellation point.
        self.terminate()


class MainWindow(QMainWindow):
    """Main window: wires the generated UI to the crawl/analysis actions."""

    def __init__(self):
        super(MainWindow, self).__init__()
        self.web_thread = None
        self.ana_thread = None
        self.wcplus = False
        self.logModel = QStringListModel([])
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)
        self.ui.lSize.setValidator(QIntValidator())
        self.ui.bWechat.clicked.connect(self.open_wcplus)
        self.ui.bWebSite.clicked.connect(lambda: self.open_file(WEB_SITES_PATH))
        self.ui.bBiao.clicked.connect(lambda: self.open_file(BIAO_PATH))
        self.ui.bStart.clicked.connect(self.start)
        self.ui.bAna.clicked.connect(self.start_ana)
        self.ui.bRes1.clicked.connect(lambda: self.open_file(self.ui.lRes1.text()))
        self.ui.bRes2.clicked.connect(lambda: self.open_file(self.ui.lRes2.text(), 'docx'))
        self.ui.bCal.clicked.connect(self.cbma_cal)
        self.ui.bOpenCalRes1.clicked.connect(lambda: self.open_file(self.ui.lCalRes1.text()))
        self.ui.bOpenCalRes2.clicked.connect(lambda: self.open_file(self.ui.lCalRes2.text()))
        self.ui.bOpenCalRes3.clicked.connect(lambda: self.open_file(self.ui.lCalRes3.text()))
        self.ui.bOpenCalRes4.clicked.connect(lambda: self.open_file(self.ui.lCalRes4.text()))
        self.ui.vLog.setModel(self.logModel)
        self.res1Workbook = None

    def open_wcplus(self):
        """Launch ``wcplus.exe`` from the working directory, once per session."""
        if self.wcplus is False:
            # Raw string: '\w' is an invalid escape sequence in a normal literal.
            subprocess.Popen(r'.\wcplus.exe')
            self.wcplus = True

    def open_file(self, path, type='xlsx'):
        """Open *path* in Word (``type='docx'``) or Excel (``type='xlsx'``) via COM.

        Does nothing when *path* is empty or *type* is neither value.
        The parameter is named ``type`` for backward compatibility.
        """
        if not path:
            return
        handlers = {
            'docx': ("Word.Application", 'Documents'),
            'xlsx': ("Excel.Application", 'Workbooks'),
        }
        if type not in handlers:
            return
        prog_id, collection = handlers[type]
        app = win32.Dispatch(prog_id)
        app.Visible = True
        getattr(app, collection).Open(path)
        app.WindowState = 3

    def get_time(self):
        """Return the current local time formatted as ``HH:MM:SS``."""
        return datetime.datetime.now().strftime('%H:%M:%S')

    def start(self):
        """Toggle crawling based on the start button's current label."""
        label = self.ui.bStart.text()
        if label == '开始爬取' or label == '重新开始':
            # Parse before touching UI state: the validator still allows an
            # empty or partial entry, for which int() raises ValueError.
            try:
                lsize = int(self.ui.lSize.text())
            except ValueError:
                self.update_log({'msg': '请输入有效的文件大小阈值(KB)!'})
                return
            self.log('', True)
            if self.res1Workbook:
                self.res1Workbook.Close()
            self.ui.lSize.setEnabled(False)
            self.ui.bStart.setText('停止爬取')
            self.start_web(lsize)
        elif label == '停止爬取':
            self.update_log({'msg': '正在停止...'})
            if self.web_thread:
                self.web_thread.close()
            self.log('', True)
            self.ui.lSize.setEnabled(True)
            self.ui.bStart.setText('开始爬取')

    def start_web(self, lsize):
        """Create and start the crawl thread with size threshold *lsize* (KB)."""
        self.web_thread = MyThread(lsize)
        self.web_thread.update_signal.connect(self.update_log)
        self.web_thread.start()

    def start_ana(self):
        """Create and start the analysis thread."""
        self.ana_thread = AnaThread()
        self.ana_thread.update_signal.connect(self.update_log)
        self.ana_thread.start()

    def cbma_cal(self):
        """Run the current-year statistics and show the four result paths.

        Raises:
            PermissionError: re-raised after being written to the log.
        """
        now_year = datetime.datetime.now().year
        self.update_log({'msg': '正在分析本年总院官微数据...'})
        try:
            origin_path, cbma_path, cbma_cal_path, cbma_month_path = get_cbma_info_from_db_and_ana(now_year)
        except PermissionError as e:
            self.update_log({'msg': str(e)})
            self.update_log({'msg': '文件被占用请先关闭!'})
            raise
        self.ui.lCalRes1.setText(origin_path)
        self.ui.lCalRes2.setText(cbma_path)
        self.ui.lCalRes3.setText(cbma_cal_path)
        self.ui.lCalRes4.setText(cbma_month_path)
        self.update_log({'msg': '分析完毕!'})

    def update_log(self, rdict):
        """Append a timestamped message and pick up result paths if present.

        Args:
            rdict: Either a plain string, or a dict with ``msg`` and optionally
                ``output_report_path`` / ``output_excel_path``.
        """
        if isinstance(rdict, str):
            self.log(f'{self.get_time()}-{rdict}', False)
        elif isinstance(rdict, dict):
            self.log(f'{self.get_time()}-{rdict["msg"]}', False)
            if 'output_report_path' in rdict:
                self.ui.lRes2.setText(rdict['output_report_path'])
            if 'output_excel_path' in rdict:
                self.ui.lRes1.setText(rdict['output_excel_path'])

    def log(self, logLine: str, clear=False):
        """Append *logLine* to the log view, or empty the view when *clear* is true."""
        if clear:
            self.logModel.setStringList([])
            self.ui.vLog.scrollToTop()
            return
        log_list = self.logModel.stringList()
        log_list.append(logLine)
        self.logModel.setStringList(log_list)
        self.ui.vLog.scrollToBottom()

    def closeEvent(self, event):
        """Kill ``wcplus.exe`` if it was launched, stop the crawl thread, accept."""
        if self.wcplus:
            try:
                subprocess.Popen(['taskkill', '/F', '/IM', 'wcplus.exe'])
            except Exception as e:
                # Best effort: closing the window must not fail on cleanup.
                print(f"Error while terminating wcplus.exe: {str(e)}")
            self.wcplus = False
        if self.web_thread:
            self.web_thread.close()
        event.accept()


if __name__ == "__main__":
    # Regenerate the UI module with: pyside6-uic main.ui -o ui_mainwindow.py
    print('正在启动程序...')
    app = MyApplication(sys.argv)
    main_window = app.createMainWindow()
    main_window.show()
    print('启动成功')
    sys.exit(app.exec())