7 Commits e6fb522868 ... fd99d938c3

Author SHA1 Message Date
  liuxingchen fd99d938c3 数据上传机制优化 1 year ago
  liuxingchen 048f32c33c 脚本统一 1 year ago
  liuxingchen e0019d2d36 生物识别数据查询优化 1 year ago
  liuxingchen b6a9150ba3 脚本统一 1 year ago
  liuxingchen 604068e776 脚本统一 1 year ago
  liuxingchen 079bfbfbdf 脚本统一 1 year ago
  liuxingchen 4365d87508 脚本统一 1 year ago

+ 30 - 28
001_h511/imageCapture/CPpost.py

@@ -1,4 +1,6 @@
-# This file create two thread, t1 capturing rstp video and save frame every interval, t2 read frame from database and post to https server.
+# This file create two thread, t1 capturing rstp video and save frame every interval, t2 read frame from database and
+# post to https server.
+# 摄像机图片上传
 import configparser
 import time
 import pymysql
@@ -10,36 +12,37 @@ import os
 
 
 def post(cameraList):
-    try:
         headers = {
             "Authorization": token,
             "app-id": appId}
         timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
         for x in range(0, len(cameraList)):
-            picture_name = '%s.jpg' % cameraList[x][0]
-            last_change_time = os.path.getmtime(picture_name)   # 最后一次修改时间
-            time_diff = (time.time()-last_change_time)/60       # 计算时间差(分钟)
-            if time_diff <= 60:
-                f = open(picture_name, 'rb')
-                img = f.read()
-                img = pymysql.Binary(img)
-                m = MultipartEncoder({'file': (picture_name, img, 'image/jpeg'),
-                                      "timestamp": str(timestamp),
-                                      "token": token,
-                                      "isImage": "1",
-                                      "cameraId": cameraList[x][4]})
-                headers["Content-Type"] = m.content_type
-                requests.urllib3.disable_warnings()
-                ret = requests.post('https://management.super-sight.com.cn:8000/system/api/v1/camera/add', headers=headers,
-                                    data=m, verify=False)
-                if ret.status_code == 200:
-                    print(timestamp, "post SUCCESS:", ret.text, ret.status_code)
+            try:
+                picture_name = '%s.jpg' % cameraList[x][0]
+                last_change_time = os.path.getmtime(picture_name)  # 最后一次修改时间
+                time_diff = (time.time() - last_change_time) / 60  # 计算时间差(分钟)
+                if time_diff <= 60:
+                    f = open(picture_name, 'rb')
+                    img = f.read()
+                    img = pymysql.Binary(img)
+                    m = MultipartEncoder({'file': (picture_name, img, 'image/jpeg'),
+                                          "timestamp": str(timestamp),
+                                          "token": token,
+                                          "isImage": "1",
+                                          "cameraId": cameraList[x][4]})
+                    headers["Content-Type"] = m.content_type
+                    requests.urllib3.disable_warnings()
+                    ret = requests.post('https://management.super-sight.com.cn:8000/system/api/v1/camera/add',
+                                        headers=headers,
+                                        data=m, verify=False)
+                    if ret.status_code == 200:
+                        print(timestamp, "post SUCCESS:", ret.text, ret.status_code, picture_name)
+                    else:
+                        print(timestamp, "post ERROR:", ret.text, ret.status_code, picture_name)
                 else:
-                    print(timestamp, "post ERROR:", ret.text, ret.status_code)
-            else:
-                print(f"{timestamp}-[{picture_name}]: {int(time_diff/60)}小时未更新")
-    except Exception as e:
-        print(timestamp, "post ERROE", e)
+                    print(f"{timestamp}-[{picture_name}]: {int(time_diff / 60)}小时未更新")
+            except Exception as e:
+                print(timestamp, "post ERROE", e)
 
 
 def post_extream(cameraList, startDate, endDate):
@@ -65,7 +68,6 @@ def post_extream(cameraList, startDate, endDate):
             print(ret.text, ret.status_code)
 
 
-
 def posterize_strategy():
     lastTime = 0
     lastTime1 = 0
