zcspider/start.py

349 lines
13 KiB
Python

import sys
from PySide6.QtCore import QStringListModel, QThread, Signal
from PySide6.QtWidgets import QApplication, QMainWindow
from PySide6.QtGui import QIntValidator
from ui_mainwindow import Ui_MainWindow
import win32com.client as win32
import subprocess
import os
import datetime
from mycode.main import make_simple_csv_from_db, make_wechat_articles_full, ana_web, ana_wechat, output_dir, get_cbma_info_from_db_and_ana
from mycode.crawl_chrome import chrom_main_from_list
import pandas as pd
from urllib.parse import urlparse
from openpyxl import load_workbook
import threading
import traceback
from docxtpl import DocxTemplate
import json
# from queue import Queue
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WEB_SITES_PATH = os.path.join(BASE_DIR, 'web_sites.xlsx')
BIAO_PATH = os.path.join(BASE_DIR, 'biao.xlsx')
PYTHON_PATH = os.path.join(BASE_DIR, 'runtime/python.exe')
TEMPLATE_PATH = os.path.join(BASE_DIR, 'summary/template.xlsx')
TEMPLATE_REPORT_PATH = os.path.join(BASE_DIR, 'summary/template_report.docx')
def fix_url_scheme(url, default_scheme='http'):
# 检查URL是否包含方案
if not url.startswith('http://') and not url.startswith('https://'):
# 如果没有方案,添加默认方案
url = f'{default_scheme}://{url}'
return url
class MyApplication(QApplication):
def __init__(self, argv):
super(MyApplication, self).__init__(argv)
self.main_window = None
def createMainWindow(self):
if self.main_window is None:
self.main_window = MainWindow()
return self.main_window
def gen_doc(w1, w2):
now = datetime.datetime.now()
now_3 = now - datetime.timedelta(days=3)
# with open('w2.json', 'r', encoding='utf-8') as f:
# w2 = json.loads(f.read())
# with open('w1.json', 'r', encoding='utf-8') as f:
# w1 = json.loads(f.read())
cate_dict = {}
context = {'y': now.year, 'm': now.month, 'd': now.day,
'mo': now_3.month, 'do': now_3.day, 'su': 'xx', 'w1': w1, 'w2': w2}
output_report_path = os.path.join(
BASE_DIR, f'summary/{now.year}{now.month}月-分析结果简报.docx')
doc = DocxTemplate(TEMPLATE_REPORT_PATH)
for i in w1:
if i[5] in cate_dict:
cate_dict[i[5]] = cate_dict[i[5]] + 1
else:
cate_dict[i[5]] = 1
for i in w2:
if i[5] in cate_dict:
cate_dict[i[5]] = cate_dict[i[5]] + 1
else:
cate_dict[i[5]] = 1
context['su'] = ''
for k, v in cate_dict.items():
context['su'] = context['su'] + f', {k}{v}'
doc.render(context)
doc.save(output_report_path)
return output_report_path
class AnaThread(QThread):
update_signal = Signal(object)
def ana(self):
now = datetime.datetime.now()
self.update_signal.emit({'msg': '对比开始...'})
self.update_signal.emit({'msg': '正在组合微信公众号爬取内容...'})
make_simple_csv_from_db(now)
make_wechat_articles_full()
self.update_signal.emit({'msg': "公众号爬取内容组装完毕!"})
self.update_signal.emit({'msg': '开始对比分析所有内容...'})
wechat_results = ana_wechat()
web_results = ana_web()
try:
# 生成汇总表
self.update_signal.emit({'msg': '开始生成汇总表...'})
output_excel_path = os.path.join(
BASE_DIR, f'summary/{now.year}{now.month}月-分析结果汇总表.xlsx')
workbook = load_workbook(TEMPLATE_PATH)
wechat_sheet = workbook['公众号']
web_sheet = workbook['网站']
for row in wechat_results:
wechat_sheet.append(row)
for row in web_results:
web_sheet.append(row)
workbook.save(output_excel_path)
workbook.close()
# with open('w1.json', 'w', encoding='utf-8') as f:
# f.write(json.dumps(wechat_results, ensure_ascii=False))
# with open('w2.json', 'w', encoding='utf-8') as f:
# f.write(json.dumps(web_results, ensure_ascii=False))
# 生成简报
self.update_signal.emit({'msg': '开始生成汇总简报...'})
output_report_path = gen_doc(wechat_results, web_results)
self.update_signal.emit(
{'msg': '分析完毕, 请查看结果栏, 可手动校对', 'output_excel_path': output_excel_path, 'output_report_path': output_report_path})
except PermissionError as e:
self.update_signal.emit({'msg': str(e)})
self.update_signal.emit({'msg': '文件被占用请先关闭!'})
raise
def run(self) -> None:
try:
self.ana()
except Exception as e:
self.update_signal.emit({'msg': traceback.format_exc()})
class MyThread(QThread):
update_signal = Signal(object)
def __init__(self, lsize) -> None:
"""
lsize: 多少kb需要调取Chrome
"""
super().__init__()
self.lsize = lsize
self.processes = []
self.running = False
def capture_output(self, p):
while self.running and p.poll() is None:
output = p.stdout.readline()
if output:
self.update_signal.emit({'msg': output.strip()})
def run(self) -> None:
self.update_signal.emit({'msg': '开始进行网站爬取...'})
df = pd.read_excel('web_sites.xlsx', sheet_name='Sheet1')
ind = 0
for ind, row in df.iterrows():
group = row['单位']
name = row['主办']
url = fix_url_scheme(row['地址'].strip())
domain = urlparse(url).netloc.replace('www.', '')
output = os.path.join(BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
# -u 代表不缓冲,直接输出
cmd = [PYTHON_PATH, '-u', '-m', 'scrapy', 'crawl', 'basespider', '-a',
f'domain={domain}', '-a', f'start_url={url}', '-a', f'name={name}', '-a', f'group={group}', '-a', f'output={output}']
# cmd = [PYTHON_PATH, '-u', '-m', 'scrapy', 'crawl', 'basespider', '-a', f'domain={domain}', '-a', f'start_url={url}', '-a', f'name={name}', '-a', f'group={group}', '-o', f'web_dir/{name}_{domain}.xlsx']
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, text=True, shell=False)
self.processes.append(process)
self.running = True
getlog_thread = threading.Thread(
target=self.capture_output, args=(process,), daemon=True)
getlog_thread.start()
# getlog_thread_err = threading.Thread(target=self.capture_err, args=(process,), daemon=True)
# getlog_thread_err.start()
for process in self.processes:
process.wait()
self.update_signal.emit({'msg': '网站爬取结束,校验中...'})
info_to_save = []
for ind, row in df.iterrows():
group = row['单位']
name = row['主办']
url = fix_url_scheme(row['地址'].strip())
domain = urlparse(url).netloc.replace("www.", "")
output_filename = os.path.join(
BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
if os.path.exists(output_filename):
file_size = os.path.getsize(output_filename)
if file_size < self.lsize * 1024: # Convert KB to bytes
info_to_save.append([group, name, url])
if info_to_save:
self.update_signal.emit({'msg': '存在未爬取站点,正在调用Chrome继续爬取...'})
chrom_main_from_list(info_to_save)
self.update_signal.emit({'msg': '网站爬取完毕!'})
def close(self):
self.running = False
if self.processes:
for i in self.processes:
i.kill()
self.terminate()
class MainWindow(QMainWindow):
def __init__(self):
super(MainWindow, self).__init__()
self.web_thread = None
self.ana_thread = None
self.wcplus = False
self.logModel = QStringListModel([])
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
self.ui.lSize.setValidator(QIntValidator())
self.ui.bWechat.clicked.connect(self.open_wcplus)
self.ui.bWebSite.clicked.connect(
lambda: self.open_file(WEB_SITES_PATH))
self.ui.bBiao.clicked.connect(lambda: self.open_file(BIAO_PATH))
self.ui.bStart.clicked.connect(self.start)
self.ui.bAna.clicked.connect(self.start_ana)
self.ui.bRes1.clicked.connect(
lambda: self.open_file(self.ui.lRes1.text()))
self.ui.bRes2.clicked.connect(
lambda: self.open_file(self.ui.lRes2.text(), 'docx'))
self.ui.bCal.clicked.connect(
lambda: self.cbma_cal(self.ui.lYear.text()))
self.ui.bOpenCalRes1.clicked.connect(
lambda: self.open_file(self.ui.lCalRes1.text()))
self.ui.bOpenCalRes2.clicked.connect(
lambda: self.open_file(self.ui.lCalRes2.text()))
self.ui.bOpenCalRes3.clicked.connect(
lambda: self.open_file(self.ui.lCalRes3.text()))
self.ui.bOpenCalRes4.clicked.connect(
lambda: self.open_file(self.ui.lCalRes4.text()))
self.ui.vLog.setModel(self.logModel)
self.res1Workbook = None
def open_wcplus(self):
if self.wcplus is False:
subprocess.Popen('.\wcplus.exe')
self.wcplus = True
def open_file(self, path, type='xlsx'):
if path:
# try:
# os.startfile(path)
# except Exception as e:
# print("无法打开文件:", str(e))
if type == 'docx':
app = win32.Dispatch("Word.Application")
app.Visible = True
app.Documents.Open(path)
app.WindowState = 3
elif type == 'xlsx':
app = win32.Dispatch("Excel.Application")
app.Visible = True
app.Workbooks.Open(path)
app.WindowState = 3
def get_time(self):
now = datetime.datetime.now()
return now.strftime('%H:%M:%S')
def start(self):
if self.ui.bStart.text() == '开始爬取' or self.ui.bStart.text() == '重新开始':
self.log('', True)
if self.res1Workbook:
self.res1Workbook.Close()
self.ui.lSize.setEnabled(False)
self.ui.bStart.setText('停止爬取')
self.start_web(int(self.ui.lSize.text()))
elif self.ui.bStart.text() == '停止爬取':
self.update_log({'msg': '正在停止...'})
if self.web_thread:
self.web_thread.close()
self.log('', True)
self.ui.lSize.setEnabled(True)
self.ui.bStart.setText('开始爬取')
def start_web(self, lsize):
self.web_thread = MyThread(lsize)
self.web_thread.update_signal.connect(self.update_log)
self.web_thread.start()
def start_ana(self):
self.ana_thread = AnaThread()
self.ana_thread.update_signal.connect(self.update_log)
self.ana_thread.start()
def cbma_cal(self, year):
try:
now_year = int(year)
except Exception:
now_year = datetime.datetime.now().year
self.update_log({'msg': '正在分析本年总院官微数据...'})
try:
origin_path, cbma_path, cbma_cal_path, cbma_month_path = get_cbma_info_from_db_and_ana(
now_year)
except PermissionError as e:
self.update_log({'msg': str(e)})
self.update_log({'msg': '文件被占用请先关闭!'})
raise
self.ui.lCalRes1.setText(origin_path)
self.ui.lCalRes2.setText(cbma_path)
self.ui.lCalRes3.setText(cbma_cal_path)
self.ui.lCalRes4.setText(cbma_month_path)
self.update_log({'msg': '分析完毕!'})
def update_log(self, rdict):
if isinstance(rdict, str):
self.log(f'{self.get_time()}-{rdict}', False)
elif isinstance(rdict, dict):
self.log(f'{self.get_time()}-{rdict["msg"]}', False)
if 'output_report_path' in rdict:
self.ui.lRes2.setText(rdict['output_report_path'])
# self.ui.bStart.setText('重新开始')
# self.ui.lSize.setEnabled(True)
if 'output_excel_path' in rdict:
self.ui.lRes1.setText(rdict['output_excel_path'])
# self.ui.bStart.setText('重新开始')
# self.ui.lSize.setEnabled(True)
def log(self, logLine: str, clear=False):
log_list = self.logModel.stringList()
if clear:
log_list = []
else:
log_list.append(logLine)
self.logModel.setStringList(log_list)
if clear:
self.ui.vLog.scrollToTop()
else:
self.ui.vLog.scrollToBottom()
def closeEvent(self, event):
if self.wcplus:
try:
subprocess.Popen(['taskkill', '/F', '/IM', 'wcplus.exe'])
except Exception as e:
print(f"Error while terminating wcplus.exe: {str(e)}")
self.wcplus = False
if self.web_thread:
self.web_thread.close()
event.accept()
if __name__ == "__main__":
# pyside6-uic main.ui -o ui_mainwindow.py
print('正在启动程序...')
app = MyApplication(sys.argv)
main_window = app.createMainWindow()
main_window.show()
print('启动成功')
sys.exit(app.exec())