cp_flask/cp_app.py

357 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
import json
from docx import Document
import io
import fitz # PyMuPDF
import requests
import os
import re
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
from datetime import timedelta
import uuid
import subprocess
class ParseError(Exception):
    """Error raised when a request cannot be processed.

    Attributes:
        msg: User-facing message; ``cal`` returns it to the client as ``err_msg``.
    """

    def __init__(self, msg="请求错误"):
        # Forward the message to Exception so str(e), e.args and tracebacks
        # carry it instead of being empty.
        super().__init__(msg)
        self.msg = msg
# Flask application; static files are served from dist/assets under the /assets URL prefix.
app = Flask(__name__, static_folder='dist/assets', static_url_path='/assets')
# Absolute directory containing this module.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
CORS(app)
# Derived from CURRENT_DIR so both directory constants are absolute and agree with each other.
VUE_DIST_DIR = os.path.join(CURRENT_DIR, 'dist')
app.config.update(
    # NOTE(review): the fallback value is a weak secret kept only for backward
    # compatibility; set the JWT_SECRET_KEY environment variable in any real deployment.
    JWT_SECRET_KEY=os.environ.get('JWT_SECRET_KEY') or 'carbon',
    JWT_ACCESS_TOKEN_EXPIRES=timedelta(days=1),
    JWT_REFRESH_TOKEN_EXPIRES=timedelta(days=30),
    JWT_ALGORITHM='HS256',  # signing algorithm
)
jwt = JWTManager(app)
# Chat-completions endpoint used by ask().
LLM_URL = "http://106.0.4.200:9000/v1/chat/completions"
# NOTE(review): the fallback key is committed to source control; prefer supplying
# LLM_API_KEY through the environment and rotating the committed value.
API_KEY = os.environ.get("LLM_API_KEY") or "JJVAide0hw3eaugGmxecyYYFw45FX2LfhnYJtC+W2rw"
MODEL = "Qwen/QwQ-32B"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}
OCR_URL = "http://127.0.0.1:3402/ocr_full"
def get_standard():
    """Load and return the scoring standard parsed from ``standard.json``.

    The file is re-read on every call, so each caller receives a fresh object
    that it may mutate freely.

    ``./standard.json`` (relative to the working directory) is used when it
    exists; otherwise the copy next to this module is tried, so the service
    does not depend on the directory it was started from.

    Raises:
        OSError: If the file cannot be opened in either location.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    path = "./standard.json"
    if not os.path.isfile(path):
        path = os.path.join(CURRENT_DIR, "standard.json")
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
@app.route("/api/system_info/", methods=["GET"])
def get_system_info():
    """Return the system's display name as a JSON object with HTTP 200."""
    payload = {"base_name": "转型金融核算系统"}
    return jsonify(payload), 200
@app.route("/api/standard/", methods=["GET"])
@jwt_required()
def get_s():
    """Return the scoring standard as JSON (requires a valid JWT).

    The standard is wrapped in ``jsonify`` explicitly, like every other route
    in this module, instead of relying on Flask converting a bare return
    value, which older Flask releases do not do for lists.
    """
    return jsonify(get_standard()), 200
@app.route("/api/check_token/", methods=["GET"])
@jwt_required()
def check_token():
    """Answer with an empty JSON body and HTTP 200 when the JWT check passes."""
    empty_body = jsonify()
    return empty_body, 200
@app.route('/')
def index():
    """Serve ``index.html`` from ``VUE_DIST_DIR`` for the site root."""
    entry_page = 'index.html'
    return send_from_directory(VUE_DIST_DIR, entry_page)
