浏览代码

modify log system, add cec21_converter

liqudong 2 年之前
父节点
当前提交
7896901760

+ 1 - 1
AES_crypt.py

@@ -77,7 +77,7 @@ if __name__ == '__main__':
 
     aescryptor = AESCrypt(passwd, AES.MODE_CBC, iv)  # CBC模式
     # aescryptor = Aescrypt(passwd,AES.MODE_ECB,"") # ECB模式
-    text = "root"
+    text = "zzZZ4144670.."
     en_text = aescryptor.aesencrypt(text)
     print("密文:", en_text)
     text = aescryptor.aesdecrypt(en_text)

+ 1 - 1
alarm.py

@@ -2,7 +2,7 @@
 @Date  :2021/5/21/00219:10:57
 @Desc  :
 """
-from logging_config import logger
+from logging_config import general as logger
 import threading
 import time
 from event_storage import EventStorage

+ 3 - 2
configuration.py

@@ -1,5 +1,6 @@
 import json
-from logging_config import logger
+import os
+from logging_config import general as logger
 import sys
 from AES_crypt import decrypt
 
@@ -11,7 +12,7 @@ class Configuration:
             with open(self.path) as json_file:
                 self.config = json.load(json_file)
         except FileNotFoundError as e:
-            logger.error(f"无法找到配置文件:{e}")
+            logger.error(f"当前路径:{os.getcwd()}无法找到配置文件:{e}")
             input("按任意键退出!")
             sys.exit()
 

+ 15 - 16
connectors/modbus_rtu_over_tcp_connector.py

@@ -1,3 +1,7 @@
+"""
+@Date  :2021/5/21/00219:10:57
+@Desc  : 目前此连接器用于的传感器有:insitu水质传感器,水质传感器,气象传感器
+"""
 import json
 import threading
 import queue
@@ -5,11 +9,10 @@ import time
 from modbus_tk import modbus_rtu_over_tcp
 from connector import Connector
 from event_storage import EventStorage
-from logging_config import logger
+from logging_config import modbus_connector as logger
 
 
 class ModbusRtuOverTcpConnector(Connector, threading.Thread):
-
     def __init__(self, name, config, converter):
         super().__init__()
         self._master = None
@@ -47,9 +50,9 @@ class ModbusRtuOverTcpConnector(Connector, threading.Thread):
     def _connect(self):
         try:
             self._master = modbus_rtu_over_tcp.RtuOverTcpMaster(host=self._ip, port=self._port)
-            logger.info(f"{self._ip}:{self._port} connect success!")
+            logger.info(f'Connect to [{self.name}]:[{self._ip}]:[{self._port}] success !')
         except Exception as e:
-            logger.error(f'Error in modbus_tcp_connector.__connect: {e}')
+            logger.info(f'Connect to [{self.name}]:[{self._ip}]:[{self._port}] failed:{e} !!!')
             self._connected = False
             self._reconnect()
 
@@ -57,13 +60,10 @@ class ModbusRtuOverTcpConnector(Connector, threading.Thread):
         while True:
             try:
                 self._master = modbus_rtu_over_tcp.RtuOverTcpMaster(host=self._ip, port=self._port)
-                logger.error('client start connect to host/port:{}'.format(self._port))
+                logger.info(f'Reconnect to [{self.name}]:[{self._ip}]:[{self._port}] success !')
                 break
-            except ConnectionRefusedError:
-                logger.error('modbus server refused or not started, reconnect to server in 5s .... host/port:{}'.format(self._port))
-                time.sleep(5)
             except Exception as e:
-                logger.error('do connect error:{}'.format(str(e)))
+                logger.error(f'Reconnect to [{self.name}]:[{self._ip}]:[{self._port}] failed:{e} !!! Continue reconnect in 5s..')
                 time.sleep(5)
 
     def close(self):
@@ -102,7 +102,7 @@ class ModbusRtuOverTcpConnector(Connector, threading.Thread):
             # 读寄存器
             length = int(command['length'])
             try:
-                self._master.set_timeout(2.0)  # modbus读取数据超时时间设置
+                self._master.set_timeout(3.0)  # modbus读取数据超时时间设置
                 self._master.set_verbose(True)
                 # print(device_id, ' ', function_code, " ", start_addr, " ", length)
                 receive_data = self._master.execute(device_id, function_code, start_addr, length)
@@ -113,7 +113,7 @@ class ModbusRtuOverTcpConnector(Connector, threading.Thread):
                 result = [device_id, datadict]
                 return result
             except Exception as e:
-                logger.error(f'An error occurred while executing the read register command:{e}')
+                logger.error(f'[{self.name}]: An error occurred while executing the read register command:{e}')
         elif function_code in (5, 6, 15, 16):
             # 写寄存器
             output_value = command['output_value']
@@ -130,21 +130,20 @@ class ModbusRtuOverTcpConnector(Connector, threading.Thread):
                         result = True
                 return result
             except Exception as e:
-                logger.error(f'An error occurred while executing the write register command:{e}')
+                logger.error(f'[{self.name}]: An error occurred while executing the write register command:{e}')
         else:
-            logger.error(f'Unsupported function code.')
+            logger.error(f'[{self.name}]: Unsupported function code.')
 
     def command_polling(self, command_list, resend_times=None):
         # msg = str(time.strftime("%Y-%m-%d %H:%M:%S"))
         for i in range(len(command_list)):
             command_item = command_list[i]
             if not self.__command_queue.empty():
-                write_command = self.__command_queue.get()  # 写命令来自数组
+                write_command = self.__command_queue.get()  # 写命令来自队列
                 try:
                     res = self.exec_command(command=write_command)
                 except Exception as e:
-                    logger.error(f"modbus_rtu,write:{e}")
-
+                    logger.error(f"[{self.name}]: modbus_rtu,write:{e}")
             else:
                 result = self.exec_command(command=command_item)
                 format_data = None

+ 2 - 2
connectors/shuizhi_tcp_connector.py

@@ -1,6 +1,6 @@
 """
 @Date  :2021/5/21/00219:10:57
-@Desc  :
+@Desc  : 原本用于insitu水质传感器的读取和解析,目前此连接器未启动,insitu读取和解析均为mudbus_rtu_over_tcp。
 """
 import binascii
 import threading
@@ -9,7 +9,7 @@ import struct
 import socket
 from connector import Connector
 from event_storage import EventStorage
-from logging_config import shuizhi_file_logger as logger
+# from logging_config import shuizhi_converter as logger
 from binascii import *
 from crcmod import *
 

+ 8 - 11
connectors/tcp_connector.py

@@ -1,16 +1,13 @@
 """
 @Date  :2021/5/21/00219:10:57
