Files
Kivy_APP/comm_protocol_modbus.py
2025-07-25 08:08:11 +08:00

657 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#modbus协议处理
import sys
import string
import json
import serial
import time
import menu_utils as utils
import string
import struct
import math
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu, modbus_tcp, modbus
from print_color import *
from comm_protocol import class_protocol
from comm_thread import class_comm_master_thread
from threading import Thread, Lock
from modbus_tk.exceptions import(
ModbusError, ModbusFunctionNotSupportedError, DuplicatedKeyError, MissingKeyError, InvalidModbusBlockError,
InvalidArgumentError, OverlapModbusBlockError, OutOfModbusBlockError, ModbusInvalidResponseError,
ModbusInvalidRequestError
)
def json_load_message(message):
    """Parse a JSON document supplied as ``bytes`` or ``str``.

    ``bytes`` input is decoded as UTF-8 before parsing; ``str`` input is
    encoded to UTF-8 and handed to ``json.loads`` as bytes.  Any other
    input type yields an empty dict instead of raising.

    Raises whatever ``json.loads`` / the codec raise for malformed input.
    """
    if isinstance(message, bytes):
        decoded_text = message.decode('utf-8')
        return json.loads(decoded_text)
    if isinstance(message, str):
        encoded_bytes = message.encode('utf-8')
        return json.loads(encoded_bytes)
    # Unsupported payload type: report "nothing parsed".
    return {}
#从通讯字符串中取出寄存器地址,位地址,个数
# 以下为允许的Modbus通讯地址信息
#"8000#2" 从8000地址开始连续取两个寄存器
#"8000#2#s" 从8000地址开始连续取两个寄存器, #s 代表有符号数, 缺省为无符号数
#"8000" 或 "8000#1" 8000地址 1个寄存器
#"8000.2#3" 取8000地址的第2位开始取 3个位的数据
#"8000#2#f#>" 8000地址开始连续取两个寄存器 #> 大端模式, #f 浮点数,
#"8000#2#f" 或 "8000#2#f#<" 8000地址开始连续取两个寄存器 #< 小端模式(缺省模式) #f 浮点数, 单精度
#"8000#4#f" 或 "8000#4#f#<" 8000地址开始连续取4个寄存器 #< 小端模式(缺省模式) #f 浮点数, 双精度
#"100618" 位地址618, 超过100000就表示是位地址
def get_modbus_comm_info(addr_string: str):
    """Split a Modbus address descriptor into its components.

    The descriptor is ``"<addr>[.<bit>][#<count>][#flags...]"``, e.g.
    ``"8000"``, ``"8000#2"``, ``"8000.2#3"``, ``"8000#2#s"``,
    ``"8000#2#f#>"``.  The flags are detected by substring search:
    ``#s`` signed, ``#f`` floating point, ``#>`` big-endian.

    Returns:
        ``(reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign)``
        where ``reg_bit`` is ``-1`` when no single ``.bit`` suffix is
        present and ``reg_count`` defaults to ``1``.

    Raises:
        ValueError: if the address, bit or count field is not an integer.
    """
    fields = addr_string.split("#")  # '#' separates address / count / flags

    # The first field is "addr" or "addr.bit"; only an exact two-part split
    # is treated as carrying a bit index.
    address_parts = fields[0].split('.')
    reg_addr = int(address_parts[0])
    reg_bit = int(address_parts[1]) if len(address_parts) == 2 else -1

    # The second field, when present, is the register (or bit) count.
    reg_count = int(fields[1]) if len(fields) >= 2 else 1

    is_float = "#f" in addr_string
    is_big_endian = "#>" in addr_string
    is_sign = "#s" in addr_string
    return (reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign)
#modbus协议基类, 无法独立工作, 需要派生类实现对应函数
class class_protocol_modbus(class_protocol) :
#protocol_port_name = "COM1", "dev/tty1" 之类的设备
def __init__(self, protocol_name):
    """Initialise the shared Modbus state; no connection is made here.

    Args:
        protocol_name: forwarded unchanged to ``class_protocol.__init__``.
    """
    class_protocol.__init__(self, protocol_name)
    # Mode string handed to open(); stays None until a subclass stores it.
    self.mode: str = None
    # Active modbus_tk master; installed through set_modbus_master().
    self.master: "modbus.Master" = None
    # Exception code of the most recent ModbusError seen by this object.
    self.last_errcode = 0
    # Timeout value pushed to the master via set_timeout().
    self.timeout = 1.0
    # Lock taken around every master.execute() call.
    self.mutex = Lock()
