modbus tk库例程使用,曲线(暂时取消)

This commit is contained in:
冯佳
2025-07-25 17:04:59 +08:00
parent 50f7531518
commit 805299ec2a
17 changed files with 1237 additions and 5321 deletions

376
main.py
View File

@ -1,5 +1,5 @@
from kivy.properties import ObjectProperty, StringProperty
from kivy.properties import ObjectProperty, StringProperty, ListProperty
from datetime import datetime, timedelta
from kivy.core.text import LabelBase
from kivy.metrics import dp
@ -26,6 +26,16 @@ from kivy.uix.label import Label
from kivy.utils import platform
from kivy.clock import Clock
from kivy.uix.widget import Widget
from kivy.graphics import Color, Rectangle
from kivy_garden.graph import Graph, LinePlot
from modbus_tk.modbus_tcp import TcpMaster
import modbus_tk.defines as cst
import socket
from random import randint
from kivymd.theming import ThemeManager
LabelBase.register(name="MPoppins", fn_regular="fonts/Chinese/msyh.ttf")
@ -41,51 +51,58 @@ def get_books():
class MainScreen(Screen):
pass
class HomeScreen(Screen):
home = ObjectProperty(None)
class LoginScreen(Screen):
login = ObjectProperty(None)
class ProfileScreen(Screen):
profile = ObjectProperty(None)
class ProfileEditScreen(Screen):
profile_edit = ObjectProperty(None)
class HistoryScreen(Screen):
history = ObjectProperty(None)
class ModifyCurrentParamScreen(Screen):
modify_current_param = ObjectProperty(None)
class ModifyVoltageParamScreen(Screen):
modify_voltage_param = ObjectProperty(None)
class ModifyLeakageParamScreen(Screen):
modify_leakage_param = ObjectProperty(None)
class ModifySystemParamScreen(Screen):
modify_system_param = ObjectProperty(None)
class ModifyProtectionParamScreen(Screen):
modify_protection_param = ObjectProperty(None)
class RealTimeCurveScreen(Screen):
real_time_curve = ObjectProperty(None)
class ControlCommandScreen(Screen):
control_command = ObjectProperty(None)
class AboutScreen(Screen):
about = ObjectProperty(None)
class SignUpScreen(Screen):
signup = ObjectProperty(None)
class ForgetPassScreen(Screen):
forget_Password = ObjectProperty(None)
class OtpScreen(Screen):
otp = ObjectProperty(None)
class ResetPassScreen(Screen):
reset_pass = ObjectProperty(None)
class HomeScreen(Screen):
home = ObjectProperty(None)
class ProfileScreen(Screen):
profile = ObjectProperty(None)
class ProfileEditScreen(Screen):
profile_edit = ObjectProperty(None)
class SearchBookScreen(Screen):
search_book = ObjectProperty(None)
class NotificationScreen(Screen):
notifications = ObjectProperty(None)
class RecommendScreen(Screen):
recommendation = ObjectProperty(None)
class BorrowBookScreen(Screen):
borrow_book = ObjectProperty(None)
class ReturnBookScreen(Screen):
return_book = ObjectProperty(None)
class RenewBookScreen(Screen):
renew_book = ObjectProperty(None)
class HistoryScreen(Screen):
history = ObjectProperty(None)
# Window.size = (dp(360), dp(680))
@ -148,6 +165,11 @@ class app(MDApp):
self.password = None
self.username = None
self.modbus_master = None # Modbus连接对象
self.modbus_ip = None # Modbus服务器IP
self.modbus_port = None # Modbus服务器端口
self.theme_cls.font_styles["H1"] = ["BPoppins", 96, False, -1.5]
self.theme_cls.font_styles["H2"] = ["BPoppins", 60, False, -0.5]
self.theme_cls.font_styles["H3"] = ["BPoppins", 48, False, 0]
@ -168,21 +190,36 @@ class app(MDApp):
screen_manager.add_widget(MainScreen(name="main"))
screen_manager.add_widget(HomeScreen(name="home"))
screen_manager.add_widget(LoginScreen(name="login"))
screen_manager.add_widget(HistoryScreen(name="history"))
screen_manager.add_widget(ProfileScreen(name="profile"))
screen_manager.add_widget(ProfileEditScreen(name="profile_edit"))
screen_manager.add_widget(ModifyCurrentParamScreen(name="modify_current_param"))
screen_manager.add_widget(ModifyVoltageParamScreen(name="modify_voltage_param"))
screen_manager.add_widget(ModifyLeakageParamScreen(name="modify_leakage_param"))
screen_manager.add_widget(ModifySystemParamScreen(name="modify_system_param"))
screen_manager.add_widget(ModifyProtectionParamScreen(name="modify_protection_param"))
screen_manager.add_widget(RealTimeCurveScreen(name="real_time_curve"))
screen_manager.add_widget(ControlCommandScreen(name="control_command"))
screen_manager.add_widget(AboutScreen(name="about"))
screen_manager.add_widget(SignUpScreen(name="signup"))
screen_manager.add_widget(ForgetPassScreen(name="forgot_pass"))
screen_manager.add_widget(OtpScreen(name="otp"))
screen_manager.add_widget(ResetPassScreen(name="reset_pass"))
screen_manager.add_widget(ProfileScreen(name="profile"))
screen_manager.add_widget(ProfileEditScreen(name="profile_edit"))
screen_manager.add_widget(NotificationScreen(name="notification"))
screen_manager.add_widget(SearchBookScreen(name="searchBook"))
screen_manager.add_widget(RecommendScreen(name="recommend"))
screen_manager.add_widget(BorrowBookScreen(name="borrow_book"))
screen_manager.add_widget(ReturnBookScreen(name="return_book"))
screen_manager.add_widget(RenewBookScreen(name="renew_book"))
screen_manager.add_widget(HistoryScreen(name="history"))
Clock.schedule_once(self.update_wifi_status, 2)
# Clock.schedule_interval(self.update_register_display, 1)
return screen_manager
#############################################ALL INPUT TEXT############################################################
@ -232,21 +269,21 @@ class app(MDApp):
self.d4 = otp_screen.ids.d4
# 从根窗口中获取名为"signup"的屏幕(注册界面)
signup_screen = self.root.get_screen("signup")
# signup_screen = self.root.get_screen("signup")
# 通过界面ID获取注册界面中的用户名输入框组件
self.user_name = signup_screen.ids.signup_name
# self.user_name = signup_screen.ids.signup_name
# 通过界面ID获取注册界面中的PRN可能是学号/身份标识)输入框组件
self.user_prn = signup_screen.ids.signup_prn
# self.user_prn = signup_screen.ids.signup_prn
# 通过界面ID获取注册界面中的邮箱输入框组件
self.user_email = signup_screen.ids.signup_email
# self.user_email = signup_screen.ids.signup_email
# 通过界面ID获取注册界面中的手机号输入框组件
self.user_no = signup_screen.ids.signup_no
# self.user_no = signup_screen.ids.signup_no
# 通过界面ID获取注册界面中的密码输入框组件
self.user_pass = signup_screen.ids.signup_pass
# self.user_pass = signup_screen.ids.signup_pass
# 通过界面ID获取注册界面中的所属部门/专业选择组件
self.signup_branch = signup_screen.ids.signup_branch
# self.signup_branch = signup_screen.ids.signup_branch
# 通过界面ID获取注册界面中的年级/学期选择组件
self.signup_sem = signup_screen.ids.signup_sem
# self.signup_sem = signup_screen.ids.signup_sem
# 从根窗口中获取名为"profile"的屏幕(个人资料界面)
profile_screen = self.root.get_screen("profile")
@ -314,7 +351,76 @@ class app(MDApp):
# 通过界面ID获取推荐界面中的推荐信息标签组件
self.rec_label = self.recommend_screen.ids.rec_label
#############################################@Frequently used functions##################################################
self.root.bind(current=self.on_screen_changed)
self.register_update_event = None
def on_screen_changed(self, instance, value):
"""屏幕切换时启动/停止刷新"""
if value == "real_time_curve":
# 进入目标屏幕时启动定时刷新
if not self.register_update_event:
self.register_update_event = Clock.schedule_interval(self.update_register_display, 1)
else:
# 离开时停止刷新
if self.register_update_event:
self.register_update_event.cancel()
self.register_update_event = None
def read_modbus_register(self):
"""读取寄存器值(复用原逻辑)"""
if not self.modbus_master:
try:
self.modbus_master = TcpMaster(self.modbus_ip or '192.168.1.1', self.modbus_port or 502)
self.modbus_master.set_timeout(5.0)
except Exception as e:
print(f"Modbus连接失败: {e}")
return "连接失败"
try:
result = self.modbus_master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 1)
return str(result[0]) # 返回寄存器值
except Exception as e:
print(f"读取失败: {e}")
return "读取失败"
# def update_register_display(self, dt):
# """更新原标签的显示内容"""
# # 仅在目标屏幕时更新
# if self.root.current != "real_time_curve":
# return
# # 读取值并更新到原标签
# value = self.read_modbus_register()
# real_time_screen = self.root.get_screen("real_time_curve")
# real_time_screen.ids.register_label.text = f"寄存器 0: {value}" # 直接更新原标签
def write_modbus_register(self, value):
"""写入寄存器值"""
if not self.modbus_master:
pass
try:
self.modbus_master.execute(1, cst.WRITE_SINGLE_REGISTER, 0, value)
return "修改成功"
except Exception as e:
return f"修改失败: {e}"
def modify_register(self, input_text):
"""处理修改请求,修改后更新原标签"""
if not input_text:
self.show_dialog("错误", "请输入值")
return
try:
value = int(input_text)
result = self.write_modbus_register(value)
self.show_dialog("结果", result)
# 若成功,立即刷新原标签显示
if "成功" in result:
self.update_register_display(0)
except ValueError:
self.show_dialog("错误", "请输入有效整数")
def show_dialog(self, title, message):
self.dialog2(title, message)
#############################################@Frequently used functions##################################################
@staticmethod
def change_cursor(is_enter):
@ -436,61 +542,62 @@ class app(MDApp):
buttons=[close_button]
)
self.dialog.open()
def dialog2(self, title, text):
close_button = MDFlatButton(
text="关闭",
font_name="MPoppins",
on_release=self.close_dialog
)
content_label = Label(
text=text,
font_name="MPoppins",
color=(0, 0, 0, 1),
size_hint_y=None,
height=dp(30),
halign="center",
valign="middle"
)
content_label.bind(size=content_label.setter('text_size'))
self.dialog = MDDialog(
title=title,
type="custom",
content_cls=content_label,
size_hint=(0.84, None),
buttons=[close_button]
)
self.dialog.open()
# MDDialog box dismiss function 'close_dialog'
def close_dialog(self, *args):
self.dialog.dismiss()
#############################################Modbus Functions############################################################
def read_modbus_register(self):
"""读取Modbus寄存器值并更新界面显示"""
if not self.modbus_master:
pass
try:
# 读取寄存器1号从机保持寄存器地址0长度1
result = self.modbus_master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 1)
# 结果是元组,取第一个值
return str(result[0])
except Exception as e:
pass
def update_register_display(self, dt):
"""定时更新寄存器值显示"""
register_value = self.read_modbus_register()
# 获取显示标签并更新文本
real_time_screen = self.root.get_screen("real_time_curve") # 根据实际屏幕名称调整
if hasattr(real_time_screen.ids, 'register_label'):
real_time_screen.ids.register_label.text = f"寄存器 0: {register_value}"
###################################LoginPageWork-Start#################################################
# # login Verification function "verify"
# def verify(self, obj):
# status = False
# if self.username.text == "" or self.password.text == "":
# check = "请输入账户和密码"
# return self.dialog1(check)
# else:
# name = ""
# with open('data/Users.csv', "r", encoding="utf-8") as file:
# csv_reader = csv.reader(file, delimiter=",")
# for line in csv_reader:
# try:
# if not obj:
# a = (line[1] == self.username.text or line[2] == self.username.text)
# else:
# a = (line[1] == self.edit_prn.text or line[2] == self.edit_email.text)
# if a and line[4] == self.password.text:
# status = True
# name = line[0]
# prn = line[1]
# email = line[2]
# number = line[3]
# branch = line[5]
# semester = line[6]
# break
# except IndexError:
# pass
# except Exception as e:
# print(e)
# if status:
# if not obj:
# check = "Login Successful" + "\nHello! " + name
# else:
# check = "Profile Updated Successfully!"
# self.secure_profile()
# self.root.current = "home"
# self.user.text = f"""[b]Hey! {name}[/b]"""
# self.profile_name.text = self.edit_name.text = name
# self.profile_email.text = self.edit_email.text = email
# self.profile_prn.text = self.edit_prn.text = prn
# self.profile_number.text = self.edit_number.text = number
# self.profile_semester.text = self.edit_semester.text = semester
# self.profile_branch.text = self.edit_branch.text = branch
# else:
# check = "Login Failed!. " + "Enter correct username and password."
# self.dialog1(check)
def update_wifi_status(self, dt):
self.check_wifi()
Clock.schedule_once(self.update_wifi_status, 1) # 5秒刷新一次
@ -558,21 +665,23 @@ class app(MDApp):
else:
wifi_id = "zhizhan-2" # 非 Android 用模拟 WiFi
except Exception as e:
self.dialog1(f"获取WiFi信息失败{e}")
self.dialog1(f"获取WiFi信息失败:{e}")
return
try:
with open("data/Users.csv", "r", encoding="utf-8") as file:
csv_reader = csv.reader(file)
for line in csv_reader:
# 确保至少有7列数据
if len(line) < 7:
if len(line) < 8:
continue
csv_ssid = line[7].strip().lower() if len(line) > 7 else ""
# 假设SSID存储在第8列(索引7)
csv_ssid = line[7].strip().lower()
# 更精确匹配SSID且忽略大小写
if wifi_id == csv_ssid or wifi_id in map(str.lower, line):
name, prn, email, number, password, branch, semester = line[:7]
name, prn, email, number, password, branch, semester,csv_ssid = line[:8]
self.root.current = "home"
self.user.text = f"[b]Hey! {name}[/b]"
@ -583,35 +692,80 @@ class app(MDApp):
self.profile_semester.text = self.edit_semester.text = semester
self.profile_branch.text = self.edit_branch.text = branch
self.dialog1(f"欢迎你,{name} wifi认证成功")
self.connect_modbus()
self.dialog1(f"欢迎你,{name}!\n认证成功!")
return
except Exception as e:
self.dialog1(f"读取用户信息失败:{e}")
return
self.dialog1("当前WiFi认证失败,请检查网络或退出")
self.dialog1("认证失败,请检查网络或确保手机权限打开定位和连接到目标WiFi")
# 添加Modbus连接函数
def connect_modbus(self):
try:
# 从CSV文件读取Modbus配置
modbus_ip = None
modbus_port = 502 # 默认端口
# 获取当前连接的WiFi SSID
if platform == "android":
from jnius import autoclass
PythonActivity = autoclass('org.kivy.android.PythonActivity')
Context = autoclass('android.content.Context')
activity = PythonActivity.mActivity
wifi_service = activity.getSystemService(Context.WIFI_SERVICE)
wifi_info = wifi_service.getConnectionInfo()
current_wifi_id = wifi_info.getSSID().strip('"').lower()
else:
current_wifi_id = "zhizhan-2" # 非Android环境模拟WiFi
# 读取CSV文件查找匹配的Modbus配置
with open("data/Users.csv", "r", encoding="utf-8") as file:
csv_reader = csv.reader(file)
headers = next(csv_reader) # 获取表头
for line in csv_reader:
if len(line) < 10: # 确保有足够的字段
continue
# 匹配当前连接的WiFi SSID
csv_ssid = line[7].strip().lower()
if current_wifi_id == csv_ssid or current_wifi_id in map(str.lower, line):
modbus_ip = line[8]
modbus_port = int(line[9]) if line[9] else 502
break # 找到匹配项则退出循环
if not modbus_ip:
# self.dialog1("未找到与当前WiFi匹配的Modbus配置")
return
# 断开现有连接(如果存在)
if hasattr(self, 'modbus_master') and self.modbus_master:
self.modbus_master.close()
# 创建新连接
self.modbus_master = TcpMaster(host=modbus_ip, port=modbus_port)
self.modbus_master.set_timeout(5.0) # 设置超时时间
# 测试连接(读取一个保持寄存器)
self.modbus_master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 1)
print(f"Modbus连接成功\nIP: {modbus_ip}\n端口: {modbus_port}")
except FileNotFoundError:
self.dialog1("配置文件user.csv未找到")
except socket.error as e:
self.dialog1(f"Modbus连接失败: 网络错误\n{e}")
self.modbus_master = None
except Exception as e:
self.dialog1(f"Modbus连接失败: {e}")
self.modbus_master = None
click_count = 0
def login_mode(self, username_text, button_text):
self.click_count += 1
if self.click_count % 2 == 0:
username_text.hint_text = "Enter PRN"
button_text.text = "VIT Email Login"
username_text.icon_right = "dialpad"
username_text.helper_text = "PRN of 8 digits starting with 1 2 ......only*"
username_text.max_text_length = 8
else:
username_text.hint_text = "Enter Email"
username_text.icon_right = "email-arrow-left"
button_text.text = "VIT PRN Login"
username_text.helper_text = "enter email ID of vit.edu only*"
username_text.max_text_length = 50
###############################################SIGNUP Page Functions#####################################################
# signup Verification function "check_signup"
def check_signup(self, name, prn, email, num, password, branch, sem):
def check_signup(self, name, prn, email, num, password, branch, sem, ssid, modbus_ip, modbus_port):
if name == "" or prn == "" or email == "" or num == "" or password == "" or branch == "" or sem == "":
check = "Enter all required fields"
return self.dialog1(check)
@ -630,7 +784,6 @@ class app(MDApp):
if line[2] == email:
status = "email"
break
else:
status = False
except IndexError:
@ -647,6 +800,9 @@ class app(MDApp):
else:
self.root.current = "otp"
self.send_otp(num)
# 保存注册信息时包含Modbus信息
self.signup_modbus_ip = modbus_ip
self.signup_modbus_port = modbus_port
check = "One Time Password has been shared on your registered Whatsapp No. Successful!."
self.dialog1(check)
@ -1226,14 +1382,6 @@ class app(MDApp):
self.book_pos.text = "Please check the Spell!"
# LabelBase.register(name="MPoppins", fn_regular="fonts/Poppins/Poppins-Medium.ttf")
# LabelBase.register(name="BPoppins", fn_regular="fonts/Poppins/Poppins-SemiBold.ttf")
# LabelBase.register(name="RRubik", fn_regular="fonts/Rubik_Vinyl/RubikVinyl-Regular.ttf")
# LabelBase.register(name="RCro", fn_regular="fonts/Croissant_One/CroissantOne-Regular.ttf")
# LabelBase.register(name="RPac", fn_regular="fonts/Pacifico/Pacifico-Regular.ttf")
if __name__ == '__main__':
app().run()