historical_data_storage.py 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import json
  2. import threading
  3. from event_storage import EventStorage
  4. import time
  5. from log import Log
  6. class HistoricalDataStorage:
  7. def __init__(self):
  8. self._storage = EventStorage()
  9. self._log = Log()
  10. # 历史存储主函数
  11. def run(self):
  12. self._log.info('[HistoricalDataStorage] - Historical data storage module is running!')
  13. station_info = self._storage.hardDiskStorage.get_connectors() # 获取所有站点信息
  14. all_devices = []
  15. for item in station_info:
  16. station_name = item['station_name'] # 站点名称
  17. connector_config = json.loads(item['connector_config']) # 加载json格式connector_config参数
  18. save_frequency = connector_config['save_frequency'] # 获取存储频率
  19. devices_each_station = self._storage.hardDiskStorage.get_device_name_by_station_name(
  20. station_name) # 根据站点名称获取设备列表
  21. for i in devices_each_station:
  22. temp_dict = {}
  23. # 获取每个设备所有点的serial_number,转换为键列表
  24. device_name = i['device_name']
  25. data_point_each_decive = self._storage.hardDiskStorage.get_data_point_by_device_name(
  26. device_name) # 根据设备名称获取设备点表
  27. serial_number_list = []
  28. for item in data_point_each_decive:
  29. serial_number = 'c' + str(item['serial_number'])
  30. serial_number_list.append(serial_number)
  31. temp_dict['device_name'] = device_name
  32. temp_dict['save_frequency'] = save_frequency
  33. temp_dict['serial_number_list'] = serial_number_list
  34. temp_dict['last_save_time'] = 0
  35. all_devices.append(temp_dict)
  36. while 1:
  37. time.sleep(0.2)
  38. for item in all_devices:
  39. save_frequency = item['save_frequency']
  40. last_save_time = item['last_save_time']
  41. now_time = time.time()
  42. if now_time - last_save_time >= save_frequency:
  43. item['last_save_time'] = now_time
  44. save_time = int(now_time) - int(now_time) % save_frequency
  45. save_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(save_time))
  46. serial_number_list = item['serial_number_list']
  47. real_time_data = self._storage.memoryStorage.get_value(serial_number_list) # 根据键列表查询实时数据库
  48. # print(real_time_data)
  49. flag = False # 是否允许存储标志位
  50. for key in real_time_data:
  51. # 值全部为空,不允许存储
  52. if real_time_data[key]:
  53. flag = True
  54. break
  55. if flag:
  56. for key in real_time_data: # redis值为None的
  57. if real_time_data[key] is None: # redis数据库未存储此值
  58. real_time_data[key] = 'null'
  59. if real_time_data[key] == '': # redis存储的为空值
  60. real_time_data[key] = 'null'
  61. table_name = "table_" + str(item['device_name']) # 根据站名计算表名
  62. # self._log.debug("[HistoricalDataStorage] - " + repr(table_name) + '<-' + repr(real_time_data))
  63. self._storage.hardDiskStorage.insert_column_many(table_name, save_time, real_time_data)
  64. if __name__ == '__main__':
  65. historicalDataStorage = HistoricalDataStorage()
  66. historicalDataStorage.run()