def set_modbus_master(self, master: "modbus.Master"):
    """Install ``master`` as the active Modbus master.

    A previously installed master is closed first.  When the new master is
    not ``None`` the current ``self.timeout`` is applied to it.
    """
    previous_master = self.master
    if previous_master is not None:
        previous_master.close()

    self.master = master
    if master is None:
        return
    master.set_timeout(self.timeout)
#reg_value最大为64位数据, reg_count最大为4
def modbus_value_to_list(self, reg_value, reg_count, is_big_endian=False) -> list:
    """Split an integer into ``reg_count`` 16-bit register words.

    Args:
        reg_value: integer to split (up to 64 bits are used).
        reg_count: number of words to produce; only 1 to 4 are supported.
        is_big_endian: when True the most significant word comes first,
            otherwise the least significant word comes first.

    Returns:
        The list of 16-bit words, or an empty list when ``reg_count`` is
        outside 1..4.
    """
    if reg_count not in (1, 2, 3, 4):
        return []

    # Build least-significant-word-first, then flip for big-endian order.
    words = [(reg_value >> (16 * index)) & 0xFFFF for index in range(int(reg_count))]
    if is_big_endian:
        words.reverse()
    return words
def modbus_list_to_value(self, read_regs: tuple, is_big_endian: bool = False) -> int:
    """Combine 1 to 4 register words into a single integer.

    Args:
        read_regs: sequence of register words.
        is_big_endian: when True the first word is the most significant,
            otherwise the first word is the least significant.

    Returns:
        The combined value; a single-element input is returned as-is and
        any length outside 1..4 yields ``0``.
    """
    word_total = len(read_regs)
    if word_total == 1:
        return read_regs[0]
    if word_total not in (2, 3, 4):
        return 0

    # Walk the words from least to most significant and OR each into place.
    least_significant_first = reversed(read_regs) if is_big_endian else read_regs
    combined = 0
    for position, word in enumerate(least_significant_first):
        combined |= word << (16 * position)
    return combined
def modbus_read_regs(self, device_addr, reg_addr, reg_count) -> tuple:
    """Read ``reg_count`` consecutive coils or holding registers.

    Addresses ``>= 100000`` are treated as bit (coil) addresses and read
    with READ_COILS; everything else is read with READ_HOLDING_REGISTERS.
    In both cases the address sent on the wire is ``reg_addr % 100000``.

    Args:
        device_addr: Modbus slave address.
        reg_addr: start address (see above for the coil convention).
        reg_count: number of coils/registers to read.

    Returns:
        Whatever ``master.execute`` returns for the request.

    Raises:
        ModbusError: re-raised after logging; its exception code is stored
            in ``self.last_errcode``.
        Exception: any other failure (including "no master installed") is
            logged and re-raised.
    """
    try:
        # ``with`` guarantees the lock is released only if it was acquired.
        with self.mutex:
            if self.master is None:
                raise Exception("Call set_modbus_master() first to init protocol master")
            is_bit_addr = reg_addr >= 100000
            func_code = cst.READ_COILS if is_bit_addr else cst.READ_HOLDING_REGISTERS
            return self.master.execute(slave=device_addr,
                                       function_code=func_code,
                                       starting_address=reg_addr % 100000,
                                       quantity_of_x=reg_count)
    except ModbusError as e:
        self.last_errcode = e.get_exception_code()
        err_msg = f"Modbus 读寄存器错误: {str(e)}, modbus 读寄存器(addr={device_addr}, reg={reg_addr}, count={reg_count})"
        print_error_msg(err_msg)
        raise
    except Exception as e:
        err_msg = f"其他错误: {str(e)}"
        print_error_msg(err_msg)
        raise