-@Desc  :
+@Desc  : 此连接器用于wxt536气象传感器、td266单点流速仪、sm140波浪传感器、adcp流速流向仪、cec26单点流速仪
 """
 import json
 import time
 import threading
 import socket
 import queue
-
-from modbus_tk import utils
-
-from logging_config import logger
+from logging_config import tcp_connector as logger
 from connector import Connector
 from event_storage import EventStorage
 
@@ -62,10 +59,10 @@ class TcpConnector(Connector, threading.Thread):
         self.__sock.settimeout(180)  # 设置超时时间3mins
         try:
             self.__sock.connect((self.__ip, self.__port))
-            logger.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
+            logger.info(f'Connect to [{self.name}]:[{self.__ip}]:[{self.__port}] success !')
             self.__connected = True
         except Exception as e:
-            logger.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
+            logger.info(f'Connect to [{self.name}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
             self.__connected = False
             self.__reconnect()
 
@@ -80,10 +77,10 @@ class TcpConnector(Connector, threading.Thread):
                 self.__sock.settimeout(180)  # 设置超时时间3mins
                 self.__sock.connect((self.__ip, self.__port))
                 self.__connected = True
-                logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
+                logger.info(f'Reconnect to [{self.name}]:[{self.__ip}]:[{self.__port}] success !')
                 break
             except Exception as e:
-                logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
+                logger.info(f'Reconnect to [{self.name}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
                 self.__connected = False
                 time.sleep(5)
 
@@ -106,7 +103,7 @@ class TcpConnector(Connector, threading.Thread):
             try:
                 self.__sock.send(data.encode(encoding='utf-8'))
             except Exception as e:
-                logger.info(f'Send command to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] error:{e}')
+                logger.info(f'Send command to [{self.name}]:[{self.__ip}]:[{self.__port}] error:{e}')
 
     def exec_command(self, command):
         try:
@@ -127,7 +124,7 @@ class TcpConnector(Connector, threading.Thread):
                     if format_data and format_data != "error" and format_data != 'pass':
                         self.__storager.real_time_data_storage(format_data)
             except Exception as e:
-                logger.error(f'Other error occur [{self.get_name()}]:[{self.__ip}]:[{self.__port}]:{e}')
+                logger.error(f'Other error occur [{self.name}]:[{self.__ip}]:[{self.__port}]:{e}')
                 time.sleep(5)
                 self.__reconnect()
         else:

+ 1 - 4
converters/__init__.py

@@ -1,9 +1,6 @@
 from . import nmea0183_converter
 from . import wxt536_converter
 from . import td266_converter
-from . import shuizhi_converter
 from . import adcp_converter
-from . import adcp_converter_480
-from . import dandian_converter
+from . import cec21_converter
 from . import modbus_converter
-

+ 34 - 25
converters/adcp_converter.py

@@ -1,50 +1,59 @@
 """
 @Date  :2021/5/21/00219:10:57
