# Modbus protocol handling: RTU (serial) and TCP masters built on modbus_tk.
#
# NOTE(review): the stored copy of this file had lost its line breaks and every
# span of text between a '<' and the following '>'.  Code that had to be
# reconstructed is marked with NOTE(review) below -- confirm those parts
# against the original file.
import json
import math
import string
import struct
import sys
import time
from threading import Thread, Lock

import serial
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu, modbus_tcp, modbus
from modbus_tk.exceptions import (
    ModbusError,
    ModbusFunctionNotSupportedError,
    DuplicatedKeyError,
    MissingKeyError,
    InvalidModbusBlockError,
    InvalidArgumentError,
    OverlapModbusBlockError,
    OutOfModbusBlockError,
    ModbusInvalidResponseError,
    ModbusInvalidRequestError,
)

import menu_utils as utils
from print_color import *
from comm_protocol import class_protocol
from comm_thread import class_comm_master_thread


def json_load_message(message):
    """Parse a JSON document supplied as ``bytes`` or ``str``.

    Returns the decoded object, or an empty dict when *message* is neither
    ``bytes`` nor ``str``.  Raises ``json.JSONDecodeError`` on malformed JSON.
    """
    if isinstance(message, bytes):
        return json.loads(message.decode('utf-8'))
    if isinstance(message, str):
        return json.loads(message)
    return {}


# Extract the register address, bit address and count from a communication
# string.  Accepted Modbus address strings:
#   "8000#2"            two consecutive registers starting at 8000
#   "8000#2#s"          same, "#s" = signed value (default is unsigned)
#   "8000" / "8000#1"   one register at 8000
#   "8000.2#3"          3 bits starting at bit 2 of register 8000
#   "8000#2#f#>"        two registers, "#>" = big-endian word order, "#f" = float
#   "8000#2#f" / "8000#2#f#<"   two registers, little-endian (default),
#                               single-precision float
#   "8000#4#f" / "8000#4#f#<"   four registers, little-endian (default),
#                               double-precision float
#   "100618"            bit (coil) address 618; anything >= 100000 is a
#                       bit address
def get_modbus_comm_info(addr_string: str):
    """Split a Modbus address string into its components.

    Returns:
        ``(reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign)``
        where ``reg_bit`` is -1 when no ``.bit`` suffix is present and
        ``reg_count`` defaults to 1.

    Raises:
        ValueError: if the address, bit or count field is not an integer.
    """
    is_sign = "#s" in addr_string
    is_float = "#f" in addr_string
    is_big_endian = "#>" in addr_string

    fields = addr_string.split("#")  # '#' separates address, count and flags
    addr_parts = fields[0].split('.')
    reg_addr = int(addr_parts[0])
    reg_bit = int(addr_parts[1]) if len(addr_parts) == 2 else -1
    reg_count = int(fields[1]) if len(fields) >= 2 else 1
    return (reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign)


