Bläddra i källkod

第一次提交

DJW 1 år sedan
incheckning
2a29146871

+ 1 - 0
GatewayWorker_DTU

@@ -0,0 +1 @@
+Subproject commit 2aff3f053bc5580e8cd11e4c9264be73feef53d1

+ 90 - 0
historical_data_storage/event_storage.py

@@ -0,0 +1,90 @@
+import time
+import json
+import re
+from memory_storage import MemoryStorage
+from hard_disk_storage import HardDiskStorage
+
+
+class EventStorage:
+    def __init__(self):
+        self.config = {
+          "hardDiskdataBase": {
+            "ip": "127.0.0.1",
+            "username": "root",
+            # "password": "R.!a@O&t9CjweWLSTr",
+            "password": "zzZZ4144670..",
+            "dataBaseName": "centralized_control_guangfu_cloud"
+          },
+          "memoryDatabase": {
+            "ip": "127.0.0.1",
+            "port": 6379
+          }
+        }
+        self.memoryStorage = MemoryStorage(self.config['memoryDatabase'])
+        self.hardDiskStorage = HardDiskStorage(self.config['hardDiskdataBase'])
+
+    def get_real_data(self, keys):
+        data = self.memoryStorage.get_value(keys)
+        return data
+
+    def get_historical_data(self, select_info):
+        data = self.hardDiskStorage.get_table_data(select_info)
+        return data
+
+    def real_time_data_storage(self, data):
+        self.memoryStorage.set_value(data)
+
+    def historical_data_storage(self, table_name, seve_time, data):
+        self.hardDiskStorage.insert_column_many(table_name, seve_time, data)
+
+    def get_connector_config(self):
+        config = self.hardDiskStorage.get_connectors()
+        for station_info in config:
+            station_info['connector_config'] = json.loads(station_info['connector_config'])
+        return config
+
+    def get_station_info(self, station_name):
+        return self.hardDiskStorage.get_station_info(station_name)
+
+    def get_point_info(self, keys):
+        point_list = []
+        if keys:
+            for key in keys:
+                point_list.append(re.sub("\D", "", key))
+            point_tuple = tuple(point_list)
+        else:
+            point_tuple = None
+        return self.hardDiskStorage.get_point_info(point_tuple)
+
+    # 获取modbus命令
+    def get_command_info(self, station_name):
+        return self.hardDiskStorage.get_command_info(station_name)
+
+    # 历史查询接口(new)
+    def get_total_count_and_first_id(self, select_info):
+        data = self.hardDiskStorage.get_total_count_and_first_id(select_info)
+        return data
+
+    def get_item_by_id_offset(self, select_info):
+        data = self.hardDiskStorage.get_item_by_id_offset(select_info)
+        return data
+
+    # 数据导出接口
+    def quary_table_data(self, select_info):
+        data = self.hardDiskStorage.quary_table_data(select_info)
+        return data
+
+    # 获取insitu指令
+    def get_in_situ_command(self):
+        data = self.hardDiskStorage.get_in_situ_command()
+        return data
+
+    # 执行自定义sql
+    def execute_sql(self, sql):
+        data = self.hardDiskStorage.execute_sql(sql)
+        return data
+
+
+class Networkerror(RuntimeError):
+    def __init__(self, arg):
+        self.args = arg

+ 268 - 0
historical_data_storage/hard_disk_storage.py

