高耦合度使用最后一版

This commit is contained in:
冯佳
2025-07-28 08:30:12 +08:00
parent 805299ec2a
commit 1664b625ea
3 changed files with 234 additions and 67 deletions

180
main.py
View File

@ -17,7 +17,7 @@ from kivymd.uix.datatables import MDDataTable
import random
import csv
from kivymd.uix.textfield import MDTextField
from kivymd.uix.boxlayout import MDBoxLayout
from kivymd.uix.button import MDFlatButton
from kivymd.uix.dialog import MDDialog
@ -137,6 +137,7 @@ class app(MDApp):
self.back_button = None
self.user = None
self.dialog = None
self.dialog2 = None
self.profile_edit_screen = None
self.edit_email = None
self.edit_prn = None
@ -366,23 +367,6 @@ class app(MDApp):
self.register_update_event.cancel()
self.register_update_event = None
def read_modbus_register(self):
"""读取寄存器值(复用原逻辑)"""
if not self.modbus_master:
try:
self.modbus_master = TcpMaster(self.modbus_ip or '192.168.1.1', self.modbus_port or 502)
self.modbus_master.set_timeout(5.0)
except Exception as e:
print(f"Modbus连接失败: {e}")
return "连接失败"
try:
result = self.modbus_master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 1)
return str(result[0]) # 返回寄存器值
except Exception as e:
print(f"读取失败: {e}")
return "读取失败"
# def update_register_display(self, dt):
# """更新原标签的显示内容"""
# # 仅在目标屏幕时更新
@ -393,33 +377,68 @@ class app(MDApp):
# real_time_screen = self.root.get_screen("real_time_curve")
# real_time_screen.ids.register_label.text = f"寄存器 0: {value}" # 直接更新原标签
def write_modbus_register(self, value):
"""写入寄存器值"""
def write_modbus_register(self, slave_id=1, address=0, value=None):
"""
写入Modbus保持寄存器单个寄存器
:param slave_id: 从机ID默认1
:param address: 寄存器地址默认0
:param value: 要写入的值必须为整数16位范围内0-65535
:return: 操作结果字符串
"""
# 检查Modbus连接是否初始化
if not self.modbus_master:
pass
return "错误Modbus未连接请先初始化连接"
# 检查写入值是否有效
if value is None:
return "错误:未指定要写入的值"
try:
self.modbus_master.execute(1, cst.WRITE_SINGLE_REGISTER, 0, value)
return "修改成功"
# 转换为整数并检查范围Modbus保持寄存器通常为16位无符号整数
write_value = int(value)
if not (0 <= write_value <= 65535):
return "错误:值必须在0-65535范围内(16位无符号整数)"
except ValueError:
return "错误:写入值必须为整数"
try:
# 使用正确的写入函数WRITE_SINGLE_REGISTER写入单个寄存器
self.modbus_master.execute(
slave=slave_id,
function_code=cst.WRITE_SINGLE_REGISTER,
starting_address=address,
output_value=write_value # 注意参数名是output_value
)
return f"成功:从机ID={slave_id},寄存器地址={address},写入值={write_value}"
except Exception as e:
return f"修改失败: {e}"
# 捕获异常并返回错误信息
error_msg = f"失败:从机ID={slave_id},寄存器地址={address},错误信息:{str(e)}"
print(error_msg)
return error_msg
def modify_register(self, input_text):
"""处理修改请求,修改后更新原标签"""
if not input_text:
self.show_dialog("错误", "请输入值")
return
try:
value = int(input_text)
result = self.write_modbus_register(value)
self.show_dialog("结果", result)
# 若成功,立即刷新原标签显示
# 这里假设写入 从机ID=1地址=0 的寄存器
# 若需要动态指定可从界面控件中获取例如self.ids.slave_id.text
result = self.write_modbus_register(
slave_id=1,
address=0,
value=value
)
self.show_dialog("操作结果", result)
# 若成功,立即刷新显示
if "成功" in result:
self.update_register_display(0)
self.update_register_display(0) # 0是dt参数占位符
except ValueError:
self.show_dialog("错误", "请输入有效整数")
def show_dialog(self, title, message):
self.dialog2(title, message)
self.dialog1(message,title)
#############################################@Frequently used functions##################################################
@staticmethod
@ -570,42 +589,119 @@ class app(MDApp):
self.dialog.open()
# MDDialog box dismiss function 'close_dialog'
def close_dialog(self, *args):
self.dialog.dismiss()
#############################################Modbus Functions############################################################
def read_modbus_register(self):
"""读取Modbus寄存器值并更新界面显示"""
def show_modify_dialog(self, label_id, current_value=""):
"""显示修改对话框"""
if not self.dialog2:
# 创建输入框
self.modify_input = MDTextField(
id="modify_input",
hint_text="请输入新值",
input_filter="int", # 保持整数输入限制
text=current_value
)
# 创建对话框
self.dialog2 = MDDialog(
title="修改参数",
type="custom",
content_cls=MDBoxLayout(
self.modify_input,
orientation="vertical",
padding=dp(10),
spacing=dp(10),
size_hint_y=None,
height=dp(100)
),
buttons=[
MDFlatButton(
text="取消",
on_press=lambda x: self.dialog2.dismiss()
),
MDFlatButton(
text="确认",
on_press=lambda x: self.confirm_modify(label_id)
)
]
)
else:
# 重置输入框内容
self.modify_input.text = current_value
self.dialog2.open()
def confirm_modify(self, label_id):
"""确认修改并更新标签"""
new_value = self.modify_input.text
if new_value: # 确保输入不为空
# 根据标签ID获取对应的标签并更新
screen = self.root.get_screen("real_time_curve")
if label_id == "register_label" and hasattr(screen.ids, label_id):
screen.ids[label_id].text = f"通信超时: {new_value}" # 更新标签显示
self.modify_register(new_value) # 调用原有的修改方法
self.dialog2.dismiss() # 关闭对话框
def read_modbus_registers(self, slave_id, address, count):
"""
通用的Modbus保持寄存器读取函数
:param slave_id: 从机ID动态传入如1、2、3等
:param address: 寄存器地址
:param count: 读取的寄存器数量
:return: 读取结果或None失败时
"""
if not self.modbus_master:
pass
print("Modbus未初始化请先连接")
return None
try:
# 读取寄存器1号从机保持寄存器地址0长度1
result = self.modbus_master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 1)
# 结果是元组,取第一个值
return str(result[0])
# 动态传入slave_id替代固定的1
result = self.modbus_master.execute(
slave=slave_id,
function_code=cst.READ_HOLDING_REGISTERS,
starting_address=address,
quantity_of_x=count
)
print(f"从机ID {slave_id} 读取成功:{result}")
if result:
register_value = result[0] # 提取元组中的第一个元素即0
print(f"寄存器值:{register_value}")
return result
except Exception as e:
pass
print(f"从机ID {slave_id} 读取失败:{e}")
return None
def update_register_display(self, dt):
"""定时更新寄存器值显示"""
register_value = self.read_modbus_register()
register_value = self.read_modbus_registers(slave_id=1, address=0, count=1)
# 获取显示标签并更新文本
real_time_screen = self.root.get_screen("real_time_curve") # 根据实际屏幕名称调整
if hasattr(real_time_screen.ids, 'register_label'):
real_time_screen.ids.register_label.text = f"寄存器 0: {register_value}"
if register_value is not None:
real_time_screen.ids.register_label.text = f"寄存器 0: {register_value[0]}"
else:
real_time_screen.ids.register_label.text = "寄存器 0: 读取失败或超时"
###################################LoginPageWork-Start#################################################
def update_wifi_status(self, dt):
self.check_wifi()
Clock.schedule_once(self.update_wifi_status, 1) # 5秒刷新一次
Clock.schedule_once(self.update_wifi_status, 1) # 1秒刷新一次
def check_wifi(self):
if platform != 'android':
self.wifi_status_text = '非Android设备'
return
login_screen = self.root.get_screen("login") # 根据实际屏幕名称调整
if hasattr(login_screen.ids, 'wifi_status_text'):
self.wifi_status_text = '非Android设备'
return
try:
from jnius import autoclass, cast