# This Python file uses the following encoding: utf-8 import sys import os import time import csv import inspect import cv2 import json from itertools import chain from PyQt5 import uic from PyQt5.QtGui import QImage, QPixmap, QColor,QBrush, QKeySequence, QIcon, QPalette from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject, QRunnable, QMutex, QTimer, QEvent, QSize, QDateTime from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QFrame, QWidget, QLayout, QLabel from PyQt5.QtWidgets import QLineEdit, QPushButton, QMessageBox, QShortcut, QDialog,QTableWidgetItem sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from Shared_CODE.CameraThread import CameraThread import menu_utils as utils import uart_group_config as group_config from mqtt_device import class_comm_mqtt_thread, class_comm_mqtt_interface from Shared_CODE.DialogModifyValue import DialogModifyValue from Shared_CODE.DialogModifyAlias import DialogModifyAlias from Shared_CODE.DialogModifyText import DialogModifyText from Shared_CODE.DialogInform import DialogInform from QT5_Project.Shared_CODE.FaceRecognitionProtocol import parse_reply from QT5_Project.Shared_CODE.DialogFaceEnrollItgSingle import EnrollItgSingleDialog from QT5_Project.Shared_CODE.DialogFaceUserManage import UserManageDialog, load_users, save_user, save_users_list, CSV_FILE from Shared_CODE.get_tip_prop import * from print_color import * import time from PyQt5.QtCore import QDateTime import re from datetime import datetime import platform import subprocess from typing import Callable, Optional # sys.path.append(sys.path[0] + "/../..") # sys_path = sys.path[0].replace("\\", "/") import serial import serial.tools.list_ports from Shared_CODE.FaceRecognitionProtocol import ( build_reset, build_uvc_view, build_face_view, build_verify, build_enroll_itg_single, build_delete_all, build_get_all_userid, build_delete_user, unpack_frame, parse_reply, parse_note, MID_REPLY, MID_NOTE, CMD_ENROLL, CMD_ENROLL_ITG ) # 设置 img 目录的路径 img_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'background')) COLOR_RED = QColor("#EE2D2D").name() COLOR_GREEN = QColor(Qt.green).name() COLOR_BLUE = QColor(Qt.blue).name() COLOR_YELLOW = QColor(Qt.yellow).name() COLOR_NORMAL = QColor("#000000").name() COLOR_ALARM_NORMAL = QColor("#31D19E").name() COLOR_ALARM_ERROR_TEXT = QColor("#B1E5FC").name() COLOR_ALARM_ERROR_BG = QColor("#E17176").name() COLOR_VALUE_NORMAL = QColor("#31D19E").name() ALIAS_QUERY_NONE = 0 ALIAS_QUERY_WAIT_RESPOINSE = 1 #设置系统时间 def is_linux(): return platform.system() == 'Linux' def set_system_time(year, month, day, hour, minute, second): if is_linux(): try: new_time = datetime(year, month, day, hour, minute, second).strftime("%Y-%m-%d %H:%M:%S") subprocess.run(f'sudo timedatectl set-time "{new_time}"', shell=True, check=True) print(f"System time updated to {new_time}") except subprocess.CalledProcessError as e: print(f"Failed to update system time: {e}") else: import ctypes import ctypes.wintypes TIME_ZONE_ID_UNKNOWN = 0 TIME_ZONE_ID_STANDARD = 1 TIME_ZONE_ID_DAYLIGHT = 2 class SYSTEMTIME(ctypes.Structure): _fields_ = [ ("wYear", ctypes.wintypes.WORD), ("wMonth", ctypes.wintypes.WORD), ("wDayOfWeek", ctypes.wintypes.WORD), ("wDay", ctypes.wintypes.WORD), ("wHour", ctypes.wintypes.WORD), ("wMinute", ctypes.wintypes.WORD), ("wSecond", ctypes.wintypes.WORD), ("wMilliseconds", ctypes.wintypes.WORD), ] system_time = SYSTEMTIME() ctypes.windll.kernel32.GetSystemTime(ctypes.byref(system_time)) system_time.wYear = year system_time.wMonth = month system_time.wDay = day system_time.wHour = hour system_time.wMinute = minute system_time.wSecond = second ctypes.windll.kernel32.SetSystemTime(ctypes.byref(system_time)) print(f"System time updated to {year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}") def set_screen_blanking_time(minutes): """ 设置屏幕黑屏时间:param minutes: 屏幕黑屏时间,以分钟为单位 """ if is_linux(): # 如果是 Linux 系统,使用 xfconf-query 命令设置 Xfce 的屏幕黑屏时间 try: # Xfce 的屏幕黑屏时间是以秒为单位 seconds = int(minutes) * 60 subprocess.run([ 'xfconf-query', '-c', 'xfce4-power-manager', '-p', '/xfce4-power-manager/blank-on-ac', '-s', str(seconds), '-t', 'int', '--create' ], check=True) print(f"Xfce屏幕黑屏时间设置为 {minutes} 分钟") except subprocess.CalledProcessError as e: print(f"设置Xfce屏幕黑屏时间失败: {e}") except FileNotFoundError: print("xfconf-query 命令未找到,请确保已安装 xfce4-power-manager。") elif platform.system() == 'Windows': # 如果是 Windows 系统,使用 powercfg 命令设置屏幕黑屏时间 try: # 设置交流电源时的屏幕黑屏时间 subprocess.run(['powercfg', '-change', '-monitor-timeout-ac', str(minutes)], check=True) # 设置电池供电时的屏幕黑屏时间 subprocess.run(['powercfg', '-change', '-monitor-timeout-dc', str(minutes)], check=True) print(f"屏幕黑屏时间设置为 {minutes} 分钟 (适用于交流电和电池供电)") except subprocess.CalledProcessError as e: print(f"设置屏幕黑屏时间失败: {e}") else: print("当前操作系统不支持此操作。") def json_load_message(message) : json_dict = {} if isinstance(message, bytes) : json_dict = json.loads(message.decode('utf-8')) elif isinstance(message, str) : json_dict = json.loads(message.encode('utf-8')) else : json_dict = json.loads(message) return json_dict def modify_style_sheet(type_name : str, origin_style_sheet : str, lead_str : str, value_str : str) : if len(value_str) == 0 : new_style_items = "" else : new_style_items = "%s:%s;"%(lead_str, value_str) style_splits = origin_style_sheet.split("{") if len(style_splits) == 1 : style_items_str = style_splits[0] elif len(style_splits) == 2 : style_items_str = style_splits[1] else : style_items_str = "" style_items_str = style_items_str.replace("}", "") prop_splits = style_items_str.split(";") for each_style_prop in prop_splits : if lead_str in each_style_prop : key_value_splits = each_style_prop.split(":") cmp_str = key_value_splits[0] cmp_str = cmp_str.replace(" ", "") cmp_str = cmp_str.replace("\n", "") if lead_str == cmp_str : continue if len(each_style_prop) > 0 : new_style_items += each_style_prop new_style_items += ";" if len(type_name) == 0: return "%s"%(new_style_items) else : return "%s {%s}"%(type_name, new_style_items) def get_key_combine_str(str1, str2) : return str1 + "," + str2 def search_circuit_from_topic(topic : str) : circuit_id = -1 for config_dict in group_config.comm_thread_config : device_list = utils.dict_or_object_get_attr(config_dict, "device_list", None) if device_list != None : for item_dict in device_list : config_circuit_id = utils.dict_or_object_get_attr(item_dict, "circuit_id", -1) circuit_unique_name = utils.dict_or_object_get_attr(item_dict, "unique_name", None) if circuit_unique_name in topic : circuit_id = config_circuit_id break if circuit_id >= 0 : break return circuit_id def get_imag_value_file_name(tip_str : str, value : int) : file_name = get_tip_value_str(tip_str, "ImageValue", default_string = None) try : if isinstance(value, str) : fvalue = float(value) value_id = round(fvalue) else : value_id = value except Exception as e : return None if file_name != None : file_name_with_value = file_name%(value_id) imag_value_file_name = os.path.join(img_path, file_name_with_value) else : imag_value_file_name = None if imag_value_file_name != None : if os.path.exists(imag_value_file_name) : return imag_value_file_name.replace("\\", "/") return None def get_imag_file_name(tip_str : str, key_imag : str, default_string = None) : file_name = get_tip_value_str(tip_str, key_imag, default_string) select_image_file_name = None if file_name != None : select_image_file_name = os.path.join(img_path, file_name) if os.path.exists(select_image_file_name) : return select_image_file_name.replace("\\", "/") return None def get_select_imag_file_name(tip_str : str) : return get_imag_file_name(tip_str, "SelectImag", default_string = None) def get_select_icon_file_name(tip_str : str) : return get_imag_file_name(tip_str, "SelectIcon", default_string = None) def get_bk_imag_file_name(tip_str : str) : return get_imag_file_name(tip_str, "ImageBackGround", default_string = None) def get_indicator_imag_file_name(tip_str : str) : return get_imag_file_name(tip_str, "ImageIndicator", default_string = None) def get_compare_index(main_index : int, sub_index : int) : return (main_index + 1) * 64 + sub_index class SerialWorker(QThread): frame_received = pyqtSignal(dict) log_message = pyqtSignal(str) def __init__(self, ser_getter: Callable[[], Optional[serial.Serial]]): super().__init__() self.ser_getter = ser_getter self._running = True self.buf = b"" def run(self): self.log("[INFO] 串口线程运行") while self._running: ser = self.ser_getter() if not ser or not getattr(ser, 'is_open', False): time.sleep(0.2) continue try: data = ser.read(ser.in_waiting or 1) if data: self.buf += data while True: fr, used = unpack_frame(self.buf) if not fr: break self.buf = self.buf[used:] self.frame_received.emit(fr) except (serial.SerialException, OSError) as e: self.log(f"[WARN] 串口异常挂起: {e}") try: ser.close() except Exception: pass time.sleep(0.5) except Exception as e: self.log(f"[ERR] read: {e}") time.sleep(0.1) def stop(self): self._running = False def log(self, s: str): self.log_message.emit(s) class VideoWorker(QThread): pixmap_ready = pyqtSignal(QPixmap) log_message = pyqtSignal(str) def __init__(self, cam_index=0, target_size=QSize(360,480)): super().__init__() self.cam_index = cam_index self.target_size = target_size self._running = True self.cap = None def run(self): try: self.log_message.emit(f"[INFO] 尝试打开摄像头 {self.cam_index} ...") if sys.platform.startswith("win"): self.cap = cv2.VideoCapture(self.cam_index, cv2.CAP_DSHOW) else: self.cap = cv2.VideoCapture(self.cam_index) if not self.cap or not self.cap.isOpened(): self.log_message.emit(f"[ERR] 无法打开摄像头 index={self.cam_index}") return self.log_message.emit(f"[INFO] 摄像头 {self.cam_index} 已打开") while self._running: ret, frame = self.cap.read() if not ret: time.sleep(0.03) continue frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h,w,ch = frame.shape img = QImage(frame.data, w, h, ch*w, QImage.Format_RGB888) pm = QPixmap.fromImage(img).scaled( self.target_size, Qt.KeepAspectRatio, Qt.SmoothTransformation ) self.pixmap_ready.emit(pm) time.sleep(0.03) finally: if self.cap: self.cap.release() self.log_message.emit("[INFO] 摄像头已关闭") def stop(self): self._running = False def log(self, s: str): self.log_message.emit(s) class UIFrameWork(QMainWindow, class_comm_mqtt_interface): mqtt_signal : pyqtSignal = pyqtSignal(str) def __init__(self): QMainWindow.__init__(self) self.topic_message_list : list = [] self.mutex : QMutex = QMutex() self.mqtt_signal.connect(self.mqtt_topic_message_process) self.menu_list = [] self.sort_menu_list = [] self.menu_key_index = 0 self.key_active = 0 self.select_object : QWidget = None self.wait_response_list = [] self.mqtt_thread : class_comm_mqtt_thread= None self.mqtt_value_dict = {} self.widget_timeout_list = [] self.alias_dict = {} self.alias_query_timer : QTimer = None self.alias_query_timer = QTimer() self.alias_query_timer.timeout.connect(self.process_alias_query) self.alias_query_timer.start(3000) self.video_visible_timer = QTimer() self.video_visible_timer.timeout.connect(self.video_visible_timeout) self.video_visible_timer.start(200) self.time_timer = QTimer() self.time_timer.timeout.connect(self.flush_system_cycle) self.time_timer.start(500) self.canvas_visible_arr = [ False, False, False, False, False, False, False, False, False, ] # 使用列表推导式动态生成 canvas_label_list self.canvas_label_list = [[None, -1, None] for _ in range(9)] # 使用列表推导式动态生成 set_video_image_func_arr self.set_video_image_func_arr = [getattr(self, f'set_video_image_{i}') for i in range(9)] self.page = -1 self.select_main_index = -1 self.exist_circuit_mask = 0 self.check_widget_timeout_timer : QTimer = None QShortcut(QKeySequence(Qt.Key_Up), self, activated=self.key_decrease_parameter) QShortcut(QKeySequence(Qt.Key_Down), self, activated=self.key_increase_parameter) QShortcut(QKeySequence(Qt.Key_PageDown), self, activated=self.key_enter_process) QShortcut(QKeySequence(Qt.Key_PageUp), self, activated=self.key_escape_process) QShortcut(QKeySequence(Qt.Key_End), self, activated=self.key_enter_process) QShortcut(QKeySequence(Qt.Key_Home), self, activated=self.key_escape_process) QShortcut(QKeySequence(Qt.Key_0), self, activated=self.key_Test_process0) QShortcut(QKeySequence(Qt.Key_1), self, activated=self.key_Test_process1) QShortcut(QKeySequence(Qt.Key_2), self, activated=self.key_Test_process2) QShortcut(QKeySequence(Qt.Key_3), self, activated=self.key_Test_process3) self.bind_channel = -1 #虚函数, 切换到本页面时调用 def virtual_on_page_enter(self) : pass #虚函数, 退出到本页面时调用 def virtual_on_page_leave(self) : pass #虚函数, 切换到页面, 需要在派生类中实现, page=-1表示退出到上一级菜单 def virtual_change_to_page(self, page) : pass #虚函数, 画布连接到摄像头, 需要在派生类中实现。 def virtual_connect_canvas_to_camera(self, canvas_id : int, camera_id : int, is_visible : bool = True) : pass #虚函数, action处理函数, 派生类可以额外处理 def virtual_widget_action_process(self, widget : QObject, action : str) : return def set_mqtt_value(self, circuit : int, mqtt_name : str, mqtt_value) : key = f'{circuit}+{mqtt_name}' self.mqtt_value_dict[key] = mqtt_value def get_mqtt_value(self, circuit : int, mqtt_name : str) : key = f'{circuit}+{mqtt_name}' if key in self.mqtt_value_dict.keys() : return self.mqtt_value_dict[key] return None def modify_object_style_sheet(self, object :QWidget, lead_str : str, value_str : str) : meta_object = object.metaObject() className = meta_object.className() object_name : str = object.objectName() object_type_name = className + "#" + object_name origin_style = object.styleSheet() new_style_sheet = modify_style_sheet(object_type_name, origin_style, lead_str, value_str) object.setStyleSheet(new_style_sheet) def reset_object_style_sheet(self, object :QWidget) : meta_object = object.metaObject() className = meta_object.className() object_name : str = object.objectName() object_type_name = className + "#" + object_name origin_style = object.whatsThis() object.setStyleSheet(origin_style) def load_window_ui(self, file_window_ui : str) : if os.path.exists(file_window_ui): uic.loadUi(file_window_ui, self) self.load_resource_bitmap() self.create_menu_list() self.sort_menu_list = sorted(self.menu_list, key=lambda x: x[0]) self.create_style_sheet_backup() self.create_timeout_list() self.create_alias_list() self.update_window_circuit_mask() self.exist_circuit_mask = self.get_circuit_mask() self.check_widget_timeout_timer : QTimer = QTimer() self.check_widget_timeout_timer.timeout.connect(self.process_widget_timeout_list) self.check_widget_timeout_timer.start(100) self.page = get_tip_page(self.statusTip()) self.menu_selectable_object_flush() def get_circuit_mask(self, child_widget : QWidget = None) : widget :QWidget = child_widget if widget == None : widget = self circuit_id_mask = 0 for each_object in widget.children(): child : QWidget = each_object if hasattr(child, "statusTip") : circuit_id = self.get_circuit_from_object(child) if circuit_id >= 0 : circuit_id_mask |= (0x01 << circuit_id) circuit_id_mask |= self.get_circuit_mask(child) return circuit_id_mask def update_window_circuit_mask(self) : self.exist_circuit_mask = self.get_circuit_mask() return def set_menu_main_index(self, main_index) : self.main_index = main_index if len(self.sort_menu_list) == 0 : return if self.main_index < 0 : self.menu_key_index = 0 self.select_object = self.sort_menu_list[self.menu_key_index][1] self.menu_selectable_object_flush() return menu_key_index = 0 for each_item in self.sort_menu_list : compare_index = each_item[0] if compare_index >= get_compare_index(main_index, 0) : break menu_key_index += 1 self.menu_key_index = menu_key_index self.select_object = self.sort_menu_list[self.menu_key_index][1] self.menu_selectable_object_flush() def process_alias_query(self): if not self.mqtt_thread: return # 如果没有 MQTT 线程,直接返回 for alias_unique_name, alias_value_dict in self.alias_dict.items(): # {"xxxxx,sss": 123} if not isinstance(alias_value_dict, int) or alias_value_dict != ALIAS_QUERY_NONE: continue # 仅处理需要查询的整数别名 alias_name, uniqe_name = alias_unique_name.split(",") # 初始化 select_object 变量 select_object: QWidget = None # 将别名的值设置为等待响应状态 self.alias_dict[alias_unique_name] = ALIAS_QUERY_WAIT_RESPOINSE # 发布 MQTT 消息并等待响应 self.mqtt_publish_and_wait_response( f"request/alias/{uniqe_name}", json.dumps({"name": alias_name}), select_object, 2000, False ) def process_widget_timeout_list(self) : list_index = 0 self.para_face() #刷新人脸数据显示 for timeout_items in self.widget_timeout_list: time_limit = timeout_items[0] cur_time = timeout_items[1] cur_time += 100 if cur_time >= time_limit: cur_time = 0 timeout_widget : QWidget = timeout_items[2] if isinstance(timeout_widget, QLineEdit) : lineedit_widget : QLineEdit = timeout_widget lineedit_widget.setText("通信超时") elif isinstance(timeout_widget, QLabel) : label_widget : QLabel = timeout_widget if "canvas" in label_widget.statusTip() : self.camera_signal_timeout(label_widget) else : if label_widget.isVisible() : label_widget.setVisible(False) #label_widget.hide() elif isinstance(timeout_widget, QPushButton) : button_widget : QPushButton = timeout_widget self.reset_object_style_sheet(button_widget) button_widget.setText("通信超时") timeout_items[1] = cur_time self.widget_timeout_list[list_index] = timeout_items list_index = list_index + 1 def reset_widget_timeout(self, widget : QObject) : Index = 0 for timeout_item in self.widget_timeout_list: cur_time = timeout_item[1] timeout_widget : QWidget = timeout_item[2] if timeout_widget == widget : timeout_item[1] = 0 self.widget_timeout_list[Index] = timeout_item break Index = Index + 1 def connect_camera_thread(self, camera_thread : CameraThread, canvas_id : int = 0, is_visible : bool = True) : if camera_thread == None or canvas_id < 0: return if canvas_id < len(self.set_video_image_func_arr) : canvas_object : QLabel = self.search_canvas_object(canvas_id) if canvas_object : canvas_info_item = self.canvas_label_list[canvas_id] origin_camera_thread : CameraThread = canvas_info_item[2] if origin_camera_thread != None and origin_camera_thread.is_signal_connect: origin_camera_thread.image_signal.disconnect(self.set_video_image_func_arr[canvas_id]) if is_visible : camera_thread.signal_connect(self.set_video_image_func_arr[canvas_id]) list_index : int = self.get_timeout_List_index(canvas_object) self.canvas_label_list[canvas_id] = [canvas_object, list_index, camera_thread] else : self.canvas_label_list[canvas_id] = [None, -1, None] self.canvas_visible_arr[canvas_id] = is_visible def camera_signal_timeout(self, camera_label : QLabel): if camera_label != None : image_path = os.path.join(img_path, 'IM05_002.png') if os.path.exists(image_path): image = QPixmap(image_path) scaled_image = image.scaled(camera_label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation) camera_label.setPixmap(scaled_image) def search_canvas_object(self, canvas_id = 0, child = None) : widget :QWidget = child if widget == None : widget = self child_count = 0 for each_object in widget.children(): child_count += 1 child : QWidget = each_object if hasattr(child, "statusTip") : tip_str : str = child.statusTip() config_canvas_id, config_camera_id = get_tip_canvas_id_and_camera_id(tip_str) if config_canvas_id >= 0 and config_camera_id >= 0 and config_canvas_id == canvas_id: if isinstance(child, QLabel) : return child find_canvas_object = self.search_canvas_object(canvas_id, child) if find_canvas_object != None : return find_canvas_object if child_count == 0 : return None def get_canvas_prop (self, canvas_id : int= 0) : config_camera_id = -1 is_face_detect : int = 0 canvas_object : QWidget = self.search_canvas_object(canvas_id) if canvas_object != None : tip_str : str = canvas_object.statusTip() config_canvas_id, config_camera_id = get_tip_canvas_id_and_camera_id(tip_str) is_face_detect = get_tip_face_detection(tip_str) return config_camera_id, is_face_detect def load_resource_bitmap(self, child = None) : widget :QWidget = self if child != None : widget = child for child_widget in widget.children(): if hasattr(child_widget, "statusTip") : tip_str : str = child_widget.statusTip() imag_file_name = get_bk_imag_file_name(tip_str) if imag_file_name != None : if isinstance(child_widget, QLabel) : label_child_object : QLabel = child_widget label_child_object.setScaledContents(True) image = QPixmap(imag_file_name) label_child_object.setPixmap(image) elif isinstance(child_widget, QWidget) : file_url = "url(%s)"%(imag_file_name) self.modify_object_style_sheet(child_widget, "background-image", file_url) self.load_resource_bitmap(child_widget) def create_style_sheet_backup(self, child = None) : widget :QWidget = child if child == None : widget = self for child_widget in widget.children(): if hasattr(child_widget, "styleSheet") : widget : QWidget = child_widget style_sheet : str = widget.styleSheet() widget.setWhatsThis(style_sheet) self.create_style_sheet_backup(child_widget) def create_timeout_list(self, child = None) : widget :QWidget = self if child == None : self.widget_timeout_list = [] else : widget = child for child_widget in widget.children(): if hasattr(child_widget, "statusTip") : tip_str : str = child_widget.statusTip() timeout_limit_ms = get_tip_timeout(tip_str) if timeout_limit_ms >= 0: self.widget_timeout_list.append([timeout_limit_ms, 0, child_widget]) self.create_timeout_list(child_widget) def get_timeout_List_index(self, object : QWidget) : list_index = -1 search_list_index = 0 for items in self.widget_timeout_list : child_widget : QWidget = items[2] if child_widget == object : list_index = search_list_index break search_list_index += 1 return list_index def get_page_circuit(self): if hasattr(self, "statusTip") : return get_tip_circuit(self.statusTip()) return -1 def set_page_circuit(self, circuit_id = -1) : if hasattr(self, "statusTip") : origin_status_tip : str = self.statusTip() origin_tip_splits = origin_status_tip.split(",") new_status_tip = "" for each_str in origin_tip_splits: if "circuit" not in each_str : new_status_tip += each_str new_status_tip += "," new_status_tip += "circuit=%d"%(circuit_id) self.setStatusTip(new_status_tip) self.update_window_circuit_mask() self.create_alias_list() self.process_alias_query() self.para_or_measure_query() self.para_face() def create_alias_list(self, child = None) : widget :QWidget = self if child == None : self.alias_dict = {} else : widget = child for child_widget in widget.children(): if hasattr(child_widget, "statusTip") : tip_str : str = child_widget.statusTip() alias_str = get_tip_alias_str(tip_str) if alias_str != None: unique_name = self.get_unique_name_from_object(child_widget) if unique_name != None : key_combine_str = get_key_combine_str(alias_str, unique_name) if key_combine_str not in self.alias_dict.keys() : self.alias_dict[key_combine_str] = ALIAS_QUERY_NONE self.create_alias_list(child_widget) def create_menu_list(self, child = None) : widget :QWidget = self if child == None : self.menu_list = [] else : widget = child for child_widget in widget.children(): if hasattr(child_widget, "statusTip") : tip_str : str = child_widget.statusTip() sub_index = get_tip_menu_sub_index(tip_str) if sub_index >= 0: main_index = self.get_main_index_from_object(child_widget) if main_index >= 0 : compare_index = get_compare_index(main_index, sub_index) else : compare_index = sub_index self.menu_list.append([compare_index, child_widget]) if isinstance(child_widget, QPushButton) : button : QPushButton = child_widget button.clicked.connect(self.on_menu_qpush_button_click) self.create_menu_list(child_widget) def on_select_object_action_process(self, select_object : QWidget) : tip_str = select_object.statusTip() action_str = get_tip_value_str(tip_str, "Action", None) input = False if action_str != None : if "password" in tip_str: # 第一步:弹出认证方式选择框 dialog_auth_choice = DialogModifyAlias(self) auth_dict = { "face": "人脸认证", "word": "密码认证" } dialog_auth_choice.set_alias_item_info(auth_dict, "password", "请选择认证方式") if dialog_auth_choice.exec() == QDialog.Accepted: choice = dialog_auth_choice.value # 第二步:根据选择执行不同的认证 if choice == "word": dialog_modify_text = DialogModifyValue(self) caption_str = "请输入密码:" dialog_modify_text.update_modify_info("", "0000", caption_str) input = False if dialog_modify_text.exec() == QDialog.Accepted: input = True modify_screen_time_str = dialog_modify_text.value pass_word = system_parameter() if modify_screen_time_str != pass_word.get_system_password(): inform_box = DialogInform() inform_box.information("提示", "密码错误,无法修改!") return print("密码认证成功") elif choice == "face": # 初始化锁标志 self._face_verify_locked = True # 开始验证时锁定 # 人脸认证异步处理 face_frame = self.parent_window.P05_01_FaceCameraView face_frame.face_verify_result = None # 重置标志位 self.do_verify() # 发送 CMD_VERIFY timeout = system_parameter().get_verify_timeout() start_time = time.time() # 停止并删除旧定时器 if hasattr(self, "_face_verify_timer"): if self._face_verify_timer.isActive(): self._face_verify_timer.stop() self._face_verify_timer.deleteLater() # 创建定时器轮询标志位 self._face_verify_timer = QTimer(self) self._face_verify_timer.setInterval(50) # 每50ms检查一次 def check_face_result(): nonlocal input if face_frame.face_verify_result is not None: self._face_verify_timer.stop() self._face_verify_locked = False # 解锁 if face_frame.face_verify_result: input = True inform_box = DialogInform() inform_box.information("提示", "人脸认证成功") self._after_face_verify(select_object, action_str) else: input = False inform_box = DialogInform() inform_box.information("提示", "人脸认证失败") elif time.time() - start_time > timeout: self._face_verify_timer.stop() self._face_verify_locked = False # 解锁 input = False inform_box = DialogInform() inform_box.information("提示", "人脸认证超时") # 解绑旧槽,绑定新槽 try: self._face_verify_timer.timeout.disconnect() except Exception: pass self._face_verify_timer.timeout.connect(check_face_result) self._face_verify_timer.start() return # 异步处理,直接返回 else: # 无认证要求,直接通过 input = True # ---------------- 认证成功后执行操作 ---------------- if input: if "CmdExecute" in action_str: mqtt_name = get_tip_mqtt_name(tip_str) unique_name = self.get_unique_name_from_object(select_object) if mqtt_name and unique_name and self.mqtt_thread and len(mqtt_name) > 0: publish_topic = "request/action/" + unique_name publish_message = '{"name" : "%s"}' % mqtt_name self.mqtt_publish_and_wait_response(publish_topic, publish_message, select_object, 1000) elif "ModifySystem" in action_str: self.on_system_param_modify(tip_str) elif "Modify" in action_str and isinstance(select_object, QLineEdit): self.on_line_edit_modify_click(select_object) elif "Users" in action_str: self.do_manage_users() elif "Verify" in action_str: self.do_verify() elif "EnrollItgSingle" in action_str: self.do_enroll_itg_single() elif "Reset" in action_str: self.reset_command() elif "ConnectCamera" in action_str: self.connectcamera() elif "VideoMode" in action_str: self.toggle_video_mode() elif "FaceBox" in action_str: self.toggle_face_box() elif "DeleteUser" in action_str: self.delete_user_by_id() elif "UserCount" in action_str: self.query_user_count() self.virtual_widget_action_process(select_object, action_str) # ------------------ 后续操作回调 ------------------ def _after_face_verify(self, select_object, action_str): """ 人脸认证成功后调用,继续执行后续动作 """ if "CmdExecute" in action_str: mqtt_name = get_tip_mqtt_name(select_object.statusTip()) unique_name = self.get_unique_name_from_object(select_object) if mqtt_name and unique_name and self.mqtt_thread and len(mqtt_name) > 0: publish_topic = "request/action/" + unique_name publish_message = '{"name" : "%s"}' % mqtt_name self.mqtt_publish_and_wait_response(publish_topic, publish_message, select_object, 1000) elif "ModifySystem" in action_str: self.on_system_param_modify(select_object.statusTip()) elif "Modify" in action_str and isinstance(select_object, QLineEdit): self.on_line_edit_modify_click(select_object) elif "Users" in action_str: self.do_manage_users() elif "Verify" in action_str: self.do_verify() elif "EnrollItgSingle" in action_str: self.do_enroll_itg_single() elif "Reset" in action_str: self.reset_command() elif "ConnectCamera" in action_str: self.connectcamera() elif "VideoMode" in action_str: self.toggle_video_mode() elif "FaceBox" in action_str: self.toggle_face_box() elif "DeleteUser" in action_str: self.delete_user_by_id() elif "UserCount" in action_str: self.query_user_count() self.virtual_widget_action_process(select_object, action_str) def search_menu_match_object(self, object) : match_object : QWidget = None menu_index = 0 for each_list in self.sort_menu_list : search_object = each_list[1] if search_object == object : match_object = search_object break menu_index += 1 return match_object, menu_index ################################################################################ def do_verify(self): pd_val = 0 timeout_val = system_parameter().get_verify_timeout() face_send = self.parent_window.P05_01_FaceCameraView face_send.send(build_verify(pd_val, timeout_val)) def do_enroll_itg_single(self): admin_val = 0 uname = " " face_dir = 31 timeout_val = system_parameter().get_verify_timeout() itg_val = 0 self.send(build_enroll_itg_single(admin_val, uname, face_dir, timeout_val, itg_val)) def do_manage_users(self): UserManageDialog(self, self.send).exec_() def reset_command(self): self.send(build_reset()) def connectcamera(self): running = self.video_worker and self.video_worker.isRunning() if running: self.video_worker.stop() self.video_worker.wait(300) self.video_worker = None # self.video_label.setText("未打开") # self.video_label.setPixmap(QPixmap()) self.log("[INFO] 视频已关闭") inform_box : DialogInform = DialogInform() inform_box.information("提示", "视频已关闭") return if self.video_label.width()<50 or self.video_label.height()<50: self.video_label.setMinimumSize(360,480) self.video_worker = VideoWorker(cam_index=0, target_size=self.video_label.size()) self.video_worker.pixmap_ready.connect(self.video_label.setPixmap) # self.video_worker.log_message.connect(self.log) self.video_worker.start() self.log("[INFO] 正在打开视频") inform_box : DialogInform = DialogInform() inform_box.information("提示", "正在打开视频") def toggle_video_mode(self): if self.current_video_mode==0: self.send_uvc(1) self.current_video_mode=1 self.log("[INFO] 已切换到红外视频模式") inform_box : DialogInform = DialogInform() inform_box.information("提示", "已切换到红外视频模式") else: self.send_uvc(0) self.current_video_mode=0 self.log("[INFO] 已切换到彩色视频模式") inform_box : DialogInform = DialogInform() inform_box.information("提示", "已切换到彩色视频模式") def toggle_face_box(self): if self.current_face_box==0: self.send_face_box(1) self.current_face_box=1 self.log("[INFO] 人脸框已开启") inform_box : DialogInform = DialogInform() inform_box.information("提示", "人脸框已开启") else: self.send_face_box(0) self.current_face_box=0 self.log("[INFO] 人脸框已关闭") inform_box : DialogInform = DialogInform() inform_box.information("提示", "人脸框已关闭") # ---------------- 发送指令 ---------------- def send_uvc(self, mode): if self.ser and getattr(self.ser,"is_open",False): self.ser.write(build_uvc_view(mode)) else: self.log("[WARN] 串口未连接,无法切换视频模式") def send_face_box(self, state): if self.ser and getattr(self.ser,"is_open",False): self.ser.write(build_face_view(state)) else: self.log("[WARN] 串口未连接,无法控制人脸框") def delete_user_by_id(self, CSV_FILE = CSV_FILE): users = load_users() # 弹出对话框选择用户ID dialog_modify_text = DialogModifyValue(self) caption_str = "选择用户ID删除" dialog_modify_text.update_modify_info("", 0, caption_str) if dialog_modify_text.exec() != QDialog.Accepted: return # 获取用户输入的ID并转换为整数 try: user_id = int(dialog_modify_text.value) except ValueError: DialogInform(self).information("提示", "请输入有效数字ID") return # 查找用户 user = next((u for u in users if u["user_id"] == user_id), None) if not user: DialogInform(self).information("提示", "用户不存在") return try: # 1️⃣ 下发删除命令 self.send(build_delete_user(user_id)) # 2️⃣ 删除 CSV 文件对应行 if os.path.exists(CSV_FILE): with open(CSV_FILE, "r", encoding="utf-8", newline="") as f: reader = csv.DictReader(f) fieldnames = reader.fieldnames new_rows = [row for row in reader if str(row.get("user_id")) != str(user_id)] with open(CSV_FILE, "w", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(new_rows) except Exception as e: DialogInform(self).information("提示", f"删除失败: {e}") return # 3️⃣ 更新内存用户列表 users = [u for u in users if u["user_id"] != user_id] save_users_list(users) # 4️⃣ 提示删除成功 DialogInform(self).information("提示", f"用户 {user.get('user_name', '')} (ID={user_id}) 已删除") def query_user_count(self): self.send(build_get_all_userid()) def refresh(self): users = load_users() self.table.setRowCount(len(users)) for r, u in enumerate(users): self.table.setItem(r, 0, QTableWidgetItem(str(u.get("user_id", "")))) self.table.setItem(r, 1, QTableWidgetItem(u.get("user_name", ""))) self.table.setItem(r, 2, QTableWidgetItem(u.get("created_at", ""))) ################################################################################ #刷新屏幕上的系统信息, 例如时间日期之类 def flush_system_info(self, child = None) : widget : QWidget= child if child == None : widget = self else : widget = child for child_widget in widget.children(): if hasattr(child_widget, "statusTip") : tip_str : str = child_widget.statusTip() system_str = get_tip_system(tip_str) if system_str == "DateTime" : qdate_time = QDateTime.currentDateTime() time_str = qdate_time.toString("yyyy-MM-dd hh:mm:ss") if isinstance(child_widget, QPushButton) : button_widget : QPushButton = child_widget button_widget.setText(time_str) elif isinstance(child_widget, QLineEdit) : line_edit_widget : QLineEdit = child_widget line_edit_widget.setText(time_str) elif isinstance(child_widget, QLabel) : label_widget : QLabel = child_widget label_widget.setText(time_str) self.flush_system_info(child_widget) def func_mqtt_param_modify(self, modify_str : str) : select_object : QWidget = self.select_object if select_object == None : return tip = select_object.statusTip() unique_name = self.get_unique_name_from_object(select_object) mqtt_name = get_tip_mqtt_name(tip) if self.mqtt_thread != None : request_topic = "request/param/modify/" + unique_name request_message = '{"name" : "%s", "value" : "%s"}'%(mqtt_name, modify_str) self.mqtt_publish_and_wait_response(request_topic, request_message, select_object, 1000) def func_call_back_on_mqtt_param_info(self, info_str : str) : select_object : QWidget = self.select_object if select_object == None : return tip = select_object.statusTip() unique_name = self.get_unique_name_from_object(select_object) mqtt_name = get_tip_mqtt_name(tip) if self.mqtt_thread != None : request_topic = "request/param/info/" + unique_name request_message = '{"name" : "%s", "value" : "%s"}'%(mqtt_name, info_str) self.mqtt_publish_and_wait_response(request_topic, request_message, select_object, 1000) def on_menu_qpush_button_click(self, sender : QPushButton): sender : QPushButton = self.sender() match_object : QWidget = None match_object, menu_index = self.search_menu_match_object(sender) if match_object != None : self.menu_key_index = menu_index self.menu_selectable_object_flush() if isinstance(match_object, QPushButton) : self.on_select_object_action_process(match_object) #修改系统时间与日期 def func_modify_system_time(self, modify_str : str) : numbers = re.findall(r'\d+', modify_str) if len(numbers) == 6 : year = int(numbers[0]) month = int(numbers[1]) day = int(numbers[2]) hour = int(numbers[3]) minute = int(numbers[4]) second = int(numbers[5]) set_system_time(year, month, day, hour, minute, second) return year, month, day, hour, minute, second return None #修改系统黑屏时间 def func_modify_screen_blanking_time(self, modify_str: str): numbers = re.findall(r'\d+', modify_str) if len(numbers) == 1: minutes = int(numbers[0]) set_screen_blanking_time(minutes) return minutes return None def on_system_param_modify(self, tip : str) : system = get_tip_system(tip) if system == "DateTime" : dialog_modify_text : DialogModifyText = DialogModifyText(self) caption_str = "修改系统时间" cur_time = QDateTime.currentDateTime() date_time_str = cur_time.toString("yyyy-MM-dd hh:mm:ss") dialog_modify_text.update_modify_info(date_time_str, caption_str) if dialog_modify_text.exec() == QDialog.Accepted: modify_date_time_str = dialog_modify_text.value self.func_modify_system_time(modify_date_time_str) for circuit_id in range(8) : unique_name = self.get_unique_name_from_circuit(circuit_id) if unique_name != None and self.mqtt_thread != None: time_message = '{"name" : "SetupTime", "value" : "%s"}'%(modify_date_time_str) self.mqtt_thread.publish("request/action/" + unique_name, time_message) dialog_inform = DialogInform(self) dialog_inform.information("系统时间修改", "系统时间修改成功") elif system == "ScreenBlankingTime": dialog_modify_text = DialogModifyValue(self) caption_str = "修改屏幕黑屏时间(分钟)" sbt = system_parameter() dialog_modify_text.update_modify_info("", str(sbt.get_screen_blanking_time()), caption_str) if dialog_modify_text.exec() == QDialog.Accepted: modify_screen_time_str = dialog_modify_text.value result = self.func_modify_screen_blanking_time(modify_screen_time_str) sbt.change_screen_blanking_time(modify_screen_time_str) if result is not None: self.BlankTimelineEdit.setText(str(result)) dialog_inform = DialogInform(self) dialog_inform.information("屏幕黑屏时间修改", "屏幕黑屏时间修改成功") elif system == "SystemPassWord": dialog_modify_text = DialogModifyValue(self) caption_str = "修改系统密码" pass_word = system_parameter() dialog_modify_text.update_modify_info("", pass_word.get_system_password(), caption_str) if dialog_modify_text.exec() == QDialog.Accepted: modify_screen_time_str = dialog_modify_text.value pass_word.set_system_password(modify_screen_time_str) dialog_inform = DialogInform(self) dialog_inform.information("系统密码修改", "系统密码修改成功") elif system == "FaceRecogTimeout": dialog_modify_text = DialogModifyValue(self) caption_str = "修改人脸识别超时时间(秒)" frt = system_parameter() dialog_modify_text.update_modify_info("", str(frt.get_verify_timeout()), caption_str) if dialog_modify_text.exec() == QDialog.Accepted: modify_screen_time_str = dialog_modify_text.value result = frt.set_verify_timeout(modify_screen_time_str) dialog_inform = DialogInform(self) dialog_inform.information("人脸识别超时时间修改", "人脸识别超时时间修改成功") def on_line_edit_modify_click(self, sender :QLineEdit) : match_object : QLineEdit = None match_object, menu_index = self.search_menu_match_object(sender) if match_object != None : self.menu_key_index = menu_index if isinstance(match_object, QLineEdit) : self.dialog_modify_value = None tip = match_object.statusTip() alias_name = get_tip_alias_str(tip) unique_name = self.get_unique_name_from_object(match_object) if unique_name != None and alias_name != None : key_combine_str = get_key_combine_str(alias_name, unique_name) value_dict = utils.dict_or_object_get_attr(self.alias_dict, key_combine_str, None) if isinstance(value_dict, dict) : dialog_modify_alias : DialogModifyAlias = DialogModifyAlias(self) caption_str = get_tip_caption_str(match_object.statusTip()) value_text = match_object.text() dialog_modify_alias.set_alias_item_info(value_dict, value_text, caption_str) self.dialog_modify_alias = dialog_modify_alias if dialog_modify_alias.exec() == QDialog.Accepted : modify_value = dialog_modify_alias.value self.func_mqtt_param_modify(modify_value) else : inform_box : DialogInform = DialogInform() inform_box.information('别名数据未获取', 'message: %s'%(alias_name)) else : dialog_modify_value : DialogModifyValue = DialogModifyValue(self) caption_str = get_tip_caption_str(match_object.statusTip()) try : origin_value = float(match_object.text()) except Exception as e : origin_value = None inform_box : DialogInform = DialogInform() inform_box.information('无效值', str(e)) if origin_value != None : format_str = get_tip_format(tip) dialog_modify_value.update_modify_info(format_str, origin_value, caption_str) if dialog_modify_value.exec() == QDialog.Accepted : modify_value = dialog_modify_value.value self.func_mqtt_param_modify(modify_value) def set_video_image(self, image_object : QObject, canvas_id = 0) : if canvas_id >= len(self.canvas_label_list) : return camera_info_items = self.canvas_label_list[canvas_id] camera_label: QLabel = camera_info_items[0] timeout_list_index = camera_info_items[1] camera_thread : CameraThread = camera_info_items[2] if camera_label != None: shape = image_object.shape width = shape[1] height = shape[0] if camera_label.isVisible() : qt_img = QImage(image_object.data, width, height, QImage.Format_RGB888) camera_label.setScaledContents(True) camera_label.setPixmap(QPixmap.fromImage(qt_img)) if timeout_list_index >= 0 : self.widget_timeout_list[timeout_list_index][1] = 0 def set_video_image_0(self, image_object : QObject) : self.set_video_image(image_object, 0) def set_video_image_1(self, image_object : QObject) : self.set_video_image(image_object, 1) def set_video_image_2(self, image_object : QObject) : self.set_video_image(image_object, 2) def set_video_image_3(self, image_object : QObject) : self.set_video_image(image_object, 3) def set_video_image_4(self, image_object : QObject) : self.set_video_image(image_object, 4) def set_video_image_5(self, image_object : QObject) : self.set_video_image(image_object, 5) def set_video_image_6(self, image_object : QObject) : self.set_video_image(image_object, 6) def set_video_image_7(self, image_object : QObject) : self.set_video_image(image_object, 7) def set_video_image_8(self, image_object : QObject) : self.set_video_image(image_object, 8) def update_mqtt_value(self, circuit, json_dict) : for json_key, json_value in json_dict.items() : self.set_mqtt_value(circuit, json_key, json_value) #更新视频显示与否 def update_canvas_camera_visible(self, child = None) : widget :QWidget = child if widget == None : widget = self for each_object in widget.children(): child : QWidget = each_object if isinstance(child, QLabel) : widget_label : QLabel = child tip_str = widget_label.statusTip() visible_mqtt = get_tip_visible_mqtt(tip_str, None) if visible_mqtt != None : #该Label具有Visible控制, 检查一下是不是视频流 canvas_id, camera_id = get_tip_canvas_id_and_camera_id(tip_str) if canvas_id >= 0 and camera_id >= 0 : circuit_id = self.get_circuit_from_object(widget_label) visible_value = self.get_mqtt_value(circuit_id, visible_mqtt) is_visible = False if visible_value != None : try : is_visible = True if int(visible_value) > 0 else False except Exception as e : is_visible = False widget_label.setVisible(is_visible) if is_visible != self.canvas_visible_arr[canvas_id] and canvas_id < len(self.canvas_visible_arr): self.canvas_visible_arr[canvas_id] = is_visible self.virtual_connect_canvas_to_camera(canvas_id, camera_id, is_visible) self.update_canvas_camera_visible(child) def video_visible_timeout(self) : self.update_canvas_camera_visible() #通过json包刷新界面数据 def flush_widgets(self, circuit, json_dict, child = None): widget :QWidget = child if widget == None : widget = self for each_object in widget.children(): child : QWidget = each_object if not hasattr(child, "statusTip") : continue tip_str : str = child.statusTip() mqtt_name = get_tip_mqtt_name(tip_str) circuit_id = self.get_circuit_from_object(each_object) self.flush_widgets(circuit, json_dict, child) if circuit_id != circuit : continue if mqtt_name in json_dict.keys() : value_or_dict = json_dict[mqtt_name] if isinstance(value_or_dict, dict) : value_text = utils.dict_or_object_get_attr(value_or_dict, "value", "") else : value_text = value_or_dict color_str = utils.dict_or_object_get_attr(value_or_dict, "color", None) bk_color_str = utils.dict_or_object_get_attr(value_or_dict, "bk_color", None) imag_file_name = get_imag_value_file_name(tip_str, value_text) timeout_ms = get_tip_timeout(tip_str) if timeout_ms > 0 : self.reset_widget_timeout(child) if isinstance(child, QPushButton) : widget_button : QPushButton = child if imag_file_name != None : self.modify_object_style_sheet(widget_button, "border-image", "url(%s)"%(imag_file_name)) else : self.reset_object_style_sheet(widget_button) if isinstance(child, QLabel) : widget_label : QLabel = child if imag_file_name != None : if widget_label.isVisible() == False : widget_label.setVisible(True) self.modify_object_style_sheet(widget_label, "background-image", "url(%s)"%(imag_file_name)) else : self.reset_object_style_sheet(widget_label) widget_label.setText(value_text) if isinstance(child, QLineEdit) : widget_line_edit : QLineEdit = child alias_name = get_tip_alias_str(widget_line_edit.statusTip()) unique_name = self.get_unique_name_from_object(widget_line_edit) if alias_name != None : fvalue = float(value_text) value = round(fvalue) value_str = str(value) display_text = "(%s)"%(value_str) alias_key_str = get_key_combine_str(alias_name, unique_name) if alias_key_str in self.alias_dict.keys() : alias_value_dict = self.alias_dict[alias_key_str] if isinstance(alias_value_dict, dict) : display_text = utils.dict_or_object_get_attr(alias_value_dict, value_str, value_text) else : display_text = value_text widget_line_edit.setText(display_text) if bk_color_str != None : if isinstance(child, QLineEdit) : self.modify_object_style_sheet(child, "background-color", bk_color_str) if color_str != None : self.modify_object_style_sheet(child, "color", color_str) def get_circuit_from_object(self, object :QWidget) : parent : QWidget= object circuit_id = -1 while parent != None : if hasattr(parent, "statusTip") : tip_str = parent.statusTip() circuit_id = get_tip_circuit(tip_str) if circuit_id >= 0 or parent == self: break parent = parent.parent() return circuit_id def get_main_index_from_object(self, object :QWidget) : parent : QWidget= object main_index = -1 while parent != None : if not hasattr(parent, "statusTip") : return -1 tip_str = parent.statusTip() main_index = get_tip_main_index(tip_str) if main_index >= 0 or parent == self: break parent = parent.parent() return main_index def get_unique_name_from_circuit(self, circuit_id : int) : for config_dict in group_config.comm_thread_config : device_list = utils.dict_or_object_get_attr(config_dict, "device_list", None) if device_list != None : for item_dict in device_list : config_circuit_id = utils.dict_or_object_get_attr(item_dict, "circuit_id", -1) if config_circuit_id == circuit_id : circuit_unique_name = utils.dict_or_object_get_attr(item_dict, "unique_name", None) return circuit_unique_name return None def get_unique_name_from_object(self, object :QWidget) : circuit_id = self.get_circuit_from_object(object) return self.get_unique_name_from_circuit(circuit_id) def key_increase_parameter(self): menu_count = len(self.sort_menu_list) if menu_count == 0: return tip_str = self.sort_menu_list[self.menu_key_index][1].statusTip() group_items_count = get_tip_group_end(tip_str) if group_items_count == -1: next_menu_index = (self.menu_key_index + 1) % menu_count else: next_menu_index = (self.menu_key_index - group_items_count + 1) % menu_count self.menu_key_index = next_menu_index self.select_object = self.sort_menu_list[self.menu_key_index][1] self.menu_selectable_object_flush() def key_decrease_parameter(self): menu_count = len(self.sort_menu_list) if menu_count == 0: return tip_str = self.sort_menu_list[self.menu_key_index][1].statusTip() group_items_count = get_tip_group_start(tip_str) if group_items_count == -1: prev_menu_index = (self.menu_key_index - 1) % menu_count else: prev_menu_index = (self.menu_key_index + group_items_count - 1) % menu_count self.menu_key_index = prev_menu_index self.select_object = self.sort_menu_list[self.menu_key_index][1] self.menu_selectable_object_flush() def key_enter_process(self): menu_count = len(self.sort_menu_list) if menu_count == 0: return self.select_object = self.sort_menu_list[self.menu_key_index][1] self.on_select_object_action_process(self.select_object) def key_escape_process(self): #在菜单第一项, 按下复位, 退出到上一页 if self.menu_key_index == 0: self.virtual_change_to_page(-1) menu_count = len(self.sort_menu_list) # 获取当前页面可交互控件个数 if menu_count == 0: # 若注册可交互控件个数为0,则退出处理,返回 return # 重置菜单选择 self.select_object = self.sort_menu_list[self.menu_key_index][1] self.menu_key_index = 0 self.menu_selectable_object_flush() def key_Test_process0(self): json_dict = {"Uab" :"1111", "Ubc" : "1111", "Uca" : "1111", "Uo" : "11" , "Iab" : "1111", "Ibc" : "1111", "Ica" : "1111", "Io" : "1111", "SwitchStatus" : {"value" :"0", "bk_color" : COLOR_NORMAL, "color" : COLOR_RED} } self.flush_widgets(0, json_dict) def key_Test_process1(self): json_dict = {"Uab" :"2222", "Ubc" : "2222", "Uca" : "2222", "Uo" : "22" , "Iab" : "2222", "Ibc" : "2222", "Ica" : "2222", "Io" : "2222", "SwitchStatus" : {"value" :"1", "bk_color" : COLOR_NORMAL, "color" : COLOR_RED} } self.flush_widgets(1, json_dict) def key_Test_process2(self): json_dict = {"Uab" :"3333", "Ubc" : "3333", "Uca" : "3333", "Uo" : "33" , "Iab" : "3333", "Ibc" : "3333", "Ica" : "3333", "Io" : "3333"} self.flush_widgets(2, json_dict) def key_Test_process3(self): json_dict = {"Uab" :"33333", "Ubc" : "33333", "Uca" : "33333"} self.flush_widgets(3, json_dict) def menu_selectable_object_flush(self) : menu_count = len(self.sort_menu_list) for i in range(menu_count): object : QWidget = self.sort_menu_list[i][1] is_select_object_draw = False if isinstance(object, QPushButton) : button_object : QPushButton = object image_file_name = get_select_imag_file_name(object.statusTip()) image_icon_name = get_select_icon_file_name(object.statusTip()) if i == self.menu_key_index: button_object.setFocus() if image_file_name != None : is_select_object_draw = True file_url = "url(%s)"%(image_file_name) self.modify_object_style_sheet(button_object, "border-image", file_url) elif image_icon_name != None : is_select_object_draw = True button_size : QSize = button_object.size() button_object.setIcon(QIcon(image_icon_name)) button_object.setIconSize(button_size) else : if image_file_name != None : is_select_object_draw = True self.reset_object_style_sheet(button_object) elif image_icon_name != None : is_select_object_draw = True button_object.setIcon(QIcon()) elif isinstance(object, QLabel) : label_object : QLabel = object image_file_name = get_select_imag_file_name(object.statusTip()) if i == self.menu_key_index : if image_file_name != None : is_select_object_draw = True file_url = "url(%s)"%(image_file_name) self.modify_object_style_sheet(label_object, "border-image", file_url) else : if image_file_name != None : is_select_object_draw = True self.reset_object_style_sheet(label_object) elif isinstance(object, QLineEdit) : line_object : QLineEdit = object image_file_name = get_select_imag_file_name(object.statusTip()) if i == self.menu_key_index : if image_file_name != None : is_select_object_draw = True file_url = "url(%s)"%(image_file_name) self.modify_object_style_sheet(line_object, "border-image", file_url) else : if image_file_name != None : is_select_object_draw = True self.reset_object_style_sheet(line_object) if is_select_object_draw == False : if i == self.menu_key_index : self.modify_object_style_sheet(object, "background-color", COLOR_BLUE) else : self.reset_object_style_sheet(object) def response_topic_message_process(self, topic: str, message : str, is_timeout :bool = False, inform : bool = False) : find_reponse_topic_msg = None for wait_topic_msg in self.wait_response_list : if wait_topic_msg[0] == topic : find_reponse_topic_msg = wait_topic_msg self.wait_response_list.remove(wait_topic_msg) break if find_reponse_topic_msg == None : return select_object :QWidget = find_reponse_topic_msg[2] request_message : str = find_reponse_topic_msg[1] inform_box : DialogInform = None if inform : inform_box : DialogInform = DialogInform() try : json_dict = json_load_message(message) result = utils.dict_or_object_get_attr(json_dict, "result", 0) result = int(result) except Exception as e: json_dict = None result = 0 print_error_msg(str(e) + "invalid json message: %s" % (message)) if "response/action" in topic : if is_timeout : inform_message = "命令执行超时" if inform == True: inform_box.information('命令执行超时', inform_message) else : inform_message = '命令执行成功' if result == 1 else '命令执行失败!!!' if inform == True: inform_box.information('命令执行', inform_message) elif "response/param/modify" in topic : if is_timeout : inform_message = '参数修改超时' if inform == True: inform_box.information('参数修改超时', inform_message) else : inform_message = '参数修改成功' if result == 1 else '参数修改失败!!!' if inform == True: inform_box.information('参数修改', inform_message) if result == 1 : self.para_or_measure_query() elif "response/alias" in topic : unique_name = topic.replace("response/alias/", "") if is_timeout : if json_dict != None : alias_name = None if "name" in json_dict.keys() : alias_name = json_dict["name"] if alias_name != None : key_str = get_key_combine_str(alias_name, unique_name) if key_str in self.alias_dict.keys() : if isinstance(self.alias_dict[key_str], int) : self.alias_dict[key_str] = ALIAS_QUERY_NONE if inform == True: inform_box.information('获取别名超时', message) else : dialog_modify_alias : DialogModifyAlias = DialogModifyAlias(self) if isinstance(json_dict, list): for item in json_dict: for alias_key_name, alias_dict in item.items(): if alias_key_name in request_message: key_str = get_key_combine_str(alias_key_name, unique_name) if key_str in self.alias_dict.keys(): self.alias_dict[key_str] = alias_dict if isinstance(select_object, QLineEdit): line_select_object: QLineEdit = select_object caption_str = get_tip_value_str(select_object.statusTip(), "Caption", "") alias_value = int(line_select_object.text()) dialog_modify_alias.set_alias_item_info(alias_dict, alias_value, caption_str) self.dialog_modify_alias = dialog_modify_alias dialog_modify_alias.exec() break else: for alias_key_name, alias_dict in json_dict.items(): if alias_key_name in request_message: key_str = get_key_combine_str(alias_key_name, unique_name) if key_str in self.alias_dict.keys(): self.alias_dict[key_str] = alias_dict if isinstance(select_object, QLineEdit): line_select_object: QLineEdit = select_object caption_str = get_tip_value_str(select_object.statusTip(), "Caption", "") alias_value = int(line_select_object.text()) dialog_modify_alias.set_alias_item_info(alias_dict, alias_value, caption_str) self.dialog_modify_alias = dialog_modify_alias dialog_modify_alias.exec() break def mqtt_topic_message_process(self, msg) : topic_msg_item = True while topic_msg_item != None : self.mutex.lock() topic_msg_count = len(self.topic_message_list) if topic_msg_count > 0: topic_msg_item = self.topic_message_list.pop(0) else : topic_msg_item = None self.mutex.unlock() if topic_msg_item != None : topic = topic_msg_item[0] message = topic_msg_item[1] try : json_dict = json_load_message(message) except Exception as e : json_dict = None print_error_msg("invalid json message : %s"%(message)) if json_dict != None: if "measure" in topic : circuit_id = search_circuit_from_topic(topic) self.update_mqtt_value(circuit_id, json_dict) # print("@Line:",inspect.currentframe().f_lineno,' ',' = ',circuit_id,' = ',json_dict) self.flush_widgets(circuit_id, json_dict) elif "param/info" in topic: circuit_id = search_circuit_from_topic(topic) #print("@Line:",inspect.currentframe().f_lineno,' ',' = ',circuit_id,' = ',json_dict) self.update_mqtt_value(circuit_id, json_dict) self.flush_widgets(circuit_id, json_dict) elif "response/alarm" in topic : circuit_id = search_circuit_from_topic(topic) self.flush_alarminfo(circuit_id, json_dict) self.update_mqtt_value(circuit_id, json_dict) self.flush_widgets(circuit_id, json_dict) elif "response" in topic : self.response_topic_message_process(topic, message, is_timeout = False, inform = True) elif "alarm/test" in topic : circuit_id = search_circuit_from_topic(topic) # print("@Line:",inspect.currentframe().f_lineno,' ',' = ',circuit_id,' = ',json_dict) # 将告警信息刷新到页面 def flush_alarminfo(self, circuit, json_dict, child = None): alarmTitle : QLineEdit = self.findChild(QLineEdit,"alarmTitle") if alarmTitle != None: display_title = "%s"%(json_dict["index"]) alarmTitle.setText(display_title) alarmTime : QLineEdit = self.findChild(QLineEdit,"alarmTime") if alarmTime != None: display_year = "%s-"%(json_dict["AlarmYear"]) display_month = "%s-"%(json_dict["AlarmMonth"]) display_day = "%s"%(json_dict["AlarmDay"]) display_hour = " %s:"%(json_dict["AlarmHour"]) display_minute = "%s: "%(json_dict["AlarmMinute"]) display_second = "%s"%(json_dict["AlarmSecond"]) disp_time = display_year + display_month + display_day + display_hour+ display_minute+ display_second alarmTime.setText(disp_time) alarmMsg : QLineEdit = self.findChild(QLineEdit,"alarmMsg") if alarmMsg != None: display_msg = "%s"%(json_dict["AlarmCodeMsg"]) alarmMsg.setText(display_msg) def on_connect(self, mqtt_thread, userdata, flags, rc) : if rc == 0: self.mqtt_thread = mqtt_thread self.mqtt_thread.subscribe("response/#") self.mqtt_thread.subscribe("param/info/#") self.mqtt_thread.subscribe("param/modify/#") self.mqtt_thread.subscribe("measure/#") self.mqtt_thread.subscribe("status/#") self.mqtt_thread.subscribe("alarm/#") self.mqtt_thread.subscribe("response/alarm/#") self.mqtt_thread.subscribe("action/#") else : self.mqtt_thread = None def on_message(self, mqtt_thread, topic, message) : circuit_id = search_circuit_from_topic(topic) if circuit_id < 0 : return if (self.exist_circuit_mask & (0x1 << circuit_id)) == 0 : return self.mutex.lock() if len(self.topic_message_list) < 100 : self.topic_message_list.append([topic, message]) self.mutex.unlock() self.mqtt_signal.emit("Refresh UI") def response_topic_messge_timeout(self): timer = self.sender() for wait_response_items in self.wait_response_list: wait_timer : QTimer = wait_response_items[3] _inform : bool = wait_response_items[4] if wait_timer == timer : topic : str = wait_response_items[0] message : str = wait_response_items[1] wait_timer.stop() self.response_topic_message_process(topic, message, is_timeout=True, inform = _inform) break # def mqtt_publish_and_wait_response(self, publish_topic : str, publish_message : str, select_object : QWidget = None, timeout_ms : int = 1000, inform : bool = True) : # if self.mqtt_thread != None : # self.mqtt_thread.publish(publish_topic, publish_message) # response_topic = publish_topic.replace("request", "response") # #开启超时定时器 # wait_timer = QTimer() # wait_timer.timeout.connect(self.response_topic_messge_timeout) # wait_timer.start(timeout_ms) # self.wait_response_list.append([response_topic, publish_message, select_object, wait_timer, inform]) def mqtt_publish_and_wait_response(self, publish_topic: str, publish_message: str, select_object: QWidget = None, timeout_ms: int = 1000, inform: bool = True): if self.mqtt_thread is not None: # 发布MQTT消息 self.mqtt_thread.publish(publish_topic, publish_message) # 构建响应主题 response_topic = publish_topic.replace("request", "response") # 设置并启动超时定时器 wait_timer = QTimer() wait_timer.timeout.connect(lambda: self.response_topic_message_timeout(response_topic)) wait_timer.start(timeout_ms) # 将响应信息添加到等待列表 self.wait_response_list.append([response_topic, publish_message, select_object, wait_timer, inform]) def response_topic_message_timeout(self, response_topic): # 处理超时事件 for response_info in self.wait_response_list: if response_info[0] == response_topic: response_info[3].stop() # 停止定时器 self.wait_response_list.remove(response_info) # 在这里添加超时后的处理逻辑,例如通知用户请求超时 if response_info[4]: # 检查是否需要通知 self.inform_timeout(response_info) break def inform_timeout(self, response_info): # 通知用户请求超时的逻辑 # response_info[2] 是 select_object # 根据具体需求实现通知逻辑 print(f"Request for {response_info[1]} on topic {response_info[0]} timed out.") def batch_mqtt_publish_and_wait_response(self, publish_list: list, timeout_ms: int = 1000, inform: bool = True): if self.mqtt_thread is not None: for publish_topic, publish_message, select_object in publish_list: # 发布MQTT消息 self.mqtt_thread.publish(publish_topic, publish_message) # 构建响应主题 response_topic = publish_topic.replace("request", "response") # 设置并启动超时定时器 wait_timer = QTimer() wait_timer.timeout.connect(lambda rt=response_topic: self.response_topic_message_timeout(rt)) wait_timer.start(timeout_ms) # 将响应信息添加到等待列表 self.wait_response_list.append([response_topic, publish_message, select_object, wait_timer, inform]) def para_or_measure_query(self, child = None) : widget :QWidget = self if child == None : self.param_dict = {} else : widget = child for child_widget in widget.children(): if hasattr(child_widget, "statusTip") : tip_str : str = child_widget.statusTip() if "ScreenBlankingTime" in tip_str : sbt = system_parameter() child_widget.setText(sbt.get_screen_blanking_time()) continue mqtt_str = get_tip_mqtt_name(tip_str) if mqtt_str != None and mqtt_str not in self.param_dict.keys(): unique_name = self.get_unique_name_from_object(child_widget) if unique_name != None and self.mqtt_thread != None: self.mqtt_thread.publish("request/param/info/"+unique_name, '{"name":"%s"}'%mqtt_str) self.param_dict[mqtt_str] = ALIAS_QUERY_NONE self.para_or_measure_query(child_widget) def para_face(self, child = None) : widget :QWidget = self if child == None : self.param_dict = {} else : widget = child for child_widget in widget.children(): if hasattr(child_widget, "statusTip") : tip_str : str = child_widget.statusTip() if "FaceRecogTimeout" in tip_str : frt = system_parameter() child_widget.setText(str(frt.get_verify_timeout())) continue self.para_face(child_widget) def flush_system_cycle(self) : self.flush_system_info() if __name__ == '__main__': style_sheet = "QLabel {color : RED;\nbackground-color : GREEN;}" print(style_sheet) new_style_items = modify_style_sheet("QLabel", style_sheet, "background-color", "WHITE") new_style_items = modify_style_sheet("QLabel", new_style_items, "color", "YELLOW") print(new_style_items) sys.exit(0)