123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- from converter import Converter
- from log import OutPutLog
class S7Converter(Converter):
    """Turn raw per-DB buffers into a flat ``{'c<serial_number>': value}`` dict.

    Each configured point names a data type and an address; the address
    selects a buffer in ``data`` and an offset inside it, and the value is
    decoded with the ``byteTransform`` helper exposed by ``master``.
    """

    def __init__(self):
        self._log = OutPutLog()

    def convert(self, config, data, master):
        """Decode every configured point that is present in ``data``.

        Args:
            config: Iterable of point descriptions. Each one is indexed with
                the keys ``'data_type'``, ``'address'`` and
                ``'serial_number'``.
            data: Mapping from the block identifier (the first dot-separated
                field of the address) to a buffer, or ``None`` for a block
                that could not be read.
                NOTE(review): buffers are presumably raw bytes read from the
                device - confirm against the caller.
            master: Object exposing ``byteTransform`` with ``TransSingle``,
                ``TransBoolArray``, ``TransByte`` and ``TransInt32``.

        Returns:
            A dict mapping ``'c' + str(serial_number)`` to the decoded value
            (floats rounded to 2 decimals); the string ``"error"`` if any
            exception occurred while converting; ``None`` when ``data`` is
            empty.
        """
        if not data:
            return None

        format_data_dict = {}
        try:
            for index in config:
                data_type = index['data_type']
                addr_division = self.address_transform(index['address']).split(".")
                db_number = addr_division[0]
                data_addr = int(addr_division[1])

                # Points whose block is absent, unread or too short are
                # skipped rather than treated as errors.
                if db_number not in data:
                    continue
                buffer = data[db_number]
                # Guard against the empty result returned when the device
                # is disconnected.
                if buffer is None:
                    continue
                if data_addr >= len(buffer):
                    continue

                return_data = self._read_value(
                    master, data_type, buffer, data_addr, addr_division
                )
                if isinstance(return_data, float):
                    return_data = round(return_data, 2)
                format_data_dict['c' + str(index['serial_number'])] = return_data
        except Exception as e:
            print("S7Converter[ERROR]:" + str(e))
            return "error"
        return format_data_dict

    @staticmethod
    def _read_value(master, data_type, buffer, data_addr, addr_division):
        """Decode one value of ``data_type`` from ``buffer`` at ``data_addr``.

        Raises:
            ValueError: If ``data_type`` is not FLOAT32, BOOL, UINT8 or INT32.
                Raising here prevents a value decoded for a previous point
                from being stored under the current point's name.
        """
        if data_type == "FLOAT32":
            return master.byteTransform.TransSingle(buffer, data_addr)
        if data_type == "BOOL":
            # The third address field selects the entry of the decoded
            # boolean array.
            bit_index = int(addr_division[2])
            bits = master.byteTransform.TransBoolArray(buffer, data_addr, 1)
            return int(bits[bit_index])
        if data_type == "UINT8":
            return master.byteTransform.TransByte(buffer, data_addr)
        if data_type == "INT32":
            return master.byteTransform.TransInt32(buffer, data_addr)
        raise ValueError("unsupported data_type: " + str(data_type))

    def address_transform(self, addr):
        """Strip the area prefix from the second field of a dotted address.

        The third character of the second field is the area letter and the
        characters after it are the offset, e.g. ``DB1.DBD4`` -> ``DB1.4``
        and ``DB1.DBX0.3`` -> ``DB1.0.3``.

        Args:
            addr: The address as a string, or as a list of string fragments
                that are concatenated first.

        Returns:
            ``"<block>.<offset>"`` for area letters D, B and W, or
            ``"<block>.<offset>.<bit>"`` for area letter X.

        Raises:
            ValueError: If the area letter is not one of X, D, B or W.
            IndexError: If the address has too few fields or the second
                field is shorter than three characters.
        """
        # Merge a list into one string, then split on "." to get the fields.
        real_addr_buffer = "".join(addr).split(".")
        area = real_addr_buffer[1][2]
        offset = real_addr_buffer[1][3:]
        if area == 'X':
            return ".".join((real_addr_buffer[0], offset, real_addr_buffer[2]))
        if area in ('D', 'B', 'W'):
            return ".".join((real_addr_buffer[0], offset))
        raise ValueError("unsupported address area '" + area + "' in " + "".join(addr))
|