349 lines
13 KiB
Python
349 lines
13 KiB
Python
import sys
|
|
from PySide6.QtCore import QStringListModel, QThread, Signal
|
|
from PySide6.QtWidgets import QApplication, QMainWindow
|
|
from PySide6.QtGui import QIntValidator
|
|
from ui_mainwindow import Ui_MainWindow
|
|
import win32com.client as win32
|
|
import subprocess
|
|
import os
|
|
import datetime
|
|
from mycode.main import make_simple_csv_from_db, make_wechat_articles_full, ana_web, ana_wechat, output_dir, get_cbma_info_from_db_and_ana
|
|
from mycode.crawl_chrome import chrom_main_from_list
|
|
import pandas as pd
|
|
from urllib.parse import urlparse
|
|
from openpyxl import load_workbook
|
|
import threading
|
|
import traceback
|
|
from docxtpl import DocxTemplate
|
|
import json
|
|
# from queue import Queue
|
|
|
|
# Directory containing this script; every bundled resource below is resolved
# relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Workbooks the main window opens from its buttons.
WEB_SITES_PATH = os.path.join(BASE_DIR, 'web_sites.xlsx')
BIAO_PATH = os.path.join(BASE_DIR, 'biao.xlsx')

# Interpreter used to launch the scrapy crawler subprocesses.
PYTHON_PATH = os.path.join(BASE_DIR, 'runtime/python.exe')

# Templates for the generated summary workbook and the Word briefing.
TEMPLATE_PATH = os.path.join(BASE_DIR, 'summary/template.xlsx')
TEMPLATE_REPORT_PATH = os.path.join(BASE_DIR, 'summary/template_report.docx')
|
|
|
|
|
|
def fix_url_scheme(url, default_scheme='http'):
    """Return *url* with an explicit scheme, adding *default_scheme* if absent.

    Args:
        url: Address string, with or without a leading ``http://``/``https://``.
        default_scheme: Scheme name to prepend when *url* has none.

    Returns:
        *url* unchanged when it already starts with an http(s) scheme
        (compared case-insensitively); otherwise *url* prefixed with
        ``default_scheme``.
    """
    # Scheme names are case-insensitive, so "HTTP://host" already has one.
    if url.lower().startswith(('http://', 'https://')):
        return url
    # Protocol-relative form ("//host/path"): only the scheme name is missing,
    # the "//" separator is already there.
    if url.startswith('//'):
        return f'{default_scheme}:{url}'
    return f'{default_scheme}://{url}'
|
|
|
|
|
|
class MyApplication(QApplication):
    """QApplication that creates its MainWindow lazily and caches it."""

    def __init__(self, argv):
        super().__init__(argv)
        # Populated by the first createMainWindow() call.
        self.main_window = None

    def createMainWindow(self):
        """Return the cached main window, constructing it on first use."""
        window = self.main_window
        if window is None:
            window = MainWindow()
            self.main_window = window
        return window
|
|
|
|
|
|
def gen_doc(w1, w2):
    """Render the Word briefing from the analysis rows and return its path.

    Args:
        w1: Rows exposed to the template as ``w1``; element 5 of each row is
            used as that row's category.
        w2: Rows exposed to the template as ``w2``; counted the same way.

    Returns:
        Path of the saved ``.docx`` file under ``BASE_DIR/summary``.
    """
    today = datetime.datetime.now()
    three_days_ago = today - datetime.timedelta(days=3)

    # Tally rows per category over both result sets, in first-seen order.
    category_counts = {}
    for rows in (w1, w2):
        for row in rows:
            category = row[5]
            category_counts[category] = category_counts.get(category, 0) + 1

    # Every entry keeps its leading ", " separator, including the first one.
    summary = ''.join(
        f', {category}{count}项' for category, count in category_counts.items())

    context = {
        'y': today.year,
        'm': today.month,
        'd': today.day,
        'mo': three_days_ago.month,
        'do': three_days_ago.day,
        'su': summary,
        'w1': w1,
        'w2': w2,
    }

    report_path = os.path.join(
        BASE_DIR, f'summary/{today.year}年{today.month}月-分析结果简报.docx')
    template = DocxTemplate(TEMPLATE_REPORT_PATH)
    template.render(context)
    template.save(report_path)
    return report_path
