62 lines
2.0 KiB
Python
62 lines
2.0 KiB
Python
"""
|
|
1000 系列喷码机网口通讯封装
|
|
协议: STX(0x02) + 指令 + 数据 + ETX(0x03)
|
|
反馈: $XX 成功 / !XX 失败 (XX 为两位校验)
|
|
"""
|
|
import socket
|
|
from rest_framework.exceptions import ParseError
|
|
|
|
STX = b"\x02"
|
|
ETX = b"\x03"
|
|
LF = b"\x0a"
|
|
|
|
|
|
class CoderError(ParseError):
|
|
pass
|
|
|
|
|
|
class CoderClient:
|
|
def __init__(self, ip: str, port: int = 3100, timeout: float = 2.0):
|
|
if not ip:
|
|
raise CoderError("打码器IP未配置")
|
|
self.ip = ip
|
|
self.port = port
|
|
self.timeout = timeout
|
|
|
|
def _send(self, frame: bytes) -> bytes:
|
|
try:
|
|
with socket.create_connection((self.ip, self.port), timeout=self.timeout) as s:
|
|
s.sendall(frame)
|
|
return s.recv(64)
|
|
except (socket.timeout, OSError) as e:
|
|
raise CoderError(f"打码器通讯失败 {self.ip}:{self.port} - {e}")
|
|
|
|
@staticmethod
|
|
def _check_ack(resp: bytes):
|
|
if not resp:
|
|
raise CoderError("打码器无响应")
|
|
head = resp[:1]
|
|
if head == b"$":
|
|
return True
|
|
if head == b"!":
|
|
raise CoderError(f"打码器返回失败: {resp!r}")
|
|
raise CoderError(f"打码器响应不识别: {resp!r}")
|
|
|
|
def update_field(self, field_name: str, content: str) -> bool:
|
|
"""更新用户区: 02 55 <field> 0A <content> 03"""
|
|
frame = STX + b"\x55" + field_name.encode("ascii") + LF + content.encode("ascii") + ETX
|
|
return self._check_ack(self._send(frame))
|
|
|
|
def select_message(self, message_name: str) -> bool:
|
|
"""选择信息: 02 4D <name> 03"""
|
|
frame = STX + b"\x4d" + message_name.encode("ascii") + ETX
|
|
return self._check_ack(self._send(frame))
|
|
|
|
def get_status(self) -> bytes:
|
|
"""获取喷码机状态: 02 45 03"""
|
|
return self._send(STX + b"\x45" + ETX)
|
|
|
|
def send_raw(self, frame_str: str) -> bytes:
|
|
"""发送任意已拼好的帧字符串(不含 STX/ETX), 自动补头尾"""
|
|
return self._send(STX + frame_str.encode("ascii") + ETX)
|