Files

2004 lines
91 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# This Python file uses the following encoding: utf-8
import sys
import os
import time
import csv
import inspect
import cv2
import json
from itertools import chain
from PyQt5 import uic
from PyQt5.QtGui import QImage, QPixmap, QColor,QBrush, QKeySequence, QIcon, QPalette
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject, QRunnable, QMutex, QTimer, QEvent, QSize, QDateTime
from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QFrame, QWidget, QLayout, QLabel
from PyQt5.QtWidgets import QLineEdit, QPushButton, QMessageBox, QShortcut, QDialog,QTableWidgetItem
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from Shared_CODE.CameraThread import CameraThread
import menu_utils as utils
import uart_group_config as group_config
from mqtt_device import class_comm_mqtt_thread, class_comm_mqtt_interface
from Shared_CODE.DialogModifyValue import DialogModifyValue
from Shared_CODE.DialogModifyAlias import DialogModifyAlias
from Shared_CODE.DialogModifyText import DialogModifyText
from Shared_CODE.DialogInform import DialogInform
from QT5_Project.Shared_CODE.FaceRecognitionProtocol import parse_reply
from QT5_Project.Shared_CODE.DialogFaceEnrollItgSingle import EnrollItgSingleDialog
from QT5_Project.Shared_CODE.DialogFaceUserManage import UserManageDialog, load_users, save_user, save_users_list, CSV_FILE
from Shared_CODE.get_tip_prop import *
from print_color import *
import time
from PyQt5.QtCore import QDateTime
import re
from datetime import datetime
import platform
import subprocess
from typing import Callable, Optional
# sys.path.append(sys.path[0] + "/../..")
# sys_path = sys.path[0].replace("\\", "/")
import serial
import serial.tools.list_ports
from Shared_CODE.FaceRecognitionProtocol import (
build_reset, build_uvc_view, build_face_view, build_verify,
build_enroll_itg_single, build_delete_all, build_get_all_userid,
build_delete_user, unpack_frame, parse_reply, parse_note,build_enroll_single,
MID_REPLY, MID_NOTE, CMD_ENROLL, CMD_ENROLL_ITG
)
# Absolute path of the image resource directory ('background', next to this file)
img_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'background'))
# Colour constants, stored as the strings returned by QColor.name()
COLOR_RED = QColor("#EE2D2D").name()
COLOR_GREEN = QColor(Qt.green).name()
COLOR_BLUE = QColor(Qt.blue).name()
COLOR_YELLOW = QColor(Qt.yellow).name()
COLOR_NORMAL = QColor("#000000").name()
COLOR_ALARM_NORMAL = QColor("#31D19E").name()
COLOR_ALARM_ERROR_TEXT = QColor("#B1E5FC").name()
COLOR_ALARM_ERROR_BG = QColor("#E17176").name()
COLOR_VALUE_NORMAL = QColor("#31D19E").name()
# Per-alias query states kept as values of the alias dictionary:
# NONE = not queried yet, WAIT_RESPOINSE = request sent (identifier spelling kept for callers)
ALIAS_QUERY_NONE = 0
ALIAS_QUERY_WAIT_RESPOINSE = 1
# System time / platform helpers
def is_linux():
    """Return True when platform.system() reports 'Linux'."""
    reported_system = platform.system()
    return reported_system == 'Linux'
def set_system_time(year, month, day, hour, minute, second):
    """Set the operating-system clock to the given date and time.

    On Linux the time is applied with ``sudo timedatectl set-time``; on any
    other platform the Win32 ``SetSystemTime`` API is called through ctypes.
    Failures of the external command / API are reported with print() and do
    not raise.

    :param year, month, day, hour, minute, second: integer date/time fields
    :raises ValueError: on Linux, when the fields do not form a valid datetime
    """
    if is_linux():
        try:
            new_time = datetime(year, month, day, hour, minute, second).strftime("%Y-%m-%d %H:%M:%S")
            # Argument list instead of a shell string: no shell parsing of the value.
            subprocess.run(['sudo', 'timedatectl', 'set-time', new_time], check=True)
            print(f"System time updated to {new_time}")
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing 'sudo' binary, which the shell form used
            # to report as a non-zero exit status.
            print(f"Failed to update system time: {e}")
    else:
        import ctypes
        import ctypes.wintypes

        class SYSTEMTIME(ctypes.Structure):
            _fields_ = [
                ("wYear", ctypes.wintypes.WORD),
                ("wMonth", ctypes.wintypes.WORD),
                ("wDayOfWeek", ctypes.wintypes.WORD),
                ("wDay", ctypes.wintypes.WORD),
                ("wHour", ctypes.wintypes.WORD),
                ("wMinute", ctypes.wintypes.WORD),
                ("wSecond", ctypes.wintypes.WORD),
                ("wMilliseconds", ctypes.wintypes.WORD),
            ]

        kernel32 = ctypes.windll.kernel32
        system_time = SYSTEMTIME()
        # Start from the current time so the fields not supplied by the caller
        # (milliseconds, day of week) hold valid values.
        kernel32.GetSystemTime(ctypes.byref(system_time))
        system_time.wYear = year
        system_time.wMonth = month
        system_time.wDay = day
        system_time.wHour = hour
        system_time.wMinute = minute
        system_time.wSecond = second
        # NOTE(review): SetSystemTime interprets the structure as UTC - confirm
        # callers pass UTC rather than local time.
        if kernel32.SetSystemTime(ctypes.byref(system_time)):
            print(f"System time updated to {year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}")
        else:
            # A zero return means the call was rejected (e.g. missing privilege).
            print("Failed to update system time: SetSystemTime returned 0")
def set_screen_blanking_time(minutes):
    """
    Set the screen blanking timeout.

    Linux uses ``xfconf-query`` (Xfce power manager), Windows uses
    ``powercfg``; any other system only prints a "not supported" message.

    :param minutes: blanking timeout, in minutes
    """
    if is_linux():
        try:
            # The Xfce property is written in seconds
            timeout_seconds = 60 * int(minutes)
            xfconf_command = [
                'xfconf-query', '-c', 'xfce4-power-manager',
                '-p', '/xfce4-power-manager/blank-on-ac',
                '-s', str(timeout_seconds), '-t', 'int', '--create'
            ]
            subprocess.run(xfconf_command, check=True)
            print(f"Xfce屏幕黑屏时间设置为 {minutes} 分钟")
        except subprocess.CalledProcessError as e:
            print(f"设置Xfce屏幕黑屏时间失败: {e}")
        except FileNotFoundError:
            print("xfconf-query 命令未找到,请确保已安装 xfce4-power-manager。")
        return

    if platform.system() != 'Windows':
        print("当前操作系统不支持此操作。")
        return

    try:
        # First the timeout on AC power, then the one on battery power
        for powercfg_option in ('-monitor-timeout-ac', '-monitor-timeout-dc'):
            subprocess.run(['powercfg', '-change', powercfg_option, str(minutes)], check=True)
        print(f"屏幕黑屏时间设置为 {minutes} 分钟 (适用于交流电和电池供电)")
    except subprocess.CalledProcessError as e:
        print(f"设置屏幕黑屏时间失败: {e}")
def json_load_message(message) :
    """Parse a JSON payload and return the decoded Python object.

    json.loads() natively accepts str, bytes and bytearray, so no manual
    decode/encode round trip is needed before parsing.

    :param message: JSON text as str, bytes or bytearray
    :return: the decoded object (dict for a JSON object)
    :raises json.JSONDecodeError: when the payload is not valid JSON
    :raises TypeError: when *message* is not str/bytes/bytearray
    """
    return json.loads(message)
def modify_style_sheet(type_name : str, origin_style_sheet : str, lead_str : str, value_str : str) :
    """Return a style sheet in which property *lead_str* is replaced or removed.

    The declarations of *origin_style_sheet* (either bare ``a:b;c:d`` or a
    single ``selector { ... }`` rule) are copied, except the one whose
    property name equals *lead_str*. When *value_str* is non-empty a new
    ``lead_str:value_str;`` declaration is placed first.

    :param type_name: selector to wrap the result in; empty for no wrapper
    :param origin_style_sheet: existing style sheet text
    :param lead_str: property name to replace/remove
    :param value_str: new value; empty string removes the property
    :return: the rebuilt style sheet text
    """
    pieces = []
    if len(value_str) != 0 :
        pieces.append("%s:%s;"%(lead_str, value_str))

    # Only "no brace" and "exactly one opening brace" are understood;
    # anything else discards the old declarations.
    segments = origin_style_sheet.split("{")
    if len(segments) == 1 :
        declarations = segments[0]
    elif len(segments) == 2 :
        declarations = segments[1]
    else :
        declarations = ""

    for declaration in declarations.replace("}", "").split(";") :
        if len(declaration) == 0 :
            continue
        if lead_str in declaration :
            # Compare the property name only, ignoring blanks and newlines
            property_name = declaration.split(":")[0].replace(" ", "").replace("\n", "")
            if property_name == lead_str :
                continue
        pieces.append(declaration + ";")

    rebuilt = "".join(pieces)
    if len(type_name) == 0 :
        return rebuilt
    return "%s {%s}"%(type_name, rebuilt)
def get_key_combine_str(str1, str2) :
    """Join the two strings with a comma to form a combined dictionary key."""
    return ",".join((str1, str2))
def search_circuit_from_topic(topic : str) :
    """Return the circuit id of the configured device whose unique name occurs in *topic*.

    Walks every ``device_list`` of ``group_config.comm_thread_config``. Within
    one list the first device whose ``unique_name`` is a substring of *topic*
    decides; the search ends as soon as a non-negative ``circuit_id`` is found.

    Devices without a ``unique_name`` (or with an empty one) are skipped:
    testing ``None in topic`` would raise TypeError and an empty name would
    match every topic.

    :param topic: MQTT topic string
    :return: matching circuit id, or -1 when nothing matches
    """
    circuit_id = -1
    for config_dict in group_config.comm_thread_config :
        device_list = utils.dict_or_object_get_attr(config_dict, "device_list", None)
        if device_list is None :
            continue
        for item_dict in device_list :
            circuit_unique_name = utils.dict_or_object_get_attr(item_dict, "unique_name", None)
            if circuit_unique_name and circuit_unique_name in topic :
                circuit_id = utils.dict_or_object_get_attr(item_dict, "circuit_id", -1)
                break
        if circuit_id >= 0 :
            break
    return circuit_id
def get_imag_value_file_name(tip_str : str, value : int) :
    """Resolve the image file that represents *value* for a widget tip string.

    The "ImageValue" entry of *tip_str* is used as a %-format pattern that is
    filled with the value (string values are converted to float and rounded).

    :return: the path below ``img_path`` with forward slashes when the file
             exists, otherwise None (also when a string value is not numeric)
    """
    name_pattern = get_tip_value_str(tip_str, "ImageValue", default_string = None)
    try :
        value_id = round(float(value)) if isinstance(value, str) else value
    except Exception :
        return None

    if name_pattern is None :
        return None

    candidate = os.path.join(img_path, name_pattern%(value_id))
    if not os.path.exists(candidate) :
        return None
    return candidate.replace("\\", "/")
