# -*- coding: utf-8 -*- from __future__ import annotations import struct from dataclasses import dataclass from typing import Optional, Tuple, List SYNC = b"\xEF\xAA" # Msg IDs MID_REPLY = 0x00 MID_NOTE = 0x01 MID_IMAGE = 0x02 # Host->Module (Command) IDs (subset used by the tool) CMD_RESET = 0x10 CMD_GET_STATUS = 0x11 CMD_VERIFY = 0x12 CMD_ENROLL = 0x13 CMD_DELETE_USER = 0x20 CMD_DELETE_ALL = 0x21 CMD_GET_USER_INFO = 0x22 CMD_FACE_RESET = 0x23 CMD_GET_ALL_USERID = 0x24 CMD_ENROLL_ITG = 0x26 CMD_GET_VERSION = 0x30 CMD_INIT_ENCRYPTION = 0x50 CMD_SET_RELEASE_KEY = 0x52 CMD_SET_DEBUG_KEY = 0x53 CMD_GET_SN = 0x93 CMD_READ_USB_UVC = 0xB0 CMD_SET_USB_UVC = 0xB1 CMD_FACE_VIEW = 0xB5 CMD_UVC_VIEW = 0xC0 CMD_UPGRADE_FW = 0xF6 CMD_ENROLL_WITH_PHOTO = 0xF7 CMD_LED_CONTROL = 0xF9 CMD_ENROLL_SINGLE = 0x1D CMD_DEMO_MODE = 0xFE # NOTE nids NID_READY = 0x00 NID_FACE_STATE = 0x01 NID_UNKNOWNERROR = 0x02 NID_OTA_DONE = 0x03 NID_EYE_STATE = 0x04 # REPLY result codes (partial) MR_SUCCESS = 0x00 MR_REJECTED = 0x01 MR_ABORTED = 0x02 MR_FAILED4_CAMERA = 0x04 MR_FAILED4_UNKNOWNREASON = 0x05 MR_FAILED4_INVALIDPARAM = 0x06 MR_FAILED4_NOMEMORY = 0x07 MR_FAILED4_UNKNOWNUSER = 0x08 MR_FAILED4_MAXUSER = 0x09 MR_FAILED4_FACEENROLLED = 0x0A MR_FAILED4_LIVENESSCHECK = 0x0C MR_FAILED4_TIMEOUT = 0x0D MR_FAILED4_AUTHORIZATION = 0x0E MR_FAILED4_READ_FILE = 0x13 MR_FAILED4_WRITE_FILE = 0x14 MR_FAILED4_NO_ENCRYPT = 0x15 MR_FAILED4_NO_RGBIMAGE = 0x17 MR_FAILED4_JPGPHOTO_LARGE = 0x18 MR_FAILED4_JPGPHOTO_SMALL = 0x19 # ---- 映射表 ---- # 指令名称映射(指令ID -> 中文名称) CMD_NAMES = { CMD_RESET: "复位", CMD_GET_STATUS: "获取状态", CMD_VERIFY: "人脸验证", CMD_LED_CONTROL: "LED控制", CMD_ENROLL: "人脸录入(多帧)", CMD_ENROLL_SINGLE: "人脸录入(单帧)", CMD_ENROLL_ITG: "人脸录入(集成式)", CMD_DELETE_USER: "删除单个用户", CMD_DELETE_ALL: "删除所有用户", CMD_GET_USER_INFO: "获取用户信息", CMD_GET_ALL_USERID: "获取所有用户ID", CMD_GET_VERSION: "获取版本信息", CMD_INIT_ENCRYPTION: "初始化加密", CMD_ENROLL_WITH_PHOTO: "照片录入注册", } # 结果码名称映射(结果码 -> 中文名称) RESULT_NAMES = { 0x00: "成功", 0x01: "被拒绝", 0x02: "已中止", 0x04: "失败:相机异常", 0x05: "失败:未知错误", 0x06: "失败:参数无效", 0x07: "失败:内存不足", 0x08: "失败:用户不存在", 0x09: "失败:超过最大用户数", 0x0A: "失败:已录入该用户", 0x0C: "失败:活体检测未通过", 0x0D: "失败:超时", 0x0E: "失败:认证失败", 0x13: "失败:文件读取错误", 0x14: "失败:文件写入错误", 0x15: "失败:未启用加密", 0x17: "失败:无RGB图像", 0x18: "失败:JPG文件过大", 0x19: "失败:JPG文件过小", } # 通知状态名称映射(状态码 -> 中文名称) NOTE_NAMES = { 0x00: "就绪", 0x01: "人脸状态", 0x02: "未知错误", 0x03: "OTA升级完成", 0x04: "眼睛状态", } # 帧头同步字 SYNC = b"\xEF\xAA" def xor_checksum(data: bytes) -> int: """ 计算异或校验 (XOR),范围为整个帧的 MsgID + Size + Data 部分, 不包括 SYNC(2字节) 和最后的校验字节本身。 """ chk = 0 for b in data: chk ^= b return chk & 0xFF def pack_frame(msg_id: int, data: bytes = b"") -> bytes: """ 封装一帧数据 格式: SYNC(2) + MsgID(1) + Size(2) + Data(N) + Chk(1) - MsgID: 命令字 - Size : Data 长度 (big endian, 2字节) - Data : 负载 - Chk : 校验 = MsgID..Data 的所有字节异或 """ size = struct.pack(">H", len(data)) # 大端编码 head = bytes([msg_id]) + size + data # MsgID + Size + Data chk = xor_checksum(head) # 计算校验 return SYNC + head + bytes([chk]) # 拼装完整帧 def unpack_frame(buf: bytes) -> Tuple[Optional[dict], int]: """ 尝试从缓冲区中解析出一帧。 返回: (frame_dict, consumed_bytes) - 如果没有完整帧: (None, 0) - 如果解析成功: ({"msg_id":..,"size":..,"data":..,"raw":..}, 已消耗字节数) - 如果校验失败: ({"error":"checksum","raw":..}, 已消耗字节数) buf: 原始接收缓冲区 """ if len(buf) < 6: # 最小帧长: SYNC(2) + MsgID(1) + Size(2) + Chk(1) return None, 0 # 查找 SYNC idx = buf.find(SYNC) if idx == -1: # 没找到帧头,整个缓冲区丢弃 return None, len(buf) # 余下字节不足以解析长度字段,继续等待 if len(buf) - idx < 6: return None, 0 msg_id = buf[idx+2] size = struct.unpack(">H", buf[idx+3:idx+5])[0] total = 2 + 1 + 2 + size + 1 # 整个帧长度 # 数据不完整,继续等待 if len(buf) - idx < total: return None, 0 # 截取一帧 frame = buf[idx: idx+total] content = frame[2:-1] # 不包括 SYNC 和最后的校验字节 chk = frame[-1] # 校验 if xor_checksum(content) != chk: return {"error": "checksum", "raw": frame}, total # 提取有效载荷 data = frame[5:-1] return { "msg_id": msg_id, "size": size, "data": data, "raw": frame }, total # ------ Builders for key commands ------ def build_reset() -> bytes: return pack_frame(CMD_RESET) def build_get_status() -> bytes: return pack_frame(CMD_GET_STATUS) def build_led_control(state: int) -> bytes: """ 构建LED控制指令帧(0xF9指令),符合校验位逻辑 :param state: LED状态值,0=灭,1=亮 """ # 数据部分:1字节状态值(根据模组要求定义,如0x00=灭,0x01=亮) data = struct.pack("B", state & 0xFF) # 确保是单字节 return pack_frame(CMD_LED_CONTROL, data) # CMD_LED_CONTROL = 0xF9 #pd_rightaway: int,验证成功后是否立即断电(0=不立即断电,1=立即断电),仅保留低8位有效(通过&0xFF确保) #timeout: int,验证超时时间(单位:秒),范围通常为1-255秒,仅保留低8位有效(通过&0xFF确保) def build_verify(pd_rightaway: int = 0, timeout: int = 10) -> bytes: data = struct.pack("BB", pd_rightaway & 0xFF, timeout & 0xFF) return pack_frame(CMD_VERIFY, data) def build_enroll(admin: int, user_name: str, face_dir: int, timeout: int = 10) -> bytes: name_bytes = user_name.encode("utf-8")[:32] name_bytes = name_bytes + bytes(32 - len(name_bytes)) data = struct.pack("B", admin & 0xFF) + name_bytes + struct.pack("BB", face_dir & 0xFF, timeout & 0xFF) return pack_frame(CMD_ENROLL, data) def build_enroll_single(admin: int, user_name: str, timeout: int = 10) -> bytes: name_bytes = user_name.encode("utf-8")[:32] name_bytes = name_bytes + bytes(32 - len(name_bytes)) # face_dir not used per manual for ENROLL_SINGLE data = struct.pack("B", admin & 0xFF) + name_bytes + struct.pack("BB", 0x00, timeout & 0xFF) return pack_frame(0x1D, data) def build_enroll_itg_single(admin: int, user_name: str, face_dir: int, timeout: int, itg_param: int) -> bytes: """ 构造 ENROLL_ITG_SINGLE 命令帧 admin: 0=普通用户, 1=管理员 user_name: 最多 32 字节 face_dir: 人脸方向 bitmask timeout: 超时时间 (秒) itg_param: ITG 参数 (4 字节) """ # user_name 补齐到 32 字节 name_bytes = user_name.encode("utf-8")[:32] name_bytes = name_bytes.ljust(32, b"\x00") payload = struct.pack( ">B32sBBI", admin & 0xFF, name_bytes, face_dir & 0xFF, timeout & 0xFF, itg_param & 0xFFFFFFFF, ) return pack_frame(CMD_ENROLL_ITG, payload) def build_delete_user(user_id: int) -> bytes: return pack_frame(CMD_DELETE_USER, struct.pack("BB", (user_id>>8)&0xFF, user_id&0xFF)) def build_delete_all() -> bytes: return pack_frame(CMD_DELETE_ALL) def build_get_user_info(user_id: int) -> bytes: return pack_frame(CMD_GET_USER_INFO, struct.pack("BB", (user_id>>8)&0xFF, user_id&0xFF)) def build_get_all_userid() -> bytes: return pack_frame(CMD_GET_ALL_USERID) def build_get_version() -> bytes: return pack_frame(CMD_GET_VERSION) def build_uvc_view(state: int) -> bytes: return pack_frame(CMD_UVC_VIEW, struct.pack("B", state & 0xFF)) def build_face_view(state: int) -> bytes: return pack_frame(CMD_FACE_VIEW, struct.pack("B", state & 0xFF)) def build_init_encryption(seed4: bytes, mode: int = 0) -> bytes: if len(seed4) != 4: raise ValueError("seed must be 4 bytes") return pack_frame(CMD_INIT_ENCRYPTION, seed4 + bytes([mode & 0xFF])) def build_enroll_with_photo_begin(photo_size: int) -> bytes: # According to manual: Seq=0, Photo data=4-byte big-endian size data = b"\x00\x00" + struct.pack(">I", photo_size) return pack_frame(CMD_ENROLL_WITH_PHOTO, data) def build_enroll_with_photo_chunk(seq: int, chunk: bytes) -> bytes: # Seq starts from 1 and increases; MTU=246 data = struct.pack(">H", seq & 0xFFFF) + chunk return pack_frame(CMD_ENROLL_WITH_PHOTO, data) # ---- Parsers (Reply / Note / Image) ---- def parse_reply(data: bytes) -> dict: if len(data) < 2: return {"type":"REPLY","error":"short"} mid = data[0] result = data[1] rest = data[2:] # 初始化 info 字典 info = { "type":"REPLY", "mid": mid, "mid_name": CMD_NAMES.get(mid, f"0x{mid:02X}"), "result": result, "result_name": RESULT_NAMES.get(result, f"0x{result:02X}") } if mid == CMD_VERIFY and len(rest) >= 36: uid = (rest[0]<<8)|rest[1] name = rest[2:34].rstrip(b"\x00").decode("utf-8", errors="ignore") admin = rest[34] unlock = rest[35] info.update({"user_id": uid, "user_name": name, "admin": admin, "unlock_status": unlock}) elif mid in (CMD_ENROLL, 0x1D, CMD_ENROLL_ITG) and len(rest) >= 3: uid = (rest[0]<<8)|rest[1] face_dir = rest[2] info.update({"user_id": uid, "face_direction": face_dir}) elif mid == CMD_GET_STATUS and len(rest) >= 1: status = rest[0] info.update({"status": status, "status_name": { 0: "空闲", 1: "录入中", 2: "验证中" }.get(status, f"0x{status:02X}")}) elif mid == CMD_GET_USER_INFO and len(rest) >= 35: uid = (rest[0]<<8)|rest[1] name = rest[2:34].decode("ascii", errors="ignore") admin = rest[34] info.update({"user_id": uid, "user_name": name, "admin": admin}) elif mid == CMD_GET_ALL_USERID and len(rest) >= 1: n = rest[0] ids = [(rest[i]<<8)|rest[i+1] for i in range(1, 1+2*n, 2) if i+1 < len(rest)] info.update({"count": n, "user_ids": ids}) elif mid == CMD_GET_VERSION: info["version_str"] = rest.decode("ascii", errors="ignore") elif mid == CMD_LED_CONTROL and len(rest) >= 1: led_state = rest[0] info.update({"led_state": led_state, "led_state_name": { 0: "灭", 1: "亮" }.get(led_state, f"0x{led_state:02X}")}) elif mid == CMD_ENROLL_WITH_PHOTO: if len(rest) >= 2: seq = (rest[0]<<8)|rest[1] info["seq"] = seq if len(rest) >= 6: uid = (rest[2]<<8)|rest[3] info["user_id"] = uid return info def parse_note(data: bytes) -> dict: if len(data) < 1: return {"type":"NOTE","error":"short"} nid = data[0] rest = data[1:] info = {"type":"NOTE","nid": nid, "nid_name": NOTE_NAMES.get(nid, f"0x{nid:02X}")} if nid == NID_FACE_STATE and len(rest) >= 16: vals = struct.unpack(">hhhhhhhh", rest[:16]) info.update({ "state": vals[0], "left": vals[1], "top": vals[2], "right": vals[3], "bottom": vals[4], "yaw": vals[5], "pitch": vals[6], "roll": vals[7] }) return info def parse_image(data: bytes) -> dict: return {"type":"IMAGE","jpeg":data}