123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- import time
- import configparser
- import datetime
- from mysqlDataBase import MysqldbOperational
- import os
- import zipfile
- import copy
- import requests
- import LogOut
- import urllib3
def get_txt_name(beginTime, code):
    """Build the txt file name for one device code and one start time.

    Args:
        beginTime: Start time; converted with ``int()`` and multiplied by
            1000 (the caller in this file passes epoch seconds, so the name
            carries milliseconds).
        code: Device code string used as the file-name prefix.

    Returns:
        A string of the form ``"<code>-<beginTime * 1000>.txt"``.
    """
    millis = int(beginTime) * 1000
    return code + "-%d.txt" % millis
def delete_txt(file_path):
    """Delete every file inside a directory, then the directory itself.

    Args:
        file_path: Directory whose entries are removed with ``os.remove``
            before the directory is removed with ``os.rmdir``.
    """
    for entry_name in os.listdir(file_path):
        entry_path = os.path.join(file_path, entry_name)
        os.remove(entry_path)
    os.rmdir(file_path)
def get_zip_name():
    """Build a zip archive name from the platform id and the current time.

    Returns:
        ``"<PLATFORM_ID>-<current epoch milliseconds>000.zip"``. The
        module-level ``PLATFORM_ID`` must already be set when this is called.
    """
    now_millis = int(time.time() * 1000)
    return PLATFORM_ID + "-%d000.zip" % now_millis
def make_zip(file_path):
    """Pack every file in a directory into a new zip archive under ``HDU/``.

    The packed source files are deleted only after the archive has been
    written and closed successfully, so a failure part-way through does not
    lose data. A partially written archive is removed before the error is
    re-raised.

    Args:
        file_path: Directory containing the txt files to pack.

    Returns:
        The archive path, ``"HDU/" + get_zip_name()``.

    Raises:
        OSError: If a file cannot be read, written or removed.
    """
    # Names of the files to pack.
    txt_file_list = os.listdir(file_path)
    # Path of the archive to create.
    zip_name = "HDU/" + get_zip_name()
    # The output directory may not exist on a fresh deployment.
    os.makedirs(os.path.dirname(zip_name), exist_ok=True)

    packed_paths = []
    try:
        # The context manager closes the archive even if a write fails.
        with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zp:
            for txt_file in txt_file_list:
                txt_file_path = os.path.join(file_path, txt_file)
                # Store each file under its bare name (no directory part).
                zp.write(txt_file_path, txt_file)
                packed_paths.append(txt_file_path)
    except Exception:
        # Do not leave a truncated archive behind to be reported later.
        if os.path.exists(zip_name):
            os.remove(zip_name)
        raise

    # Only now is it safe to drop the originals.
    for txt_file_path in packed_paths:
        os.remove(txt_file_path)
    return zip_name
def dateAndTimeToTimestamp(date_time):
    """Convert a ``"YYYY-MM-DD HH:MM:SS"`` string to integer epoch seconds.

    The string is interpreted as local time (``time.mktime``).

    Args:
        date_time: Date-time string in ``"%Y-%m-%d %H:%M:%S"`` format.

    Returns:
        The timestamp as an ``int``.

    Raises:
        ValueError: If the string does not match the expected format.
    """
    layout = "%Y-%m-%d %H:%M:%S"
    return int(time.mktime(time.strptime(date_time, layout)))
def getCodeAndPoint(addrData):
    """Split an address of the form ``"<code>.<point>"`` at its first dot.

    Args:
        addrData: Address string containing at least one ``"."``.

    Returns:
        A ``(code, point)`` tuple: the text before the first dot and
        everything after it (later dots stay in ``point``).

    Raises:
        ValueError: If ``addrData`` contains no dot.
    """
    # partition() scans once and avoids shadowing the builtin ``str``.
    code, separator, point = addrData.partition(".")
    if not separator:
        raise ValueError("no '.' separator in address: %r" % (addrData,))
    return code, point
def _is_out_of_range(text, low, high):
    """Return True when ``text`` fails validation against ``[low, high]``.

    The literal ``'None'`` (a stringified missing value) is never treated as
    out of range; text that is not a number at all is treated as bad data.
    """
    if text == 'None':
        return False
    try:
        number = float(text)
    except ValueError:
        return True
    return number < low or number > high