def get_imag_file_name(tip_str : str, key_imag : str, default_string = None) :
    """Look up the image file named by entry *key_imag* of a widget tip string.

    :return: the path below ``img_path`` with forward slashes when the file
             exists, otherwise None
    """
    file_name = get_tip_value_str(tip_str, key_imag, default_string)
    if file_name is None :
        return None
    candidate = os.path.join(img_path, file_name)
    if not os.path.exists(candidate) :
        return None
    return candidate.replace("\\", "/")
def get_select_imag_file_name(tip_str : str) :
    """Return the existing image path of the "SelectImag" tip entry, or None."""
    tip_key = "SelectImag"
    return get_imag_file_name(tip_str, tip_key, default_string = None)
def get_select_icon_file_name(tip_str : str) :
    """Return the existing image path of the "SelectIcon" tip entry, or None."""
    tip_key = "SelectIcon"
    return get_imag_file_name(tip_str, tip_key, default_string = None)
def get_bk_imag_file_name(tip_str : str) :
    """Return the existing image path of the "ImageBackGround" tip entry, or None."""
    tip_key = "ImageBackGround"
    return get_imag_file_name(tip_str, tip_key, default_string = None)
def get_indicator_imag_file_name(tip_str : str) :
    """Return the existing image path of the "ImageIndicator" tip entry, or None."""
    tip_key = "ImageIndicator"
    return get_imag_file_name(tip_str, tip_key, default_string = None)
def get_compare_index(main_index : int, sub_index : int) :
    """Combine a main and a sub menu index into one sortable integer.

    Each main index owns a band of 64 values; main index -1 maps to band 0.
    """
    band_start = 64 * (main_index + 1)
    return band_start + sub_index
class SerialWorker(QThread):
    """Worker thread that reads a serial port and emits every decoded frame.

    The port object is obtained through *ser_getter* on each loop iteration,
    so the owner may open, close or replace the port while the thread runs.
    """
    frame_received = pyqtSignal(dict)   # one decoded frame
    log_message = pyqtSignal(str)       # human-readable log line

    def __init__(self, ser_getter: Callable[[], Optional[serial.Serial]]):
        """Store the port getter and start with an empty receive buffer."""
        super().__init__()
        self.ser_getter = ser_getter
        self._running = True
        self.buf = b""

    def _emit_buffered_frames(self):
        """Emit frames from the buffer until unpack_frame() yields none."""
        while True:
            frame, consumed = unpack_frame(self.buf)
            if not frame:
                return
            self.buf = self.buf[consumed:]
            self.frame_received.emit(frame)

    def run(self):
        """Thread body: poll the port until stop() is called."""
        self.log("[INFO] 串口线程运行")
        while self._running:
            port = self.ser_getter()
            if not (port and getattr(port, 'is_open', False)):
                # No usable port yet - wait and ask again
                time.sleep(0.2)
                continue
            try:
                chunk = port.read(port.in_waiting or 1)
                if chunk:
                    self.buf += chunk
                    self._emit_buffered_frames()
            except (serial.SerialException, OSError) as e:
                self.log(f"[WARN] 串口异常挂起: {e}")
                # Best-effort close of the failed port
                try:
                    port.close()
                except Exception:
                    pass
                time.sleep(0.5)
            except Exception as e:
                self.log(f"[ERR] read: {e}")
                time.sleep(0.1)

    def stop(self):
        """Ask the run() loop to end after its current iteration."""
        self._running = False

    def log(self, s: str):
        """Forward *s* through the log_message signal."""
        self.log_message.emit(s)
class VideoWorker(QThread):
    """Worker thread that grabs camera frames with OpenCV and emits them as pixmaps.

    Every frame is rotated 90 degrees clockwise, converted from BGR to RGB and
    scaled to *target_size* with the aspect ratio kept.
    """
    # NOTE(review): the pixmap is built inside the worker thread; Qt documents
    # QPixmap as GUI-thread only - consider emitting a QImage instead.
    pixmap_ready = pyqtSignal(QPixmap)
    log_message = pyqtSignal(str)

    def __init__(self, cam_index=0, target_size=QSize(360,480)):
        """Remember the camera index and the output size; the camera is opened in run()."""
        super().__init__()
        self.cam_index = cam_index
        self.target_size = target_size
        self._running = True
        self.cap = None

    def _open_capture(self):
        """Create the VideoCapture, using the DirectShow backend on Windows."""
        if sys.platform.startswith("win"):
            return cv2.VideoCapture(self.cam_index, cv2.CAP_DSHOW)
        return cv2.VideoCapture(self.cam_index)

    def _frame_to_pixmap(self, frame):
        """Rotate, colour-convert and scale one BGR frame into a QPixmap."""
        frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        height, width, channels = frame.shape
        image = QImage(frame.data, width, height, channels*width, QImage.Format_RGB888)
        return QPixmap.fromImage(image).scaled(
            self.target_size, Qt.KeepAspectRatio, Qt.SmoothTransformation
        )

    def run(self):
        """Thread body: open the camera and emit frames until stop() is called."""
        try:
            self.log_message.emit(f"[INFO] 尝试打开摄像头 {self.cam_index} ...")
            self.cap = self._open_capture()
            if not (self.cap and self.cap.isOpened()):
                self.log_message.emit(f"[ERR] 无法打开摄像头 index={self.cam_index}")
                return
            self.log_message.emit(f"[INFO] 摄像头 {self.cam_index} 已打开")
            while self._running:
                grabbed, frame = self.cap.read()
                if grabbed:
                    self.pixmap_ready.emit(self._frame_to_pixmap(frame))
                # Same pause after a successful and after a failed grab
                time.sleep(0.03)
        finally:
            if self.cap:
                self.cap.release()
            self.log_message.emit("[INFO] 摄像头已关闭")

    def stop(self):
        """Ask the run() loop to end after its current iteration."""
        self._running = False

    def log(self, s: str):
        """Forward *s* through the log_message signal."""
        self.log_message.emit(s)
class UIFrameWork(QMainWindow, class_comm_mqtt_interface):
mqtt_signal : pyqtSignal = pyqtSignal(str)
def __init__(self):
    """Create the window state, start the periodic timers and bind the keyboard shortcuts."""
    QMainWindow.__init__(self)

    # MQTT message queue and the signal that delivers topics to this object
    self.topic_message_list : list = []
    self.mutex : QMutex = QMutex()
    self.mqtt_signal.connect(self.mqtt_topic_message_process)

    # Menu navigation state
    self.menu_list = []
    self.sort_menu_list = []
    self.menu_key_index = 0
    self.key_active = 0
    self.select_object : QWidget = None

    # MQTT bookkeeping
    self.wait_response_list = []
    self.mqtt_thread : class_comm_mqtt_thread= None
    self.mqtt_value_dict = {}
    self.widget_timeout_list = []
    self.alias_dict = {}

    # Alias query every 3000 ms
    self.alias_query_timer : QTimer = QTimer()
    self.alias_query_timer.timeout.connect(self.process_alias_query)
    self.alias_query_timer.start(3000)

    # Video visibility check every 200 ms
    self.video_visible_timer = QTimer()
    self.video_visible_timer.timeout.connect(self.video_visible_timeout)
    self.video_visible_timer.start(200)

    # System cycle refresh every 500 ms
    self.time_timer = QTimer()
    self.time_timer.timeout.connect(self.flush_system_cycle)
    self.time_timer.start(500)

    # Nine canvases: visibility flag, [label, timeout-list index, camera thread]
    # record and the matching set_video_image_<i> slot
    self.canvas_visible_arr = [False] * 9
    self.canvas_label_list = [[None, -1, None] for _ in range(9)]
    self.set_video_image_func_arr = [getattr(self, f'set_video_image_{i}') for i in range(9)]

    self.page = -1
    self.select_main_index = -1
    self.exist_circuit_mask = 0
    self.check_widget_timeout_timer : QTimer = None

    # Keyboard shortcuts (key, handler); the shortcut objects are parented to this window
    shortcut_bindings = (
        (Qt.Key_Up, self.key_decrease_parameter),
        (Qt.Key_Down, self.key_increase_parameter),
        (Qt.Key_PageDown, self.key_enter_process),
        (Qt.Key_PageUp, self.key_escape_process),
        (Qt.Key_End, self.key_enter_process),
        (Qt.Key_Home, self.key_escape_process),
        (Qt.Key_0, self.key_Test_process0),
        (Qt.Key_1, self.key_Test_process1),
        (Qt.Key_2, self.key_Test_process2),
        (Qt.Key_3, self.key_Test_process3),
    )
    for key_code, handler in shortcut_bindings:
        QShortcut(QKeySequence(key_code), self, activated=handler)

    self.bind_channel = -1
# Virtual hook, called when the UI switches to this page
def virtual_on_page_enter(self) :
    """Page-enter hook; the base implementation does nothing."""
    return None
# Virtual hook, called when the UI leaves this page
def virtual_on_page_leave(self) :
    """Page-leave hook; the base implementation does nothing."""
    return None
# Virtual hook, switch to a page; must be implemented by the derived class.
# page=-1 means return to the parent menu
def virtual_change_to_page(self, page) :
    """Page-switch hook; the base implementation does nothing."""
    return None
# Virtual hook, connect a canvas to a camera; must be implemented by the derived class
def virtual_connect_canvas_to_camera(self, canvas_id : int, camera_id : int, is_visible : bool = True) :
    """Canvas/camera binding hook; the base implementation does nothing."""
    return None
#虚函数, action处理函数, 派生类可以额外处理
def virtual_widget_action_process(self, widget : QObject, action : str) :
return
def set_mqtt_value(self, circuit : int, mqtt_name : str, mqtt_value) :
key = f'{circuit}+{mqtt_name}'
self.mqtt_value_dict[key] = mqtt_value
def get_mqtt_value(self, circuit : int, mqtt_name : str) :
key = f'{circuit}+{mqtt_name}'
if key in self.mqtt_value_dict.keys() :
return self.mqtt_value_dict[key]
return None
def modify_object_style_sheet(self, object :QWidget, lead_str : str, value_str : str) :
    """Replace or remove one style property of a widget.

    The rebuilt sheet is wrapped in a "<ClassName>#<objectName>" selector.

    :param lead_str: property name
    :param value_str: new value; an empty string removes the property
    """
    selector = object.metaObject().className() + "#" + object.objectName()
    updated_sheet = modify_style_sheet(selector, object.styleSheet(), lead_str, value_str)
    object.setStyleSheet(updated_sheet)
def reset_object_style_sheet(self, object :QWidget) :
    """Restore a widget's style sheet from the text kept in its whatsThis property.

    The previous implementation also computed the meta object, class name and
    a selector string that were never used; those dead locals are removed.
    """
    object.setStyleSheet(object.whatsThis())
