123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- #!/usr/bin/env python
- import time
- import configparser
- from mysqlDataBase import MysqldbOperational
- import json
- from Crypto.Cipher import AES
- import hashlib
- import paho.mqtt.client as mqtt
- import ssl
- import LogOut
def getCodeAndPoint(addrData):
    """Split an MQTT address of the form ``"<code>.<point>"`` at its first dot.

    Arguments:
        addrData {[string]} -- address such as ``"dev01.temperature"``

    Returns:
        [tuple] -- ``(code, point)``; everything before the first ``"."`` and
        everything after it (later dots stay inside ``point``).

    Raises:
        ValueError -- when ``addrData`` contains no ``"."``.
    """
    # maxsplit=1 keeps any further dots in the point name; unpacking a
    # single-element result raises ValueError, like the former index() lookup.
    code, point = addrData.split(".", 1)
    return code, point
def getDeviceConnectionStatus(data_time_stamp, offline_after=300):
    """Report whether a device's newest sample is too old.

    Arguments:
        data_time_stamp {[number]} -- Unix timestamp (seconds) of the newest sample
        offline_after {[number]} -- age in seconds beyond which the sample counts
            as stale (default 300, the previously hard-coded window)

    Returns:
        [int] -- 1 when the sample is more than ``offline_after`` seconds older
        than the current time, otherwise 0.
    """
    oldest_accepted = time.time() - offline_after
    return 1 if data_time_stamp < oldest_accepted else 0
def dateAndTimeToTimestamp(date_time):
    """Convert a ``"YYYY-mm-dd HH:MM:SS"`` string to whole Unix seconds.

    The string is interpreted as local time (``time.mktime``).

    Arguments:
        date_time {[string]} -- date and time text in the format above

    Returns:
        [int] -- seconds since the epoch, fraction dropped.

    Raises:
        ValueError -- when the text does not match the format.
    """
    parsed = time.strptime(date_time, "%Y-%m-%d %H:%M:%S")
    return int(time.mktime(parsed))
def setSendStatusIsSucceed(dataInfo):
    """Flag every row listed in ``dataInfo`` as sent (status 1) in MySQL.

    Uses the module-level ``my`` database handle.

    Arguments:
        dataInfo {[list]} -- dicts with keys ``'id'`` and ``'tableName'``;
            ``None`` or an empty list is a no-op.
    """
    if not dataInfo:
        return
    for record in dataInfo:
        row_id = record['id']
        table_name = "table_" + str(record['tableName'])
        my.set_send_status(table_name, row_id, 1)
