shuizhi_tcp_connector.py 8.8 KB


  1. """
  2. @Date :2021/5/21/00219:10:57
  3. @Desc :
  4. """
  5. import binascii
  6. import threading
  7. import time
  8. import struct
  9. import socket
  10. from connector import Connector
  11. from event_storage import EventStorage
  12. from log import Log
  13. from binascii import *
  14. from crcmod import *
# Index into the polling instruction list of the next request to send.
# It lives at module level, so every connector instance in this process shares
# it: ShuizhiTcpConnector.SocketReceive advances it and
# ShuizhiTcpConnector.run reads it to pick the request.
sendFlag = 0
  16. class ShuizhiTcpConnector(Connector, threading.Thread):
  17. def __init__(self, name, config, converter):
  18. super().__init__()
  19. self._param_id = {}
  20. self._len_param = None
  21. self.__log = Log()
  22. self.__sock = None
  23. self.__connected = False
  24. self.__stopped = False
  25. self.__size = 1024
  26. self.__ip = config['ip']
  27. self.__port = config['port']
  28. self.__converter = converter
  29. self.__storager = EventStorage()
  30. self.__save_frequency = config['save_frequency']
  31. self.setName(name)
  32. self.__last_seve_time = 0
  33. self.__data_point_config = self.__storager.get_station_info(name)
  34. self._storage = EventStorage()
  35. # for i in self.__data_point_config:
  36. # print(i)
  37. def open(self):
  38. self.__stopped = False
  39. self.start()
  40. # 建立socket连接
  41. def __connect(self):
  42. if self.__sock:
  43. self.close()
  44. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  45. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  46. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  47. try:
  48. self.__sock.connect((self.__ip, self.__port))
  49. self.__log.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  50. print(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  51. self.__connected = True
  52. except socket.error as e:
  53. self.__log.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
  54. self.__connected = False
  55. self.__reconnect()
  56. def __reconnect(self):
  57. while True:
  58. try:
  59. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  60. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  61. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  62. self.__sock.connect((self.__ip, self.__port))
  63. self.__connected = True
  64. self.__log.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  65. break
  66. except Exception as e:
  67. print("e=", e)
  68. print("Continue reconnect in 5s..")
  69. self.__log.info(
  70. f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
  71. self.__connected = False
  72. time.sleep(5)
  73. def close(self):
  74. """Close the connection with the TCP Slave"""
  75. if self.__sock:
  76. self.__sock.close()
  77. self.__stopped = True
  78. self.__sock = None
  79. self.__connected = False
  80. return None
  81. def get_name(self):
  82. return self.name
  83. def is_connected(self):
  84. return self.__connected
  85. def send_command(self, command_list):
  86. pass
  87. def command_polling(self):
  88. pass
  89. def run(self):
  90. self.__connect()
  91. self.__connected = True
  92. # dissolved_oxygen = bytes.fromhex('01 03 15 CF 00 05 B1 FA') # 读溶解氧发送指令,接收数据:
  93. # temperature = bytes.fromhex('01 03 15 4A 00 05 A0 13') # 读温度发送指令,接收数据:01 03 0A 41 C0 BC 1C 00 00 00 01 00 01 61 42
  94. # salinity = bytes.fromhex('01 03 15 97 00 05 30 29') # 读盐度发送指令,接收数据:01 03 0A 00 00 00 00 00 00 00 61 00 0C 75 6D
  95. # PH = bytes.fromhex('01 03 15 BA 00 05 A0 20') # 读 PH 发送指令,接收数据:01 03 0A 40 F9 2B 79 00 00 00 91 00 11 72 BB
  96. # chlorophyll = bytes.fromhex('01 03 16 A8 00 05 00 61') # 读叶绿素发送指令,接收数据:01 03 0A 00 00 00 00 00 00 01 01 00 33 34 9F
  97. # depth = bytes.fromhex('01 03 15 66 00 05 61 DA') # 读深度发送指令,接收数据:01 03 0A 42 08 1B F4 00 00 00 26 00 05 34 D0
  98. # 获取需要读取的参数的相关信息
  99. param_list = self._storage.get_in_situ_command()
  100. depth_index = None
  101. depth = ""
  102. # ①判断参数中是否有 深度, ②处理 寄存器偏移量 的位置
  103. for each in param_list:
  104. self._param_id[each["parameter_id"]] = each["name"]
  105. if each["name"] == "深度":
  106. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  107. depth = bytes.fromhex(crc_check)
  108. depth_index = param_list.index(each)
  109. if depth_index is not None:
  110. param_list.pop(depth_index)
  111. instruct_list = []
  112. if len(depth) > 0:
  113. for each in param_list:
  114. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  115. instruct_list.append(depth)
  116. instruct_list.append(bytes.fromhex(crc_check))
  117. instruct_list.append(depth)
  118. else:
  119. for each in param_list:
  120. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  121. instruct_list.append(bytes.fromhex(crc_check))
  122. self._len_param = len(instruct_list)
  123. for i in instruct_list:
  124. print(i)
  125. # 创建接收线程
  126. threading.Thread(target=self.SocketReceive, args=(self.__sock,)).start()
  127. # 循环发送指令
  128. while 1:
  129. time.sleep(0.2)
  130. if not self.__connected:
  131. continue
  132. try:
  133. # print("发送", instruct_list[sendFlag], "....")
  134. self.__sock.send(instruct_list[sendFlag])
  135. except Exception as e:
  136. self.__connected = False
  137. self.__reconnect()
  138. threading.Thread(target=self.SocketReceive, args=(self.__sock,)).start()
  139. if self.__stopped:
  140. break
  141. def save_format_data(self, t, name):
  142. data = {}
  143. for index in self.__data_point_config:
  144. if index["io_point_name"] == name:
  145. if index['divisor'] is not None:
  146. t = t / index['divisor']
  147. if index['offset'] is not None:
  148. t = t - index['offset']
  149. data = {'c' + str(index['serial_number']): t}
  150. self.__storager.real_time_data_storage(data)
  151. # print(data)
  152. def SocketReceive(self, clientSocket):
  153. global sendFlag
  154. ''' Socket 接收线程。'''
  155. while 1:
  156. time.sleep(0.2)
  157. # print(sendFlag, time.time())
  158. try:
  159. recvData = clientSocket.recv(1024)
  160. # print("recvData=", recvData)
  161. except Exception as e:
  162. print("e=", e)
  163. break
  164. length = len(recvData)
  165. if length == 15:
  166. fmt = str(length) + 'B'
  167. res = struct.unpack(fmt, recvData)
  168. t = int_to_hex(res[3], res[4], res[5], res[6])
  169. print(time.strftime('%Y-%m-%d %H:%M:%S'), self._param_id[res[12]], " t=", t)
  170. self.save_format_data(t, self._param_id[res[12]])
  171. if sendFlag == self._len_param - 1:
  172. print("-------------------")
  173. sendFlag = 0
  174. else:
  175. sendFlag = sendFlag + 1
  176. clientSocket.close()
  177. self.__log.info("Client closed.")
  178. # print(time.strftime('%Y-%m-%d %H:%M:%S'), len(recvData))
  179. # socket_msg = recvData.decode() # 将接收到的字节数据转为 string
  180. # print("Socket receive: " + socket_msg)
  181. def int_to_hex(a1, a2, b1, b2):
  182. t1 = hex(a1 * 256 + a2)[2:]
  183. t2 = hex(b1 * 256 + b2)[2:]
  184. if len(t1) != 4: t1 = (4 - len(t1)) * '0' + t1
  185. if len(t2) != 4: t2 = (4 - len(t2)) * '0' + t2
  186. t = t1 + t2
  187. t = struct.unpack('>f', binascii.unhexlify(t.replace(' ', '')))[0]
  188. return t
  189. def dec_to_hex(num):
  190. """
  191. 十进制转十六进制
  192. """
  193. t = hex(num)
  194. t = (6 - len(t)) * "0" + t[2:]
  195. return t
  196. # CRC16-MODBUS
  197. def crc16Add(read):
  198. """
  199. 生成CRC16校验位
  200. """
  201. crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
  202. data = read.replace(" ", "")
  203. read_crcout = hex(crc16(unhexlify(data))).upper()
  204. str_list = list(read_crcout)
  205. if len(str_list) < 6:
  206. str_list.insert(2, '0' * (6 - len(str_list))) # 位数不足补0
  207. crc_data = "".join(str_list)
  208. read = read.strip() + '' + crc_data[4:] + '' + crc_data[2:4]
  209. return read