@@ -0,0 +1,268 @@
+import datetime
+import math
+import re
+
+import pymysql
+import traceback
+import time
+
+
+class HardDiskStorage:
+    def __init__(self, config, port=3306, charset='utf8'):
+        self.host = config['ip']
+        self.user = config['username']
+        self.passwd = config['password']
+        self.db = config['dataBaseName']
+        self.port = port
+        self.charset = charset
+        self.conn = None
+        if not self._conn():
+            self._reConn()
+
+    def _conn(self):
+        try:
+            self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, database=self.db, port=self.port, autocommit=True)
+            return True
+        except Exception as e:
+            print(f'failed to connect to {self.host}:{self.port}:{self.db} by [{self.user}:{self.passwd}]:{e}')
+            return False
+
+    def _reConn(self, num=28800, stime=3):  # 重试连接总次数为1天,这里根据实际情况自己设置,如果服务器宕机1天都没发现就......
+        _number = 0
+        _status = True
+        while _status and _number <= num:
+            try:
+                self.conn.ping()  # cping 校验连接是否异常
+                _status = False
+            except Exception as e:
+                if self._conn():  # 重新连接,成功退出
+                    _status = False
+                    break
+                _number += 1
+                time.sleep(stime)  # 连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
+
+    def get_station_info(self, station_name):
+        sql = "SELECT * FROM data_point_tbl where station_name = '%s'" % station_name
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return False
+
+    def get_point_info(self, point_tuple):
+        if point_tuple:
+            sql = "SELECT * FROM data_point_tbl where serial_number in %s" % (str(point_tuple))
+        else:
+            sql = "SELECT * FROM data_point_tbl"
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return False
+
+    def get_table_data(self, senect_info):
+        table_name = "table_" + senect_info['deviceName']
+        time_begin = senect_info['timeBegin']
+        time_end = senect_info['timeEnd']
+        column = senect_info['pointList']
+        if len(column) > 0:
+            sql = "SELECT times"
+            for column_name in column:
+                sql = sql + "," + column_name
+            sql = sql + " FROM %s WHERE times > '%s' AND times < '%s'" % (table_name, time_begin, time_end)
+        else:
+            sql = "SELECT * FROM %s WHERE times > '%s' AND times < '%s';" % (table_name, time_begin, time_end)
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None
+
+    # 历史查询接口(new)--------------------------------------------\\
+    def get_total_count_and_first_id(self, search_info):
+        table_name = "table_" + search_info['deviceName']
+        time_begin = search_info['timeBegin']
+        time_end = search_info['timeEnd']
+        sql = "select count(*) from %s where times >= '%s' and times <= '%s';" % (table_name, time_begin, time_end)
+        sql_1 = "select id from %s where times >= '%s' limit 1;" % (table_name, time_begin)
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            count = self.cursor.fetchall()
+            self.cursor.execute(sql_1)
+            first_id = self.cursor.fetchall()
+            if isinstance(first_id, tuple):
+                first_id = list(first_id)
+            result = count + first_id
+            return result
+        except:
+            print(traceback.format_exc())
+            return None
+
+    def get_item_by_id_offset(self, search_info):
+        table_name = "table_" + search_info['deviceName']
+        point_list = search_info['pointList']
+        id_offset = search_info['idOffset']
+        quantity = search_info['quantity']
+        sql = "select times, %s from %s where id  >= %s limit %s" % (','.join(point_list), table_name, id_offset, quantity)
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None
+
+    # --------------------------------------------//
+
+    # ------------------------------------------------//
+
+    # 获取insitu指令接口
+    def get_in_situ_command(self):
+        sql = "select * from shuizhi_insitu_instruct;"
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None
+
+    def get_connectors(self):
+        sql = "SELECT * FROM station_info_tbl WHERE status = 1"
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None
+
+    def create_delete_stale_data_event(self, eventName, table_name, day):
+        self.cursor = self.conn.cursor()
+        sql = "create event %s on SCHEDULE every 1 day do delete from %s where times<(CURRENT_TIMESTAMP() + INTERVAL -%s DAY);" % (
+            eventName, table_name, day)
+        self.cursor.execute(sql)
+        self.cursor.close()
+
+    def create_many_column_table(self, table_name, list):
+        self.cursor = self.conn.cursor()
+        for index in range(len(list)):
+            dataType = list[index]['storageType']
+            columnName = "c" + str(list[index]['serialNumber'])
+            sql_c = "CREATE TABLE IF NOT EXISTS %s (times datetime NOT NULL,INDEX (times)) \
+                ENGINE=InnoDB DEFAULT CHARSET=utf8;" % (table_name)
+            sql_add = "ALTER TABLE %s ADD %s %s " % (table_name, columnName, dataType)
+            try:
+                self.cursor.execute(sql_c)
+                self.cursor.execute(sql_add)
+            except:
+                print(traceback.format_exc())
+        sql_send = "ALTER TABLE %s ADD 'is_send'tinyint NOT NULL DEFAULT '0'" % (table_name)
+        self.cursor.execute(sql_send)
+        self.cursor.close()
+
+    def insert_column_many(self, table_name, timeNow, dict):
+        try:
+            self.cursor = self.conn.cursor()
+            sql = "INSERT INTO %s (times" % (table_name)
+            for key_name in dict.keys():
+                sql = sql + "," + key_name
+            sql = sql + ") VALUE ('" + str(timeNow) + "'"
+            for key_name in dict.keys():
+                data = "," + str(dict[key_name])
+                sql = sql + data
+            sql = sql + ")"
+            try:
+                self.cursor.execute(sql)
+                # 提交到数据库执行
+                self.conn.commit()
+            except Exception as e:
+                # 如果发生错误则回滚
+                self.conn.rollback()
+                print(e)
+        except Exception as e:
+            self._reConn()
+            print(e)
+        else:
+            self.cursor.close()
+
+    def close(self):
+        self.conn.close()
+
+    def get_command_info(self, station_name):
+        sql = "SELECT command FROM command where station_name = '%s' " % station_name
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None
+
+    # lee
+    def get_device_name_by_station_name(self, station_name):
+        sql = "SELECT DISTINCT device_name FROM data_point_tbl WHERE station_name = '%s' " % station_name
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None
+
+    def get_data_point_by_device_name(self, device_name):
+        sql = "SELECT * FROM data_point_tbl WHERE device_name = '%s'" % device_name
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None
+
+    def execute_sql(self, sql):
+        try:
+            self._reConn()
+            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
+            self.cursor.execute(sql)
+            results = self.cursor.fetchall()
+            self.cursor.close()
+            return results
+        except:
+            print(traceback.format_exc())
+            return None