- def getMqttDataFromMysql(list_devices, appId, token):
- dataList = []
- data = {}
- data_from_mysql_info_list = []
- for index in range(len(list_devices)):
- list_points = my.get_mqtt_point(list_devices[index])
- data_from_mysql = my.get_newest_data('table_' + str(list_points[0]['deviceName']))
- if len(data_from_mysql) > 0:
- date_time = str(data_from_mysql['times'])
- date_time = dateAndTimeToTimestamp(date_time)
- STS_DI = getDeviceConnectionStatus(date_time)
- # print("STS_DI = ", STS_DI)
- else:
- STS_DI = 1
- dataDist = {}
- data_from_mysql_info_dict = {}
- code = ''
- if STS_DI == 0:
- dataDist['DQ_DI'] = '0'
- dataDist['STS_DI'] = '0'
- data_from_mysql_info_dict['id'] = data_from_mysql['id']
- data_from_mysql_info_dict['tableName'] = list_points[0]['deviceName']
- data_from_mysql_info_list.append(data_from_mysql_info_dict)
- for i in range(len(list_points)):
- code, point = getCodeAndPoint(list_points[i]['mqttCode'])
- columnName = "c" + str(list_points[i]['serialNumber'])
- dataDist[point] = str(data_from_mysql[columnName])
- if list_points[i]['lowLimit'] != None and list_points[i]['upLimit'] != None:
- if float(dataDist[point]) <= list_points[i]['lowLimit'] or float(dataDist[point]) > list_points[i]['upLimit']:
- print(point, dataDist[point], list_points[i]['lowLimit'], list_points[i]['upLimit'])
- # dataDist.pop(point)
- dataDist['DQ_DI'] = '1'
- if list_points[i]['storageType'] == "datetime":
- try:
- timeArray = time.strptime(dataDist[point], "%Y-%m-%d %H:%M:%S")
- timeStamp = int(time.mktime(timeArray) * 1000)
- dataDist[point] = timeStamp
- except Exception as e:
- dataDist['DQ_DI'] = '1'
- dataDist[point] = '0'
- print(e)
- dataDist['code'] = code
- dataDist['ts'] = int(round(time.time() * 1000))
- # print(dataDist['ts'])
- dataList.append(dataDist)
- elif STS_DI == 1:
- code, point = getCodeAndPoint(list_points[0]['mqttCode'])
- dataDist['DQ_DI'] = '1'
- dataDist['STS_DI'] = '1'
- dataDist['code'] = code
- dataDist['ts'] = int(round(time.time() * 1000))
- dataList.append(dataDist)
- print("dataDist = ", dataDist)
- data['data'] = dataList
- data['appId'] = appId
- data['token'] = token
- param = json.dumps(data)
- return param, data_from_mysql_info_list
def padding_pkcs5(value, block_size=16):
    """UTF-8 encode ``value`` and pad it PKCS#5/PKCS#7 style.

    The pad length is computed from the encoded byte length, so non-ASCII
    text also ends on a block boundary. A full extra block is appended when
    the input is already aligned.

    Arguments:
        value {[string]} -- text to pad
        block_size {[int]} -- cipher block size in bytes (default 16, the AES
            block size)

    Returns:
        [bytes] -- encoded text followed by ``n`` bytes of value ``n``.
    """
    raw = value.encode()
    pad_len = block_size - len(raw) % block_size
    return raw + bytes([pad_len]) * pad_len
def padding_zero(value):
    """UTF-8 encode ``value`` and zero-pad it to a multiple of 16 bytes.

    The pad length is computed from the encoded byte length, so non-ASCII
    text is aligned correctly. Input that is already aligned (including the
    empty string) is returned without padding.

    Arguments:
        value {[string]} -- text to pad

    Returns:
        [bytes] -- encoded text followed by as many ``\\0`` bytes as needed.
    """
    raw = value.encode()
    remainder = len(raw) % 16
    if remainder:
        raw += b'\0' * (16 - remainder)
    return raw
def aes_ecb_encrypt(key, value):
    """Encrypt ``value`` with AES/ECB/PKCS5 padding and return upper-case hex.

    Arguments:
        key {[string]} -- AES key as a hex string (the original note says it
            has already been run through the SHA1PRNG derivation)
        value {[string]} -- plain text to encrypt

    Returns:
        [string] -- ciphertext as an upper-case hex string.
    """
    cipher = AES.new(bytes.fromhex(key), AES.MODE_ECB)
    encrypted = cipher.encrypt(padding_pkcs5(value))  # pad content with pkcs5
    return encrypted.hex().upper()