-@Desc  :
+@Desc  :此类为adcp流速流向仪(30流速,30流向的版本)的解析器。
 """
-from logging_config import adcp_file_logger as logger
+from tools.format_value import format_value
+from logging_config import adcp_converter as logger
 from converter import Converter
 
 
-# data = b' 4 24 2021 12  9 59 0 32 11.7 1498.7 194.7 -2.2 0.6 1.008 12.58 0 0\r\n  1    331 2954\r\n  2    164 2448\r\n  3    171 1438\r\n  4    137 1034\r\n  5     38 2292\r\n  6    185 3027\r\n  7    162  928\r\n  8    611 3209\r\n  9    470 3287\r\n 10    463 3111\r\n 11    914 1178\r\n 12    149  390\r\n 13    703  939\r\n 14    411 2452\r\n 15    158 1111\r\n 16    206  710\r\n 17    673 1099\r\n 18    156  955\r\n 19    324  991\r\n 20    127  396\r\n 21    335 2590\r\n 22    105 3485\r\n 23    263 2615\r\n 24    179 1201\r\n 25    114 1496\r\n 26    669  844\r\n 27    105 2831\r\n 28    720 1256\r\n 29    449  839\r\n 30    392  818\r\n' 实收
-# data = b' 4 24 2021 11 29 59 0 32 11.7 1498.7 194.9 -2.2 0.7 1.001 12.57 0 0\r\n  1    311 3003\r\n  2     91 1489\r\n  3     97 1476\r\n  4    100 1269\r\n  5     47  997\r\n  6     50 1074\r\n  7    158   40\r\n  8    487 3545\r\n  9    411 3268\r\n 10    715 2939\r\n 11    425  204\r\n 12    206 2557\r\n 13    259  820\r\n 14    407 2153\r\n 15    237 1405\r\n 16    378  768\r\n 17    481 1191\r\n 18    856 3105\r\n 19    681  999\r\n 20    274  400\r\n 21    369 2852\r\n 22    332  208\r\n 23     48  366\r\n 24    497  539\r\n 25    770 1103\r\n 26    102 1750\r\n 27    514  659\r\n 28    180 1787\r\n 29    499  938\r\n 30    611 3350\r\n' 模拟收
-#        b' 4 25 2021 3 26 59 0 32 11.7 1500.7 195.0 -2.2 0.7 1.025 13.19 0 0\r\n  1    124 1973\r\n  2    108 1211\r\n  3    393  982\r\n  4    182  159\r\n  5     75 3440\r\n  6    176 2566\r\n  7    268 2553\r\n  8    250 2333\r\n  9    282 2215\r\n 10    564 1113\r\n 11    505  974\r\n 12    285  940\r\n 13    329 1381\r\n 14    118 1087\r\n 15    276 1313\r\n 16    707 2832\r\n 17    385 1000\r\n 18    365 2530\r\n 19    514 1374\r\n 20    591  683\r\n 21   1109  837\r\n 22    889  955\r\n 23    333  829\r\n 24    194  515\r\n 25    466  786\r\n 26    243  445\r\n 27    107 1693\r\n 28    261  317\r\n 29    704 1008\r\n 30    301  219\r\n'
 class AdcpConverter(Converter):
+    def __init__(self, name):
+        self._name = name
+
     def convert(self, config, data):
+        """
+        data :
+        b' 4 24 2021 12  9 59 0 32 11.7 1498.7 194.7 -2.2 0.6 1.008 12.58 0 0\r\n  1    331 2954\r\n  2    164 2448\r\n  3    171 1438\r\n  4    137 1034\r\n  5     38 2292\r\n  6    185 3027\r\n  7    162  928\r\n  8    611 3209\r\n  9    470 3287\r\n 10    463 3111\r\n 11    914 1178\r\n 12    149  390\r\n 13    703  939\r\n 14    411 2452\r\n 15    158 1111\r\n 16    206  710\r\n 17    673 1099\r\n 18    156  955\r\n 19    324  991\r\n 20    127  396\r\n 21    335 2590\r\n 22    105 3485\r\n 23    263 2615\r\n 24    179 1201\r\n 25    114 1496\r\n 26    669  844\r\n 27    105 2831\r\n 28    720 1256\r\n 29    449  839\r\n 30    392  818\r\n' 实收
+        b' 4 24 2021 11 29 59 0 32 11.7 1498.7 194.9 -2.2 0.7 1.001 12.57 0 0\r\n  1    311 3003\r\n  2     91 1489\r\n  3     97 1476\r\n  4    100 1269\r\n  5     47  997\r\n  6     50 1074\r\n  7    158   40\r\n  8    487 3545\r\n  9    411 3268\r\n 10    715 2939\r\n 11    425  204\r\n 12    206 2557\r\n 13    259  820\r\n 14    407 2153\r\n 15    237 1405\r\n 16    378  768\r\n 17    481 1191\r\n 18    856 3105\r\n 19    681  999\r\n 20    274  400\r\n 21    369 2852\r\n 22    332  208\r\n 23     48  366\r\n 24    497  539\r\n 25    770 1103\r\n 26    102 1750\r\n 27    514  659\r\n 28    180 1787\r\n 29    499  938\r\n 30    611 3350\r\n' 模拟收
+        b' 4 25 2021 3 26 59 0 32 11.7 1500.7 195.0 -2.2 0.7 1.025 13.19 0 0\r\n  1    124 1973\r\n  2    108 1211\r\n  3    393  982\r\n  4    182  159\r\n  5     75 3440\r\n  6    176 2566\r\n  7    268 2553\r\n  8    250 2333\r\n  9    282 2215\r\n 10    564 1113\r\n 11    505  974\r\n 12    285  940\r\n 13    329 1381\r\n 14    118 1087\r\n 15    276 1313\r\n 16    707 2832\r\n 17    385 1000\r\n 18    365 2530\r\n 19    514 1374\r\n 20    591  683\r\n 21   1109  837\r\n 22    889  955\r\n 23    333  829\r\n 24    194  515\r\n 25    466  786\r\n 26    243  445\r\n 27    107 1693\r\n 28    261  317\r\n 29    704 1008\r\n 30    301  219\r\n'
+        """
         if data:
-            # logger.debug(config)
-            logger.info(f"(ADCP)原始接收数据:{data}")
             dict = {}
             try:
+                logger.info(f"[{self._name}]原始接收数据: len: {len(data)}, values: {data}")
+                if len(data) < 500:
+                    return
                 raw_data = data.decode().split("\r\n")
-                logger.debug(f"(ADCP)解码分割:{raw_data}")
+                logger.info(f"[{self._name}]解码分割: len: {len(raw_data)}, values: {raw_data}")
                 if len(raw_data) == 32:
-                    # logger.debug(raw_data)
-                    raw_data = raw_data[1:-1]
-                    flow_rate_data = []
-                    flow_direction = []
+                    # -----------------------解析数据
+                    header = raw_data[0].split(" ")  # 按空格分拆字符串得到字符串列表
+                    logger.info(f"[{self._name}]报文头header: len: {len(header)}, values: {header}")
+                    temperature_deep = [float(header[-3]), float(header[-4])]  # 温度地址:-3,深度地址:-4
+                    logger.info(f"[{self._name}]温度、深度: len: {len(temperature_deep)}, values: {temperature_deep}")
+                    raw_data = raw_data[1:-1]  # 去掉表头header和结尾空字符串,长度应该为30
+                    flow_rate_data = []  # 用于存储30个流速的数组
+                    flow_direction = []  # 用于存储30个流向的数组
                     for i in range(0, len(raw_data)):
                         t = raw_data[i].split(' ')  # 按空格分拆字符串得到字符串列表:['', '', '1', '', '', '', '211', '3075']
                         t = list(filter(None, t))  # 去除字符串列表中的空格:['1', '211', '3075']
-                        t1 = [int(x) for x in t]  # 字符串列表转int列表:[1, 211, 3075]
+                        t1 = [float(x) for x in t]  # 字符串列表转数值列表:[1, 211, 3075]
                         flow_rate_data.append(t1[1] / 1000)  # 流速值除以1000
                         flow_direction.append(t1[2] / 10)  # 流向值除以10
-                    format_data = flow_rate_data + flow_direction
-                    logger.debug(f"(ADCP)解析后数据: {len(format_data)}, {format_data}")
+                    format_data = flow_rate_data + flow_direction + temperature_deep
+                    logger.info(f"[{self._name}]解析后的列表: len: {len(format_data)}, values: {format_data}")
+                    # -----------------------格式化数据
                     j = 0
                     for index in config:
                         name = 'c' + str(index['serial_number'])
-                        if index['divisor'] is not None:
-                            format_data[j] = format_data[j] / index['divisor']
-                        if index['offset'] is not None:
-                            format_data[j] = format_data[j] - index['offset']
-                        dict[name] = format_data[j]
+                        dict[name] = format_value(index, format_data[j])
                         j += 1
+                    # -------------------------
+                    logger.info(f"[{self._name}]返回数据: len: {len(dict)}, values: {dict}")
                     return dict
-                elif len(raw_data) > 0:
-                    return "pass"
                 else:
-                    return "error"
+                    return
             except Exception as e:
-                logger.error(e)
-                return "error"
+                logger.info(f"[{self._name}]:{repr(e)}")
+                return

+ 0 - 60
converters/adcp_converter_480.py

@@ -1,60 +0,0 @@
-"""
-@Date  :2022/8/10/00219:10:57
-@Desc  : 增加温度深度解析
-"""
-from logging_config import logger
-from converter import Converter
-
-
-# data = b' 4 24 2021 12  9 59 0 32 11.7 1498.7 194.7 -2.2 0.6 1.008 12.58 0 0\r\n  1    331 2954\r\n  2    164 2448\r\n  3    171 1438\r\n  4    137 1034\r\n  5     38 2292\r\n  6    185 3027\r\n  7    162  928\r\n  8    611 3209\r\n  9    470 3287\r\n 10    463 3111\r\n 11    914 1178\r\n 12    149  390\r\n 13    703  939\r\n 14    411 2452\r\n 15    158 1111\r\n 16    206  710\r\n 17    673 1099\r\n 18    156  955\r\n 19    324  991\r\n 20    127  396\r\n 21    335 2590\r\n 22    105 3485\r\n 23    263 2615\r\n 24    179 1201\r\n 25    114 1496\r\n 26    669  844\r\n 27    105 2831\r\n 28    720 1256\r\n 29    449  839\r\n 30    392  818\r\n' 实收
-# data = b' 4 24 2021 11 29 59 0 32 11.7 1498.7 194.9 -2.2 0.7 1.001 12.57 0 0\r\n  1    311 3003\r\n  2     91 1489\r\n  3     97 1476\r\n  4    100 1269\r\n  5     47  997\r\n  6     50 1074\r\n  7    158   40\r\n  8    487 3545\r\n  9    411 3268\r\n 10    715 2939\r\n 11    425  204\r\n 12    206 2557\r\n 13    259  820\r\n 14    407 2153\r\n 15    237 1405\r\n 16    378  768\r\n 17    481 1191\r\n 18    856 3105\r\n 19    681  999\r\n 20    274  400\r\n 21    369 2852\r\n 22    332  208\r\n 23     48  366\r\n 24    497  539\r\n 25    770 1103\r\n 26    102 1750\r\n 27    514  659\r\n 28    180 1787\r\n 29    499  938\r\n 30    611 3350\r\n' 模拟收
-#        b' 4 25 2021 3 26 59 0 32 11.7 1500.7 195.0 -2.2 0.7 1.025 13.19 0 0\r\n  1    124 1973\r\n  2    108 1211\r\n  3    393  982\r\n  4    182  159\r\n  5     75 3440\r\n  6    176 2566\r\n  7    268 2553\r\n  8    250 2333\r\n  9    282 2215\r\n 10    564 1113\r\n 11    505  974\r\n 12    285  940\r\n 13    329 1381\r\n 14    118 1087\r\n 15    276 1313\r\n 16    707 2832\r\n 17    385 1000\r\n 18    365 2530\r\n 19    514 1374\r\n 20    591  683\r\n 21   1109  837\r\n 22    889  955\r\n 23    333  829\r\n 24    194  515\r\n 25    466  786\r\n 26    243  445\r\n 27    107 1693\r\n 28    261  317\r\n 29    704 1008\r\n 30    301  219\r\n'
-class AdcpConverter(Converter):
-    def convert(self, config, data):
-        if data:
-            # logger.debug(config)
-            # logger.debug(data)
-            dict = {}
-            try:
-                logger.debug(f"len(data)= {len(data)}")
-                logger.debug(f"(data)= {data}")
-                if len(data) < 500:
-                    return
-                raw_data = data.decode().split("\r\n")
-                logger.debug(f"len(raw_data): {len(raw_data)}")
-                if len(raw_data) == 32:
-                    logger.debug(f"raw_data= {raw_data}")
-                    title = raw_data[0].split(" ")
-                    addon = [float(title[-3]), float(title[-4])]
-                    logger.debug(f"len(title)= {len(title)}")
-                    logger.debug(f"title= {title}")
-                    logger.debug(f"addon={addon}")
-                    raw_data = raw_data[1:-1]
-                    flow_rate_data = []
-                    flow_direction = []
-                    for i in range(0, len(raw_data)):
-                        t = raw_data[i].split(' ')  # 按空格分拆字符串得到字符串列表:['', '', '1', '', '', '', '211', '3075']
-                        t = list(filter(None, t))  # 去除字符串列表中的空格:['1', '211', '3075']
-                        t1 = [int(x) for x in t]  # 字符串列表转int列表:[1, 211, 3075]
-                        flow_rate_data.append(t1[1] / 1000)  # 流速值除以1000
-                        flow_direction.append(t1[2] / 10)  # 流向值除以10
-                    format_data = flow_rate_data + flow_direction + addon
-                    # logger.debug(f"len(format_data): {len(format_data)}")
-                    # logger.debug(f"format_data: {format_data}")
-                    j = 0
-                    # logger.debug(f"len(config): {len(config)}")
-                    # logger.debug(f"config: {config}")
-                    for index in config:
-                        name = 'c' + str(index['serial_number'])
-                        if index['divisor'] is not None:
-                            format_data[j] = format_data[j] / index['divisor']
-                        if index['offset'] is not None:
-                            format_data[j] = format_data[j] - index['offset']
-                        dict[name] = format_data[j]
-                        j += 1
-                    return dict
-                else:
-                    return "error"
-            except Exception as e:
-                logger.error(e)
-                return

+ 14 - 14
converters/dandian_converter.py → converters/cec21_converter.py

@@ -1,33 +1,33 @@
 """
