import os
import re
import sqlite3
from urllib.parse import urlparse

import pandas as pd
from openpyxl import load_workbook

from mycode.base import BASE_DIR

wechat_dir = os.path.join(BASE_DIR, 'article')
web_dir = os.path.join(BASE_DIR, 'web_dir')
output_dir = os.path.join(BASE_DIR, 'summary')
# Screening rules: one row per flagged phrase ('错误表述'), with the suggested
# replacement ('建议修改词语') and an error category ('错误分类')
df_s = pd.read_excel(os.path.join(BASE_DIR, 'biao.xlsx'), sheet_name='筛查内容')


def fix_url_scheme(url, default_scheme='http'):
    # Check whether the URL already carries a scheme
    if not url.startswith('http://') and not url.startswith('https://'):
        # No scheme present, so prepend the default one
        url = f'{default_scheme}://{url}'
    return url

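# Illustrative behaviour:
#   fix_url_scheme('example.com')          -> 'http://example.com'
#   fix_url_scheme('https://example.com')  -> 'https://example.com'

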
def trans_to_json():
    # Serialize the screening rules table to JSON (UTF-8, one object per row)
    json_str = df_s.to_json(orient='records', force_ascii=False)
    with open('biao.json', 'w', encoding='utf-8') as f:
        f.write(json_str)


def make_simple_csv_from_db():
    conn = sqlite3.connect(os.path.join(BASE_DIR, 'db_folder/test.db'))
    query = (
        "SELECT id, g.nickname, a.title, a.content_url, "
        "datetime(a.p_date, 'unixepoch', 'localtime') AS pub_date "
        "FROM articles a LEFT JOIN gzhs g ON g.biz = a.biz"
    )
    df = pd.read_sql_query(query, conn)
    # Close the database connection
    conn.close()
    # Write the data to a CSV file
    df.to_csv(os.path.join(wechat_dir, 'articles.csv'), index=False)

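# Note: articles.csv written above is the input that make_wechat_articles_full()
# below enriches with each article's markdown body.

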
def get_cbma_info_from_db_and_ana(year: str = '2023'):
    conn = sqlite3.connect(os.path.join(BASE_DIR, 'db_folder/test.db'))
    # pub_date is rendered as 'YYYY年MM月DD日', so the string comparison
    # pub_date > year keeps rows from the given year onwards
    query = f'''
        SELECT
            id,
            strftime('%Y年%m月%d日', datetime(a.p_date, 'unixepoch', 'localtime')) AS pub_date,
            g.nickname,
            a.title,
            a.content_url,
            a.read_num
        FROM
            articles a
        LEFT JOIN
            gzhs g ON g.biz = a.biz
        WHERE
            pub_date > '{year}'
            AND g.biz = 'MzIzMDU4Njg3MA=='
        ORDER BY
            pub_date
    '''
    df = pd.read_sql_query(query, conn)
    # Close the database connection
    conn.close()
    # Attach a 'source' column scraped from each article's attribution line
    df['source'] = ''
    for ind, row in df.iterrows():
        full_path = os.path.join(wechat_dir, row['nickname'], row['id'] + '.md')
        try:
            with open(full_path, encoding='utf-8') as f:
                content = f.read()
            # Pull the attribution ("来源丨...") line out of the article body
            a_match = re.findall('来源丨(.*?)\n', content)
            if a_match:
                # Collapse runs of whitespace and non-breaking spaces into '、'
                a = re.sub(r'[\xa0\s]+', '、', a_match[0])
                df.at[ind, 'source'] = a
        except FileNotFoundError:
            print(full_path + ' --- not found')
    # Fill the results into the Excel template
    template_path = os.path.join(BASE_DIR, 'summary/template_cbma.xlsx')
    workbook = load_workbook(template_path)
    sheet = workbook['公众号更新数']
    sheet.cell(row=1, column=1, value=f'关于{year}年度中国建材总院新媒体更新情况明细表\n(官微)')
    # Data rows start at sheet row 3 (column 5 is skipped)
    for ind, row in df.iterrows():
        sheet.cell(row=ind + 3, column=1, value=str(ind + 1))
        sheet.cell(row=ind + 3, column=2, value=row['pub_date'])
        sheet.cell(row=ind + 3, column=3, value=row['title'])
        sheet.cell(row=ind + 3, column=4, value=row['source'])
        sheet.cell(row=ind + 3, column=6, value=row['read_num'])
        sheet.cell(row=ind + 3, column=7, value=row['content_url'])
    output_path = os.path.join(BASE_DIR, f'summary/{year}年_cbma.xlsx')
    workbook.save(output_path)
    # Tally how many articles credit each subsidiary as a source
    institutes = [
        '瑞泰科技', '国检集团', '中材高新', '哈玻院', '中国新材院', '秦皇岛院', '西安墙材院',
        '咸阳陶瓷院', '钟表所', '总院北分', '中岩科技', '水泥新材院', '中建材科创院', '科建苑',
    ]
    counts = [int(df['source'].str.contains(name, na=False).sum()) for name in institutes]

    # Write the tallies into the scoring template: row 6, every other column from 5
    # ('tempalte' spelling kept to match the template file name on disk)
    template_cal_path = os.path.join(BASE_DIR, 'summary/tempalte_cbma_cal.xlsx')
    workbook2 = load_workbook(template_cal_path)
    sheet2 = workbook2['打分表']
    sheet2.cell(row=1, column=1, value=f'中国建材总院宣传工作计分表({year}年度)')
    for i, count in enumerate(counts):
        sheet2.cell(row=6, column=5 + 2 * i, value=count)
    output_path2 = os.path.join(BASE_DIR, f'summary/{year}年_cbma_cal.xlsx')
    workbook2.save(output_path2)
    return output_path, output_path2


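# Example (illustrative): build the 2024 workbooks instead of the default year
#   detail_xlsx, score_xlsx = get_cbma_info_from_db_and_ana('2024')

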
def make_wechat_articles_full():
    df = pd.read_csv(os.path.join(wechat_dir, 'articles.csv'))
    df['content'] = ''
    for ind, row in df.iterrows():
        full_path = os.path.join(wechat_dir, row['nickname'], row['id'] + '.md')
        try:
            with open(full_path, encoding='utf-8') as f:
                df.at[ind, 'content'] = f.read()
        except FileNotFoundError:
            print(full_path + ' --- not found')
    output_path = os.path.join(wechat_dir, 'articles_full.csv')
    df.to_csv(output_path)


def ana_wechat():
    articles_full_path = os.path.join(wechat_dir, 'articles_full.csv')
    if not os.path.exists(articles_full_path):
        make_wechat_articles_full()

    df = pd.read_csv(articles_full_path)
    df['content'] = df['content'].fillna('')

    output_data = []
    index = 1

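    # For each screening rule, flag every article whose body contains the
    # offending phrase, minus the known false positives handled below.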
    for ind, row in df_s.iterrows():
        mask = df['content'].str.contains(row['错误表述'])
        result = df[mask]

        if not result.empty:
            for ind2, row2 in result.iterrows():
                # “两学一做”学习教育 is correct usage; the '20大' rule is disabled
                if row['错误表述'] == '“两学一做”学习' and '“两学一做”学习教育' in row2['content']:
                    continue
                if row['错误表述'] == '20大':
                    continue
                output_row = [
                    index,
                    row2['nickname'],
                    row2['title'],
                    row['错误表述'],
                    row['建议修改词语'],
                    row['错误分类'],
                    row2['content_url']
                ]
                output_data.append(output_row)
                print(f'Found WeChat account issue #{index} --- {row2["nickname"]}')
                index += 1
    # output_data.insert(0, ['序号', '信源名称', '文章标题', '错误表述', '建议修改词语', '错误分类', '原文链接'])

    return output_data


def ana_web():
    output_data = []
    index = 1
    # Only analyse the sites listed in web_sites.xlsx
    df_sites = pd.read_excel('web_sites.xlsx', sheet_name='Sheet1')
    for _, site in df_sites.iterrows():
        name = site['主办']
        url = fix_url_scheme(site['地址'].strip())
        domain = urlparse(url).netloc.replace('www.', '')
        full_path = os.path.join(BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
        if os.path.exists(full_path) and os.path.getsize(full_path) > 0:
            df = pd.read_excel(full_path, engine='openpyxl')
            for ind, row in df_s.iterrows():
                mask = df['text'].str.contains(row['错误表述'], na=False)
                result = df[mask]
                if not result.empty:
                    for ind2, row2 in result.iterrows():
                        # Same false-positive exclusions as in ana_wechat()
                        if row['错误表述'] == '“两学一做”学习' and '“两学一做”学习教育' in row2['text']:
                            continue
                        if row['错误表述'] == '20大':
                            continue
                        output_row = [
                            index,
                            row2['name'],
                            "/",
                            row['错误表述'],
                            row['建议修改词语'],
                            row['错误分类'],
                            row2['url']
                        ]
                        output_data.append(output_row)
                        print(f'Found website issue #{index} --- {row2["name"]}')
                        index += 1
    # output_data.insert(0, ['序号', '信源名称', '文章标题', '错误表述', '建议修改词语', '错误分类', '原文链接'])

    return output_data


if __name__ == "__main__":
    get_cbma_info_from_db_and_ana()
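    # Other entry points (illustrative; the 'issues.xlsx' name is just an example):
    # make_simple_csv_from_db()
    # rows = ana_wechat() + ana_web()
    # pd.DataFrame(rows, columns=['序号', '信源名称', '文章标题', '错误表述',
    #                             '建议修改词语', '错误分类', '原文链接']
    #              ).to_excel(os.path.join(output_dir, 'issues.xlsx'), index=False)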