- # def get_sha1prng_key(key):
- # '''[summary]
- # encrypt key with SHA1PRNG
- # same as java AES crypto key generator SHA1PRNG
- # Arguments:
- # key {[string]} -- [key]
- #
- # Returns:
- # [string] -- [hexstring]
- # '''
- # signature = hashlib.sha1(key.encode()).digest()
- # signature = hashlib.sha1(signature).digest()
- # return ''.join(['%02x' % i for i in signature]).upper()[:32]
def get_sha1prng_key(key):
    """Derive a 128-bit AES key from ``key`` by hashing it twice with SHA-1.

    Per the original author's note, this is meant to match the key that
    Java's SHA1PRNG-based AES key generator produces.

    Arguments:
        key {[string]} -- pass phrase

    Returns:
        [string] -- first 32 upper-case hex characters (16 bytes) of
        ``sha1(sha1(key))``.
    """
    first_pass = hashlib.sha1(key.encode()).digest()
    second_pass = hashlib.sha1(first_pass)
    return second_pass.hexdigest().upper()[:32]
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: print the result code reported for the connection."""
    result_text = str(rc)
    print("Connected with result code " + result_text)
    # client.subscribe("hdbrecord1")  # subscribe to messages
def on_message(client, userdata, msg):
    """MQTT message callback: print the topic and the UTF-8 decoded payload."""
    payload_text = str(msg.payload.decode('utf-8'))
    print("主题:" + msg.topic + " 消息:" + payload_text)
def on_subscribe(client, userdata, mid, granted_qos):
    """MQTT subscribe callback: print the QoS level(s) granted by the broker.

    ``granted_qos`` may be a single number or a list/tuple with one entry per
    subscribed topic; sequences are printed comma-separated. Formatting a
    multi-entry sequence with ``%d`` used to raise ``TypeError``.
    """
    if isinstance(granted_qos, (list, tuple)):
        qos_text = ", ".join(str(qos) for qos in granted_qos)
    else:
        qos_text = "%d" % granted_qos
    print("On Subscribed: qos = %s" % qos_text)
def on_disconnect(client, userdata, rc):
    """MQTT disconnect callback: print a notice unless the result code is 0."""
    if rc == 0:
        return
    print("Unexpected disconnection %s" % rc)
# Script entry point: read config.ini, open the MySQL connection and build the
# MQTT payload once. The names assigned here are module-level globals; the
# helper functions above read some of them (e.g. `my`).
if __name__ == "__main__":
    _logger = LogOut.Log('iotServerMQTT')
    # Create the config-file reader
    config = configparser.ConfigParser()
    config.read("config.ini", encoding="utf-8")
    # Read the general settings
    section = "General"  # label of the section to read
    mysql_host = config.get(section, 'mysqlHost')
    mysql_username = config.get(section, 'mysqlUsername')
    mysql_password = config.get(section, 'mysqlPassword')
    mysql_port = config.getint(section, 'mysqlPort')
    token = config.get(section, 'token')
    appId = config.get(section, 'appId')
    # Read the settings specific to this service
    section = 'iotServerMQTT'  # label of the section to read
    mysql_database = config.get(section, 'mysqlDatabase')
    HOST = config.get(section, 'mqttHost')
    PORT = config.getint(section, 'mqttPort')
    client_id = config.get(section, 'mqttClientId')
    key = config.get(section, 'mqttKey')  # keypassword
    username = config.get(section, 'mqttUsername')
    password = config.get(section, 'mqttPassword')
    topic = config.get(section, 'mqttTopic')
    frequency = config.getint(section, 'mqttFrequency')
    # Connect to the database
    my = MysqldbOperational(host=mysql_host,
                            username=mysql_username,
                            password=mysql_password,
                            port=mysql_port,
                            database=mysql_database,
                            logger=_logger)
    # r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0, decode_responses=True)
    BS = AES.block_size  # AES block size, kept as a module-level global
    post_time = 0
    list_devices = my.get_mqtt_devices()
    # The MQTT client setup and the periodic publish loop below are disabled.
    # client = mqtt.Client(client_id)
    # client.tls_set(ca_certs='ca.crt', certfile=None, keyfile=None, cert_reqs=ssl.CERT_NONE,
    #                tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
    # # client.tls_set_context(context=None)
    # client.username_pw_set(username, password)
    # client.on_connect = on_connect
    # client.on_message = on_message
    # client.on_subscribe = on_subscribe
    # client.on_disconnect = on_disconnect
    # client.connect(HOST, PORT, 10)
    # client.loop_start()
    # while True:
    #     time.sleep(0.1)
    #     if post_time < time.time() - frequency:
    #         try:
    #             post_time = time.time()
    #             param, dataInfo = getMqttDataFromMysql(list_devices, appId, token)
    #             #print(param)
    #             aes128string = aes_ecb_encrypt(get_sha1prng_key(key), param)
    #             is_send, mid = client.publish(topic, payload=aes128string, qos=1)
    #             if is_send == 0:
    #                 setSendStatusIsSucceed(dataInfo)
    #             print(is_send)
    #         except Exception as e:
    #             print(e)
    # client.loop_stop()
    # One-shot run: build the payload once; nothing is published while the
    # loop above stays commented out.
    param, dataInfo = getMqttDataFromMysql(list_devices, appId, token)
|