-@File  : dandian_converter.py
+@File  : cec21_converter.py
 @Author: lee
 @Date  : 2022/8/30/0030 14:42:02
-@Desc  :
+@Desc  : CEC21国产单点流速仪解析器
 """
-import json
-import re
-from logging_config import dandian_file_logger as logger
+from logging_config import cec21_converter as logger
 from converter import Converter
+from tools.format_value import format_value
 
 
-class DandianConverter(Converter):
+class CEC21Converter(Converter):
+    def __init__(self, name):
+        self._name = name
+
     def convert(self, config, data):
         # 原始data: {'data': b'\x11\x11pval,22.650,-37.896,3.613,11.104,14.158,17.992,70.457,243.563,-8.010,-16.111,11.813,0\r\n'}
         # 格式化数据:['\x11\x11pval', '22.658', '-36.617', '11.304', '10.291', '14.330', '17.643', '82.970', '253.655', '-4.965', '-16.930', '11.813', '0\r\n']
         #                               温度,     pitch,    roll,    X流速,   Y流速,   流速,     方位,   流向,  南 - 北向流速,东 - 西向流速,电压,状态
         try:
+            logger.info(f"[{self._name}]原始接收数据: len: {len(data)}, values: {data}")
             data = data.decode('utf-8').split(',')
-            logger.info(f"(新版单点流速仪)原始数据: {data}")
+            logger.info(f"[{self._name}]decode后数据: len: {len(data)}, values: {data}")
             dict = {}
             for index in config:
                 name = 'c' + str(index['serial_number'])
                 i = int(index['address'])
-                if index['divisor'] is None:
-                    dict[name] = float(data[i])
-                else:
-                    dict[name] = round((float(data[i]) / index['divisor']), 2)
-            logger.info(f"(新版单点流速仪)解析后数据:{data}")
+                dict[name] = format_value(index, data[i])
+            logger.info(f"[{self._name}]返回数据: len: {len(dict)}, values: {dict}")
             return dict
         except Exception as e:
-            logger.error(e)
-            return "error"
+            logger.info(f"[{self._name}]:{repr(e)}")
+            return

+ 121 - 94
converters/modbus_converter.py

@@ -1,24 +1,28 @@
 """
-@File  : moxa1_converter.py
+@File  : modbus_converter.py
 @Author: lee
 @Date  : 2022/7/12/0012 8:55:13