#reg_addr >= 100000 时为写 位地址, 否则写保持寄存器
#values 传入整数或浮点数时, 表示写单个寄存器, 或 强制单个位,
#强制单个位时modbus-tk会转化成0xFF00, 0x0000, 要是其他值时需要修改modbus-tk底层库
def modbus_write_regs(self, device_addr, reg_addr, reg_count, values) -> bool:
    """Write coils or holding registers starting at ``reg_addr``.

    Addresses ``>= 100000`` are bit (coil) addresses, everything else is a
    holding register; the address on the wire is ``reg_addr % 100000``.

    Args:
        device_addr: Modbus slave address.
        reg_addr: start address.
        reg_count: number of items to write; must equal ``len(values)``
            and be non-zero.
        values: a sequence of integers, or a single ``int``/``float``
            (floats are rounded) which is treated as a one-element write.

    Returns:
        True when the request completed, False on any error.  Errors are
        logged, never raised; a ModbusError additionally stores its
        exception code in ``self.last_errcode``.
    """
    try:
        # ``with`` guarantees the lock is released only if it was acquired.
        with self.mutex:
            if self.master is None:
                raise Exception("Call set_modbus_master() first to init protocol master")

            # Normalise a scalar into a one-element list.
            if isinstance(values, float):
                values = [int(round(values))]
            elif isinstance(values, int):
                values = [int(values)]

            if len(values) != reg_count or reg_count == 0:
                raise Exception("error, value size %d not match to reg_count"%(len(values)))

            is_bit_addr = reg_addr >= 100000
            if len(values) >= 2:
                func_code = cst.WRITE_MULTIPLE_COILS if is_bit_addr else cst.WRITE_MULTIPLE_REGISTERS
                output_value = values
            else:
                func_code = cst.WRITE_SINGLE_COIL if is_bit_addr else cst.WRITE_SINGLE_REGISTER
                # Per the original note, modbus-tk maps a single-coil value
                # to 0xFF00/0x0000 itself; only the low 16 bits are passed.
                output_value = values[0] & 0xFFFF if is_bit_addr else values[0]

            self.master.execute(slave=device_addr,
                                function_code=func_code,
                                starting_address=reg_addr % 100000,
                                quantity_of_x=len(values),
                                output_value=output_value)
        return True
    except ModbusError as e:
        self.last_errcode = e.get_exception_code()
        print_error_msg(f"modbus写寄存器错误: {str(e)}, modbus写寄存器(addr={device_addr}, reg={reg_addr}, count={reg_count})")
        return False
    except Exception as e:
        print_error_msg(f"其他错误: {str(e)}")
        return False
def modbus_read_float(self, device_addr, comm_str: str, scale=1.0, offset=0.0) -> float:
    """Read one value described by ``comm_str`` and return it as a float.

    ``comm_str`` is parsed by ``get_modbus_comm_info``.  Depending on the
    descriptor the value is an unsigned/signed integer built from 1 to 4
    registers, a bit field inside those registers (``addr.bit#count``), or
    an IEEE-754 single (2 registers) / double (4 registers) when ``#f`` is
    given.  The result is returned as ``value * scale + offset``.

    Args:
        device_addr: Modbus slave address.
        comm_str: address descriptor, e.g. ``"8000#2#f"``.
        scale: multiplier applied to the decoded value.
        offset: added after scaling.

    Returns:
        The scaled value, or ``math.nan`` on any failure (the error is
        logged, not raised).
    """
    # Pre-bind what the error message prints so the handler cannot itself
    # fail when parsing raises before these names are assigned.
    reg_addr = None
    read_reg_count = 0
    try:
        reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign = get_modbus_comm_info(comm_str)
        if self.master is None:
            raise Exception("Call set_modbus_master() first to init protocol master")

        read_reg_count = reg_count
        if reg_bit >= 0:
            # A bit field may straddle a register boundary.
            read_reg_count = 1 + int((reg_bit + reg_count - 1) / 16)
        if read_reg_count not in (1, 2, 3, 4):
            raise Exception("目前只支持读取(1,2,3,4)个寄存器个数")
        if is_float and read_reg_count not in (2, 4):
            raise Exception("float decoding needs 2 (single) or 4 (double) registers")

        result = self.modbus_read_regs(device_addr, reg_addr, read_reg_count)

        # Assemble the words into one integer, least significant word first.
        words = [result[index] for index in range(read_reg_count)]
        if is_big_endian:
            words.reverse()
        read_value = 0
        for position, word in enumerate(words):
            read_value |= word << (16 * position)

        # NOTE(review): no signed conversion exists for 3 registers
        # (48 bit); this mirrors the previous behaviour.
        if is_sign:
            if read_reg_count == 4:
                read_value = utils.value_u64_to_s64(read_value)
            elif read_reg_count == 2:
                read_value = utils.value_u32_to_s32(read_value)
            elif read_reg_count == 1:
                read_value = utils.value_u16_to_s16(read_value)

        if reg_bit >= 0:
            read_value = (read_value >> reg_bit) & (0xFFFFFFFF >> (32 - reg_count))

        if is_float:
            # struct.unpack returns a 1-tuple; take the element, otherwise
            # the arithmetic below fails and every float read becomes NaN.
            if read_reg_count == 2:
                raw = struct.pack("<I", read_value & 0xFFFFFFFF)
                fvalue = struct.unpack("<f", raw)[0]
            else:
                raw = struct.pack("<Q", read_value & 0xFFFFFFFFFFFFFFFF)
                fvalue = struct.unpack("<d", raw)[0]
        else:
            fvalue = float(read_value)

        # NaN never compares equal to anything, so use math.isnan().
        if not math.isnan(fvalue):
            fvalue = fvalue * scale + offset
        return fvalue
    except Exception as e:
        print_error_msg(f"modbus读浮点数{str(e)} 读保持寄存器(addr={device_addr}, reg={reg_addr}, count={read_reg_count})")
        return math.nan  # invalid float marks a failed read
