""" 伟迪捷 1880 喷码机 ASCII 协议封装 帧格式: 纯 ASCII, 字段以 '|' 分隔, 整帧以 (0x0D) 结尾 - 写指令(SLA/JDA/CQI/JDI/SST/SNO) 通常无回包 - 查询指令(GST/GFT/GJD) 回包同样以 结尾 """ import socket from rest_framework.exceptions import ParseError CR = b"\x0d" class CoderError(ParseError): pass class CoderClient: def __init__(self, ip: str, port: int = 3100, timeout: float = 2.0): if not ip: raise CoderError("喷码IP未配置") self.ip = ip self.port = port self.timeout = timeout def _send(self, frame: bytes, expect_reply: bool = False) -> bytes: try: with socket.create_connection((self.ip, self.port), timeout=self.timeout) as s: s.sendall(frame) if not expect_reply: return b"" buf = b"" while CR not in buf: chunk = s.recv(256) if not chunk: break buf += chunk return buf except (socket.timeout, OSError) as e: raise CoderError(f"喷码通讯失败 {self.ip}:{self.port} - {e}") @staticmethod def _encode(text: str, label: str) -> bytes: if "\r" in text or "\n" in text: raise CoderError(f"{label}含 CR/LF, 会破坏帧结构") if "|" in text: raise CoderError(f"{label}含 '|', 与 1880 分隔符冲突") try: return text.encode("latin-1") except UnicodeEncodeError as e: raise CoderError(f"{label}含喷码机不支持的字符: {e}") def send_command(self, command: str, expect_reply: bool = False) -> bytes: """直接下发一条已拼装好的 1880 指令(不含末尾 CR), 由本方法统一补 CR。""" if "\r" in command or "\n" in command: raise CoderError("指令含 CR/LF, 由 send_command 统一补 CR, 不要手动拼") return self._send(command.encode("latin-1") + CR, expect_reply=expect_reply) def select_job(self, job_name: str): """SLA|| 选择信息模板""" self._send(b"SLA|" + self._encode(job_name, "模板名") + b"|" + CR) def update_fields(self, fields: dict): """JDA|=|...| 一帧更新多个用户区""" if not fields: return parts = [b"JDA"] for k, v in fields.items(): parts.append(self._encode(str(k), "用户区名") + b"=" + self._encode(str(v), "喷印内容")) self._send(b"|".join(parts) + b"|" + CR) def clear_queue(self): """CQI 清空喷码机内部缓存队列""" self._send(b"CQI" + CR) def push_queue(self, fields: dict): """JDI|1|=|...| 入队列数据""" if not fields: return parts = [b"JDI", b"1"] for k, v in fields.items(): parts.append(self._encode(str(k), "用户区名") + b"=" + self._encode(str(v), "喷印内容")) self._send(b"|".join(parts) + b"|" + CR) def set_state(self, state: int): """SST|| 0关机 1开机 3运行 4离线""" self._send(b"SST|" + str(int(state)).encode("ascii") + b"|" + CR) def register_print_callback(self): """SNO|PRC|1| 注册打印完成反馈, 之后喷码机每打印一次会回 PRC""" self._send(b"SNO|PRC|1|" + CR) def get_status(self) -> dict: """GST| -> STS||||||""" resp = self._send(b"GST|" + CR, expect_reply=True) parts = resp.rstrip(b"\r\n").decode("latin-1").split("|") if not parts or parts[0] != "STS" or len(parts) < 6: raise CoderError(f"GST 响应不识别: {resp!r}") try: return { "overall": int(parts[1]), "error": int(parts[2]), "current_job": parts[3], "batch_count": int(parts[4]), "total_count": int(parts[5]), } except ValueError as e: raise CoderError(f"GST 响应解析失败: {resp!r} - {e}") def get_faults(self) -> list: """GFT| -> FLT||[|||]*<CR>""" resp = self._send(b"GFT|" + CR, expect_reply=True) parts = resp.rstrip(b"\r\n").decode("latin-1").split("|") if not parts or parts[0] != "FLT" or len(parts) < 2: raise CoderError(f"GFT 响应不识别: {resp!r}") try: count = int(parts[1]) except ValueError as e: raise CoderError(f"GFT 响应解析失败: {resp!r} - {e}") faults = [] for i in range(count): base = 2 + i * 3 if base + 2 >= len(parts): break faults.append({ "code": parts[base], "clearable": parts[base + 1] == "1", "title": parts[base + 2], }) return faults def get_current_job(self) -> dict: """GJD|<CR> -> JDL|<count>|<field>=<value>|...|<CR>""" resp = self._send(b"GJD|" + CR, expect_reply=True) parts = resp.rstrip(b"\r\n").decode("latin-1").split("|") if not parts or parts[0] != "JDL" or len(parts) < 2: raise CoderError(f"GJD 响应不识别: {resp!r}") try: count = int(parts[1]) except ValueError as e: raise CoderError(f"GJD 响应解析失败: {resp!r} - {e}") fields = {} for i in range(count): idx = 2 + i if idx >= len(parts): break pair = parts[idx] if "=" not in pair: continue k, v = pair.split("=", 1) fields[k] = v return fields