# Load a Qt Designer .ui file into this window and rebuild the per-window caches:
# background bitmaps, the menu list and its sorted copy, the style-sheet backups,
# the timeout list, the alias list and the circuit mask. Also starts the 100 ms
# widget-timeout timer and reads the page number from the window's statusTip.
# NOTE(review): the nesting of this block cannot be recovered from this copy of
# the file - confirm whether only loadUi() or every following statement is
# guarded by the os.path.exists() check.
def load_window_ui(self, file_window_ui : str) :
if os.path.exists(file_window_ui):
uic.loadUi(file_window_ui, self)
self.load_resource_bitmap()
self.create_menu_list()
# menu entries are [compare_index, widget]; sort by compare_index
self.sort_menu_list = sorted(self.menu_list, key=lambda x: x[0])
self.create_style_sheet_backup()
self.create_timeout_list()
self.create_alias_list()
# update_window_circuit_mask() already stores the mask; the next line computes it again
self.update_window_circuit_mask()
self.exist_circuit_mask = self.get_circuit_mask()
self.check_widget_timeout_timer : QTimer = QTimer()
self.check_widget_timeout_timer.timeout.connect(self.process_widget_timeout_list)
self.check_widget_timeout_timer.start(100)
self.page = get_tip_page(self.statusTip())
self.menu_selectable_object_flush()
def get_circuit_mask(self, child_widget : QWidget = None) :
    """Return a bit mask of the circuit ids used below a widget.

    Bit n is set when a descendant that has a statusTip resolves, through
    get_circuit_from_object(), to circuit id n.

    :param child_widget: subtree root; None means the window itself
    """
    root = self if child_widget is None else child_widget
    mask = 0
    for node in root.children():
        if hasattr(node, "statusTip") :
            circuit_id = self.get_circuit_from_object(node)
            if circuit_id >= 0 :
                mask |= 0x01 << circuit_id
        # Merge the bits found deeper in the tree
        mask |= self.get_circuit_mask(node)
    return mask
def update_window_circuit_mask(self) :
    """Recompute the circuit mask of the whole window and store it in exist_circuit_mask."""
    window_mask = self.get_circuit_mask()
    self.exist_circuit_mask = window_mask
def set_menu_main_index(self, main_index) :
    """Select the first menu entry that belongs to *main_index*.

    Stores *main_index*, then points menu_key_index / select_object at the
    first entry of sort_menu_list whose compare index reaches the start of
    the band of *main_index*. A negative *main_index* selects entry 0.
    Nothing is selected when the sorted menu list is empty.

    When no entry reaches the band, the last entry is selected; the previous
    implementation indexed one past the end of the list and raised IndexError.
    """
    self.main_index = main_index
    if len(self.sort_menu_list) == 0 :
        return
    if main_index < 0 :
        key_index = 0
    else :
        threshold = get_compare_index(main_index, 0)
        last_index = len(self.sort_menu_list) - 1
        key_index = next(
            (position for position, item in enumerate(self.sort_menu_list) if item[0] >= threshold),
            last_index,
        )
    self.menu_key_index = key_index
    self.select_object = self.sort_menu_list[key_index][1]
    self.menu_selectable_object_flush()
def process_alias_query(self):
if not self.mqtt_thread:
return # 如果没有 MQTT 线程,直接返回
for alias_unique_name, alias_value_dict in self.alias_dict.items(): # {"xxxxx,sss": 123}
if not isinstance(alias_value_dict, int) or alias_value_dict != ALIAS_QUERY_NONE:
continue # 仅处理需要查询的整数别名
alias_name, uniqe_name = alias_unique_name.split(",")
# 初始化 select_object 变量
select_object: QWidget = None
# 将别名的值设置为等待响应状态
self.alias_dict[alias_unique_name] = ALIAS_QUERY_WAIT_RESPOINSE
# 发布 MQTT 消息并等待响应
self.mqtt_publish_and_wait_response(
f"request/alias/{uniqe_name}",
json.dumps({"name": alias_name}),
select_object,
2000,
False
)
def process_widget_timeout_list(self) :
    """Advance every widget timeout by 100 ms and mark the expired widgets.

    Entries of widget_timeout_list are [limit_ms, elapsed_ms, widget]. When
    the elapsed time reaches the limit it restarts at 0 and the widget is
    marked: line edits and push buttons show the timeout text (buttons also
    get their backed-up style sheet), canvas labels get the timeout image,
    other labels are hidden. The face-data display is refreshed first.
    """
    self.para_face()    # refresh the face-data display
    for slot, entry in enumerate(self.widget_timeout_list):
        limit_ms = entry[0]
        elapsed_ms = entry[1] + 100
        if elapsed_ms >= limit_ms:
            elapsed_ms = 0
            expired_widget : QWidget = entry[2]
            if isinstance(expired_widget, QLineEdit) :
                expired_widget.setText("通信超时")
            elif isinstance(expired_widget, QLabel) :
                if "canvas" in expired_widget.statusTip() :
                    self.camera_signal_timeout(expired_widget)
                elif expired_widget.isVisible() :
                    expired_widget.setVisible(False)
            elif isinstance(expired_widget, QPushButton) :
                self.reset_object_style_sheet(expired_widget)
                expired_widget.setText("通信超时")
        entry[1] = elapsed_ms
        self.widget_timeout_list[slot] = entry
def reset_widget_timeout(self, widget : QObject) :
Index = 0
for timeout_item in self.widget_timeout_list:
cur_time = timeout_item[1]
timeout_widget : QWidget = timeout_item[2]
if timeout_widget == widget :
timeout_item[1] = 0
self.widget_timeout_list[Index] = timeout_item
break
Index = Index + 1
# Bind a camera thread to the canvas label with the given canvas id.
# Does nothing when camera_thread is None or canvas_id is negative.
# For a canvas id inside set_video_image_func_arr: looks up the canvas label,
# disconnects the slot from the previously recorded camera thread (when that
# thread reports is_signal_connect), connects the new thread when is_visible is
# set, and records [label, timeout-list index, thread] in canvas_label_list.
# NOTE(review): the nesting cannot be recovered from this copy of the file - the
# `else` below may pair with `if canvas_object` or with `if is_visible`; confirm.
def connect_camera_thread(self, camera_thread : CameraThread, canvas_id : int = 0, is_visible : bool = True) :
if camera_thread == None or canvas_id < 0:
return
if canvas_id < len(self.set_video_image_func_arr) :
canvas_object : QLabel = self.search_canvas_object(canvas_id)
if canvas_object :
# canvas_label_list entry layout: [label, timeout-list index, camera thread]
canvas_info_item = self.canvas_label_list[canvas_id]
origin_camera_thread : CameraThread = canvas_info_item[2]
if origin_camera_thread != None and origin_camera_thread.is_signal_connect:
origin_camera_thread.image_signal.disconnect(self.set_video_image_func_arr[canvas_id])
if is_visible :
camera_thread.signal_connect(self.set_video_image_func_arr[canvas_id])
list_index : int = self.get_timeout_List_index(canvas_object)
self.canvas_label_list[canvas_id] = [canvas_object, list_index, camera_thread]
else :
# reset the record to its "unbound" value
self.canvas_label_list[canvas_id] = [None, -1, None]
self.canvas_visible_arr[canvas_id] = is_visible
def camera_signal_timeout(self, camera_label : QLabel):
    """Show the image 'IM05_002.png', scaled to the label, on a canvas whose camera timed out.

    Nothing happens when *camera_label* is None or the image file is missing.
    """
    if camera_label == None :
        return
    placeholder_path = os.path.join(img_path, 'IM05_002.png')
    if not os.path.exists(placeholder_path):
        return
    placeholder = QPixmap(placeholder_path).scaled(
        camera_label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation
    )
    camera_label.setPixmap(placeholder)
def search_canvas_object(self, canvas_id = 0, child = None) :
    """Depth-first search for the QLabel whose statusTip declares *canvas_id*.

    A label matches when its tip yields a non-negative canvas id equal to
    *canvas_id* together with a non-negative camera id.

    :param child: subtree root; None means the window itself
    :return: the matching QLabel, or None
    """
    root = self if child == None else child
    for node in root.children():
        if hasattr(node, "statusTip") :
            tip_canvas_id, tip_camera_id = get_tip_canvas_id_and_camera_id(node.statusTip())
            declares_canvas = tip_canvas_id >= 0 and tip_camera_id >= 0 and tip_canvas_id == canvas_id
            if declares_canvas and isinstance(node, QLabel) :
                return node
        nested_match = self.search_canvas_object(canvas_id, node)
        if nested_match != None :
            return nested_match
    return None
def get_canvas_prop (self, canvas_id : int= 0) :
    """Return (camera id, face-detection flag) configured on a canvas label.

    :return: (-1, 0) when no label exists for *canvas_id*
    """
    canvas_label : QWidget = self.search_canvas_object(canvas_id)
    if canvas_label == None :
        return -1, 0
    tip_str : str = canvas_label.statusTip()
    _canvas_id, camera_id = get_tip_canvas_id_and_camera_id(tip_str)
    face_detect_flag = get_tip_face_detection(tip_str)
    return camera_id, face_detect_flag
def load_resource_bitmap(self, child = None) :
    """Apply the background image named in each descendant's statusTip.

    QLabel descendants get the image as a scaled pixmap; other QWidget
    descendants get it through a "background-image" style property.

    :param child: subtree root; None means the window itself
    """
    root = self if child == None else child
    for node in root.children():
        if hasattr(node, "statusTip") :
            background_file = get_bk_imag_file_name(node.statusTip())
            if background_file != None :
                if isinstance(node, QLabel) :
                    node.setScaledContents(True)
                    node.setPixmap(QPixmap(background_file))
                elif isinstance(node, QWidget) :
                    self.modify_object_style_sheet(node, "background-image", "url(%s)"%(background_file))
        self.load_resource_bitmap(node)
def create_style_sheet_backup(self, child = None) :
    """Copy every descendant's current style sheet into its whatsThis property.

    :param child: subtree root; None means the window itself
    """
    root = self if child == None else child
    for node in root.children():
        if hasattr(node, "styleSheet") :
            node.setWhatsThis(node.styleSheet())
        self.create_style_sheet_backup(node)
def create_timeout_list(self, child = None) :
    """Collect every descendant whose statusTip declares a timeout.

    Each hit is appended to widget_timeout_list as [limit_ms, 0, widget].
    The list is cleared when called for the whole window (child is None).
    """
    if child == None :
        self.widget_timeout_list = []
        root = self
    else :
        root = child
    for node in root.children():
        if hasattr(node, "statusTip") :
            limit_ms = get_tip_timeout(node.statusTip())
            if limit_ms >= 0:
                self.widget_timeout_list.append([limit_ms, 0, node])
        self.create_timeout_list(node)
def get_timeout_List_index(self, object : QWidget) :
list_index = -1
search_list_index = 0
for items in self.widget_timeout_list :
child_widget : QWidget = items[2]
if child_widget == object :
list_index = search_list_index
break
search_list_index += 1
return list_index
def get_page_circuit(self):
    """Return the circuit id declared in the window's statusTip (-1 without a statusTip)."""
    if not hasattr(self, "statusTip") :
        return -1
    return get_tip_circuit(self.statusTip())
def set_page_circuit(self, circuit_id = -1) :
    """Rewrite the circuit entry of the window's statusTip and refresh dependent state.

    Every comma-separated tip entry containing "circuit" is dropped and
    "circuit=<circuit_id>" is appended. Afterwards the circuit mask and the
    alias list are rebuilt and the alias, parameter/measure and face queries
    are triggered.
    """
    if not hasattr(self, "statusTip") :
        return
    tip_entries = self.statusTip().split(",")
    kept_entries = "".join(entry + "," for entry in tip_entries if "circuit" not in entry)
    self.setStatusTip(kept_entries + "circuit=%d"%(circuit_id))
    self.update_window_circuit_mask()
    self.create_alias_list()
    self.process_alias_query()
    self.para_or_measure_query()
    self.para_face()