def modbus_read_float_arr(self, device_addr, comm_str: str, count):
    """Read ``count`` consecutive values laid out as described by ``comm_str``.

    ``comm_str`` (parsed by ``get_modbus_comm_info``) describes ONE
    element; ``count`` elements of that shape are fetched with a single
    request of ``reg_count * count`` registers and decoded one by one.
    Bit-field descriptors (``addr.bit``) are not supported here.

    Args:
        device_addr: Modbus slave address.
        comm_str: descriptor of a single element, e.g. ``"8000#2#f"``.
        count: number of elements to read.

    Returns:
        A list of ``count`` floats, or an empty list on any failure (the
        error is logged, not raised).
    """
    # Pre-bind what the error message prints so the handler cannot itself
    # fail when parsing raises before these names are assigned.
    reg_addr = None
    total_reg_count = 0
    try:
        reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign = get_modbus_comm_info(comm_str)
        if self.master is None:
            raise Exception("Call set_modbus_master() first to init protocol master")
        if reg_bit >= 0:
            raise Exception("multi read for reg.bit is not support")
        if reg_count not in (1, 2, 3, 4):
            raise Exception("目前只支持读取(1,2,3,4)个寄存器个数")
        if is_float and reg_count not in (2, 4):
            raise Exception("float decoding needs 2 (single) or 4 (double) registers")

        # One request for the whole contiguous range, then slice per element.
        total_reg_count = reg_count * count
        result = self.modbus_read_regs(device_addr, reg_addr, total_reg_count)

        float_result_arr = []
        for element_index in range(count):
            # Each element owns its own window of registers; decoding
            # result[0..] every time would repeat the first element.
            base = element_index * reg_count
            words = [result[base + index] for index in range(reg_count)]
            if is_big_endian:
                words.reverse()
            read_value = 0
            for position, word in enumerate(words):
                read_value |= word << (16 * position)

            # NOTE(review): no signed conversion exists for 3 registers
            # (48 bit); this mirrors the previous behaviour.
            if is_sign:
                if reg_count == 4:
                    read_value = utils.value_u64_to_s64(read_value)
                elif reg_count == 2:
                    read_value = utils.value_u32_to_s32(read_value)
                elif reg_count == 1:
                    read_value = utils.value_u16_to_s16(read_value)

            if is_float:
                # struct.unpack returns a 1-tuple; store the float itself.
                if reg_count == 2:
                    raw = struct.pack("<I", read_value & 0xFFFFFFFF)
                    fvalue = struct.unpack("<f", raw)[0]
                else:
                    raw = struct.pack("<Q", read_value & 0xFFFFFFFFFFFFFFFF)
                    fvalue = struct.unpack("<d", raw)[0]
            else:
                fvalue = float(read_value)
            float_result_arr.append(fvalue)
        return float_result_arr
    except Exception as e:
        print_error_msg(f"modbus读浮点数数组{str(e)},读保持寄存器(addr={device_addr}, reg={reg_addr}, count={total_reg_count})")
        return []
