"""Flask backend for the transition-finance carbon scoring system.

Exposes JWT-protected REST endpoints that score uploaded corporate
documents (PDF/DOCX/DOC/TXT) against a JSON scoring standard, optionally
asking an LLM for a judgement, and caches LLM-derived scores keyed by a
SimHash fingerprint in a local SQLite database.
"""

from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
import json
from docx import Document
import io
import fitz  # PyMuPDF
import requests
import os
import re
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
from datetime import timedelta
import uuid
import subprocess
import sqlite3
from simhash import Simhash
import traceback


class ParseError(Exception):
    """Domain error carrying a user-facing message for 4xx responses."""

    def __init__(self, msg="请求错误"):
        # Forward to Exception so str(e) yields the message (previously it
        # was empty, making the global handler return a blank err_msg).
        super().__init__(msg)
        self.msg = msg


app = Flask(__name__, static_folder='dist/assets', static_url_path='/assets')
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
CORS(app)


@app.errorhandler(Exception)
def handle_exception(e):
    """Catch-all handler: log the traceback and return a JSON 500.

    NOTE(review): this also swallows Flask HTTPExceptions (404 etc.) and
    re-labels them as 500 — confirm that is intended.
    """
    traceback.print_exc()
    response = jsonify({"err_msg": str(e)})
    response.status_code = 500
    # Error responses bypass flask_cors, so add the header manually.
    response.headers['Access-Control-Allow-Origin'] = '*'
    return response


VUE_DIST_DIR = os.path.join(os.path.dirname(__file__), 'dist')

app.config.update(
    # NOTE(review): hard-coded JWT secret — should come from the environment.
    JWT_SECRET_KEY='carbon',
    JWT_ACCESS_TOKEN_EXPIRES=timedelta(days=1),
    JWT_REFRESH_TOKEN_EXPIRES=timedelta(days=30),
    JWT_ALGORITHM='HS256',  # signing algorithm
)
jwt = JWTManager(app)

# LLM endpoint configuration.
# NOTE(review): API key committed in source — rotate it and load from config.
LLM_URL = "http://106.0.4.200:9000/v1/chat/completions"
API_KEY = "JJVAide0hw3eaugGmxecyYYFw45FX2LfhnYJtC+W2rw"
MODEL = "Qwen/QwQ-32B"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}
OCR_URL = "http://127.0.0.1:3402/ocr_full"

DB_PATH = "file_score.db"
HAMMING_THRESHOLD = 5  # SimHash similarity threshold (tunable)


# --- database initialisation ---
def init_db():
    """Create the fingerprint/score cache table if it does not exist."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS file_score_cache (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                fingerprint TEXT NOT NULL,
                score INTEGER NOT NULL,
                metadata TEXT
            )
        """)
        conn.commit()
    finally:
        conn.close()


init_db()


# --- SimHash fingerprint ---
def get_fingerprint(text):
    """Return the integer SimHash fingerprint of *text*."""
    return Simhash(text).value


# --- Hamming distance ---
def hamming_distance(a, b):
    """Number of differing bits between two integer fingerprints."""
    return bin(a ^ b).count("1")


# --- lookup of similar cached files ---
def find_similar(fingerprint):
    """Return the cached score of the first stored fingerprint within
    HAMMING_THRESHOLD bits of *fingerprint*, or None when nothing matches."""
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            "SELECT fingerprint, score FROM file_score_cache"
        ).fetchall()
    finally:
        conn.close()
    for fp_hex, score in rows:
        fp = int(fp_hex, 16)  # fingerprints are stored as hex strings
        if hamming_distance(fingerprint, fp) <= HAMMING_THRESHOLD:
            return score
    return None


# --- persist a file score ---
def save_score(fingerprint, score, metadata=""):
    """Store *score* for *fingerprint* (serialised as a hex string)."""
    fingerprint_hex = hex(fingerprint)[2:]
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "INSERT INTO file_score_cache (fingerprint, score, metadata) VALUES (?, ?, ?)",
            (fingerprint_hex, score, metadata),
        )
        conn.commit()
    finally:
        conn.close()


def get_standard():
    """Load the scoring standard (a list of scoring items) from disk."""
    with open("./standard.json", "r", encoding="utf-8") as f:
        standard = json.load(f)
    return standard


@app.route("/api/system_info/", methods=["GET"])
def get_system_info():
    """Public endpoint returning the display name of the system."""
    return jsonify({"base_name": "转型金融核算系统"}), 200


@app.route("/api/standard/", methods=["GET"])
@jwt_required()
def get_s():
    """Return the raw scoring standard to authenticated clients."""
    return get_standard()


