feat: cm 新增 1000 系列喷码机网口下发接口

- LabelTemplate 增加 coder_ip/coder_port 字段, 模板即指定目标喷头
- 新增 CoderClient 封装 STX/ETX 帧, 支持更新用户区/选择信息/查状态
- LabelTemplateViewSet 新增 send_to_coder action, 复用 gen_commands 拼内容下发, 用户区名默认 1, 可在 body 临时覆盖

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-08 11:24:00 +08:00
parent 7e9b559723
commit 9c220dbc1d
5 changed files with 125 additions and 2 deletions

61
apps/cm/coder.py Normal file
View File

@ -0,0 +1,61 @@
"""
1000 系列喷码机网口通讯封装
协议: STX(0x02) + 指令 + 数据 + ETX(0x03)
反馈: $XX 成功 / !XX 失败 (XX 为两位校验)
"""
import socket
from rest_framework.exceptions import ParseError
STX = b"\x02"
ETX = b"\x03"
LF = b"\x0a"
class CoderError(ParseError):
pass
class CoderClient:
def __init__(self, ip: str, port: int = 3100, timeout: float = 2.0):
if not ip:
raise CoderError("打码器IP未配置")
self.ip = ip
self.port = port
self.timeout = timeout
def _send(self, frame: bytes) -> bytes:
try:
with socket.create_connection((self.ip, self.port), timeout=self.timeout) as s:
s.sendall(frame)
return s.recv(64)
except (socket.timeout, OSError) as e:
raise CoderError(f"打码器通讯失败 {self.ip}:{self.port} - {e}")
@staticmethod
def _check_ack(resp: bytes):
if not resp:
raise CoderError("打码器无响应")
head = resp[:1]
if head == b"$":
return True
if head == b"!":
raise CoderError(f"打码器返回失败: {resp!r}")
raise CoderError(f"打码器响应不识别: {resp!r}")
def update_field(self, field_name: str, content: str) -> bool:
"""更新用户区: 02 55 <field> 0A <content> 03"""
frame = STX + b"\x55" + field_name.encode("ascii") + LF + content.encode("ascii") + ETX
return self._check_ack(self._send(frame))
def select_message(self, message_name: str) -> bool:
"""选择信息: 02 4D <name> 03"""
frame = STX + b"\x4d" + message_name.encode("ascii") + ETX
return self._check_ack(self._send(frame))
def get_status(self) -> bytes:
"""获取喷码机状态: 02 45 03"""
return self._send(STX + b"\x45" + ETX)
def send_raw(self, frame_str: str) -> bytes:
"""发送任意已拼好的帧字符串(不含 STX/ETX), 自动补头尾"""
return self._send(STX + frame_str.encode("ascii") + ETX)

View File

@ -0,0 +1,23 @@
# Generated for coder fields on LabelTemplate
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('cm', '0006_alter_lablemat_batch'),
]
operations = [
migrations.AddField(
model_name='labeltemplate',
name='coder_ip',
field=models.GenericIPAddressField(blank=True, null=True, verbose_name='打码器IP'),
),
migrations.AddField(
model_name='labeltemplate',
name='coder_port',
field=models.PositiveIntegerField(default=3100, verbose_name='打码器端口'),
),
]

View File

@ -22,6 +22,8 @@ class LabelTemplate(BaseModel):
name = models.TextField("名称")
commands = models.JSONField("指令模板", default=list, blank=True)
process_json = models.JSONField("工序", default=list, blank=True)
coder_ip = models.GenericIPAddressField("打码器IP", null=True, blank=True)
coder_port = models.PositiveIntegerField("打码器端口", default=3100)
@classmethod
def gen_commands(cls, label_template, label_template_name, tdata):

View File

@ -17,6 +17,13 @@ class Tid2Serializer(serializers.Serializer):
data = serializers.JSONField(label='数据', allow_null=True, required=False)
class CoderSendSerializer(serializers.Serializer):
tdata = serializers.JSONField(label='模板数据', required=False, default=dict)
coder_ip = serializers.IPAddressField(label='打码器IP(可覆盖模板)', required=False, allow_null=True)
coder_port = serializers.IntegerField(label='打码器端口(可覆盖模板)', required=False, allow_null=True)
coder_field = serializers.CharField(label='用户区名(可覆盖模板)', required=False, allow_null=True)
class LabelMatSerializer(serializers.ModelSerializer):
material_name = serializers.StringRelatedField(source='material', read_only=True)
material_origin_name = serializers.StringRelatedField(source='material_origin', read_only=True)

View File

@ -1,6 +1,7 @@
from apps.cm.models import LableMat, LabelTemplate
from rest_framework.decorators import action
from apps.cm.serializers import TidSerializer, LabelMatSerializer, LabelTemplateSerializer, Tid2Serializer
from apps.cm.serializers import TidSerializer, LabelMatSerializer, LabelTemplateSerializer, Tid2Serializer, CoderSendSerializer
from apps.cm.coder import CoderClient
from apps.inm.models import MaterialBatch, MIOItem
from apps.wpm.models import WMaterial
from rest_framework.exceptions import ParseError, NotFound
@ -113,4 +114,33 @@ class LabelTemplateViewSet(CustomModelViewSet):
label_template_name = request.data.get("label_template_name", None)
data = request.data.get("data", {})
return Response({"commands": LabelTemplate.gen_commands(label_template, label_template_name, data)})
@action(methods=["post"], detail=True, serializer_class=CoderSendSerializer, perms_map={"post": "*"})
def send_to_coder(self, request, pk=None, *args, **kwargs):
"""
下发指令到打码器
按模板 commands tdata 格式化后, 通过模板上配置的 IP/端口下发到 1000 系列喷码机用户区
body 中的 coder_ip/coder_port/coder_field 可临时覆盖模板配置
"""
sr = CoderSendSerializer(data=request.data)
sr.is_valid(raise_exception=True)
vdata = sr.validated_data
lt: LabelTemplate = self.get_object()
ip = vdata.get("coder_ip") or lt.coder_ip
port = vdata.get("coder_port") or lt.coder_port
field = vdata.get("coder_field") or "1"
if not ip:
raise ParseError("模板未配置打码器IP, 也未在请求中提供 coder_ip")
commands = LabelTemplate.gen_commands(lt.id, None, vdata.get("tdata") or {})
if not commands:
raise ParseError("模板未生成任何指令")
client = CoderClient(ip=ip, port=port)
results = []
for content in commands:
client.update_field(field, content)
results.append(content)
return Response({"sent": results, "ip": ip, "port": port, "field": field})