def modbus_write_float(self, device_addr, comm_str, fvalue, scale=1.0, offset=0.0) -> bool:
    """Write one value to the location described by ``comm_str``.

    The raw value sent to the device is ``(fvalue - offset) / scale``.
    Depending on the descriptor (parsed by ``get_modbus_comm_info``):

    * address in ``[100000, 200000)``: a coil is set when ``fvalue > 0``;
    * ``addr.bit#count``: the registers are read, the bit field replaced
      and the registers written back (at most 2 registers);
    * ``#f``: the raw value is packed as IEEE-754 single, or double when
      4 registers are requested, without rounding;
    * otherwise: the raw value is rounded to an integer and split into
      16-bit words.

    Returns:
        True only when the write request succeeded; False on any error
        (errors are logged, not raised).
    """
    # Pre-bind what the error messages print so the handler cannot itself
    # fail when parsing raises before these names are assigned.
    reg_addr = None
    write_reg_count = 0
    is_writing = False
    is_reading = False
    try:
        reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign = get_modbus_comm_info(comm_str)
        if self.master is None:
            raise Exception("Call set_modbus_master() first to init protocol master")

        # Undo the read-side scaling.  Rounding is deferred to the integer
        # paths so that float registers keep their fractional part.
        raw_value = (fvalue - offset) / scale

        write_reg_count = reg_count
        if reg_bit >= 0:
            # A bit field may straddle a register boundary.
            write_reg_count = 1 + int((reg_bit + reg_count - 1) / 16)
        if write_reg_count > 4:  # more than 4 registers is not supported
            raise Exception("write_reg_count is more than 4")

        if 100000 <= reg_addr < 200000:  # coil
            write_reg_values = [1 if fvalue > 0 else 0]
        elif reg_bit >= 0:
            if write_reg_count > 2:
                raise Exception("register count is > 2")
            # Read-modify-write: only the addressed bits may change.
            is_reading = True
            read_regs = self.modbus_read_regs(device_addr, reg_addr, write_reg_count)
            is_reading = False
            origin_read_value = self.modbus_list_to_value(read_regs, is_big_endian)
            bit_mask = 0xFFFFFFFF >> (32 - reg_count)
            set_mask = int(round(raw_value)) & bit_mask
            modify_value = (origin_read_value & ~(bit_mask << reg_bit)) | (set_mask << reg_bit)
            write_reg_values = self.modbus_value_to_list(modify_value, write_reg_count, is_big_endian)
        elif is_float:
            # Split the packed bytes with the SAME byte order used to pack
            # them; a native-order split would byte-swap every word in
            # big-endian mode (and on big-endian hosts).
            byte_order = ">" if is_big_endian else "<"
            if write_reg_count == 4:
                float_code, word_codes = "d", "HHHH"
            else:
                float_code, word_codes = "f", "HH"
            packed = struct.pack(byte_order + float_code, raw_value)
            write_reg_values = list(struct.unpack(byte_order + word_codes, packed))
        else:
            write_reg_values = self.modbus_value_to_list(round(raw_value), write_reg_count, is_big_endian)

        # modbus_write_regs reports failure through its return value, so
        # that value (not a constant True) is the outcome of this call.
        is_writing = True
        success = self.modbus_write_regs(device_addr, reg_addr, write_reg_count, write_reg_values)
        is_writing = False
        return success
    except Exception as e:
        if is_reading:  # failed while reading back the registers
            print_error_msg(f"正在读浮点数时异常{str(e)}, 读保持寄存器(addr={device_addr}, reg={reg_addr}, count={write_reg_count})")
        elif is_writing:  # failed while writing
            print_error_msg(f"正在写浮点数时异常{str(e)}, 写保持寄存器(addr={device_addr}, reg={reg_addr}, count={write_reg_count})")
        else:
            print_error_msg(f"其他错误: {str(e)}")
        return False
