""" 伟迪捷 1880 喷码机 ASCII 协议封装 帧格式: 纯 ASCII, 字段以 '|' 分隔, 整帧以 (0x0D) 结尾 - 写指令(SLA/JDA/CQI/JDI/SST/SNO) 通常无回包 - 查询指令(GST/GFT/GJD) 回包同样以 结尾 """ import socket from rest_framework.exceptions import ParseError CR = b"\x0d" class CoderError(ParseError): pass class CoderClient: def __init__(self, ip: str, port: int = 3100, timeout: float = 2.0): if not ip: raise CoderError("喷码IP未配置") self.ip = ip self.port = port self.timeout = timeout def _send(self, frame: bytes, expect_reply: bool = False) -> bytes: try: with socket.create_connection((self.ip, self.port), timeout=self.timeout) as s: s.sendall(frame) if expect_reply: buf = b"" while CR not in buf: chunk = s.recv(256) if not chunk: break buf += chunk return buf # 写指令: 短暂等待, 捕获机器拒绝(ERR/!); 超时则视为成功 s.settimeout(0.3) try: buf = b"" while CR not in buf: chunk = s.recv(256) if not chunk: break buf += chunk except socket.timeout: buf = b"" if buf: text = buf.rstrip(b"\r\n").decode("latin-1", errors="replace") if text.startswith("ERR") or text.startswith("!"): raise CoderError(f"喷码机拒绝指令 {frame!r}: {text}") return b"" except CoderError: raise except (socket.timeout, OSError) as e: raise CoderError(f"喷码通讯失败 {self.ip}:{self.port} - {e}") @staticmethod def _encode(text: str, label: str) -> bytes: if "\r" in text or "\n" in text: raise CoderError(f"{label}含 CR/LF, 会破坏帧结构") if "|" in text: raise CoderError(f"{label}含 '|', 与 1880 分隔符冲突") try: return text.encode("latin-1") except UnicodeEncodeError as e: raise CoderError(f"{label}含喷码机不支持的字符: {e}") def send_command(self, command: str, expect_reply: bool = False) -> bytes: """直接下发一条已拼装好的 1880 指令(不含末尾 CR), 由本方法统一补 CR。""" if "\r" in command or "\n" in command: raise CoderError("指令含 CR/LF, 由 send_command 统一补 CR, 不要手动拼") return self._send(command.encode("latin-1") + CR, expect_reply=expect_reply) def select_job(self, job_name: str): """SLA|| 选择信息模板""" self._send(b"SLA|" + self._encode(job_name, "模板名") + b"|" + CR) def update_fields(self, fields: dict): """JDA|=|...| 一帧更新多个用户区""" if not fields: return parts = [b"JDA"] for k, v in fields.items(): parts.append(self._encode(str(k), "用户区名") + b"=" + self._encode(str(v), "喷印内容")) self._send(b"|".join(parts) + b"|" + CR) def clear_queue(self): """CQI 清空喷码机内部缓存队列""" self._send(b"CQI" + CR) def push_queue(self, fields: dict): """JDI|1|=|...| 入队列数据""" if not fields: return parts = [b"JDI", b"1"] for k, v in fields.items(): parts.append(self._encode(str(k), "用户区名") + b"=" + self._encode(str(v), "喷印内容")) self._send(b"|".join(parts) + b"|" + CR) def set_state(self, state: int): """SST|| 0关机 1开机 3运行 4离线""" self._send(b"SST|" + str(int(state)).encode("ascii") + b"|" + CR) def register_print_callback(self): """SNO|PRC|1| 注册打印完成反馈, 之后喷码机每打印一次会回 PRC""" self._send(b"SNO|PRC|1|" + CR) def get_status(self) -> dict: """GST -> STS||||||""" resp = self._send(b"GST" + CR, expect_reply=True) parts = resp.rstrip(b"\r\n").decode("latin-1").split("|") if not parts or parts[0] != "STS" or len(parts) < 6: raise CoderError(f"GST 响应不识别: {resp!r}") try: return { "overall": int(parts[1]), "error": int(parts[2]), "current_job": parts[3], "batch_count": int(parts[4]), "total_count": int(parts[5]), } except ValueError as e: raise CoderError(f"GST 响应解析失败: {resp!r} - {e}") def get_faults(self) -> list: """GFT -> FLT||[|||]*<CR>""" resp = self._send(b"GFT" + CR, expect_reply=True) parts = resp.rstrip(b"\r\n").decode("latin-1").split("|") if not parts or parts[0] != "FLT" or len(parts) < 2: raise CoderError(f"GFT 响应不识别: {resp!r}") try: count = int(parts[1]) except ValueError as e: raise CoderError(f"GFT 响应解析失败: {resp!r} - {e}") faults = [] for i in range(count): base = 2 + i * 3 if base + 2 >= len(parts): break faults.append({ "code": parts[base], "clearable": parts[base + 1] == "1", "title": parts[base + 2], }) return faults def get_current_job(self) -> dict: """GJD<CR> -> JDL|<count>|<field>=<value>|...|<CR>""" resp = self._send(b"GJD" + CR, expect_reply=True) parts = resp.rstrip(b"\r\n").decode("latin-1").split("|") if not parts or parts[0] != "JDL" or len(parts) < 2: raise CoderError(f"GJD 响应不识别: {resp!r}") try: count = int(parts[1]) except ValueError as e: raise CoderError(f"GJD 响应解析失败: {resp!r} - {e}") fields = {} for i in range(count): idx = 2 + i if idx >= len(parts): break pair = parts[idx] if "=" not in pair: continue k, v = pair.split("=", 1) fields[k] = v return fields # ============================================================================ # Hanslaser(大族激光) 老款打标机 标准通讯协议(Winsocket/TCP) # 帧格式: STX(0x02) + 内容 + ETX(0x03), 每条命令下位机都回一帧 # 单件打标流程: $Initialize_<文件> -> $Data_<v1,v2,...> -> $MarkStart_ # 该协议无队列/批量能力, 只能逐件: 一组 Data 对应一次 MarkStart # ============================================================================ STX = b"\x02" ETX = b"\x03" class HanslaserClient: encoding = "gbk" # 大族激光为国产机, 文档 code page 936 def __init__(self, ip: str, port: int, timeout: float = 5.0): if not ip: raise CoderError("打标机IP未配置") if not port: raise CoderError("打标机端口未配置") self.ip = ip self.port = port self.timeout = timeout def _encode(self, text: str, label: str) -> bytes: if "\x02" in text or "\x03" in text: raise CoderError(f"{label}含 STX/ETX, 会破坏帧结构") if "," in text: raise CoderError(f"{label}含 ',', 与 Hanslaser $Data 分隔符冲突") if "\r" in text or "\n" in text: raise CoderError(f"{label}含 CR/LF") try: return text.encode(self.encoding) except UnicodeEncodeError as e: raise CoderError(f"{label}含打标机不支持的字符: {e}") def _send_recv(self, body: bytes) -> str: """发 STX+body+ETX, 读回一整帧, 返回去掉 STX/ETX 后的内容字符串。""" frame = STX + body + ETX try: with socket.create_connection((self.ip, self.port), timeout=self.timeout) as s: s.sendall(frame) buf = b"" while ETX not in buf: chunk = s.recv(256) if not chunk: break buf += chunk except (socket.timeout, OSError) as e: raise CoderError(f"打标通讯失败 {self.ip}:{self.port} - {e}") if ETX not in buf: raise CoderError(f"打标机无完整回包(缺 ETX): {buf!r}") start = buf.find(STX) end = buf.find(ETX, start + 1 if start >= 0 else 0) content = buf[(start + 1 if start >= 0 else 0):end] return content.decode(self.encoding, errors="replace") def initialize(self, filename: str): """$Initialize_<文件名> -> $Initialize_OK / $Initialize_FALSE""" resp = self._send_recv(b"$Initialize_" + self._encode(filename, "打标文件名")) if resp != "$Initialize_OK": raise CoderError(f"打标文件初始化失败: {resp}") def send_data(self, values: list): """$Data_<v1,v2,...> -> $Receive_OK / $Receive_ERROR / $SysNoReady""" if not values: raise CoderError("打标数据为空") payload = b",".join(self._encode(str(v), "打标内容") for v in values) resp = self._send_recv(b"$Data_" + payload) if resp == "$Receive_OK": return if resp == "$SysNoReady": raise CoderError("打标机未准备好($SysNoReady)") raise CoderError(f"打标数据被拒绝: {resp}") def mark_start(self): """$MarkStart_ -> $MarkStart_OK / $MarkStart_OK_<次数> / $MarkStart_ERROR 下位机开始打标即回 $MarkStart_OK; 打标完成会再回 $MarkStart_OK_<次数>。 单件场景只确认开始成功, 不阻塞等待打标完成。 """ resp = self._send_recv(b"$MarkStart_") if not resp.startswith("$MarkStart_OK"): raise CoderError(f"启动打标失败: {resp}") return resp def mark_stop(self): """$MarkStop_ -> $MarkStop_OK / $MarkStop_ERROR""" resp = self._send_recv(b"$MarkStop_") if not resp.startswith("$MarkStop_OK"): raise CoderError(f"停止打标失败: {resp}") return resp