+ 74 - 0
historical_data_storage/historical_data_storage.py

@@ -0,0 +1,74 @@
+import json
+import threading
+from event_storage import EventStorage
+import time
+from logging_config import general as logger
+
+
+class HistoricalDataStorage(threading.Thread):
+    def __init__(self):
+        super(HistoricalDataStorage, self).__init__()
+        self._storage = EventStorage()
+
+    # 历史存储主函数
+    def run(self):
+        logger.info('Historical data storage module is running!')
+        station_info = self._storage.hardDiskStorage.get_connectors()  # 获取所有站点信息
+
+        all_devices = []
+        for item in station_info:
+            station_name = item['station_name']  # 站点名称
+            connector_config = json.loads(item['connector_config'])  # 加载json格式connector_config参数
+            save_frequency = connector_config['save_frequency']  # 获取存储频率
+            devices_each_station = self._storage.hardDiskStorage.get_device_name_by_station_name(station_name)  # 根据站点名称获取设备列表
+
+            for i in devices_each_station:
+                temp_dict = {}
+                # 获取每个设备所有点的serial_number,转换为键列表
+                device_name = i['device_name']
+                data_point_each_decive = self._storage.hardDiskStorage.get_data_point_by_device_name(device_name)  # 根据设备名称获取设备点表
+
+                serial_number_list = []
+                for item in data_point_each_decive:
+                    serial_number = 'c' + str(item['serial_number'])
+                    serial_number_list.append(serial_number)
+
+                temp_dict['device_name'] = device_name
+                temp_dict['save_frequency'] = save_frequency
+                temp_dict['serial_number_list'] = serial_number_list
+                temp_dict['last_save_time'] = 0
+
+                all_devices.append(temp_dict)
+        while 1:
+            time.sleep(0.2)
+            for item in all_devices:
+                save_frequency = item['save_frequency']
+                last_save_time = item['last_save_time']
+                now_time = time.time()
+                if now_time - last_save_time >= save_frequency:
+                    item['last_save_time'] = now_time
+                    save_time = int(now_time) - int(now_time) % save_frequency
+                    save_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(save_time))
+                    serial_number_list = item['serial_number_list']
+                    real_time_data = self._storage.memoryStorage.get_value(serial_number_list)  # 根据键列表查询实时数据库
+                    # print(real_time_data)
+                    flag = False  # 是否允许存储标志位
+                    for key in real_time_data:
+                        # 值全部为空,不允许存储
+                        if real_time_data[key]:
+                            flag = True
+                            break
+                    if flag:
+                        for key in real_time_data:  # redis值为None的
+                            if real_time_data[key] is None:  # redis数据库未存储此值
+                                real_time_data[key] = 'null'
+                            if real_time_data[key] == '':  # redis存储的为空值
+                                real_time_data[key] = 'null'
+                        table_name = "table_" + str(item['device_name'])  # 根据站名计算表名
+                        logger.debug(f"{table_name} <- {real_time_data}")
+                        self._storage.hardDiskStorage.insert_column_many(table_name, save_time, real_time_data)
+
+
+if __name__ == '__main__':
+    historicalDataStorage = HistoricalDataStorage()
+    historicalDataStorage.run()

