"""Desktop front-end for the WeChat-account / website inspection task.

The window lets the user open the input workbooks, start or stop an
inspection run, and open the resulting summary workbook.  The run itself
happens in :class:`MyThread`, which reports progress through a Qt signal.
"""
import datetime
import os
import select
import subprocess
import sys
import threading
from urllib.parse import urlparse

from PySide6.QtCore import QStringListModel, QThread, Signal
from PySide6.QtWidgets import QApplication, QMainWindow
from PySide6.QtGui import QIntValidator
import win32com.client as win32
import pandas as pd
from openpyxl import load_workbook

from ui_mainwindow import Ui_MainWindow
from mycode.main import (
    make_simple_csv_from_db,
    make_wechat_articles_full,
    ana_web,
    ana_wechat,
    output_dir,
)
from mycode.crawl_chrome import chrom_main_from_list

# from queue import Queue

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WEB_SITES_PATH = os.path.join(BASE_DIR, 'web_sites.xlsx')
BIAO_PATH = os.path.join(BASE_DIR, 'biao.xlsx')
PYTHON_PATH = os.path.join(BASE_DIR, 'runtime/python.exe')
TEMPLATE_PATH = os.path.join(BASE_DIR, 'summary/template.xlsx')


class MyApplication(QApplication):
    """QApplication that lazily creates and caches a single MainWindow."""

    def __init__(self, argv):
        super(MyApplication, self).__init__(argv)
        self.main_window = None

    def createMainWindow(self):
        """Return the main window, creating it on first use."""
        if self.main_window is None:
            self.main_window = MainWindow()
        return self.main_window


