# -*- coding: UTF-8 -*- ''' @Project :jh5sonar_newest @File :zhijian.py @Author :Fang @Date :2024/7/26 ''' from hard_disk_storage import HardDiskStorage import pandas as pd hard_disk_db = HardDiskStorage() sql = "SELECT image_name, datetime FROM sonar_data ORDER BY datetime ASC;" var = () res = hard_disk_db.execute_sql(sql, var) # 转换为 DataFrame df = pd.DataFrame(res) # 提取时间戳部分 df['timestamp'] = df['image_name'].apply(lambda x: x.split('_')[0]) # 统计每个时间戳的数据条数 timestamp_counts = df.groupby('timestamp').size().reset_index(name='count') # 转换 datetime 字段为 datetime 类型 df['datetime'] = pd.to_datetime(df['datetime']) # 计算每组的最后一个 datetime 和下一组的第一个 datetime 的时间差 time_differences = [] timestamps = df['timestamp'].unique() for i in range(len(timestamps)): if i == 0: group_1_num = timestamp_counts[timestamp_counts['timestamp'] == timestamps[i]]['count'].values[0] group_1_first = df[df['timestamp'] == timestamps[i]]['datetime'].min() group_1_last = df[df['timestamp'] == timestamps[i]]['datetime'].max() local_time_diff = group_1_last - group_1_first local_total_seconds = local_time_diff.total_seconds() local_minutes = int(local_total_seconds // 60) local_seconds = int(local_total_seconds % 60) time_error_flag = '' num_error_flag = '' local_time_error_flag = '' minutes = 0 seconds = 0 time_differences.append({ '时间戳': timestamps[i], '图片张数': group_1_num, '第一组': group_1_first.strftime('%d日 %H:%M:%S'), '最后一组': group_1_last.strftime('%d日 %H:%M:%S'), '花费时间': f'{local_minutes} 分 {local_seconds} 秒', '与上一组间隔时间': f'{minutes} 分 {seconds} 秒', '间隔时间异常': time_error_flag, '采集数量异常': num_error_flag, '采集时间异常': local_time_error_flag, }) continue time_error_flag = '' num_error_flag = '' local_time_error_flag = '' group_1_last = df[df['timestamp'] == timestamps[i]]['datetime'].max() group_1_first = df[df['timestamp'] == timestamps[i]]['datetime'].min() local_time_diff = group_1_last - group_1_first local_total_seconds = local_time_diff.total_seconds() local_minutes = int(local_total_seconds // 60) local_seconds = int(local_total_seconds % 60) group_2_last = df[df['timestamp'] == timestamps[i - 1]]['datetime'].max() time_diff = group_1_first - group_2_last # 转换时间间隔为秒 total_seconds = time_diff.total_seconds() minutes = int(total_seconds // 60) seconds = int(total_seconds % 60) group_1_num = timestamp_counts[timestamp_counts['timestamp'] == timestamps[i]]['count'].values[0] group_2_num = timestamp_counts[timestamp_counts['timestamp'] == timestamps[i - 1]]['count'].values[0] num_diff = abs(group_1_num - group_2_num) # 判断是否大于5分钟 if local_time_diff > pd.Timedelta(seconds=40) or local_time_diff < pd.Timedelta(seconds=20): local_time_error_flag = True if time_diff > pd.Timedelta(minutes=5): time_error_flag = True if num_diff > 10: num_error_flag = True time_differences.append({ '时间戳': timestamps[i], '图片张数': group_1_num, '第一组':group_1_first.strftime('%d日 %H:%M:%S'), '最后一组':group_1_last.strftime('%d日 %H:%M:%S'), '花费时间':f'{local_minutes} 分 {local_seconds} 秒', '与上一组间隔时间': f'{minutes} 分 {seconds} 秒', '间隔时间异常':time_error_flag, '采集数量异常':num_error_flag, '采集时间异常': local_time_error_flag, }) # 自定义居中对齐的输出格式,确保表头和数据列对齐 def print_centered(df): # 计算每列的最大宽度,考虑到列标题和数据的最大值 col_widths = [max(df[col].astype(str).map(len).max(), len(col)) for col in df.columns] # 打印表头,并且确保宽度和数据一致 header = " | ".join([f"{col:^{width}}" for col, width in zip(df.columns, col_widths)]) print(header) print("-" * len(header)) # 打印分隔符,长度与表头一致 # 打印每一行数据,保持列宽与表头一致 for _, row in df.iterrows(): print(" | ".join([f"{str(val):^{width}}" for val, width in zip(row, col_widths)])) # 转换为 DataFrame time_diff_df = pd.DataFrame(time_differences) # 显示时间间隔的结果 print("每两组时间戳的时间间隔(分和秒):") print_centered(time_diff_df) time_diff_df.to_excel('2024_08_21_001声呐.xlsx',index=False) print("目前测试总数:",len(timestamps) , '组')