def getMqttDataFromMysqlHistory(data_list, code_list):
    """Convert history rows into one point dictionary per row.

    Each output dictionary starts with ``'DQ_DI'`` and ``'STS_DI'`` set to
    ``'0'``, then receives one stringified value per entry of ``code_list``
    (keyed by the point part of ``mqttCode``), and finally ``'code'`` and
    ``'ts'`` (``row['times'] * 1000``).

    Values whose entry has both limits set and that lie outside
    ``[lowLimit, upLimit]`` are left out of the dictionary. The original
    condition used ``or`` where the guard needed ``and``: it dropped every
    non-missing value and raised ``ValueError`` on ``'None'``.
    NOTE(review): missing values (``'None'``) are kept, as they are for
    points without limits -- confirm this is the intended policy.

    Entries with ``storageType == "datetime"`` are converted to epoch
    milliseconds; on a parse failure the value becomes ``'0'`` and
    ``'DQ_DI'`` is set to ``'1'``.

    Args:
        data_list: Rows (dicts) with keys ``'times'``, ``'id'`` and
            ``'c<serialNumber>'``. ``'times'`` is multiplied by 1000, so it
            is presumably epoch seconds -- verify against the caller.
        code_list: Point descriptors (dicts) with keys ``'mqttCode'``,
            ``'serialNumber'``, ``'lowLimit'``, ``'upLimit'`` and
            ``'storageType'``.

    Returns:
        ``(dataList, is_send_0)``: the list of point dictionaries and the
        list of ``row['id']`` values, both in ``data_list`` order.
    """
    dataList = []
    for row in data_list:
        record = {'DQ_DI': '0', 'STS_DI': '0'}
        code = ''
        for entry in code_list:
            code, point = getCodeAndPoint(entry['mqttCode'])
            value = str(row["c" + str(entry['serialNumber'])])

            low, high = entry['lowLimit'], entry['upLimit']
            if low is not None and high is not None \
                    and _is_out_of_range(value, low, high):
                # Filter bad data: omit the point. Skip the datetime branch
                # so the dropped point is not re-added as '0'.
                record.pop(point, None)
                continue

            if entry['storageType'] == "datetime":
                try:
                    parsed = time.strptime(value, "%Y-%m-%d %H:%M:%S")
                    value = int(time.mktime(parsed) * 1000)
                except (ValueError, OverflowError) as e:
                    # Unparseable date: flag the record and send a zero.
                    record['DQ_DI'] = '1'
                    value = '0'
                    print(e)
            record[point] = value

        record['code'] = code
        record['ts'] = row['times'] * 1000
        dataList.append(record)

    is_send_0 = [row["id"] for row in data_list]
    return dataList, is_send_0
def data_alignment_date_and_time(data_list, begin_timestamp, end_timestamp, frequency):
    """Resample rows onto a regular grid of seconds using forward fill.

    Every second in ``[begin_timestamp, end_timestamp)`` that is a multiple
    of ``frequency`` yields one output row: a copy of the most recent input
    row at or before that second (``data_list[0]`` is used until the first
    row is reached), with ``'times'`` set to that second.

    Each returned row is an independent deep copy. Previously rows found at
    their own timestamp were returned by reference, so the caller's rows
    were mutated and one dict could appear twice with the wrong ``'times'``.

    Args:
        data_list: Rows (dicts) with ``'Date'`` (``%Y-%m-%d``) and ``'Time'``
            (``%H:%M:%S``) entries, interpreted as local time.
        begin_timestamp: First epoch second of the window (inclusive).
        end_timestamp: End of the window (exclusive).
        frequency: Sampling step in seconds.

    Returns:
        List of aligned rows; empty when ``data_list`` is empty.

    Raises:
        ValueError: If a row's date/time does not match the expected format.
    """
    if not data_list:
        return []

    # Index rows by epoch second. The first row seeds the window start and a
    # later row with the same timestamp overrides an earlier one.
    rows_by_second = {begin_timestamp: data_list[0]}
    for row in data_list:
        parsed = time.strptime(str(row['Date']) + " " + str(row['Time']),
                               "%Y-%m-%d %H:%M:%S")
        rows_by_second[int(time.mktime(parsed))] = row

    aligned = []
    latest = None
    for second in range(begin_timestamp, end_timestamp):
        # Forward fill: keep the previous row when this second has none.
        latest = rows_by_second.get(second, latest)
        if second % frequency == 0:
            # Copy only the samples that are returned.
            sample = copy.deepcopy(latest)
            sample['times'] = second
            aligned.append(sample)
    return aligned
