Files

1008 lines
45 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# This Python file uses the following encoding: utf-8
import sys
import time
from PyQt5.QtWidgets import QFileDialog,QHeaderView
from PyQt5.QtWidgets import QApplication, QMainWindow, QStackedWidget, QWidget, QLayout, QLabel, QLineEdit, QPushButton, QMessageBox, QShortcut, QDialog,QTextEdit
from PyQt5 import uic
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject, QRunnable, QMutex, QTimer, QEvent
from PyQt5.QtGui import QImage, QPixmap, QColor, QKeySequence
from UIFrameWork import UIFrameWork
from UIFrameWork import *
# 添加当前脚本的父级目录到 sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from Shared_CODE.CameraThread import CameraThread
from PyQt5.QtWidgets import QApplication, QMainWindow, QStackedWidget
from PyQt5.QtWidgets import QDesktopWidget
import menu_utils as utils
import uart_group_config as group_config
from mqtt_device import class_comm_mqtt_thread, class_comm_mqtt_interface
from print_color import *
from Shared_CODE.get_tip_prop import *
from QT5_Project.Shared_CODE.FaceRecognitionProtocol import *
from QT5_Project.Shared_CODE.DialogFaceEnrollItgSingle import EnrollItgSingleDialog
from QT5_Project.Shared_CODE.DialogFaceUserManage import UserManageDialog, save_user, load_users, save_users_list
# Absolute path of the "UI" directory that sits next to this file.
ui_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'UI'))
# Page files: the path of every Qt Designer .ui file is built from ui_path.
# P0x files are loaded by the P00-P08 page classes, P1x files by the
# lighting ("Light") page classes defined further down in this module.
uiFile_P00DeviceList = os.path.join(ui_path, "P00DeviceList.ui")
uiFile_P01DeviceMenu = os.path.join(ui_path, "P01DeviceMenu.ui")
uiFile_P02DataView = os.path.join(ui_path, "P02DataView.ui")
uiFile_P03SwitchAction = os.path.join(ui_path, "P03SwitchAction.ui")
uiFile_P04ParamSet = os.path.join(ui_path, "P04ParamSet.ui")
uiFile_P05CameraView = os.path.join(ui_path, "P05CameraView.ui")
uiFile_P05_01_FaceCameraView = os.path.join(ui_path, "P05_01_FaceCameraView.ui")
uiFile_P06FaultQuery = os.path.join(ui_path, "P06FaultQuery.ui")
uiFile_P07SystemSet = os.path.join(ui_path, "P07SystemSet.ui")
uiFile_P08FieldTestAction = os.path.join(ui_path, "P08FieldTestAction.ui")
uiFile_P11LightDeviceMenu = os.path.join(ui_path, "P11LightDeviceMenu.ui")
uiFile_P12LightDataView = os.path.join(ui_path, "P12LightDataView.ui")
uiFile_P13LightSwitchAction = os.path.join(ui_path, "P13LightSwitchAction.ui")
uiFile_P14LightParamSet = os.path.join(ui_path, "P14LightParamSet.ui")
uiFile_P16LightFaultQuery = os.path.join(ui_path, "P16LightFaultQuery.ui")
uiFile_P17LightSystemSet = os.path.join(ui_path, "P17LightSystemSet.ui")
uiFile_P18LightFieldTestAction = os.path.join(ui_path, "P18LightFieldTestAction.ui")
# Colour constants, stored as the string returned by QColor.name().
# Generic colours.
COLOR_RED = QColor("#EE2D2D").name()
COLOR_GREEN = QColor(Qt.green).name()
COLOR_BLUE = QColor(Qt.blue).name()
COLOR_YELLOW = QColor(Qt.yellow).name()
COLOR_NORMAL = QColor("#000000").name()
# Colours for alarm messages.
COLOR_ALARM_NORMAL = QColor("#ECECEC").name()
COLOR_ALARM_ERROR_TEXT = QColor("#B1E5FC").name()
COLOR_ALARM_ERROR_BG = QColor("#E17176").name()
# Colours for measured values.
COLOR_VALUE_NORMAL_YELLOW = QColor("#F8C822").name()
COLOR_VALUE_NORMAL_BLUE = QColor("#4E76D4").name()
# NOTE(review): "READ" is presumably a typo for "RED" (#CE5D62 is a red tone);
# the name is kept because other code may reference it.
COLOR_VALUE_NORMAL_READ = QColor("#CE5D62").name()
COLOR_VALUE_NORMAL_GREEN = QColor("#83BF6E").name()
def get_camera_id_and_canvas_id(action : str) :
    """Extract the camera id and canvas id encoded in an action string.

    The action is only parsed when it mentions both "canvas" and "camera".
    It is then split on "_" and each piece containing "camera" or "canvas"
    is handed to get_value_from_lead_value_str() with a fallback of -1.

    Returns:
        (camera_id, canvas_id); both are -1 when the action is not parsed.
    """
    camera_id, canvas_id = -1, -1
    if "canvas" not in action or "camera" not in action:
        return camera_id, canvas_id
    for token in action.split("_"):
        # "camera" is tested first, exactly as in the original if/elif chain.
        if "camera" in token:
            camera_id = get_value_from_lead_value_str(token, "camera", -1)
        elif "canvas" in token:
            canvas_id = get_value_from_lead_value_str(token, "canvas", -1)
    return camera_id, canvas_id
