344 lines
14 KiB
Python
344 lines
14 KiB
Python
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton,
|
|
QLineEdit, QMessageBox, QComboBox, QTableWidget, QTableWidgetItem, QSpinBox, QGroupBox, QFormLayout, QAction, QMenuBar)
|
|
from PyQt5.QtCore import Qt, QTimer
|
|
from PyQt5.QtGui import QFont, QIcon
|
|
from modbus_tk import modbus_tcp, modbus_rtu
|
|
import serial
|
|
from modbus_tk.defines import *
|
|
|
|
class ModbusClient(QMainWindow):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
self.init_ui()
|
|
self.master = None
|
|
self.timer = QTimer()
|
|
self.timer.timeout.connect(self.update_data)
|
|
|
|
def init_ui(self):
|
|
self.setWindowTitle("Modbus 客户端")
|
|
self.setGeometry(100, 100, 900, 700)
|
|
self.setStyleSheet("""
|
|
QWidget {
|
|
background-color: #2e2e2e;
|
|
color: #ffffff;
|
|
font-family: Arial;
|
|
font-size: 14px;
|
|
}
|
|
QGroupBox {
|
|
font-weight: bold;
|
|
font-size: 16px;
|
|
border: 1px solid #555;
|
|
border-radius: 5px;
|
|
margin-top: 10px;
|
|
}
|
|
QGroupBox::title {
|
|
subcontrol-origin: margin;
|
|
subcontrol-position: top left;
|
|
padding: 10px;
|
|
}
|
|
QLabel {
|
|
font-size: 14px;
|
|
}
|
|
QLineEdit, QComboBox, QSpinBox {
|
|
padding: 5px;
|
|
border: 1px solid #555;
|
|
border-radius: 5px;
|
|
background-color: #444;
|
|
color: #fff;
|
|
}
|
|
QPushButton {
|
|
background-color: #555;
|
|
border: 1px solid #444;
|
|
border-radius: 5px;
|
|
padding: 5px;
|
|
}
|
|
QPushButton:hover {
|
|
background-color: #666;
|
|
}
|
|
QPushButton:pressed {
|
|
background-color: #777;
|
|
}
|
|
QTableWidget {
|
|
background-color: #444;
|
|
alternate-background-color: #555;
|
|
border: 1px solid #555;
|
|
}
|
|
QHeaderView::section {
|
|
background-color: #333;
|
|
color: #fff;
|
|
padding: 5px;
|
|
border: 1px solid #444;
|
|
}
|
|
""")
|
|
|
|
# 创建菜单栏
|
|
menu_bar = self.menuBar()
|
|
file_menu = menu_bar.addMenu('文件')
|
|
connection_menu = menu_bar.addMenu('连接')
|
|
action_menu = menu_bar.addMenu('操作')
|
|
|
|
# 创建文件菜单项
|
|
exit_action = QAction('退出', self)
|
|
exit_action.triggered.connect(self.close)
|
|
file_menu.addAction(exit_action)
|
|
|
|
# 创建连接菜单项
|
|
connect_action = QAction('连接', self)
|
|
connect_action.triggered.connect(self.connect_modbus)
|
|
connection_menu.addAction(connect_action)
|
|
|
|
disconnect_action = QAction('断开连接', self)
|
|
disconnect_action.triggered.connect(self.disconnect_modbus)
|
|
connection_menu.addAction(disconnect_action)
|
|
|
|
# 创建操作菜单项
|
|
execute_action = QAction('执行', self)
|
|
execute_action.triggered.connect(self.execute_action)
|
|
action_menu.addAction(execute_action)
|
|
|
|
monitor_action = QAction('开始监视', self)
|
|
monitor_action.triggered.connect(self.toggle_monitoring)
|
|
action_menu.addAction(monitor_action)
|
|
|
|
# 创建中心小部件
|
|
central_widget = QWidget()
|
|
self.setCentralWidget(central_widget)
|
|
main_layout = QVBoxLayout(central_widget)
|
|
|
|
# 连接部分
|
|
connection_group = QGroupBox("连接设置")
|
|
connection_layout = QFormLayout()
|
|
|
|
self.mode_combo = QComboBox()
|
|
self.mode_combo.addItem("TCP", "TCP")
|
|
self.mode_combo.addItem("RTU", "RTU")
|
|
self.mode_combo.currentIndexChanged.connect(self.toggle_mode)
|
|
connection_layout.addRow(QLabel("模式选择:"), self.mode_combo)
|
|
|
|
self.ip_edit = QLineEdit()
|
|
self.ip_edit.setPlaceholderText("输入 IP 地址")
|
|
connection_layout.addRow(QLabel("IP 地址 / 串口端口:"), self.ip_edit)
|
|
|
|
self.port_edit = QLineEdit()
|
|
self.port_edit.setPlaceholderText("输入端口号 / 波特率")
|
|
connection_layout.addRow(QLabel("端口号 / 波特率:"), self.port_edit)
|
|
|
|
self.connect_button = QPushButton("连接")
|
|
self.connect_button.clicked.connect(self.connect_modbus)
|
|
connection_layout.addRow(self.connect_button)
|
|
|
|
self.disconnect_button = QPushButton("断开连接")
|
|
self.disconnect_button.clicked.connect(self.disconnect_modbus)
|
|
self.disconnect_button.setEnabled(False)
|
|
connection_layout.addRow(self.disconnect_button)
|
|
|
|
connection_group.setLayout(connection_layout)
|
|
main_layout.addWidget(connection_group)
|
|
|
|
# 功能部分
|
|
function_group = QGroupBox("功能选择")
|
|
function_layout = QFormLayout()
|
|
|
|
self.function_combo = QComboBox()
|
|
self.function_combo.addItem("读线圈 (0)", READ_COILS)
|
|
self.function_combo.addItem("读输入 (1)", READ_DISCRETE_INPUTS)
|
|
self.function_combo.addItem("读保持寄存器 (3)", READ_HOLDING_REGISTERS)
|
|
self.function_combo.addItem("读输入寄存器 (4)", READ_INPUT_REGISTERS)
|
|
self.function_combo.addItem("写单个线圈 (5)", WRITE_SINGLE_COIL)
|
|
self.function_combo.addItem("写单个寄存器 (6)", WRITE_SINGLE_REGISTER)
|
|
self.function_combo.addItem("写多个寄存器 (16)", WRITE_MULTIPLE_REGISTERS)
|
|
|
|
self.function_combo.currentIndexChanged.connect(self.update_function_description)
|
|
function_layout.addRow(QLabel("功能码:"), self.function_combo)
|
|
|
|
self.function_description = QLabel()
|
|
function_layout.addRow(QLabel("功能描述:"), self.function_description)
|
|
|
|
self.device_edit = QLineEdit()
|
|
self.device_edit.setPlaceholderText("输入设备地址")
|
|
function_layout.addRow(QLabel("设备地址:"), self.device_edit)
|
|
|
|
self.address_edit = QLineEdit()
|
|
self.address_edit.setPlaceholderText("输入寄存器地址")
|
|
function_layout.addRow(QLabel("寄存器地址:"), self.address_edit)
|
|
|
|
self.value_edit = QLineEdit()
|
|
self.value_edit.setPlaceholderText("输入写入值")
|
|
function_layout.addRow(QLabel("写入值:"), self.value_edit)
|
|
|
|
self.execute_button = QPushButton("执行")
|
|
self.execute_button.clicked.connect(self.execute_action)
|
|
function_layout.addRow(self.execute_button)
|
|
|
|
function_group.setLayout(function_layout)
|
|
main_layout.addWidget(function_group)
|
|
|
|
# 数据部分
|
|
data_group = QGroupBox("数据")
|
|
data_layout = QVBoxLayout()
|
|
|
|
self.data_table = QTableWidget()
|
|
self.data_table.setColumnCount(2)
|
|
self.data_table.setHorizontalHeaderLabels(["地址", "值"])
|
|
data_layout.addWidget(self.data_table)
|
|
|
|
data_group.setLayout(data_layout)
|
|
main_layout.addWidget(data_group)
|
|
|
|
# 实时监视部分
|
|
monitor_group = QGroupBox("实时监视")
|
|
monitor_layout = QFormLayout()
|
|
self.monitor_interval_edit = QSpinBox()
|
|
self.monitor_interval_edit.setRange(100, 5000) # 设置监视间隔范围
|
|
self.monitor_interval_edit.setValue(1000) # 设置默认值
|
|
monitor_layout.addRow(QLabel("监视间隔(ms):"), self.monitor_interval_edit)
|
|
|
|
self.start_address_edit = QLineEdit()
|
|
self.start_address_edit.setPlaceholderText("输入开始地址")
|
|
monitor_layout.addRow(QLabel("监视开始地址:"), self.start_address_edit)
|
|
|
|
self.length_edit = QSpinBox()
|
|
self.length_edit.setRange(1, 100)
|
|
monitor_layout.addRow(QLabel("长度:"), self.length_edit)
|
|
|
|
self.monitor_button = QPushButton("开始监视")
|
|
self.monitor_button.clicked.connect(self.toggle_monitoring)
|
|
monitor_layout.addRow(self.monitor_button)
|
|
|
|
monitor_group.setLayout(monitor_layout)
|
|
main_layout.addWidget(monitor_group)
|
|
|
|
self.update_function_description()
|
|
|
|
def toggle_mode(self):
|
|
mode = self.mode_combo.currentData()
|
|
if mode == "TCP":
|
|
self.ip_edit.setPlaceholderText("输入 IP 地址")
|
|
self.port_edit.setPlaceholderText("输入端口号")
|
|
elif mode == "RTU":
|
|
self.ip_edit.setPlaceholderText("输入串口端口")
|
|
self.port_edit.setPlaceholderText("输入波特率")
|
|
|
|
def connect_modbus(self):
|
|
mode = self.mode_combo.currentData()
|
|
try:
|
|
if mode == "TCP":
|
|
ip = self.ip_edit.text()
|
|
port = int(self.port_edit.text())
|
|
self.master = modbus_tcp.TcpMaster(ip, port)
|
|
self.master.open()
|
|
elif mode == "RTU":
|
|
port = self.ip_edit.text()
|
|
baudrate = int(self.port_edit.text())
|
|
self.master = modbus_rtu.RtuMaster(serial.Serial(port, baudrate))
|
|
self.master.open()
|
|
QMessageBox.information(self, "连接", f"成功连接到 Modbus {mode} 设备。")
|
|
self.connect_button.setEnabled(False)
|
|
self.disconnect_button.setEnabled(True)
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "连接错误", f"无法连接到 Modbus 设备: {e}")
|
|
|
|
def disconnect_modbus(self):
|
|
if self.master:
|
|
self.master.close()
|
|
self.master = None
|
|
QMessageBox.information(self, "断开连接", "已断开与 Modbus 设备的连接。")
|
|
self.connect_button.setEnabled(True)
|
|
self.disconnect_button.setEnabled(False)
|
|
|
|
def update_function_description(self):
|
|
descriptions = {
|
|
READ_COILS: "读线圈状态",
|
|
READ_DISCRETE_INPUTS: "读离散输入状态",
|
|
READ_HOLDING_REGISTERS: "读保持寄存器值",
|
|
READ_INPUT_REGISTERS: "读输入寄存器值",
|
|
WRITE_SINGLE_COIL: "写单个位",
|
|
WRITE_SINGLE_REGISTER: "写单个寄存器",
|
|
WRITE_MULTIPLE_REGISTERS: "写多个寄存器",
|
|
}
|
|
function_code = self.function_combo.currentData()
|
|
self.function_description.setText(descriptions.get(function_code, ""))
|
|
|
|
def execute_action(self):
|
|
if not self.master:
|
|
QMessageBox.warning(self, "连接", "请先连接到 Modbus 设备。")
|
|
return
|
|
|
|
function_code = self.function_combo.currentData()
|
|
device = int(self.device_edit.text())
|
|
address = int(self.address_edit.text())
|
|
|
|
try:
|
|
if function_code in [READ_COILS, READ_DISCRETE_INPUTS]:
|
|
response = self.master.execute(device, function_code, address, 1)
|
|
data = response[0]
|
|
self.display_data({address: data})
|
|
elif function_code in [READ_HOLDING_REGISTERS, READ_INPUT_REGISTERS]:
|
|
response = self.master.execute(device, function_code, address, 1)
|
|
data = response[0]
|
|
self.display_data({address: data})
|
|
elif function_code == WRITE_SINGLE_COIL:
|
|
value = int(self.value_edit.text())
|
|
if value not in [0, 1]:
|
|
raise ValueError("写线圈的值必须为0或1")
|
|
self.master.execute(device, function_code, address, output_value=value)
|
|
QMessageBox.information(self, "写入", "写入操作成功完成。")
|
|
elif function_code == WRITE_SINGLE_REGISTER:
|
|
value = int(self.value_edit.text())
|
|
self.master.execute(device, function_code, address, output_value=value)
|
|
QMessageBox.information(self, "写入", "写入操作成功完成。")
|
|
elif function_code == WRITE_MULTIPLE_REGISTERS:
|
|
values_str = self.value_edit.text()
|
|
values = [int(val) for val in values_str.split(",")]
|
|
self.master.execute(device, function_code, address, output_value=values)
|
|
QMessageBox.information(self, "写入", "写入操作成功完成。")
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "错误", f"执行操作时出错: {e}")
|
|
|
|
def display_data(self, data):
|
|
# 批量更新表格
|
|
self.data_table.setRowCount(len(data))
|
|
for i, (address, value) in enumerate(data.items()):
|
|
self.data_table.setItem(i, 0, QTableWidgetItem(str(address)))
|
|
self.data_table.setItem(i, 1, QTableWidgetItem(str(value)))
|
|
|
|
def toggle_monitoring(self):
|
|
if self.timer.isActive():
|
|
self.timer.stop()
|
|
self.monitor_button.setText("开始监视")
|
|
else:
|
|
self.timer.start(self.monitor_interval_edit.value())
|
|
self.monitor_button.setText("停止监视")
|
|
|
|
def update_data(self):
|
|
if not self.master:
|
|
self.timer.stop()
|
|
self.monitor_button.setText("开始监视")
|
|
QMessageBox.warning(self, "连接", "请先连接到 Modbus 设备。")
|
|
return
|
|
|
|
try:
|
|
device = int(self.device_edit.text())
|
|
start_address = int(self.start_address_edit.text())
|
|
length = self.length_edit.value()
|
|
|
|
response = self.master.execute(device, READ_HOLDING_REGISTERS, start_address, length)
|
|
|
|
# 将数据转换为地址-值对的字典
|
|
data = {start_address + i: val for i, val in enumerate(response)}
|
|
|
|
# 更新表格
|
|
self.display_data(data)
|
|
except Exception as e:
|
|
self.timer.stop()
|
|
self.monitor_button.setText("开始监视")
|
|
QMessageBox.critical(self, "错误", f"读取数据时出错: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
app = QApplication([])
|
|
app.setFont(QFont("Arial", 10)) # 设置全局字体
|
|
window = ModbusClient()
|
|
window.show()
|
|
app.exec_()
|