iotServerMQTT_001.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. #!/usr/bin/env python
  2. import time
  3. import configparser
  4. from mysqlDataBase import MysqldbOperational
  5. import json
  6. from Crypto.Cipher import AES
  7. import hashlib
  8. import paho.mqtt.client as mqtt
  9. import ssl
  10. import LogOut
  11. def getCodeAndPoint(addrData):
  12. str = "."
  13. code = addrData[:addrData.index(str)]
  14. point = addrData[addrData.index(str) + 1:]
  15. return code, point
  16. def getDeviceConnectionStatus(data_time_stamp):
  17. now_time_stamp = time.time()
  18. if data_time_stamp < now_time_stamp - 300:
  19. return 1
  20. else:
  21. return 0
  22. def dateAndTimeToTimestamp(date_time):
  23. time_array = time.strptime(date_time, "%Y-%m-%d %H:%M:%S")
  24. time_stamp = int(time.mktime(time_array))
  25. return time_stamp
  26. def setSendStatusIsSucceed(dataInfo):
  27. if dataInfo != None:
  28. for i in range(len(dataInfo)):
  29. id = dataInfo[i]['id']
  30. table_name = "table_" + str(dataInfo[i]['tableName'])
  31. my.set_send_status(table_name, id, 1)
  32. def getMqttDataFromMysql(list_devices, appId, token):
  33. dataList = []
  34. data = {}
  35. data_from_mysql_info_list = []
  36. for index in range(len(list_devices)):
  37. list_points = my.get_mqtt_point(list_devices[index])
  38. data_from_mysql = my.get_newest_data('table_' + str(list_points[0]['deviceName']))
  39. if len(data_from_mysql) > 0:
  40. date_time = str(data_from_mysql['times'])
  41. date_time = dateAndTimeToTimestamp(date_time)
  42. STS_DI = getDeviceConnectionStatus(date_time)
  43. # print("STS_DI = ", STS_DI)
  44. else:
  45. STS_DI = 1
  46. dataDist = {}
  47. data_from_mysql_info_dict = {}
  48. code = ''
  49. if STS_DI == 0:
  50. dataDist['DQ_DI'] = '0'
  51. dataDist['STS_DI'] = '0'
  52. data_from_mysql_info_dict['id'] = data_from_mysql['id']
  53. data_from_mysql_info_dict['tableName'] = list_points[0]['deviceName']
  54. data_from_mysql_info_list.append(data_from_mysql_info_dict)
  55. for i in range(len(list_points)):
  56. code, point = getCodeAndPoint(list_points[i]['mqttCode'])
  57. columnName = "c" + str(list_points[i]['serialNumber'])
  58. dataDist[point] = str(data_from_mysql[columnName])
  59. if list_points[i]['lowLimit'] != None and list_points[i]['upLimit'] != None:
  60. if float(dataDist[point]) <= list_points[i]['lowLimit'] or float(dataDist[point]) > list_points[i]['upLimit']:
  61. print(point, dataDist[point], list_points[i]['lowLimit'], list_points[i]['upLimit'])
  62. # dataDist.pop(point)
  63. dataDist['DQ_DI'] = '1'
  64. if list_points[i]['storageType'] == "datetime":
  65. try:
  66. timeArray = time.strptime(dataDist[point], "%Y-%m-%d %H:%M:%S")
  67. timeStamp = int(time.mktime(timeArray) * 1000)
  68. dataDist[point] = timeStamp
  69. except Exception as e:
  70. dataDist['DQ_DI'] = '1'
  71. dataDist[point] = '0'
  72. print(e)
  73. dataDist['code'] = code
  74. dataDist['ts'] = int(round(time.time() * 1000))
  75. # print(dataDist['ts'])
  76. dataList.append(dataDist)
  77. elif STS_DI == 1:
  78. code, point = getCodeAndPoint(list_points[0]['mqttCode'])
  79. dataDist['DQ_DI'] = '1'
  80. dataDist['STS_DI'] = '1'
  81. dataDist['code'] = code
  82. dataDist['ts'] = int(round(time.time() * 1000))
  83. dataList.append(dataDist)
  84. print("dataDist = ", dataDist)
  85. data['data'] = dataList
  86. data['appId'] = appId
  87. data['token'] = token
  88. param = json.dumps(data)
  89. return param, data_from_mysql_info_list
  90. def padding_pkcs5(value):
  91. return str.encode(value + (BS - len(value) % BS) * chr(BS - len(value) % BS))
  92. def padding_zero(value):
  93. while len(value) % 16 != 0:
  94. value += '\0'
  95. return str.encode(value)
  96. def aes_ecb_encrypt(key, value):
  97. # AES/ECB/PKCS5padding
  98. # key is sha1prng encrypted before
  99. cryptor = AES.new(bytes.fromhex(key), AES.MODE_ECB)
  100. padding_value = padding_pkcs5(value) # padding content with pkcs5
  101. ciphertext = cryptor.encrypt(padding_value)
  102. return ''.join(['%02x' % i for i in ciphertext]).upper()
  103. # def get_sha1prng_key(key):
  104. # '''[summary]
  105. # encrypt key with SHA1PRNG
  106. # same as java AES crypto key generator SHA1PRNG
  107. # Arguments:
  108. # key {[string]} -- [key]
  109. #
  110. # Returns:
  111. # [string] -- [hexstring]
  112. # '''
  113. # signature = hashlib.sha1(key.encode()).digest()
  114. # signature = hashlib.sha1(signature).digest()
  115. # return ''.join(['%02x' % i for i in signature]).upper()[:32]
  116. def get_sha1prng_key(key):
  117. '''[summary]
  118. encrypt key with SHA1PRNG
  119. same as java AES crypto key generator SHA1PRNG
  120. Arguments:
  121. key {[string]} -- [key]
  122. Returns:
  123. [string] -- [hexstring]
  124. '''
  125. signature = hashlib.sha1(key.encode()).digest()
  126. signature = hashlib.sha1(signature).digest()
  127. return ''.join(['%02x' % i for i in signature]).upper()[:32]
  128. def on_connect(client, userdata, flags, rc):
  129. print("Connected with result code " + str(rc))
  130. # client.subscribe("hdbrecord1") # 订阅消息
  131. def on_message(client, userdata, msg):
  132. print("主题:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8')))
  133. def on_subscribe(client, userdata, mid, granted_qos):
  134. print("On Subscribed: qos = %d" % granted_qos)
  135. def on_disconnect(client, userdata, rc):
  136. if rc != 0:
  137. print("Unexpected disconnection %s" % rc)
  138. if __name__ == "__main__":
  139. _logger = LogOut.Log('iotServerMQTT')
  140. # 创建读取配置文件对象
  141. config = configparser.ConfigParser()
  142. config.read("config.ini", encoding="utf-8")
  143. # 获取通用配置项
  144. section = "General" # 读取的部section标签
  145. mysql_host = config.get(section, 'mysqlHost')
  146. mysql_username = config.get(section, 'mysqlUsername')
  147. mysql_password = config.get(section, 'mysqlPassword')
  148. mysql_port = config.getint(section, 'mysqlPort')
  149. token = config.get(section, 'token')
  150. appId = config.get(section, 'appId')
  151. # 获取特有配置项
  152. section = 'iotServerMQTT' # 读取的部section标签
  153. mysql_database = config.get(section, 'mysqlDatabase')
  154. HOST = config.get(section, 'mqttHost')
  155. PORT = config.getint(section, 'mqttPort')
  156. client_id = config.get(section, 'mqttClientId')
  157. key = config.get(section, 'mqttKey') # keypassword
  158. username = config.get(section, 'mqttUsername')
  159. password = config.get(section, 'mqttPassword')
  160. topic = config.get(section, 'mqttTopic')
  161. frequency = config.getint(section, 'mqttFrequency')
  162. # 连接数据库
  163. my = MysqldbOperational(host=mysql_host,
  164. username=mysql_username,
  165. password=mysql_password,
  166. port=mysql_port,
  167. database=mysql_database,
  168. logger=_logger)
  169. # r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0, decode_responses=True)
  170. BS = AES.block_size
  171. post_time = 0
  172. list_devices = my.get_mqtt_devices()
  173. # client = mqtt.Client(client_id)
  174. # client.tls_set(ca_certs='ca.crt', certfile=None, keyfile=None, cert_reqs=ssl.CERT_NONE,
  175. # tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
  176. # # client.tls_set_context(context=None)
  177. # client.username_pw_set(username, password)
  178. # client.on_connect = on_connect
  179. # client.on_message = on_message
  180. # client.on_subscribe = on_subscribe
  181. # client.on_disconnect = on_disconnect
  182. # client.connect(HOST, PORT, 10)
  183. # client.loop_start()
  184. # while True:
  185. # time.sleep(0.1)
  186. # if post_time < time.time() - frequency:
  187. # try:
  188. # post_time = time.time()
  189. # param, dataInfo = getMqttDataFromMysql(list_devices, appId, token)
  190. # #print(param)
  191. # aes128string = aes_ecb_encrypt(get_sha1prng_key(key), param)
  192. # is_send, mid = client.publish(topic, payload=aes128string, qos=1)
  193. # if is_send == 0:
  194. # setSendStatusIsSucceed(dataInfo)
  195. # print(is_send)
  196. # except Exception as e:
  197. # print(e)
  198. # client.loop_stop()
  199. param, dataInfo = getMqttDataFromMysql(list_devices, appId, token)