# -*- coding: UTF-8 -*- ''' @Project :jh5_sonar @File :everyday_calculate.py @Author :Fang @Date :2024/6/25 ''' import copy import numpy as np from hard_disk_storage import HardDiskStorage from datetime import datetime, timedelta from filterpy.kalman import KalmanFilter import numpy as np db = 'dbjh' harddisk_db = HardDiskStorage() db_name = 'package_data' def Kalman_Filter(data,new_data): kf = KalmanFilter(dim_x=2, dim_z=1) kf.x = np.array([[data[-1]['assess_total_number']], [0]]) # 初始状态 (位置, 速度) kf.F = np.array([[1., 1.], [0., 1.]]) # 状态转移矩阵 kf.H = np.array([[1., 0.]]) # 观测矩阵 kf.P *= 1000. # 初始协方差矩阵 kf.R = 7000 # 观测噪声协方差 kf.Q = np.array([[1., 0.], [0., 1.]]) kf.predict() kf.update(new_data) filtered_value = kf.x[0, 0] return filtered_value def caculate_data(res1, flag): original_data = int(np.mean([i['fish_count'] for i in res1])) # 预先输入数据 x = np.array([2, 5.85, 11.7 , 13.6,]) y = np.array([0, 12.6, 38.3 , 42.6, ]) # 多项式拟合 z1 = np.polyfit(x, y, 2) p1 = np.poly1d(z1) data = [p1(i['fish_count'] / 1000) * 10000 for i in res1] mean_data = int(np.mean(data)) return mean_data , original_data def detect_outliers(res,current_date,mean_data): data_list = [i['map_data'] for i in res] datetime_list = [i['datetime'] for i in res] data_list.append(mean_data) datetime_list.append(current_date) median_value = np.mean(data_list) std_dev = np.std(data_list) threshold_max = median_value + 2 * std_dev threshold_min = median_value - 2 * std_dev if threshold_min < 0: threshold_min = 0 filtered_data = [] filtered_datetime = [] for temp_data,temp_time in zip(data_list,datetime_list): if temp_data >= threshold_min and temp_data <= threshold_max: filtered_data.append(temp_data) filtered_datetime.append(temp_time) if current_date in filtered_datetime: return False else: return True if __name__ == '__main__': # start_date_str = "2024-09-4" # end_date_str = "2024-09-4" # # start_date = datetime.strptime(start_date_str, '%Y-%m-%d') # end_date = datetime.strptime(end_date_str, '%Y-%m-%d') # # date_list = [] # current_date = start_date # while current_date <= end_date: # date_list.append(current_date) # current_date += timedelta(days=1) # #打印每一天的 datetime 对象 # for current_date in date_list: current_date = datetime.now() yesterday = current_date - timedelta(days=1) otherStyleTime = yesterday.strftime("%Y-%m-%d") flag = otherStyleTime # 构建SQL查询语句 sql3 = f"select id, datetime, fish_count from {db_name} where is_processed = 1 and is_abnormal = 0 and datetime like '%{flag}%';" res1 = harddisk_db.execute_sql(sql3, None) if len(res1) < 50: print("No data",otherStyleTime) exit() # sql3 = f"select original_data,map_data,first_filter,assess_total_number from jh5_dailyreport WHERE is_visible=1 order by datetime desc limit 1;;" # old_count = harddisk_db.execute_sql(sql3, None)[0] # # sql = 'INSERT INTO jh5_dailyreport (datetime ,original_data,map_data,first_filter,assess_total_number ,outliers_flag,is_visible,is_send) VALUES (%s , %s,%s , %s, %s , %s, %s, %s);' # var = (current_date, old_count['original_data'], old_count['map_data'], old_count['first_filter'], old_count['assess_total_number'], 0, 0, 1) # res1 = harddisk_db.execute_sql(sql, var) # continue # 使用 sorted 函数并指定 key 以及 reverse 参数 sorted_data = sorted(res1, key=lambda x: x['fish_count'], reverse=True) # 计算前 70% 的大小 top_70_percent_length = int(len(sorted_data) * 0.7) top_10_percent_length = int(len(sorted_data) * 0.1) # 获取前 70% 的数据 top_70_percent_data = sorted_data[top_10_percent_length:top_70_percent_length] sorted_data = sorted(top_70_percent_data, key=lambda x: x['datetime']) mean_data,original_data = caculate_data(sorted_data,flag) print('全部时间前70%平均数量:', mean_data,original_data) old_day = (current_date - timedelta(days=5)).strftime('%Y-%m-%d') old_day = datetime.strptime(old_day, "%Y-%m-%d") sql3 = f"select datetime, map_data from jh5_dailyreport;" data = harddisk_db.execute_sql(sql3,None) otherStyleTime += ' 23:20:00' dt = datetime.strptime(otherStyleTime, "%Y-%m-%d %H:%M:%S") outliers_flag = detect_outliers(data,dt,mean_data) print(outliers_flag) sql3 = f"select datetime, assess_total_number from jh5_dailyreport WHERE datetime between '{old_day}' and '{current_date}';" data = harddisk_db.execute_sql(sql3, None) if outliers_flag == True: filter_list = [i['assess_total_number'] for i in data] filter_data = filter_list[-1] sql = 'INSERT INTO jh5_dailyreport (datetime ,original_data,map_data,first_filter,assess_total_number,outliers_flag ) VALUES (%s , %s, %s ,%s , %s, %s) ;' var = (dt, original_data, mean_data, filter_data, filter_data,1) res1 = harddisk_db.execute_sql(sql, var) else: if len(data) < 4: sql3 = f"select datetime,assess_total_number from jh5_dailyreport where is_visible=1 ORDER BY datetime DESC LIMIT 5;" data = harddisk_db.execute_sql(sql3, None) new_filter_data = Kalman_Filter(data, mean_data) sql = 'INSERT INTO jh5_dailyreport (datetime ,original_data,map_data,first_filter,assess_total_number ,outliers_flag,is_visible,is_send) VALUES (%s , %s,%s , %s, %s , %s, %s, %s);' var = (dt,original_data,mean_data,mean_data,new_filter_data, 0,0,1) res1 = harddisk_db.execute_sql(sql, var) print(res1) exit() else: filter_list = [i['assess_total_number'] for i in data ] filter_list.append(mean_data) filter_data = int(np.mean(filter_list)) new_filter_data = Kalman_Filter(data,filter_data) sql = 'INSERT INTO jh5_dailyreport (datetime ,original_data,map_data,first_filter,assess_total_number,outliers_flag,is_visible,is_send ) VALUES (%s , %s ,%s , %s , %s, %s, %s, %s);' var = (dt,original_data, mean_data, filter_data,new_filter_data, 0,0,1) res1 = harddisk_db.execute_sql(sql, var) print(res1)