# carbo_server/apps/carbon/views.py
from django.shortcuts import render
from .models import Work, Fingerprint
from .serializers import WorkSerializer, WorkCreateSerializer, WorkDqCalSerializer, WorkDhCalSerializer
from apps.utils.viewsets import CustomModelViewSet
from rest_framework.decorators import action
import os
from django.conf import settings
import json
from apps.carbon.service import parse_file, get_fingerprint, hamming_distance, split_simhash
import requests
from rest_framework.exceptions import ParseError
import re
from rest_framework.response import Response
from django.db import transaction, IntegrityError
from django.db.models import Q
# Create your views here.

# Endpoint, credentials and model for the self-hosted LLM chat-completions API.
# NOTE(review): the API key and URL are hard-coded in source — move them to
# Django settings or environment variables.
LLM_URL = "http://106.0.4.200:9000/v1/chat/completions"
API_KEY = "JJVAide0hw3eaugGmxecyYYFw45FX2LfhnYJtC+W2rw"
MODEL = "Qwen/QwQ-32B"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}
HAMMING_THRESHOLD = 5  # fingerprint similarity threshold, tunable
def get_standard():
    """Load the scoring-standard definition bundled with the carbon app.

    Parses ``apps/carbon/standard.json`` on every call and returns a fresh
    object, so callers are free to annotate/mutate the result.
    """
    json_path = os.path.join(settings.BASE_DIR, 'apps', 'carbon', 'standard.json')
    with open(json_path, 'r', encoding='utf-8') as fh:
        return json.load(fh)
def ask(input: str, p_name: str, stream=False):
    """Ask the LLM for a completion using the prompt template *p_name*.

    The system prompt is read from ``apps/carbon/promot/<p_name>.md``; *input*
    is sent as the single user message with temperature 0 (deterministic).

    Returns the reply text when ``stream`` is False. When ``stream`` is True
    the body is never consumed and the function implicitly returns None —
    NOTE(review): confirm streaming callers exist / this is intentional.

    Raises ParseError when the backend reports an internal server error
    (observed when the prompt exceeds the model's token limit).
    """
    promot_path = os.path.join(settings.BASE_DIR, 'apps', 'carbon', 'promot', f"{p_name}.md")
    with open(promot_path, "r", encoding="utf-8") as f:
        promot_str = f.read()
    his = [
        {"role": "system", "content": promot_str},
        {"role": "user", "content": input},
    ]
    payload = {
        "model": MODEL,
        "messages": his,
        "temperature": 0,
        "stream": stream,
        "chat_template_kwargs": {"enable_thinking": False},
    }
    response = requests.post(LLM_URL, headers=HEADERS, json=payload, stream=stream, timeout=(60, 240))
    if not stream:
        # Parse the body exactly once instead of re-parsing it on every access.
        body = response.json()
        if body.get("detail") == "Internal server error":
            raise ParseError("模型处理错误超过最大token限制")
        return body["choices"][0]["message"]["content"]
def simhash_to_db(n: int) -> int:
    """Map an unsigned 64-bit simhash onto the signed range a BIGINT column accepts."""
    return n - (1 << 64) if n >= (1 << 63) else n
def simhash_from_db(n: int) -> int:
    """Inverse of :func:`simhash_to_db`: restore the unsigned 64-bit simhash."""
    if n < 0:
        n += 1 << 64
    return n
class WorkViewSet(CustomModelViewSet):
    """CRUD viewset for Work records plus scoring actions.

    ``cal_dq`` computes the pre-loan (贷前) score from uploaded files and
    ``cal_dh`` computes a score via simhash-fingerprint reuse or an LLM call.
    """
    queryset = Work.objects.all()
    serializer_class = WorkSerializer
    # Serializer used for create requests (CustomModelViewSet convention).
    create_serializer_class = WorkCreateSerializer
    # Fields consulted by the ?search= filter backend.
    search_fields = ['name', "description"]