def modbus_write_float_arr(self, device_addr, comm_str, fvalue_arr) -> bool:
    """Write a sequence of values to consecutive addresses.

    ``comm_str`` (parsed by ``get_modbus_comm_info``) describes ONE
    element; every item of ``fvalue_arr`` is encoded in that shape and the
    concatenated words are sent in a single write request.  Bit-field
    descriptors (``addr.bit``) are not supported here, and no scale/offset
    is applied.

    * address in ``[100000, 200000)``: each item becomes a coil, set when
      the item is ``> 0``;
    * ``#f``: each item is packed as IEEE-754 double when the element uses
      4 registers, otherwise as single;
    * otherwise: each item is rounded and split into 16-bit words.

    Returns:
        True only when the write request succeeded; False on any error
        (errors are logged, not raised).
    """
    # Pre-bind what the error messages print so the handler cannot itself
    # fail when parsing raises before these names are assigned.
    reg_addr = None
    write_reg_count = 0
    is_writing = False
    try:
        reg_addr, reg_bit, reg_count, is_float, is_big_endian, is_sign = get_modbus_comm_info(comm_str)
        if self.master is None:
            raise Exception("Call set_modbus_master() first to init protocol master")
        write_reg_count = reg_count
        if reg_bit >= 0:
            raise Exception("multi write for reg.bit is not support")
        if write_reg_count > 4:  # more than 4 registers per element is not supported
            raise Exception("write_reg_count is more than 4")

        is_coil = 100000 <= reg_addr < 200000
        # Pack and split with the SAME byte order; a native-order split
        # would byte-swap every word in big-endian mode.
        byte_order = ">" if is_big_endian else "<"
        if write_reg_count == 4:
            float_code, word_codes = "d", "HHHH"
        else:
            float_code, word_codes = "f", "HH"

        write_reg_values = []
        for fvalue_origin in fvalue_arr:
            if is_coil:
                write_reg_values.append(1 if fvalue_origin > 0 else 0)
            elif is_float:
                packed = struct.pack(byte_order + float_code, fvalue_origin)
                write_reg_values.extend(struct.unpack(byte_order + word_codes, packed))
            else:
                write_reg_values.extend(
                    self.modbus_value_to_list(round(fvalue_origin), write_reg_count, is_big_endian))

        # modbus_write_regs reports failure through its return value, so
        # that value (not a constant True) is the outcome of this call.
        is_writing = True
        success = self.modbus_write_regs(device_addr, reg_addr, len(write_reg_values), write_reg_values)
        is_writing = False
        return success
    except Exception as e:
        if is_writing:  # failed while writing
            print_error_msg(f"正在写浮点数时异常{str(e)}, 写保持寄存器(addr={device_addr}, reg={reg_addr}, count={write_reg_count})")
        else:
            print_error_msg(f"其他错误: {str(e)}")
        return False
#@override
def set_timeout(self, timeout=5.0):
    """Store ``timeout`` and apply it to the active master, if any."""
    self.timeout = timeout
    active_master = self.master
    if active_master is None:
        return
    active_master.set_timeout(timeout)
def get_timeout(self):
    """Return the timeout value currently stored on this protocol object."""
    current_timeout = self.timeout
    return current_timeout
#@override 返回 math.nan 表示读取失败, 否则读取成功
def read_float(self, device_addr, comm_str, scale=1.0, offset=0.0) -> float:
    """Read one value; delegates to ``modbus_read_float``.

    Returns that method's result unchanged (``math.nan`` signals failure).
    """
    value = self.modbus_read_float(device_addr, comm_str, scale, offset)
    return value
#@override 返回 False 表示读取失败, 否则读取成功
def write_float(self, device_addr, comm_str, fvalue, scale=1.0, offset=0.0) -> bool:
    """Write one value; delegates to ``modbus_write_float``.

    Returns that method's result unchanged (``False`` signals failure).
    """
    succeeded = self.modbus_write_float(device_addr, comm_str, fvalue, scale, offset)
    return succeeded
#@override
#读取comm_str地址开始的连续地址count个数 地址连续
#返回 math.nan 表示读取失败, 否则读取成功
def read_float_arr(self, device_addr: int, comm_str: str, count: int):
    """Read ``count`` consecutive values starting at ``comm_str``.

    Delegates to ``modbus_read_float_arr`` and returns its result unchanged.
    """
    values = self.modbus_read_float_arr(device_addr, comm_str, count)
    return values
#@override
#写入comm_str地址开始的连续地址 写入总个数由 len(fvalue_arr)决定
#返回 False 表示读取失败, 否则读取成功
def write_float_arr(self, device_addr: int, comm_str: str, fvalue_arr: list) -> bool:
    """Write ``fvalue_arr`` to consecutive addresses starting at ``comm_str``.

    The number of elements written is ``len(fvalue_arr)``.  Delegates to
    ``modbus_write_float_arr`` and returns its result unchanged.
    """
    succeeded = self.modbus_write_float_arr(device_addr, comm_str, fvalue_arr)
    return succeeded