# Look up the camera URL configured for a circuit.
def search_camera_url(camera_id : int) :
    """Return the first non-None "camera_url" whose "circuit_id" equals camera_id.

    Every entry of group_config.comm_thread_config is scanned, and within it
    every item of its "device_list". None is returned when nothing matches.
    """
    for thread_config in group_config.comm_thread_config:
        devices = utils.dict_or_object_get_attr(thread_config, "device_list", None)
        if devices == None:
            continue
        for device in devices:
            if utils.dict_or_object_get_attr(device, "circuit_id", -1) != camera_id:
                continue
            url = utils.dict_or_object_get_attr(device, "camera_url", None)
            if url != None:
                return url
    return None
# Module-level navigation history shared by all pages:
# PageTemplate.virtual_change_to_page() appends every page it switches to and
# pops entries from it when navigating back.
menu_page_widget_list = []
class PageTemplate(UIFrameWork):
    """Page template: behaviour shared by every page shown in the main stack.

    NOTE(review): this file reached review with its indentation stripped, so
    the nesting below was reconstructed from the control flow. Places where
    more than one nesting was plausible carry their own NOTE(review).
    """

    def __init__(self, parent_window):
        UIFrameWork.__init__(self)
        # Main window that owns the page stack and the camera threads.
        self.parent_window: "APPWindow" = parent_window

    def connect_matched_camera_canvas(self):
        """Connect each canvas of the *currently shown* page to its camera.

        Canvas ids 0..8 are probed through get_canvas_prop(); a canvas is
        connected when the camera id it reports is non-negative.
        """
        page_widget: UIFrameWork = self.parent_window.stack.currentWidget()
        if page_widget is None:
            return
        for canvas_id in range(9):
            camera_id, face_detect = page_widget.get_canvas_prop(canvas_id)
            if camera_id >= 0:
                self.parent_window.connect_camera_to_canvas(
                    page_widget, camera_id, canvas_id, face_detect)

    def virtual_connect_canvas_to_camera(self, canvas_id: int, camera_id: int, is_visible: bool = True):
        """Route the stream of camera `camera_id` to canvas `canvas_id`.

        An error dialog is shown when the camera has no thread, or when the
        thread is not emitting frames and `is_visible` is true.
        """
        thread_list = self.parent_window.camera_thread_list
        # camera_id is the value used to index thread_list, so it must be
        # range-checked (the original only checked canvas_id, which allowed an
        # IndexError). The canvas_id limit is kept for compatibility.
        if not (0 <= camera_id < len(thread_list) and 0 <= canvas_id < len(thread_list)):
            return

        face_detect = 0
        canvas_object = self.search_canvas_object(canvas_id)
        if canvas_object is not None:
            face_detect = get_tip_face_detection(canvas_object.statusTip())

        camera_thread: CameraThread = thread_list[camera_id]
        if camera_thread is None:
            self.show_camera_error(camera_id)
            return

        if not camera_thread.is_emit and is_visible:
            self.show_camera_error(camera_id)
        if camera_thread.is_emit:
            self.connect_camera_thread(camera_thread, canvas_id, is_visible)
        # NOTE(review): the statements below may originally have been nested
        # under `if camera_thread.is_emit:` -- confirm against the original.
        self.parent_window.camera_signal.emit(camera_id)
        camera_thread.face_detection = face_detect
        if hasattr(self, "CameraChioceLabel"):
            self.CameraChioceLabel.setText("回路" + str(camera_id + 1))

    def show_camera_error(self, camera_id):
        """Tell the user that video stream `camera_id + 1` is unavailable."""
        inform_box: DialogInform = DialogInform()
        inform_box.information("提示", f"视频流{camera_id+1}无法初始化或失去连接!")

    # Virtual hook, called when the page is entered.
    def virtual_on_page_enter(self):
        pass

    # Virtual hook, called when the page is left.
    def virtual_on_page_leave(self):
        pass

    # Overridden virtual function, called to switch pages.
    def virtual_change_to_page(self, page):
        """Switch the stack to `page`; a negative `page` navigates back.

        For a non-negative page already present in menu_page_widget_list the
        history is unwound down to (and including) that page. For a negative
        page, entries are popped until one differs from this page. When no
        target is found the first page of allpages_list is used.
        """
        if self.parent_window is None:
            return
        self_page_widget: PageTemplate = self.parent_window.stack.currentWidget()
        if self_page_widget is None:
            return

        target_page_widget: PageTemplate = None
        if page >= 0:
            target_page_widget = self.parent_window.search_page_widget(page)
            if target_page_widget in menu_page_widget_list:
                while len(menu_page_widget_list) > 0:
                    pop_page_widget = menu_page_widget_list.pop()
                    if pop_page_widget is None or pop_page_widget == target_page_widget:
                        break
        else:
            while len(menu_page_widget_list) > 0:
                pop_page_widget = menu_page_widget_list.pop()
                if pop_page_widget == self:
                    continue
                target_page_widget = pop_page_widget
                break

        if target_page_widget is None:
            target_page_widget = self.parent_window.allpages_list[0]
        if target_page_widget is not None:
            menu_page_widget_list.append(target_page_widget)
            self_page_widget.virtual_on_page_leave()
            self.parent_window.stack.setCurrentWidget(target_page_widget)
            target_page_widget.virtual_on_page_enter()
            self.connect_matched_camera_canvas()

    def virtual_widget_action_process(self, widget: QWidget, action: str):
        """Execute the "+"-separated commands carried by a widget action.

        Recognised commands: SetPage<n>, SetCircuit<n>, SetMain<n>,
        AddCircuit<n>, SetBinding<n>, plus the camera/canvas pair parsed by
        get_camera_id_and_canvas_id().
        """
        action_splits = action.split("+")
        page_id = -1
        circuit_id = -1
        main_index = -1
        add_circuit = -1
        binding_circuit = -1
        for action_split_str in action_splits:
            if "SetPage" in action_split_str:
                page_id = get_value_from_lead_value_str(action_split_str, "SetPage", -1)
            elif "SetCircuit" in action_split_str:
                circuit_id = get_value_from_lead_value_str(action_split_str, "SetCircuit", -1)
            elif "SetMain" in action_split_str:
                main_index = get_value_from_lead_value_str(action_split_str, "SetMain", -1)
            elif "AddCircuit" in action_split_str:
                add_circuit = get_value_from_lead_value_str(action_split_str, "AddCircuit", -1)
            elif "SetBinding" in action_split_str:
                # A bare "SetBinding" defaults to 1, i.e. binding enabled.
                binding_circuit = get_value_from_lead_value_str(action_split_str, "SetBinding", 1)
        print("<virtual_widget_action_process>@Line:",inspect.currentframe().f_lineno,'<circuit_id> = ',circuit_id,'<Action> = ',action_splits)

        # Resolve the target page first: the AddCircuit branch reads it, and
        # the original raised UnboundLocalError by using it before assignment.
        target_page_widget = self.parent_window.search_page_widget(page_id)

        if add_circuit >= 0:
            # NOTE(review): the original intent is unclear; the target page is
            # used when it resolves, otherwise the page currently shown.
            page_widget: PageTemplate = target_page_widget
            if page_widget is None:
                page_widget = self.parent_window.stack.currentWidget()
            if page_widget is not None:
                circuit_id = get_tip_circuit(page_widget.statusTip()) + 1
        if binding_circuit > 0:
            circuit_id = self.get_circuit_from_object(widget)

        # Feature: page switch.
        if page_id >= 0 and target_page_widget is not None:
            if circuit_id >= 0:
                target_page_widget.set_page_circuit(circuit_id)
            self.virtual_change_to_page(page_id)

        # Feature: set the page's main index field <main>, which allows
        # selecting widgets across groups.
        if main_index >= 0:
            self.set_menu_main_index(main_index)

        # NOTE(review): this section may originally have been nested under
        # `if main_index >= 0:`. The None guards make it a no-op on pages
        # that lack the indicator widgets.
        if circuit_id >= 0 and target_page_widget is not None:
            indicator: QLineEdit = target_page_widget.findChild(QLineEdit, "BindNum_Title")
            if indicator is not None:
                indicator.setText("当前回路为 %d"%(circuit_id + 1))
            # Show the hint icon that belongs to the circuit.
            indicator_Label: QLabel = target_page_widget.findChild(QLabel, "indicator_Label")
            if indicator_Label is not None:
                imag_file_name = get_imag_value_file_name(indicator_Label.statusTip(), circuit_id)
                if imag_file_name is not None:
                    self.modify_object_style_sheet(indicator_Label, "background-image", "url(%s)"%(imag_file_name))
                else:
                    self.reset_object_style_sheet(indicator_Label)

        camera_id, canvas_id = get_camera_id_and_canvas_id(action)
        self.virtual_connect_canvas_to_camera(canvas_id, camera_id)
