406 lines
13 KiB
Python
406 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
import struct
|
||
from dataclasses import dataclass
|
||
from typing import Optional, Tuple, List
|
||
from QT5_Project.Shared_CODE.DialogInform import DialogInform
|
||
|
||
SYNC = b"\xEF\xAA"
|
||
|
||
# Msg IDs
|
||
MID_REPLY = 0x00
|
||
MID_NOTE = 0x01
|
||
MID_IMAGE = 0x02
|
||
|
||
# Host->Module (Command) IDs (subset used by the tool)
|
||
CMD_RESET = 0x10
|
||
CMD_GET_STATUS = 0x11
|
||
CMD_VERIFY = 0x12
|
||
CMD_ENROLL = 0x13
|
||
CMD_DELETE_USER = 0x20
|
||
CMD_DELETE_ALL = 0x21
|
||
CMD_GET_USER_INFO = 0x22
|
||
CMD_FACE_RESET = 0x23
|
||
CMD_GET_ALL_USERID = 0x24
|
||
CMD_ENROLL_ITG = 0x26
|
||
CMD_GET_VERSION = 0x30
|
||
CMD_INIT_ENCRYPTION = 0x50
|
||
CMD_SET_RELEASE_KEY = 0x52
|
||
CMD_SET_DEBUG_KEY = 0x53
|
||
CMD_GET_SN = 0x93
|
||
|
||
CMD_READ_USB_UVC = 0xB0
|
||
CMD_SET_USB_UVC = 0xB1
|
||
CMD_FACE_VIEW = 0xB5
|
||
|
||
CMD_UVC_VIEW = 0xC0
|
||
|
||
CMD_UPGRADE_FW = 0xF6
|
||
CMD_ENROLL_WITH_PHOTO = 0xF7
|
||
CMD_LED_CONTROL = 0xF9
|
||
|
||
CMD_ENROLL_SINGLE = 0x1D
|
||
CMD_DEMO_MODE = 0xFE
|
||
|
||
# NOTE nids
|
||
NID_READY = 0x00
|
||
NID_FACE_STATE = 0x01
|
||
NID_UNKNOWNERROR = 0x02
|
||
NID_OTA_DONE = 0x03
|
||
NID_EYE_STATE = 0x04
|
||
|
||
# REPLY result codes (partial)
|
||
MR_SUCCESS = 0x00
|
||
MR_REJECTED = 0x01
|
||
MR_ABORTED = 0x02
|
||
MR_FAILED4_CAMERA = 0x04
|
||
MR_FAILED4_UNKNOWNREASON = 0x05
|
||
MR_FAILED4_INVALIDPARAM = 0x06
|
||
MR_FAILED4_NOMEMORY = 0x07
|
||
MR_FAILED4_UNKNOWNUSER = 0x08
|
||
MR_FAILED4_MAXUSER = 0x09
|
||
MR_FAILED4_FACEENROLLED = 0x0A
|
||
MR_FAILED4_LIVENESSCHECK = 0x0C
|
||
MR_FAILED4_TIMEOUT = 0x0D
|
||
MR_FAILED4_AUTHORIZATION = 0x0E
|
||
MR_FAILED4_READ_FILE = 0x13
|
||
MR_FAILED4_WRITE_FILE = 0x14
|
||
MR_FAILED4_NO_ENCRYPT = 0x15
|
||
MR_FAILED4_NO_RGBIMAGE = 0x17
|
||
MR_FAILED4_JPGPHOTO_LARGE = 0x18
|
||
MR_FAILED4_JPGPHOTO_SMALL = 0x19
|
||
|
||
# ---- 映射表 ----
|
||
# 指令名称映射(指令ID -> 中文名称)
|
||
CMD_NAMES = {
|
||
CMD_RESET: "复位",
|
||
CMD_GET_STATUS: "获取状态",
|
||
CMD_VERIFY: "人脸验证",
|
||
CMD_LED_CONTROL: "LED控制",
|
||
CMD_ENROLL: "人脸录入(多帧)",
|
||
CMD_ENROLL_SINGLE: "人脸录入(单帧)",
|
||
CMD_ENROLL_ITG: "人脸录入(集成式)",
|
||
CMD_DELETE_USER: "删除单个用户",
|
||
CMD_DELETE_ALL: "删除所有用户",
|
||
CMD_GET_USER_INFO: "获取用户信息",
|
||
CMD_GET_ALL_USERID: "获取所有用户ID",
|
||
CMD_GET_VERSION: "获取版本信息",
|
||
CMD_INIT_ENCRYPTION: "初始化加密",
|
||
CMD_ENROLL_WITH_PHOTO: "照片录入注册",
|
||
CMD_FACE_VIEW: "人脸框显示切换",
|
||
CMD_UVC_VIEW: "视频模式切换",
|
||
}
|
||
|
||
# 结果码名称映射(结果码 -> 中文名称)
|
||
RESULT_NAMES = {
|
||
0x00: "成功",
|
||
0x01: "被拒绝",
|
||
0x02: "已中止",
|
||
0x04: "失败:相机异常",
|
||
0x05: "失败:未知错误",
|
||
0x06: "失败:参数无效",
|
||
0x07: "失败:内存不足",
|
||
0x08: "失败:用户不存在",
|
||
0x09: "失败:超过最大用户数",
|
||
0x0A: "失败:已录入该用户",
|
||
0x0C: "失败:活体检测未通过",
|
||
0x0D: "失败:超时",
|
||
0x0E: "失败:认证失败",
|
||
0x13: "失败:文件读取错误",
|
||
0x14: "失败:文件写入错误",
|
||
0x15: "失败:未启用加密",
|
||
0x17: "失败:无RGB图像",
|
||
0x18: "失败:JPG文件过大",
|
||
0x19: "失败:JPG文件过小",
|
||
}
|
||
|
||
# 通知状态名称映射(状态码 -> 中文名称)
|
||
NOTE_NAMES = {
|
||
0x00: "就绪",
|
||
0x01: "人脸状态",
|
||
0x02: "未知错误",
|
||
0x03: "OTA升级完成",
|
||
0x04: "眼睛状态",
|
||
}
|
||
|
||
# 帧头同步字
|
||
SYNC = b"\xEF\xAA"
|
||
|
||
#########################################################################################
|
||
|
||
|
||
#########################################################################################
|
||
|
||
def xor_checksum(data: bytes) -> int:
|
||
"""
|
||
计算异或校验 (XOR),范围为整个帧的 MsgID + Size + Data 部分,
|
||
不包括 SYNC(2字节) 和最后的校验字节本身。
|
||
"""
|
||
chk = 0
|
||
for b in data:
|
||
chk ^= b
|
||
return chk & 0xFF
|
||
|
||
|
||
def pack_frame(msg_id: int, data: bytes = b"") -> bytes:
|
||
"""
|
||
封装一帧数据
|
||
格式: SYNC(2) + MsgID(1) + Size(2) + Data(N) + Chk(1)
|
||
- MsgID: 命令字
|
||
- Size : Data 长度 (big endian, 2字节)
|
||
- Data : 负载
|
||
- Chk : 校验 = MsgID..Data 的所有字节异或
|
||
"""
|
||
size = struct.pack(">H", len(data)) # 大端编码
|
||
head = bytes([msg_id]) + size + data # MsgID + Size + Data
|
||
chk = xor_checksum(head) # 计算校验
|
||
return SYNC + head + bytes([chk]) # 拼装完整帧
|
||
|
||
|
||
def unpack_frame(buf: bytes) -> Tuple[Optional[dict], int]:
|
||
"""
|
||
尝试从缓冲区中解析出一帧。
|
||
返回: (frame_dict, consumed_bytes)
|
||
- 如果没有完整帧: (None, 0)
|
||
- 如果解析成功: ({"msg_id":..,"size":..,"data":..,"raw":..}, 已消耗字节数)
|
||
- 如果校验失败: ({"error":"checksum","raw":..}, 已消耗字节数)
|
||
|
||
buf: 原始接收缓冲区
|
||
"""
|
||
if len(buf) < 6: # 最小帧长: SYNC(2) + MsgID(1) + Size(2) + Chk(1)
|
||
return None, 0
|
||
|
||
# 查找 SYNC
|
||
idx = buf.find(SYNC)
|
||
if idx == -1:
|
||
# 没找到帧头,整个缓冲区丢弃
|
||
return None, len(buf)
|
||
|
||
# 余下字节不足以解析长度字段,继续等待
|
||
if len(buf) - idx < 6:
|
||
return None, 0
|
||
|
||
msg_id = buf[idx+2]
|
||
size = struct.unpack(">H", buf[idx+3:idx+5])[0]
|
||
total = 2 + 1 + 2 + size + 1 # 整个帧长度
|
||
|
||
# 数据不完整,继续等待
|
||
if len(buf) - idx < total:
|
||
return None, 0
|
||
|
||
# 截取一帧
|
||
frame = buf[idx: idx+total]
|
||
content = frame[2:-1] # 不包括 SYNC 和最后的校验字节
|
||
chk = frame[-1]
|
||
|
||
# 校验
|
||
if xor_checksum(content) != chk:
|
||
return {"error": "checksum", "raw": frame}, total
|
||
|
||
# 提取有效载荷
|
||
data = frame[5:-1]
|
||
return {
|
||
"msg_id": msg_id,
|
||
"size": size,
|
||
"data": data,
|
||
"raw": frame
|
||
}, total
|
||
|
||
# ------ Builders for key commands ------
|
||
|
||
def build_reset() -> bytes:
|
||
return pack_frame(CMD_RESET)
|
||
|
||
def build_get_status() -> bytes:
|
||
return pack_frame(CMD_GET_STATUS)
|
||
|
||
def build_led_control(state: int) -> bytes:
|
||
"""
|
||
构建LED控制指令帧(0xF9指令),符合校验位逻辑
|
||
:param state: LED状态值,0=灭,1=亮
|
||
"""
|
||
# 数据部分:1字节状态值(根据模组要求定义,如0x00=灭,0x01=亮)
|
||
data = struct.pack("B", state & 0xFF) # 确保是单字节
|
||
return pack_frame(CMD_LED_CONTROL, data) # CMD_LED_CONTROL = 0xF9
|
||
|
||
#pd_rightaway: int,验证成功后是否立即断电(0=不立即断电,1=立即断电),仅保留低8位有效(通过&0xFF确保)
|
||
#timeout: int,验证超时时间(单位:秒),范围通常为1-255秒,仅保留低8位有效(通过&0xFF确保)
|
||
def build_verify(pd_rightaway, timeout: int = 10) -> bytes:
|
||
data = struct.pack("BB", pd_rightaway & 0xFF, timeout & 0xFF)
|
||
return pack_frame(CMD_VERIFY, data)
|
||
|
||
def build_enroll(admin: int, user_name: str, face_dir: int, timeout: int = 10) -> bytes:
|
||
name_bytes = user_name.encode("utf-8")[:32]
|
||
name_bytes = name_bytes + bytes(32 - len(name_bytes))
|
||
data = struct.pack("B", admin & 0xFF) + name_bytes + struct.pack("BB", face_dir & 0xFF, timeout & 0xFF)
|
||
return pack_frame(CMD_ENROLL, data)
|
||
|
||
def build_enroll_single(admin: int, user_name: str, timeout: int = 10) -> bytes:
|
||
name_bytes = user_name.encode("utf-8")[:32]
|
||
name_bytes = name_bytes + bytes(32 - len(name_bytes))
|
||
# face_dir not used per manual for ENROLL_SINGLE
|
||
data = struct.pack("B", admin & 0xFF) + name_bytes + struct.pack("BB", 0x00, timeout & 0xFF)
|
||
return pack_frame(0x1D, data)
|
||
|
||
def build_enroll_itg_single(admin: int, user_name: str, face_dir: int, timeout: int, itg_param: int) -> bytes:
|
||
"""
|
||
构造 ENROLL_ITG_SINGLE 命令帧
|
||
admin: 0=普通用户, 1=管理员
|
||
user_name: 最多 32 字节
|
||
face_dir: 人脸方向 bitmask
|
||
timeout: 超时时间 (秒)
|
||
itg_param: ITG 参数 (4 字节)
|
||
"""
|
||
# user_name 补齐到 32 字节
|
||
name_bytes = user_name.encode("utf-8")[:32]
|
||
name_bytes = name_bytes.ljust(32, b"\x00")
|
||
|
||
payload = struct.pack(
|
||
">B32sBBI",
|
||
admin & 0xFF,
|
||
name_bytes,
|
||
face_dir & 0xFF,
|
||
timeout & 0xFF,
|
||
itg_param & 0xFFFFFFFF,
|
||
)
|
||
return pack_frame(CMD_ENROLL_ITG, payload)
|
||
|
||
def build_delete_user(user_id: int) -> bytes:
|
||
return pack_frame(CMD_DELETE_USER, struct.pack("BB", (user_id>>8)&0xFF, user_id&0xFF))
|
||
|
||
def build_delete_all() -> bytes:
|
||
return pack_frame(CMD_DELETE_ALL)
|
||
|
||
def build_get_user_info(user_id: int) -> bytes:
|
||
return pack_frame(CMD_GET_USER_INFO, struct.pack("BB", (user_id>>8)&0xFF, user_id&0xFF))
|
||
|
||
def build_get_all_userid() -> bytes:
|
||
return pack_frame(CMD_GET_ALL_USERID)
|
||
|
||
def build_get_version() -> bytes:
|
||
return pack_frame(CMD_GET_VERSION)
|
||
|
||
def build_uvc_view(state: int) -> bytes:
|
||
return pack_frame(CMD_UVC_VIEW, struct.pack("B", state & 0xFF))
|
||
|
||
def build_face_view(state: int) -> bytes:
|
||
return pack_frame(CMD_FACE_VIEW, struct.pack("B", state & 0xFF))
|
||
|
||
def build_init_encryption(seed4: bytes, mode: int = 0) -> bytes:
|
||
if len(seed4) != 4:
|
||
raise ValueError("seed must be 4 bytes")
|
||
return pack_frame(CMD_INIT_ENCRYPTION, seed4 + bytes([mode & 0xFF]))
|
||
|
||
def build_enroll_with_photo_begin(photo_size: int) -> bytes:
|
||
# According to manual: Seq=0, Photo data=4-byte big-endian size
|
||
data = b"\x00\x00" + struct.pack(">I", photo_size)
|
||
return pack_frame(CMD_ENROLL_WITH_PHOTO, data)
|
||
|
||
def build_enroll_with_photo_chunk(seq: int, chunk: bytes) -> bytes:
|
||
# Seq starts from 1 and increases; MTU=246
|
||
data = struct.pack(">H", seq & 0xFFFF) + chunk
|
||
return pack_frame(CMD_ENROLL_WITH_PHOTO, data)
|
||
|
||
# ---- Parsers (Reply / Note / Image) ----
|
||
def parse_reply(data: bytes) -> dict:
|
||
if len(data) < 2:
|
||
return {"type": "REPLY", "error": "short"}
|
||
|
||
mid = data[0]
|
||
result = data[1]
|
||
rest = data[2:]
|
||
|
||
# 通用字段
|
||
info = {
|
||
"type": "REPLY",
|
||
"mid": mid,
|
||
"mid_name": CMD_NAMES.get(mid, f"0x{mid:02X}"),
|
||
"result": result,
|
||
"result_name": RESULT_NAMES.get(result, f"0x{result:02X}"),
|
||
"ok": (result == 0), # 成功标志
|
||
}
|
||
|
||
# ========== 分支解析 ==========
|
||
if mid == CMD_VERIFY and len(rest) >= 36:
|
||
uid = (rest[0] << 8) | rest[1]
|
||
name = rest[2:34].rstrip(b"\x00").decode("utf-8", errors="ignore")
|
||
admin = rest[34]
|
||
unlock = rest[35]
|
||
info.update({
|
||
"user_id": uid,
|
||
"user_name": name,
|
||
"admin": admin,
|
||
"unlock_status": unlock,
|
||
})
|
||
|
||
elif mid in (CMD_ENROLL, 0x1D, CMD_ENROLL_ITG) and len(rest) >= 3:
|
||
uid = (rest[0] << 8) | rest[1]
|
||
face_dir = rest[2]
|
||
info.update({"user_id": uid, "face_direction": face_dir})
|
||
|
||
elif mid == CMD_GET_STATUS and len(rest) >= 1:
|
||
status = rest[0]
|
||
status_map = {0: "空闲", 1: "录入中", 2: "验证中"}
|
||
info.update({"status": status, "status_name": status_map.get(status, f"0x{status:02X}")})
|
||
|
||
elif mid == CMD_GET_USER_INFO and len(rest) >= 35:
|
||
uid = (rest[0] << 8) | rest[1]
|
||
name = rest[2:34].decode("ascii", errors="ignore")
|
||
admin = rest[34]
|
||
info.update({"user_id": uid, "user_name": name, "admin": admin})
|
||
|
||
elif mid == CMD_GET_ALL_USERID and len(rest) >= 1:
|
||
n = rest[0]
|
||
ids = [(rest[i] << 8) | rest[i + 1] for i in range(1, 1 + 2 * n, 2) if i + 1 < len(rest)]
|
||
info.update({"count": n, "user_ids": ids})
|
||
|
||
elif mid == CMD_GET_VERSION:
|
||
info["version_str"] = rest.decode("ascii", errors="ignore")
|
||
|
||
elif mid == CMD_LED_CONTROL and len(rest) >= 1:
|
||
led_state = rest[0]
|
||
led_map = {0: "灭", 1: "亮"}
|
||
info.update({"led_state": led_state, "led_state_name": led_map.get(led_state, f"0x{led_state:02X}")})
|
||
|
||
elif mid == CMD_ENROLL_WITH_PHOTO:
|
||
if len(rest) >= 2:
|
||
seq = (rest[0] << 8) | rest[1]
|
||
info["seq"] = seq
|
||
if len(rest) >= 6:
|
||
uid = (rest[2] << 8) | rest[3]
|
||
info["user_id"] = uid
|
||
|
||
# 自动生成提示消息
|
||
if info["ok"]:
|
||
if mid == CMD_VERIFY and "user_id" in info:
|
||
info["message"] = (
|
||
f"{info['mid_name']} 成功 - 用户ID: {info['user_id']}"
|
||
)
|
||
else:
|
||
info["message"] = f"{info['mid_name']} 成功"
|
||
else:
|
||
info["message"] = f"{info['mid_name']} 失败: {info['result_name']}"
|
||
|
||
return info
|
||
|
||
|
||
def parse_note(data: bytes) -> dict:
|
||
if len(data) < 1:
|
||
return {"type":"NOTE","error":"short"}
|
||
nid = data[0]
|
||
rest = data[1:]
|
||
info = {"type":"NOTE","nid": nid, "nid_name": NOTE_NAMES.get(nid, f"0x{nid:02X}")}
|
||
|
||
if nid == NID_FACE_STATE and len(rest) >= 16:
|
||
vals = struct.unpack(">hhhhhhhh", rest[:16])
|
||
info.update({
|
||
"state": vals[0],
|
||
"left": vals[1], "top": vals[2], "right": vals[3], "bottom": vals[4],
|
||
"yaw": vals[5], "pitch": vals[6], "roll": vals[7]
|
||
})
|
||
|
||
return info
|
||
|
||
def parse_image(data: bytes) -> dict:
|
||
return {"type":"IMAGE","jpeg":data}
|