#串口Modbus_rtu主机类
class class_protocol_modbus_rtu(class_protocol_modbus) :
def __init__(self, protocol_name="modbus_rtu"):
    """Set up the serial defaults (no parity, 8 data bits, 1 stop bit).

    The serial port itself is not opened until ``open()`` is called.
    """
    class_protocol_modbus.__init__(self, protocol_name)
    # Connection state; filled in by open().
    self.is_open = False
    self.comm_uart = None
    self.rtu_master: "modbus_rtu.RtuMaster" = None
    # Serial line defaults, overridable through the open() mode string.
    self.comm_bytesize = serial.EIGHTBITS
    self.comm_parity = serial.PARITY_NONE
    self.comm_stopbits = serial.STOPBITS_ONE
#@override
#parity('N', 'E', 'O', 'M', 'S') 选其一, 含义分别代表 ("无校验", "偶校验", "奇校验", "校验1", "校验0"), 缺省为无校验
#stop(1, 1.5, 2)选其一, 含义分别代表 ("1个停止位", "1.5个停止位", "2个停止位"), 缺省为1个停止位
#bit(5, 6, 7, 8)选其一, 含义分别代表 ("5位数据", "6位数据", "7位数据""8位数据"), 缺省为8位数据
#baud(50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, 9600, 19200, 38400, 57600, 115200)选其一, 缺省为9600位数据
#mode 需要是一个json字符串, exam: "{"name" : 'COM1', "baud" : 115200, "parity" :'E', "stop" : 1}" 代表COM1, 115200波特率, 偶校验, 1停止位
def open(self, mode: str):
    """Open the serial port described by ``mode`` and install an RTU master.

    Args:
        mode: JSON text whose keys are read with these defaults:
            ``"name"`` (``"COM1"``), ``"baud"`` (9600),
            ``"parity"`` (``serial.PARITY_NONE``),
            ``"stop"`` (``serial.STOPBITS_ONE``),
            ``"bit"`` (``serial.EIGHTBITS``).
            Example: ``{"name": "COM1", "baud": 115200, "parity": "E", "stop": 1}``.

    Nothing is raised: failures are logged and leave ``self.is_open``
    False.  When ``mode`` cannot be parsed (or parses to an empty value)
    no connection attempt is made.
    """
    try:
        # Store on the instance: assigning to the class attribute left
        # self.mode at the None set in __init__ and shared the value
        # between every protocol object.
        self.mode = mode
        mode_dict = json_load_message(mode)
        print_inform_msg(f"打开: 协议名称:{self.protocol_name} ,工作端口:{self.mode}")
    except Exception as e:
        print_error_msg(f"错误modbus rtu信息:{str(e)}")
        mode_dict = None
    if not mode_dict:
        return

    self.comm_port_name = utils.dict_or_object_get_attr(mode_dict, "name", "COM1")
    self.comm_parity = utils.dict_or_object_get_attr(mode_dict, "parity", serial.PARITY_NONE)
    self.comm_baud = utils.dict_or_object_get_attr(mode_dict, "baud", 9600)
    self.comm_stopbits = utils.dict_or_object_get_attr(mode_dict, "stop", serial.STOPBITS_ONE)
    self.comm_bytesize = utils.dict_or_object_get_attr(mode_dict, "bit", serial.EIGHTBITS)

    uart = None
    try:
        uart = serial.Serial(port=self.comm_port_name,
                             baudrate=self.comm_baud,
                             bytesize=self.comm_bytesize,
                             parity=self.comm_parity,
                             stopbits=self.comm_stopbits)
        self.comm_uart = uart
        self.rtu_master = modbus_rtu.RtuMaster(uart)
        self.set_modbus_master(self.rtu_master)
        self.set_timeout(self.timeout)
        self.is_open = True
    except Exception as e:
        print_error_msg(f"错误modbus rtu信息:{str(e)}")
        # Do not leak the port when a later setup step failed after the
        # serial device had already been opened.
        if uart is not None:
            try:
                uart.close()
            except Exception as close_error:
                print_error_msg(f"错误modbus rtu信息:{str(close_error)}")
        self.comm_uart = None
        self.is_open = False
#@override 关闭到modbus rtu 对应的串口
def close(self):
    """Close the RTU master and its serial port if the protocol is open.

    Errors while closing are logged, not raised; the connection state is
    reset in every case.
    """
    if not self.is_open:
        return
    try:
        print_inform_msg(f"关闭 : 协议名称:{self.protocol_name} ,工作端口:{self.mode}")
        master = self.rtu_master
        if master:
            master.close()
        uart = self.comm_uart
        if uart:
            uart.close()
    except Exception as e:
        print_error_msg(f"错误关闭modbus rtu信息:{str(e)}")
    finally:
        # Always forget the handles, even when closing failed.
        self.rtu_master = None
        self.comm_uart = None
        self.is_open = False