# Modbus protocol base class.  It cannot work on its own: a derived class has
# to create the modbus_tk master and register it with set_modbus_master().
class class_protocol_modbus(class_protocol):
    def __init__(self, protocol_name):
        class_protocol.__init__(self, protocol_name)
        self.mode: str = None                # mode string passed to open()
        self.master: modbus.Master = None    # modbus_tk master
        self.last_errcode = 0                # exception code of the last ModbusError
        self.timeout = 1.0
        self.mutex = Lock()                  # serialises access to self.master

    def set_modbus_master(self, master: modbus.Master):
        """Install *master*, closing any previously installed master first."""
        if self.master is not None:
            self.master.close()
        self.master = master
        if self.master is not None:
            self.master.set_timeout(self.timeout)

    def modbus_value_to_list(self, reg_value, reg_count, is_big_endian=False) -> list:
        """Split an integer of up to 64 bits into ``reg_count`` 16-bit words.

        Little-endian (default) puts the least significant word first.
        Returns an empty list when ``reg_count`` is not 1, 2, 3 or 4.
        """
        if reg_count not in (1, 2, 3, 4):
            return []
        words = [(reg_value >> (16 * index)) & 0xFFFF for index in range(reg_count)]
        if is_big_endian:
            words.reverse()
        return words

    def modbus_list_to_value(self, read_regs: tuple, is_big_endian: bool = False) -> int:
        """Combine 1-4 16-bit words into one unsigned integer.

        Inverse of modbus_value_to_list().  Returns 0 when the number of
        words is not 1, 2, 3 or 4.
        """
        if len(read_regs) not in (1, 2, 3, 4):
            return 0
        ordered = reversed(read_regs) if is_big_endian else read_regs
        read_value = 0
        for index, word in enumerate(ordered):
            read_value |= word << (16 * index)
        return read_value

    def _combine_registers(self, regs, reg_count, is_big_endian, is_sign) -> int:
        """Fold the first ``reg_count`` words of *regs* into one integer.

        Applies two's-complement conversion for 1, 2 and 4 registers when
        ``is_sign`` is set (a 3-register value is never sign-converted).
        Raises if ``reg_count`` is not 1-4 or *regs* is too short.
        """
        if reg_count not in (1, 2, 3, 4):
            raise Exception("目前只支持读取(1,2,3,4)个寄存器个数")
        words = tuple(regs[index] for index in range(reg_count))
        read_value = self.modbus_list_to_value(words, is_big_endian)
        if is_sign:
            if reg_count == 4:
                read_value = utils.value_u64_to_s64(read_value)
            elif reg_count == 2:
                read_value = utils.value_u32_to_s32(read_value)
            elif reg_count == 1:
                read_value = utils.value_u16_to_s16(read_value)
        return read_value

    @staticmethod
    def _bits_to_float(raw_value, reg_count) -> float:
        """Reinterpret an integer bit pattern as an IEEE-754 float.

        2 registers -> single precision, 4 registers -> double precision,
        anything else -> NaN.
        """
        if reg_count == 2:
            return struct.unpack("<f", struct.pack("<I", raw_value & 0xFFFFFFFF))[0]
        if reg_count == 4:
            return struct.unpack("<d", struct.pack("<Q", raw_value & 0xFFFFFFFFFFFFFFFF))[0]
        return math.nan

    @staticmethod
    def _float_to_registers(value, reg_count, is_big_endian) -> list:
        """Encode *value* as IEEE-754 and split it into 16-bit words.

        Four registers select double precision, any other count single
        precision (as in the surviving source).

        NOTE(review): the step turning the packed bytes into registers was
        lost; it is reconstructed as the exact inverse of the read path
        (word order follows ``is_big_endian``).
        """
        order = ">" if is_big_endian else "<"
        format_pack = order + ("d" if reg_count == 4 else "f")
        packed = struct.pack(format_pack, value)
        return list(struct.unpack(order + "H" * (len(packed) // 2), packed))

    def modbus_read_regs(self, device_addr, reg_addr, reg_count) -> tuple:
        """Read holding registers, or coils when ``reg_addr >= 100000``.

        Returns the values from ``master.execute``.  Errors are logged,
        ``last_errcode`` is updated for ModbusError, and the exception is
        re-raised to the caller.
        """
        try:
            with self.mutex:
                if self.master is None:
                    raise Exception("Call set_modbus_master() first to init protocol master")
                is_bit_addr = reg_addr >= 100000
                func_code = cst.READ_COILS if is_bit_addr else cst.READ_HOLDING_REGISTERS
                return self.master.execute(slave=device_addr,
                                           function_code=func_code,
                                           starting_address=reg_addr % 100000,
                                           quantity_of_x=reg_count)
        except ModbusError as e:
            self.last_errcode = e.get_exception_code()
            err_msg = f"Modbus 读寄存器错误: {str(e)}, modbus 读寄存器(addr={device_addr}, reg={reg_addr}, count={reg_count})"
            print_error_msg(err_msg)
            raise
        except Exception as e:
            err_msg = f"其他错误: {str(e)}"
            print_error_msg(err_msg)
            raise

    # reg_addr >= 100000 writes coils, otherwise holding registers.
    # Passing an int or float as *values* writes a single register / forces a
    # single coil.  For a single coil modbus_tk converts the value to
    # 0xFF00 / 0x0000; any other on-wire value needs a change in modbus_tk.
    def modbus_write_regs(self, device_addr, reg_addr, reg_count, values) -> bool:
        """Write *values* starting at *reg_addr*.

        Returns True on success, False on any error (errors are logged, not
        raised).  ``len(values)`` must equal ``reg_count`` and be non-zero.
        """
        success = False
        try:
            with self.mutex:
                if self.master is None:
                    raise Exception("Call set_modbus_master() first to init protocol master")

                if isinstance(values, float):
                    values = [int(round(values))]
                elif isinstance(values, int):
                    values = [int(values)]

                if len(values) != reg_count or reg_count == 0:
                    raise Exception("error, value size %d not match to reg_count" % (len(values)))

                is_bit_addr = reg_addr >= 100000
                if len(values) >= 2:
                    func_code = cst.WRITE_MULTIPLE_COILS if is_bit_addr else cst.WRITE_MULTIPLE_REGISTERS
                    write_value = values
                else:
                    func_code = cst.WRITE_SINGLE_COIL if is_bit_addr else cst.WRITE_SINGLE_REGISTER
                    write_value = values[0] & 0xFFFF if is_bit_addr else values[0]
                self.master.execute(slave=device_addr,
                                    function_code=func_code,
                                    starting_address=reg_addr % 100000,
                                    quantity_of_x=len(values),
                                    output_value=write_value)
                success = True
        except ModbusError as e:
            self.last_errcode = e.get_exception_code()
            print_error_msg(f"modbus写寄存器错误: {str(e)}, modbus写寄存器(addr={device_addr}, reg={reg_addr}, count={reg_count})")
            success = False
        except Exception as e:
            success = False
            print_error_msg(f"其他错误: {str(e)}")
        return success

    def modbus_read_float(self, device_addr, comm_str: str, scale=1.0, offset=0.0) -> float:
        """Read one value described by *comm_str* and return it as a float.

        Supports 1-4 registers, bit fields ("addr.bit#count"), signed values
        and IEEE-754 floats (2 registers = single, 4 = double).

        NOTE(review): everything after the float decoding was lost in the
        stored file.  The ``raw * scale + offset`` conversion and the
        "return NaN on failure" handling are reconstructed from the wrapper
        comments and from modbus_write_float(); confirm against the original.
        """
        fvalue = math.nan
        try:
            reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign = get_modbus_comm_info(comm_str)
            if self.master is None:
                raise Exception("Call set_modbus_master() first to init protocol master")

            read_reg_count = reg_count
            if reg_bit >= 0:
                # A bit field may straddle a register boundary.
                read_reg_count = 1 + int((reg_bit + reg_count - 1) / 16)

            result = self.modbus_read_regs(device_addr, reg_addr, read_reg_count)
            read_value = self._combine_registers(result, read_reg_count, is_big_endian, is_sign)

            if reg_bit >= 0:
                read_value = (read_value >> reg_bit) & (0xFFFFFFFF >> (32 - reg_count))

            if is_float:
                fvalue = self._bits_to_float(read_value, read_reg_count)
            else:
                fvalue = read_value
            fvalue = fvalue * scale + offset
        except Exception as e:
            print_error_msg(f"modbus读浮点数错误: {str(e)}")
            fvalue = math.nan
        return fvalue

    def modbus_read_float_arr(self, device_addr, comm_str: str, count):
        """Read *count* consecutive values of the layout described by *comm_str*.

        Bit-field addresses ("addr.bit") are not supported.

        NOTE(review): the signature, the end of the loop and the error
        handling were lost in the stored file and are reconstructed.  The
        surviving code decoded every element from registers 0..3 of the
        result; this version indexes by element offset.  Returning an empty
        list on failure is an assumption -- confirm what callers expect.
        """
        float_result_arr = []
        total_reg_count = 0
        try:
            reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign = get_modbus_comm_info(comm_str)
            if self.master is None:
                raise Exception("Call set_modbus_master() first to init protocol master")

            read_reg_count = reg_count
            if reg_bit >= 0:
                raise Exception("multi read for reg.bit is not support")

            # Read all registers in one request, then decode element by element.
            total_reg_count = read_reg_count * count
            result = self.modbus_read_regs(device_addr, reg_addr, total_reg_count)

            for index in range(count):
                first = index * read_reg_count
                regs = result[first:first + read_reg_count]
                read_value = self._combine_registers(regs, read_reg_count, is_big_endian, is_sign)
                if is_float:
                    fvalue = self._bits_to_float(read_value, read_reg_count)
                else:
                    fvalue = float(read_value)
                float_result_arr.append(fvalue)
        except Exception as e:
            print_error_msg(f"modbus读浮点数数组错误: {str(e)}, count={total_reg_count}")
            float_result_arr = []
        return float_result_arr

    def modbus_write_float(self, device_addr, comm_str: str, fvalue, scale=1.0, offset=0.0) -> bool:
        """Write one value to the location described by *comm_str*.

        Coil addresses (100000-199999) are set when ``fvalue > 0``; bit
        fields are written with a read-modify-write of at most 2 registers.
        Returns True on success, False on any error.

        NOTE(review): the signature, the float and plain-integer branches and
        the final write call were lost in the stored file and are
        reconstructed.  In particular the float branch writes the *unrounded*
        ``(fvalue - offset) / scale`` -- confirm against the original.
        """
        success = False
        stage = "prepare"
        try:
            reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign = get_modbus_comm_info(comm_str)
            if self.master is None:
                raise Exception("Call set_modbus_master() first to init protocol master")

            fvalue_origin = round((fvalue - offset) / scale, 0)

            write_reg_count = reg_count
            if reg_bit >= 0:
                write_reg_count = 1 + int((reg_bit + reg_count - 1) / 16)

            if write_reg_count > 4:  # more than 4 registers is not supported
                raise Exception("write_reg_count is more than 4")

            write_reg_values = []
            if 100000 <= reg_addr < 200000:  # coil
                write_reg_values.append(1 if fvalue > 0 else 0)
            elif reg_bit >= 0:
                if write_reg_count > 2:
                    raise Exception("register count is > 2")
                stage = "read"
                read_regs = self.modbus_read_regs(device_addr, reg_addr, write_reg_count)
                stage = "prepare"
                origin_read_value = self.modbus_list_to_value(read_regs, is_big_endian)
                bit_mask = 0xFFFFFFFF >> (32 - reg_count)
                set_mask = int(fvalue_origin) & bit_mask
                # Clear the target field, then OR in the new bits.
                modify_value = origin_read_value & ~(bit_mask << reg_bit)
                modify_value = modify_value | (set_mask << reg_bit)
                write_reg_values = self.modbus_value_to_list(modify_value, write_reg_count, is_big_endian)
            elif is_float:
                write_reg_values = self._float_to_registers((fvalue - offset) / scale,
                                                            write_reg_count, is_big_endian)
            else:
                write_reg_values = self.modbus_value_to_list(int(fvalue_origin),
                                                             write_reg_count, is_big_endian)

            stage = "write"
            success = self.modbus_write_regs(device_addr, reg_addr,
                                             len(write_reg_values), write_reg_values)
        except Exception as e:
            success = False
            print_error_msg(f"modbus写浮点数错误({stage}): {str(e)}")
        return success

    def modbus_write_float_arr(self, device_addr, comm_str: str, fvalue_arr) -> bool:
        """Write the values of *fvalue_arr* to consecutive locations.

        Each element uses the layout described by *comm_str*; bit-field
        addresses are not supported.  Returns True on success, False on any
        error.

        NOTE(review): the signature, the encoding of each element after the
        format selection and the final write call were lost in the stored
        file and are reconstructed.
        """
        success = False
        try:
            reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign = get_modbus_comm_info(comm_str)
            if self.master is None:
                raise Exception("Call set_modbus_master() first to init protocol master")

            write_reg_count = reg_count
            if reg_bit >= 0:
                raise Exception("multi write for reg.bit is not support")
            if write_reg_count > 4:  # more than 4 registers per element is not supported
                raise Exception("write_reg_count is more than 4")

            write_reg_values = []
            for fvalue_origin in fvalue_arr:
                if 100000 <= reg_addr < 200000:  # coil
                    write_reg_values.append(1 if fvalue_origin > 0 else 0)
                elif is_float:
                    write_reg_values.extend(
                        self._float_to_registers(fvalue_origin, write_reg_count, is_big_endian))
                else:
                    write_reg_values.extend(
                        self.modbus_value_to_list(int(round(fvalue_origin)),
                                                  write_reg_count, is_big_endian))

            success = self.modbus_write_regs(device_addr, reg_addr,
                                             len(write_reg_values), write_reg_values)
        except Exception as e:
            success = False
            print_error_msg(f"modbus写浮点数数组错误: {str(e)}")
        return success

    # @override  Returns math.nan when the read fails.
    # NOTE(review): the 'def' line below was lost in the stored file; the
    # signature is reconstructed from the call it forwards to.
    def read_float(self, device_addr, comm_str, scale=1.0, offset=0.0) -> float:
        return self.modbus_read_float(device_addr, comm_str, scale, offset)

    # @override  Returns False when the write fails.
    def write_float(self, device_addr, comm_str, fvalue, scale=1.0, offset=0.0) -> bool:
        return self.modbus_write_float(device_addr, comm_str, fvalue, scale, offset)

    # @override
    # Read `count` consecutive values starting at the comm_str address.
    def read_float_arr(self, device_addr: int, comm_str: str, count: int):
        return self.modbus_read_float_arr(device_addr, comm_str, count)

    # @override
    # Write consecutive values starting at the comm_str address; the number
    # of values is len(fvalue_arr).  Returns False when the write fails.
    def write_float_arr(self, device_addr: int, comm_str: str, fvalue_arr: list) -> bool:
        return self.modbus_write_float_arr(device_addr, comm_str, fvalue_arr)


# Modbus RTU master over a serial port.
class class_protocol_modbus_rtu(class_protocol_modbus):
    def __init__(self, protocol_name="modbus_rtu"):
        class_protocol_modbus.__init__(self, protocol_name)
        self.comm_parity = serial.PARITY_NONE
        self.comm_bytesize = serial.EIGHTBITS
        self.comm_stopbits = serial.STOPBITS_ONE
        self.is_open = False
        self.comm_uart = None
        self.rtu_master: modbus_rtu.RtuMaster = None

    # @override
    # mode must be a JSON string, e.g.
    #   '{"name": "COM1", "baud": 115200, "parity": "E", "stop": 1}'
    # Keys and defaults:
    #   name    serial device, default "COM1"
    #   parity  one of 'N', 'E', 'O', 'M', 'S' (none, even, odd, mark, space),
    #           default none
    #   stop    one of 1, 1.5, 2 stop bits, default 1
    #   bit     one of 5, 6, 7, 8 data bits, default 8
    #   baud    e.g. 9600, 19200, 115200, default 9600
    def open(self, mode: str):
        """Open the serial port described by *mode* and create the RTU master.

        Failures are logged; ``is_open`` tells whether the port is usable.
        """
        try:
            # Store on the instance: the original assigned the class attribute,
            # which the instance attribute set in __init__ shadowed.
            self.mode = mode
            mode_dict = json_load_message(mode)
            print_inform_msg(f"打开: 协议名称:{self.protocol_name} ,工作端口:{self.mode}")
        except Exception as e:
            print_error_msg(f"错误modbus rtu信息:{str(e)}")
            mode_dict = None

        if mode_dict:
            self.comm_port_name = utils.dict_or_object_get_attr(mode_dict, "name", "COM1")
            self.comm_parity = utils.dict_or_object_get_attr(mode_dict, "parity", serial.PARITY_NONE)
            self.comm_baud = utils.dict_or_object_get_attr(mode_dict, "baud", 9600)
            self.comm_stopbits = utils.dict_or_object_get_attr(mode_dict, "stop", serial.STOPBITS_ONE)
            self.comm_bytesize = utils.dict_or_object_get_attr(mode_dict, "bit", serial.EIGHTBITS)

            try:
                self.comm_uart = serial.Serial(port=self.comm_port_name,
                                               baudrate=self.comm_baud,
                                               bytesize=self.comm_bytesize,
                                               parity=self.comm_parity,
                                               stopbits=self.comm_stopbits)
                self.rtu_master = modbus_rtu.RtuMaster(self.comm_uart)
                self.set_modbus_master(self.rtu_master)
                # NOTE(review): set_timeout() is not defined in this file;
                # presumably inherited from class_protocol -- confirm.
                self.set_timeout(self.timeout)
                self.is_open = True
            except Exception as e:
                print_error_msg(f"错误modbus rtu信息:{str(e)}")
                self.comm_uart = None
                self.is_open = False

    # @override  Close the serial port used by the RTU master.
    def close(self):
        if self.is_open:
            try:
                print_inform_msg(f"关闭 : 协议名称:{self.protocol_name} ,工作端口:{self.mode}")
                if self.rtu_master:
                    self.rtu_master.close()
                if self.comm_uart:
                    self.comm_uart.close()
            except Exception as e:
                print_error_msg(f"错误关闭modbus rtu信息:{str(e)}")
            finally:
                self.rtu_master = None
                self.comm_uart = None
                self.is_open = False


# Modbus TCP master over Ethernet.
class class_protocol_modbus_tcp(class_protocol_modbus):
    def __init__(self, protocol_name="modbus_tcp"):
        class_protocol_modbus.__init__(self, protocol_name)
        self.ip_str = "127.0.0.1"
        self.port = 502
        self.is_open = False  # close() reads this, so it must exist before open()
        self.tcp_master: modbus_tcp.TcpMaster = None

    # mode must be a JSON string, e.g.
    #   '{"ip": "192.168.1.201", "port": 502, "timeout": 5.0}'
    # Keys and defaults:
    #   ip       server address, default "127.0.0.1"
    #   port     server port, default 502
    #   timeout  communication timeout in seconds; when absent the current
    #            self.timeout is kept
    def open(self, mode: str):
        """Connect to the Modbus TCP server described by *mode*.

        Failures are logged; ``is_open`` tells whether the link is usable.
        """
        try:
            self.mode = mode
            print_inform_msg(f"打开 Modbus TCP: 协议名称:{self.protocol_name} ,工作端口:{self.mode}")
            mode_dict = json_load_message(mode)
        except json.decoder.JSONDecodeError as e:
            print_error_msg(f"解析模式失败: {str(e)}")
            mode_dict = None
        except Exception as e:
            print_error_msg(f"错误打开 Modbus TCP 信息: {str(e)}")
            mode_dict = None

        if mode_dict:
            self.ip_str = utils.dict_or_object_get_attr(mode_dict, "ip", "127.0.0.1")
            self.port = utils.dict_or_object_get_attr(mode_dict, "port", 502)
            if "timeout" in mode_dict:
                self.timeout = mode_dict["timeout"]

            try:
                self.tcp_master = modbus_tcp.TcpMaster(host=self.ip_str,
                                                       port=self.port,
                                                       timeout_in_sec=self.timeout)
                self.set_modbus_master(self.tcp_master)
                self.tcp_master.open()
                self.is_open = True
            except Exception as e:
                print_error_msg(f"打开 Modbus TCP 失败: {str(e)} IP地址:{self.ip_str} ,端口号:{self.port}")
                self.is_open = False

    # Close the connection to the Modbus TCP server.
    def close(self):
        if self.is_open or self.tcp_master is not None:
            try:
                if self.tcp_master:
                    print_inform_msg(f"关闭 Modbus TCP: 协议名称:{self.protocol_name} ,工作端口:{self.mode}")
                    self.tcp_master.close()
            except Exception as e:
                print_error_msg(f"错误关闭 Modbus TCP 信息: {str(e)}")
            finally:
                self.tcp_master = None
                self.is_open = False
                print_inform_msg("已断开与 Modbus TCP 服务器的连接")


if __name__ == "__main__":
    protocol_modbus_rtu = class_protocol_modbus_rtu("modbus_rtu")
    protocol_modbus_rtu.open('{"name" : "COM2", "baud":115200, "parity" :"E", "stop" : 1}')
    protocol_modbus_rtu.close()

    protocol_modbus_tcp: class_protocol_modbus_tcp = class_protocol_modbus_tcp("modbus_tcp")
    protocol_modbus_tcp.open('{"ip" : "127.0.0.1", "port" : 502, "timeout" : 4.0}')
    protocol_modbus_tcp.close()