@app.route("/api/check_token/", methods=["GET"])
@jwt_required()
def check_token():
    """Liveness check for the access token; 200 when the JWT is valid."""
    return jsonify(), 200


@app.route('/')
def index():
    """Serve the built Vue single-page application."""
    return send_from_directory(VUE_DIST_DIR, 'index.html')


def get_users():
    """Load the user table from disk.

    NOTE(review): passwords are stored and compared in plain text.
    """
    with open("./users.json", "r", encoding="utf-8") as f:
        users = json.load(f)
    return users


@app.route('/api/login/', methods=["POST"])
def login():
    """Issue a JWT access token for a valid username/password pair."""
    username = request.json.get("username", "unknown")
    password = request.json.get("password", "unknown")
    users = get_users()
    if username in users and users[username]["password"] == password:
        access = create_access_token(identity=username)
        return jsonify({
            "access": access,
            "userInfo": {"username": username, "name": users[username]["name"]},
        }), 200
    return jsonify({"err_msg": "用户名或密码错误"}), 400


# thirdLevel item names awarded full marks when the corresponding
# evidence file (file1..file5) is present.
_FILE1_ITEMS = [
    "碳中和路线图",
    "短期/中期/长期减碳目标",
    "设立碳管理相关部门",
    "气候相关风险评估机制",
    "内部碳定价机制",
    "碳管理数字化平台建设",
    "碳交易与履约能力",
    "CCER等减排项目开发管理",
    "数字化碳管理平台",
]
_FILE2_ITEMS = [
    "能源与碳排放管理体系",
    "碳排放数据监测、报告与核查",
    "参与权威信息平台披露",
    "碳中和目标与进展经第三方认证",
    "碳排放实时监测覆盖率达标",
    "数据自动化采集比例达标",
    "数据质量与校验机制",
]
_FILE3_ITEMS = [
    "ESG报告",
    "工业固废/生物质资源利用率数据",
    "硫化物减排措施",
    "氮氧化物减排措施",
    "其他污染物减排措施",
    "项目选址生态避让与保护",
    "矿山生态修复与复垦方案",
    "厂区绿化与生态碳汇措施",
    "低碳产品认证与标识",
    "产品耐久性与回收性设计",
    "无环保处罚与信访记录",
    "环境应急管理体系",
    "员工健康安全管理体系与制度",
    "符合标准的物理环境与防护措施",
    "员工心理健康支持计划",
    "社区沟通与透明度机制",
    "社区经济与发展贡献措施",
    "社区负面影响缓解措施",
    "供应商行为准则",
    "供应商筛查与评估机制",
    "供应商审核与改进机制",
    "完善的治理结构",
    "商业道德与反腐败制度",
]
_FILE4_ITEMS = [
    "资金分配明细",
    "资本金比例与到位证明",
    "融资渠道多样性",
    "成本效益分析",
    "碳减排收益量化",
    "社会效益评估",
    "风险管控方案",
    "关键风险应对策略与预案",
    "金融机构或第三方风险分担机制",
    "绿色金融资质认证与资金用途",
    "融资条款与ESG绩效挂钩",
    "国际合作资金申请与利用",
    "应急响应与能力建设机制",
]
_FILE5_ITEMS = [
    "AI预测减碳潜力应用",
    "智能优化控制算法应用",
    "ERP/EMS/MES系统集成度达标",
    "IoT设备覆盖率达标",
    "跨系统数据协同能力",
    "碳数据安全管理措施",
    "系统抗攻击能力达标",
    "数据合规性与审计追踪机制",
]

# Regex groups used to extract decline percentages from file1's text.
_TOTAL_PATTERNS = [  # total carbon emissions
    r'碳排放总量[^,。]*?下降\s*([\d.]+)%',
    r'碳排放[^,。]*?总量[^,。]*?下降\s*([\d.]+)%',
    r'碳总量[^,。]*?下降\s*([\d.]+)%',
    r'排放总量[^,。]*?下降\s*([\d.]+)%',
    r'排放[^,。]*?下降\s*([\d.]+)%',
]
_INTENSITY_PATTERNS = [  # carbon emission intensity
    r'碳排放强度[^,。]*?下降\s*([\d.]+)%',
    r'碳强度[^,。]*?总量[^,。]*?下降\s*([\d.]+)%',
    r'排放强度[^,。]*?下降\s*([\d.]+)%',
]
_FOOTPRINT_PATTERNS = [  # product carbon footprint
    r'产品碳足迹[^,。]*?下降\s*([\d.]+)%',
    r'碳足迹[^,。]*?下降\s*([\d.]+)%',
    r'产品足迹[^,。]*?下降\s*([\d.]+)%',
]


