286 lines
9.9 KiB
Python
286 lines
9.9 KiB
Python
import sys
|
|
import serial
|
|
from serial.tools import list_ports
|
|
import struct
|
|
import threading
|
|
from PyQt5.QtWidgets import (
|
|
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QTextEdit, QLineEdit, QPushButton,
|
|
QComboBox, QLabel, QCheckBox
|
|
)
|
|
from PyQt5.QtCore import QTimer
|
|
|
|
# 定义ZBUS命令类型
|
|
class ZBUS_CMD:
|
|
ZBUS_FRAME_NODE_INFO_REQUEST = 0
|
|
ZBUS_FRAME_NODE_ID_LOCK = 1 # 地址锁定命令
|
|
ZBUS_FRAME_NODE_ID_UNLOCK = 2
|
|
ZBUS_FRAME_ENTER_AUTO_BAUD = 3
|
|
ZBUS_FRAME_FLUSH_GLOBAL_STATUS = 4
|
|
ZBUS_FRAME_FLUSH_OUTPUT = 5
|
|
ZBUS_FRAME_FLUSH_INPUT = 6
|
|
ZBUS_FRAME_TOKEN_PASS = 7 # 令牌传递
|
|
ZBUS_FRAME_MODBUS = 8 # Modbus RTU数据
|
|
ZBUS_FRAME_CONFIG_TOKEN = 9
|
|
ZBUS_FRAME_PATH_TO_NEXT = 10
|
|
ZBUS_FRAME_TRANSMIT_AUTO_BAUD = 11 # 发送自动波特率帧, 方便从机配置波特率
|
|
|
|
ZBUS_FRAME_FILE_OPEN = 16 # 打开文件
|
|
ZBUS_FRAME_FILE_WRITE = 17 # 写入文件数据
|
|
ZBUS_FRAME_FILE_READ = 18 # 读取文件数据
|
|
ZBUS_FRAME_FILE_CLOSE = 19 # 文件关闭
|
|
|
|
ZBUS_FRAME_CUSTOM_COMMAND = 20 # 可以自定义命令
|
|
|
|
# ZBUS常量
|
|
ZBUS_LEAD_FLAG1 = 0x55
|
|
ZBUS_LEAD_FLAG2 = 0xAA
|
|
ZBUS_LEAD_FLAG_TOKEN = 0xDC
|
|
|
|
NODE_ID_GLOBAL = 0
|
|
NODE_ID_CONFIG = 1
|
|
NODE_ID_MODULE_AC12 = 2
|
|
|
|
STR_FORMAT_NAME = "name:"
|
|
STR_FORMAT_MODE = "mode:"
|
|
STR_FORMAT_WRITE = "write:"
|
|
|
|
# 定义ZBUS通讯类
|
|
class ZBUS:
|
|
def __init__(self) -> None:
|
|
self.SelfNodeId = NODE_ID_CONFIG # 配置节点Id
|
|
self.tx_frame_index = 0 # 发送帧索引
|
|
|
|
def ZBUS_CalculateLRC(self, buf_list):
|
|
lrc = sum(buf_list) # 使用 sum 函数简化计算
|
|
return lrc & 0xFFFF
|
|
|
|
def ZBUS_AssembleTxFrame(self, dest_node_id: int, source_node_id: int, frame_type: int, tx_buf: list):
|
|
# 组装一个发送帧
|
|
frame_index = self.tx_frame_index
|
|
self.tx_frame_index += 1
|
|
|
|
tx_list = []
|
|
if frame_type == ZBUS_CMD.ZBUS_FRAME_TRANSMIT_AUTO_BAUD:
|
|
tx_list = [0x55] * 32
|
|
return bytes(tx_list)
|
|
|
|
if frame_type == ZBUS_CMD.ZBUS_FRAME_TOKEN_PASS:
|
|
tx_list = struct.pack("BBB", ZBUS_LEAD_FLAG_TOKEN, dest_node_id, source_node_id)
|
|
return tx_list
|
|
|
|
cmd = frame_type # 获取枚举值
|
|
tx_list = struct.pack("BBBBBB", ZBUS_LEAD_FLAG1, ZBUS_LEAD_FLAG2, dest_node_id, source_node_id, cmd, frame_index)
|
|
if tx_buf is not None:
|
|
frame_size = len(tx_buf)
|
|
else:
|
|
frame_size = 0
|
|
tx_list += struct.pack("B", frame_size)
|
|
if tx_buf is not None:
|
|
tx_list += bytes(tx_buf)
|
|
|
|
lrc = self.ZBUS_CalculateLRC(tx_list)
|
|
tx_list += struct.pack("<H", lrc)
|
|
return tx_list
|
|
|
|
# GUI 类
|
|
class SerialGUI(QWidget):
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
|
|
self.setWindowTitle("ZBUS 串口通讯")
|
|
self.setGeometry(100, 100, 800, 600)
|
|
|
|
self.layout = QVBoxLayout()
|
|
self.setLayout(self.layout)
|
|
|
|
self.text_area = QTextEdit()
|
|
self.layout.addWidget(self.text_area)
|
|
|
|
self.receive_area = QTextEdit()
|
|
self.layout.addWidget(self.receive_area)
|
|
|
|
self.hex_data_buffer = [] # 存储接收到的Hex数据
|
|
self.ascii_data_buffer = [] # 存储接收到的ASCII数据
|
|
|
|
self.display_mode = QCheckBox("显示Hex数据")
|
|
self.display_mode.stateChanged.connect(self.switch_display_mode)
|
|
self.layout.addWidget(self.display_mode)
|
|
|
|
self.create_widgets()
|
|
|
|
self.bus = ZBUS()
|
|
self.ser = None # 串口对象
|
|
self.serial_thread = None # 串口线程
|
|
|
|
self.timer = QTimer(self)
|
|
self.timer.timeout.connect(self.check_serial_insertion)
|
|
self.timer.start(1000)
|
|
|
|
self.available_ports = set() # 用于存储当前可用的串口设备
|
|
self.timer = QTimer(self)
|
|
self.timer.timeout.connect(self.check_serial_insertion)
|
|
self.timer.start(1000)
|
|
|
|
self.check_serial_insertion()
|
|
|
|
self.ThreadStop = False
|
|
|
|
def switch_display_mode(self):
|
|
self.receive_area.clear()
|
|
if self.display_mode.isChecked(): # 显示Hex数据
|
|
for line in self.hex_data_buffer:
|
|
self.receive_area.append(line)
|
|
else: # 显示ASCII数据
|
|
for line in self.ascii_data_buffer:
|
|
self.receive_area.append(line)
|
|
|
|
def create_widgets(self):
|
|
self.serial_port = QComboBox()
|
|
self.layout.addWidget(self.serial_port)
|
|
|
|
self.baud_rate = QLineEdit("115200")
|
|
self.layout.addWidget(self.baud_rate)
|
|
|
|
self.timeout = QLineEdit("0.5")
|
|
self.layout.addWidget(self.timeout)
|
|
|
|
self.send_hello_btn = QPushButton("发送 Hello")
|
|
self.send_hello_btn.clicked.connect(self.send_hello)
|
|
self.layout.addWidget(self.send_hello_btn)
|
|
|
|
self.send_auto_baud_btn = QPushButton("发送自动波特率帧")
|
|
self.send_auto_baud_btn.clicked.connect(self.send_auto_baud)
|
|
self.layout.addWidget(self.send_auto_baud_btn)
|
|
|
|
self.send_lock_btn = QPushButton("发送节点ID锁定帧")
|
|
self.send_lock_btn.clicked.connect(self.send_lock)
|
|
self.layout.addWidget(self.send_lock_btn)
|
|
|
|
self.get_node_status_btn = QPushButton("获取节点状态")
|
|
self.get_node_status_btn.clicked.connect(self.get_node_status)
|
|
self.layout.addWidget(self.get_node_status_btn)
|
|
|
|
self.zbus_file_open_btn = QPushButton("打开文件")
|
|
self.zbus_file_open_btn.clicked.connect(self.zbus_file_open_test)
|
|
self.layout.addWidget(self.zbus_file_open_btn)
|
|
|
|
self.send_custom_command_btn = QPushButton("发送自定义命令")
|
|
self.send_custom_command_btn.clicked.connect(self.send_custom_command)
|
|
self.layout.addWidget(self.send_custom_command_btn)
|
|
|
|
self.open_serial_btn = QPushButton("打开串口")
|
|
self.open_serial_btn.clicked.connect(self.open_serial)
|
|
self.layout.addWidget(self.open_serial_btn)
|
|
|
|
self.close_serial_btn = QPushButton("关闭串口")
|
|
self.close_serial_btn.clicked.connect(self.close_serial)
|
|
self.layout.addWidget(self.close_serial_btn)
|
|
|
|
def detect_serial_ports(self):
|
|
return set(port.device for port in list_ports.comports())
|
|
|
|
def check_serial_insertion(self):
|
|
current_ports = self.detect_serial_ports()
|
|
if current_ports != self.available_ports:
|
|
self.available_ports = current_ports
|
|
self.serial_port.clear()
|
|
self.serial_port.addItems(sorted(self.available_ports))
|
|
|
|
def open_serial(self):
|
|
try:
|
|
if not self.serial_port.currentText():
|
|
raise ValueError("请先选择一个串口")
|
|
|
|
self.ser = serial.Serial(
|
|
port=self.serial_port.currentText(),
|
|
baudrate=int(self.baud_rate.text()),
|
|
timeout=float(self.timeout.text())
|
|
)
|
|
self.ThreadStop = False
|
|
|
|
self.serial_thread = threading.Thread(target=self.serial_thread_func)
|
|
self.serial_thread.start()
|
|
|
|
self.log_text(f"成功打开串口: {self.serial_port.currentText()}")
|
|
|
|
except Exception as e:
|
|
self.log_text(f"打开串口失败: {str(e)}")
|
|
|
|
def close_serial(self):
|
|
try:
|
|
if self.ser and self.ser.is_open:
|
|
self.ThreadStop = True
|
|
self.serial_thread.join()
|
|
self.ser.close()
|
|
self.log_text("串口已关闭")
|
|
|
|
except Exception as e:
|
|
self.log_text(f"关闭串口失败: {str(e)}")
|
|
|
|
def serial_thread_func(self):
|
|
while not self.ThreadStop:
|
|
try:
|
|
if self.ser.in_waiting > 0:
|
|
data = self.ser.readline()
|
|
decoded_data = data.decode('utf-8', errors='replace').replace('\x00', '')
|
|
self.ascii_data_buffer.append(decoded_data)
|
|
hex_data = ' '.join(f'{byte:02X}' for byte in data)
|
|
self.hex_data_buffer.append(hex_data)
|
|
|
|
self.update_receive_area(decoded_data, hex_data)
|
|
except Exception as e:
|
|
self.log_text(f"读取串口数据失败: {str(e)}")
|
|
break
|
|
|
|
def update_receive_area(self, ascii_data, hex_data):
|
|
if self.display_mode.isChecked(): # 显示Hex数据
|
|
self.receive_area.append(hex_data)
|
|
else: # 显示ASCII数据
|
|
self.receive_area.append(ascii_data)
|
|
|
|
def send_data(self, data):
|
|
try:
|
|
if self.ser and self.ser.is_open:
|
|
self.ser.write(data)
|
|
self.log_text(f"发送数据: {data}")
|
|
else:
|
|
self.log_text("串口未打开")
|
|
except Exception as e:
|
|
self.log_text(f"发送数据失败: {str(e)}")
|
|
|
|
def send_hello(self):
|
|
data = b'Hello'
|
|
self.send_data(data)
|
|
|
|
def send_auto_baud(self):
|
|
data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_TRANSMIT_AUTO_BAUD, None)
|
|
self.send_data(data)
|
|
|
|
def send_lock(self):
|
|
data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_NODE_ID_LOCK, None)
|
|
self.send_data(data)
|
|
|
|
def get_node_status(self):
|
|
data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_NODE_INFO_REQUEST, None)
|
|
self.send_data(data)
|
|
|
|
def zbus_file_open_test(self):
|
|
filename = "example.txt"
|
|
data = list(f"{STR_FORMAT_NAME}{filename}\n".encode())
|
|
assembled_data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_FILE_OPEN, data)
|
|
self.send_data(assembled_data)
|
|
|
|
def send_custom_command(self):
|
|
data = b'Custom Command'
|
|
self.send_data(data)
|
|
|
|
def log_text(self, text):
|
|
self.text_area.append(text)
|
|
|
|
# 启动 GUI
|
|
if __name__ == "__main__":
|
|
app = QApplication(sys.argv)
|
|
gui = SerialGUI()
|
|
gui.show()
|
|
sys.exit(app.exec_())
|