def create_alias_list(self, child = None) :
    """Register every alias declared in the descendants' statusTips.

    Keys are "<alias>,<unique name>"; new keys start as ALIAS_QUERY_NONE.
    The dictionary is cleared when called for the whole window (child is None).
    """
    if child == None :
        self.alias_dict = {}
        root = self
    else :
        root = child
    for node in root.children():
        if hasattr(node, "statusTip") :
            alias_str = get_tip_alias_str(node.statusTip())
            if alias_str != None:
                unique_name = self.get_unique_name_from_object(node)
                if unique_name != None :
                    # Keep the state of an alias that is already registered
                    combined_key = get_key_combine_str(alias_str, unique_name)
                    self.alias_dict.setdefault(combined_key, ALIAS_QUERY_NONE)
        self.create_alias_list(node)
# Recursively collect the menu-selectable widgets into self.menu_list.
# A widget is selectable when its statusTip yields a non-negative menu sub index;
# it is stored as [compare_index, widget], where compare_index combines the
# main index of the widget (when it has one) with the sub index.
# QPushButton widgets get their clicked signal connected to
# on_menu_qpush_button_click.
# The list is cleared when called for the whole window (child is None).
# NOTE(review): the nesting cannot be recovered from this copy of the file -
# confirm whether the clicked connection applies to every push button or only
# to those with a menu sub index.
def create_menu_list(self, child = None) :
widget :QWidget = self
if child == None :
self.menu_list = []
else :
widget = child
for child_widget in widget.children():
if hasattr(child_widget, "statusTip") :
tip_str : str = child_widget.statusTip()
sub_index = get_tip_menu_sub_index(tip_str)
if sub_index >= 0:
main_index = self.get_main_index_from_object(child_widget)
if main_index >= 0 :
compare_index = get_compare_index(main_index, sub_index)
else :
# no main index: the sub index alone is the sort key
compare_index = sub_index
self.menu_list.append([compare_index, child_widget])
if isinstance(child_widget, QPushButton) :
button : QPushButton = child_widget
button.clicked.connect(self.on_menu_qpush_button_click)
self.create_menu_list(child_widget)
# Run the action configured in the "Action" entry of the selected widget's statusTip.
# When the tip contains "password" the operator must authenticate first:
#   - "word": a password dialog; the input is compared with
#     system_parameter().get_system_password(); a mismatch shows a message and returns.
#   - "face": do_verify() is sent and a 50 ms QTimer polls
#     P05_01_FaceCameraView.face_verify_result; this method returns immediately
#     and _after_face_verify() continues on success.
# Without "password" in the tip the action is executed directly.
# The action is chosen by substring tests on the action string, so the order of
# the elif chain matters ("ModifySystem" is tested before "Modify").
# NOTE(review): the nesting cannot be recovered from this copy of the file - in
# particular whether the final virtual_widget_action_process() call is inside
# `if input:`; confirm against the original.
# NOTE(review): the local name `input` shadows the builtin.
def on_select_object_action_process(self, select_object : QWidget) :
tip_str = select_object.statusTip()
action_str = get_tip_value_str(tip_str, "Action", None)
input = False
if action_str != None :
if "password" in tip_str:
# Step 1: show the authentication-method chooser
dialog_auth_choice = DialogModifyAlias(self)
auth_dict = {
"face": "人脸认证",
"word": "密码认证"
}
dialog_auth_choice.set_alias_item_info(auth_dict, "password", "请选择认证方式")
if dialog_auth_choice.exec() == QDialog.Accepted:
choice = dialog_auth_choice.value
# Step 2: run the authentication that was chosen
if choice == "word":
dialog_modify_text = DialogModifyValue(self)
caption_str = "请输入密码:"
dialog_modify_text.update_modify_info("", "0000", caption_str)
input = False
if dialog_modify_text.exec() == QDialog.Accepted:
input = True
# despite its name this holds the password typed by the operator
modify_screen_time_str = dialog_modify_text.value
pass_word = system_parameter()
if modify_screen_time_str != pass_word.get_system_password():
inform_box = DialogInform()
inform_box.information("提示", "密码错误,无法修改!")
return
print("密码认证成功")
elif choice == "face":
# Initialise the lock flag
self._face_verify_locked = True # locked while the verification runs
# Face authentication is handled asynchronously
face_frame = self.parent_window.P05_01_FaceCameraView
face_frame.face_verify_result = None # reset the result flag
self.do_verify() # send CMD_VERIFY
# presumably seconds, since it is compared with time.time() differences - TODO confirm
timeout = system_parameter().get_verify_timeout()
start_time = time.time()
# Stop and delete the previous timer
if hasattr(self, "_face_verify_timer"):
if self._face_verify_timer.isActive():
self._face_verify_timer.stop()
self._face_verify_timer.deleteLater()
# Create the timer that polls the result flag
self._face_verify_timer = QTimer(self)
self._face_verify_timer.setInterval(50) # check every 50 ms
def check_face_result():
nonlocal input
if face_frame.face_verify_result is not None:
self._face_verify_timer.stop()
self._face_verify_locked = False # unlock
if face_frame.face_verify_result:
input = True
self._after_face_verify(select_object, action_str)
else:
input = False
inform_box = DialogInform()
inform_box.information("提示", "人脸认证失败")
elif time.time() - start_time > timeout:
# timed out: polling stops without any message to the operator
self._face_verify_timer.stop()
self._face_verify_locked = False # unlock
input = False
# Disconnect any old slot, then connect the new one
try:
self._face_verify_timer.timeout.disconnect()
except Exception:
pass
self._face_verify_timer.timeout.connect(check_face_result)
self._face_verify_timer.start()
return # asynchronous handling, return right away
else:
# No authentication required, pass directly
input = True
# ---------------- Execute the action after successful authentication ----------------
if input:
if "CmdExecute" in action_str:
mqtt_name = get_tip_mqtt_name(tip_str)
unique_name = self.get_unique_name_from_object(select_object)
if mqtt_name and unique_name and self.mqtt_thread and len(mqtt_name) > 0:
publish_topic = "request/action/" + unique_name
publish_message = '{"name" : "%s"}' % mqtt_name
self.mqtt_publish_and_wait_response(publish_topic, publish_message, select_object, 1000)
elif "ModifySystem" in action_str:
self.on_system_param_modify(tip_str)
elif "Modify" in action_str and isinstance(select_object, QLineEdit):
self.on_line_edit_modify_click(select_object)
elif "Users" in action_str:
self.do_manage_users()
elif "Verify" in action_str:
self.do_verify()
elif "EnrollItgSingle" in action_str:
self.do_enroll_itg_single()
elif "Reset" in action_str:
self.reset_command()
elif "ConnectCamera" in action_str:
self.connectcamera()
elif "VideoMode" in action_str:
self.toggle_video_mode()
elif "FaceBox" in action_str:
self.toggle_face_box()
elif "DeleteUser" in action_str:
self.delete_user_by_id()
elif "UserCount" in action_str:
self.query_user_count()
self.virtual_widget_action_process(select_object, action_str)
# ------------------ Follow-up callback ------------------
# Repeats the action dispatch of on_select_object_action_process() for the
# asynchronous face-authentication path. The action is chosen by substring
# tests on action_str, so the order of the elif chain matters.
# NOTE(review): the nesting cannot be recovered from this copy of the file -
# confirm that the final virtual_widget_action_process() call is unconditional.
def _after_face_verify(self, select_object, action_str):
"""
Called after a successful face authentication; continues with the pending action
"""
if "CmdExecute" in action_str:
mqtt_name = get_tip_mqtt_name(select_object.statusTip())
unique_name = self.get_unique_name_from_object(select_object)
if mqtt_name and unique_name and self.mqtt_thread and len(mqtt_name) > 0:
publish_topic = "request/action/" + unique_name
publish_message = '{"name" : "%s"}' % mqtt_name
self.mqtt_publish_and_wait_response(publish_topic, publish_message, select_object, 1000)
elif "ModifySystem" in action_str:
self.on_system_param_modify(select_object.statusTip())
elif "Modify" in action_str and isinstance(select_object, QLineEdit):
self.on_line_edit_modify_click(select_object)
elif "Users" in action_str:
self.do_manage_users()
elif "Verify" in action_str:
self.do_verify()
elif "EnrollItgSingle" in action_str:
self.do_enroll_itg_single()
elif "Reset" in action_str:
self.reset_command()
elif "ConnectCamera" in action_str:
self.connectcamera()
elif "VideoMode" in action_str:
self.toggle_video_mode()
elif "FaceBox" in action_str:
self.toggle_face_box()
elif "DeleteUser" in action_str:
self.delete_user_by_id()
elif "UserCount" in action_str:
self.query_user_count()
self.virtual_widget_action_process(select_object, action_str)
def search_menu_match_object(self, object) :
match_object : QWidget = None
menu_index = 0
for each_list in self.sort_menu_list :
search_object = each_list[1]
if search_object == object :
match_object = search_object
break
menu_index += 1
return match_object, menu_index
################################################################################
def do_verify(self):
    """Send a verify command through the face camera view.

    The command is built with pd value 0 and the verify timeout from
    system_parameter(); the view's verify_in_progress flag is set first.
    """
    verify_timeout = system_parameter().get_verify_timeout()
    face_view = self.parent_window.P05_01_FaceCameraView
    face_view.verify_in_progress = True
    verify_command = build_verify(0, verify_timeout)
    face_view.send(verify_command)
def do_enroll_itg_single(self):
    """Send a single-enroll command (admin flag 0, user name " ").

    The face camera view's verify_in_progress flag is set first.
    """
    # NOTE(review): the command is built with build_enroll_single() and sent with
    # self.send(), while do_verify() sends through the face camera view - confirm
    # both are intended.
    enroll_timeout = system_parameter().get_verify_timeout()
    face_view = self.parent_window.P05_01_FaceCameraView
    face_view.verify_in_progress = True
    enroll_command = build_enroll_single(0, " ", enroll_timeout)
    self.send(enroll_command)
def do_manage_users(self):
    """Open the user-management dialog modally, handing it this object's send method."""
    manage_dialog = UserManageDialog(self, self.send)
    manage_dialog.exec_()
def reset_command(self):
    """Send the reset command frame."""
    reset_frame = build_reset()
    self.send(reset_frame)
def connectcamera(self):
    """Toggle the local video preview.

    A running VideoWorker is stopped and dropped; otherwise a new worker for
    camera 0 is started and wired to video_label. Both paths log the change
    and show an information dialog.
    """
    current_worker = self.video_worker
    if current_worker and current_worker.isRunning():
        current_worker.stop()
        # NOTE(review): the worker reference is dropped even when the thread has
        # not finished within 300 ms - confirm this is safe.
        current_worker.wait(300)
        self.video_worker = None
        self.log("[INFO] 视频已关闭")
        inform_box : DialogInform = DialogInform()
        inform_box.information("提示", "视频已关闭")
        return

    preview_label = self.video_label
    # Give a collapsed label a usable minimum size before reading its size
    if preview_label.width()<50 or preview_label.height()<50:
        preview_label.setMinimumSize(360,480)
    self.video_worker = VideoWorker(cam_index=0, target_size=preview_label.size())
    self.video_worker.pixmap_ready.connect(preview_label.setPixmap)
    self.video_worker.start()
    self.log("[INFO] 正在打开视频")
    inform_box : DialogInform = DialogInform()
    inform_box.information("提示", "正在打开视频")