def _award_full_scores(data, third_levels):
    """Give every item whose thirdLevel is in *third_levels* its full score
    (mutating *data* in place) and return the total points awarded."""
    targets = set(third_levels)
    gained = 0
    for item in data:
        if item["thirdLevel"] in targets:
            item["result"] = item["scoringCriteria"][0]["选项"]
            item["score"] = item["fullScore"]
            gained += item["score"]
    return gained


def _cal_percent(decline_patterns, content, data, index, total_score):
    """Try each pattern in order for a decline percentage; map the first
    match onto a tiered result/score for data[index] and return the
    updated running total."""
    decline_percent = None
    for pattern in decline_patterns:
        match = re.search(pattern, content, re.DOTALL)
        if match:
            decline_percent = float(match.group(1))
            break
    if decline_percent:  # falsy 0.0 / None both mean "no award"
        if decline_percent >= 10:
            data[index]["result"] = 3
            data[index]["score"] = 5
        elif decline_percent >= 5:
            data[index]["result"] = 2
            data[index]["score"] = 2.5
        elif decline_percent > 0:
            data[index]["result"] = 1
            data[index]["score"] = 1.5
        total_score += data[index].get("score", 0)
    return total_score


@app.route("/api/cal/", methods=["POST"])
@jwt_required()
def cal():
    """Score the six optional evidence uploads against the standard.

    file1..file5 each award full marks to a fixed set of thirdLevel items;
    file6 is sent to the LLM ("tec" prompt) and, on a "是" verdict, awards
    the whole technology-path section; file1's text is additionally mined
    with regexes for decline percentages. Returns the rounded total and
    the annotated standard.
    """
    data = get_standard()
    files = [request.files.get(f"file{i}", None) for i in range(1, 7)]
    file1, file2, file3, file4, file5, file6 = files
    if not any(files):
        return jsonify({"err_msg": "请至少上传一个文件"}), 400

    total_score = 0
    if file1:
        total_score += _award_full_scores(data, _FILE1_ITEMS)
    if file2:
        total_score += _award_full_scores(data, _FILE2_ITEMS)
    if file3:
        total_score += _award_full_scores(data, _FILE3_ITEMS)
    if file4:
        total_score += _award_full_scores(data, _FILE4_ITEMS)
    if file5:
        total_score += _award_full_scores(data, _FILE5_ITEMS)

    # Remember file6's parse result so the file1 branch can reuse it when
    # the same physical file was uploaded in both slots.
    e_filename = None
    e_content, e_err_msg = None, None
    if file6:
        # Extract the file name and type.
        filename = file6.filename
        e_filename = filename
        file_type = filename.rsplit('.', 1)[1].lower() if '.' in filename else None
        content, err_msg = parse_file(file6, file_type)
        e_content = content
        e_err_msg = err_msg
        if content:
            try:
                res = ask(f'以下内容为用户报告: {content}', "tec")
            except ParseError as e:
                return jsonify({"err_msg": e.msg}), 400
            if res == "是":
                for item in data:
                    if item["firstLevel"] == "二、技术路径(35 分)":
                        item["result"] = item["scoringCriteria"][0]["选项"]
                        item["score"] = item["fullScore"]
                        total_score += item["score"]
        else:
            return jsonify({"err_msg": err_msg}), 400

    if file1:
        filename = file1.filename
        file_type = filename.rsplit('.', 1)[1].lower() if '.' in filename else None
        if filename == e_filename:
            content, err_msg = e_content, e_err_msg  # reuse file6's parse
        else:
            content, err_msg = parse_file(file1, file_type)
        if content:
            # Presence of a reduction target (碳减排目标) awards item 3.
            if re.search(r'碳?减排目标', content):
                data[3]["result"] = "有"
                data[3]["score"] = data[3]["fullScore"]
                total_score += data[3]["score"]
            # Total emissions / intensity / product footprint decline tiers.
            total_score = _cal_percent(_TOTAL_PATTERNS, content, data, 0, total_score)
            total_score = _cal_percent(_INTENSITY_PATTERNS, content, data, 1, total_score)
            total_score = _cal_percent(_FOOTPRINT_PATTERNS, content, data, 2, total_score)
        else:
            return jsonify({"err_msg": err_msg}), 400

    return jsonify({"total_score": round(total_score, 2), "data": data})


