740 lines
29 KiB
Python
740 lines
29 KiB
Python
|
||
from kivy.properties import ObjectProperty, StringProperty, ListProperty
|
||
from datetime import datetime, timedelta
|
||
from kivy.core.text import LabelBase
|
||
from kivy.metrics import dp
|
||
from kivy.uix.anchorlayout import AnchorLayout
|
||
from kivy.uix.boxlayout import BoxLayout
|
||
from kivy.uix.button import Button
|
||
from kivy.uix.gridlayout import GridLayout
|
||
from kivy.uix.image import Image
|
||
from kivymd.app import MDApp
|
||
from kivy.core.window import Window
|
||
from kivy.uix.screenmanager import ScreenManager
|
||
from kivy.lang import Builder
|
||
from kivy.uix.screenmanager import ScreenManager, Screen
|
||
from kivymd.uix.datatables import MDDataTable
|
||
import random
|
||
import threading
|
||
import csv
|
||
from kivymd.uix.textfield import MDTextField
|
||
from kivymd.uix.boxlayout import MDBoxLayout
|
||
from kivymd.uix.button import MDFlatButton
|
||
from kivymd.uix.dialog import MDDialog
|
||
from kivymd.uix.label import MDLabel
|
||
from kivy.uix.label import Label
|
||
|
||
from kivy.utils import platform
|
||
from kivy.clock import Clock
|
||
from kivy.uix.widget import Widget
|
||
|
||
from kivy.graphics import Color, Rectangle
|
||
from kivy_garden.graph import Graph, LinePlot
|
||
|
||
import modbus_tk
|
||
import modbus_tk.defines as cst
|
||
import socket
|
||
from modbus_client import ModbusClient
|
||
import re
|
||
from threading import Thread
|
||
from functools import partial
|
||
from collections import defaultdict
|
||
|
||
from random import randint
|
||
from kivymd.theming import ThemeManager
|
||
|
||
LabelBase.register(name="MPoppins", fn_regular="fonts/Chinese/msyh.ttf")
|
||
LabelBase.register(name="BPoppins", fn_regular="fonts/Chinese/msyh.ttf")
|
||
LabelBase.register(name="RRubik", fn_regular="fonts/Chinese/msyh.ttf")
|
||
LabelBase.register(name="RCro", fn_regular="fonts/Chinese/msyh.ttf")
|
||
LabelBase.register(name="RPac", fn_regular="fonts/Chinese/msyh.ttf")
|
||
|
||
|
||
class MainScreen(Screen):
|
||
pass
|
||
class HomeScreen(Screen):
|
||
home = ObjectProperty(None)
|
||
class LoginScreen(Screen):
|
||
login = ObjectProperty(None)
|
||
class ProfileScreen(Screen):
|
||
profile = ObjectProperty(None)
|
||
class ProfileEditScreen(Screen):
|
||
profile_edit = ObjectProperty(None)
|
||
class HistoryScreen(Screen):
|
||
history = ObjectProperty(None)
|
||
|
||
class ModifyCurrentParamScreen(Screen):
|
||
modify_current_param = ObjectProperty(None)
|
||
class ModifyVoltageParamScreen(Screen):
|
||
modify_voltage_param = ObjectProperty(None)
|
||
class ModifyLeakageParamScreen(Screen):
|
||
modify_leakage_param = ObjectProperty(None)
|
||
class ModifySystemParamScreen(Screen):
|
||
modify_system_param = ObjectProperty(None)
|
||
class ModifyProtectionParamScreen(Screen):
|
||
modify_protection_param = ObjectProperty(None)
|
||
class RealTimeCurveScreen(Screen):
|
||
real_time_curve = ObjectProperty(None)
|
||
class ControlCommandScreen(Screen):
|
||
control_command = ObjectProperty(None)
|
||
class AboutScreen(Screen):
|
||
about = ObjectProperty(None)
|
||
|
||
|
||
# Window.size = (dp(360), dp(680))
|
||
|
||
class app(MDApp):
|
||
wifi_status_text = StringProperty("")
|
||
def __init__(self):
|
||
super().__init__()
|
||
|
||
self.user_name = None
|
||
self.dialog = None
|
||
self.dialog2 = None
|
||
|
||
self.profile_edit_screen = None
|
||
|
||
self.edit_user_pass = None
|
||
self.edit_user_name = None
|
||
self.edit_wifi_ssid = None
|
||
self.edit_modbus_ip = None
|
||
self.edit_modbus_port = None
|
||
self.edit_nfc_id = None
|
||
self.edit_reserve = None
|
||
|
||
|
||
self.profile_user_pass = None
|
||
self.profile_user_name = None
|
||
self.profile_wifi_ssid = None
|
||
self.profile_modbus_ip = None
|
||
self.profile_modbus_port = None
|
||
self.profile_nfc_id = None
|
||
self.profile_reserve = None
|
||
|
||
|
||
self.user_pass = None
|
||
self.d1 = None
|
||
self.edit_semester = None
|
||
self.otp_button = None
|
||
self.new_pass = None
|
||
self.new_pass1 = None
|
||
self.mobile_no = None
|
||
self.password = None
|
||
self.username = None
|
||
|
||
self.modbus_master = ModbusClient() # Modbus连接对象
|
||
self.modbus_ip = None # Modbus服务器IP
|
||
self.modbus_port = None # Modbus服务器端口
|
||
|
||
|
||
self.theme_cls.font_styles["H1"] = ["BPoppins", 96, False, -1.5]
|
||
self.theme_cls.font_styles["H2"] = ["BPoppins", 60, False, -0.5]
|
||
self.theme_cls.font_styles["H3"] = ["BPoppins", 48, False, 0]
|
||
self.theme_cls.font_styles["H4"] = ["BPoppins", 34, False, 0.25]
|
||
self.theme_cls.font_styles["H5"] = ["BPoppins", 25, False, 0.15]
|
||
self.theme_cls.font_styles["H6"] = ["BPoppins", 16, False, 0.15]
|
||
self.theme_cls.font_styles["Subtitle1"] = ["BPoppins", 16, False, 0.15]
|
||
self.theme_cls.font_styles["Subtitle2"] = ["BPoppins", 14, False, 0.1]
|
||
self.theme_cls.font_styles["Body1"] = ["BPoppins", 16, False, 0.5]
|
||
self.theme_cls.font_styles["Body2"] = ["BPoppins", 14, False, 0.25]
|
||
self.theme_cls.font_styles["Button"] = ["BPoppins", 14, True, 1.25]
|
||
self.theme_cls.font_styles["Caption"] = ["BPoppins", 13, False, 0.15]
|
||
self.theme_cls.font_styles["Overline"] = ["BPoppins", 10, True, 1.5]
|
||
|
||
def on_text_field_focus(self, instance, value):
|
||
if value:
|
||
# 当MDTextField获得焦点时
|
||
# 获取ScrollView和MDList的引用
|
||
scroll_view = self.root.get_screen('modify_current_param').ids.real_time_scroll_view
|
||
md_list = self.root.get_screen('modify_current_param').ids.real_time_md_list
|
||
|
||
# 计算MDTextField在MDList中的相对位置
|
||
# instance.y 是MDTextField在父控件(BoxLayout)中的y坐标
|
||
# instance.parent.y 是BoxLayout在MDList中的y坐标
|
||
# 所以MDTextField在MDList中的y坐标是 instance.y + instance.parent.y
|
||
text_field_y_in_mdlist = instance.y + instance.parent.y
|
||
|
||
# 计算需要滚动的目标位置
|
||
# 目标是让MDTextField的底部与ScrollView的底部对齐,或者至少在可见区域内
|
||
# scroll_view.height 是ScrollView的可见高度
|
||
# md_list.height 是MDList的总高度
|
||
# scroll_view.scroll_y 的范围是0到1,0表示底部,1表示顶部
|
||
|
||
# 计算MDTextField相对于MDList顶部的距离
|
||
relative_y = md_list.height - text_field_y_in_mdlist - instance.height
|
||
|
||
# 将相对距离转换为scroll_y的值
|
||
# 确保不会滚动到超出范围
|
||
if md_list.height > scroll_view.height:
|
||
target_scroll_y = relative_y / (md_list.height - scroll_view.height)
|
||
# 限制target_scroll_y在0到1之间
|
||
target_scroll_y = max(0, min(1, target_scroll_y))
|
||
scroll_view.scroll_y = target_scroll_y
|
||
|
||
def build(self):
|
||
Builder.load_file('kv/app.kv')
|
||
screen_manager = ScreenManager()
|
||
screen_manager.add_widget(MainScreen(name="main"))
|
||
screen_manager.add_widget(HomeScreen(name="home"))
|
||
screen_manager.add_widget(LoginScreen(name="login"))
|
||
screen_manager.add_widget(HistoryScreen(name="history"))
|
||
screen_manager.add_widget(ProfileScreen(name="profile"))
|
||
screen_manager.add_widget(ProfileEditScreen(name="profile_edit"))
|
||
|
||
screen_manager.add_widget(ModifyCurrentParamScreen(name="modify_current_param"))
|
||
screen_manager.add_widget(ModifyVoltageParamScreen(name="modify_voltage_param"))
|
||
screen_manager.add_widget(ModifyLeakageParamScreen(name="modify_leakage_param"))
|
||
screen_manager.add_widget(ModifySystemParamScreen(name="modify_system_param"))
|
||
screen_manager.add_widget(ModifyProtectionParamScreen(name="modify_protection_param"))
|
||
screen_manager.add_widget(RealTimeCurveScreen(name="real_time_curve"))
|
||
screen_manager.add_widget(ControlCommandScreen(name="control_command"))
|
||
screen_manager.add_widget(AboutScreen(name="about"))
|
||
|
||
# 添加其他屏幕
|
||
|
||
|
||
return screen_manager
|
||
|
||
#############################################ALL INPUT TEXT############################################################
|
||
def on_start(self):
|
||
|
||
login_screen = self.root.get_screen("login")
|
||
|
||
home_screen = self.root.get_screen("home")
|
||
self.user = home_screen.ids.user
|
||
|
||
profile_screen = self.root.get_screen("profile")
|
||
self.profile_user_name = profile_screen.ids.profile_user_name
|
||
self.profile_user_pass = profile_screen.ids.profile_user_pass
|
||
self.profile_wifi_ssid = profile_screen.ids.profile_wifi_ssid
|
||
self.profile_modbus_ip = profile_screen.ids.profile_modbus_ip
|
||
self.profile_modbus_port = profile_screen.ids.profile_modbus_port
|
||
self.profile_nfc_id = profile_screen.ids.profile_nfc_id
|
||
self.profile_reserve = profile_screen.ids.profile_reserve
|
||
|
||
self.profile_edit_screen = self.root.get_screen("profile_edit")
|
||
|
||
self.edit_user_name = self.profile_edit_screen.ids.edit_user_name
|
||
self.edit_user_pass = self.profile_edit_screen.ids.edit_user_pass
|
||
self.edit_wifi_ssid = self.profile_edit_screen.ids.edit_wifi_ssid
|
||
self.edit_modbus_ip = self.profile_edit_screen.ids.edit_modbus_ip
|
||
self.edit_modbus_port = self.profile_edit_screen.ids.edit_modbus_port
|
||
self.edit_nfc_id = self.profile_edit_screen.ids.edit_nfc_id
|
||
self.edit_reserve = self.profile_edit_screen.ids.edit_reserve
|
||
|
||
|
||
self.root.bind(current=self.on_screen_changed)
|
||
self.register_update_event = None
|
||
self.wifi_update_event = None
|
||
|
||
def on_screen_changed(self, instance, value):
|
||
"""屏幕切换时启动/停止刷新"""
|
||
if value == "real_time_curve":
|
||
# 进入目标屏幕时启动定时刷新
|
||
if not self.register_update_event:
|
||
self.register_update_event = Clock.schedule_interval(self.update_register_display, 1)
|
||
else:
|
||
# 离开时停止刷新
|
||
if self.register_update_event:
|
||
self.register_update_event.cancel()
|
||
self.register_update_event = None
|
||
|
||
if value == "login":
|
||
if not self.wifi_update_event:
|
||
self.wifi_update_event = Clock.schedule_interval(self.update_wifi_status, 1)
|
||
else:
|
||
if self.wifi_update_event:
|
||
self.wifi_update_event.cancel()
|
||
self.wifi_update_event = None
|
||
|
||
#############################################@Frequently used functions##################################################
|
||
|
||
@staticmethod
|
||
def change_cursor(is_enter):
|
||
"""改变鼠标指针样式"""
|
||
# 如果is_enter为True(鼠标进入目标区域),将鼠标指针改为手型
|
||
if is_enter:
|
||
Window.set_system_cursor('hand')
|
||
# 否则(鼠标离开目标区域),将鼠标指针改回默认箭头样式
|
||
else:
|
||
Window.set_system_cursor('arrow')
|
||
|
||
@staticmethod
|
||
def toggle_password_visibility(password_field, icon_button):
|
||
"""切换密码输入框的可见性状态"""
|
||
# 检查密码输入框当前是否处于密码隐藏状态
|
||
if password_field.password:
|
||
# 如果是隐藏状态,切换为可见状态
|
||
password_field.password = False
|
||
# 更新图标为"eye"(眼睛图标,表示当前可见)
|
||
icon_button.icon = "eye"
|
||
else:
|
||
# 如果是可见状态,切换为隐藏状态(显示为圆点/星号)
|
||
password_field.password = True
|
||
# 更新图标为"eye-off"(闭眼图标,表示当前隐藏)
|
||
icon_button.icon = "eye-off"
|
||
|
||
def clear_text(self):
|
||
pass
|
||
|
||
def on_press_back_arrow(self):
|
||
pass
|
||
|
||
|
||
|
||
|
||
|
||
############################################@DIALOG BOX most used functions##############################################
|
||
# MDDialog box function 'dialog'
|
||
|
||
def dialog1(self, text, title="Tips"):
|
||
close_button = MDFlatButton(
|
||
text="关闭",
|
||
font_name="MPoppins",
|
||
on_release=self.close_dialog
|
||
)
|
||
|
||
content_label = Label(
|
||
text=text,
|
||
font_name="MPoppins",
|
||
color=(0, 0, 0, 1),
|
||
size_hint_y=None,
|
||
height=dp(30),
|
||
halign="center",
|
||
valign="middle"
|
||
)
|
||
content_label.bind(size=content_label.setter('text_size'))
|
||
|
||
self.dialog = MDDialog(
|
||
title=title,
|
||
type="custom",
|
||
content_cls=content_label,
|
||
size_hint=(0.84, None),
|
||
buttons=[close_button]
|
||
)
|
||
self.dialog.open()
|
||
def dialog2(self, title, text):
|
||
close_button = MDFlatButton(
|
||
text="关闭",
|
||
font_name="MPoppins",
|
||
on_release=self.close_dialog
|
||
)
|
||
|
||
content_label = Label(
|
||
text=text,
|
||
font_name="MPoppins",
|
||
color=(0, 0, 0, 1),
|
||
size_hint_y=None,
|
||
height=dp(30),
|
||
halign="center",
|
||
valign="middle"
|
||
)
|
||
content_label.bind(size=content_label.setter('text_size'))
|
||
|
||
self.dialog = MDDialog(
|
||
title=title,
|
||
type="custom",
|
||
content_cls=content_label,
|
||
size_hint=(0.84, None),
|
||
buttons=[close_button]
|
||
)
|
||
self.dialog.open()
|
||
|
||
|
||
|
||
# MDDialog box dismiss function 'close_dialog'
|
||
def close_dialog(self, *args):
|
||
self.dialog.dismiss()
|
||
|
||
|
||
|
||
#############################################Modbus Functions############################################################
|
||
def show_modify_dialog(self, label_id, current_value=""):
|
||
"""显示修改对话框"""
|
||
if not self.dialog2:
|
||
# 创建输入框
|
||
self.modify_input = MDTextField(
|
||
id="modify_input",
|
||
hint_text="请输入新值",
|
||
input_filter="int", # 保持整数输入限制
|
||
text=current_value
|
||
)
|
||
|
||
# 创建对话框
|
||
self.dialog2 = MDDialog(
|
||
title="修改参数",
|
||
type="custom",
|
||
content_cls=MDBoxLayout(
|
||
self.modify_input,
|
||
orientation="vertical",
|
||
padding=dp(10),
|
||
spacing=dp(10),
|
||
size_hint_y=None,
|
||
height=dp(100)
|
||
),
|
||
buttons=[
|
||
MDFlatButton(
|
||
text="取消",
|
||
on_press=lambda x: self.dialog2.dismiss()
|
||
),
|
||
MDFlatButton(
|
||
text="确认",
|
||
on_press=lambda x: self.confirm_modify(label_id)
|
||
)
|
||
]
|
||
)
|
||
else:
|
||
# 重置输入框内容
|
||
self.modify_input.text = current_value
|
||
|
||
self.dialog2.open()
|
||
|
||
def confirm_modify(self, label_id):
|
||
"""确认修改并更新标签,增强鲁棒性"""
|
||
try:
|
||
new_value = self.modify_input.text
|
||
if not new_value:
|
||
self.show_dialog("错误", "请输入新值")
|
||
return
|
||
|
||
screen = self.root.get_screen("real_time_curve")
|
||
label = screen.ids.get(label_id, None)
|
||
if label:
|
||
label.text = f"通信超时: {new_value}"
|
||
# 调用原有的修改方法,增加异常捕获
|
||
try:
|
||
self.modify_register(new_value)
|
||
except Exception as e:
|
||
self.show_dialog("错误", f"寄存器修改失败:{e}")
|
||
else:
|
||
self.show_dialog("错误", "未找到目标标签")
|
||
except Exception as e:
|
||
self.show_dialog("错误", f"操作异常:{e}")
|
||
finally:
|
||
if self.dialog2:
|
||
self.dialog2.dismiss()
|
||
|
||
def read_modbus_registers(self, slave_id=1, address=0, count=1):
|
||
"""
|
||
通用的Modbus保持寄存器读取函数
|
||
:param slave_id: 从机ID
|
||
:param address: 寄存器地址
|
||
:param count: 读取数量
|
||
:return: {'success': bool, 'data': list, 'msg': str}
|
||
"""
|
||
return self.modbus_master.read_holding_registers(slave_id, address, count)
|
||
|
||
def modify_register(self, field_widget, input_text):
|
||
"""
|
||
用户主动修改时,写入指定寄存器并刷新显示
|
||
:param field_widget: MDTextField控件对象
|
||
:param input_text: 用户输入的值
|
||
"""
|
||
address = getattr(field_widget, 'comm_address', None)
|
||
slave_id = 1 # 可根据实际情况动态获取
|
||
if address is None:
|
||
self.show_dialog("错误", "未设置通讯地址")
|
||
return
|
||
if not input_text:
|
||
self.show_dialog("错误", "请输入值")
|
||
return
|
||
try:
|
||
value = int(input_text)
|
||
result = self.write_modbus_register(slave_id=slave_id, address=address, value=value)
|
||
# self.show_dialog("操作结果", result)
|
||
self.update_register_display(0)
|
||
except ValueError:
|
||
self.show_dialog("错误", "请输入有效整数")
|
||
|
||
def write_modbus_register(self, slave_id=1, address=0, value=None):
|
||
"""
|
||
通用写入Modbus保持寄存器(单个寄存器)
|
||
:param slave_id: 从机ID
|
||
:param address: 寄存器地址
|
||
:param value: 要写入的值(整数,0-65535)
|
||
:return: {'success': bool, 'msg': str}
|
||
"""
|
||
return self.modbus_master.write_single_register(slave_id, address, value)
|
||
|
||
def show_dialog(self, title, message):
|
||
self.dialog1(message,title)
|
||
|
||
def update_register_display(self, dt):
|
||
real_time_screen = self.root.get_screen("real_time_curve")
|
||
widgets = [w for w in real_time_screen.ids.values()
|
||
if isinstance(w, MDTextField) and hasattr(w, 'comm_address')]
|
||
|
||
# 收集未聚焦的寄存器地址
|
||
addr_widget_map = {}
|
||
for w in widgets:
|
||
if not w.focus:
|
||
addr = getattr(w, 'comm_address', None)
|
||
if addr is not None:
|
||
addr_widget_map[addr] = w
|
||
|
||
if not addr_widget_map:
|
||
return
|
||
|
||
# 分组连续地址段(例如 [1,2,3,10,11,12] -> [[1,2,3], [10,11,12]])
|
||
sorted_addresses = sorted(addr_widget_map.keys())
|
||
grouped_ranges = []
|
||
group = [sorted_addresses[0]]
|
||
|
||
for addr in sorted_addresses[1:]:
|
||
if addr == group[-1] + 1:
|
||
group.append(addr)
|
||
else:
|
||
grouped_ranges.append(group)
|
||
group = [addr]
|
||
grouped_ranges.append(group)
|
||
|
||
# 开启线程异步读取每段
|
||
for group in grouped_ranges:
|
||
start = group[0]
|
||
count = group[-1] - start + 1
|
||
thread = Thread(target=self._read_and_update_range, args=(start, count, group, addr_widget_map))
|
||
thread.start()
|
||
|
||
def _read_and_update_range(self, start, count, group, addr_widget_map):
|
||
try:
|
||
result = self.read_modbus_registers(slave_id=1, address=start, count=count)
|
||
except Exception as e:
|
||
result = {'success': False, 'error': str(e)}
|
||
|
||
# 在主线程中更新 UI
|
||
Clock.schedule_once(partial(self._update_widgets_with_data, start, group, addr_widget_map, result), 0)
|
||
|
||
def _update_widgets_with_data(self, start, group, addr_widget_map, result, dt):
|
||
if result.get('success') and result.get('data'):
|
||
data = result['data']
|
||
for addr in group:
|
||
widget = addr_widget_map.get(addr)
|
||
index = addr - start
|
||
if widget and 0 <= index < len(data):
|
||
new_value = str(data[index])
|
||
if widget.text != new_value:
|
||
widget.text = new_value
|
||
elif widget:
|
||
widget.text = "索引错误"
|
||
else:
|
||
for addr in group:
|
||
widget = addr_widget_map.get(addr)
|
||
if widget:
|
||
widget.text = f"失败: {result.get('error', '读取失败')}"
|
||
|
||
###################################LoginPageWork-Start#################################################
|
||
|
||
def update_wifi_status(self, dt):
|
||
# 只有当前屏幕为 login 时才更新
|
||
if self.root.current != "login":
|
||
return
|
||
|
||
if platform != 'android':
|
||
self.wifi_status_text = '非Android设备'
|
||
return
|
||
|
||
try:
|
||
from jnius import autoclass, cast
|
||
|
||
PythonActivity = autoclass('org.kivy.android.PythonActivity')
|
||
Context = autoclass('android.content.Context')
|
||
activity = PythonActivity.mActivity
|
||
|
||
WifiManager = autoclass('android.net.wifi.WifiManager')
|
||
Formatter = autoclass('android.text.format.Formatter')
|
||
|
||
wifi_service = activity.getSystemService(Context.WIFI_SERVICE)
|
||
wifi_manager = cast('android.net.wifi.WifiManager', wifi_service)
|
||
|
||
if not wifi_manager.isWifiEnabled():
|
||
self.wifi_status_text = 'WiFi 未启用'
|
||
return
|
||
|
||
wifi_info = wifi_manager.getConnectionInfo()
|
||
if wifi_info is None:
|
||
self.wifi_status_text = 'WiFi 信息不可用'
|
||
return
|
||
|
||
ssid = wifi_info.getSSID()
|
||
ssid_display = ssid.strip('"') if ssid else "SSID Unknown"
|
||
|
||
ip_int = wifi_info.getIpAddress()
|
||
ip_str = Formatter.formatIpAddress(ip_int) if ip_int != 0 else "0.0.0.0"
|
||
link_speed = wifi_info.getLinkSpeed() # Mbps
|
||
bssid = wifi_info.getBSSID()
|
||
rssi = wifi_info.getRssi()
|
||
|
||
self.wifi_status_text = (
|
||
f'SSID: {ssid_display}\n'
|
||
f'IP: {ip_str}\n'
|
||
f'速度: {link_speed} Mbps\n'
|
||
f'BSSID: {bssid}\n'
|
||
f'信号: {rssi} dBm'
|
||
)
|
||
|
||
except Exception as e:
|
||
self.wifi_status_text = f'获取WiFi信息失败\n{e}'
|
||
|
||
def verify(self, obj=None):
|
||
try:
|
||
if platform == "android":
|
||
from jnius import autoclass
|
||
PythonActivity = autoclass('org.kivy.android.PythonActivity')
|
||
Context = autoclass('android.content.Context')
|
||
activity = PythonActivity.mActivity
|
||
wifi_service = activity.getSystemService(Context.WIFI_SERVICE)
|
||
wifi_info = wifi_service.getConnectionInfo()
|
||
# 获取当前连接的WiFi名称并处理引号和大小写
|
||
wifi_id = wifi_info.getSSID().strip('"').lower()
|
||
else:
|
||
# 非Android平台使用模拟WiFi
|
||
wifi_id = "zhizhan-2"
|
||
except Exception as e:
|
||
self.dialog1(f"获取WiFi信息失败:{e}")
|
||
return
|
||
|
||
try:
|
||
with open("data/Users.csv", "r", encoding="utf-8") as file:
|
||
# 使用DictReader读取CSV,通过列名访问数据
|
||
csv_reader = csv.DictReader(file)
|
||
|
||
for row in csv_reader:
|
||
# 获取CSV中存储的WiFi SSID并处理大小写
|
||
csv_ssid = row['Wifi_SSID'].strip().lower()
|
||
|
||
# 精确匹配WiFi SSID
|
||
if wifi_id == csv_ssid:
|
||
# 通过列名获取用户信息
|
||
name = row['User']
|
||
password = row['User_pass']
|
||
wifi_ssid = row['Wifi_SSID']
|
||
modbus_ip = row['Modbus_IP']
|
||
modbus_port = row['Modbus_Port']
|
||
nfc_id = row['NFC_ID']
|
||
reserve = row['Reserve']
|
||
|
||
# 更新界面显示的用户信息
|
||
self.root.current = "home"
|
||
self.user.text = f"[b]Hey! {name}[/b]"
|
||
|
||
self.profile_user_name.text = self.edit_user_name.text = name
|
||
self.profile_user_pass.text = self.edit_user_pass.text = password
|
||
self.profile_wifi_ssid.text = self.edit_wifi_ssid.text = wifi_ssid
|
||
self.profile_modbus_ip.text = self.edit_modbus_ip.text = modbus_ip
|
||
self.profile_modbus_port.text = self.edit_modbus_port.text = modbus_port
|
||
self.profile_nfc_id.text = self.edit_nfc_id.text = nfc_id
|
||
self.profile_reserve.text = self.edit_reserve.text = reserve
|
||
|
||
|
||
# 连接Modbus设备
|
||
self.connect_modbus()
|
||
self.dialog1(f"欢迎你,{name}!\n认证成功!")
|
||
return
|
||
except Exception as e:
|
||
self.dialog1(f"读取用户信息失败:{e}")
|
||
return
|
||
|
||
# 认证失败提示
|
||
self.dialog1("认证失败,请检查网络或确保手机权限打开定位和连接到目标WiFi")
|
||
|
||
def connect_modbus(self):
|
||
def _connect_in_background():
|
||
modbus_ip = None
|
||
modbus_port = 502
|
||
master = None
|
||
|
||
try:
|
||
# 获取当前WiFi SSID
|
||
if platform == "android":
|
||
from jnius import autoclass
|
||
PythonActivity = autoclass('org.kivy.android.PythonActivity')
|
||
Context = autoclass('android.content.Context')
|
||
activity = PythonActivity.mActivity
|
||
wifi_service = activity.getSystemService(Context.WIFI_SERVICE)
|
||
wifi_info = wifi_service.getConnectionInfo()
|
||
current_wifi_id = wifi_info.getSSID().strip('"').lower()
|
||
else:
|
||
current_wifi_id = "zhizhan-2"
|
||
|
||
# 读取CSV配置
|
||
with open("data/Users.csv", "r", encoding="utf-8") as file:
|
||
csv_reader = csv.DictReader(file)
|
||
for row in csv_reader:
|
||
csv_ssid = row['Wifi_SSID'].strip().lower()
|
||
if current_wifi_id == csv_ssid:
|
||
modbus_ip = row['Modbus_IP']
|
||
modbus_port = int(row['Modbus_Port']) if row['Modbus_Port'] else 502
|
||
break
|
||
|
||
if not modbus_ip:
|
||
Clock.schedule_once(lambda dt: self.dialog1("未找到匹配的Modbus配置"))
|
||
return
|
||
|
||
# 断开现有连接
|
||
self.modbus_master.disconnect()
|
||
|
||
# 创建新连接并测试
|
||
connect_result = self.modbus_master.connect(modbus_ip, modbus_port)
|
||
if not connect_result['success']:
|
||
Clock.schedule_once(lambda dt: self.dialog1(connect_result['msg']))
|
||
return
|
||
print(f"Modbus连接成功\nIP: {modbus_ip}\n端口: {modbus_port}")
|
||
# Clock.schedule_once(lambda dt: self.dialog1(f"Modbus连接成功\nIP: {modbus_ip}\n端口: {modbus_port}"))
|
||
|
||
except FileNotFoundError as e:
|
||
# 修复:将异常信息转为字符串或通过默认参数传递
|
||
err_msg = "配置文件 Users.csv 未找到"
|
||
Clock.schedule_once(lambda dt, msg=err_msg: self.dialog1(msg))
|
||
except modbus_tk.modbus.ModbusError as e:
|
||
# 修复:使用默认参数传递异常信息
|
||
err_msg = f"Modbus协议错误: {e}\n从站地址: {e.slave}\n功能码: {e.function_code}"
|
||
Clock.schedule_once(lambda dt, msg=err_msg: self.dialog1(msg))
|
||
self.modbus_master = None
|
||
except socket.error as e:
|
||
err_msg = f"网络连接失败: {e}"
|
||
Clock.schedule_once(lambda dt, msg=err_msg: self.dialog1(msg))
|
||
self.modbus_master = None
|
||
except Exception as e:
|
||
# 修复:捕获所有其他异常
|
||
err_msg = f"连接失败: {str(e)}"
|
||
Clock.schedule_once(lambda dt, msg=err_msg: self.dialog1(msg))
|
||
self.modbus_master = None
|
||
|
||
threading.Thread(target=_connect_in_background, daemon=True).start()
|
||
|
||
def edit_profile(self):
|
||
if self.edit_user_name.text == "" or self.edit_user_pass.text == "" or self.edit_wifi_ssid.text == "" or self.edit_modbus_ip.text == "" or self.edit_modbus_port.text == "" or self.edit_nfc_id.text == "" or self.edit_reserve.text == "":
|
||
check = "Enter all required fields"
|
||
return self.dialog1(check)
|
||
else:
|
||
user_data = {
|
||
'User': self.edit_user_name.text,
|
||
'User_pass': self.edit_user_pass.text,
|
||
'Wifi_SSID': self.edit_wifi_ssid.text,
|
||
'Modbus_IP': self.edit_modbus_ip.text,
|
||
'Modbus_Port': self.edit_modbus_port.text,
|
||
'NFC_ID': self.edit_nfc_id.text,
|
||
'Reserve': self.edit_reserve.text
|
||
}
|
||
update_user_profile(user_data, self.profile_wifi_ssid.text)
|
||
self.verify(True)
|
||
self.root.current = "home"
|
||
|
||
def secure_profile(self):
|
||
self.profile_edit_screen.ids.edit_user_pass.readonly = True
|
||
self.profile_edit_screen.ids.edit_wifi_ssid.readonly = True
|
||
self.profile_edit_screen.ids.edit_modbus_ip.readonly = True
|
||
self.profile_edit_screen.ids.edit_modbus_port.readonly = True
|
||
self.profile_edit_screen.ids.edit_nfc_id.readonly = True
|
||
self.profile_edit_screen.ids.edit_reserve.readonly = False
|
||
|
||
|
||
|
||
from user_data_manager import update_user_profile
|
||
|
||
if __name__ == '__main__':
|
||
app().run()
|
||
|
||
|
||
|