def toggle_video_mode(self):
    """Switch the camera stream between mode 0 and mode 1.

    Sends the new mode with send_uvc(), stores it in current_video_mode,
    then logs the change and shows it in an information dialog.
    """
    if self.current_video_mode==0:
        next_mode = 1
        notice = "已切换到红外视频模式"
    else:
        next_mode = 0
        notice = "已切换到彩色视频模式"
    self.send_uvc(next_mode)
    self.current_video_mode = next_mode
    self.log(f"[INFO] {notice}")
    inform_box : DialogInform = DialogInform()
    inform_box.information("提示", notice)
def toggle_face_box(self):
    """Switch the face box overlay between state 0 and state 1.

    Sends the new state with send_face_box(), stores it in current_face_box,
    then logs the change and shows it in an information dialog.
    """
    if self.current_face_box==0:
        next_state = 1
        notice = "人脸框已开启"
    else:
        next_state = 0
        notice = "人脸框已关闭"
    self.send_face_box(next_state)
    self.current_face_box = next_state
    self.log(f"[INFO] {notice}")
    inform_box : DialogInform = DialogInform()
    inform_box.information("提示", notice)
# ---------------- Command sending ----------------
def send_uvc(self, mode):
    """Write the UVC view command for *mode* to the serial port; log a warning when the port is not open."""
    port_open = self.ser and getattr(self.ser,"is_open",False)
    if not port_open:
        self.log("[WARN] 串口未连接,无法切换视频模式")
        return
    self.ser.write(build_uvc_view(mode))
def send_face_box(self, state):
    """Write the face view command for *state* to the serial port; log a warning when the port is not open."""
    port_open = self.ser and getattr(self.ser,"is_open",False)
    if not port_open:
        self.log("[WARN] 串口未连接,无法控制人脸框")
        return
    self.ser.write(build_face_view(state))
def delete_user_by_id(self, CSV_FILE=CSV_FILE):
    """Ask for a user ID and delete that user from the module and local storage.

    Steps: prompt for the ID, look it up in ``load_users()``, send the
    delete command via ``self.send``, rewrite ``CSV_FILE`` without the
    matching rows, save the filtered user list, then refresh the table.

    Args:
        CSV_FILE: path of the CSV file to rewrite (defaults to the module
            level ``CSV_FILE``).
    """
    users = load_users()

    # Ask the operator which user ID to delete.
    dialog = DialogModifyValue(self)
    dialog.update_modify_info("", 0, "选择用户ID删除")
    if dialog.exec() != QDialog.Accepted:
        return

    # A missing (None) value is rejected the same way as a non-numeric one.
    try:
        user_id = int(dialog.value)
    except (TypeError, ValueError):
        DialogInform(self).information("提示", "请输入有效数字ID")
        return
    user_id_str = str(user_id)

    def _is_target(record):
        # Compare as stripped strings; .get() tolerates rows without the key.
        return str(record.get("user_id", "")).strip() == user_id_str

    user = next((u for u in users if _is_target(u)), None)
    if not user:
        DialogInform(self).information("提示", "用户不存在")
        return

    try:
        # 1) Send the delete command; a failure is reported but local
        #    deletion still proceeds.
        try:
            self.send(build_delete_user(user_id))
        except Exception as e:
            DialogInform(self).information("提示", f"警告:串口删除失败,但本地仍会删除\n{e}")

        # 2) Drop the matching rows from the CSV file.
        if os.path.exists(CSV_FILE):
            with open(CSV_FILE, "r", encoding="utf-8", newline="") as f:
                reader = csv.DictReader(f)
                fieldnames = reader.fieldnames
                new_rows = [row for row in reader if not _is_target(row)]
            # An empty file has no header (fieldnames is None); there is
            # nothing to rewrite and DictWriter would fail on it.
            if fieldnames:
                with open(CSV_FILE, "w", encoding="utf-8", newline="") as f:
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(new_rows)
    except Exception as e:
        DialogInform(self).information("提示", f"删除失败: {e}")
        return

    # 3) Persist the in-memory user list without the deleted user.
    users = [u for u in users if not _is_target(u)]
    save_users_list(users)

    # 4) Report success and redraw the table.
    DialogInform(self).information("提示", f"用户 {user.get('user_name', '')} (ID={user_id}) 已删除")
    self.refresh()
def query_user_count(self):
    """Send the frame produced by ``build_get_all_userid()`` through ``self.send``."""
    query_frame = build_get_all_userid()
    self.send(query_frame)
def refresh(self):
    """Reload users with ``load_users()`` and redraw ``self.table``.

    Columns are, in order: user_id (as text), user_name, created_at.
    """
    records = load_users()
    table = self.table
    table.setRowCount(len(records))
    for row_index, record in enumerate(records):
        cell_texts = (
            str(record.get("user_id", "")),
            record.get("user_name", ""),
            record.get("created_at", ""),
        )
        for column, text in enumerate(cell_texts):
            table.setItem(row_index, column, QTableWidgetItem(text))
################################################################################
#刷新屏幕上的系统信息, 例如时间日期之类
def flush_system_info(self, child=None):
    """Recursively refresh widgets whose status tip selects system "DateTime".

    Starting at ``child`` (or ``self`` when ``child`` is None), every
    descendant that is a QPushButton, QLineEdit or QLabel and whose tip
    resolves to "DateTime" via ``get_tip_system`` gets the current time
    formatted as ``yyyy-MM-dd hh:mm:ss``.
    """
    root = self if child is None else child
    for node in root.children():
        if hasattr(node, "statusTip"):
            if get_tip_system(node.statusTip()) == "DateTime":
                stamp = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss")
                if isinstance(node, (QPushButton, QLineEdit, QLabel)):
                    node.setText(stamp)
        # Descend into the child's own children.
        self.flush_system_info(node)
def func_mqtt_param_modify(self, modify_str : str) :
    """Publish a parameter-modify request for the currently selected widget.

    The request goes to ``request/param/modify/<unique_name>`` and the
    response is awaited for 1000 ms. Nothing is sent when there is no
    selected object, no MQTT thread, or no unique device name for the
    widget (previously a ``None`` name raised TypeError on concatenation).

    Args:
        modify_str: the new value, embedded as the "value" field.
    """
    select_object = self.select_object
    if select_object is None or self.mqtt_thread is None:
        return
    unique_name = self.get_unique_name_from_object(select_object)
    if unique_name is None:
        # No device is configured for this widget's circuit.
        return
    mqtt_name = get_tip_mqtt_name(select_object.statusTip())
    request_topic = "request/param/modify/" + unique_name
    request_message = '{"name" : "%s", "value" : "%s"}'%(mqtt_name, modify_str)
    self.mqtt_publish_and_wait_response(request_topic, request_message, select_object, 1000)
def func_call_back_on_mqtt_param_info(self, info_str : str) :
    """Publish a parameter-info request for the currently selected widget.

    The request goes to ``request/param/info/<unique_name>`` and the
    response is awaited for 1000 ms. Nothing is sent when there is no
    selected object, no MQTT thread, or no unique device name for the
    widget (previously a ``None`` name raised TypeError on concatenation).

    Args:
        info_str: text embedded as the "value" field of the request.
    """
    select_object = self.select_object
    if select_object is None or self.mqtt_thread is None:
        return
    unique_name = self.get_unique_name_from_object(select_object)
    if unique_name is None:
        # No device is configured for this widget's circuit.
        return
    mqtt_name = get_tip_mqtt_name(select_object.statusTip())
    request_topic = "request/param/info/" + unique_name
    request_message = '{"name" : "%s", "value" : "%s"}'%(mqtt_name, info_str)
    self.mqtt_publish_and_wait_response(request_topic, request_message, select_object, 1000)
def on_menu_qpush_button_click(self, sender : QPushButton):
    """Handle a click on a menu push button.

    The passed ``sender`` argument is replaced by ``self.sender()``. When
    the button is found by ``search_menu_match_object``, the menu cursor
    moves to it, the highlight is redrawn, and a QPushButton match has its
    action run.
    """
    sender = self.sender()
    matched, matched_index = self.search_menu_match_object(sender)
    if matched is None:
        return
    self.menu_key_index = matched_index
    self.menu_selectable_object_flush()
    if isinstance(matched, QPushButton):
        self.on_select_object_action_process(matched)
#修改系统时间与日期
def func_modify_system_time(self, modify_str : str) :
    """Parse six integers from ``modify_str`` and set the system time.

    Returns:
        ``(year, month, day, hour, minute, second)`` when exactly six digit
        groups are found and ``set_system_time`` has been called; otherwise
        ``None``.
    """
    digit_groups = re.findall(r'\d+', modify_str)
    if len(digit_groups) != 6:
        return None
    year, month, day, hour, minute, second = [int(group) for group in digit_groups]
    set_system_time(year, month, day, hour, minute, second)
    return year, month, day, hour, minute, second
#修改系统黑屏时间
def func_modify_screen_blanking_time(self, modify_str: str):
    """Parse one integer from ``modify_str`` and set the screen blanking time.

    Returns:
        The parsed minutes when exactly one digit group is found and
        ``set_screen_blanking_time`` has been called; otherwise ``None``.
    """
    digit_groups = re.findall(r'\d+', modify_str)
    if len(digit_groups) != 1:
        return None
    minutes = int(digit_groups[0])
    set_screen_blanking_time(minutes)
    return minutes
def on_system_param_modify(self, tip : str) :
    """Open the edit dialog for the system parameter named in ``tip``.

    Handles "DateTime", "ScreenBlankingTime", "SystemPassWord" and
    "FaceRecogTimeout" (as returned by ``get_tip_system``). For DateTime
    and ScreenBlankingTime the entered text is validated first: when it
    does not parse, a failure dialog is shown and nothing is broadcast or
    persisted (previously success was reported regardless).
    """
    system = get_tip_system(tip)
    if system == "DateTime" :
        editor = DialogModifyText(self)
        now_text = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss")
        editor.update_modify_info(now_text, "修改系统时间")
        if editor.exec() != QDialog.Accepted :
            return
        new_time_text = editor.value
        if self.func_modify_system_time(new_time_text) is None :
            DialogInform(self).information("系统时间修改", "系统时间修改失败: 时间格式无效")
            return
        # Push the new time to every configured circuit (ids 0..7).
        time_message = '{"name" : "SetupTime", "value" : "%s"}'%(new_time_text)
        for circuit_id in range(8) :
            unique_name = self.get_unique_name_from_circuit(circuit_id)
            if unique_name != None and self.mqtt_thread != None:
                self.mqtt_thread.publish("request/action/" + unique_name, time_message)
        DialogInform(self).information("系统时间修改", "系统时间修改成功")
    elif system == "ScreenBlankingTime":
        editor = DialogModifyValue(self)
        sbt = system_parameter()
        editor.update_modify_info("", str(sbt.get_screen_blanking_time()), "修改屏幕黑屏时间(分钟)")
        if editor.exec() != QDialog.Accepted:
            return
        new_blank_text = editor.value
        result = self.func_modify_screen_blanking_time(new_blank_text)
        if result is None:
            DialogInform(self).information("屏幕黑屏时间修改", "屏幕黑屏时间修改失败: 输入无效")
            return
        sbt.change_screen_blanking_time(new_blank_text)
        self.BlankTimelineEdit.setText(str(result))
        DialogInform(self).information("屏幕黑屏时间修改", "屏幕黑屏时间修改成功")
    elif system == "SystemPassWord":
        editor = DialogModifyValue(self)
        pass_word = system_parameter()
        editor.update_modify_info("", pass_word.get_system_password(), "修改系统密码")
        if editor.exec() == QDialog.Accepted:
            pass_word.set_system_password(editor.value)
            DialogInform(self).information("系统密码修改", "系统密码修改成功")
    elif system == "FaceRecogTimeout":
        editor = DialogModifyValue(self)
        frt = system_parameter()
        editor.update_modify_info("", str(frt.get_verify_timeout()), "修改人脸识别超时时间(秒)")
        if editor.exec() == QDialog.Accepted:
            frt.set_verify_timeout(editor.value)
            DialogInform(self).information("人脸识别超时时间修改", "人脸识别超时时间修改成功")