@app.route("/api/cal_dh/", methods=["POST"])
@jwt_required()
def cal_dh():
    """Score the combined uploads via the LLM ("tec_dh" prompt), serving
    from the SimHash cache when a near-duplicate was scored before."""
    files = [request.files.get(f"file{i}", None) for i in range(1, 7)]
    if not any(files):
        return jsonify({"err_msg": "请至少上传一个文件"}), 400
    # Concatenate all parsed texts; ParseError propagates to the global
    # handler (which now carries the message via str(e)).
    content = parse_files(files)
    fingerprint = get_fingerprint(content)
    score = find_similar(fingerprint)
    if score is not None:
        return jsonify({"total_score": round(score, 2)})
    try:
        res = ask(content, "tec_dh")
        score = round(float(res), 2)
        save_score(fingerprint, score)
        return jsonify({"total_score": score})
    except ParseError as e:
        return jsonify({"err_msg": e.msg}), 400


def parse_files(files):
    """Parse every non-empty upload, skipping repeated filenames, and
    return the texts joined by newlines.

    Raises ParseError when any file fails to parse.
    """
    contents = []
    seen_filenames = []
    for file in files:
        if not file:
            continue
        filename = file.filename
        if filename in seen_filenames:
            continue
        seen_filenames.append(filename)  # fix: previously never recorded
        file_type = filename.rsplit('.', 1)[1].lower() if '.' in filename else None
        content, err_msg = parse_file(file, file_type)
        if err_msg:
            raise ParseError(err_msg)
        contents.append(content)
    return '\n'.join(contents)


def ask(input: str, p_name: str, stream=False):
    """Send *input* to the LLM with the system prompt from promot/{p_name}.md.

    Returns the assistant message content (non-stream mode).
    Raises ParseError when the backend reports an internal error
    (typically the max-token limit being exceeded).
    """
    with open(f"promot/{p_name}.md", "r", encoding="utf-8") as f:
        promot_str = f.read()
    his = [
        {"role": "system", "content": promot_str},
        {"role": "user", "content": input},
    ]
    payload = {
        "model": MODEL,
        "messages": his,
        "temperature": 0,
        "stream": stream,
        "chat_template_kwargs": {"enable_thinking": False},
    }
    response = requests.post(
        LLM_URL, headers=HEADERS, json=payload, stream=stream, timeout=(60, 240)
    )
    if not stream:
        body = response.json()  # parse once instead of three times
        if body.get("detail") == "Internal server error":
            raise ParseError("模型处理错误超过最大token限制")
        return body["choices"][0]["message"]["content"]


def parse_file(file_content, file_type):
    """Extract plain text from an uploaded file.

    Supports pdf (PyMuPDF), docx (python-docx), doc (external ``catdoc``)
    and utf-8 txt. Returns (text, None) on success or (None, err_msg) on
    failure — the function never raises.
    """
    try:
        if file_type == "pdf":
            pdf_bytes = file_content.read()
            pdf_stream = io.BytesIO(pdf_bytes)
            doc = None
            try:
                doc = fitz.open(stream=pdf_stream, filetype="pdf")
                text_content = ""
                for page_num in range(len(doc)):
                    page = doc[page_num]
                    text_content += page.get_text() + "\n"
                t_plain = text_content.strip()
                if t_plain:
                    return t_plain, None
                else:
                    # Scanned/image-only PDF: no embedded text layer.
                    return None, "无法直接提取文本,请使用 OCR 处理"
            finally:
                if doc:
                    doc.close()
                pdf_stream.close()  # close the stream explicitly
        elif file_type == "docx":
            doc_stream = io.BytesIO(file_content.read())
            try:
                doc = Document(doc_stream)
                text_content = ""
                for paragraph in doc.paragraphs:
                    text_content += paragraph.text + "\n"
                for table in doc.tables:
                    for row in table.rows:
                        for cell in row.cells:
                            text_content += cell.text + " "
                        text_content += "\n"
                return text_content, None
            finally:
                doc_stream.close()  # make sure the stream is closed
        elif file_type == "doc":
            # Legacy .doc needs the external catdoc tool; write to a unique
            # temp file so concurrent requests cannot collide.
            file_name = f'{uuid.uuid4()}.doc'
            file_path = os.path.join(CURRENT_DIR, file_name)
            try:
                file_content.save(file_path)
                completed = subprocess.run(
                    ['catdoc', file_path], capture_output=True, text=True
                )
                if completed.returncode != 0:
                    return None, completed.stderr
                return completed.stdout, None
            finally:
                if os.path.exists(file_path):
                    os.remove(file_path)  # always remove the temp file
        elif file_type == "txt":
            text_content = file_content.read().decode("utf-8")
            return text_content, None
        return None, "不支持的文件类型"
    except Exception as e:
        return None, f"文件解析错误: {str(e)}"


if __name__ == "__main__":
    # get_ocr_engine()
    app.run(debug=True, port=3401)