浏览代码

add shuizhi modbus read function

liqudong 1 年之前
父节点
当前提交
7cc5afda5b
共有 5 个文件被更改,包括 296 次插入1 次删除
  1. 1 0
      connectors/__init__.py
  2. 156 0
      connectors/modbus_rtu_over_tcp_connector.py
  3. 1 0
      converters/__init__.py
  4. 137 0
      converters/modbus_converter.py
  5. 1 1
      logging_config.py

+ 1 - 0
connectors/__init__.py

@@ -1,2 +1,3 @@
 from . import tcp_connector
 from . import shuizhi_tcp_connector
+from . import modbus_rtu_over_tcp_connector

+ 156 - 0
connectors/modbus_rtu_over_tcp_connector.py

@@ -0,0 +1,156 @@
+import json
+import threading
+import queue
+import time
+from modbus_tk import modbus_rtu_over_tcp
+from connector import Connector
+from event_storage import EventStorage
+from logging_config import logger
+
+
+class ModbusRtuOverTcpConnector(Connector, threading.Thread):
+
+    def __init__(self, name, config, converter):
+        super().__init__()
+        self._master = None
+        self.__stopped = False
+        self._connected = False
+        self._ip = config['ip']  # ip
+        self._port = config['port']  # 端口
+        self._save_frequency = config['save_frequency']  # 数据存储时间间隔
+        self.setDaemon(True)
+        self.setName(name)
+        self.__converter = converter
+        self.__storager = EventStorage()
+        self.__command_queue = queue.Queue(500)
+        self.__last_save_time = 0
+        self.__data_point_config = self.__storager.get_station_info(name)
+        self._command = self.__storager.get_command_info(name)
+
+    def open(self):
+        self.__stopped = False
+        self.start()
+
+    def run(self):
+        self._connect()
+        self._connected = True
+        while True:
+            if isinstance(self._command, list):
+                for i in self._command:
+                    command_list = json.loads(i['command'])
+                    self.command_polling(command_list, resend_times=5)
+                    time.sleep(1)
+            time.sleep(1)
+            if self.__stopped:
+                break
+
+    def _connect(self):
+        try:
+            self._master = modbus_rtu_over_tcp.RtuOverTcpMaster(host=self._ip, port=self._port)
+            logger.info(f"{self._ip}:{self._port} connect success!")
+        except Exception as e:
+            logger.error(f'Error in modbus_tcp_connector.__connect: {e}')
+            self._connected = False
+            self._reconnect()
+
+    def _reconnect(self):
+        while True:
+            try:
+                self._master = modbus_rtu_over_tcp.RtuOverTcpMaster(host=self._ip, port=self._port)
+                logger.error('client start connect to host/port:{}'.format(self._port))
+                break
+            except ConnectionRefusedError:
+                logger.error('modbus server refused or not started, reconnect to server in 5s .... host/port:{}'.format(self._port))
+                time.sleep(5)
+            except Exception as e:
+                logger.error('do connect error:{}'.format(str(e)))
+                time.sleep(5)
+
+    def close(self):
+        pass
+
+    def get_name(self):
+        return self.name
+
+    def is_connected(self):
+        return self._connected
+
+    def send_command(self, command):
+        # command = {'device_id': 1, 'start_addr': 0, 'output_value': [0, 0, 0, 0], 'function_code': 15}
+        # print(f"[send_command] {command}")
+        try:
+            if isinstance(command, dict):
+                result = self.exec_command(command)
+            elif isinstance(command, list):
+                for each in command:
+                    result = self.exec_command(each)
+        except Exception as e:
+            print(f"[ModbusTcpConnector][send_command] error: {e}")
+            result = False
+
+        # print(f"[send_command][result] {result}")
+        return result
+
+    def exec_command(self, command):
+        if isinstance(command, str):
+            command = json.loads(command)
+        device_id = int(command['device_id'])
+        function_code = int(command['function_code'])
+        start_addr = int(command['start_addr'])
+
+        if function_code in (1, 2, 3, 4):
+            # 读寄存器
+            length = int(command['length'])
+            try:
+                self._master.set_timeout(2.0)  # modbus读取数据超时时间设置
+                self._master.set_verbose(True)
+                # print(device_id, ' ', function_code, " ", start_addr, " ", length)
+                receive_data = self._master.execute(device_id, function_code, start_addr, length)
+                datadict = {}
+                for i in range(len(receive_data)):
+                    addr = start_addr + i
+                    datadict[addr] = receive_data[i]
+                result = [device_id, datadict]
+                return result
+            except Exception as e:
+                logger.error(f'An error occurred while executing the read register command:{e}')
+        elif function_code in (5, 6, 15, 16):
+            # 写寄存器
+            output_value = command['output_value']
+            try:
+                self._master.set_timeout(10.0)
+                self._master.set_verbose(True)
+                data = self._master.execute(device_id, function_code, start_addr, output_value=output_value)
+                # print("data = ", data)
+                # data = (0, 65280) or (0, 0)
+                result = False
+                if function_code == 5 and "res" in command.keys():
+                    res = command["res"]
+                    if start_addr == data[0] and res == data[1]:
+                        result = True
+                return result
+            except Exception as e:
+                logger.error(f'An error occurred while executing the write register command:{e}')
+        else:
+            logger.error(f'Unsupported function code.')
+
+    def command_polling(self, command_list, resend_times=None):
+        # msg = str(time.strftime("%Y-%m-%d %H:%M:%S"))
+        for i in range(len(command_list)):
+            command_item = command_list[i]
+            if not self.__command_queue.empty():
+                write_command = self.__command_queue.get()  # 写命令来自数组
+                try:
+                    res = self.exec_command(command=write_command)
+                except Exception as e:
+                    logger.error(f"modbus_rtu,write:{e}")
+
+            else:
+                result = self.exec_command(command=command_item)
+                format_data = None
+                if result:
+                    format_data = self.__converter.convert(self.__data_point_config, result)
+                if format_data:
+                    if format_data != "error" and format_data != 'pass':
+                        # 往redis存储数据
+                        self.__storager.real_time_data_storage(format_data)