-@Desc  :
+@Desc  : 此类为mosbus协议通用解析器,用于:insitu水质传感器,新气象传感器,新水质传感器
 """
 import binascii
 import math
 import struct
-from logging_config import shuizhi_file_logger as logger
+from logging_config import modbus_converter as logger
 from converter import Converter
 from event_storage import EventStorage
+from tools.format_value import format_value
 
 
 class ModbusConverter(Converter):
-    list = []
-
-    def __init__(self):
+    def __init__(self, name):
         self._storage = EventStorage()
+        self._name = name
 
     def convert(self, config, data):
+        """
+        data: [1, {112: 16801, 113: 50856}]
+              [站号, {地址:值,地址:值 。。。}]
+        """
         if data:
             device_id = data[0]
             data = data[1]
@@ -26,83 +30,126 @@ class ModbusConverter(Converter):
             format_data_dict = {}  # 列表数据转换字典数字
             try:
                 for index in config:
-                    # addr_type : 'D' 或 'X';
-                    # addr_list: [10] 或 [10, 5]
-                    addr_type, addr_list = addr_parsing(index['address'])
-                    register_addr = addr_list[0]  # 数据地址  example:'X10.5' -> 10
-                    parameter_id = addr_list[1]
-                    if register_addr > 40000:
-                        register_addr = register_addr - 40001
-                    if int(index["device_id"]) == int(device_id) and register_addr in data and data[register_addr + 4] == parameter_id:
-                        logger.info(f"----------------------------")
-                        if addr_type == 'X':
-                            bit_offset_addr = addr_list[1]  # 位偏移地址  example:'X10.5' -> 5
-                            if index['data_type'] == "BOOL":
-                                data_bin = bin(65536 | data[register_addr])  # 和65536做或运算,为了留开头0
-                                data_bin = list(data_bin[3:][::-1])  # 去除开头字符 0  b  1并反转字符串
-                                data_bin = list(map(int, data_bin))  # 字符列表转整型列表
-                                if index['modbus_mode'] == "BA":
-                                    data_bin = data_bin[8:16] + data_bin[0:8]
-                                if index['negation'] == 1:
-                                    return_data = int(bool(1 - data_bin[bit_offset_addr]))  # 取反运算
-                                else:
-                                    return_data = data_bin[bit_offset_addr]
-
-                        elif addr_type == 'D':
-                            if index['data_type'] == "FLOAT16":
-                                return_data = data[register_addr]
-                            elif index['data_type'] == "INT16":
-                                if index['negation'] == 1:
-                                    return_data = int(bool(1 - data[register_addr]))  # 取反运算
-                                else:
-                                    if data[register_addr] > 32767:
-                                        return_data = data[register_addr] - 65536
+                    if int(index["device_id"]) == int(device_id):
+                        # addr_type : 'D' 或 'X';
+                        # addr_list: [10] 或 [10, 5]
+                        addr_type, addr_list = addr_parsing(index['address'])
+                        register_addr = addr_list[0]  # 数据地址  example:'X10.5' -> 10
+                        # parameter_id = addr_list[1]
+                        # print(parameter_id)
+                        if register_addr > 40000:
+                            register_addr = register_addr - 40001
+                        # if register_addr in data and data[register_addr + 4] == parameter_id:
+                        if register_addr in data:
+                            logger.info(f"----------------------------")
+                            if addr_type == 'X':
+                                bit_offset_addr = addr_list[1]  # 位偏移地址  example:'X10.5' -> 5
+                                if index['data_type'] == "BOOL":
+                                    data_bin = bin(65536 | data[register_addr])  # 和65536做或运算,为了留开头0
+                                    data_bin = list(data_bin[3:][::-1])  # 去除开头字符 0  b  1并反转字符串
+                                    data_bin = list(map(int, data_bin))  # 字符列表转整型列表
+                                    if index['modbus_mode'] == "BA":
+                                        data_bin = data_bin[8:16] + data_bin[0:8]
+                                    if index['negation'] == 1:
+                                        return_data = int(bool(1 - data_bin[bit_offset_addr]))  # 取反运算
+                                    else:
+                                        return_data = data_bin[bit_offset_addr]
+                            elif addr_type == 'D':
+                                if index['data_type'] == "FLOAT16":
+                                    return_data = data[register_addr]
+                                elif index['data_type'] == "INT16":
+                                    if index['negation'] == 1:
+                                        return_data = int(bool(1 - data[register_addr]))  # 取反运算
                                     else:
-                                        return_data = data[register_addr]
-                            elif index['data_type'] == "INT32":
-                                return_data_H = data[register_addr]
-                                return_data_L = data[register_addr + 1]
-                                if index['modbus_mode'] == "CDAB":
-                                    return_data = data[register_addr] * 65536 + data[register_addr + 1]
-                                else:
-                                    return_data = data[register_addr + 1] * 65536 + data[register_addr]
-                            elif index['data_type'] == "FLOAT32":
-                                t1 = hex(data[register_addr])[2:]
-                                t2 = hex(data[register_addr + 1])[2:]
-                                if len(t1) < 4:
-                                    t1 = (4 - len(t1)) * "0" + t1
-                                if len(t2) < 4:
-                                    t2 = (4 - len(t2)) * "0" + t2
-                                if index['modbus_mode'] == "CDAB":
-                                    return_data = \
-                                        struct.unpack('>f', binascii.unhexlify((t2 + t1).replace(' ', '')))[0]
-                                else:
-                                    return_data = \
-                                        struct.unpack('>f', binascii.unhexlify((t1 + t2).replace(' ', '')))[0]
-                            elif index['data_type'] == "BELZ_FLOAT32":
-                                llj_data = []
-                                for x in range(0, 6, 2):
-                                    t1 = hex(data[register_addr + x])[2:]
-                                    t2 = hex(data[register_addr + x + 1])[2:]
+                                        if data[register_addr] > 32767:
+                                            return_data = data[register_addr] - 65536
+                                        else:
+                                            return_data = data[register_addr]
+                                elif index['data_type'] == "INT32":
+                                    return_data_H = data[register_addr]
+                                    return_data_L = data[register_addr + 1]
+                                    if index['modbus_mode'] == "CDAB":
+                                        return_data = data[register_addr] * 65536 + data[register_addr + 1]
+                                    else:
+                                        return_data = data[register_addr + 1] * 65536 + data[register_addr]
+                                elif index['data_type'] == "FLOAT32":
+                                    t1 = hex(data[register_addr])[2:]
+                                    t2 = hex(data[register_addr + 1])[2:]
                                     if len(t1) < 4:
                                         t1 = (4 - len(t1)) * "0" + t1
                                     if len(t2) < 4:
                                         t2 = (4 - len(t2)) * "0" + t2
-                                    t = struct.unpack('>f', binascii.unhexlify((t1 + t2).replace(' ', '')))[0]
-                                    llj_data.append(t)
-                                return_data = llj_data[0] * 10 ** 6 + llj_data[1] + llj_data[2] / 10 ** 6  # 总累计量
-                            elif index['data_type'] == "UINT16":
-                                return_data = data[register_addr]
-                        logger.info(f"{index['io_point_name']}(格式化前): {return_data}")
-                        return_data = format_value(index, return_data)
-                        name = 'c' + str(index['serial_number'])
-                        format_data_dict[name] = return_data
-                        logger.info(f"{index['io_point_name']}: {return_data}")
+                                    if index['modbus_mode'] == "CDAB":
+                                        return_data = struct.unpack('>f', binascii.unhexlify((t2 + t1).replace(' ', '')))[0]
+                                    elif index['modbus_mode'] == "DCBA":
+                                        t1_l, t1_r = t1[0:2], t1[2:4]
+                                        t2_l, t2_r = t2[0:2], t2[2:4]
+                                        t1, t2 = t1_r + t1_l, t2_r + t2_l
+                                        return_data = struct.unpack('>f', binascii.unhexlify((t2 + t1).replace(' ', '')))[0]
+                                    else:
+                                        return_data = struct.unpack('>f', binascii.unhexlify((t1 + t2).replace(' ', '')))[0]
+                                    # 电导率转盐度
+                                    if index['io_point_name'] == "盐度" and self._name == 'shuzhi_new':
+                                        return_data = self.cal_salty(return_data)
+
+                                elif index['data_type'] == "BELZ_FLOAT32":
+                                    llj_data = []
+                                    for x in range(0, 6, 2):
+                                        t1 = hex(data[register_addr + x])[2:]
+                                        t2 = hex(data[register_addr + x + 1])[2:]
+                                        if len(t1) < 4:
+                                            t1 = (4 - len(t1)) * "0" + t1
+                                        if len(t2) < 4:
+                                            t2 = (4 - len(t2)) * "0" + t2
+                                        t = struct.unpack('>f', binascii.unhexlify((t1 + t2).replace(' ', '')))[0]
+                                        llj_data.append(t)
+                                    return_data = llj_data[0] * 10 ** 6 + llj_data[1] + llj_data[2] / 10 ** 6  # 总累计量
+                                elif index['data_type'] == "UINT16":
+                                    return_data = data[register_addr]
+                            name = 'c' + str(index['serial_number'])
+                            # 格式化数据
+                            format_data_dict[name] = format_value(index, return_data)
+                            logger.info(f"[{self._name}]: {index['io_point_name']}: {format_data_dict}")
                 return format_data_dict
             except Exception as e:
-                logger.error(e)
+                logger.error(f"{self._name}:{repr(e)}")
                 return "error"
 
+    def cal_salty(self, conductivity):
+        """盐度转换:需要温度和深度参数"""
+        sql = f"SELECT serial_number FROM data_point_tbl WHERE device_name = '{self._name}' AND io_point_name = '温度'"
+        sql1 = f"SELECT serial_number FROM data_point_tbl WHERE device_name = '{self._name}' AND io_point_name = '深度'"
+        res = self._storage.execute_sql(sql)
+        res1 = self._storage.execute_sql(sql1)
+        serial_number_temperature = 'c' + str(res[0]['serial_number'])
+        serial_number_deep = 'c' + str(res1[0]['serial_number'])
+        data = self._storage.get_real_data([serial_number_temperature, serial_number_deep])
+        a0, a1, a2, a3, a4, a5 = 0.0080, -0.1692, 25.3851, 14.0941, -7.0261, 2.7081
+        b0, b1, b2, b3, b4, b5 = 0.0005, -0.0056, -0.0066, -0.0375, 0.0636, -0.0144
+        c0, c1, c2, c3, c4 = 0.6766097, 2.00564e-2, 1.104259e-4, -6.9698e-7, 1.0031e-9
+        d1, d2, d3, d4 = 3.426e-2, 4.464e-4, 4.215e-1, -3.107e-3
+        e1, e2, e3 = 2.070e-5, -6.370e-10, 3.989e-15
+        k = 0.0162
+        R = conductivity / 42.914
+
+        try:
+            if data[serial_number_temperature] and data[serial_number_deep]:
+                t = float(data[serial_number_temperature]) * 1.00024
+                p = float(data[serial_number_deep])
+
+                rt = c0 + c1 * t + c2 * t * t + c3 * t * t * t + c4 * t * t * t * t
+                Rp = 1 + p * (e1 + e2 * p + e3 * p * p) / (1 + d1 * t + d2 * t * t + (d3 + d4 * t) * R)
+                Rt = R / (Rp * rt)
+                # print("\nRt=",Rt,'\n')
+                S = (t - 15) * (b0 + b1 * math.sqrt(Rt) + b2 * Rt + b3 * Rt * math.sqrt(Rt) + b4 * Rt * Rt + b5 * Rt * Rt * math.sqrt(Rt)) / (1 + k * (t - 15))
+                Salinity = a0 + a1 * math.sqrt(Rt) + a2 * Rt + a3 * Rt * math.sqrt(Rt) + a4 * Rt * Rt + a5 * Rt * Rt * math.sqrt(Rt) + S
+                return Salinity
+            else:
+                return None
+        except Exception as e:
+            logger.error(f"{self._name}:{repr(e)}")
+            return 0.00
+
 
 def addr_parsing(addr_smash):
     """