# P00 device list page
class QDeviceListPage(PageTemplate):
    """Device list page, built from P00DeviceList.ui."""

    def __init__(self, parent_window):
        PageTemplate.__init__(self, parent_window)
        self.load_window_ui(uiFile_P00DeviceList)
        # Seven hard-coded sample payloads used to be built here on every
        # construction, feeding only commented-out self.flush_widgets(n, ...)
        # debug calls; they were unused and have been removed. For manual
        # testing, a payload of the same shape looks like:
        #   {"SwitchStatus": {"value": "1", "bk_color": COLOR_NORMAL, "color": COLOR_RED},
        #    "SwitchType":   {"value": "0", "bk_color": COLOR_NORMAL, "color": COLOR_RED},
        #    "AlarmMessage": {"value": "无故障", "type": "alarm", "color": COLOR_ALARM_NORMAL},
        #    "Voltage":      {"value": "1143", "color": COLOR_VALUE_NORMAL_YELLOW},
        #    "Current":      {"value": "60", "color": COLOR_VALUE_NORMAL_READ},
        #    "Power":        {"value": "20", "color": COLOR_VALUE_NORMAL_BLUE},
        #    "Electricity":  {"value": "45321", "color": COLOR_VALUE_NORMAL_GREEN}}
# P01 device menu page
class QDeviceMenuPage(PageTemplate):
    """Device menu page, built from P01DeviceMenu.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P01DeviceMenu)
# P02 data view page
class QDataViewPage(PageTemplate):
    """Data view page, built from P02DataView.ui."""

    def __init__(self, parent_window):
        PageTemplate.__init__(self, parent_window)
        self.load_window_ui(uiFile_P02DataView)
        # Unused sample payloads (keys Uab, Ubc, Uca, Uo, Iab, Ibc, Ica, Io)
        # and their commented-out flush_widgets() calls were removed.
        # Camera number currently embedded in the "TestCameraView" status tip.
        self.camera_str = "0"

    def virtual_on_page_enter(self):
        """Point every "TestCameraView" label at the camera of its circuit.

        The "camera<old>" token of the label's status tip is replaced by
        "camera<circuit>", and camera_signal is emitted with that circuit.
        """
        # findChildren(type, name) filters by objectName inside Qt.
        for child in self.findChildren(QLabel, "TestCameraView"):
            status_str = child.statusTip()
            get_circuit = self.get_circuit_from_object(child)
            print(status_str, get_circuit)
            new_status_str = status_str.replace(f"camera{self.camera_str}", f"camera{get_circuit}")
            self.camera_str = get_circuit
            self.parent_window.camera_signal.emit(int(get_circuit))
            child.setStatusTip(new_status_str)
# P03 switch operation page
class QSwitchActionPage(PageTemplate):
    """Switch operation page, built from P03SwitchAction.ui."""

    def __init__(self, parent_window):
        PageTemplate.__init__(self, parent_window)
        self.load_window_ui(uiFile_P03SwitchAction)
        # Unused sample payloads ({"SwitchStatus": {"value", "bk_color",
        # "color"}}) and their commented-out flush_widgets() calls were removed.
        # Camera number currently embedded in the "TestCameraView" status tip.
        self.camera_str = "0"

    def virtual_on_page_enter(self):
        """Point every "TestCameraView" label at the camera of its circuit.

        The "camera<old>" token of the label's status tip is replaced by
        "camera<circuit>", and camera_signal is emitted with that circuit.
        """
        # findChildren(type, name) filters by objectName inside Qt.
        for child in self.findChildren(QLabel, "TestCameraView"):
            status_str = child.statusTip()
            get_circuit = self.get_circuit_from_object(child)
            print(status_str, get_circuit)
            new_status_str = status_str.replace(f"camera{self.camera_str}", f"camera{get_circuit}")
            self.camera_str = get_circuit
            self.parent_window.camera_signal.emit(int(get_circuit))
            child.setStatusTip(new_status_str)
# P04 parameter setting page
class QParamSetPage(PageTemplate):
    """Parameter setting page, built from P04ParamSet.ui."""

    def __init__(self, parent_window):
        PageTemplate.__init__(self, parent_window)
        self.load_window_ui(uiFile_P04ParamSet)
        # Removed dead code: unused sample payloads (keys Uab, Ubc, Uca), their
        # commented-out flush_widgets() calls, and a string-literal block that
        # held a disabled QLineEdit mousePressEvent override.
# P05 video view page
class QCameraViewPage(PageTemplate):
    """Video view page, built from P05CameraView.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P05CameraView)

    def virtual_on_page_enter(self):
        """Emit camera_signal with camera 0 whenever the page is entered."""
        first_camera = 0
        self.parent_window.camera_signal.emit(first_camera)