def on_line_edit_modify_click(self, sender :QLineEdit) :
    """Open the matching edit dialog for a clicked menu QLineEdit.

    When the widget has both a unique device name and an alias name, an
    alias dialog is used (or a notice is shown if the alias table is not a
    dict). Otherwise a numeric value dialog is used. An accepted value is
    forwarded to ``func_mqtt_param_modify``.
    """
    matched, matched_index = self.search_menu_match_object(sender)
    if matched is None:
        return
    self.menu_key_index = matched_index
    if not isinstance(matched, QLineEdit):
        return

    self.dialog_modify_value = None
    tip = matched.statusTip()
    alias_name = get_tip_alias_str(tip)
    unique_name = self.get_unique_name_from_object(matched)

    if unique_name != None and alias_name != None :
        # Alias-backed parameter: pick from the cached alias table.
        lookup_key = get_key_combine_str(alias_name, unique_name)
        alias_table = utils.dict_or_object_get_attr(self.alias_dict, lookup_key, None)
        if not isinstance(alias_table, dict) :
            DialogInform().information('别名数据未获取', 'message: %s'%(alias_name))
            return
        alias_dialog = DialogModifyAlias(self)
        caption = get_tip_caption_str(matched.statusTip())
        current_text = matched.text()
        alias_dialog.set_alias_item_info(alias_table, current_text, caption)
        self.dialog_modify_alias = alias_dialog
        if alias_dialog.exec() == QDialog.Accepted :
            self.func_mqtt_param_modify(alias_dialog.value)
        return

    # Plain numeric parameter.
    value_dialog = DialogModifyValue(self)
    caption = get_tip_caption_str(matched.statusTip())
    try :
        current_value = float(matched.text())
    except Exception as e :
        DialogInform().information('无效值', str(e))
        return
    value_format = get_tip_format(tip)
    value_dialog.update_modify_info(value_format, current_value, caption)
    if value_dialog.exec() == QDialog.Accepted :
        self.func_mqtt_param_modify(value_dialog.value)
def set_video_image(self, image_object : QObject, canvas_id = 0) :
    """Show one video frame on the label registered for ``canvas_id``.

    Args:
        image_object: frame exposing ``.shape`` (height, width, ...) and
            ``.data``; presumably a tightly packed RGB888 array, as the
            ``QImage.Format_RGB888`` conversion implies.
        canvas_id: index into ``self.canvas_label_list``; out-of-range
            (including negative) ids are ignored.
    """
    if canvas_id < 0 or canvas_id >= len(self.canvas_label_list) :
        return
    camera_info_items = self.canvas_label_list[canvas_id]
    camera_label = camera_info_items[0]
    timeout_list_index = camera_info_items[1]
    if camera_label is None :
        return
    height, width = image_object.shape[0], image_object.shape[1]
    if camera_label.isVisible() :
        # Pass the row stride explicitly: without it QImage assumes 32-bit
        # aligned scanlines, which skews frames whose 3*width is not a
        # multiple of 4.
        bytes_per_line = 3 * width
        qt_img = QImage(image_object.data, width, height, bytes_per_line, QImage.Format_RGB888)
        camera_label.setScaledContents(True)
        camera_label.setPixmap(QPixmap.fromImage(qt_img))
    # A frame arrived: restart this widget's timeout counter.
    # NOTE(review): the original indentation was lost; confirm this reset
    # is meant to run even when the label is hidden.
    if timeout_list_index >= 0 :
        self.widget_timeout_list[timeout_list_index][1] = 0
def set_video_image_0(self, image_object : QObject) :
    """Forward a frame to canvas 0."""
    self.set_video_image(image_object, canvas_id=0)
def set_video_image_1(self, image_object : QObject) :
    """Forward a frame to canvas 1."""
    self.set_video_image(image_object, canvas_id=1)
def set_video_image_2(self, image_object : QObject) :
    """Forward a frame to canvas 2."""
    self.set_video_image(image_object, canvas_id=2)
def set_video_image_3(self, image_object : QObject) :
    """Forward a frame to canvas 3."""
    self.set_video_image(image_object, canvas_id=3)
def set_video_image_4(self, image_object : QObject) :
    """Forward a frame to canvas 4."""
    self.set_video_image(image_object, canvas_id=4)
def set_video_image_5(self, image_object : QObject) :
    """Forward a frame to canvas 5."""
    self.set_video_image(image_object, canvas_id=5)
def set_video_image_6(self, image_object : QObject) :
    """Forward a frame to canvas 6."""
    self.set_video_image(image_object, canvas_id=6)
def set_video_image_7(self, image_object : QObject) :
    """Forward a frame to canvas 7."""
    self.set_video_image(image_object, canvas_id=7)
def set_video_image_8(self, image_object : QObject) :
    """Forward a frame to canvas 8."""
    self.set_video_image(image_object, canvas_id=8)
def update_mqtt_value(self, circuit, json_dict) :
    """Store every key/value pair of ``json_dict`` via ``set_mqtt_value`` for ``circuit``."""
    for name in json_dict :
        self.set_mqtt_value(circuit, name, json_dict[name])
#更新视频显示与否
def update_canvas_camera_visible(self, child = None) :
    """Recursively show/hide video labels according to their visibility value.

    For every QLabel below ``child`` (or ``self``) whose tip names a
    visibility MQTT value and valid canvas/camera ids, the label is shown
    when the stored value converts to an int greater than 0. When the
    state of a known canvas changes, the canvas is (dis)connected from its
    camera through ``virtual_connect_canvas_to_camera``.
    """
    root = self if child is None else child
    for each_object in root.children():
        if isinstance(each_object, QLabel) :
            tip_str = each_object.statusTip()
            visible_mqtt = get_tip_visible_mqtt(tip_str, None)
            if visible_mqtt is not None :
                # The label is visibility-controlled; check whether it is a video canvas.
                canvas_id, camera_id = get_tip_canvas_id_and_camera_id(tip_str)
                if canvas_id >= 0 and camera_id >= 0 :
                    circuit_id = self.get_circuit_from_object(each_object)
                    visible_value = self.get_mqtt_value(circuit_id, visible_mqtt)
                    is_visible = False
                    if visible_value is not None :
                        try :
                            is_visible = int(visible_value) > 0
                        except Exception :
                            is_visible = False
                    each_object.setVisible(is_visible)
                    # Bounds check must come before indexing, otherwise an
                    # unknown canvas id raises IndexError.
                    if canvas_id < len(self.canvas_visible_arr) and is_visible != self.canvas_visible_arr[canvas_id] :
                        self.canvas_visible_arr[canvas_id] = is_visible
                        self.virtual_connect_canvas_to_camera(canvas_id, camera_id, is_visible)
        self.update_canvas_camera_visible(each_object)
def video_visible_timeout(self) :
    """Re-evaluate video label visibility starting from ``self``."""
    self.update_canvas_camera_visible()
    return None
# Refresh the UI widgets from a JSON packet.
# Walks the children of `child` (or of `self` when `child` is None). Objects
# without a statusTip are skipped. A widget is updated when its circuit id
# (get_circuit_from_object) equals `circuit` and its MQTT name
# (get_tip_mqtt_name) is a key of `json_dict`.
# NOTE(review): the original indentation is not visible in this file view, so
# the exact nesting of the statements below should be confirmed.
def flush_widgets(self, circuit, json_dict, child = None):
widget :QWidget = child
if widget == None :
widget = self
for each_object in widget.children():
child : QWidget = each_object
if not hasattr(child, "statusTip") :
continue
tip_str : str = child.statusTip()
mqtt_name = get_tip_mqtt_name(tip_str)
circuit_id = self.get_circuit_from_object(each_object)
# Recurse before the circuit filter so nested widgets are always visited.
self.flush_widgets(circuit, json_dict, child)
if circuit_id != circuit :
continue
if mqtt_name in json_dict.keys() :
# An entry is either a plain value or a dict carrying "value" and
# optional "color" / "bk_color" fields.
value_or_dict = json_dict[mqtt_name]
if isinstance(value_or_dict, dict) :
value_text = utils.dict_or_object_get_attr(value_or_dict, "value", "")
else :
value_text = value_or_dict
color_str = utils.dict_or_object_get_attr(value_or_dict, "color", None)
bk_color_str = utils.dict_or_object_get_attr(value_or_dict, "bk_color", None)
imag_file_name = get_imag_value_file_name(tip_str, value_text)
timeout_ms = get_tip_timeout(tip_str)
# Widgets that declare a timeout get it restarted on every update.
if timeout_ms > 0 :
self.reset_widget_timeout(child)
# Buttons: show the value-specific image as border-image, else reset the style.
if isinstance(child, QPushButton) :
widget_button : QPushButton = child
if imag_file_name != None :
self.modify_object_style_sheet(widget_button, "border-image", "url(%s)"%(imag_file_name))
else :
self.reset_object_style_sheet(widget_button)
# Labels: show the value-specific image as background, else reset the style
# and show the value as text.
# NOTE(review): setText receives value_text as-is; presumably it is always a
# str here -- confirm for numeric JSON values.
if isinstance(child, QLabel) :
widget_label : QLabel = child
if imag_file_name != None :
if widget_label.isVisible() == False :
widget_label.setVisible(True)
self.modify_object_style_sheet(widget_label, "background-image", "url(%s)"%(imag_file_name))
else :
self.reset_object_style_sheet(widget_label)
widget_label.setText(value_text)
# Line edits: with an alias, the rounded numeric value is looked up in the
# cached alias table; the fallback text is "(<value>)".
# NOTE(review): float(value_text) is not guarded; a non-numeric value would
# raise here.
if isinstance(child, QLineEdit) :
widget_line_edit : QLineEdit = child
alias_name = get_tip_alias_str(widget_line_edit.statusTip())
unique_name = self.get_unique_name_from_object(widget_line_edit)
if alias_name != None :
fvalue = float(value_text)
value = round(fvalue)
value_str = str(value)
display_text = "(%s)"%(value_str)
alias_key_str = get_key_combine_str(alias_name, unique_name)
if alias_key_str in self.alias_dict.keys() :
alias_value_dict = self.alias_dict[alias_key_str]
if isinstance(alias_value_dict, dict) :
display_text = utils.dict_or_object_get_attr(alias_value_dict, value_str, value_text)
else :
display_text = value_text
widget_line_edit.setText(display_text)
# Optional colours supplied with the value; the background colour is only
# applied to line edits.
if bk_color_str != None :
if isinstance(child, QLineEdit) :
self.modify_object_style_sheet(child, "background-color", bk_color_str)
if color_str != None :
self.modify_object_style_sheet(child, "color", color_str)
def get_circuit_from_object(self, object :QWidget) :
    """Return the circuit id of ``object`` or of its nearest ancestor.

    Walks up the parent chain, reading ``get_tip_circuit`` from each status
    tip, and stops at the first id >= 0 or when ``self`` is reached.
    """
    node = object
    found_id = -1
    while node is not None :
        if hasattr(node, "statusTip") :
            found_id = get_tip_circuit(node.statusTip())
        if found_id >= 0 or node == self :
            break
        node = node.parent()
    return found_id