@@ -115,23 +162,3 @@ def addr_parsing(addr_smash):
     addr_smash = addr_smash[1:]  # 地址部分: '10' 或 '10.5'
     addr_list = list(map(int, addr_smash.split(".")))  # 用“.”分割字符串转换为整型存入列表
     return addr_type, addr_list
-
-
-def format_value(index, value):
-    if value:
-        value = float(value)
-        divisor = index['divisor']
-        offset = index['offset']
-        low_limit = index['low_limit']
-        up_limit = index['up_limit']
-        if divisor:
-            value /= divisor
-        if offset:
-            value -= offset
-        value = round(value, 2)
-        if low_limit <= value <= up_limit:
-            return value
-        else:
-            return ''
-    else:
-        return ''

+ 9 - 25
converters/nmea0183_converter.py

@@ -1,18 +1,22 @@
 """
 @Date  :2021/5/21/00219:10:57
-@Desc  :
+@Desc  : 波浪传感器解析器
 """
-from logging_config import sm140_file_logger as logger
+from tools.format_value import format_value
+from logging_config import sm140_converter as logger
 import binascii
 
 from converter import Converter
 
 
 class NEMA0183Converter(Converter):
+    def __init__(self, name):
+        self._name = name
 
     def convert(self, config, data):
+        logger.info(f"[{self._name}]原始接收数据: len: {len(data)}, values: {data}")
         data = data.decode().split("\r\n")
-        logger.info(f"(波高传感器)原始接收数据:len:{len(data)}, data: {data}")
+        logger.info(f"[{self._name}]解码分割: len: {len(data)}, values: {data}")
         for i in data:
             if self.checksum(i):
                 res = self.check_type(config, i)
@@ -67,15 +71,14 @@ class NEMA0183Converter(Converter):
         data = data.split('*')
         # Splits up the NMEA data by comma
         data = data[0].split(',')
-        logger.info(f"进一步格式化数据(波高传感器):len:{len(data)}, data: {data}")
+        logger.info(f"[{self._name}]进一步格式化数据: len: {len(data)}, values: {data}")
         if data[0] == '$PMIRWM':
             for index in config:
                 name = 'c' + str(index['serial_number'])
                 address = int(index['address'])
                 dict[name] = format_value(index, data[address])
-            logger.info(f"解析后数据(波高传感器):len:{len(dict)}, dict: {dict}")
+            logger.info(f"[{self._name}]解析后数据: len: {len(dict)}, values: {dict}")
             return dict
-        # return data[0]
 
     def nmea2utc(self, data):
         '''
@@ -86,25 +89,6 @@ class NEMA0183Converter(Converter):
         return date + 'T' + time + 'Z'
 
 
-def format_value(index, value):
-    if value:
-        value = float(value)
-        divisor = index['divisor']
-        offset = index['offset']
-        low_limit = index['low_limit']
-        up_limit = index['up_limit']
-        if divisor:
-            value /= divisor
-        if offset:
-            value -= offset
-        if low_limit <= value <= up_limit:
-            return value
-        else:
-            return ''
-    else:
-        return ''
-
-
 '''        
 data = "$PMIRR,20210325,033351.719,0.000,0.000,V,0.00*0F"
 

+ 0 - 6
converters/shuizhi_converter.py

@@ -1,6 +0,0 @@
-from converter import Converter
-
-
-class ShuizhiConverter(Converter):
-    def convert(self, config, data):
-        pass

+ 9 - 9
converters/td266_converter.py

@@ -1,28 +1,28 @@
-import json
-import re
-from logging_config import td266_file_logger as logger
+from tools.format_value import format_value
+from logging_config import td266_converter as logger
 
 from converter import Converter
 
 
 class TD266Converter(Converter):
+    def __init__(self, name):
+        self._name = name
+
     def convert(self, config, data):
         # 原始data: data = b'4420\t1194\t29.823\t104.507\t-7.471\t28.872\t253.153\t9.369\t1.816\t91.491\t-59.593\t100\t9.542\t9.589\t0.015\r\n'
         # 去除结尾\r\n: data = b'4420\t1194\t29.823\t104.507\t-7.471\t28.872\t253.153\t9.369\t1.816\t91.491\t-59.593\t100\t9.542\t9.589\t0.015'
         # decode('utf-8'): data = 4420	1194	29.823	104.507	-7.471	28.872	253.153	9.369	1.816	91.491	-59.593	100	9.542	9.589	0.015
         # split('\t'): data = ['4420', '1194', '29.823', '104.507', '-7.471', '28.872', '253.153', '9.369', '1.816', '91.491', '-59.593', '100', '9.542', '9.589', '0.015']
         try:
+            logger.info(f"[{self._name}]原始接收数据: len: {len(data)}, values: {data}")
             data = data.decode('utf-8').split('\t')
-            logger.info(f"(单点流速仪)原始数据: {data}")
+            logger.info(f"[{self._name}]解码分割: len: {len(data)}, values: {data}")
             dict = {}
             for index in config:
                 name = 'c' + str(index['serial_number'])
                 i = int(index['address'])
-                if index['divisor'] is None:
-                    dict[name] = float(data[i])
-                else:
-                    dict[name] = round((float(data[i]) / index['divisor']), 2)
-            logger.info(f"(单点流速仪)解析后数据:{data}")
+                dict[name] = format_value(index, data[i])
+            logger.info(f"[{self._name}]返回数据: len: {len(dict)}, values: {dict}")
             return dict
         except Exception as e:
             logger.error(e)

+ 12 - 27
converters/wxt536_converter.py

@@ -1,20 +1,24 @@
-from logging_config import wxt536_file_logger as logger
 import re
 from converter import Converter
+from tools.format_value import format_value
+from logging_config import wxt536_converter as logger
 
 
 class WXT536Converter(Converter):
-    """
-    data: b'0R0,Dm=267D,Sm=1.2M,Ta=-25.0C,Ua=87.1P,Pa=1001.9H,Rc=-0.00M,Th=28.3C,Vh=0.0#'
-    """
+
+    def __init__(self, name):
+        self._name = name
 
     def convert(self, config, data):
-        logger.info(f"(气象仪)原始接收数据: {data}")
+        """
+        data: b'0R0,Dm=267D,Sm=1.2M,Ta=-25.0C,Ua=87.1P,Pa=1001.9H,Rc=-0.00M,Th=28.3C,Vh=0.0#'
+        """
+        logger.info(f"原始接收数据[{self._name}]: {data}")
         if data:
             dict = {}
             try:
                 list = data.decode().split(",")
-                logger.info(f"(气象仪)解码分割后, 标准长度:9,实际长度:{len(list)}, 内容: {list}, ")
+                logger.info(f"({self._name})解码分割后, 标准长度:9,实际长度:{len(list)}, 内容: {list}, ")
                 if list[0] == '0R0':
                     for index in config:
                         name = 'c' + str(index['serial_number'])
@@ -23,31 +27,12 @@ class WXT536Converter(Converter):
                         if list[i][-1] != "#":
                             value = re.findall(r"-*\d+\.?\d*", list[i])[0]
                         dict[name] = format_value(index, value)
-                    logger.info(f"(气象仪)解析后数据:{dict}")
+                    logger.info(f"({self._name})解析后数据:{dict}")
                     return dict
                 elif len(list) > 0:
                     return "pass"
                 else:
                     return "error"
             except Exception as e:
-                logger.error(e)
+                logger.error(f"({self._name}):{repr(e)}")
                 return "error"
-
-
-def format_value(index, value):
-    if value:
-        value = float(value)
-        divisor = index['divisor']
-        offset = index['offset']
-        low_limit = index['low_limit']
-        up_limit = index['up_limit']
-        if divisor:
-            value /= divisor
-        if offset:
-            value -= offset
-        if low_limit <= value <= up_limit:
-            return value
-        else:
-            return ''
-    else:
-        return ''