# P05_01 face-recognition video page
class QFaceCameraViewPage(PageTemplate):
    """Face camera page: talks to the face module over a serial port.

    Built from P05_01_FaceCameraView.ui. A SerialWorker delivers decoded
    frames to on_frame(); one timer reconnects the serial port and another
    watches the video worker.

    NOTE(review): nesting was reconstructed from a flattened source.
    """

    def __init__(self, parent_window):
        PageTemplate.__init__(self, parent_window)
        self.load_window_ui(uiFile_P05_01_FaceCameraView)

        # Configuration.
        self.default_baud = getattr(group_config, "BAUDRATE", 115200)

        # Serial port and worker thread. The worker receives a callable so it
        # always sees the current port object.
        self.ser = None
        self.worker = SerialWorker(lambda: self.ser)
        self.worker.frame_received.connect(self.on_frame)
        self.worker.log_message.connect(self.log)
        self.worker.start()

        self.video_worker = None
        self.last_port = None
        self.last_enroll_name = ""
        self.current_video_mode = 0
        self.current_face_box = 0

        # Timers: serial port monitoring and video watchdog.
        self.t_port = QTimer(self)
        self.t_port.setInterval(1000)
        self.t_port.timeout.connect(self._port_tick)
        self.t_port.start()
        self.t_video = QTimer(self)
        self.t_video.setInterval(2000)
        self.t_video.timeout.connect(self._check_video)
        self.t_video.start()

        self.face_verify_result = None    # result of the last face verification
        self.verify_in_progress = False   # True while a verification is running

        self.auto_connect_serial()
        self.refresh()

        header = self.table.horizontalHeader()
        # User-ID column: fixed width.
        self.table.setColumnWidth(0, 100)
        # User-name column: sized to its contents.
        header.setSectionResizeMode(1, QHeaderView.ResizeToContents)
        # Registration-time column: takes the remaining space.
        header.setSectionResizeMode(2, QHeaderView.Stretch)

    # ---------------- Serial port management ----------------
    def auto_connect_serial(self):
        """Remember the platform's configured port if it is present.

        The port is not opened here; _port_tick() opens it on its next run.
        """
        ports = [p.device for p in serial.tools.list_ports.comports()]
        if not ports:
            self.log("[WARN] 未检测到任何串口设备")
            return
        sys_platform = platform.system()
        preferred_port = group_config.DEFAULT_PORT.get(sys_platform)
        if preferred_port and preferred_port in ports:
            self.last_port = preferred_port
            self.log(f"[INFO] 已选择默认串口 {preferred_port},等待自动连接")
        else:
            self.log(f"[WARN] {sys_platform} 平台未找到配置串口或设备不在列表: {ports}")

    def _port_tick(self):
        """Timer slot: drop a dead port object and reconnect to last_port."""
        ports = {p.device for p in serial.tools.list_ports.comports()}
        if self.ser and getattr(self.ser, "is_open", False):
            return
        # A port object that is no longer open is closed and discarded.
        if self.ser:
            self.close_serial()
        # Automatic reconnect.
        if self.last_port in ports:
            self.open_serial(self.last_port)

    def open_serial(self, port=None):
        """Open `port`, falling back to the port combo box, then last_port."""
        if not port:
            # The combo box may be absent from the .ui file (the related
            # button code is already commented out), so look it up defensively
            # instead of raising AttributeError.
            combo = getattr(self, "cb_port", None)
            port = (combo.currentText() if combo is not None else "") or self.last_port
        if not port:
            self.log("[WARN] 没有选择串口")
            return
        try:
            baud = self.default_baud
            self.ser = serial.Serial(port, baudrate=baud, timeout=0.05)
            self.last_port = port
            self.log(f"[INFO] 已连接 {port} @ {baud}")
        except Exception as e:
            self.log(f"[ERR] 打开串口失败: {e}")
            self.ser = None

    def close_serial(self):
        """Close the current port (best effort) and forget it."""
        if not self.ser:
            return
        try:
            self.ser.close()
        except Exception as e:
            # Still best effort, but no longer silent.
            self.log(f"[ERR] close: {e}")
        self.ser = None
        self.log("[INFO] 串口已关闭")

    def toggle_conn(self):
        """Close the port when it is open, otherwise open it."""
        if self.ser and getattr(self.ser, "is_open", False):
            self.close_serial()
        else:
            self.open_serial()

    def send(self, frame: bytes):
        """Write `frame` to the port; show a dialog when no port is open."""
        if not self.ser or not getattr(self.ser, "is_open", False):
            inform_box: DialogInform = DialogInform()
            inform_box.information("提示", "请先连接串口")
            return
        try:
            self.ser.write(frame)
            self.log("> " + frame.hex(" "))
        except Exception as e:
            self.log(f"[ERR] write: {e}")

    # ---------------- Timer checks ----------------
    def _check_video(self):
        """Timer slot: forget a video worker that has stopped running."""
        if self.video_worker and not self.video_worker.isRunning():
            self.log("[WARN] 摄像头线程已停止")
            self.video_worker = None  # automatic reconnect could be added here

    def on_frame(self, fr: dict):
        """Handle one received frame: log it and dispatch by message id."""
        raw_bytes = fr.get("raw", b"")
        self.log("< " + raw_bytes.hex(" "))
        msg_id = fr.get("msg_id")
        data = fr.get("data", b"")
        if msg_id == MID_REPLY:
            info = parse_reply(data)
            self.log(f"[REPLY] {info}")
            # Command id -> reply handler.
            dispatch_reply = {
                CMD_ENROLL_SINGLE: self._handle_enroll_reply,
                CMD_VERIFY: self._handle_verify_reply,
                CMD_GET_ALL_USERID: self._handle_get_all_userid_reply,
            }
            handler = dispatch_reply.get(info.get("mid"))
            if handler:
                handler(info)
        elif msg_id == MID_NOTE:
            info = parse_note(data)
            self.log(f"[NOTE] {info}")
            self._handle_note(info)

    # ---------------- NOTE handling ----------------
    def _handle_note(self, info: dict):
        """Handle a NOTE message: prompt the user and write a log line."""
        user_message = info.get("user_message")
        nid = info.get("nid")
        # The prompt is only shown while a verification is in progress.
        if getattr(self, "verify_in_progress", False) and user_message:
            DialogInform(self).information("提示", user_message)
        # Engineering log.
        if nid == NID_FACE_STATE:
            state = info.get("state")
            self.log(f"[INFO] 人脸状态: {state}, yaw={info.get('yaw')}, pitch={info.get('pitch')}, roll={info.get('roll')}")
        elif nid == NID_READY:
            self.log("[INFO] 模组已就绪")
        elif nid == NID_OTA_DONE:
            self.log("[INFO] 固件升级完成")
        elif nid == NID_UNKNOWNERROR:
            self.log("[ERROR] 模组发生未知错误")

    # ---------------- REPLY handling ----------------
    def _handle_enroll_reply(self, info: dict):
        """Store the enrolled user id on success, log a failure otherwise."""
        user_id = info.get("user_id")
        if info.get("result") != 0x00:
            self.log(f"[ERROR] 注册失败, 用户ID={user_id}")
            return
        if not user_id:
            return
        if save_user(user_id):
            self.refresh()
            self.log(f"[INFO] 用户ID={user_id} 已保存")
        else:
            DialogInform(self).information("提示", f"用户ID {user_id} 已存在!")

    def _handle_verify_reply(self, info: dict):
        """Record the verification outcome and clear the in-progress flag."""
        user_id = info.get("user_id")
        result = info.get("result")
        # Verification finished: reset the state flag.
        self.verify_in_progress = False
        if result == 0x00:
            print(f"[INFO] 用户(ID={user_id}) 验证通过")
            self.face_verify_result = True
        else:
            print(f"[INFO] 用户(ID={user_id}) 验证失败")
            self.face_verify_result = False

    def _handle_get_all_userid_reply(self, info: dict):
        """Show the list of user ids reported by the module on success."""
        count = info.get("count")
        result = info.get("result")
        user_ids = info.get("user_ids", [])
        if result == 0x00:
            DialogInform(self).information("提示", f"{count} 个用户\n用户ID 列表:" + ", ".join(map(str, user_ids)))

    # ---------------- Logging ----------------
    def log(self, s: str):
        """Append a timestamped line to the log view and scroll to the end."""
        ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.txt_log.append(f"[{ts}] {s}")
        self.txt_log.moveCursor(self.txt_log.textCursor().End)

    def save_log(self):
        """Ask for a file name and write the log view's text to it (UTF-8)."""
        path, _ = QFileDialog.getSaveFileName(self, "保存日志", "", "Text files (*.txt)")
        if not path:
            return
        with open(path, "w", encoding="utf-8") as f:
            f.write(self.txt_log.toPlainText())
        self.log(f"[INFO] 日志已保存到 {path}")

    def closeEvent(self, e):
        """Stop the video worker and close the serial port before closing."""
        if self.video_worker and self.video_worker.isRunning():
            self.video_worker.stop()
            self.video_worker.wait(300)
        if self.ser and getattr(self.ser, "is_open", False):
            self.close_serial()
        super().closeEvent(e)
