s7_converter.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. from converter import Converter
  2. from log import OutPutLog
  3. class S7Converter(Converter):
  4. def __init__(self):
  5. self._log = OutPutLog()
  6. def convert(self, config, data, master):
  7. if data:
  8. format_data_dict = {} # 列表数据转换字典数字
  9. try:
  10. for index in config:
  11. type = index['data_type']
  12. addr = self.address_transform(index['address'])
  13. addr_division = "".join(addr).split(".")
  14. db_number = addr_division[0]
  15. data_addr = int(addr_division[1])
  16. if db_number in data:
  17. if data[db_number] != None : #防止设备断线返回为空导致的异常
  18. if data_addr < len(data[db_number]):
  19. if type == "FLOAT32":
  20. return_data = master.byteTransform.TransSingle(data[db_number], data_addr)
  21. elif type == "BOOL":
  22. return_data = int(master.byteTransform.TransBoolArray(data[db_number], data_addr, 1)[int(addr_division[2])])
  23. elif type == "UINT8":
  24. return_data = master.byteTransform.TransByte(data[db_number], data_addr)
  25. elif type == "INT32":
  26. return_data = master.byteTransform.TransInt32(data[db_number], data_addr)
  27. name = 'c' + str(index['serial_number'])
  28. if isinstance(return_data, float):
  29. return_data = round(return_data, 2)
  30. format_data_dict[name] = return_data
  31. except Exception as e:
  32. print("S7Converter[ERROR]:"+str(e))
  33. return "error"
  34. return format_data_dict
  35. def address_transform(self, addr):
  36. real_addr_buffer = "".join(addr).split(".") # 合并list为字符串 用“.”分割字符串获得数据
  37. if real_addr_buffer[1][2] == 'X':
  38. data_addr = real_addr_buffer[0] + "." + real_addr_buffer[1][3:] + "." + real_addr_buffer[2]
  39. elif real_addr_buffer[1][2] == 'D' or real_addr_buffer[1][2] == 'B' or real_addr_buffer[1][2] == 'W':
  40. data_addr = real_addr_buffer[0] + "." + real_addr_buffer[1][3:]
  41. return data_addr