historical_data_storage.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import json
  2. import threading
  3. from event_storage import EventStorage
  4. import time
  5. from logging_config import general as logger
  6. class HistoricalDataStorage(threading.Thread):
  7. def __init__(self):
  8. super(HistoricalDataStorage, self).__init__()
  9. self._storage = EventStorage()
  10. # 历史存储主函数
  11. def run(self):
  12. logger.info('Historical data storage module is running!')
  13. station_info = self._storage.hardDiskStorage.get_connectors() # 获取所有站点信息
  14. all_devices = []
  15. for item in station_info:
  16. station_name = item['station_name'] # 站点名称
  17. connector_config = json.loads(item['connector_config']) # 加载json格式connector_config参数
  18. save_frequency = connector_config['save_frequency'] # 获取存储频率
  19. devices_each_station = self._storage.hardDiskStorage.get_device_name_by_station_name(station_name) # 根据站点名称获取设备列表
  20. for i in devices_each_station:
  21. temp_dict = {}
  22. # 获取每个设备所有点的serial_number,转换为键列表
  23. device_name = i['device_name']
  24. data_point_each_decive = self._storage.hardDiskStorage.get_data_point_by_device_name(device_name) # 根据设备名称获取设备点表
  25. serial_number_list = []
  26. for item in data_point_each_decive:
  27. serial_number = 'c' + str(item['serial_number'])
  28. serial_number_list.append(serial_number)
  29. temp_dict['device_name'] = device_name
  30. temp_dict['save_frequency'] = save_frequency
  31. temp_dict['serial_number_list'] = serial_number_list
  32. temp_dict['last_save_time'] = 0
  33. all_devices.append(temp_dict)
  34. while 1:
  35. time.sleep(0.2)
  36. for item in all_devices:
  37. save_frequency = item['save_frequency']
  38. last_save_time = item['last_save_time']
  39. now_time = time.time()
  40. if now_time - last_save_time >= save_frequency:
  41. item['last_save_time'] = now_time
  42. save_time = int(now_time) - int(now_time) % save_frequency
  43. save_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(save_time))
  44. serial_number_list = item['serial_number_list']
  45. real_time_data = self._storage.memoryStorage.get_value(serial_number_list) # 根据键列表查询实时数据库
  46. # print(real_time_data)
  47. flag = False # 是否允许存储标志位
  48. for key in real_time_data:
  49. # 值全部为空,不允许存储
  50. if real_time_data[key]:
  51. flag = True
  52. break
  53. if flag:
  54. for key in real_time_data: # redis值为None的
  55. if real_time_data[key] is None: # redis数据库未存储此值
  56. real_time_data[key] = 'null'
  57. if real_time_data[key] == '': # redis存储的为空值
  58. real_time_data[key] = 'null'
  59. table_name = "table_" + str(item['device_name']) # 根据站名计算表名
  60. logger.debug(f"{table_name} <- {real_time_data}")
  61. self._storage.hardDiskStorage.insert_column_many(table_name, save_time, real_time_data)
  62. if __name__ == '__main__':
  63. historicalDataStorage = HistoricalDataStorage()
  64. historicalDataStorage.run()