+ 49 - 0
historical_data_storage/logging_config.py

@@ -0,0 +1,49 @@
+"""
+@File  : log_config.py
+@Author: lee
+@Date  : 2022/7/13/0013 11:08:55
+@Desc  :
+"""
+import logging
+import os
+import sys
+
+LOGGING_CONFIG = dict(
+    version=1,
+    disable_existing_loggers=False,
+    loggers={
+        # 新曾自定义日志,用于数据采集程序
+        "general": {
+            "level": "INFO",
+            "handlers": ["console", "general"],
+            "propagate": True,
+            "qualname": "general.debug",
+        }
+    },
+    handlers={
+        # 数据采集程序控制台输出handler
+        "console": {
+            "class": "logging.StreamHandler",
+            "formatter": "generic",
+            "stream": sys.stdout,
+        },
+        "general": {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': 'log/general/general.log',
+            'maxBytes': 10 * 1024 * 1024,
+            'delay': True,
+            "formatter": "generic",
+            "backupCount": 20,
+            "encoding": "utf-8"
+        }
+    },
+    formatters={
+        # 自定义文件格式化器
+        "generic": {
+            "format": "%(asctime)s {%(process)d(%(thread)d)} [%(filename)s:%(lineno)d] [%(levelname)s] %(message)s",
+            "datefmt": "[%Y-%m-%d %H:%M:%S]",
+            "class": "logging.Formatter",
+        },
+    },
+)
+general = logging.getLogger("general")

+ 20 - 0
historical_data_storage/main.py

@@ -0,0 +1,20 @@
+from historical_data_storage import HistoricalDataStorage
+from logging_config import LOGGING_CONFIG
+import logging.config
+import os
+
+logging.config.dictConfig(LOGGING_CONFIG)
+handlers = LOGGING_CONFIG['handlers']
+for handler in handlers:
+    item = handlers[handler]
+    if 'filename' in item:
+        filename = item['filename']
+        dirname = os.path.dirname(filename)
+        if not os.path.exists(dirname):
+            os.makedirs(dirname)
+# --------------------------
+
+
+if __name__ == '__main__':
+    historicalDataStorage = HistoricalDataStorage()
+    historicalDataStorage.run()

+ 46 - 0
historical_data_storage/memory_storage.py

@@ -0,0 +1,46 @@
+import redis
+
+
+class MemoryStorage:
+    def __init__(self, config):
+        self.ip = config['ip']
+        self.port = config['port']
+        self.conn = None
+        self.connected()
+
+    def connected(self):
+        self.conn = redis.StrictRedis(host=self.ip, port=self.port, db=0, decode_responses=True)
+
+    def set_value(self, data_dict):
+        try:
+            pipe = self.conn.pipeline(transaction=True)
+            for key_name in data_dict.keys():
+                pipe.set(key_name, data_dict[key_name], ex=30*60)  # redis过期时间
+            pipe.execute()
+        except Exception as e:
+            print(e)
+            return e
+        else:
+            return True
+
+    def get_value(self, keys):
+        data_dict = {}
+        try:
+            pipe = self.conn.pipeline(transaction=True)
+            for index in range(len(keys)):
+                pipe.get(keys[index])
+            result = pipe.execute()
+            for index in range(len(keys)):
+                data_dict[keys[index]] = result[index]
+            return data_dict
+        except Exception as e:
+            return e
+
+    def is_connected(self):
+        if self.conn:
+            return True
+        else:
+            return False
+
+    def re_connected(self):
+        self.conn = redis.StrictRedis(host=self.ip, port=self.port, db=0, decode_responses=True)