dataPackUp.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. import time
  2. import configparser
  3. import datetime
  4. from mysqlDataBase import MysqldbOperational
  5. import os
  6. import zipfile
  7. import copy
  8. import requests
  9. import LogOut
  10. import urllib3
  11. def get_txt_name(beginTime, code):
  12. # 获取txt文件名
  13. txt_name = code + '-' + str(int(beginTime) * 1000) + '.txt'
  14. return txt_name
  15. def delete_txt(file_path):
  16. # 删除txt文件及文件夹
  17. txt_file_list = os.listdir(file_path)
  18. for txt in txt_file_list:
  19. os.remove(os.path.join(file_path, txt))
  20. os.rmdir(file_path)
  21. def get_zip_name():
  22. # 获取zip包名
  23. zip_name = PLATFORM_ID + '-' + str(int(time.time() * 1000)) + '000' + '.zip'
  24. return zip_name
  25. def make_zip(file_path):
  26. # 组包
  27. # 参数:file_path, 包含txt的目录
  28. # 返回:zip包文件名
  29. # txt文件名列表
  30. txt_file_list = os.listdir(file_path)
  31. # zip包文件名
  32. zip_name = "HDU/" + get_zip_name()
  33. zp = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED)
  34. for txt_file in txt_file_list:
  35. txt_file_path = os.path.join(file_path, txt_file)
  36. # 打开txt,写入zip
  37. with open(txt_file_path, 'r') as f:
  38. zp.write(txt_file_path, txt_file)
  39. os.remove(txt_file_path)
  40. # 关闭zip包
  41. zp.close()
  42. # 删除txt文件
  43. # delete_txt(file_path)
  44. return zip_name
  45. def dateAndTimeToTimestamp(date_time):
  46. time_array = time.strptime(date_time, "%Y-%m-%d %H:%M:%S")
  47. time_stamp = int(time.mktime(time_array))
  48. return time_stamp
  49. def getCodeAndPoint(addrData):
  50. str = "."
  51. code = addrData[:addrData.index(str)]
  52. point = addrData[addrData.index(str) + 1:]
  53. return code, point
  54. def getMqttDataFromMysqlHistory(data_list, code_list):
  55. dataList = []
  56. is_send_0 = []
  57. for index in range(len(data_list)):
  58. date_time = data_list[index]['times']
  59. # date_time = dateAndTimeToTimestamp(date_time)
  60. # print(date_time)
  61. dataDist = {}
  62. code = ''
  63. dataDist['DQ_DI'] = '0'
  64. dataDist['STS_DI'] = '0'
  65. for i in range(len(code_list)):
  66. # print(code_list[i])
  67. code, point = getCodeAndPoint(code_list[i]['mqttCode'])
  68. dataDist[point] = str(data_list[index]["c" + str(code_list[i]['serialNumber'])])
  69. if code_list[i]['lowLimit'] != None and code_list[i]['upLimit'] != None:
  70. if dataDist[point] != 'None' or float(dataDist[point]) < code_list[i]['lowLimit'] or float(dataDist[point]) > code_list[i]['upLimit']:
  71. # dataDist['DQ_DI'] = '1' # 功能优化
  72. dataDist.pop(point) # 功能优化,过滤坏数据
  73. if code_list[i]['storageType'] == "datetime":
  74. try:
  75. timeArray = time.strptime(dataDist[point], "%Y-%m-%d %H:%M:%S")
  76. timeStamp = int(time.mktime(timeArray) * 1000)
  77. dataDist[point] = timeStamp
  78. except Exception as e:
  79. dataDist['DQ_DI'] = '1'
  80. dataDist[point] = '0'
  81. print(e)
  82. dataDist['code'] = code
  83. dataDist['ts'] = date_time * 1000
  84. dataList.append(dataDist)
  85. # param = json.dumps(dataList)
  86. for data in data_list:
  87. is_send_0.append(data["id"])
  88. return dataList, is_send_0
  89. def data_alignment_date_and_time(data_list, begin_timestamp, end_timestamp, frequency):
  90. all_dict = {}
  91. lists = []
  92. for index in range(begin_timestamp, end_timestamp):
  93. all_dict[index] = None
  94. all_dict[begin_timestamp] = data_list[0]
  95. all_dict[end_timestamp] = data_list[len(data_list) - 1]
  96. for index in range(len(data_list)):
  97. timeArray = time.strptime(str(data_list[index]['Date']) + " " + str(data_list[index]['Time']),
  98. "%Y-%m-%d %H:%M:%S")
  99. dataTime = int(time.mktime(timeArray))
  100. all_dict[dataTime] = data_list[index]
  101. for index in range(begin_timestamp, end_timestamp):
  102. if all_dict[index] == None:
  103. all_dict[index] = copy.deepcopy(all_dict[index - 1])
  104. for index in range(begin_timestamp, end_timestamp):
  105. if index % frequency == 0:
  106. all_dict[index]['times'] = index
  107. lists.append(all_dict[index])
  108. return lists
  109. def data_alignment(data_list, begin_timestamp, end_timestamp, frequency):
  110. all_dict = {}
  111. lists = []
  112. for index in range(begin_timestamp, end_timestamp):
  113. all_dict[index] = None
  114. all_dict[begin_timestamp] = data_list[0]
  115. all_dict[end_timestamp] = data_list[len(data_list) - 1]
  116. for index in range(len(data_list)):
  117. timeArray = time.strptime(str(data_list[index]['times']), "%Y-%m-%d %H:%M:%S")
  118. dataTime = int(time.mktime(timeArray))
  119. all_dict[dataTime] = data_list[index]
  120. for index in range(begin_timestamp, end_timestamp):
  121. if all_dict[index] == None:
  122. all_dict[index] = copy.deepcopy(all_dict[index - 1])
  123. else:
  124. all_dict[index] = copy.deepcopy(all_dict[index])
  125. for index in range(begin_timestamp, end_timestamp):
  126. if index % frequency == 0:
  127. all_dict[index]['times'] = index
  128. lists.append(all_dict[index])
  129. return lists
  130. def post(zip_file):
  131. headers = {
  132. "Authorization": token,
  133. "Content-Type": "application/json"}
  134. result = "SUCCESS"
  135. step_size = 1000
  136. for i in range(0, len(zip_file), step_size):
  137. if len(zip_file) > i + step_size:
  138. group_zip_file = zip_file[i:i+step_size]
  139. else:
  140. group_zip_file = zip_file[i:]
  141. data = {"platformId": PLATFORM_ID,
  142. "fileNames": group_zip_file
  143. }
  144. # print(data)
  145. # print(len(group_zip_file))
  146. # print('timestamp: ',datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  147. try:
  148. urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
  149. ret = requests.post('https://management.super-sight.com.cn/device/api/v2/device/history', headers=headers, json=data, verify=False)
  150. if ret.status_code == 200:
  151. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ret.text, ret.status_code, 'len(group_zip_file)=', len(group_zip_file))
  152. else:
  153. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ret.text, ret.status_code, 'len(group_zip_file)=', len(group_zip_file))
  154. result = "FAIL"
  155. except Exception as e:
  156. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "e=", e)
  157. return result
  158. if __name__ == "__main__":
  159. _logger = LogOut.Log('dataPackUp')
  160. # 创建读取配置文件对象
  161. config = configparser.ConfigParser()
  162. config.read("config.ini", encoding="utf-8")
  163. # 获取通用配置项
  164. section = "General" # 读取的section标签
  165. mysql_host = config.get(section, 'mysqlHost')
  166. mysql_username = config.get(section, 'mysqlUsername')
  167. mysql_password = config.get(section, 'mysqlPassword')
  168. mysql_port = config.getint(section, 'mysqlPort')
  169. token = config.get(section, 'token')
  170. PLATFORM_ID = config.get(section, 'appId')
  171. # 读取专有配置项
  172. section = 'dataPackUp'
  173. mysql_database = config.get(section, 'mysqlDatabase')
  174. post_time = 0
  175. pack_time = 0
  176. mysql_object = MysqldbOperational(host=mysql_host,
  177. username=mysql_username,
  178. password=mysql_password,
  179. port=mysql_port,
  180. database=mysql_database,
  181. logger=_logger)
  182. list_devices = mysql_object.get_devices_name()
  183. count = mysql_object.get_mqtt_devices_count()
  184. # print(list_devices)
  185. txt_path = "cache"
  186. if not os.path.exists(txt_path):
  187. os.mkdir(txt_path)
  188. while True:
  189. time.sleep(1)
  190. if post_time < time.time() - 3600:
  191. zip_list = []
  192. flie_name = os.listdir("HDU/")
  193. for filenames in flie_name:
  194. if filenames.endswith('.zip') == True:
  195. zip_list.append(filenames)
  196. if len(zip_list) > 0:
  197. # pass
  198. result = post(zip_list)
  199. post_time = time.time()
  200. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":" + str(result))
  201. if pack_time < time.time() - 3600:
  202. i = 0
  203. while i < max(int(200 / count), 1):
  204. for device_name in list_devices:
  205. code_devices = mysql_object.get_mqtt_devices_from_name(device_name)
  206. # 构造表名称,如table_fg
  207. table_name = 'table_' + device_name
  208. # 获取断点时间
  209. break_point = mysql_object.get_breakpoint_last_time_datetime(table_name)
  210. if break_point != None:
  211. # 找到断点时间前后一个小时的时间点
  212. timeArray = time.strptime(str(break_point['times']), "%Y-%m-%d %H:%M:%S")
  213. begin = time.strftime("%Y-%m-%d %H:00:00", timeArray)
  214. end = time.strftime("%Y-%m-%d %H:59:59", timeArray)
  215. begin_timeArray = time.strptime(str(begin), "%Y-%m-%d %H:%M:%S")
  216. end_timeArray = time.strptime(str(end), "%Y-%m-%d %H:%M:%S")
  217. begin_timestamp = int(time.mktime(begin_timeArray))
  218. end_timestamp = int(time.mktime(end_timeArray))
  219. # 获取断点前后一个小时的数据
  220. data_list = mysql_object.get_hour_data_datetime(table_name, begin, end)
  221. history_data_list = data_alignment(data_list, begin_timestamp, end_timestamp, 2)
  222. if len(code_devices) >= 1:
  223. for index in range(len(code_devices)):
  224. # 获取设备码表
  225. code_list = mysql_object.get_mqtt_point(code_devices[index])
  226. param, id = getMqttDataFromMysqlHistory(history_data_list, code_list)
  227. txt_name = get_txt_name(begin_timestamp, param[0]["code"])
  228. # 将txt_path目录下所有设备txt组包
  229. if len(os.listdir(txt_path)) >= 20:
  230. zip_name = make_zip(txt_path)
  231. time.sleep(5)
  232. with open(os.path.join(txt_path, txt_name), 'w') as f:
  233. txt_content = [str(line) + '\n' for line in param]
  234. f.writelines(txt_content)
  235. time.sleep(1)
  236. # print(zip_name)
  237. if index == len(code_devices) - 1:
  238. # pass
  239. mysql_object.set_many_send_status(table_name, data_list)
  240. i += 1
  241. pack_time = time.time()
  242. if len(os.listdir(txt_path)) > 0:
  243. zip_name = make_zip(txt_path)
  244. time.sleep(5)