class MyThread(QThread):
    """Worker thread that performs one complete inspection run.

    Progress is published through ``update_signal`` as a dict with a
    ``'msg'`` key; the final message additionally carries
    ``'output_excel_path'``.
    """

    update_signal = Signal(dict)

    def __init__(self, lsize) -> None:
        """Create the worker.

        Args:
            lsize: Minimum acceptable size, in KB, of a site's crawl output
                file.  Smaller files are re-crawled through Chrome.
        """
        super().__init__()
        self.lsize = lsize
        self.processes = []
        self.running = False

    def capture_output(self, p):
        """Forward every stdout line of child process *p* to the log signal.

        Reads until end-of-file rather than until ``poll()`` reports an exit,
        so lines still buffered in the pipe when the child terminates are not
        lost.  Stops early once ``self.running`` is cleared.
        """
        try:
            for line in p.stdout:
                if not self.running:
                    break
                self.update_signal.emit({'msg': line.strip()})
        except ValueError:
            # The pipe was closed while we were reading; nothing more to do.
            pass
        finally:
            p.stdout.close()

    def run(self) -> None:
        """Thread entry point: run the inspection and report any failure.

        No event loop is started afterwards, so the thread finishes (and
        emits ``finished``) as soon as the work is done.
        """
        self.running = True
        try:
            self._run_inspection()
        except Exception as exc:
            # Surface the failure in the GUI log, then let it propagate as
            # it did before.
            self._notify(f'巡查任务出错: {exc}')
            raise
        finally:
            self.running = False

    def _notify(self, msg):
        """Emit a plain progress message."""
        self.update_signal.emit({'msg': msg})

    @staticmethod
    def _site_fields(row):
        """Return ``(group, name, url, domain)`` for one row of the site sheet."""
        group = row['单位']
        name = row['主办']
        url = row['地址']
        domain = urlparse(url).netloc.replace('www.', '')
        return group, name, url, domain

    def _run_inspection(self):
        """Execute the inspection phases in order, stopping early if asked to."""
        month = datetime.datetime.now().month
        self._notify('巡查任务开始...')
        self._notify('正在组合微信公众号爬取内容...')
        make_simple_csv_from_db()
        make_wechat_articles_full()
        self._notify("公众号爬取内容组装完毕!")
        if not self.running:
            return

        self._notify('开始进行网站爬取...')
        # Use the same absolute path the "open web sites" button uses.
        df = pd.read_excel(WEB_SITES_PATH, sheet_name='Sheet1')
        self._crawl_sites(df)
        if not self.running:
            return

        self._notify('网站爬取结束,校验中...')
        info_to_save = self._sites_needing_retry(df)
        if info_to_save:
            self._notify('存在未爬取站点,正在调用Chrome继续爬取...')
            chrom_main_from_list(info_to_save)
        self._notify('网站爬取完毕!')
        if not self.running:
            return

        self._notify('开始对比分析所有内容...')
        output_excel_path = self._write_summary(month)
        self.update_signal.emit({
            'msg': '巡查任务执行完毕, 请查看结果栏, 可手动校对',
            'output_excel_path': output_excel_path,
        })

    def _crawl_sites(self, df):
        """Start one scrapy process per site row and wait for all of them."""
        readers = []
        for _, row in df.iterrows():
            if not self.running:
                break
            group, name, url, domain = self._site_fields(row)
            # -u makes the child Python unbuffered so output arrives at once.
            cmd = [PYTHON_PATH, '-u', '-m', 'scrapy', 'crawl', 'basespider',
                   '-a', f'domain={domain}',
                   '-a', f'start_url={url}',
                   '-a', f'name={name}',
                   '-a', f'group={group}',
                   '-o', f'web_dir/{name}_{domain}.xlsx']
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                # stderr was piped but never read, so a chatty child could
                # fill the pipe and block forever.  Discard it instead.
                stderr=subprocess.DEVNULL,
                text=True,
                errors='replace',
                shell=False,
                # The relative "-o web_dir/..." must resolve to the directory
                # the validation step inspects.
                cwd=BASE_DIR,
            )
            self.processes.append(process)
            reader = threading.Thread(target=self.capture_output,
                                      args=(process,), daemon=True)
            reader.start()
            readers.append(reader)

        for process in self.processes:
            process.wait()
        if self.running:
            # Let the readers flush remaining lines before the next message;
            # bounded so a pipe that never reaches EOF cannot hang the run.
            for reader in readers:
                reader.join(timeout=5)

    def _sites_needing_retry(self, df):
        """Return ``[group, name, url]`` for sites whose output is missing or too small."""
        min_bytes = self.lsize * 1024  # Convert KB to bytes
        pending = []
        for _, row in df.iterrows():
            group, name, url, domain = self._site_fields(row)
            output_filename = os.path.join(
                BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
            try:
                too_small = os.path.getsize(output_filename) < min_bytes
            except OSError:
                # No output file at all: the site was not crawled.
                too_small = True
            if too_small:
                pending.append([group, name, url])
        return pending

    def _write_summary(self, month):
        """Fill the template with the analysis results and return the saved path."""
        wechat_results = ana_wechat()
        web_results = ana_web()
        output_excel_path = os.path.join(
            BASE_DIR, f'summary/{month}月-总院及下属公司官方公众号巡查结果汇总表.xlsx')
        workbook = load_workbook(TEMPLATE_PATH)
        try:
            # Select the worksheets to fill.
            wechat_sheet = workbook['公众号']
            web_sheet = workbook['网站']
            for row in wechat_results:
                wechat_sheet.append(row)
            for row in web_results:
                web_sheet.append(row)
            workbook.save(output_excel_path)
        finally:
            workbook.close()
        return output_excel_path

    def close(self):
        """Stop the run: kill child crawlers and make sure the thread has ended."""
        self.running = False
        # Iterate a copy: the worker may still be appending to the list.
        for process in list(self.processes):
            try:
                process.kill()
            except OSError:
                # Best effort: the process may already be gone.
                pass
        # Give the worker a moment to notice the flag and return on its own;
        # fall back to forced termination only if it is stuck in a long call.
        if not self.wait(500):
            self.terminate()
            # Wait after terminate() so the QThread object is never destroyed
            # while the thread is still running.
            self.wait(3000)


class MainWindow(QMainWindow):
    """Main window: wires the buttons to actions and shows the run log."""

    def __init__(self):
        super(MainWindow, self).__init__()
        self.worker_thread = None
        self.wcplus = False
        self.logModel = QStringListModel([])

        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)
        self.ui.lSize.setValidator(QIntValidator())
        self.ui.bWechat.clicked.connect(self.open_wcplus)
        self.ui.bWebSite.clicked.connect(self.open_websites_xlsx)
        self.ui.bBiao.clicked.connect(self.open_biao_xlsx)
        self.ui.bStart.clicked.connect(self.start)
        self.ui.bRes1.clicked.connect(self.open_res1)
        self.ui.bRes2.clicked.connect(self.open_res2)
        self.ui.vLog.setModel(self.logModel)
        self.res1Workbook = None

    @staticmethod
    def _open_in_excel(path):
        """Open *path* in a visible Excel instance and return the workbook object."""
        app = win32.Dispatch("Excel.Application")
        app.Visible = True
        workbook = app.Workbooks.Open(path)
        app.WindowState = 3
        return workbook

    def open_wcplus(self):
        """Launch wechat.exe once per session."""
        if self.wcplus is False:
            # Raw string: "\w" is not a valid escape sequence.
            subprocess.Popen(r'.\wechat.exe')
            self.wcplus = True

    def open_websites_xlsx(self):
        """Open the web-site list workbook in Excel."""
        self._open_in_excel(WEB_SITES_PATH)

    def open_biao_xlsx(self):
        """Open the biao.xlsx workbook in Excel."""
        self._open_in_excel(BIAO_PATH)

    def open_res1(self):
        """Open the first result workbook, remembering it so it can be closed later."""
        path = self.ui.lRes1.text()
        if path:
            self.res1Workbook = self._open_in_excel(path)

    def open_res2(self):
        """Open the second result workbook."""
        path = self.ui.lRes2.text()
        if path:
            self._open_in_excel(path)

    def get_time(self):
        """Return the current local time formatted as HH:MM:SS."""
        now = datetime.datetime.now()
        return now.strftime('%H:%M:%S')

    def start(self):
        """Toggle the inspection: start (or restart) a run, or stop the current one."""
        label = self.ui.bStart.text()
        if label == '开始巡查' or label == '重新开始':
            # Validate before touching any UI state: the validator still
            # allows an empty field, which int() cannot parse.
            try:
                lsize = int(self.ui.lSize.text())
            except ValueError:
                self.update_log({'msg': '请先填写有效的文件大小阈值(KB)'})
                return

            self.log('', True)
            if self.res1Workbook:
                try:
                    self.res1Workbook.Close()
                except Exception as e:
                    # The user may already have closed it in Excel.
                    print(f"Error while closing result workbook: {str(e)}")
                self.res1Workbook = None
            self.ui.lSize.setEnabled(False)
            self.ui.bStart.setText('停止巡查')
            self.start_web(lsize)
        elif label == '停止巡查':
            self.update_log({'msg': '正在停止...'})
            if self.worker_thread:
                self.worker_thread.close()
            self.log('', True)
            self.ui.lSize.setEnabled(True)
            self.ui.bStart.setText('开始巡查')

    def start_web(self, lsize):
        """Create and start a worker thread with the given size threshold (KB)."""
        if self.worker_thread is not None:
            # Never drop the reference to a thread that might still be running.
            self.worker_thread.close()
        self.worker_thread = MyThread(lsize)
        self.worker_thread.update_signal.connect(self.update_log)
        self.worker_thread.start()

    def update_log(self, rdict):
        """Append a time-stamped message; on the final message, publish the result path."""
        self.log(f'{self.get_time()}-{rdict["msg"]}', False)
        if 'output_excel_path' in rdict:
            self.ui.lRes1.setText(rdict['output_excel_path'])
            self.ui.bStart.setText('重新开始')
            self.ui.lSize.setEnabled(True)

    def log(self, logLine: str, clear=False):
        """Append *logLine* to the log view, or empty the view when *clear* is true."""
        if clear:
            self.logModel.setStringList([])
            self.ui.vLog.scrollToTop()
            return
        log_list = self.logModel.stringList()
        log_list.append(logLine)
        self.logModel.setStringList(log_list)
        self.ui.vLog.scrollToBottom()

    def closeEvent(self, event):
        """Shut down wechat.exe (if we started it) and the worker before closing."""
        if self.wcplus:
            try:
                subprocess.Popen(['taskkill', '/F', '/IM', 'wechat.exe'])
            except Exception as e:
                print(f"Error while terminating WeChat.exe: {str(e)}")
            self.wcplus = False
        if self.worker_thread:
            self.worker_thread.close()
        event.accept()


if __name__ == "__main__":
    app = MyApplication(sys.argv)
    main_window = app.createMainWindow()
    main_window.show()
    sys.exit(app.exec())