""" 1000 系列喷码机网口通讯封装 协议: STX(0x02) + 指令 + 数据 + ETX(0x03) 反馈: $XX 成功 / !XX 失败 (XX 为两位校验) """ import socket from rest_framework.exceptions import ParseError STX = b"\x02" ETX = b"\x03" LF = b"\x0a" class CoderError(ParseError): pass class CoderClient: def __init__(self, ip: str, port: int = 3100, timeout: float = 2.0): if not ip: raise CoderError("打码器IP未配置") self.ip = ip self.port = port self.timeout = timeout def _send(self, frame: bytes) -> bytes: try: with socket.create_connection((self.ip, self.port), timeout=self.timeout) as s: s.sendall(frame) return s.recv(64) except (socket.timeout, OSError) as e: raise CoderError(f"打码器通讯失败 {self.ip}:{self.port} - {e}") @staticmethod def _check_ack(resp: bytes): if not resp: raise CoderError("打码器无响应") head = resp[:1] if head == b"$": return True if head == b"!": raise CoderError(f"打码器返回失败: {resp!r}") raise CoderError(f"打码器响应不识别: {resp!r}") @staticmethod def _encode_payload(value: str, label: str) -> bytes: if any(c in value for c in ("\x02", "\x03", "\x0a")): raise CoderError(f"{label}含控制字符(STX/ETX/LF), 会破坏帧结构") try: return value.encode("latin-1") except UnicodeEncodeError as e: raise CoderError(f"{label}含喷码机不支持的字符: {e}") def update_field(self, field_name: str, content: str) -> bool: """更新用户区: 02 55 0A 03""" frame = STX + b"\x55" + self._encode_payload(field_name, "用户区名") + LF + self._encode_payload(content, "喷印内容") + ETX return self._check_ack(self._send(frame)) def select_message(self, message_name: str) -> bool: """选择信息: 02 4D 03""" frame = STX + b"\x4d" + self._encode_payload(message_name, "信息名") + ETX return self._check_ack(self._send(frame)) def get_status(self) -> bytes: """获取喷码机状态: 02 45 03""" return self._send(STX + b"\x45" + ETX) def send_raw(self, frame_str: str) -> bytes: """发送任意已拼好的帧字符串(不含 STX/ETX), 自动补头尾""" return self._send(STX + frame_str.encode("ascii") + ETX)