import modbus_tk import modbus_tk.defines as cst import modbus_tk.modbus_tcp as modbus_tcp import socket class ModbusClient: def __init__(self): self.master = None def connect(self, ip, port): if self.master: self.master.close() self.master = None try: self.master = modbus_tcp.TcpMaster(host=ip, port=port) self.master.set_timeout(5.0) self.master.set_verbose(False) # Test connection self.master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 1) return {'success': True, 'msg': f"Modbus连接成功\nIP: {ip}\n端口: {port}"} except modbus_tk.modbus.ModbusError as e: self.master = None return {'success': False, 'msg': f"Modbus协议错误: {e}\n从站地址: {e.slave}\n功能码: {e.function_code}"} except socket.error as e: self.master = None return {'success': False, 'msg': f"网络连接失败: {e}"} except Exception as e: self.master = None return {'success': False, 'msg': f"连接失败: {str(e)}"} def disconnect(self): if self.master: self.master.close() self.master = None def read_holding_registers(self, slave_id, address, count): if not self.master: return {'success': False, 'data': None, 'msg': "Modbus未初始化,请先连接"} try: result = self.master.execute( slave=slave_id, function_code=cst.READ_HOLDING_REGISTERS, starting_address=address, quantity_of_x=count ) if result: return {'success': True, 'data': list(result), 'msg': "读取成功"} else: return {'success': False, 'data': None, 'msg': "读取结果为空"} except Exception as e: return {'success': False, 'data': None, 'msg': f"读取失败:{e}"} def write_single_register(self, slave_id, address, value): if not self.master: return {'success': False, 'msg': "Modbus未连接,请先初始化连接"} if value is None: return {'success': False, 'msg': "未指定要写入的值"} try: write_value = int(value) if not (0 <= write_value <= 65535): return {'success': False, 'msg': "值必须在0-65535范围内(16位无符号整数)"} except ValueError: return {'success': False, 'msg': "写入值必须为整数"} try: self.master.execute( slave=slave_id, function_code=cst.WRITE_SINGLE_REGISTER, starting_address=address, output_value=write_value ) return {'success': True, 'msg': f"写入成功: 从机ID={slave_id},地址={address},值={write_value}"} except Exception as e: return {'success': False, 'msg': f"写入失败: 从机ID={slave_id},地址={address},错误:{e}"}