# P06 fault query page
class QFaultQueryPage(PageTemplate):
    """Fault query page, built from P06FaultQuery.ui.

    While the page is shown, the alarm record selected by `alarm_index` is
    requested over MQTT every 4 seconds.
    """

    def __init__(self, parent_window):
        PageTemplate.__init__(self, parent_window)
        self.load_window_ui(uiFile_P06FaultQuery)
        self.alarm_index = 0
        # Polling timer, created on first page entry and then reused.
        self._alarm_timer = None

    def virtual_on_page_enter(self):
        """Reset the index, query once immediately and start polling."""
        self.alarm_index = 0
        self.query_alarm_index()
        if self._alarm_timer is None:
            self._alarm_timer = QTimer(self)
            self._alarm_timer.timeout.connect(self.query_alarm_index)
        # Keep the attribute name the original code exposed.
        self.timer = self._alarm_timer
        self._alarm_timer.start(4000)

    def virtual_on_page_leave(self):
        """Reset the index and stop polling.

        The original created a new timer on every entry and never stopped it,
        so alarm requests kept being published after the page was left.
        """
        self.alarm_index = 0
        if self._alarm_timer is not None:
            self._alarm_timer.stop()

    def query_alarm_index(self):
        """Publish an alarm request for the page's circuit, if it has a name."""
        circuit_id = self.get_page_circuit()
        unique_name = self.get_unique_name_from_circuit(circuit_id)
        if unique_name is not None:
            publish_topic = "request/alarm/" + unique_name
            publish_message = '{"index" : "%s"}'%(self.alarm_index)
            self.mqtt_publish_and_wait_response(publish_topic, publish_message, object, 1000)

    # Action-processing override, only for the fault query page.
    def virtual_widget_action_process(self, widget: QWidget, action: str):
        """Run the common processing, then handle CmdAlarmNext / CmdAlarmPrev."""
        PageTemplate.virtual_widget_action_process(self, widget, action)
        alarm_adjust = 0
        if "CmdAlarmNext" in action:
            alarm_adjust = 1
        elif "CmdAlarmPrev" in action:
            alarm_adjust = -1
        if alarm_adjust != 0:
            # The index never goes below zero.
            self.alarm_index = max(self.alarm_index + alarm_adjust, 0)
            self.query_alarm_index()