|
|
|
|
|
|
class AnaThread(QThread):
    """Worker thread that runs the comparison analysis and reports progress.

    Progress is published through ``update_signal`` as dicts with a ``msg``
    key; the final message also carries ``output_excel_path`` and
    ``output_report_path``.
    """

    update_signal = Signal(object)

    def ana(self):
        """Assemble crawled data, analyse it, and write the workbook and report.

        Raises:
            PermissionError: Re-raised after two explanatory messages have
                been emitted, when it occurs while producing the outputs.
        """
        started = datetime.datetime.now()
        emit = self.update_signal.emit

        emit({'msg': '对比开始...'})
        emit({'msg': '正在组合微信公众号爬取内容...'})
        make_simple_csv_from_db(started)
        make_wechat_articles_full()
        emit({'msg': "公众号爬取内容组装完毕!"})

        emit({'msg': '开始对比分析所有内容...'})
        wechat_results = ana_wechat()
        web_results = ana_web()

        try:
            # Summary workbook: append the result rows to the template sheets.
            emit({'msg': '开始生成汇总表...'})
            output_excel_path = os.path.join(
                BASE_DIR,
                f'summary/{started.year}年{started.month}月-分析结果汇总表.xlsx')
            book = load_workbook(TEMPLATE_PATH)
            wechat_sheet = book['公众号']
            web_sheet = book['网站']
            for sheet, rows in ((wechat_sheet, wechat_results),
                                (web_sheet, web_results)):
                for row in rows:
                    sheet.append(row)
            book.save(output_excel_path)
            book.close()

            # Word briefing built from the same rows.
            emit({'msg': '开始生成汇总简报...'})
            output_report_path = gen_doc(wechat_results, web_results)
            emit({
                'msg': '分析完毕, 请查看结果栏, 可手动校对',
                'output_excel_path': output_excel_path,
                'output_report_path': output_report_path,
            })
        except PermissionError as err:
            emit({'msg': str(err)})
            emit({'msg': '文件被占用请先关闭!'})
            raise

    def run(self) -> None:
        """Thread entry point: run ``ana`` and emit the traceback on failure."""
        try:
            self.ana()
        except Exception:
            self.update_signal.emit({'msg': traceback.format_exc()})
|
|
|
|
|
|
class MyThread(QThread):
    """Worker thread that crawls every site listed in the web-sites workbook.

    One scrapy subprocess is started per workbook row and its stdout is
    forwarded to ``update_signal`` as ``{'msg': ...}`` dicts. After all
    subprocesses finish, sites whose output file is smaller than the
    configured threshold are handed to ``chrom_main_from_list``.
    """

    update_signal = Signal(object)

    def __init__(self, lsize) -> None:
        """
        Args:
            lsize: Size threshold in KB; an existing output file smaller than
                this triggers the Chrome-based crawl for that site.
        """
        super().__init__()
        self.lsize = lsize
        self.processes = []
        self.running = False

    @staticmethod
    def _site_info(row):
        """Return ``(group, name, url, domain, output_path)`` for a workbook row."""
        group = row['单位']
        name = row['主办']
        url = fix_url_scheme(row['地址'].strip())
        domain = urlparse(url).netloc.replace('www.', '')
        output = os.path.join(BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
        return group, name, url, domain, output

    def capture_output(self, p):
        """Forward stdout of process *p* to ``update_signal`` line by line.

        Reads until end-of-file rather than until the process is reported as
        exited, so lines still buffered in the pipe at exit are not lost.
        Stops early once ``self.running`` is cleared.
        """
        while self.running:
            line = p.stdout.readline()
            if not line:  # '' means EOF: the pipe was closed
                break
            self.update_signal.emit({'msg': line.strip()})

    def run(self) -> None:
        """Thread entry point: crawl all sites, then retry undersized results."""
        self.update_signal.emit({'msg': '开始进行网站爬取...'})
        # Use the same absolute path as the rest of the application instead of
        # relying on the current working directory.
        df = pd.read_excel(WEB_SITES_PATH, sheet_name='Sheet1')
        # Parse every row up front so a malformed row fails before any
        # subprocess has been started.
        sites = [self._site_info(row) for _, row in df.iterrows()]

        self.running = True
        for group, name, url, domain, output in sites:
            # -u: unbuffered, so the child's output is forwarded immediately.
            cmd = [PYTHON_PATH, '-u', '-m', 'scrapy', 'crawl', 'basespider',
                   '-a', f'domain={domain}',
                   '-a', f'start_url={url}',
                   '-a', f'name={name}',
                   '-a', f'group={group}',
                   '-a', f'output={output}']
            process = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, text=True, shell=False)
            self.processes.append(process)
            threading.Thread(
                target=self.capture_output, args=(process,), daemon=True
            ).start()

        for process in self.processes:
            process.wait()

        self.update_signal.emit({'msg': '网站爬取结束,校验中...'})
        size_limit = self.lsize * 1024  # KB -> bytes
        # NOTE(review): a site whose output file does not exist at all is not
        # retried; only existing-but-small files are. Confirm this is intended.
        info_to_save = [
            [group, name, url]
            for group, name, url, _domain, output in sites
            if os.path.exists(output) and os.path.getsize(output) < size_limit
        ]
        if info_to_save:
            self.update_signal.emit({'msg': '存在未爬取站点,正在调用Chrome继续爬取...'})
            chrom_main_from_list(info_to_save)
        self.update_signal.emit({'msg': '网站爬取完毕!'})

    def close(self):
        """Stop log forwarding, kill all crawler subprocesses, end the thread."""
        self.running = False
        for process in self.processes:
            process.kill()
        self.terminate()
