import struct
from comm_protocol import class_protocol

DEFAULT_COMM_WRITE_TIMEOUT_MS = 500


# Base class for communication devices. According to the original note,
# derived classes need to implement device_comm_cycle, read_register and
# write_registers (the write method defined in this class is named
# write_register).
class class_comm_device():
    """Common base for devices that are accessed through registers.

    The raw register accessors (read_register, write_register,
    write_bit_register) raise NotImplementedError here and must be supplied
    by a derived class. The typed read helpers in this class are layered on
    top of read_register.
    """

    def __init__(self, comm_addr=0, device_remap=None, unique_name=None,
                 protocol: class_protocol = None):
        """Store the device configuration.

        comm_addr, device_remap and protocol are stored unchanged under the
        same names; nothing in this constructor interprets them.
        """
        # Toggled by device_comm_active() / device_comm_deactive().
        self.comm_active = True
        self.comm_addr = comm_addr
        self.device_remap = device_remap
        # NOTE(review): "uniqut_name" looks like a typo of "unique_name".
        # The attribute name is kept because external code may read it.
        self.uniqut_name = unique_name
        self.protocol = protocol

    # Abstract in this base class: read_register is not implemented here
    # and has to be implemented in the derived class.
    def read_register(self, reg_addr, reg_count):
        """Read reg_count registers starting at reg_addr.

        The helpers in this class index the returned value (result[0],
        result[1]) and treat a falsy result as "nothing was read".
        """
        raise NotImplementedError()

    # C-style prototype from the original note:
    #   bool write_registers(int reg_addr, reg_count, uint16_t value[])
    # Abstract in this base class: not implemented here, has to be
    # implemented in the derived class.
    def write_register(self, reg_addr, reg_count, new_value,
                       timeout_ms=DEFAULT_COMM_WRITE_TIMEOUT_MS):
        """Write new_value to reg_count registers starting at reg_addr."""
        raise NotImplementedError()

    # C-style prototypes from the original note:
    #   bool write_bit_register(int reg_addr, uint16_t value[])
    #   bool write_bit_register(int reg_addr, uint16_t value)
    def write_bit_register(self, reg_addr, value,
                           timeout_ms=DEFAULT_COMM_WRITE_TIMEOUT_MS):
        """Write a bit register; must be implemented by the derived class."""
        raise NotImplementedError()

    def device_comm_cycle(self):
        """Run one communication cycle.

        Returns a (comm_success_count, total_comm_count) tuple. This base
        implementation performs no communication and returns (0, 0).
        """
        comm_success_count = 0
        total_comm_count = 0
        return comm_success_count, total_comm_count

    def device_comm_flush_request(self, reg_addr, reg_count, cycle=3000):
        """Request a flush of a register range.

        This base implementation ignores its arguments and returns True.
        """
        return True

    def device_comm_deactive(self):
        """Mark the device as inactive (comm_active = False)."""
        self.comm_active = False

    def device_comm_active(self):
        """Mark the device as active (comm_active = True)."""
        self.comm_active = True

    def value_u16_to_s16(self, value):
        """Reinterpret an unsigned 16-bit value as signed two's complement.

        None is passed through unchanged.
        """
        if value is None:
            return None
        svalue = value
        # Bit 15 set means negative: invert the low 16 bits, add one, negate.
        if svalue & 0x8000:
            svalue = -((svalue ^ 0xFFFF) + 1)
        return svalue

    def value_u32_to_s32(self, value):
        """Reinterpret an unsigned 32-bit value as signed two's complement.

        None is passed through unchanged.
        """
        if value is None:
            return None
        svalue = value
        # Bit 31 set means negative: invert the low 32 bits, add one, negate.
        if svalue & 0x80000000:
            svalue = -((svalue ^ 0xFFFFFFFF) + 1)
        return svalue

    def read_register_count(self, reg_addr, reg_count):
        """Read reg_count registers; thin wrapper around read_register."""
        return self.read_register(reg_addr, reg_count)

    def read_register_u16(self, reg_addr):
        """Read one register as an unsigned 16-bit value.

        A falsy read result is returned unchanged; otherwise the first
        element masked to 16 bits is returned.
        """
        result = self.read_register_count(reg_addr, 1)
        if result:
            result = (result[0] & 0xFFFF)
        return result

    def read_register_s16(self, reg_addr):
        # Raw unsigned value, or the falsy result of a read that produced
        # nothing.
        result = self.read_register_u16(reg_addr)
sign_result = self.value_u16_to_s16(result) return sign_result def read_register_u32(self, reg_addr, big_endian = 0) : result = self.read_register_count(reg_addr, 2) if result : if big_endian: result = (result[1] & 0xFFFF) | ((result[0] & 0xFFFF) << 16) else : result = (result[0] & 0xFFFF) | ((result[1] & 0xFFFF) << 16) return result def read_register_s32(self, reg_addr, big_endian = 0) : result = self.read_register_u32(reg_addr, big_endian) sign_value = self.value_u32_to_s32(result) return sign_value def read_register_f32(self, reg_addr, big_endian = 0) : result = self.read_register_u32(reg_addr, big_endian) fvalue = None if result : bytes = struct.pack('BBBB', result & 0xFF, (result >> 8) & 0xFF, (result >> 16) & 0xFF, (result >> 24) & 0xFF) fvalue = struct.unpack("