factory/apps/mtm/services.py

206 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from apps.mtm.models import Goal, Mgroup, RoutePack, Route
from django.core.cache import cache
from django.db.models import Q
from apps.mtm.models import Material, Process, Shift
from rest_framework.exceptions import ParseError
from apps.utils.tools import ranstr
from datetime import datetime, timedelta
from apps.wf.models import Ticket
from django.db.models import Sum
from typing import List
from apps.utils.snowflake import idWorker
from django.utils import timezone
def get_mgroup_goals(mgroupId, year, reload=False):
"""
获取工段某年的全部目标值, 以字典形式返回, 带缓存
"""
goals = Goal.objects.filter(mgroup__id=mgroupId, year=year)
key = f'mgroup_{mgroupId}_goals_{year}'
if reload is False:
mgroup_goals = cache.get(key, None)
if mgroup_goals is not None:
return mgroup_goals
mgroup_goals = {}
if not goals.exists():
# 尝试寻找去年的
goals_last_year = Goal.objects.filter(mgroup__id=mgroupId, year=year-1)
if goals_last_year.exists():
for goal in goals_last_year:
# 复用去年的数据创建
goal.id = idWorker.get_id()
goal.create_time = timezone.now()
goal.update_time = goal.create_time
goal.year = year
goal.save()
goals = Goal.objects.filter(mgroup__id=mgroupId, year=year)
for goal in goals:
mgroup_goals[f'{goal.goal_cate.code}_year'] = goal.goal_val
for i in range(12):
mgroup_goals[f'{goal.goal_cate.code}_{i+1}'] = getattr(
goal, f'goal_val_{i+1}')
cache.set(key, mgroup_goals)
return mgroup_goals
def daoru_material(path: str):
"""
导入物料信息
"""
type_dict = {'主要原料': 30, '半成品': 20, '成品': 10,
'辅助材料': 40, '加工工具': 50, '辅助工装': 60, '办公用品': 70}
from apps.utils.snowflake import idWorker
from openpyxl import load_workbook
wb = load_workbook(path, read_only=True)
sheet = wb.active
process_l = Process.objects.all()
process_d = {p.name: p for p in process_l}
i = 3
if sheet['a2'].value != '物料编号':
raise ParseError('列错误导入失败')
while sheet[f'b{i}'].value is not None or sheet[f'd{i}'].value is not None:
type_str = sheet[f'b{i}'].value.replace(' ', '')
try:
type = type_dict[type_str]
cate = sheet[f'c{i}'].value.replace(' ', '') if sheet[f'c{i}'].value else ""
number = str(sheet[f'a{i}'].value).replace(' ', '') if sheet[f'a{i}'].value else None
if sheet[f'd{i}'].value:
name = str(sheet[f'd{i}'].value).replace(' ', '')
else:
raise ParseError(f'{i}行物料信息错误: 物料名称必填')
specification = str(sheet[f'e{i}'].value).replace(
'×', '*').replace(' ', '') if sheet[f'e{i}'].value else None
model = str(sheet[f'f{i}'].value).replace(' ', '') if sheet[f'f{i}'].value else None
unit = sheet[f'g{i}'].value.replace(' ', '')
count_safe = float(sheet[f'i{i}'].value) if sheet[f'i{i}'].value else None
unit_price = float(sheet[f'j{i}'].value) if sheet[f'j{i}'].value else None
bin_number_main = sheet[f'k{i}'].value.replace(' ', '') if sheet[f'k{i}'].value else None
except Exception as e:
raise ParseError(f'{i}行物料信息错误: {e}')
if type in [20, 30]:
try:
process = process_d[sheet[f'h{i}'].value.replace(' ', '')]
except Exception as e:
raise ParseError(f'{i}行物料信息错误: {e}')
try:
filters = {'type': type, 'name': name, 'specification': specification,
'model': model, 'unit__iexact': unit}
if type in [20, 30]:
filters['process'] = process
default = {'type': type, 'name': name, 'specification': specification,
'model': model, 'unit': unit, 'number': number if number else f'm{type}_{ranstr(6)}', 'id': idWorker.get_id(),
'count_safe': count_safe, 'unit_price': unit_price, 'cate': cate, 'bin_number_main': bin_number_main}
material, is_created = Material.objects.get_or_create(
**filters, defaults=default)
if not is_created:
material.count_safe = count_safe
material.save()
except Exception as e:
raise ParseError(f'{i}行物料有误, 导入失败--{e}')
i = i + 1
print(number, type, name, specification, model, unit, '导入成功')
def mgroup_run_change(mgroup: Mgroup, new_run: bool, last_timex: datetime, note: str = ''):
"""
调用工段运行变动
"""
from apps.wpm.services import get_sflog
from apps.wpm.tasks import cal_exp_duration_sec
from apps.wpm.models import StLog, SfLogExp
mgroup.is_running = new_run
mgroup.save(update_fields=["is_running"])
last_stlog = StLog.objects.filter(mgroup=mgroup, is_shutdown=True).order_by("-start_time").first() # 找到最后一次停机记录
need_cal_exp = True
if last_stlog:
if last_timex >= last_stlog.start_time: # 认为是有效信号
if last_stlog.end_time is None and new_run: # 从停到开
last_stlog.end_time = last_timex
last_stlog.duration_sec = (last_stlog.end_time - last_stlog.start_time).total_seconds()
last_stlog.save()
elif last_stlog.end_time and new_run is False and last_timex > last_stlog.end_time: # 从开到停
cur_sflog = get_sflog(mgroup, last_timex)
last_stlog, is_created = StLog.objects.get_or_create(
mgroup=mgroup,
is_shutdown=True,
start_time=last_timex,
defaults={
'title': '停机',
'end_time': None,
'sflog': cur_sflog,
'reason':note,
}
)
if is_created:
SfLogExp.objects.get_or_create(stlog=last_stlog, sflog=cur_sflog)
need_cal_exp = False
elif new_run is False:
cur_sflog = get_sflog(mgroup, last_timex)
last_stlog = StLog.objects.create(title="停机", is_shutdown=True, mgroup=mgroup, end_time=None, start_time=last_timex, sflog=cur_sflog, reason = note)
mgroup.is_running = False
mgroup.save()
SfLogExp.objects.get_or_create(stlog=last_stlog, sflog=cur_sflog)
need_cal_exp = False
if last_stlog and need_cal_exp:
cal_exp_duration_sec(last_stlog.id) # 触发时间分配
def bind_routepack(ticket: Ticket, transition, new_ticket_data: dict):
routepack = RoutePack.objects.get(id=new_ticket_data['t_id'])
routepack.get_gjson(need_update=True)
if routepack.ticket and routepack.ticket.id!=ticket.id:
raise ParseError('重复创建工单')
if not Route.objects.filter(routepack=routepack).exists():
raise ParseError('缺少步骤')
r_qs = Route.objects.filter(routepack=routepack).order_by('sort', 'process__sort', 'create_time')
first_route = r_qs.first()
if first_route.batch_bind:
first_route.batch_bind = False
first_route.save(update_fields=['batch_bind'])
# last_route = r_qs.last()
# if last_route.material_out != routepack.material:
# raise ParseError('最后一步产出与工艺包不一致')
ticket_data = ticket.ticket_data
ticket_data.update({
't_model': 'routepack',
't_id': routepack.id,
})
ticket.ticket_data = ticket_data
ticket.create_by = routepack.create_by
ticket.save()
if routepack.ticket is None:
routepack.ticket = ticket
routepack.state = RoutePack.RP_S_AUDIT
routepack.save()
def routepack_audit_end(ticket: Ticket):
routepack = RoutePack.objects.get(id=ticket.ticket_data['t_id'])
routepack.state = RoutePack.RP_S_CONFIRM
routepack.save()
def routepack_ticket_change(ticket: Ticket):
routepack = RoutePack.objects.get(id=ticket.ticket_data['t_id'])
if ticket.act_state in [Ticket.TICKET_ACT_STATE_DRAFT, Ticket.TICKET_ACT_STATE_BACK, Ticket.TICKET_ACT_STATE_RETREAT]:
routepack.state = RoutePack.RP_S_CREATE
routepack.save()
def get_mgroup_days(mgroup: Mgroup, now: datetime):
"""
获取给定时间所属的班年月日
"""
if now is None:
now = datetime.now()
shift_rule_name = mgroup.shift_rule
shift = Shift.objects.filter(rule=shift_rule_name).order_by('sort').first()
if shift is None:
raise ParseError('工段班次规则不存在')
current_time = now.time().replace(second=0, microsecond=0)
if current_time >= shift.start_time_o:
now_b = now - timedelta(days=1)
else:
now = now - timedelta(days=1)
now_b = now - timedelta(days=1)
return {"year_s": now.year, "month_s": now.month, "day_s": now.day, "year_s_b": now_b.year, "month_s_b": now_b.month, "day_s_b": now_b.day}