def get_main_index_from_object(self, object :QWidget) :
    """Return the main index of ``object`` or of its nearest ancestor.

    Walks up the parent chain, reading ``get_tip_main_index`` from each
    status tip. Stops at the first index >= 0 or when ``self`` is reached;
    returns -1 as soon as an ancestor has no ``statusTip``.
    """
    node = object
    found_index = -1
    while node is not None :
        if not hasattr(node, "statusTip") :
            return -1
        found_index = get_tip_main_index(node.statusTip())
        if found_index >= 0 or node == self :
            break
        node = node.parent()
    return found_index
def get_unique_name_from_circuit(self, circuit_id : int) :
    """Look up the "unique_name" of the device configured for ``circuit_id``.

    Searches every "device_list" in ``group_config.comm_thread_config`` and
    returns the name of the first device whose "circuit_id" matches, or
    ``None`` when there is no match.
    """
    for thread_config in group_config.comm_thread_config :
        devices = utils.dict_or_object_get_attr(thread_config, "device_list", None)
        if devices is None :
            continue
        for device in devices :
            if utils.dict_or_object_get_attr(device, "circuit_id", -1) == circuit_id :
                return utils.dict_or_object_get_attr(device, "unique_name", None)
    return None
def get_unique_name_from_object(self, object :QWidget) :
    """Return the unique device name for the circuit that ``object`` belongs to."""
    return self.get_unique_name_from_circuit(self.get_circuit_from_object(object))
def key_increase_parameter(self):
    """Move the menu cursor forward, wrapping around the menu list.

    When the current item's tip yields a group-end count other than -1, the
    cursor moves by ``1 - count`` instead of by one step.
    """
    entries = self.sort_menu_list
    total = len(entries)
    if total == 0:
        return
    group_span = get_tip_group_end(entries[self.menu_key_index][1].statusTip())
    step = 1 if group_span == -1 else 1 - group_span
    self.menu_key_index = (self.menu_key_index + step) % total
    self.select_object = self.sort_menu_list[self.menu_key_index][1]
    self.menu_selectable_object_flush()
def key_decrease_parameter(self):
    """Move the menu cursor backward, wrapping around the menu list.

    When the current item's tip yields a group-start count other than -1,
    the cursor moves by ``count - 1`` instead of one step back.
    """
    entries = self.sort_menu_list
    total = len(entries)
    if total == 0:
        return
    group_span = get_tip_group_start(entries[self.menu_key_index][1].statusTip())
    step = -1 if group_span == -1 else group_span - 1
    self.menu_key_index = (self.menu_key_index + step) % total
    self.select_object = self.sort_menu_list[self.menu_key_index][1]
    self.menu_selectable_object_flush()
def key_enter_process(self):
    """Activate the menu item under the cursor; does nothing on an empty menu."""
    if len(self.sort_menu_list) == 0:
        return
    chosen = self.sort_menu_list[self.menu_key_index][1]
    self.select_object = chosen
    self.on_select_object_action_process(self.select_object)
def key_escape_process(self):
    """Handle the escape key: leave the page or reset the menu cursor.

    When the cursor is already on the first item, the previous page is
    requested through ``virtual_change_to_page(-1)``. Then, if the menu has
    selectable items, the cursor returns to the first one and the
    highlight is redrawn.
    """
    # On the first menu item, escape leaves to the previous page.
    if self.menu_key_index == 0:
        self.virtual_change_to_page(-1)
    # Nothing to reset when the page has no selectable widgets.
    if len(self.sort_menu_list) == 0:
        return
    # Reset the cursor first so select_object matches the highlighted item
    # (previously it was read from the stale index before the reset).
    self.menu_key_index = 0
    self.select_object = self.sort_menu_list[0][1]
    self.menu_selectable_object_flush()
def key_Test_process0(self):
    """Push a fixed test packet for circuit 0 through ``flush_widgets``."""
    sample = {name: "1111" for name in ("Uab", "Ubc", "Uca")}
    sample["Uo"] = "11"
    for name in ("Iab", "Ibc", "Ica", "Io"):
        sample[name] = "1111"
    sample["SwitchStatus"] = {"value" : "0", "bk_color" : COLOR_NORMAL, "color" : COLOR_RED}
    self.flush_widgets(0, sample)
def key_Test_process1(self):
    """Push a fixed test packet for circuit 1 through ``flush_widgets``."""
    sample = {name: "2222" for name in ("Uab", "Ubc", "Uca")}
    sample["Uo"] = "22"
    for name in ("Iab", "Ibc", "Ica", "Io"):
        sample[name] = "2222"
    sample["SwitchStatus"] = {"value" : "1", "bk_color" : COLOR_NORMAL, "color" : COLOR_RED}
    self.flush_widgets(1, sample)
def key_Test_process2(self):
    """Push a fixed test packet for circuit 2 through ``flush_widgets``."""
    sample = {name: "3333" for name in ("Uab", "Ubc", "Uca")}
    sample["Uo"] = "33"
    for name in ("Iab", "Ibc", "Ica", "Io"):
        sample[name] = "3333"
    self.flush_widgets(2, sample)
def key_Test_process3(self):
    """Push a fixed test packet for circuit 3 through ``flush_widgets``."""
    sample = dict.fromkeys(("Uab", "Ubc", "Uca"), "33333")
    self.flush_widgets(3, sample)
def menu_selectable_object_flush(self) :
    """Redraw the selection highlight of every entry in ``self.sort_menu_list``.

    Buttons may define a select image (border-image) or a select icon;
    labels and line edits may define a select image. Widgets with neither
    fall back to a ``COLOR_BLUE`` background while selected and a style
    reset otherwise. A selected button also receives keyboard focus.
    """
    menu_count = len(self.sort_menu_list)
    for position in range(menu_count):
        target = self.sort_menu_list[position][1]
        custom_drawn = False

        if isinstance(target, QPushButton) :
            skin_file = get_select_imag_file_name(target.statusTip())
            icon_file = get_select_icon_file_name(target.statusTip())
            selected = (position == self.menu_key_index)
            if selected :
                target.setFocus()
            if skin_file != None :
                custom_drawn = True
                if selected :
                    self.modify_object_style_sheet(target, "border-image", "url(%s)"%(skin_file))
                else :
                    self.reset_object_style_sheet(target)
            elif icon_file != None :
                custom_drawn = True
                if selected :
                    full_size = target.size()
                    target.setIcon(QIcon(icon_file))
                    target.setIconSize(full_size)
                else :
                    target.setIcon(QIcon())
        elif isinstance(target, (QLabel, QLineEdit)) :
            skin_file = get_select_imag_file_name(target.statusTip())
            if skin_file != None :
                custom_drawn = True
                if position == self.menu_key_index :
                    self.modify_object_style_sheet(target, "border-image", "url(%s)"%(skin_file))
                else :
                    self.reset_object_style_sheet(target)

        # No dedicated select artwork: use the plain background highlight.
        if not custom_drawn :
            if position == self.menu_key_index :
                self.modify_object_style_sheet(target, "background-color", COLOR_BLUE)
            else :
                self.reset_object_style_sheet(target)
# Handle a response (or a timeout) for a request recorded in
# self.wait_response_list.
# topic      : response topic; the first waiting entry with this topic is
#              removed from the list and processed. Unknown topics are ignored.
# message    : raw payload, parsed with json_load_message.
# is_timeout : True when called because the wait timer expired.
# inform     : when True, results are shown in a DialogInform.
# NOTE(review): the original indentation is not visible in this file view, so
# the exact nesting of the statements below should be confirmed.
def response_topic_message_process(self, topic: str,
message : str,
is_timeout :bool = False, inform : bool = False) :
find_reponse_topic_msg = None
for wait_topic_msg in self.wait_response_list :
if wait_topic_msg[0] == topic :
find_reponse_topic_msg = wait_topic_msg
self.wait_response_list.remove(wait_topic_msg)
break
if find_reponse_topic_msg == None :
return
# Waiting entries are [topic, request message, select object, ...].
select_object :QWidget = find_reponse_topic_msg[2]
request_message : str = find_reponse_topic_msg[1]
inform_box : DialogInform = None
if inform :
inform_box : DialogInform = DialogInform()
# A payload that fails to parse is treated as result 0 with json_dict None.
try :
json_dict = json_load_message(message)
result = utils.dict_or_object_get_attr(json_dict, "result", 0)
result = int(result)
except Exception as e:
json_dict = None
result = 0
print_error_msg(str(e) + "invalid json message: %s" % (message))
# Action responses: only report the outcome.
if "response/action" in topic :
if is_timeout :
inform_message = "命令执行超时"
if inform == True:
inform_box.information('命令执行超时', inform_message)
else :
inform_message = '命令执行成功' if result == 1 else '命令执行失败!!!'
if inform == True:
inform_box.information('命令执行', inform_message)
# Parameter-modify responses: report the outcome and re-query values on success.
elif "response/param/modify" in topic :
if is_timeout :
inform_message = '参数修改超时'
if inform == True:
inform_box.information('参数修改超时', inform_message)
else :
inform_message = '参数修改成功' if result == 1 else '参数修改失败!!!'
if inform == True:
inform_box.information('参数修改', inform_message)
if result == 1 :
self.para_or_measure_query()
# Alias responses: cache the alias table and open the alias dialog.
elif "response/alias" in topic :
unique_name = topic.replace("response/alias/", "")
if is_timeout :
# On timeout, an alias entry that still holds an int marker is set back to
# ALIAS_QUERY_NONE.
if json_dict != None :
alias_name = None
if "name" in json_dict.keys() :
alias_name = json_dict["name"]
if alias_name != None :
key_str = get_key_combine_str(alias_name, unique_name)
if key_str in self.alias_dict.keys() :
if isinstance(self.alias_dict[key_str], int) :
self.alias_dict[key_str] = ALIAS_QUERY_NONE
if inform == True:
inform_box.information('获取别名超时', message)
else :
dialog_modify_alias : DialogModifyAlias = DialogModifyAlias(self)
# The payload is either a list of {alias name: table} items or a single dict;
# an alias is matched when its name occurs in the original request message.
if isinstance(json_dict, list):
for item in json_dict:
for alias_key_name, alias_dict in item.items():
if alias_key_name in request_message:
key_str = get_key_combine_str(alias_key_name, unique_name)
if key_str in self.alias_dict.keys():
self.alias_dict[key_str] = alias_dict
if isinstance(select_object, QLineEdit):
line_select_object: QLineEdit = select_object
caption_str = get_tip_value_str(select_object.statusTip(), "Caption", "")
alias_value = int(line_select_object.text())
dialog_modify_alias.set_alias_item_info(alias_dict, alias_value, caption_str)
self.dialog_modify_alias = dialog_modify_alias
dialog_modify_alias.exec()
break
else:
# NOTE(review): json_dict is None here when the payload failed to parse, in
# which case .items() would raise -- confirm whether a guard is needed.
for alias_key_name, alias_dict in json_dict.items():
if alias_key_name in request_message:
key_str = get_key_combine_str(alias_key_name, unique_name)
if key_str in self.alias_dict.keys():
self.alias_dict[key_str] = alias_dict
if isinstance(select_object, QLineEdit):
line_select_object: QLineEdit = select_object
caption_str = get_tip_value_str(select_object.statusTip(), "Caption", "")
alias_value = int(line_select_object.text())
dialog_modify_alias.set_alias_item_info(alias_dict, alias_value, caption_str)
self.dialog_modify_alias = dialog_modify_alias
dialog_modify_alias.exec()
break
def mqtt_topic_message_process(self, msg) :
    """Drain ``self.topic_message_list`` and apply each message to the UI.

    Entries are popped one at a time under ``self.mutex``. Messages whose
    payload cannot be parsed by ``json_load_message`` are reported and
    skipped. ``msg`` is not used.
    """
    while True :
        # Take the oldest queued message while holding the mutex.
        self.mutex.lock()
        if len(self.topic_message_list) > 0 :
            pending = self.topic_message_list.pop(0)
        else :
            pending = None
        self.mutex.unlock()
        if pending is None :
            break

        topic = pending[0]
        message = pending[1]
        try :
            json_dict = json_load_message(message)
        except Exception as e :
            json_dict = None
            print_error_msg("invalid json message : %s"%(message))
        if json_dict is None :
            continue

        if "measure" in topic or "param/info" in topic :
            circuit_id = search_circuit_from_topic(topic)
            self.update_mqtt_value(circuit_id, json_dict)
            self.flush_widgets(circuit_id, json_dict)
        elif "response/alarm" in topic :
            circuit_id = search_circuit_from_topic(topic)
            self.flush_alarminfo(circuit_id, json_dict)
            self.update_mqtt_value(circuit_id, json_dict)
            self.flush_widgets(circuit_id, json_dict)
        elif "response" in topic :
            self.response_topic_message_process(topic, message, is_timeout = False, inform = True)
        elif "alarm/test" in topic :
            # Only the circuit id is resolved for this topic; nothing else is done.
            circuit_id = search_circuit_from_topic(topic)