+ 1 - 0
converters/__init__.py

@@ -5,4 +5,5 @@ from . import shuizhi_converter
 from . import adcp_converter
 from . import adcp_converter_480
 from . import dandian_converter
+from . import modbus_converter
 

+ 137 - 0
converters/modbus_converter.py

@@ -0,0 +1,137 @@
+"""
+@File  : moxa1_converter.py
+@Author: lee
+@Date  : 2022/7/12/0012 8:55:13
+@Desc  :
+"""
+import binascii
+import math
+import struct
+from logging_config import shuizhi_file_logger as logger
+from converter import Converter
+from event_storage import EventStorage
+
+
+class ModbusConverter(Converter):
+    list = []
+
+    def __init__(self):
+        self._storage = EventStorage()
+
+    def convert(self, config, data):
+        if data:
+            device_id = data[0]
+            data = data[1]
+            # print(device_name, data)
+            format_data_dict = {}  # 列表数据转换字典数字
+            try:
+                for index in config:
+                    # addr_type : 'D' 或 'X';
+                    # addr_list: [10] 或 [10, 5]
+                    addr_type, addr_list = addr_parsing(index['address'])
+                    register_addr = addr_list[0]  # 数据地址  example:'X10.5' -> 10
+                    parameter_id = addr_list[1]
+                    if register_addr > 40000:
+                        register_addr = register_addr - 40001
+                    if int(index["device_id"]) == int(device_id) and register_addr in data and data[register_addr + 4] == parameter_id:
+                        logger.info(f"----------------------------")
+                        if addr_type == 'X':
+                            bit_offset_addr = addr_list[1]  # 位偏移地址  example:'X10.5' -> 5
+                            if index['data_type'] == "BOOL":
+                                data_bin = bin(65536 | data[register_addr])  # 和65536做或运算,为了留开头0
+                                data_bin = list(data_bin[3:][::-1])  # 去除开头字符 0  b  1并反转字符串
+                                data_bin = list(map(int, data_bin))  # 字符列表转整型列表
+                                if index['modbus_mode'] == "BA":
+                                    data_bin = data_bin[8:16] + data_bin[0:8]
+                                if index['negation'] == 1:
+                                    return_data = int(bool(1 - data_bin[bit_offset_addr]))  # 取反运算
+                                else:
+                                    return_data = data_bin[bit_offset_addr]
+
+                        elif addr_type == 'D':
+                            if index['data_type'] == "FLOAT16":
+                                return_data = data[register_addr]
+                            elif index['data_type'] == "INT16":
+                                if index['negation'] == 1:
+                                    return_data = int(bool(1 - data[register_addr]))  # 取反运算
+                                else:
+                                    if data[register_addr] > 32767:
+                                        return_data = data[register_addr] - 65536
+                                    else:
+                                        return_data = data[register_addr]
+                            elif index['data_type'] == "INT32":
+                                return_data_H = data[register_addr]
+                                return_data_L = data[register_addr + 1]
+                                if index['modbus_mode'] == "CDAB":
+                                    return_data = data[register_addr] * 65536 + data[register_addr + 1]
+                                else:
+                                    return_data = data[register_addr + 1] * 65536 + data[register_addr]
+                            elif index['data_type'] == "FLOAT32":
+                                t1 = hex(data[register_addr])[2:]
+                                t2 = hex(data[register_addr + 1])[2:]
+                                if len(t1) < 4:
+                                    t1 = (4 - len(t1)) * "0" + t1
+                                if len(t2) < 4:
+                                    t2 = (4 - len(t2)) * "0" + t2
+                                if index['modbus_mode'] == "CDAB":
+                                    return_data = \
+                                        struct.unpack('>f', binascii.unhexlify((t2 + t1).replace(' ', '')))[0]
+                                else:
+                                    return_data = \
+                                        struct.unpack('>f', binascii.unhexlify((t1 + t2).replace(' ', '')))[0]
+                            elif index['data_type'] == "BELZ_FLOAT32":
+                                llj_data = []
+                                for x in range(0, 6, 2):
+                                    t1 = hex(data[register_addr + x])[2:]
+                                    t2 = hex(data[register_addr + x + 1])[2:]
+                                    if len(t1) < 4:
+                                        t1 = (4 - len(t1)) * "0" + t1
+                                    if len(t2) < 4:
+                                        t2 = (4 - len(t2)) * "0" + t2
+                                    t = struct.unpack('>f', binascii.unhexlify((t1 + t2).replace(' ', '')))[0]
+                                    llj_data.append(t)
+                                return_data = llj_data[0] * 10 ** 6 + llj_data[1] + llj_data[2] / 10 ** 6  # 总累计量
+                            elif index['data_type'] == "UINT16":
+                                return_data = data[register_addr]
+                        logger.info(f"{index['io_point_name']}(格式化前): {return_data}")
+                        return_data = format_value(index, return_data)
+                        name = 'c' + str(index['serial_number'])
+                        format_data_dict[name] = return_data
+                        logger.info(f"{index['io_point_name']}: {return_data}")
+                return format_data_dict
+            except Exception as e:
+                logger.error(e)
+                return "error"
+
+
+def addr_parsing(addr_smash):
+    """
+    :param addr_smash: [X15.1] or [D15]
+    :return: X, [15, 1] or D [15]
+    """
+    # addr_smash_1 = 'D2'
+    # addr_smash = list(addr_smash)  # 拆分字符串为list ['X', '6', '.', '2']
+    addr_type = addr_smash[0]  # 地址类型 : 'D' 或 'X'
+    addr_smash = addr_smash[1:]  # 地址部分: '10' 或 '10.5'
+    addr_list = list(map(int, addr_smash.split(".")))  # 用“.”分割字符串转换为整型存入列表
+    return addr_type, addr_list
+
+
+def format_value(index, value):
+    if value:
+        value = float(value)
+        divisor = index['divisor']
+        offset = index['offset']
+        low_limit = index['low_limit']
+        up_limit = index['up_limit']
+        if divisor:
+            value /= divisor
+        if offset:
+            value -= offset
+        value = round(value, 2)
+        if low_limit <= value <= up_limit:
+            return value
+        else:
+            return ''
+    else:
+        return ''

+ 1 - 1
logging_config.py

@@ -13,7 +13,7 @@ LOGGING_CONFIG = dict(
     loggers={
         # 新曾自定义日志,用于数据采集程序
         "console": {
-            "level": "DEBUG",
+            "level": "INFO",
             "handlers": ["console", "connector_file"],
             "propagate": True,
             "qualname": "console.debug",