#!/usr/bin/env python import time import configparser from mysqlDataBase import MysqldbOperational import json from Crypto.Cipher import AES import hashlib import paho.mqtt.client as mqtt import ssl import LogOut def getCodeAndPoint(addrData): str = "." code = addrData[:addrData.index(str)] point = addrData[addrData.index(str) + 1:] return code, point def getDeviceConnectionStatus(data_time_stamp): now_time_stamp = time.time() if data_time_stamp < now_time_stamp - 300: return 1 else: return 0 def dateAndTimeToTimestamp(date_time): time_array = time.strptime(date_time, "%Y-%m-%d %H:%M:%S") time_stamp = int(time.mktime(time_array)) return time_stamp def setSendStatusIsSucceed(dataInfo): if dataInfo != None: for i in range(len(dataInfo)): id = dataInfo[i]['id'] table_name = "table_" + str(dataInfo[i]['tableName']) my.set_send_status(table_name, id, 1) def getMqttDataFromMysql(list_devices, appId, token): dataList = [] data = {} data_from_mysql_info_list = [] for index in range(len(list_devices)): list_points = my.get_mqtt_point(list_devices[index]) data_from_mysql = my.get_newest_data('table_' + str(list_points[0]['deviceName'])) if len(data_from_mysql) > 0: date_time = str(data_from_mysql['times']) date_time = dateAndTimeToTimestamp(date_time) STS_DI = getDeviceConnectionStatus(date_time) # print("STS_DI = ", STS_DI) else: STS_DI = 1 dataDist = {} data_from_mysql_info_dict = {} code = '' if STS_DI == 0: dataDist['DQ_DI'] = '0' dataDist['STS_DI'] = '0' data_from_mysql_info_dict['id'] = data_from_mysql['id'] data_from_mysql_info_dict['tableName'] = list_points[0]['deviceName'] data_from_mysql_info_list.append(data_from_mysql_info_dict) for i in range(len(list_points)): code, point = getCodeAndPoint(list_points[i]['mqttCode']) columnName = "c" + str(list_points[i]['serialNumber']) dataDist[point] = str(data_from_mysql[columnName]) if list_points[i]['lowLimit'] != None and list_points[i]['upLimit'] != None: if float(dataDist[point]) <= list_points[i]['lowLimit'] or float(dataDist[point]) > list_points[i]['upLimit']: print(point, dataDist[point], list_points[i]['lowLimit'], list_points[i]['upLimit']) # dataDist.pop(point) dataDist['DQ_DI'] = '1' if list_points[i]['storageType'] == "datetime": try: timeArray = time.strptime(dataDist[point], "%Y-%m-%d %H:%M:%S") timeStamp = int(time.mktime(timeArray) * 1000) dataDist[point] = timeStamp except Exception as e: dataDist['DQ_DI'] = '1' dataDist[point] = '0' print(e) dataDist['code'] = code dataDist['ts'] = int(round(time.time() * 1000)) # print(dataDist['ts']) dataList.append(dataDist) elif STS_DI == 1: code, point = getCodeAndPoint(list_points[0]['mqttCode']) dataDist['DQ_DI'] = '1' dataDist['STS_DI'] = '1' dataDist['code'] = code dataDist['ts'] = int(round(time.time() * 1000)) dataList.append(dataDist) print("dataDist = ", dataDist) data['data'] = dataList data['appId'] = appId data['token'] = token param = json.dumps(data) return param, data_from_mysql_info_list def padding_pkcs5(value): return str.encode(value + (BS - len(value) % BS) * chr(BS - len(value) % BS)) def padding_zero(value): while len(value) % 16 != 0: value += '\0' return str.encode(value) def aes_ecb_encrypt(key, value): # AES/ECB/PKCS5padding # key is sha1prng encrypted before cryptor = AES.new(bytes.fromhex(key), AES.MODE_ECB) padding_value = padding_pkcs5(value) # padding content with pkcs5 ciphertext = cryptor.encrypt(padding_value) return ''.join(['%02x' % i for i in ciphertext]).upper() # def get_sha1prng_key(key): # '''[summary] # encrypt key with SHA1PRNG # same as java AES crypto key generator SHA1PRNG # Arguments: # key {[string]} -- [key] # # Returns: # [string] -- [hexstring] # ''' # signature = hashlib.sha1(key.encode()).digest() # signature = hashlib.sha1(signature).digest() # return ''.join(['%02x' % i for i in signature]).upper()[:32] def get_sha1prng_key(key): '''[summary] encrypt key with SHA1PRNG same as java AES crypto key generator SHA1PRNG Arguments: key {[string]} -- [key] Returns: [string] -- [hexstring] ''' signature = hashlib.sha1(key.encode()).digest() signature = hashlib.sha1(signature).digest() return ''.join(['%02x' % i for i in signature]).upper()[:32] def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) # client.subscribe("hdbrecord1") # 订阅消息 def on_message(client, userdata, msg): print("主题:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8'))) def on_subscribe(client, userdata, mid, granted_qos): print("On Subscribed: qos = %d" % granted_qos) def on_disconnect(client, userdata, rc): if rc != 0: print("Unexpected disconnection %s" % rc) if __name__ == "__main__": _logger = LogOut.Log('iotServerMQTT') # 创建读取配置文件对象 config = configparser.ConfigParser() config.read("config.ini", encoding="utf-8") # 获取通用配置项 section = "General" # 读取的部section标签 mysql_host = config.get(section, 'mysqlHost') mysql_username = config.get(section, 'mysqlUsername') mysql_password = config.get(section, 'mysqlPassword') mysql_port = config.getint(section, 'mysqlPort') token = config.get(section, 'token') appId = config.get(section, 'appId') # 获取特有配置项 section = 'iotServerMQTT' # 读取的部section标签 mysql_database = config.get(section, 'mysqlDatabase') HOST = config.get(section, 'mqttHost') PORT = config.getint(section, 'mqttPort') client_id = config.get(section, 'mqttClientId') key = config.get(section, 'mqttKey') # keypassword username = config.get(section, 'mqttUsername') password = config.get(section, 'mqttPassword') topic = config.get(section, 'mqttTopic') frequency = config.getint(section, 'mqttFrequency') # 连接数据库 my = MysqldbOperational(host=mysql_host, username=mysql_username, password=mysql_password, port=mysql_port, database=mysql_database, logger=_logger) # r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0, decode_responses=True) BS = AES.block_size post_time = 0 list_devices = my.get_mqtt_devices() # client = mqtt.Client(client_id) # client.tls_set(ca_certs='ca.crt', certfile=None, keyfile=None, cert_reqs=ssl.CERT_NONE, # tls_version=ssl.PROTOCOL_TLSv1, ciphers=None) # # client.tls_set_context(context=None) # client.username_pw_set(username, password) # client.on_connect = on_connect # client.on_message = on_message # client.on_subscribe = on_subscribe # client.on_disconnect = on_disconnect # client.connect(HOST, PORT, 10) # client.loop_start() # while True: # time.sleep(0.1) # if post_time < time.time() - frequency: # try: # post_time = time.time() # param, dataInfo = getMqttDataFromMysql(list_devices, appId, token) # #print(param) # aes128string = aes_ecb_encrypt(get_sha1prng_key(key), param) # is_send, mid = client.publish(topic, payload=aes128string, qos=1) # if is_send == 0: # setSendStatusIsSucceed(dataInfo) # print(is_send) # except Exception as e: # print(e) # client.loop_stop() param, dataInfo = getMqttDataFromMysql(list_devices, appId, token)