# 将告警信息刷新到页面
def flush_alarminfo(self, circuit, json_dict, child = None):
    """Show the alarm described by ``json_dict`` in the alarm line edits.

    Fills the QLineEdit children named "alarmTitle", "alarmTime" and
    "alarmMsg" when they exist. ``circuit`` and ``child`` are not used.

    Raises:
        KeyError: if ``json_dict`` lacks a field needed by an existing
            line edit.
    """
    title_edit = self.findChild(QLineEdit, "alarmTitle")
    if title_edit is not None:
        title_edit.setText(str(json_dict["index"]))

    time_edit = self.findChild(QLineEdit, "alarmTime")
    if time_edit is not None:
        # "Y-M-D h:m:s"; the stray space after the minute separator that the
        # old format produced ("h:m: s") is removed.
        stamp = "{}-{}-{} {}:{}:{}".format(
            json_dict["AlarmYear"],
            json_dict["AlarmMonth"],
            json_dict["AlarmDay"],
            json_dict["AlarmHour"],
            json_dict["AlarmMinute"],
            json_dict["AlarmSecond"],
        )
        time_edit.setText(stamp)

    message_edit = self.findChild(QLineEdit, "alarmMsg")
    if message_edit is not None:
        message_edit.setText(str(json_dict["AlarmCodeMsg"]))
def on_connect(self, mqtt_thread, userdata, flags, rc) :
    """MQTT connect callback: keep the client and subscribe on success.

    With ``rc == 0`` the client is stored in ``self.mqtt_thread`` and the
    UI topics are subscribed; any other code clears ``self.mqtt_thread``.
    """
    if rc != 0 :
        self.mqtt_thread = None
        return
    self.mqtt_thread = mqtt_thread
    topic_filters = (
        "response/#",
        "param/info/#",
        "param/modify/#",
        "measure/#",
        "status/#",
        "alarm/#",
        "response/alarm/#",
        "action/#",
    )
    for topic_filter in topic_filters :
        self.mqtt_thread.subscribe(topic_filter)
def on_message(self, mqtt_thread, topic, message) :
    """MQTT message callback: queue the message and signal the UI.

    Messages are dropped when the topic has no circuit id or the circuit's
    bit is not set in ``self.exist_circuit_mask``. The queue holds at most
    100 entries; the refresh signal is emitted even when the queue is full.
    """
    circuit_id = search_circuit_from_topic(topic)
    if circuit_id < 0 :
        return
    circuit_bit = 0x1 << circuit_id
    if (self.exist_circuit_mask & circuit_bit) == 0 :
        return

    self.mutex.lock()
    queue = self.topic_message_list
    if len(queue) < 100 :
        queue.append([topic, message])
    self.mutex.unlock()

    self.mqtt_signal.emit("Refresh UI")
def response_topic_messge_timeout(self):
    """Timer slot: process the waiting entry owned by the timer that fired.

    The firing timer is taken from ``self.sender()``; its entry's timer is
    stopped and the entry is handed to ``response_topic_message_process``
    as a timeout.
    """
    fired_timer = self.sender()
    for entry in self.wait_response_list:
        entry_timer = entry[3]
        wants_inform = entry[4]
        if entry_timer != fired_timer:
            continue
        entry_topic = entry[0]
        entry_message = entry[1]
        entry_timer.stop()
        self.response_topic_message_process(entry_topic, entry_message, is_timeout=True, inform=wants_inform)
        break
# def mqtt_publish_and_wait_response(self, publish_topic : str, publish_message : str, select_object : QWidget = None, timeout_ms : int = 1000, inform : bool = True) :
# if self.mqtt_thread != None :
# self.mqtt_thread.publish(publish_topic, publish_message)
# response_topic = publish_topic.replace("request", "response")
# #开启超时定时器
# wait_timer = QTimer()
# wait_timer.timeout.connect(self.response_topic_messge_timeout)
# wait_timer.start(timeout_ms)
# self.wait_response_list.append([response_topic, publish_message, select_object, wait_timer, inform])
def mqtt_publish_and_wait_response(self, publish_topic: str, publish_message: str, select_object: QWidget = None, timeout_ms: int = 1000, inform: bool = True):
    """Publish a request and register it as waiting for its response.

    Does nothing without an MQTT thread. The response topic is the publish
    topic with "request" replaced by "response". A timer started with
    ``timeout_ms`` calls ``response_topic_message_timeout`` for that topic.
    """
    client = self.mqtt_thread
    if client is None:
        return

    client.publish(publish_topic, publish_message)
    reply_topic = publish_topic.replace("request", "response")

    def _on_timeout():
        self.response_topic_message_timeout(reply_topic)

    timer = QTimer()
    timer.timeout.connect(_on_timeout)
    timer.start(timeout_ms)

    # Entry layout: [response topic, request message, widget, timer, inform].
    self.wait_response_list.append([reply_topic, publish_message, select_object, timer, inform])
def response_topic_message_timeout(self, response_topic):
    """Expire the first waiting entry for ``response_topic``.

    Its timer is stopped, the entry is removed from
    ``self.wait_response_list`` and, when the entry asks for it,
    ``inform_timeout`` is called.
    """
    expired = None
    for entry in self.wait_response_list:
        if entry[0] == response_topic:
            expired = entry
            break
    if expired is None:
        return
    expired[3].stop()
    self.wait_response_list.remove(expired)
    if expired[4]:
        self.inform_timeout(expired)
def inform_timeout(self, response_info):
    """Print a notice that a waiting request timed out.

    ``response_info`` is a waiting-list entry; index 0 is the response
    topic and index 1 the request message.
    """
    topic = response_info[0]
    request = response_info[1]
    print(f"Request for {request} on topic {topic} timed out.")
def batch_mqtt_publish_and_wait_response(self, publish_list: list, timeout_ms: int = 1000, inform: bool = True):
    """Publish several requests and register each as waiting for a response.

    ``publish_list`` holds ``(topic, message, select_object)`` triples.
    Does nothing without an MQTT thread. Each request gets its own timer,
    started with ``timeout_ms``, bound to its own response topic.
    """
    if self.mqtt_thread is None:
        return

    for request_topic, request_message, widget in publish_list:
        self.mqtt_thread.publish(request_topic, request_message)
        reply_topic = request_topic.replace("request", "response")

        # The default argument pins this iteration's topic to the callback.
        def _on_timeout(rt=reply_topic):
            self.response_topic_message_timeout(rt)

        timer = QTimer()
        timer.timeout.connect(_on_timeout)
        timer.start(timeout_ms)
        self.wait_response_list.append([reply_topic, request_message, widget, timer, inform])
# Recursively request the current value of every MQTT-backed widget.
# Called without `child`, it starts at `self` and clears self.param_dict,
# which records the MQTT names already handled during this pass.
# NOTE(review): the original indentation is not visible in this file view, so
# the exact nesting of the statements below should be confirmed.
def para_or_measure_query(self, child = None) :
widget :QWidget = self
if child == None :
self.param_dict = {}
else :
widget = child
for child_widget in widget.children():
if hasattr(child_widget, "statusTip") :
tip_str : str = child_widget.statusTip()
# The screen blanking time is read from system_parameter, not over MQTT.
# NOTE(review): other call sites wrap get_screen_blanking_time() in str()
# before displaying it; confirm setText receives a str here.
if "ScreenBlankingTime" in tip_str :
sbt = system_parameter()
child_widget.setText(sbt.get_screen_blanking_time())
continue
mqtt_str = get_tip_mqtt_name(tip_str)
# Each MQTT name is requested at most once per pass.
if mqtt_str != None and mqtt_str not in self.param_dict.keys():
unique_name = self.get_unique_name_from_object(child_widget)
if unique_name != None and self.mqtt_thread != None:
self.mqtt_thread.publish("request/param/info/"+unique_name, '{"name":"%s"}'%mqtt_str)
self.param_dict[mqtt_str] = ALIAS_QUERY_NONE
self.para_or_measure_query(child_widget)
def para_face(self, child=None):
    """Recursively show the verify timeout in "FaceRecogTimeout" widgets.

    Called without ``child``, it starts at ``self`` and also resets
    ``self.param_dict``. A widget whose tip contains "FaceRecogTimeout" is
    set to ``system_parameter().get_verify_timeout()`` as text and is not
    descended into.
    """
    if child is None:
        self.param_dict = {}
        root = self
    else:
        root = child
    for node in root.children():
        if hasattr(node, "statusTip"):
            if "FaceRecogTimeout" in node.statusTip():
                node.setText(str(system_parameter().get_verify_timeout()))
                continue
        self.para_face(node)
def flush_system_cycle(self) :
    """Periodic hook: refresh the on-screen system information."""
    self.flush_system_info()
    return None
if __name__ == '__main__':
    # Manual check of modify_style_sheet: print a sheet, rewrite two
    # properties of its QLabel rule, print the result and exit.
    style_sheet = "QLabel {color : RED;\nbackground-color : GREEN;}"
    print(style_sheet)
    new_style_items = style_sheet
    for property_name, property_value in (("background-color", "WHITE"), ("color", "YELLOW")):
        new_style_items = modify_style_sheet("QLabel", new_style_items, property_name, property_value)
    print(new_style_items)
    sys.exit(0)