|
|
|
|
|
|
class MainWindow(QMainWindow):
    """Main window: wires the UI buttons to crawling, analysis and file viewing."""

    def __init__(self):
        super().__init__()
        self.web_thread = None
        self.ana_thread = None
        self.wcplus = False  # True once wcplus.exe has been launched
        self.logModel = QStringListModel([])
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)
        self.ui.lSize.setValidator(QIntValidator())

        self.ui.bWechat.clicked.connect(self.open_wcplus)
        self.ui.bWebSite.clicked.connect(
            lambda: self.open_file(WEB_SITES_PATH))
        self.ui.bBiao.clicked.connect(lambda: self.open_file(BIAO_PATH))
        self.ui.bStart.clicked.connect(self.start)
        self.ui.bAna.clicked.connect(self.start_ana)
        self.ui.bRes1.clicked.connect(
            lambda: self.open_file(self.ui.lRes1.text()))
        self.ui.bRes2.clicked.connect(
            lambda: self.open_file(self.ui.lRes2.text(), 'docx'))
        self.ui.bCal.clicked.connect(
            lambda: self.cbma_cal(self.ui.lYear.text()))
        self.ui.bOpenCalRes1.clicked.connect(
            lambda: self.open_file(self.ui.lCalRes1.text()))
        self.ui.bOpenCalRes2.clicked.connect(
            lambda: self.open_file(self.ui.lCalRes2.text()))
        self.ui.bOpenCalRes3.clicked.connect(
            lambda: self.open_file(self.ui.lCalRes3.text()))
        self.ui.bOpenCalRes4.clicked.connect(
            lambda: self.open_file(self.ui.lCalRes4.text()))

        self.ui.vLog.setModel(self.logModel)
        self.res1Workbook = None

    def open_wcplus(self):
        """Launch wcplus.exe once; later calls do nothing."""
        if self.wcplus is False:
            # Raw string: "\w" is not a valid escape sequence in a normal
            # literal. The resulting path text is unchanged.
            subprocess.Popen(r'.\wcplus.exe')
            self.wcplus = True

    def open_file(self, path, type='xlsx'):
        """Open *path* in Word (``type='docx'``) or Excel (``type='xlsx'``).

        Does nothing when *path* is empty or *type* is neither value.
        """
        if not path:
            return
        # NOTE(review): the meaning of WindowState value 3 is not evident from
        # this file - verify against the Office object model.
        if type == 'docx':
            app = win32.Dispatch("Word.Application")
            app.Visible = True
            app.Documents.Open(path)
            app.WindowState = 3
        elif type == 'xlsx':
            app = win32.Dispatch("Excel.Application")
            app.Visible = True
            app.Workbooks.Open(path)
            app.WindowState = 3

    def get_time(self):
        """Return the current local time formatted as ``HH:MM:SS``."""
        return datetime.datetime.now().strftime('%H:%M:%S')

    def start(self):
        """Toggle crawling based on the start button's current label."""
        label = self.ui.bStart.text()
        if label in ('开始爬取', '重新开始'):
            self.log('', True)
            # Validate before touching the UI state: the validator still lets
            # an empty field through, and int('') would otherwise raise after
            # the button had already been switched to "stop".
            try:
                lsize = int(self.ui.lSize.text())
            except ValueError:
                self.update_log({'msg': '请先填写有效的文件大小阈值(KB)!'})
                return
            if self.res1Workbook:
                self.res1Workbook.Close()
            self.ui.lSize.setEnabled(False)
            self.ui.bStart.setText('停止爬取')
            self.start_web(lsize)
        elif label == '停止爬取':
            self.update_log({'msg': '正在停止...'})
            if self.web_thread:
                self.web_thread.close()
            self.log('', True)
            self.ui.lSize.setEnabled(True)
            self.ui.bStart.setText('开始爬取')

    def start_web(self, lsize):
        """Start a crawl thread with size threshold *lsize* (KB)."""
        self.web_thread = MyThread(lsize)
        self.web_thread.update_signal.connect(self.update_log)
        self.web_thread.start()

    def start_ana(self):
        """Start the analysis thread and route its messages to the log."""
        self.ana_thread = AnaThread()
        self.ana_thread.update_signal.connect(self.update_log)
        self.ana_thread.start()

    def cbma_cal(self, year):
        """Run the yearly analysis for *year* and show the four result paths.

        Falls back to the current year when *year* is not a valid integer.

        Raises:
            PermissionError: Re-raised after logging when raised by the
                analysis call.
        """
        try:
            now_year = int(year)
        except (TypeError, ValueError):
            now_year = datetime.datetime.now().year
        self.update_log({'msg': '正在分析本年总院官微数据...'})
        try:
            origin_path, cbma_path, cbma_cal_path, cbma_month_path = (
                get_cbma_info_from_db_and_ana(now_year))
        except PermissionError as e:
            self.update_log({'msg': str(e)})
            self.update_log({'msg': '文件被占用请先关闭!'})
            raise
        self.ui.lCalRes1.setText(origin_path)
        self.ui.lCalRes2.setText(cbma_path)
        self.ui.lCalRes3.setText(cbma_cal_path)
        self.ui.lCalRes4.setText(cbma_month_path)
        self.update_log({'msg': '分析完毕!'})

    def update_log(self, rdict):
        """Append a timestamped message to the log and pick up result paths.

        Args:
            rdict: A plain string, or a dict with a ``msg`` key and optional
                ``output_report_path`` / ``output_excel_path`` keys that are
                copied into the result fields.
        """
        if isinstance(rdict, str):
            self.log(f'{self.get_time()}-{rdict}', False)
        elif isinstance(rdict, dict):
            self.log(f'{self.get_time()}-{rdict["msg"]}', False)
            if 'output_report_path' in rdict:
                self.ui.lRes2.setText(rdict['output_report_path'])
            if 'output_excel_path' in rdict:
                self.ui.lRes1.setText(rdict['output_excel_path'])

    def log(self, logLine: str, clear=False):
        """Append *logLine* to the log view, or empty the view when *clear*."""
        if clear:
            self.logModel.setStringList([])
            self.ui.vLog.scrollToTop()
            return
        log_list = self.logModel.stringList()
        log_list.append(logLine)
        self.logModel.setStringList(log_list)
        self.ui.vLog.scrollToBottom()

    def closeEvent(self, event):
        """Kill wcplus.exe and the crawl thread before accepting the close."""
        if self.wcplus:
            try:
                subprocess.Popen(['taskkill', '/F', '/IM', 'wcplus.exe'])
            except Exception as e:
                print(f"Error while terminating wcplus.exe: {str(e)}")
            self.wcplus = False
        if self.web_thread:
            self.web_thread.close()
        event.accept()
|
|
|
|
|
|
if __name__ == "__main__":
    # The UI module is generated with: pyside6-uic main.ui -o ui_mainwindow.py
    print('正在启动程序...')
    app = MyApplication(sys.argv)
    main_window = app.createMainWindow()
    main_window.show()
    print('启动成功')
    # Run the Qt event loop and hand its return code back to the shell.
    exit_code = app.exec()
    sys.exit(exit_code)
|