Files
MenuPython_QT/mqtt_object.py

103 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import menu_utils as utils
import json
from mqtt_device import class_comm_mqtt_interface, class_comm_mqtt_thread
from device_conf import class_comm_device_config
import string
from enum import Enum
import math
from print_color import *
# Builds the lookup table that maps MQTT field names to menu items.
class class_mqtt_info_object (class_comm_mqtt_interface):
    """Index of one device's menu items, keyed by their "mqtt" field name.

    The device configuration module named by ``device_name`` is imported,
    its ``comm_device_config`` object is asked for the menu description, and
    every menu (including sub-menus reached through "sub_menu" references)
    is scanned for items carrying an "mqtt" key.

    Attributes:
        mqtt_dict: maps an MQTT field name to the menu item that declared it.
        device_config: the ``comm_device_config`` object of the device module.
        menu_description: result of ``device_config.get_menu_caption_info()``;
            iterated as a sequence of entries exposing "name" and "menu".
    """

    def __init__(self, device_name):
        """Import the device module and build the MQTT index.

        Args:
            device_name: importable module name of the device configuration.
                The module must expose a ``comm_device_config`` attribute.
        """
        # Function-scope import: importlib.import_module returns the module
        # that was actually named, even for dotted paths, whereas
        # __import__("pkg.mod") returns the top-level package "pkg".
        import importlib

        class_comm_mqtt_interface.__init__(self)
        self.mqtt_dict = {}
        device_config_info = importlib.import_module(device_name)
        self.device_config : class_comm_device_config = device_config_info.comm_device_config
        self.menu_description = self.device_config.get_menu_caption_info()

        # Shared across all top-level menus so a menu reachable from several
        # places is only indexed once.
        menu_process_list = []
        for item_dict in self.menu_description:
            if "menu" in item_dict.keys():
                self.__create_mqtt_dict__(item_dict["menu"], menu_process_list)

    # Internal helper that builds the MQTT index; not meant to be called
    # from outside this class.
    def __create_mqtt_dict__(self, menu_object, menu_process_list: list):
        """Index the MQTT fields of ``menu_object`` and of its sub-menus.

        Args:
            menu_object: iterable of menu items, or None (ignored).
            menu_process_list: menus already visited; updated in place and
                used to stop the recursion when menus reference each other.
        """
        if menu_object is None:
            return
        if menu_object in menu_process_list:
            # Already handled: prevents infinite recursion on cyclic menus.
            return
        menu_process_list.append(menu_object)

        for menu_item in menu_object:
            if "mqtt" in menu_item.keys():
                # The MQTT field name is the key used to locate this menu
                # item when a message containing that field arrives.
                mqtt_info_name = menu_item["mqtt"]
                if mqtt_info_name not in self.mqtt_dict:
                    self.mqtt_dict[mqtt_info_name] = menu_item
                else:
                    # Field names must be unique; the first binding is kept.
                    print_error_msg("Error, 菜单项有相同的mqtt字段(%s)" % (mqtt_info_name,))

            if "sub_menu" in menu_item.keys():
                sub_menu_name = menu_item["sub_menu"]
                if isinstance(sub_menu_name, Enum):
                    # Sub-menus may be referenced by Enum member; menu groups
                    # are looked up by the member's name.
                    sub_menu_name = sub_menu_name.name
                sub_menu_object = self.search_menu_group_object(sub_menu_name)
                # Recurse until every reachable sub-menu has been processed.
                self.__create_mqtt_dict__(sub_menu_object, menu_process_list)

    def search_menu_item(self, mqtt_info_name : str):
        """Return the menu item bound to ``mqtt_info_name``, or None."""
        return self.mqtt_dict.get(mqtt_info_name)

    def search_menu_group_object(self, menu_group_name):
        """Return the "menu" of the first group named ``menu_group_name``.

        Each menu group holds several menu items. Returns None when no entry
        of ``menu_description`` has a matching "name".
        """
        for item_dict in self.menu_description:
            menu_item_name = utils.dict_or_object_get_attr(item_dict, "name", None)
            if menu_item_name == menu_group_name:
                return utils.dict_or_object_get_attr(item_dict, "menu", None)
        return None
# Registry of MQTT info objects: the MQTT information of every device is
# collected in this class.
class class_mqtt_info_object_group():
    """Per-device cache of ``class_mqtt_info_object`` instances."""

    def __init__(self):
        # Maps a device name to the MQTT info object built for that device.
        self.device_mqtt_info_dict : dict[str, class_mqtt_info_object] = {}

    def create_mqtt_info_object(self, device_name) -> "class_mqtt_info_object | None":
        """Return the MQTT info object for ``device_name``, creating it once.

        Returns:
            The cached or newly created object, or None when construction
            fails; the failure is reported through ``print_error_msg``.
        """
        try:
            if device_name not in self.device_mqtt_info_dict:
                self.device_mqtt_info_dict[device_name] = class_mqtt_info_object(device_name)
            return self.get_mqtt_info_object(device_name)
        except Exception as e:
            # Best effort: report and signal failure with None. The return is
            # deliberately not in a `finally` clause, which would also swallow
            # KeyboardInterrupt/SystemExit and errors raised while logging.
            print_error_msg(str(e))
            return None

    def get_mqtt_info_object(self, device_name) -> "class_mqtt_info_object | None":
        """Return the registered object for ``device_name``, or None."""
        return self.device_mqtt_info_dict.get(device_name)

    def search_mqtt_menu_item_info(self, device_name, mqtt_info_name):
        """Return the menu item bound to ``mqtt_info_name`` on a device.

        Returns None when ``device_name`` is not registered or when the
        device has no menu item for ``mqtt_info_name``.
        """
        # The registry is keyed by device name, so the lookup must use
        # device_name (not the MQTT field name).
        mqtt_info_object = self.device_mqtt_info_dict.get(device_name)
        if mqtt_info_object is None:
            return None
        return mqtt_info_object.search_menu_item(mqtt_info_name)