+ 34 - 40
create_data_tbl.py

@@ -1,69 +1,63 @@
-import time
-import traceback
-import pymysql
+"""
+@Date  :2021/5/21/00219:10:57
+@Desc  :此类可以根据站表和点表的内容自动创建table开头的存储历史数据的数据表
+        运行只对status为1的站点所对应的数据表有修改。
 
+"""
+import os
+import sys
 
-class Mysql:
-    def __init__(self, host='', user='', passwd='', db='', port=3306, charset='utf8'):
-        self.host = host
-        self.user = user
-        self.passwd = passwd
-        self.db = db
-        self.port = port
-        self.charset = charset
-        self.conn = None
-        self.cursor = None
-        self._conn()
+from event_storage import EventStorage
+import threading
 
-    def _conn(self):
-        try:
-            self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, db=self.db, port=self.port)
-            self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
-            return True
-        except Exception as e:
-            print(e)
-            return False
 
-    def run(self):
-        self.create_data_table()
+class CreateDataTable(threading.Thread):
+    def __init__(self):
+        super().__init__()
+        self.storage = EventStorage()
 
     def get_stations_info(self):
+        """
+        获取激活的站点信息
+        """
         sql = "SELECT * FROM station_info_tbl WHERE status = 1;"
-        self.cursor.execute(sql)
-        results = self.cursor.fetchall()
-        return results
+        res = self.storage.execute_sql(sql)
+        return res
 
     def get_device_by_station_name(self, station_name):
         sql = "select DISTINCT (device_name) from data_point_tbl where station_name = '%s'" % station_name
-        self.cursor.execute(sql)
-        results = self.cursor.fetchall()
+        results = self.storage.execute_sql(sql)
         return results
 
     def get_point_by_device_name(self, device_name):
         sql = "SELECT * FROM data_point_tbl WHERE device_name = '%s'" % device_name
-        self.cursor.execute(sql)
-        results = self.cursor.fetchall()
+        results = self.storage.execute_sql(sql)
         return results
 
-    def create_data_table(self):
+    def run(self):
         stations_list = self.get_stations_info()
         for each_station in stations_list:
             devices_list = self.get_device_by_station_name(each_station['station_name'])
             for each_device in devices_list:
                 points_list = self.get_point_by_device_name(each_device['device_name'])
-                print(points_list)
-                print("------------")
                 table_name = 'table_' + each_device['device_name']
-                sql_c = "CREATE TABLE IF NOT EXISTS %s (id bigint primary key , times datetime NOT NULL,INDEX (times)) ENGINE=InnoDB DEFAULT CHARSET=utf8;" % table_name
-                self.cursor.execute(sql_c)
+                sql_c = "CREATE TABLE IF NOT EXISTS %s (id bigint primary key auto_increment, times datetime NOT NULL,INDEX (times)) ENGINE=InnoDB DEFAULT CHARSET=utf8;" % table_name
+                self.storage.execute_sql(sql_c)
                 for i in points_list:
                     dataType = i['storage_type']
                     columnName = "c" + str(i['serial_number'])
-                    sql_add = "ALTER TABLE %s ADD %s  %s comment '%s'" % (table_name, columnName, dataType, i['io_point_name'])
-                    print(f"sql_add: {sql_add}")
-                    self.cursor.execute(sql_add)
+                    sql = "SELECT * FROM information_schema.COLUMNS WHERE column_name='%s' and table_name='%s' and table_schema='shucai'" % (columnName, table_name)
+                    res = self.storage.execute_sql(sql)
+                    if not res:
+                        sql_add = "ALTER TABLE %s ADD COLUMN %s  %s comment '%s' " % (table_name, columnName, dataType, i['io_point_name'])
+                        self.storage.execute_sql(sql_add)
+                sql = "SELECT * FROM information_schema.COLUMNS WHERE column_name='is_send' and table_name='%s' and table_schema='shucai'" % (table_name)
+                res = self.storage.execute_sql(sql)
+                if not res:
+                    sql_add_is_send = "ALTER TABLE %s ADD COLUMN is_send tinyint" % (table_name)
+                    self.storage.execute_sql(sql_add_is_send)
                 print(table_name, "done !")
 
 
 if __name__ == '__main__':
-    Mysql(host='127.0.0.1', user='root', passwd='zzZZ4144670..', db='shucai').run()
+    CreateDataTable().run()

+ 5 - 0
event_storage.py

@@ -68,6 +68,11 @@ class EventStorage:
         data = self.hardDiskStorage.get_in_situ_command()
         return data
 
+    # 执行自定义sql
+    def execute_sql(self, sql):
+        data = self.hardDiskStorage.execute_sql(sql)
+        return data
+
 
 class Networkerror(RuntimeError):
     def __init__(self, arg):

+ 6 - 7
gateway.py

@@ -5,7 +5,7 @@ import time
 import os
 import wmi
 from sanic import Sanic
-from sanic_cors import CORS, cross_origin
+from sanic_cors import CORS
 from sanic import response
 
 # device import
@@ -14,9 +14,8 @@ from configuration import Configuration
 from utility import Utility
 from alarm import Alarm
 from historical_data_storage import HistoricalDataStorage
-from hard_disk_storage import HardDiskStorage
 from api_context import ApiContext
-from AES_crypt import decrypt, encrypt
+from AES_crypt import decrypt
 from logging_config import LOGGING_CONFIG
 import logging.config
 
@@ -31,8 +30,11 @@ for handler in handlers:
         if not os.path.exists(dirname):
             os.makedirs(dirname)
 # --------------------------
-
+gateway_storage = EventStorage()
+connector_config = gateway_storage.get_connector_config()
+Utility.start_connectors(connector_config)
 app = Sanic(__name__)
+# app.config.CORS_ORIGINS = "*"
 CORS(app)
 
 
@@ -197,9 +199,6 @@ async def notify_server_started_after_five_seconds():
 
 
 if __name__ == "__main__":
-    gateway_storage = EventStorage()
-    connector_config = gateway_storage.get_connector_config()
-    Utility.start_connectors(connector_config)
     Alarm().start()
     HistoricalDataStorage().start()
     # 气象仪降雨量每日清零:一号打开,二号关闭,三号关闭

+ 13 - 1
hard_disk_storage.py

@@ -1,5 +1,5 @@
 import datetime
-from logging_config import logger
+from logging_config import general as logger
 import openpyxl
 import pymysql
 import traceback
@@ -297,3 +297,15 @@ class HardDiskStorage:
         except:
             print(traceback.format_exc())
             return None
+
+    def execute_sql(self, sql):
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None

+ 1 - 1
historical_data_storage.py

@@ -2,7 +2,7 @@ import json
 import threading
 from event_storage import EventStorage
 import time
-from logging_config import logger
+from logging_config import general as logger
 
 
 class HistoricalDataStorage(threading.Thread):

+ 75 - 43
logging_config.py