def data_alignment(data_list, begin_timestamp, end_timestamp, frequency):
    """Resample rows onto a regular grid of seconds using forward fill.

    Every second in ``[begin_timestamp, end_timestamp)`` that is a multiple
    of ``frequency`` yields one output row: a deep copy of the most recent
    input row at or before that second (``data_list[0]`` is used until the
    first row is reached), with ``'times'`` replaced by that second.

    Args:
        data_list: Rows (dicts) whose ``'times'`` entry stringifies to
            ``%Y-%m-%d %H:%M:%S``, interpreted as local time.
        begin_timestamp: First epoch second of the window (inclusive).
        end_timestamp: End of the window (exclusive).
        frequency: Sampling step in seconds.

    Returns:
        List of aligned rows, independent of the input rows; empty when
        ``data_list`` is empty.

    Raises:
        ValueError: If a row's ``'times'`` does not match the format.
    """
    if not data_list:
        return []

    # Index rows by epoch second. The first row seeds the window start and a
    # later row with the same timestamp overrides an earlier one.
    rows_by_second = {begin_timestamp: data_list[0]}
    for row in data_list:
        parsed = time.strptime(str(row['times']), "%Y-%m-%d %H:%M:%S")
        rows_by_second[int(time.mktime(parsed))] = row

    aligned = []
    latest = None
    for second in range(begin_timestamp, end_timestamp):
        # Forward fill: keep the previous row when this second has none.
        latest = rows_by_second.get(second, latest)
        if second % frequency == 0:
            # Copy only the samples that are returned, not every second.
            sample = copy.deepcopy(latest)
            sample['times'] = second
            aligned.append(sample)
    return aligned
def post(zip_file):
    """Report zip file names to the device-history endpoint in batches.

    The names are sent in groups of at most 1000 per request. A batch counts
    as failed when the response status is not 200 or when the request itself
    raises (previously a raised request left the result as ``"SUCCESS"``).
    Remaining batches are still attempted after a failure.

    Relies on the module-level ``token`` and ``PLATFORM_ID``.

    Args:
        zip_file: List of zip file names to report.

    Returns:
        ``"SUCCESS"`` if every batch returned HTTP 200 (or there was nothing
        to send), otherwise ``"FAIL"``.
    """
    headers = {
        "Authorization": token,
        "Content-Type": "application/json"}
    result = "SUCCESS"
    step_size = 1000
    # (connect, read) timeouts in seconds so a stalled server cannot hang
    # the caller's loop forever.
    request_timeout = (10, 60)

    # NOTE(security): verify=False disables TLS certificate checking, which
    # exposes the token to man-in-the-middle attacks. Kept as-is because the
    # deployment may depend on it; prefer installing the proper CA bundle.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    for start in range(0, len(zip_file), step_size):
        # Slicing past the end is safe, so the last batch needs no branch.
        group_zip_file = zip_file[start:start + step_size]
        data = {"platformId": PLATFORM_ID,
                "fileNames": group_zip_file
                }
        try:
            ret = requests.post('https://management.super-sight.com.cn/device/api/v2/device/history',
                                headers=headers, json=data, verify=False,
                                timeout=request_timeout)
        except requests.exceptions.RequestException as e:
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "e=", e)
            result = "FAIL"
            continue
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ret.text, ret.status_code,
              'len(group_zip_file)=', len(group_zip_file))
        if ret.status_code != 200:
            result = "FAIL"
    return result
