"""
ASCII protocol wrapper for the Videojet 1880 inkjet coder.

Frame format: plain ASCII, fields separated by '|', every frame terminated by <CR> (0x0D).
- Write commands (SLA/JDA/CQI/JDI/SST/SNO) normally produce no reply.
- Query commands (GST/GFT/GJD) answer with a frame that is likewise terminated by <CR>.
"""
|
|
import socket
|
|
from rest_framework.exceptions import ParseError
|
|
|
|
# Frame terminator: every request and every reply frame ends with a single CR byte (0x0D).
CR = b"\x0d"
|
|
|
|
|
|
class CoderError(ParseError):
    """Error raised for coder configuration, framing, or communication problems.

    Subclasses the REST framework ``ParseError`` imported by this module, so it
    is handled by the framework like any other API exception.
    """
|
|
|
|
|
|
class CoderClient:
    """Client for the Videojet 1880 ASCII command protocol over TCP.

    Each command opens a fresh connection, sends one '|'-delimited frame
    terminated by CR and, for queries only, reads one CR-terminated reply.
    All failures are reported as ``CoderError``.
    """

    def __init__(self, ip: str, port: int = 3100, timeout: float = 2.0):
        """Store the connection parameters; no connection is opened here.

        Args:
            ip: Address of the coder. Must be non-empty.
            port: TCP port of the coder.
            timeout: Timeout in seconds used for connect, send and receive.

        Raises:
            CoderError: If ``ip`` is empty or ``None``.
        """
        if not ip:
            raise CoderError("喷码IP未配置")
        self.ip = ip
        self.port = port
        self.timeout = timeout

    def _send(self, frame: bytes, expect_reply: bool = False) -> bytes:
        """Send one complete frame and optionally read one reply frame.

        Args:
            frame: Bytes to transmit, already terminated by CR.
            expect_reply: When true, read until a CR has been received or the
                peer closes the connection.

        Returns:
            ``b""`` when no reply is expected, otherwise everything received
            (possibly incomplete if the peer closed before sending CR).

        Raises:
            CoderError: On any connection, send, receive or timeout failure.
        """
        try:
            with socket.create_connection((self.ip, self.port), timeout=self.timeout) as conn:
                conn.sendall(frame)
                if not expect_reply:
                    return b""
                buf = b""
                while CR not in buf:
                    chunk = conn.recv(256)
                    if not chunk:
                        # Peer closed the connection; hand back what we have
                        # and let the caller's parser reject a short reply.
                        break
                    buf += chunk
                return buf
        except OSError as e:
            # socket.timeout is a subclass of OSError, so timeouts land here too.
            raise CoderError(f"喷码通讯失败 {self.ip}:{self.port} - {e}") from e

    @staticmethod
    def _encode(text: str, label: str) -> bytes:
        """Validate one frame field and encode it as latin-1.

        Args:
            text: Field content (job name, field name or field value).
            label: Human-readable field description used in error messages.

        Raises:
            CoderError: If ``text`` contains CR/LF or '|' (either would corrupt
                the frame), or a character that latin-1 cannot represent.
        """
        if "\r" in text or "\n" in text:
            raise CoderError(f"{label}含 CR/LF, 会破坏帧结构")
        if "|" in text:
            raise CoderError(f"{label}含 '|', 与 1880 分隔符冲突")
        try:
            return text.encode("latin-1")
        except UnicodeEncodeError as e:
            raise CoderError(f"{label}含喷码机不支持的字符: {e}") from e

    @classmethod
    def _assignments(cls, fields: dict) -> list:
        """Encode a mapping as a list of ``b"<field>=<value>"`` frame parts.

        Keys and values are converted with ``str()`` and validated by
        ``_encode``; iteration order of ``fields`` is preserved.
        """
        return [
            cls._encode(str(name), "用户区名") + b"=" + cls._encode(str(value), "喷印内容")
            for name, value in fields.items()
        ]

    @staticmethod
    def _split_reply(resp: bytes, tag: str, min_parts: int, command: str) -> list:
        """Split a raw reply on '|' and verify its leading tag and minimum length.

        Args:
            resp: Raw reply bytes as returned by ``_send``.
            tag: Expected first field of the reply (e.g. ``"STS"``).
            min_parts: Minimum number of '|'-separated fields required.
            command: Name of the request, used only in the error message.

        Raises:
            CoderError: If the tag does not match or the reply is too short.
        """
        parts = resp.rstrip(b"\r\n").decode("latin-1").split("|")
        if parts[0] != tag or len(parts) < min_parts:
            raise CoderError(f"{command} 响应不识别: {resp!r}")
        return parts

    @staticmethod
    def _parse_status(resp: bytes) -> dict:
        """Parse ``STS|<overall>|<error>|<currentjob>|<batchcount>|<totalcount>|<CR>``."""
        parts = CoderClient._split_reply(resp, "STS", 6, "GST")
        try:
            return {
                "overall": int(parts[1]),
                "error": int(parts[2]),
                "current_job": parts[3],
                "batch_count": int(parts[4]),
                "total_count": int(parts[5]),
            }
        except ValueError as e:
            raise CoderError(f"GST 响应解析失败: {resp!r} - {e}") from e

    @staticmethod
    def _parse_faults(resp: bytes) -> list:
        """Parse ``FLT|<count>|[<code>|<clearable>|<title>|]*<CR>`` into a list of dicts.

        Stops early, without raising, when the reply holds fewer complete
        triples than ``<count>`` announces.
        """
        parts = CoderClient._split_reply(resp, "FLT", 2, "GFT")
        try:
            count = int(parts[1])
        except ValueError as e:
            raise CoderError(f"GFT 响应解析失败: {resp!r} - {e}") from e
        faults = []
        # Fault triples start at index 2; a non-positive count yields no iterations.
        for base in range(2, 2 + 3 * count, 3):
            if base + 2 >= len(parts):
                break
            faults.append({
                "code": parts[base],
                "clearable": parts[base + 1] == "1",
                "title": parts[base + 2],
            })
        return faults

    @staticmethod
    def _parse_job(resp: bytes) -> dict:
        """Parse ``JDL|<count>|<field>=<value>|...|<CR>`` into ``{field: value}``.

        Entries without '=' are skipped; a value may itself contain '='
        because only the first '=' separates name from value.
        """
        parts = CoderClient._split_reply(resp, "JDL", 2, "GJD")
        try:
            count = int(parts[1])
        except ValueError as e:
            raise CoderError(f"GJD 响应解析失败: {resp!r} - {e}") from e
        fields = {}
        # max() guards against a negative count turning into a from-the-end slice.
        for pair in parts[2:2 + max(count, 0)]:
            name, sep, value = pair.partition("=")
            if sep:
                fields[name] = value
        return fields

    def send_command(self, command: str, expect_reply: bool = False) -> bytes:
        """Send an already assembled 1880 command; the trailing CR is appended here.

        Args:
            command: Command text without the terminating CR.
            expect_reply: Whether to wait for and return a reply frame.

        Raises:
            CoderError: If ``command`` contains CR/LF, contains characters
                outside latin-1, or the transfer fails.
        """
        if "\r" in command or "\n" in command:
            raise CoderError("指令含 CR/LF, 由 send_command 统一补 CR, 不要手动拼")
        try:
            payload = command.encode("latin-1")
        except UnicodeEncodeError as e:
            # Report encoding problems the same way _encode does instead of
            # leaking a raw UnicodeEncodeError to the caller.
            raise CoderError(f"指令含喷码机不支持的字符: {e}") from e
        return self._send(payload + CR, expect_reply=expect_reply)

    def select_job(self, job_name: str):
        """Select the message template: ``SLA|<jobname>|<CR>``."""
        self._send(b"SLA|" + self._encode(job_name, "模板名") + b"|" + CR)

    def update_fields(self, fields: dict):
        """Update several user fields in one frame: ``JDA|<field>=<value>|...|<CR>``.

        Does nothing when ``fields`` is empty.
        """
        if not fields:
            return
        parts = [b"JDA"] + self._assignments(fields)
        self._send(b"|".join(parts) + b"|" + CR)

    def clear_queue(self):
        """Clear the coder's internal buffer queue: ``CQI<CR>``."""
        self._send(b"CQI" + CR)

    def push_queue(self, fields: dict):
        """Enqueue one record of field data: ``JDI|1|<field>=<value>|...|<CR>``.

        Does nothing when ``fields`` is empty.
        """
        if not fields:
            return
        parts = [b"JDI", b"1"] + self._assignments(fields)
        self._send(b"|".join(parts) + b"|" + CR)

    def set_state(self, state: int):
        """Set the coder state: ``SST|<State>|<CR>`` (0 shut down, 1 powered on, 3 running, 4 offline)."""
        self._send(b"SST|" + str(int(state)).encode("ascii") + b"|" + CR)

    def register_print_callback(self):
        """Register for print-complete feedback: ``SNO|PRC|1|<CR>``.

        Afterwards the coder reports ``PRC<CR>`` after every print.
        NOTE(review): ``_send`` closes its connection right after sending, so
        confirm where the coder delivers those PRC notifications.
        """
        self._send(b"SNO|PRC|1|" + CR)

    def get_status(self) -> dict:
        """Query the coder status: ``GST<CR>``.

        Returns:
            Dict with keys ``overall``, ``error``, ``batch_count``,
            ``total_count`` (ints) and ``current_job`` (str).

        Raises:
            CoderError: On communication failure or an unparsable reply.
        """
        return self._parse_status(self._send(b"GST" + CR, expect_reply=True))

    def get_faults(self) -> list:
        """Query the active faults: ``GFT<CR>``.

        Returns:
            List of dicts with keys ``code`` (str), ``clearable`` (bool) and
            ``title`` (str).

        Raises:
            CoderError: On communication failure or an unparsable reply.
        """
        return self._parse_faults(self._send(b"GFT" + CR, expect_reply=True))

    def get_current_job(self) -> dict:
        """Query the fields of the current job: ``GJD<CR>``.

        Returns:
            Mapping of field name to field value.

        Raises:
            CoderError: On communication failure or an unparsable reply.
        """
        return self._parse_job(self._send(b"GJD" + CR, expect_reply=True))
|