factory/apps/cm/coder.py

62 lines
2.0 KiB
Python

"""
1000 系列喷码机网口通讯封装
协议: STX(0x02) + 指令 + 数据 + ETX(0x03)
反馈: $XX 成功 / !XX 失败 (XX 为两位校验)
"""
import socket
from rest_framework.exceptions import ParseError
STX = b"\x02"
ETX = b"\x03"
LF = b"\x0a"
class CoderError(ParseError):
pass
class CoderClient:
def __init__(self, ip: str, port: int = 3100, timeout: float = 2.0):
if not ip:
raise CoderError("打码器IP未配置")
self.ip = ip
self.port = port
self.timeout = timeout
def _send(self, frame: bytes) -> bytes:
try:
with socket.create_connection((self.ip, self.port), timeout=self.timeout) as s:
s.sendall(frame)
return s.recv(64)
except (socket.timeout, OSError) as e:
raise CoderError(f"打码器通讯失败 {self.ip}:{self.port} - {e}")
@staticmethod
def _check_ack(resp: bytes):
if not resp:
raise CoderError("打码器无响应")
head = resp[:1]
if head == b"$":
return True
if head == b"!":
raise CoderError(f"打码器返回失败: {resp!r}")
raise CoderError(f"打码器响应不识别: {resp!r}")
def update_field(self, field_name: str, content: str) -> bool:
"""更新用户区: 02 55 <field> 0A <content> 03"""
frame = STX + b"\x55" + field_name.encode("ascii") + LF + content.encode("ascii") + ETX
return self._check_ack(self._send(frame))
def select_message(self, message_name: str) -> bool:
"""选择信息: 02 4D <name> 03"""
frame = STX + b"\x4d" + message_name.encode("ascii") + ETX
return self._check_ack(self._send(frame))
def get_status(self) -> bytes:
"""获取喷码机状态: 02 45 03"""
return self._send(STX + b"\x45" + ETX)
def send_raw(self, frame_str: str) -> bytes:
"""发送任意已拼好的帧字符串(不含 STX/ETX), 自动补头尾"""
return self._send(STX + frame_str.encode("ascii") + ETX)