@@ -121,7 +123,7 @@ def posterize_strategy():
                         print('---> post')
                         post(camera_list)
                         print('post down----')
-                        lastTime = newTime                       
+                        lastTime = newTime
                 elif posterizeUnit == 'S':
                     _posterizeTime = posterizeTime
                     if lastTime + _posterizeTime <= newTime:
@@ -172,7 +174,7 @@ def is_read():
 
 
 if __name__ == '__main__':
-    #time.sleep(10)
+    # time.sleep(10)
     _logger = LogOut.Log('CPpost')
 
     # 创建读取配置文件对象

+ 1 - 2
001_h511/imageCapture/dataBase_api.py

@@ -41,7 +41,7 @@ class Mysql:
                     host='127.0.0.1',
                     port = 3306,
                     user='root',
-                    passwd='zzZZ4144670..',
+                    passwd='',
                     db ='centralized_control_db',
                     )
             '''          
@@ -367,4 +367,3 @@ def redisSave(dataList,conn):
             return e               
         else:
             return True
-        

+ 64 - 58
001_h511/imageCapture/dataPackUp.py

@@ -82,10 +82,9 @@ def getMqttDataFromMysqlHistory(data_list, code_list):
             code, point = getCodeAndPoint(code_list[i]['mqttCode'])
             dataDist[point] = str(data_list[index]["c" + str(code_list[i]['serialNumber'])])
             if code_list[i]['lowLimit'] != None and code_list[i]['upLimit'] != None:
-                if dataDist[point] != 'None':
-                    if dataDist[point] is None or float(dataDist[point]) < code_list[i]['lowLimit'] or float(dataDist[point]) > code_list[i]['upLimit']:
-                        # dataDist['DQ_DI'] = '1'     # 功能优化
-                        dataDist.pop(point)     # 功能优化,过滤坏数据
+                if dataDist[point] != 'None' or float(dataDist[point]) < code_list[i]['lowLimit'] or float(dataDist[point]) > code_list[i]['upLimit']:
+                    # dataDist['DQ_DI'] = '1'     # 功能优化
+                    dataDist.pop(point)     # 功能优化,过滤坏数据
 
             if code_list[i]['storageType'] == "datetime":
 
@@ -159,23 +158,35 @@ def post(zip_file):
     headers = {
         "Authorization": token,
         "Content-Type": "application/json"}
-    data = {"platformId": PLATFORM_ID,
-            "fileNames": zip_file
-            }
-    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
-    # print(zip_file)
-    # print('timestamp: ',datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
-    try:
-        ret = requests.post('https://management.super-sight.com.cn/device/api/v2/device/history', headers=headers,json=data, verify=False)
-    except Exception as e:
-        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())+"[ERROR] {0}\n".format(e))
-
-    if ret.status_code == 200:
-        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ret.text, ret.status_code, 'len(zip_file)=', len(zip_file))
-        return "SUCCESS"
-    else:
-        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ret.text, ret.status_code)
-        return "FAIL"
+
+    result = "SUCCESS"
+    step_size = 1000
+    for i in range(0, len(zip_file), step_size):
+        if len(zip_file) > i + step_size:
+            group_zip_file = zip_file[i:i+step_size]
+        else:
+            group_zip_file = zip_file[i:]
+
+        data = {"platformId": PLATFORM_ID,
+                "fileNames": group_zip_file
+                }
+
+        # print(data)
+        # print(len(group_zip_file))
+        # print('timestamp: ',datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
+        try:
+            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+            ret = requests.post('https://management.super-sight.com.cn/device/api/v2/device/history', headers=headers, json=data, verify=False)
+
+            if ret.status_code == 200:
+                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ret.text, ret.status_code, 'len(group_zip_file)=', len(group_zip_file))
+            else:
+                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ret.text, ret.status_code, 'len(group_zip_file)=', len(group_zip_file))
+                result = "FAIL"
+        except Exception as e:
+            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "e=", e)
+
+    return result
 
 
 if __name__ == "__main__":
@@ -213,7 +224,7 @@ if __name__ == "__main__":
     if not os.path.exists(txt_path):
         os.mkdir(txt_path)
     while True:
-        # time.sleep(1)
+        time.sleep(1)
         if post_time < time.time() - 3600:
             zip_list = []
             flie_name = os.listdir("HDU/")
@@ -231,45 +242,40 @@ if __name__ == "__main__":
             while i < max(int(200 / count), 1):
                 for device_name in list_devices:
                     code_devices = mysql_object.get_mqtt_devices_from_name(device_name)
-                    # 鏋勯?犺〃鍚嶇О锛屽table_fg
+                    # 构造表名称,如table_fg
                     table_name = 'table_' + device_name
-                    # 鑾峰彇鏂偣鏃堕棿
+                    # 获取断点时间
                     break_point = mysql_object.get_breakpoint_last_time_datetime(table_name)
                     if break_point != None:
-                        # 构造表名称,如table_fg
-                        table_name = 'table_' + device_name
-                        # 获取断点时间
-                        break_point = mysql_object.get_breakpoint_last_time_datetime(table_name)
-                        if break_point != None:
-                            # 找到断点时间前后一个小时的时间点
-                            timeArray = time.strptime(str(break_point['times']), "%Y-%m-%d %H:%M:%S")
-                            begin = time.strftime("%Y-%m-%d %H:00:00", timeArray)
-                            end = time.strftime("%Y-%m-%d %H:59:59", timeArray)
-                            begin_timeArray = time.strptime(str(begin), "%Y-%m-%d %H:%M:%S")
-                            end_timeArray = time.strptime(str(end), "%Y-%m-%d %H:%M:%S")
-                            begin_timestamp = int(time.mktime(begin_timeArray))
-                            end_timestamp = int(time.mktime(end_timeArray))
-                            # 获取断点前后一个小时的数据
-                            data_list = mysql_object.get_hour_data_datetime(table_name, begin, end)
-                            history_data_list = data_alignment(data_list, begin_timestamp, end_timestamp, 2)
-                            if len(code_devices) >= 1:
-                                for index in range(len(code_devices)):
-                                    # 获取设备码表
-                                    code_list = mysql_object.get_mqtt_point(code_devices[index])
-                                    param, id = getMqttDataFromMysqlHistory(history_data_list, code_list)
-                                    txt_name = get_txt_name(begin_timestamp, param[0]["code"])
-                                    # 将txt_path目录下所有设备txt组包
-                                    if len(os.listdir(txt_path)) >= 20:
-                                        zip_name = make_zip(txt_path)
-                                        time.sleep(5)
-                                    with open(os.path.join(txt_path, txt_name), 'w') as f:
-                                        txt_content = [str(line) + '\n' for line in param]
-                                        f.writelines(txt_content)
-                                    time.sleep(1)
-                                    # print(zip_name)
-                                    if index == len(code_devices) - 1:
-                                        # pass
-                                        mysql_object.set_many_send_status(table_name, data_list)
+                        # 找到断点时间前后一个小时的时间点
+                        timeArray = time.strptime(str(break_point['times']), "%Y-%m-%d %H:%M:%S")
+                        begin = time.strftime("%Y-%m-%d %H:00:00", timeArray)
+                        end = time.strftime("%Y-%m-%d %H:59:59", timeArray)
+                        begin_timeArray = time.strptime(str(begin), "%Y-%m-%d %H:%M:%S")
+                        end_timeArray = time.strptime(str(end), "%Y-%m-%d %H:%M:%S")
+                        begin_timestamp = int(time.mktime(begin_timeArray))
+                        end_timestamp = int(time.mktime(end_timeArray))
+                        # 获取断点前后一个小时的数据
+                        data_list = mysql_object.get_hour_data_datetime(table_name, begin, end)
+                        history_data_list = data_alignment(data_list, begin_timestamp, end_timestamp, 2)
+                        if len(code_devices) >= 1:
+                            for index in range(len(code_devices)):
+                                # 获取设备码表
+                                code_list = mysql_object.get_mqtt_point(code_devices[index])
+                                param, id = getMqttDataFromMysqlHistory(history_data_list, code_list)
+                                txt_name = get_txt_name(begin_timestamp, param[0]["code"])
+                                # 将txt_path目录下所有设备txt组包
+                                if len(os.listdir(txt_path)) >= 20:
+                                    zip_name = make_zip(txt_path)
+                                    time.sleep(5)
+                                with open(os.path.join(txt_path, txt_name), 'w') as f:
+                                    txt_content = [str(line) + '\n' for line in param]
+                                    f.writelines(txt_content)
+                                time.sleep(1)
+                                # print(zip_name)
+                                if index == len(code_devices) - 1:
+                                    # pass
+                                    mysql_object.set_many_send_status(table_name, data_list)
                 i += 1
             pack_time = time.time()
             if len(os.listdir(txt_path)) > 0:

+ 8 - 4
001_h511/imageCapture/iotServerMQTT.py

@@ -50,7 +50,7 @@ def getMqttDataFromMysql(list_devices, appId, token):
             date_time = str(data_from_mysql['times'])
             date_time = dateAndTimeToTimestamp(date_time)
             STS_DI = getDeviceConnectionStatus(date_time)
-            # print("STS_DI = ", STS_DI)
+            print("STS_DI = ", STS_DI)
         else:
             STS_DI = 1
         dataDist = {}
@@ -67,7 +67,7 @@ def getMqttDataFromMysql(list_devices, appId, token):
                 columnName = "c" + str(list_points[i]['serialNumber'])
                 dataDist[point] = str(data_from_mysql[columnName])
                 if list_points[i]['lowLimit'] != None and list_points[i]['upLimit'] != None:
-                    if dataDist[point] is None or float(dataDist[point]) < list_points[i]['lowLimit'] or float(dataDist[point]) > list_points[i]['upLimit']:
+                    if dataDist[point] == "None" or float(dataDist[point]) < list_points[i]['lowLimit'] or float(dataDist[point]) > list_points[i]['upLimit']:
                         # dataDist['DQ_DI'] = '1'     # 功能优化
                         dataDist.pop(point)     # 功能优化,过滤坏数据
                 if list_points[i]['storageType'] == "datetime":
@@ -214,7 +214,10 @@ if __name__ == "__main__":
     client.on_message = on_message
     client.on_subscribe = on_subscribe
     client.on_disconnect = on_disconnect
-    client.connect(HOST, PORT, 10)
+    try:
+        client.connect(HOST, PORT, 10)
+    except Exception as e:
+        print('client.connect error:', e)
     client.loop_start()
     while True:
         time.sleep(0.1)
@@ -222,9 +225,10 @@ if __name__ == "__main__":
             try:
                 post_time = time.time()
                 param, dataInfo = getMqttDataFromMysql(list_devices, appId, token)
-                # print(param)
+                # print("param = ", param)
                 aes128string = aes_ecb_encrypt(get_sha1prng_key(key), param)
                 is_send, mid = client.publish(topic, payload=aes128string, qos=1)
+                print("is_send = ", is_send)
                 if is_send == 0:
                     setSendStatusIsSucceed(dataInfo)
                 print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'is_send=', is_send)

+ 11 - 10
001_h511/imageCapture/mysqlDataBase.py

@@ -465,7 +465,7 @@ class MysqldbOperational(object):
             return False
 
     def get_breakpoint_last_time_datetime(self, tableName):
-        sql = "SELECT times FROM %s WHERE is_send = 0 AND times < date_sub(now(), interval 1 hour) ORDER BY id DESC LIMIT 1;" % (
+        sql = "SELECT times FROM `%s` WHERE is_send = 0 AND times < date_sub(now(), interval 1 hour) ORDER BY id DESC LIMIT 1;" % (
             tableName)
         try:
             self._reConn()
@@ -485,7 +485,7 @@ class MysqldbOperational(object):
             return None
 
     def get_breakpoint_last_time_date_and_time(self, tableName):
-        sql = "SELECT Date,Time FROM %s WHERE is_send = 0 ORDER BY id LIMIT 1;" % (tableName)
+        sql = "SELECT Date,Time FROM `%s` WHERE is_send = 0 ORDER BY id LIMIT 1;" % (tableName)
         dict = {}
         try:
             self._reConn()
@@ -505,7 +505,7 @@ class MysqldbOperational(object):
             return None
 
     def get_hour_data_datetime(self, tableName, begin, end):
-        sql = "SELECT * FROM %s WHERE times >= '%s' And times <= '%s' ORDER BY id ASC;" % (tableName, begin, end)
+        sql = "SELECT * FROM `%s` WHERE times >= '%s' And times <= '%s' ORDER BY id ASC;" % (tableName, begin, end)
         try:
             self._reConn()
             self.cursor = self.con.cursor(cursor=pymysql.cursors.DictCursor)
@@ -523,7 +523,7 @@ class MysqldbOperational(object):
             return None
 
     def get_hour_data_date_and_time(self, tableName, begin, end):
-        sql = "SELECT * FROM %s WHERE CONCAT(Date,' ', Time) >= '%s' And CONCAT(Date,' ', Time) <= '%s' ORDER BY id ASC;" % (
+        sql = "SELECT * FROM `%s` WHERE CONCAT(Date,' ', Time) >= '%s' And CONCAT(Date,' ', Time) <= '%s' ORDER BY id ASC;" % (
             tableName, begin, end)
         try:
             self._reConn()
@@ -545,7 +545,7 @@ class MysqldbOperational(object):
         if len(id_list) >= 1:
             id = id_list[0]['id']
             value = 1
-            sql = "UPDATE %s SET is_send = %s WHERE id = %s" % (tableName, value, id)
+            sql = "UPDATE `%s` SET is_send = %s WHERE id = %s" % (tableName, value, id)
             for index in range(len(id_list)):
                 sql = sql + " OR id = %s" % (id_list[index]['id'])
 
@@ -581,7 +581,7 @@ class MysqldbOperational(object):
             return False
 
     def get_newest_data(self, tableName):
-        sql = "SELECT * FROM %s ORDER BY id DESC LIMIT 1;" % (tableName)
+        sql = "SELECT * FROM `%s` ORDER BY id DESC LIMIT 1;" % (tableName)
         results = []
         try:
             self._reConn()
@@ -604,7 +604,7 @@ class MysqldbOperational(object):
             return None
 
     def set_send_status(self, tableName, id, value):
-        sql = "UPDATE %s SET is_send = %s WHERE id = %s;" % (tableName, value, id)
+        sql = "UPDATE `%s` SET is_send = %s WHERE id = %s;" % (tableName, value, id)
         try:
             self._reConn()
             self.cursor = self.con.cursor()
@@ -626,7 +626,7 @@ class MysqldbOperational(object):
         """
 
         yes_time = datetime.now().strftime("%Y-%m-%d")
-        sql1 = "UPDATE binocular_dailyreport SET is_send = 1 WHERE size_average IS NULL;"
+        sql1 = "UPDATE binocular_dailyreport SET is_send = 1 WHERE size_average IS NULL AND datetime<\'%s\';" % (yes_time)
         sql2 = "SELECT datetime,fish_species,size_average,size_median,weight_average,weight_median FROM binocular_dailyreport WHERE datetime<\'%s\' AND is_send=0 AND size_average IS NOT NULL;" % (yes_time)
 
         try:
@@ -635,7 +635,7 @@ class MysqldbOperational(object):
             self.cursor.execute(sql1)
             self.con.commit()
             self.cursor.execute(sql2)
-            results = self.cursor.fetchall()         
+            results = self.cursor.fetchall()
             self.cursor.close()
             return results
 
@@ -673,7 +673,8 @@ class MysqldbOperational(object):
               "CAST(AVG(assess_top_number) AS signed) AS assess_top_number," \
               "CAST(AVG(assess_middle_number) AS signed) AS assess_middle_number," \
               "CAST(AVG(assess_bottom_number) AS signed) AS assess_bottom_number " \
-              "FROM fish_distribution_data WHERE datetime<\'%s\' AND is_send=0 GROUP BY date_format(datetime,'%%Y-%%m-%%d');" % (td_time)
+              "FROM fish_distribution_data WHERE datetime<\'%s\' AND is_send=0 " \
+              "GROUP BY date_format(datetime,'%%Y-%%m-%%d');" % (td_time)
         try:
             self._reConn()
             self.cursor = self.con.cursor(cursor=pymysql.cursors.DictCursor)

+ 3 - 0
001_h511/imageCapture/reset_issend_to_zero.py

@@ -13,6 +13,9 @@ from mysqlDataBase import MysqldbOperational
 
 if __name__ == "__main__":
     _logger = LogOut.Log('reset_issend_to_zero')
+    # 创建读取配置文件对象
+    config = configparser.ConfigParser()
+    config.read("config.ini", encoding="utf-8")
     # 获取通用配置项
     # section = "General"  # 读取的部section标签
     mysql_host = "localhost"

+ 15 - 11
001_h511/imageCapture/upload_biology_monitor_data.py

@@ -12,7 +12,6 @@ import LogOut
 import configparser
 from mysqlDataBase import MysqldbOperational
 import requests
-import json
 
 
 def post_data(all_data):
@@ -46,7 +45,8 @@ def post_data(all_data):
         ret = requests.post(url=url, headers=headers, json=data, verify=False)
         if ret.status_code == 200:
             print(now, ret.text, ret.status_code)
-            success_date.append(each)
+            if ret.text == "{\"code\":200,\"msg\":\"海上平台监测数据同步成功\"}":
+                success_date.append(each)
         else:
             print(now, ret.text, ret.status_code)
     if len(success_date) > 0:
@@ -72,7 +72,8 @@ def deal_data():
     if len(sonar_data_avg) > 0:
         for each in sonar_data_avg:
             date = each["date"]
-            each_data = {"upperBiomass": each["assess_top_number"], "middleBiomass": each["assess_middle_number"],
+            each_data = {"upperBiomass": each["assess_top_number"],
+                         "middleBiomass": each["assess_middle_number"],
                          "lowerBiomass": each["assess_bottom_number"]}
             if date in data_all.keys():
                 tmp = data_all[date]
@@ -84,10 +85,13 @@ def deal_data():
     print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())), 'binocular_datas = ', binocular_datas)
     if len(binocular_datas) > 0:
         for each in binocular_datas:
-            breed_info = {"breedName": each["fish_species"], "lengthAverage": each["size_average"],
-                          "lengthMedian": each["size_median"], "weightAverage": each["weight_average"],
-                          "weightMedian": each["weight_median"]}
+            breed_info = {}
+            breed_info["breedName"] = each["fish_species"]
             # breed_info["breedBiomass"] =
+            breed_info["lengthAverage"] = each["size_average"]
+            breed_info["lengthMedian"] = each["size_median"]
+            breed_info["weightAverage"] = each["weight_average"]
+            breed_info["weightMedian"] = each["weight_median"]
 
             date = str(each["datetime"])
             if date in data_all.keys():
@@ -122,12 +126,9 @@ if __name__ == '__main__':
     time.sleep(2)
     _logger = LogOut.Log('upload_biology_monitor_data')
 
-    run_time = 0
-    file_time = 0
     while True:
-        now_time = time.time()
-        if now_time - run_time > 24 * 60 * 60:
-            run_time = now_time
+        now_time = time.strftime("%H:%M", time.localtime(time.time()))
+        if now_time == "01:00":
 
             # 创建读取配置文件对象
             config = configparser.ConfigParser()
@@ -150,6 +151,7 @@ if __name__ == '__main__':
             lower_range = config.get(SECTION, "lowerRange")  # 下层范围
             coefficient = config.get(SECTION, "coefficient")  # 系数:来计算各种鱼的数量
 
+
             binocularSql = MysqldbOperational(host=mysql_host,
                                               username=mysql_username,
                                               password=mysql_password,
@@ -164,3 +166,5 @@ if __name__ == '__main__':
                                           database=sonar_database,
                                           logger=_logger)
             deal_data()
+            time.sleep(50)
+        time.sleep(10)