# P07 system parameter setting page
class QSystemSetPage(PageTemplate):
    """System parameter page, built from P07SystemSet.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P07SystemSet)
# P08 field test page
class QFieldTestAction(PageTemplate):
    """Field test page, built from P08FieldTestAction.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P08FieldTestAction)
# P11 lighting menu page
class QLightMenuPage(PageTemplate):
    """Lighting device menu page, built from P11LightDeviceMenu.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P11LightDeviceMenu)
# P12 lighting run-data page
class QRunDataPage(PageTemplate):
    """Lighting run-data page, built from P12LightDataView.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P12LightDataView)
        # Camera number currently embedded in the "TestCameraView" status tip.
        self.camera_str = "0"

    def virtual_on_page_enter(self):
        """Retarget every "TestCameraView" label to the camera of its circuit."""
        # Name filtering is delegated to Qt; same labels as filtering by hand.
        for label in self.findChildren(QLabel, "TestCameraView"):
            old_tip = label.statusTip()
            circuit = self.get_circuit_from_object(label)
            print(old_tip, circuit)
            old_token = f"camera{self.camera_str}"
            new_token = f"camera{circuit}"
            new_tip = old_tip.replace(old_token, new_token)
            self.camera_str = circuit
            self.parent_window.camera_signal.emit(int(circuit))
            label.setStatusTip(new_tip)
# P13 lighting switch operation page
class QLightSwitchActionPage(PageTemplate):
    """Lighting switch operation page, built from P13LightSwitchAction.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P13LightSwitchAction)
        # Camera number currently embedded in the "TestCameraView" status tip.
        self.camera_str = "0"

    def virtual_on_page_enter(self):
        """Retarget every "TestCameraView" label to the camera of its circuit."""
        # Name filtering is delegated to Qt; same labels as filtering by hand.
        for label in self.findChildren(QLabel, "TestCameraView"):
            old_tip = label.statusTip()
            circuit = self.get_circuit_from_object(label)
            print(old_tip, circuit)
            old_token = f"camera{self.camera_str}"
            new_token = f"camera{circuit}"
            new_tip = old_tip.replace(old_token, new_token)
            self.camera_str = circuit
            self.parent_window.camera_signal.emit(int(circuit))
            label.setStatusTip(new_tip)
# P14 lighting parameter setting page
class QLightParamSetPage(PageTemplate):
    """Lighting parameter page, built from P14LightParamSet.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P14LightParamSet)