@action(methods=['get'], detail=False, perms_map={'get': '*'})
def my(self, request, *args, **kwargs):
user = self.request.user
queryset = self.filter_queryset(self.get_queryset().filter(create_by=user))
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
    @action(detail=True, methods=['post'], serializer_class=WorkDqCalSerializer)
    @transaction.atomic
    def cal_dq(self, request, pk):
        """Compute and persist the pre-loan (贷前) score for one Work.

        Saves the uploaded file fields via WorkDqCalSerializer, then walks the
        criteria from standard.json: each uploaded file (dq_file1..dq_file6)
        unlocks a fixed set of third-level criteria at full score.  dq_file1's
        parsed text is additionally scanned with regexes for emission-reduction
        percentages.  Returns ``{"total_score": ..., "data": ...}`` where
        ``data`` is the annotated criteria list.
        """
        work = self.get_object()
        sr = WorkDqCalSerializer(work, data=request.data)
        sr.is_valid(raise_exception=True)
        sr.save()
        total_score = 0
        # Re-fetch so the freshly saved file fields are visible below.
        work = Work.objects.get(pk=pk)
        # Start computing the pre-loan score and fingerprint.
        data = get_standard()
        # dq_file1 unlocks carbon-management-system criteria at full score.
        if work.dq_file1:
            for item in data:
                if item["thirdLevel"] in [
                    "碳中和路线图",
                    "短期/中期/长期减碳目标",
                    "设立碳管理相关部门",
                    "气候相关风险评估机制",
                    "内部碳定价机制",
                    "碳管理数字化平台建设",
                    "碳交易与履约能力",
                    "CCER等减排项目开发管理",
                    "数字化碳管理平台",
                ]:
                    item["result"] = item["scoringCriteria"][0]["选项"]
                    item["score"] = item["fullScore"]
                    total_score += item["score"]
        # dq_file2 unlocks monitoring/reporting/verification criteria.
        if work.dq_file2:
            for item in data:
                if item["thirdLevel"] in [
                    "能源与碳排放管理体系",
                    "碳排放数据监测、报告与核查",
                    "参与权威信息平台披露",
                    "碳中和目标与进展经第三方认证",
                    "碳排放实时监测覆盖率达标",
                    "数据自动化采集比例达标",
                    "数据质量与校验机制",
                ]:
                    item["result"] = item["scoringCriteria"][0]["选项"]
                    item["score"] = item["fullScore"]
                    total_score += item["score"]
        # dq_file3 unlocks the ESG / environment / community criteria.
        if work.dq_file3:
            for item in data:
                if item["thirdLevel"] in [
                    "ESG报告",
                    "工业固废/生物质资源利用率数据",
                    "硫化物减排措施",
                    "氮氧化物减排措施",
                    "其他污染物减排措施",
                    "项目选址生态避让与保护",
                    "矿山生态修复与复垦方案",
                    "厂区绿化与生态碳汇措施",
                    "低碳产品认证与标识",
                    "产品耐久性与回收性设计",
                    "无环保处罚与信访记录",
                    "环境应急管理体系",
                    "员工健康安全管理体系与制度",
                    "符合标准的物理环境与防护措施",
                    "员工心理健康支持计划",
                    "社区沟通与透明度机制",
                    "社区经济与发展贡献措施",
                    "社区负面影响缓解措施",
                    "供应商行为准则",
                    "供应商筛查与评估机制",
                    "供应商审核与改进机制",
                    "完善的治理结构",
                    "商业道德与反腐败制度",
                ]:
                    item["result"] = item["scoringCriteria"][0]["选项"]
                    item["score"] = item["fullScore"]
                    total_score += item["score"]
        # dq_file4 unlocks the financing / risk-management criteria.
        if work.dq_file4:
            for item in data:
                if item["thirdLevel"] in [
                    "资金分配明细",
                    "资本金比例与到位证明",
                    "融资渠道多样性",
                    "成本效益分析",
                    "碳减排收益量化",
                    "社会效益评估",
                    "风险管控方案",
                    "关键风险应对策略与预案",
                    "金融机构或第三方风险分担机制",
                    "绿色金融资质认证与资金用途",
                    "融资条款与ESG绩效挂钩",
                    "国际合作资金申请与利用",
                    "应急响应与能力建设机制",
                ]:
                    item["result"] = item["scoringCriteria"][0]["选项"]
                    item["score"] = item["fullScore"]
                    total_score += item["score"]
        # dq_file5 unlocks the digitalization / data-security criteria.
        if work.dq_file5:
            for item in data:
                if item["thirdLevel"] in [
                    "AI预测减碳潜力应用",
                    "智能优化控制算法应用",
                    "ERP/EMS/MES系统集成度达标",
                    "IoT设备覆盖率达标",
                    "跨系统数据协同能力",
                    "碳数据安全管理措施",
                    "系统抗攻击能力达标",
                    "数据合规性与审计追踪机制",
                ]:
                    item["result"] = item["scoringCriteria"][0]["选项"]
                    item["score"] = item["fullScore"]
                    total_score += item["score"]
        # dq_file6 (technical report) is judged by the LLM with the "tec" prompt.
        if work.dq_file6:
            path = (settings.BASE_DIR + work.dq_file6.path).replace('\\', '/')
            file_content = parse_file(path)
            if file_content:
                res = ask(f'以下内容为用户报告: {file_content}', "tec")
                # NOTE(review): full marks for the "technical path" section are
                # granted only when the model returns an EMPTY reply — confirm
                # this condition is not inverted.
                if res == "":
                    for item in data:
                        if item["firstLevel"] == "二、技术路径35 分)":
                            item["result"] = item["scoringCriteria"][0]["选项"]
                            item["score"] = item["fullScore"]
                            total_score += item["score"]
        # Re-parse dq_file1's text and scan it for emission-reduction claims.
        if work.dq_file1:
            path = (settings.BASE_DIR + work.dq_file1.path).replace('\\', '/')
            content = parse_file(path)
            if content:
                # A mention of a (carbon) reduction target awards criterion #3.
                # NOTE(review): data[0..3] are hard-coded indices — they assume
                # a fixed ordering of standard.json; verify against that file.
                if bool(re.search(r'碳?减排目标', content)):
                    data[3]["result"] = ""
                    data[3]["score"] = data[3]["fullScore"]
                    total_score += data[3]["score"]
                def cal_percent(decline_patterns, content, data, index, total_score):
                    # First pattern that matches wins; captures the percentage.
                    decline_percent = None
                    for pattern in decline_patterns:
                        match = re.search(pattern, content, re.DOTALL)
                        if match:
                            decline_percent = float(match.group(1))
                            break
                    # Tiered scoring: >=10% -> 5 pts, >=5% -> 2.5 pts, >0% -> 1.5 pts.
                    # (0% or no match awards nothing.)
                    if decline_percent:
                        if decline_percent >= 10:
                            data[index]["result"] = 3
                            data[index]["score"] = 5
                        elif decline_percent >= 5:
                            data[index]["result"] = 2
                            data[index]["score"] = 2.5
                        elif decline_percent > 0:
                            data[index]["result"] = 1
                            data[index]["score"] = 1.5
                        total_score += data[index].get("score", 0)
                    return total_score
                # Total carbon emissions decline.
                decline_patterns1 = [
                    r'碳排放总量[^,。]*?下降\s*([\d.]+)%',
                    r'碳排放[^,。]*?总量[^,。]*?下降\s*([\d.]+)%',
                    r'碳总量[^,。]*?下降\s*([\d.]+)%',
                    r'排放总量[^,。]*?下降\s*([\d.]+)%',
                    r'排放[^,。]*?下降\s*([\d.]+)%'
                ]
                total_score = cal_percent(decline_patterns1, content, data, 0, total_score)
                # Carbon emission intensity decline.
                decline_patterns2 = [
                    r'碳排放强度[^,。]*?下降\s*([\d.]+)%',
                    r'碳强度[^,。]*?总量[^,。]*?下降\s*([\d.]+)%',
                    r'排放强度[^,。]*?下降\s*([\d.]+)%'
                ]
                total_score = cal_percent(decline_patterns2, content, data, 1, total_score)
                # Product carbon footprint decline.
                decline_patterns3 = [
                    r'产品碳足迹[^,。]*?下降\s*([\d.]+)%',
                    r'碳足迹[^,。]*?下降\s*([\d.]+)%',
                    r'产品足迹[^,。]*?下降\s*([\d.]+)%'
                ]
                total_score = cal_percent(decline_patterns3, content, data, 2, total_score)
        total_score = round(total_score, 2)
        work.score_dq = total_score
        work.save(update_fields=["score_dq"])
        return Response({"total_score": total_score, "data": data})