#以太网Modbus_tcp主机类
class class_protocol_modbus_tcp(class_protocol_modbus) :
def __init__(self, protocol_name="modbus_tcp"):
    """Set up the TCP defaults (127.0.0.1:502); no connection is made here."""
    class_protocol_modbus.__init__(self, protocol_name)
    self.ip_str = "127.0.0.1"
    self.port = 502
    self.tcp_master: "modbus_tcp.TcpMaster" = None
    # close() reads this flag, so it must exist even if open() was never
    # called (the RTU protocol class initialises it the same way).
    self.is_open = False
#ip: 服务器ip地址, 缺省"127.0.0.1"
#port: 服务器端口, 缺省502
#timeout: 超时时间, 缺省 5.0S
#mode 需要是一个json字符串, exam: '{"ip" : '192.168.1.201', "port" : 502, "timeout": 5.0}' 代表192.168.1.201, 502端口, 通讯超时 5.0S
def open(self, mode: str):
    """Connect to the Modbus TCP server described by ``mode``.

    Args:
        mode: JSON text with optional keys ``"ip"`` (default
            ``"127.0.0.1"``), ``"port"`` (default 502) and ``"timeout"``
            (keeps the current ``self.timeout`` when absent).
            Example: ``{"ip": "192.168.1.201", "port": 502, "timeout": 5.0}``.

    Nothing is raised: failures are logged and leave ``self.is_open``
    False.  When ``mode`` cannot be parsed (or parses to an empty value)
    no connection attempt is made.
    """
    try:
        self.mode = mode
        print_inform_msg(f"打开 Modbus TCP: 协议名称:{self.protocol_name} ,工作端口:{self.mode}")
        settings = json_load_message(mode)
    except json.decoder.JSONDecodeError as e:
        print_error_msg(f"解析模式失败: {str(e)}")
        settings = None
    except Exception as e:
        print_error_msg(f"错误打开 Modbus TCP 信息: {str(e)}")
        settings = None

    if not settings:
        return

    self.ip_str = utils.dict_or_object_get_attr(settings, "ip", "127.0.0.1")
    self.port = utils.dict_or_object_get_attr(settings, "port", 502)
    if "timeout" in settings:
        self.timeout = settings["timeout"]

    try:
        self.tcp_master = modbus_tcp.TcpMaster(host=self.ip_str, port=self.port, timeout_in_sec=self.timeout)
        self.set_modbus_master(self.tcp_master)
        self.tcp_master.open()
        self.is_open = True
    except Exception as e:
        print_error_msg(f"打开 Modbus TCP 失败: {str(e)} IP地址:{self.ip_str} ,端口号:{self.port}")
        self.is_open = False
#关闭到modbus tcp服务器的连接
def close(self):
    """Disconnect from the Modbus TCP server.

    Runs when the protocol is marked open or a TCP master object still
    exists.  Errors while closing are logged, not raised; the connection
    state is reset in every case.
    """
    if not self.is_open and self.tcp_master is None:
        return
    try:
        master = self.tcp_master
        if master:
            print_inform_msg(f"关闭 Modbus TCP: 协议名称:{self.protocol_name} ,工作端口:{self.mode}")
            master.close()
    except Exception as e:
        print_error_msg(f"错误关闭 Modbus TCP 信息: {str(e)}")
    finally:
        # Always forget the master, even when closing failed.
        self.tcp_master = None
        self.is_open = False
        print_inform_msg("已断开与 Modbus TCP 服务器的连接")
if __name__ == "__main__":
    # Manual smoke test: open and immediately close one RTU and one TCP
    # protocol object using fixed example settings.
    rtu_mode = '{"name" : "COM2", "baud":115200, "parity" :"E", "stop" : 1}'
    protocol_modbus_rtu = class_protocol_modbus_rtu("modbus_rtu")
    protocol_modbus_rtu.open(rtu_mode)
    protocol_modbus_rtu.close()

    tcp_mode = '{"ip" : "127.0.0.1", "port" : 502, "timeout" : 4.0}'
    protocol_modbus_tcp: class_protocol_modbus_tcp = class_protocol_modbus_tcp("modbus_tcp")
    protocol_modbus_tcp.open(tcp_mode)
    protocol_modbus_tcp.close()