# P16 lighting fault query page
class QLightFaultQuery(PageTemplate):
    """Lighting fault query page, built from P16LightFaultQuery.ui.

    While the page is shown, the alarm record selected by `alarm_index` is
    requested over MQTT every 4 seconds.
    """

    def __init__(self, parent_window):
        PageTemplate.__init__(self, parent_window)
        self.load_window_ui(uiFile_P16LightFaultQuery)
        # Initialised here so an action arriving before the first page entry
        # cannot raise AttributeError.
        self.alarm_index = 0
        # Polling timer, created on first page entry and then reused.
        self._alarm_timer = None

    def virtual_on_page_enter(self):
        """Reset the index, query once immediately and start polling."""
        self.alarm_index = 0
        self.query_alarm_index()
        if self._alarm_timer is None:
            self._alarm_timer = QTimer(self)
            self._alarm_timer.timeout.connect(self.query_alarm_index)
        # Keep the attribute name the original code exposed.
        self.timer = self._alarm_timer
        self._alarm_timer.start(4000)

    def virtual_on_page_leave(self):
        """Reset the index and stop polling.

        The original created a new timer on every entry and never stopped it,
        so alarm requests kept being published after the page was left.
        """
        self.alarm_index = 0
        if self._alarm_timer is not None:
            self._alarm_timer.stop()

    def query_alarm_index(self):
        """Publish an alarm request for the page's circuit, if it has a name."""
        circuit_id = self.get_page_circuit()
        unique_name = self.get_unique_name_from_circuit(circuit_id)
        if unique_name is not None:
            publish_topic = "request/alarm/" + unique_name
            publish_message = '{"index" : "%s"}'%(self.alarm_index)
            self.mqtt_publish_and_wait_response(publish_topic, publish_message, object, 1000)

    # Action-processing override, only for the fault query page.
    def virtual_widget_action_process(self, widget: QWidget, action: str):
        """Run the common processing, then handle CmdAlarmNext / CmdAlarmPrev."""
        PageTemplate.virtual_widget_action_process(self, widget, action)
        alarm_adjust = 0
        if "CmdAlarmNext" in action:
            alarm_adjust = 1
        elif "CmdAlarmPrev" in action:
            alarm_adjust = -1
        if alarm_adjust != 0:
            # The index never goes below zero.
            self.alarm_index = max(self.alarm_index + alarm_adjust, 0)
            self.query_alarm_index()