@staticmethod
def parse_files(work: Work):
contents = []
filenames = []
for file in [work.dh_file1, work.dh_file2, work.dh_file3, work.dh_file4, work.dh_file5, work.dh_file6]:
if file:
if file.name in filenames:
continue
path = (settings.BASE_DIR + file.path).replace('\\', '/')
content = parse_file(path)
filenames.append(file.name)
contents.append(content)
return '\n'.join(contents)
@action(detail=True, methods=['post'], serializer_class=WorkDhCalSerializer)
@transaction.atomic
def cal_dh(self, request, pk):
work = self.get_object()
sr = WorkDhCalSerializer(work, data=request.data)
sr.is_valid(raise_exception=True)
sr.save()
work = Work.objects.get(pk=pk)
content = WorkViewSet.parse_files(work)
fp_u = get_fingerprint(content) # unsigned
fp_int = simhash_to_db(fp_u) # signed for db
fp_hex = format(fp_u, "016x")
s1, s2, s3, s4 = split_simhash(fp_int)
# 1⃣ 分段粗筛
candidates = (
Fingerprint.objects
.filter(
Q(seg1=s1) |
Q(seg2=s2) |
Q(seg3=s3) |
Q(seg4=s4)
)
.only("fp_int", "score")
)
# 2⃣ 精确海明距离
for obj in candidates:
if hamming_distance(fp_u, obj.fp_int) <= HAMMING_THRESHOLD:
work.score_dh = obj.score
work.save(update_fields=["score_dh"])
return Response({"total_score": obj.score})
# 3⃣ 未命中 → 调用 AI
res = ask(content, "tec_dh")
score = round(float(res), 2)
work.score_dh = score
work.save(update_fields=["score_dh"])
# 4⃣ 并发安全写入指纹库
try:
Fingerprint.objects.create(
fp_hex=fp_hex,
fp_int=fp_int,
seg1=s1,
seg2=s2,
seg3=s3,
seg4=s4,
score=score,
)
except IntegrityError:
# 并发下已存在,忽略即可
pass
return Response({"total_score": score})