if __name__ == "__main__":
    # Entry point: once per hour report the zip archives in HDU/, and once
    # per hour pack unsent history rows from MySQL into txt files and zip
    # them. ``token`` and ``PLATFORM_ID`` stay module-level because post()
    # and get_zip_name() read them as globals.
    _logger = LogOut.Log('dataPackUp')

    # Load the configuration file.
    config = configparser.ConfigParser()
    config.read("config.ini", encoding="utf-8")

    # General settings.
    section = "General"
    mysql_host = config.get(section, 'mysqlHost')
    mysql_username = config.get(section, 'mysqlUsername')
    mysql_password = config.get(section, 'mysqlPassword')
    mysql_port = config.getint(section, 'mysqlPort')
    token = config.get(section, 'token')
    PLATFORM_ID = config.get(section, 'appId')

    # Settings specific to this script.
    section = 'dataPackUp'
    mysql_database = config.get(section, 'mysqlDatabase')

    post_time = 0
    pack_time = 0
    mysql_object = MysqldbOperational(host=mysql_host,
                                      username=mysql_username,
                                      password=mysql_password,
                                      port=mysql_port,
                                      database=mysql_database,
                                      logger=_logger)
    list_devices = mysql_object.get_devices_name()
    count = mysql_object.get_mqtt_devices_count()

    txt_path = "cache"
    zip_dir = "HDU/"
    # Both working directories must exist before they are listed.
    os.makedirs(txt_path, exist_ok=True)
    os.makedirs(zip_dir, exist_ok=True)

    # Device sweeps per packing cycle. Guard against a zero device count,
    # which previously raised ZeroDivisionError.
    rounds = max(int(200 / count), 1) if count else 1

    while True:
        time.sleep(1)

        # Report the existing archives at most once per hour. The timer is
        # only reset after an actual report, as before.
        if post_time < time.time() - 3600:
            zip_list = [name for name in os.listdir(zip_dir)
                        if name.endswith('.zip')]
            if zip_list:
                result = post(zip_list)
                post_time = time.time()
                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":" + str(result))

        # Pack unsent history at most once per hour.
        if pack_time < time.time() - 3600:
            for _ in range(rounds):
                for device_name in list_devices:
                    code_devices = mysql_object.get_mqtt_devices_from_name(device_name)
                    # Table name, e.g. table_fg.
                    table_name = 'table_' + device_name
                    # Time of the oldest unsent row (the "breakpoint").
                    break_point = mysql_object.get_breakpoint_last_time_datetime(table_name)
                    if break_point is None:
                        continue

                    # Bounds of the clock hour containing the breakpoint.
                    hour_struct = time.strptime(str(break_point['times']), "%Y-%m-%d %H:%M:%S")
                    begin = time.strftime("%Y-%m-%d %H:00:00", hour_struct)
                    end = time.strftime("%Y-%m-%d %H:59:59", hour_struct)
                    begin_timestamp = int(time.mktime(time.strptime(begin, "%Y-%m-%d %H:%M:%S")))
                    end_timestamp = int(time.mktime(time.strptime(end, "%Y-%m-%d %H:%M:%S")))

                    # Rows recorded within that hour.
                    data_list = mysql_object.get_hour_data_datetime(table_name, begin, end)
                    if not data_list:
                        # Nothing to align; data_alignment needs a first row.
                        continue
                    history_data_list = data_alignment(data_list, begin_timestamp, end_timestamp, 2)
                    if not history_data_list:
                        continue

                    for index, device_code in enumerate(code_devices):
                        # Point table of this device.
                        code_list = mysql_object.get_mqtt_point(device_code)
                        param, _sent_ids = getMqttDataFromMysqlHistory(history_data_list, code_list)
                        txt_name = get_txt_name(begin_timestamp, param[0]["code"])

                        # Zip the pending txt files once 20 have accumulated.
                        if len(os.listdir(txt_path)) >= 20:
                            make_zip(txt_path)
                            time.sleep(5)

                        with open(os.path.join(txt_path, txt_name), 'w') as f:
                            f.writelines([str(line) + '\n' for line in param])
                        time.sleep(1)

                        # After the last device of this table, mark the
                        # hour's rows as sent so the breakpoint advances.
                        if index == len(code_devices) - 1:
                            mysql_object.set_many_send_status(table_name, data_list)

            pack_time = time.time()
            # Zip whatever is left over from this cycle.
            if os.listdir(txt_path):
                make_zip(txt_path)
                time.sleep(5)
|