# P17 lighting system parameter page
class QLightSystemSet(PageTemplate):
    """Lighting system parameter page, built from P17LightSystemSet.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P17LightSystemSet)
# P18 lighting field test page
class QLightFieldTestAction(PageTemplate):
    """Lighting field test page, built from P18LightFieldTestAction.ui."""

    def __init__(self, parent_window):
        super().__init__(parent_window)
        self.load_window_ui(uiFile_P18LightFieldTestAction)
class APPWindow(QMainWindow):
    """Full-screen main window: owns the page stack and the camera threads.

    NOTE(review): nesting was reconstructed from a flattened source.
    """

    # Emitted with a camera id; connected to every camera thread's
    # change_camera_url slot in create_all_camera_thread().
    camera_signal = pyqtSignal(int)

    def __init__(self):
        super().__init__()
        # Size the window and the stack to the screen resolution.
        screen = QDesktopWidget().screenGeometry()
        screen_width = screen.width()
        screen_height = screen.height()
        self.stack = QStackedWidget(self)
        self.setGeometry(0, 0, screen_width, screen_height)
        self.stack.setGeometry(0, 0, screen_width, screen_height)
        # For debugging in a window, use a fixed 1024x768 geometry for both
        # instead and skip showFullScreen().

        # Class-level defaults kept from the original code.
        QFaceCameraViewPage.video_worker = None
        QFaceCameraViewPage.ser = None
        self.showFullScreen()

        self.menu_sequence_list: list = []
        # One slot per camera id; None means no camera configured.
        self.camera_thread_list: list = [None] * 9
        # Created before the pages, exactly as in the original.
        self.allpages_list: list = []

        self.P00_DeviceList: PageTemplate = QDeviceListPage(parent_window=self)
        self.P01_DeviceMenu: PageTemplate = QDeviceMenuPage(parent_window=self)
        self.P02_DataView: PageTemplate = QDataViewPage(parent_window=self)
        self.P03_SwitchAction: PageTemplate = QSwitchActionPage(parent_window=self)
        self.P04_ParamSet: PageTemplate = QParamSetPage(parent_window=self)
        self.P05_CameraView: PageTemplate = QCameraViewPage(parent_window=self)
        self.P05_01_FaceCameraView: PageTemplate = QFaceCameraViewPage(parent_window=self)
        self.P06_FaultQuery: PageTemplate = QFaultQueryPage(parent_window=self)
        self.P07_SystemSet: PageTemplate = QSystemSetPage(parent_window=self)
        self.P08_FieldTestAction: PageTemplate = QFieldTestAction(parent_window=self)
        self.P11_DeviceMenu: PageTemplate = QLightMenuPage(parent_window=self)
        self.P12_DateView: PageTemplate = QRunDataPage(parent_window=self)
        self.P13_SwitchAction: PageTemplate = QLightSwitchActionPage(parent_window=self)
        self.P14_ParamSet: PageTemplate = QLightParamSetPage(parent_window=self)
        self.P16_LightFaultQuery: PageTemplate = QLightFaultQuery(parent_window=self)
        self.P17_LightSystemSet: PageTemplate = QLightSystemSet(parent_window=self)
        self.P18_LightFieldTestAction: PageTemplate = QLightFieldTestAction(parent_window=self)

        # Registration order matters: allpages_list[0] is the fallback page
        # and the stack's first widget is the one shown initially.
        self.allpages_list.extend([
            self.P00_DeviceList,
            self.P01_DeviceMenu,
            self.P02_DataView,
            self.P03_SwitchAction,
            self.P04_ParamSet,
            self.P05_CameraView,
            self.P05_01_FaceCameraView,
            self.P06_FaultQuery,
            self.P07_SystemSet,
            self.P08_FieldTestAction,
            self.P11_DeviceMenu,
            self.P12_DateView,
            self.P13_SwitchAction,
            self.P14_ParamSet,
            self.P16_LightFaultQuery,
            self.P17_LightSystemSet,
            self.P18_LightFieldTestAction,
        ])
        for page_widget in self.allpages_list:
            self.stack.addWidget(page_widget)

        test_init = system_parameter()
        set_screen_blanking_time(test_init.get_screen_blanking_time())

    def search_page_widget(self, page):
        """Return the registered page whose `.page` equals `page`, else None."""
        for window_page in self.allpages_list:
            if window_page.page == page:
                return window_page
        return None

    def connect_camera_to_canvas(self, page_widget: UIFrameWork, camera_id: int, canvas_id: int, face_detect: int = 0):
        """Connect camera `camera_id` to `canvas_id` of `page_widget`."""
        thread_count = len(self.camera_thread_list)
        # camera_id is the value used as the list index, so it must be
        # range-checked (the original only checked canvas_id). The canvas_id
        # limit is kept for compatibility.
        if not (0 <= camera_id < thread_count and 0 <= canvas_id < thread_count):
            return
        camera_thread: CameraThread = self.camera_thread_list[camera_id]
        if camera_thread is not None:
            page_widget.connect_camera_thread(camera_thread, canvas_id)
            camera_thread.face_detection = face_detect

    def create_all_camera_thread(self):
        """Create a thread per configured camera, bind canvases, start them."""
        for camera_id in range(len(self.camera_thread_list)):
            _camera_url = search_camera_url(camera_id)
            if _camera_url is not None:
                camera_thread = CameraThread(_camera_url, camera_id)
                camera_thread.set_video_cycle_ms(10)
                self.camera_thread_list[camera_id] = camera_thread
                self.camera_signal.connect(camera_thread.change_camera_url)

        # Bind cameras to the page canvases.
        for index in range(self.stack.count()):
            page_widget: UIFrameWork = self.stack.widget(index)
            page_widget.connect_matched_camera_canvas()
        # NOTE(review): this pause may originally have been inside the loop
        # above -- confirm against the original.
        time.sleep(0.1)

        # Start the camera threads, slightly staggered.
        for thread_to_start in self.camera_thread_list:
            if thread_to_start is not None:
                thread_to_start.start()
                time.sleep(0.01)

    def get_page_currentwidget(self):
        """Return item [1] of the current page's selected sort_menu_list entry."""
        current_page = self.stack.currentWidget()
        return current_page.sort_menu_list[current_page.menu_key_index][1]

    def closeEvent(self, event):
        """Ask for confirmation, then release the face page and camera threads."""
        reply = QMessageBox.question(self, '确认', '确定要退出吗?',
                                     QMessageBox.Yes | QMessageBox.No)
        if reply != QMessageBox.Yes:
            event.ignore()  # reject the close event
            return

        # Release the face page's resources once, through the page instance.
        # The original tested class attributes (always None) inside the camera
        # loop and called self.close_serial(), which this class does not have.
        face_page = getattr(self, "P05_01_FaceCameraView", None)
        if face_page is not None:
            video_worker = getattr(face_page, "video_worker", None)
            if video_worker and video_worker.isRunning():
                video_worker.stop()
                video_worker.wait(300)
            ser = getattr(face_page, "ser", None)
            if ser and getattr(ser, "is_open", False):
                face_page.close_serial()

        # Ask every camera thread to close first, then wait for each one.
        for thread_to_stop in self.camera_thread_list:
            if thread_to_stop is not None:
                thread_to_stop.close()
        for thread_to_stop in self.camera_thread_list:
            if thread_to_stop is not None:
                thread_to_stop.wait()
        event.accept()  # accept the close event
if __name__ == '__main__':
    # Create the MQTT thread; connection settings come from
    # group_config.mqtt_server with local defaults.
    # (The unused fvalue / value / circuit_id assignments were removed.)
    _user_name = utils.dict_or_object_get_attr(group_config.mqtt_server, "user_name", "admin")
    _password = utils.dict_or_object_get_attr(group_config.mqtt_server, "password", "admin")
    _server = utils.dict_or_object_get_attr(group_config.mqtt_server, "remote", "127.0.0.1")
    _port = utils.dict_or_object_get_attr(group_config.mqtt_server, "port", 1883)
    global_mqtt_thread = class_comm_mqtt_thread()
    global_mqtt_thread.set_mqtt_server(server = _server, port = _port, keep_alive = 60, user_name=_user_name, password=_password)

    # Build the UI and start the camera threads.
    app = QApplication([])
    app_main_window = APPWindow()
    app_main_window.show()
    app_main_window.create_all_camera_thread()

    # Register the page list with the MQTT thread under the unique name of
    # every configured device.
    for config_dict in group_config.comm_thread_config:
        device_list = utils.dict_or_object_get_attr(config_dict, "device_list", None)
        if device_list is None:
            continue
        for item_dict in device_list:
            circuit_unique_name = utils.dict_or_object_get_attr(item_dict, "unique_name", None)
            if circuit_unique_name is not None:
                global_mqtt_thread.add_unique_object(circuit_unique_name, app_main_window.allpages_list)

    global_mqtt_thread.start()
    app.exec_()
    # The event loop has ended: stop the MQTT thread before exiting.
    global_mqtt_thread.close()
    global_mqtt_thread.join()
    sys.exit(0)