def get_users():
    """Load and return the user table parsed from ``users.json``.

    ``./users.json`` (relative to the working directory) is used when it
    exists; otherwise the copy next to this module is tried, so the service
    does not depend on the directory it was started from.

    Raises:
        OSError: If the file cannot be opened in either location.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    path = "./users.json"
    if not os.path.isfile(path):
        path = os.path.join(CURRENT_DIR, "users.json")
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
@app.route('/api/login/', methods=["POST"])
def login():
    """Authenticate a user and issue a JWT access token.

    Expects a JSON object with ``username`` and ``password``. On success
    returns ``{"access": <token>, "userInfo": {...}}`` with HTTP 200; any
    other outcome (unknown user, wrong password, missing or malformed body)
    returns ``{"err_msg": ...}`` with HTTP 400.
    """
    import hmac  # local import: only needed for the constant-time comparison below

    # silent=True: a missing or non-JSON body is treated as empty credentials
    # instead of aborting the request with an unhandled error.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    username = body.get("username", "unknown")
    password = body.get("password", "unknown")

    users = get_users()
    # Only strings can be keys of the JSON user table; checking the type first
    # also keeps unhashable values (lists, objects) from raising on lookup.
    record = users.get(username) if isinstance(username, str) else None
    if isinstance(record, dict) and "password" in record:
        stored = record["password"]
        if isinstance(stored, str) and isinstance(password, str):
            # Constant-time comparison; compare_digest needs bytes for non-ASCII
            # text, and surrogatepass keeps odd JSON escapes from raising.
            matches = hmac.compare_digest(
                stored.encode("utf-8", "surrogatepass"),
                password.encode("utf-8", "surrogatepass"),
            )
        else:
            # Non-string credentials keep the plain equality semantics.
            matches = stored == password
        if matches:
            access = create_access_token(identity=username)
            user_info = {"username": username, "name": record["name"]}
            return jsonify({"access": access, "userInfo": user_info}), 200
    return jsonify({"err_msg": "用户名或密码错误"}), 400
# Upload slot -> third-level indicator names that receive full marks as soon as
# a file is supplied in that slot. Order matters: slots are processed in sequence.
_FULL_SCORE_ITEMS_BY_UPLOAD = (
    ("file1", (
        "碳中和路线图",
        "短期/中期/长期减碳目标",
        "设立碳管理相关部门",
        "气候相关风险评估机制",
        "内部碳定价机制",
        "碳管理数字化平台建设",
        "碳交易与履约能力",
        "CCER等减排项目开发管理",
        "数字化碳管理平台",
    )),
    ("file2", (
        "能源与碳排放管理体系",
        "碳排放数据监测、报告与核查",
        "参与权威信息平台披露",
        "碳中和目标与进展经第三方认证",
        "碳排放实时监测覆盖率达标",
        "数据自动化采集比例达标",
        "数据质量与校验机制",
    )),
    ("file3", (
        "ESG报告",
        "工业固废/生物质资源利用率数据",
        "硫化物减排措施",
        "氮氧化物减排措施",
        "其他污染物减排措施",
        "项目选址生态避让与保护",
        "矿山生态修复与复垦方案",
        "厂区绿化与生态碳汇措施",
        "低碳产品认证与标识",
        "产品耐久性与回收性设计",
        "无环保处罚与信访记录",
        "环境应急管理体系",
        "员工健康安全管理体系与制度",
        "符合标准的物理环境与防护措施",
        "员工心理健康支持计划",
        "社区沟通与透明度机制",
        "社区经济与发展贡献措施",
        "社区负面影响缓解措施",
        "供应商行为准则",
        "供应商筛查与评估机制",
        "供应商审核与改进机制",
        "完善的治理结构",
        "商业道德与反腐败制度",
    )),
    ("file4", (
        "资金分配明细",
        "资本金比例与到位证明",
        "融资渠道多样性",
        "成本效益分析",
        "碳减排收益量化",
        "社会效益评估",
        "风险管控方案",
        "关键风险应对策略与预案",
        "金融机构或第三方风险分担机制",
        "绿色金融资质认证与资金用途",
        "融资条款与ESG绩效挂钩",
        "国际合作资金申请与利用",
        "应急响应与能力建设机制",
    )),
    ("file5", (
        "AI预测减碳潜力应用",
        "智能优化控制算法应用",
        "ERP/EMS/MES系统集成度达标",
        "IoT设备覆盖率达标",
        "跨系统数据协同能力",
        "碳数据安全管理措施",
        "系统抗攻击能力达标",
        "数据合规性与审计追踪机制",
    )),
)

# Regex groups for "declined by N%" statements, most specific pattern first.
# The group at position i is scored into data[i].
_DECLINE_PATTERN_GROUPS = (
    (  # total carbon emissions
        r'碳排放总量[^,。]*?下降\s*([\d.]+)%',
        r'碳排放[^,。]*?总量[^,。]*?下降\s*([\d.]+)%',
        r'碳总量[^,。]*?下降\s*([\d.]+)%',
        r'排放总量[^,。]*?下降\s*([\d.]+)%',
        r'排放[^,。]*?下降\s*([\d.]+)%',
    ),
    (  # carbon emission intensity
        r'碳排放强度[^,。]*?下降\s*([\d.]+)%',
        r'碳强度[^,。]*?总量[^,。]*?下降\s*([\d.]+)%',
        r'排放强度[^,。]*?下降\s*([\d.]+)%',
    ),
    (  # product carbon footprint
        r'产品碳足迹[^,。]*?下降\s*([\d.]+)%',
        r'碳足迹[^,。]*?下降\s*([\d.]+)%',
        r'产品足迹[^,。]*?下降\s*([\d.]+)%',
    ),
)


def _file_extension(filename):
    """Return the lower-cased extension of *filename*, or None when it has none."""
    if filename and '.' in filename:
        return filename.rsplit('.', 1)[1].lower()
    return None


def _award_full_score(item):
    """Mark *item* as met via its first scoring option and return its full score."""
    item["result"] = item["scoringCriteria"][0]["选项"]
    item["score"] = item["fullScore"]
    return item["score"]


def _score_decline(patterns, content, item):
    """Grade *item* from the first "declined by N%" figure found in *content*.

    Returns the score to add to the running total.
    """
    percent = None
    for pattern in patterns:
        match = re.search(pattern, content, re.DOTALL)
        if not match:
            continue
        try:
            percent = float(match.group(1))
        except ValueError:
            # "[\d.]+" also matches text such as "." or "1.2.3"; skip it and
            # try the next pattern instead of failing the whole request.
            continue
        break
    if percent:
        if percent >= 10:
            item["result"], item["score"] = 3, 5
        elif percent >= 5:
            item["result"], item["score"] = 2, 2.5
        elif percent > 0:
            item["result"], item["score"] = 1, 1.5
    # NOTE(review): whatever score the item already carries is added even when
    # no percentage was found; confirm this is intended for pre-scored items.
    return item.get("score", 0)


@app.route("/api/cal/", methods=["POST"])
@jwt_required()
def cal():
    """Score the uploaded documents against the standard and return the result.

    Reads the multipart upload slots ``file1`` .. ``file6``:

    * ``file1`` .. ``file5``: the mere presence of a file awards full marks to
      the indicators listed for that slot in ``_FULL_SCORE_ITEMS_BY_UPLOAD``.
    * ``file6``: its text is sent to the LLM (prompt ``tec``); an empty reply
      awards full marks to every item whose ``firstLevel`` matches the
      technical-path section.
    * ``file1`` is additionally parsed: a reduction-target mention scores
      ``data[3]`` and "declined by N%" figures grade ``data[0]`` .. ``data[2]``.

    Returns:
        ``{"total_score": ..., "data": ...}`` on success, or
        ``{"err_msg": ...}`` with HTTP 400 when no file was uploaded, a file
        could not be parsed, or the LLM call raised ``ParseError``.
    """
    data = get_standard()
    uploads = {
        key: request.files.get(key, None)
        for key in ("file1", "file2", "file3", "file4", "file5", "file6")
    }
    if not any(uploads.values()):
        return jsonify({"err_msg": "请至少上传一个文件"}), 400

    total_score = 0
    for key, names in _FULL_SCORE_ITEMS_BY_UPLOAD:
        if not uploads[key]:
            continue
        for item in data:
            if item["thirdLevel"] in names:
                total_score += _award_full_score(item)

    # Remember what file6 parsed to: its stream is consumed by parsing, so a
    # file1 upload with the same name reuses this result instead of re-reading.
    cached_name, cached_content, cached_err = None, None, None
    file6 = uploads["file6"]
    if file6:
        cached_name = file6.filename
        cached_content, cached_err = parse_file(file6, _file_extension(cached_name))
        if not cached_content:
            return jsonify({"err_msg": cached_err}), 400
        try:
            res = ask(f'以下内容为用户报告: {cached_content}', "tec")
        except ParseError as e:
            return jsonify({"err_msg": e.msg}), 400
        if res == "":
            for item in data:
                if item["firstLevel"] == "二、技术路径35 分)":
                    total_score += _award_full_score(item)

    file1 = uploads["file1"]
    if file1:
        filename = file1.filename
        if filename == cached_name:
            content, err_msg = cached_content, cached_err
        else:
            content, err_msg = parse_file(file1, _file_extension(filename))
        if not content:
            return jsonify({"err_msg": err_msg}), 400
        if re.search(r'碳?减排目标', content):
            data[3]["result"] = ""
            data[3]["score"] = data[3]["fullScore"]
            total_score += data[3]["score"]
        for index, patterns in enumerate(_DECLINE_PATTERN_GROUPS):
            total_score += _score_decline(patterns, content, data[index])

    return jsonify({"total_score": round(total_score, 2), "data": data})
def ask(input:str, p_name:str, stream=False):
    """Send *input* to the LLM using the system prompt stored in ``promot/<p_name>.md``.

    Args:
        input: User message to send.
        p_name: Base name of the prompt file under ``promot/``.
        stream: When true, the request is made in streaming mode.

    Returns:
        The reply text of the first choice, or, when ``stream`` is true, the
        open ``requests`` response so the caller can consume the stream
        (previously this case fell through and returned ``None``).

    Raises:
        ParseError: If the server reports an internal error or the reply does
            not have the expected chat-completion shape.
        OSError: If the prompt file cannot be read.
        requests.RequestException: If the HTTP request itself fails.
    """
    with open(f"promot/{p_name}.md", "r", encoding="utf-8") as f:
        system_prompt = f.read()
    payload = {
        "model": MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": input},
        ],
        "temperature": 0,
        "stream": stream,
        "chat_template_kwargs": {"enable_thinking": False},
    }
    response = requests.post(LLM_URL, headers=HEADERS, json=payload, stream=stream, timeout=(60, 240))
    if stream:
        return response

    # Decode the body once; a non-JSON body surfaces as ParseError, the
    # exception type callers already handle.
    try:
        body = response.json()
    except ValueError as err:
        raise ParseError("模型返回内容无法解析") from err
    if isinstance(body, dict) and body.get("detail") == "Internal server error":
        raise ParseError("模型处理错误超过最大token限制")
    try:
        return body["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as err:
        raise ParseError("模型返回内容无法解析") from err
def parse_file(file_content, file_type):
    """Extract plain text from an uploaded file.

    Args:
        file_content: File-like upload object; ``read()`` is used for pdf, docx
            and txt, ``save(path)`` for doc.
        file_type: Lower-cased extension (``pdf``, ``docx``, ``doc`` or ``txt``).

    Returns:
        A ``(text, error_message)`` pair; exactly one of the two is ``None``.
        Any exception raised while parsing is reported through the error
        message rather than propagated.
    """
    try:
        if file_type == "pdf":
            # Wrap the upload's bytes in an in-memory stream for PyMuPDF.
            doc = fitz.open(stream=io.BytesIO(file_content.read()), filetype="pdf")
            try:
                # Try to extract the embedded text layer directly.
                text = "".join(page.get_text() + "\n" for page in doc).strip()
            finally:
                # Close the document even when text extraction fails.
                doc.close()
            if text:
                return text, None
            return None, "无法直接提取文本请使用OCR处理"
        elif file_type == "docx":
            doc = Document(io.BytesIO(file_content.read()))
            # Paragraph text first, one paragraph per line.
            parts = [paragraph.text + "\n" for paragraph in doc.paragraphs]
            # Then table text: cells separated by spaces, one row per line.
            for table in doc.tables:
                for row in table.rows:
                    parts.extend(cell.text + " " for cell in row.cells)
                    parts.append("\n")
            return "".join(parts), None
        elif file_type == "doc":
            # catdoc reads from disk, so spool the upload to a uniquely named file.
            file_path = os.path.join(CURRENT_DIR, f'{uuid.uuid4()}.doc')
            try:
                file_content.save(file_path)
                completed = subprocess.run(['catdoc', file_path], capture_output=True, text=True)
            finally:
                # Remove the spool file even if catdoc is missing or fails to start.
                if os.path.exists(file_path):
                    os.remove(file_path)
            if completed.returncode != 0:
                return None, completed.stderr
            return completed.stdout, None
        elif file_type == "txt":
            return file_content.read().decode("utf-8"), None
        return None, "不支持的文件类型"
    except Exception as e:
        return None, f"文件解析错误: {str(e)}"
if __name__ == "__main__":
    # get_ocr_engine()
    # Script entry point: start Flask's built-in server with the debugger enabled.
    run_options = {"debug": True, "port": 3401}
    app.run(**run_options)