华荣三照明、合信、荣欣八组合馈电

This commit is contained in:
冯佳
2025-06-25 11:36:43 +08:00
parent 37d39f4578
commit 25b3cb7f2e
494 changed files with 114074 additions and 0 deletions

285
Test/test_cuver.py Normal file
View File

@ -0,0 +1,285 @@
import sys
import serial
from serial.tools import list_ports
import struct
import threading
from PyQt5.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QTextEdit, QLineEdit, QPushButton,
QComboBox, QLabel, QCheckBox
)
from PyQt5.QtCore import QTimer
# 定义ZBUS命令类型
class ZBUS_CMD:
ZBUS_FRAME_NODE_INFO_REQUEST = 0
ZBUS_FRAME_NODE_ID_LOCK = 1 # 地址锁定命令
ZBUS_FRAME_NODE_ID_UNLOCK = 2
ZBUS_FRAME_ENTER_AUTO_BAUD = 3
ZBUS_FRAME_FLUSH_GLOBAL_STATUS = 4
ZBUS_FRAME_FLUSH_OUTPUT = 5
ZBUS_FRAME_FLUSH_INPUT = 6
ZBUS_FRAME_TOKEN_PASS = 7 # 令牌传递
ZBUS_FRAME_MODBUS = 8 # Modbus RTU数据
ZBUS_FRAME_CONFIG_TOKEN = 9
ZBUS_FRAME_PATH_TO_NEXT = 10
ZBUS_FRAME_TRANSMIT_AUTO_BAUD = 11 # 发送自动波特率帧, 方便从机配置波特率
ZBUS_FRAME_FILE_OPEN = 16 # 打开文件
ZBUS_FRAME_FILE_WRITE = 17 # 写入文件数据
ZBUS_FRAME_FILE_READ = 18 # 读取文件数据
ZBUS_FRAME_FILE_CLOSE = 19 # 文件关闭
ZBUS_FRAME_CUSTOM_COMMAND = 20 # 可以自定义命令
# ZBUS常量
ZBUS_LEAD_FLAG1 = 0x55
ZBUS_LEAD_FLAG2 = 0xAA
ZBUS_LEAD_FLAG_TOKEN = 0xDC
NODE_ID_GLOBAL = 0
NODE_ID_CONFIG = 1
NODE_ID_MODULE_AC12 = 2
STR_FORMAT_NAME = "name:"
STR_FORMAT_MODE = "mode:"
STR_FORMAT_WRITE = "write:"
# 定义ZBUS通讯类
class ZBUS:
def __init__(self) -> None:
self.SelfNodeId = NODE_ID_CONFIG # 配置节点Id
self.tx_frame_index = 0 # 发送帧索引
def ZBUS_CalculateLRC(self, buf_list):
lrc = sum(buf_list) # 使用 sum 函数简化计算
return lrc & 0xFFFF
def ZBUS_AssembleTxFrame(self, dest_node_id: int, source_node_id: int, frame_type: int, tx_buf: list):
# 组装一个发送帧
frame_index = self.tx_frame_index
self.tx_frame_index += 1
tx_list = []
if frame_type == ZBUS_CMD.ZBUS_FRAME_TRANSMIT_AUTO_BAUD:
tx_list = [0x55] * 32
return bytes(tx_list)
if frame_type == ZBUS_CMD.ZBUS_FRAME_TOKEN_PASS:
tx_list = struct.pack("BBB", ZBUS_LEAD_FLAG_TOKEN, dest_node_id, source_node_id)
return tx_list
cmd = frame_type # 获取枚举值
tx_list = struct.pack("BBBBBB", ZBUS_LEAD_FLAG1, ZBUS_LEAD_FLAG2, dest_node_id, source_node_id, cmd, frame_index)
if tx_buf is not None:
frame_size = len(tx_buf)
else:
frame_size = 0
tx_list += struct.pack("B", frame_size)
if tx_buf is not None:
tx_list += bytes(tx_buf)
lrc = self.ZBUS_CalculateLRC(tx_list)
tx_list += struct.pack("<H", lrc)
return tx_list
# GUI 类
class SerialGUI(QWidget):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle("ZBUS 串口通讯")
self.setGeometry(100, 100, 800, 600)
self.layout = QVBoxLayout()
self.setLayout(self.layout)
self.text_area = QTextEdit()
self.layout.addWidget(self.text_area)
self.receive_area = QTextEdit()
self.layout.addWidget(self.receive_area)
self.hex_data_buffer = [] # 存储接收到的Hex数据
self.ascii_data_buffer = [] # 存储接收到的ASCII数据
self.display_mode = QCheckBox("显示Hex数据")
self.display_mode.stateChanged.connect(self.switch_display_mode)
self.layout.addWidget(self.display_mode)
self.create_widgets()
self.bus = ZBUS()
self.ser = None # 串口对象
self.serial_thread = None # 串口线程
self.timer = QTimer(self)
self.timer.timeout.connect(self.check_serial_insertion)
self.timer.start(1000)
self.available_ports = set() # 用于存储当前可用的串口设备
self.timer = QTimer(self)
self.timer.timeout.connect(self.check_serial_insertion)
self.timer.start(1000)
self.check_serial_insertion()
self.ThreadStop = False
def switch_display_mode(self):
self.receive_area.clear()
if self.display_mode.isChecked(): # 显示Hex数据
for line in self.hex_data_buffer:
self.receive_area.append(line)
else: # 显示ASCII数据
for line in self.ascii_data_buffer:
self.receive_area.append(line)
def create_widgets(self):
self.serial_port = QComboBox()
self.layout.addWidget(self.serial_port)
self.baud_rate = QLineEdit("115200")
self.layout.addWidget(self.baud_rate)
self.timeout = QLineEdit("0.5")
self.layout.addWidget(self.timeout)
self.send_hello_btn = QPushButton("发送 Hello")
self.send_hello_btn.clicked.connect(self.send_hello)
self.layout.addWidget(self.send_hello_btn)
self.send_auto_baud_btn = QPushButton("发送自动波特率帧")
self.send_auto_baud_btn.clicked.connect(self.send_auto_baud)
self.layout.addWidget(self.send_auto_baud_btn)
self.send_lock_btn = QPushButton("发送节点ID锁定帧")
self.send_lock_btn.clicked.connect(self.send_lock)
self.layout.addWidget(self.send_lock_btn)
self.get_node_status_btn = QPushButton("获取节点状态")
self.get_node_status_btn.clicked.connect(self.get_node_status)
self.layout.addWidget(self.get_node_status_btn)
self.zbus_file_open_btn = QPushButton("打开文件")
self.zbus_file_open_btn.clicked.connect(self.zbus_file_open_test)
self.layout.addWidget(self.zbus_file_open_btn)
self.send_custom_command_btn = QPushButton("发送自定义命令")
self.send_custom_command_btn.clicked.connect(self.send_custom_command)
self.layout.addWidget(self.send_custom_command_btn)
self.open_serial_btn = QPushButton("打开串口")
self.open_serial_btn.clicked.connect(self.open_serial)
self.layout.addWidget(self.open_serial_btn)
self.close_serial_btn = QPushButton("关闭串口")
self.close_serial_btn.clicked.connect(self.close_serial)
self.layout.addWidget(self.close_serial_btn)
def detect_serial_ports(self):
return set(port.device for port in list_ports.comports())
def check_serial_insertion(self):
current_ports = self.detect_serial_ports()
if current_ports != self.available_ports:
self.available_ports = current_ports
self.serial_port.clear()
self.serial_port.addItems(sorted(self.available_ports))
def open_serial(self):
try:
if not self.serial_port.currentText():
raise ValueError("请先选择一个串口")
self.ser = serial.Serial(
port=self.serial_port.currentText(),
baudrate=int(self.baud_rate.text()),
timeout=float(self.timeout.text())
)
self.ThreadStop = False
self.serial_thread = threading.Thread(target=self.serial_thread_func)
self.serial_thread.start()
self.log_text(f"成功打开串口: {self.serial_port.currentText()}")
except Exception as e:
self.log_text(f"打开串口失败: {str(e)}")
def close_serial(self):
try:
if self.ser and self.ser.is_open:
self.ThreadStop = True
self.serial_thread.join()
self.ser.close()
self.log_text("串口已关闭")
except Exception as e:
self.log_text(f"关闭串口失败: {str(e)}")
def serial_thread_func(self):
while not self.ThreadStop:
try:
if self.ser.in_waiting > 0:
data = self.ser.readline()
decoded_data = data.decode('utf-8', errors='replace').replace('\x00', '')
self.ascii_data_buffer.append(decoded_data)
hex_data = ' '.join(f'{byte:02X}' for byte in data)
self.hex_data_buffer.append(hex_data)
self.update_receive_area(decoded_data, hex_data)
except Exception as e:
self.log_text(f"读取串口数据失败: {str(e)}")
break
def update_receive_area(self, ascii_data, hex_data):
if self.display_mode.isChecked(): # 显示Hex数据
self.receive_area.append(hex_data)
else: # 显示ASCII数据
self.receive_area.append(ascii_data)
def send_data(self, data):
try:
if self.ser and self.ser.is_open:
self.ser.write(data)
self.log_text(f"发送数据: {data}")
else:
self.log_text("串口未打开")
except Exception as e:
self.log_text(f"发送数据失败: {str(e)}")
def send_hello(self):
data = b'Hello'
self.send_data(data)
def send_auto_baud(self):
data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_TRANSMIT_AUTO_BAUD, None)
self.send_data(data)
def send_lock(self):
data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_NODE_ID_LOCK, None)
self.send_data(data)
def get_node_status(self):
data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_NODE_INFO_REQUEST, None)
self.send_data(data)
def zbus_file_open_test(self):
filename = "example.txt"
data = list(f"{STR_FORMAT_NAME}{filename}\n".encode())
assembled_data = self.bus.ZBUS_AssembleTxFrame(0, 1, ZBUS_CMD.ZBUS_FRAME_FILE_OPEN, data)
self.send_data(assembled_data)
def send_custom_command(self):
data = b'Custom Command'
self.send_data(data)
def log_text(self, text):
self.text_area.append(text)
# 启动 GUI
if __name__ == "__main__":
app = QApplication(sys.argv)
gui = SerialGUI()
gui.show()
sys.exit(app.exec_())