import sys import serial from serial.tools import list_ports import struct import threading from PyQt5.QtWidgets import ( QApplication, QWidget, QVBoxLayout, QHBoxLayout, QTextEdit, QLineEdit, QPushButton, QComboBox, QLabel, QCheckBox ) from PyQt5.QtCore import QTimer # 定义ZBUS命令类型 class ZBUS_CMD: ZBUS_FRAME_NODE_INFO_REQUEST = 0 ZBUS_FRAME_NODE_ID_LOCK = 1 # 地址锁定命令 ZBUS_FRAME_NODE_ID_UNLOCK = 2 ZBUS_FRAME_ENTER_AUTO_BAUD = 3 ZBUS_FRAME_FLUSH_GLOBAL_STATUS = 4 ZBUS_FRAME_FLUSH_OUTPUT = 5 ZBUS_FRAME_FLUSH_INPUT = 6 ZBUS_FRAME_TOKEN_PASS = 7 # 令牌传递 ZBUS_FRAME_MODBUS = 8 # Modbus RTU数据 ZBUS_FRAME_CONFIG_TOKEN = 9 ZBUS_FRAME_PATH_TO_NEXT = 10 ZBUS_FRAME_TRANSMIT_AUTO_BAUD = 11 # 发送自动波特率帧, 方便从机配置波特率 ZBUS_FRAME_FILE_OPEN = 16 # 打开文件 ZBUS_FRAME_FILE_WRITE = 17 # 写入文件数据 ZBUS_FRAME_FILE_READ = 18 # 读取文件数据 ZBUS_FRAME_FILE_CLOSE = 19 # 文件关闭 ZBUS_FRAME_CUSTOM_COMMAND = 20 # 可以自定义命令 # ZBUS常量 ZBUS_LEAD_FLAG1 = 0x55 ZBUS_LEAD_FLAG2 = 0xAA ZBUS_LEAD_FLAG_TOKEN = 0xDC NODE_ID_GLOBAL = 0 NODE_ID_CONFIG = 1 NODE_ID_MODULE_AC12 = 2 STR_FORMAT_NAME = "name:" STR_FORMAT_MODE = "mode:" STR_FORMAT_WRITE = "write:" # 定义ZBUS通讯类 class ZBUS: def __init__(self) -> None: self.SelfNodeId = NODE_ID_CONFIG # 配置节点Id self.tx_frame_index = 0 # 发送帧索引 def ZBUS_CalculateLRC(self, buf_list): lrc = sum(buf_list) # 使用 sum 函数简化计算 return lrc & 0xFFFF def ZBUS_AssembleTxFrame(self, dest_node_id: int, source_node_id: int, frame_type: int, tx_buf: list): # 组装一个发送帧 frame_index = self.tx_frame_index self.tx_frame_index += 1 tx_list = [] if frame_type == ZBUS_CMD.ZBUS_FRAME_TRANSMIT_AUTO_BAUD: tx_list = [0x55] * 32 return bytes(tx_list) if frame_type == ZBUS_CMD.ZBUS_FRAME_TOKEN_PASS: tx_list = struct.pack("BBB", ZBUS_LEAD_FLAG_TOKEN, dest_node_id, source_node_id) return tx_list cmd = frame_type # 获取枚举值 tx_list = struct.pack("BBBBBB", ZBUS_LEAD_FLAG1, ZBUS_LEAD_FLAG2, dest_node_id, source_node_id, cmd, frame_index) if tx_buf is not None: frame_size = len(tx_buf) else: frame_size = 0 tx_list += struct.pack("B", frame_size) if tx_buf is not None: tx_list += bytes(tx_buf) lrc = self.ZBUS_CalculateLRC(tx_list) tx_list += struct.pack(" None: super().__init__() self.setWindowTitle("ZBUS 串口通讯") self.setGeometry(100, 100, 800, 600) self.layout = QVBoxLayout() self.setLayout(self.layout) self.text_area = QTextEdit() self.layout.addWidget(self.text_area) self.receive_area = QTextEdit() self.layout.addWidget(self.receive_area) self.hex_data_buffer = [] # 存储接收到的Hex数据 self.ascii_data_buffer = [] # 存储接收到的ASCII数据 self.display_mode = QCheckBox("显示Hex数据") self.display_mode.stateChanged.connect(self.switch_display_mode) self.layout.addWidget(self.display_mode) self.create_widgets() self.bus = ZBUS() self.ser = None # 串口对象 self.serial_thread = None # 串口线程 self.timer = QTimer(self) self.timer.timeout.connect(self.check_serial_insertion) self.timer.start(1000) self.available_ports = set() # 用于存储当前可用的串口设备 self.timer = QTimer(self) self.timer.timeout.connect(self.check_serial_insertion) self.timer.start(1000) self.check_serial_insertion() self.ThreadStop = False def switch_display_mode(self): self.receive_area.clear() if self.display_mode.isChecked(): # 显示Hex数据 for line in self.hex_data_buffer: self.receive_area.append(line) else: # 显示ASCII数据 for line in self.ascii_data_buffer: self.receive_area.append(line) def create_widgets(self): self.serial_port = QComboBox() self.layout.addWidget(self.serial_port) self.baud_rate = QLineEdit("115200") self.layout.addWidget(self.baud_rate) self.timeout = QLineEdit("0.5") self.layout.addWidget(self.timeout) self.send_hello_btn = QPushButton("发送 Hello") self.send_hello_btn.clicked.connect(self.send_hello) self.layout.addWidget(self.send_hello_btn) self.send_auto_baud_btn = QPushButton("发送自动波特率帧") self.send_auto_baud_btn.clicked.connect(self.send_auto_baud) self.layout.addWidget(self.send_auto_baud_btn) self.send_lock_btn = QPushButton("发送节点ID锁定帧") self.send_lock_btn.clicked.connect(self.send_lock) self.layout.addWidget(self.send_lock_btn) self.get_node_status_btn = QPushButton("获取节点状态") self.get_node_status_btn.clicked.connect(self.get_node_status) self.layout.addWidget(self.get_node_status_btn) self.zbus_file_open_btn = QPushButton("打开文件") self.zbus_file_open_btn.clicked.connect(self.zbus_file_open_test) self.layout.addWidget(self.zbus_file_open_btn) self.send_custom_command_btn = QPushButton("发送自定义命令") self.send_custom_command_btn.clicked.connect(self.send_custom_command) self.layout.addWidget(self.send_custom_command_btn) self.open_serial_btn = QPushButton("打开串口") self.open_serial_btn.clicked.connect(self.open_serial) self.layout.addWidget(self.open_serial_btn) self.close_serial_btn = QPushButton("关闭串口") self.close_serial_btn.clicked.connect(self.close_serial) self.layout.addWidget(self.close_serial_btn) def detect_serial_ports(self): return set(port.device for port in list_ports.comports()) def check_serial_insertion(self): current_ports = self.detect_serial_ports() if current_ports != self.available_ports: self.available_ports = current_ports self.serial_port.clear() self.serial_port.addItems(sorted(self.available_ports)) def open_serial(self): try: if not self.serial_port.currentText(): raise ValueError("请先选择一个串口") self.ser = serial.Serial( port=self.serial_port.currentText(), baudrate=int(self.baud_rate.text()), timeout=float(self.timeout.text()) ) self.ThreadStop = False self.serial_thread = threading.Thread(target=self.serial_thread_func) self.serial_thread.start() self.log_text(f"成功打开串口: {self.serial_port.currentText()}") except Exception as e: self.log_text(f"打开串口失败: {str(e)}") def close_serial(self): try: if self.ser and self.ser.is_open: self.ThreadStop = True self.serial_thread.join() self.ser.close() self.log_text("串口已关闭") except Exception as e: self.log_text(f"关闭串口失败: {str(e)}") def serial_thread_func(self): while not self.ThreadStop: try: if self.ser.in_waiting > 0: data = self.ser.readline() decoded_data = data.decode('utf-8', errors='replace').replace('\x00', '') self.ascii_data_buffer.append(decoded_data) hex_data = ' '.join(f'{byte:02X}' for byte in data) self.hex_data_buffer.append(hex_data) self.update_receive_area(decoded_data, hex_data) except Exception as e: self.log_text(f"读取串口数据失败: {str(e)}") break def update_receive_area(self, ascii_data, hex_data): if self.display_mode.isChecked(): # 显示Hex数据 self.receive_area.append(hex_data) else: # 显示ASCII数据 self.receive_area.append(ascii_data) def send_data(self, data): try: if self.ser and self.ser.is_open: self.ser.write(data) self.log_text(f"发送数据: {data}") else: self.log_text("串口未打开") except Exception as e: self.log_text(f"发送数据失败: {str(e)}") def send_hello(self): data = b'Hello' self.send_data(data) def send_auto_baud(self): data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_TRANSMIT_AUTO_BAUD, None) self.send_data(data) def send_lock(self): data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_NODE_ID_LOCK, None) self.send_data(data) def get_node_status(self): data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_NODE_INFO_REQUEST, None) self.send_data(data) def zbus_file_open_test(self): filename = "example.txt" data = list(f"{STR_FORMAT_NAME}{filename}\n".encode()) assembled_data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_FILE_OPEN, data) self.send_data(assembled_data) def send_custom_command(self): data = b'Custom Command' self.send_data(data) def log_text(self, text): self.text_area.append(text) # 启动 GUI if __name__ == "__main__": app = QApplication(sys.argv) gui = SerialGUI() gui.show() sys.exit(app.exec_())