utility.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import importlib
  2. import json
  3. import datetime
  4. import connectors
  5. import converters
  6. # name = "Test"
  7. # zz = connectors.modbus_connector.Test()
  8. # zz.f()
  9. class Utility:
  10. loaded_connector_module = {}
  11. loaded_converter_module = {}
  12. available_connectors = {}
  13. @staticmethod
  14. def import_connector_module(configs):
  15. for config in configs:
  16. module_name = "connectors." + config['connector_module']
  17. class_name = config['connector']
  18. if Utility.loaded_connector_module.get(class_name) is None:
  19. extension_class = getattr(eval(module_name), class_name)
  20. Utility.loaded_connector_module[class_name] = extension_class
  21. return Utility.loaded_connector_module
  22. @staticmethod
  23. def import_converter_module(configs):
  24. for config in configs:
  25. module_name = "converters." + config['converter_module']
  26. class_name = config['converter']
  27. if Utility.loaded_converter_module.get(class_name) is None:
  28. extension_class = getattr(eval(module_name), class_name)
  29. Utility.loaded_converter_module[class_name] = extension_class
  30. return Utility.loaded_converter_module
  31. @staticmethod
  32. def start_connectors(configs):
  33. connectors = Utility.import_connector_module(configs)
  34. converters = Utility.import_converter_module(configs)
  35. for config in configs:
  36. connector_config = config['connector_config']
  37. name = config['station_name']
  38. converter = converters[config['converter']]()
  39. connector = connectors[config['connector']](name, connector_config, converter)
  40. Utility.available_connectors[name] = connector
  41. connector.open()
  42. @staticmethod
  43. def data_encoder(data_list):
  44. data_json = json.dumps(data_list, cls=DateEncoder)
  45. return data_json
  46. class DateEncoder(json.JSONEncoder):
  47. def default(self, obj):
  48. if isinstance(obj, datetime.datetime):
  49. return obj.strftime("%Y-%m-%d %H:%M:%S")
  50. else:
  51. return json.JSONEncoder.default(self, obj)