12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- import importlib
- import json
- import datetime
- import connectors
- import converters
- # name = "Test"
- # zz = connectors.modbus_connector.Test()
- # zz.f()
class Utility:
    """Class-level registry and factory helpers for connector plug-ins.

    Each entry of a ``configs`` sequence is a mapping read with these keys:

    * ``connector_module`` / ``connector`` -- submodule of the ``connectors``
      package and the class name to fetch from it.
    * ``converter_module`` / ``converter`` -- submodule of the ``converters``
      package and the class name to fetch from it.
    * ``connector_config`` -- passed through to the connector constructor.
    * ``station_name`` -- key under which the connector instance is stored.
    """

    # Connector classes resolved so far, keyed by class name.
    loaded_connector_module = {}
    # Converter classes resolved so far, keyed by class name.
    loaded_converter_module = {}
    # Connector instances created by start_connectors(), keyed by station name.
    available_connectors = {}

    @staticmethod
    def _load_classes(configs, package, module_key, class_key, cache):
        """Resolve the class named by each config entry and store it in *cache*.

        The module is located with ``importlib.import_module`` instead of
        ``eval``: configuration text is never executed as code, and the
        submodule is imported on demand rather than having to be already
        bound as an attribute of the package.

        NOTE(review): *cache* is keyed by class name only, so two modules
        that export the same class name share one slot (first one wins).
        """
        for config in configs:
            module_name = package + "." + config[module_key]
            class_name = config[class_key]
            if class_name in cache:
                continue
            module = importlib.import_module(module_name)
            cache[class_name] = getattr(module, class_name)
        return cache

    @staticmethod
    def import_connector_module(configs):
        """Load every connector class referenced by *configs*.

        Returns:
            ``Utility.loaded_connector_module`` (class name -> class), which
            also contains classes loaded by earlier calls.

        Raises:
            KeyError: an entry lacks ``connector_module`` or ``connector``.
            ImportError: the submodule cannot be imported.
            AttributeError: the submodule does not define the class.
        """
        return Utility._load_classes(
            configs,
            "connectors",
            "connector_module",
            "connector",
            Utility.loaded_connector_module,
        )

    @staticmethod
    def import_converter_module(configs):
        """Load every converter class referenced by *configs*.

        Returns:
            ``Utility.loaded_converter_module`` (class name -> class), which
            also contains classes loaded by earlier calls.

        Raises:
            KeyError: an entry lacks ``converter_module`` or ``converter``.
            ImportError: the submodule cannot be imported.
            AttributeError: the submodule does not define the class.
        """
        return Utility._load_classes(
            configs,
            "converters",
            "converter_module",
            "converter",
            Utility.loaded_converter_module,
        )

    @staticmethod
    def start_connectors(configs):
        """Instantiate, register and open one connector per config entry.

        For each entry a converter is created with no arguments, the
        connector is created as ``connector_cls(station_name,
        connector_config, converter)``, stored in
        ``Utility.available_connectors`` under the station name, and its
        ``open()`` method is called.
        """
        # configs is walked three times below; materialise it so a one-shot
        # iterable (e.g. a generator) is not exhausted after the first pass.
        configs = list(configs)
        # Locals deliberately avoid the names `connectors` / `converters`,
        # which are the imported packages at file level.
        connector_classes = Utility.import_connector_module(configs)
        converter_classes = Utility.import_converter_module(configs)
        for config in configs:
            connector_config = config['connector_config']
            name = config['station_name']
            converter = converter_classes[config['converter']]()
            connector = connector_classes[config['connector']](
                name, connector_config, converter
            )
            # Registered before open(), so the instance stays reachable even
            # if open() raises.
            Utility.available_connectors[name] = connector
            connector.open()

    @staticmethod
    def data_encoder(data_list):
        """Serialise *data_list* to a JSON string using ``DateEncoder``."""
        return json.dumps(data_list, cls=DateEncoder)
class DateEncoder(json.JSONEncoder):
    """JSON encoder that renders ``datetime.datetime`` values as text.

    A ``datetime.datetime`` is emitted as a ``"YYYY-MM-DD HH:MM:SS"`` string;
    every other unsupported object is handed to the base encoder.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for *obj*."""
        if not isinstance(obj, datetime.datetime):
            # Not a datetime: defer to the stock JSONEncoder behaviour.
            return super().default(obj)
        return obj.strftime("%Y-%m-%d %H:%M:%S")
|