@@ -12,47 +12,59 @@ LOGGING_CONFIG = dict(
     disable_existing_loggers=False,
     loggers={
         # 新曾自定义日志,用于数据采集程序
-        "console": {
+        "general": {
+            "level": "INFO",
+            "handlers": ["console", "general"],
+            "propagate": True,
+            "qualname": "general.debug",
+        },
+        "modbus_connector": {
             "level": "INFO",
-            "handlers": ["console", "connector_file"],
+            "handlers": ["console", "modbus_connector"],
             "propagate": True,
-            "qualname": "console.debug",
+            "qualname": "modbus_connector.debug",
         },
-        "sm140_file": {
+        "tcp_connector": {
+            "level": "INFO",
+            "handlers": ["console", "tcp_connector"],
+            "propagate": True,
+            "qualname": "tcp_connector.debug",
+        },
+        "sm140_converter": {
             "level": "DEBUG",
-            "handlers": ["console", "sm140_file"],
+            "handlers": ["console", "sm140_converter"],
             "propagate": True,
-            "qualname": "sm140.debug",
+            "qualname": "sm140_converter.debug",
         },
-        "wxt536_file": {
+        "wxt536_converter": {
             "level": "DEBUG",
-            "handlers": ["console", "wxt536_file"],
+            "handlers": ["console", "wxt536_converter"],
             "propagate": True,
-            "qualname": "wxt536.debug",
+            "qualname": "wxt536_converter.debug",
         },
-        "adcp_file": {
+        "adcp_converter": {
             "level": "DEBUG",
-            "handlers": ["console", "adcp_file"],
+            "handlers": ["console", "adcp_converter"],
             "propagate": True,
-            "qualname": "adcp.debug",
+            "qualname": "adcp_converter.debug",
         },
-        "dandian_file": {
+        "cec21_converter": {
             "level": "DEBUG",
-            "handlers": ["console", "dandian_file"],
+            "handlers": ["console", "cec21_converter"],
             "propagate": True,
-            "qualname": "dandian.debug",
+            "qualname": "cec21_converter.debug",
         },
-        "td266_file": {
+        "td266_converter": {
             "level": "DEBUG",
-            "handlers": ["console", "td266_file"],
+            "handlers": ["console", "td266_converter"],
             "propagate": True,
-            "qualname": "td266.debug",
+            "qualname": "td266_converter.debug",
         },
-        "shuizhi_file": {
+        "modbus_converter": {
             "level": "DEBUG",
-            "handlers": ["console", "shuizhi_file"],
+            "handlers": ["console", "modbus_converter"],
             "propagate": True,
-            "qualname": "shuizhi.debug",
+            "qualname": "modbus_converter.debug",
         },
     },
     handlers={
@@ -62,63 +74,81 @@ LOGGING_CONFIG = dict(
             "formatter": "generic",
             "stream": sys.stdout,
         },
-        "connector_file": {
+        "general": {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': 'log/general/general.log',
+            'maxBytes': 10 * 1024 * 1024,
+            'delay': True,
+            "formatter": "generic",
+            "backupCount": 20,
+            "encoding": "utf-8"
+        },
+        "modbus_connector": {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': 'log/modbus_connector/modbus_connector.log',
+            'maxBytes': 10 * 1024 * 1024,
+            'delay': True,
+            "formatter": "generic",
+            "backupCount": 20,
+            "encoding": "utf-8"
+        },
+        "tcp_connector": {
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'log/connector_log/connector_file.log',
+            'filename': 'log/tcp_connector/tcp_connector.log',
             'maxBytes': 10 * 1024 * 1024,
             'delay': True,
             "formatter": "generic",
             "backupCount": 20,
             "encoding": "utf-8"
         },
-        "sm140_file": {
+        "sm140_converter": {
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'log/sm140_log/sm140_file.log',
+            'filename': 'log/sm140_converter/sm140_converter.log',
             'maxBytes': 10 * 1024 * 1024,
             'delay': True,
             "formatter": "generic",
             "backupCount": 20,
             "encoding": "utf-8"
         },
-        "wxt536_file": {
+        "wxt536_converter": {
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'log/wxt536_log/wxt536_log.log',
+            'filename': 'log/wxt536_converter/wxt536_converter.log',
             'maxBytes': 10 * 1024 * 1024,
             'delay': True,
             "formatter": "generic",
             "backupCount": 20,
             "encoding": "utf-8"
         },
-        "adcp_file": {
+        "adcp_converter": {
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'log/adcp_log/adcp_log.log',
+            'filename': 'log/adcp_converter/adcp_converter.log',
             'maxBytes': 10 * 1024 * 1024,
             'delay': True,
             "formatter": "generic",
             "backupCount": 20,
             "encoding": "utf-8"
         },
-        "dandian_file": {
+        "cec21_converter": {
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'log/dandian_log/dandian_log.log',
+            'filename': 'log/cec21_converter/cec21_converter.log',
             'maxBytes': 10 * 1024 * 1024,
             'delay': True,
             "formatter": "generic",
             "backupCount": 20,
             "encoding": "utf-8"
         },
-        "td266_file": {
+        "td266_converter": {
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'log/td266_log/td266_log.log',
+            'filename': 'log/td266_converter/td266_converter.log',
             'maxBytes': 10 * 1024 * 1024,
             'delay': True,
             "formatter": "generic",
             "backupCount": 20,
             "encoding": "utf-8"
         },
-        "shuizhi_file": {
+        "modbus_converter": {
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'log/shuizhi_log/shuizhi_log.log',
+            'filename': 'log/modbus_converter/modbus_converter.log',
             'maxBytes': 10 * 1024 * 1024,
             'delay': True,
             "formatter": "generic",
@@ -129,16 +159,18 @@ LOGGING_CONFIG = dict(
     formatters={
         # 自定义文件格式化器
         "generic": {
-            "format": "%(asctime)s [%(filename)s:%(lineno)d] [%(levelname)s] %(message)s",
+            "format": "%(asctime)s {%(process)d(%(thread)d)} [%(filename)s:%(lineno)d] [%(levelname)s] %(message)s",
             "datefmt": "[%Y-%m-%d %H:%M:%S]",
             "class": "logging.Formatter",
         },
     },
 )
-logger = logging.getLogger("console")
-sm140_file_logger = logging.getLogger("sm140_file")
-wxt536_file_logger = logging.getLogger("wxt536_file")
-adcp_file_logger = logging.getLogger("adcp_file")
-dandian_file_logger = logging.getLogger("dandian_file")
-td266_file_logger = logging.getLogger("td266_file")
-shuizhi_file_logger = logging.getLogger("shuizhi_file")
+general = logging.getLogger("general")
+modbus_connector = logging.getLogger("modbus_connector")
+tcp_connector = logging.getLogger("tcp_connector")
+sm140_converter = logging.getLogger("sm140_converter")
+wxt536_converter = logging.getLogger("wxt536_converter")
+adcp_converter = logging.getLogger("adcp_converter")
+cec21_converter = logging.getLogger("cec21_converter")
+td266_converter = logging.getLogger("td266_converter")
+modbus_converter = logging.getLogger("modbus_converter")

+ 29 - 0
tools/format_value.py

@@ -0,0 +1,29 @@
+def format_value(index, value):
+    """格式化数据"""
+    if value is not None:
+        value = float(value)
+        divisor = index['divisor']
+        offset = index['offset']
+        low_limit = index['low_limit']
+        up_limit = index['up_limit']
+        if divisor:
+            value /= divisor
+        if offset:
+            value -= offset
+        # 保留两位小数
+        # value = round(value, 2)
+        if low_limit is not None and up_limit is not None:  # 上下限都存在
+            if low_limit <= value <= up_limit:
+                return value
+        elif low_limit is not None and up_limit is None:  # 下限存在
+            if low_limit <= value:
+                return value
+        elif up_limit is not None and low_limit is None:  # 上限存在
+            if value <= up_limit:
+                return value
+        else:  # 都不存在
+            return value
+        # 不在范围舍弃数据
+        return ''
+    else:
+        return ''

+ 1 - 7
utility.py

@@ -1,15 +1,9 @@
-import importlib
 import json
 import datetime
 import connectors
 import converters
 
 
-# name = "Test"
-# zz = connectors.modbus_connector.Test()
-# zz.f()
-
-
 class Utility:
     loaded_connector_module = {}
     loaded_converter_module = {}
@@ -42,7 +36,7 @@ class Utility:
         for config in configs:
             connector_config = config['connector_config']
             name = config['station_name']
-            converter = converters[config['converter']]()
+            converter = converters[config['converter']](name)
             connector = connectors[config['connector']](name, connector_config, converter)
             Utility.available_connectors[name] = connector
             connector.open()