# shuizhi_tcp_connector.py
"""
@Date : 2021/5/21 9:10:57
@Desc : Originally used to read and parse data from the In-Situ water-quality sensor. This connector is currently not enabled; In-Situ reading and parsing are both handled by modbus_rtu_over_tcp.
"""
import binascii
import socket
import struct
import threading
import time
from binascii import *

from crcmod import *

from connector import Connector
from event_storage import EventStorage
from logging_config import shuizhi_tcp_connector as logger
from tools.format_value import format_value
  16. sendFlag = 0
  17. class ShuizhiTcpConnector(Connector, threading.Thread):
  18. def __init__(self, name, config, converter):
  19. super().__init__()
  20. self._param_id = {}
  21. self._len_param = None
  22. self.__sock = None
  23. self.__connected = False
  24. self.__stopped = False
  25. self.__size = 1024
  26. self.__ip = config['ip']
  27. self.__port = config['port']
  28. self.__converter = converter
  29. self.__storager = EventStorage()
  30. self.__save_frequency = config['save_frequency']
  31. self.setName(name)
  32. self.__last_seve_time = 0
  33. self.__data_point_config = self.__storager.get_station_info(name)
  34. self._storage = EventStorage()
  35. def open(self):
  36. self.__stopped = False
  37. self.start()
  38. # 建立socket连接
  39. def __connect(self):
  40. if self.__sock:
  41. self.close()
  42. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  43. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  44. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  45. try:
  46. self.__sock.connect((self.__ip, self.__port))
  47. logger.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  48. self.__connected = True
  49. except socket.error as e:
  50. logger.error(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
  51. self.__connected = False
  52. self.__reconnect()
  53. def __reconnect(self):
  54. while True:
  55. try:
  56. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  57. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  58. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  59. self.__sock.connect((self.__ip, self.__port))
  60. self.__connected = True
  61. logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  62. break
  63. except Exception as e:
  64. logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
  65. self.__connected = False
  66. time.sleep(5)
  67. def close(self):
  68. """Close the connection with the TCP Slave"""
  69. if self.__sock:
  70. self.__sock.close()
  71. self.__stopped = True
  72. self.__sock = None
  73. self.__connected = False
  74. return None
  75. def get_name(self):
  76. return self.name
  77. def is_connected(self):
  78. return self.__connected
  79. def send_command(self, command_list):
  80. pass
  81. def command_polling(self):
  82. pass
  83. def run(self):
  84. self.__connect()
  85. self.__connected = True
  86. # dissolved_oxygen = bytes.fromhex('01 03 15 CF 00 05 B1 FA') # 读溶解氧发送指令,接收数据:
  87. # temperature = bytes.fromhex('01 03 15 4A 00 05 A0 13') # 读温度发送指令,接收数据:01 03 0A 41 C0 BC 1C 00 00 00 01 00 01 61 42
  88. # salinity = bytes.fromhex('01 03 15 97 00 05 30 29') # 读盐度发送指令,接收数据:01 03 0A 00 00 00 00 00 00 00 61 00 0C 75 6D
  89. # PH = bytes.fromhex('01 03 15 BA 00 05 A0 20') # 读 PH 发送指令,接收数据:01 03 0A 40 F9 2B 79 00 00 00 91 00 11 72 BB
  90. # chlorophyll = bytes.fromhex('01 03 16 A8 00 05 00 61') # 读叶绿素发送指令,接收数据:01 03 0A 00 00 00 00 00 00 01 01 00 33 34 9F
  91. # depth = bytes.fromhex('01 03 15 66 00 05 61 DA') # 读深度发送指令,接收数据:01 03 0A 42 08 1B F4 00 00 00 26 00 05 34 D0
  92. # 获取需要读取的参数的相关信息
  93. param_list = self._storage.get_in_situ_command()
  94. depth_index = None
  95. depth = ""
  96. # ①判断参数中是否有 深度, ②处理 寄存器偏移量 的位置
  97. for each in param_list:
  98. self._param_id[each["parameter_id"]] = each["name"]
  99. if each["name"] == "深度":
  100. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  101. depth = bytes.fromhex(crc_check)
  102. depth_index = param_list.index(each)
  103. if depth_index is not None:
  104. param_list.pop(depth_index)
  105. instruct_list = []
  106. if len(depth) > 0:
  107. for each in param_list:
  108. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  109. instruct_list.append(depth)
  110. instruct_list.append(bytes.fromhex(crc_check))
  111. instruct_list.append(depth)
  112. else:
  113. for each in param_list:
  114. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  115. instruct_list.append(bytes.fromhex(crc_check))
  116. self._len_param = len(instruct_list)
  117. # 创建接收线程
  118. threading.Thread(target=self.SocketReceive, args=(self.__sock,)).start()
  119. # 循环发送指令
  120. while 1:
  121. time.sleep(0.5)
  122. if not self.__connected:
  123. continue
  124. try:
  125. for i in instruct_list:
  126. # self.__sock.send(instruct_list[sendFlag])\
  127. time.sleep(1)
  128. self.__sock.send(i)
  129. time.sleep(1)
  130. self.__sock.send(i)
  131. except Exception as e:
  132. self.__connected = False
  133. self.__reconnect()
  134. threading.Thread(target=self.SocketReceive, args=(self.__sock,)).start()
  135. if self.__stopped:
  136. break
  137. def save_format_data(self, t, name):
  138. data = {}
  139. for index in self.__data_point_config:
  140. if index["io_point_name"] == name:
  141. data = {'c' + str(index['serial_number']): format_value(index, t)}
  142. self.__storager.real_time_data_storage(data)
  143. def SocketReceive(self, clientSocket):
  144. global sendFlag
  145. ''' Socket 接收线程。'''
  146. while 1:
  147. time.sleep(0.2)
  148. try:
  149. recvData = clientSocket.recv(1024)
  150. except Exception as e:
  151. logger.info(f"Socket receive error:{e}")
  152. break
  153. length = len(recvData)
  154. if length == 15:
  155. fmt = str(length) + 'B'
  156. res = struct.unpack(fmt, recvData)
  157. t = int_to_hex(res[3], res[4], res[5], res[6])
  158. logger.info(f" {self._param_id[res[12]]}, t= {t}")
  159. self.save_format_data(t, self._param_id[res[12]])
  160. else:
  161. logger.info(f"读取错误:{recvData},跳过。")
  162. # if sendFlag == self._len_param - 1:
  163. # logger.info("-------------------")
  164. # sendFlag = 0
  165. # else:
  166. # sendFlag = sendFlag + 1
  167. clientSocket.close()
  168. logger.info("Client closed.")
  169. def int_to_hex(a1, a2, b1, b2):
  170. t1 = hex(a1 * 256 + a2)[2:]
  171. t2 = hex(b1 * 256 + b2)[2:]
  172. if len(t1) != 4: t1 = (4 - len(t1)) * '0' + t1
  173. if len(t2) != 4: t2 = (4 - len(t2)) * '0' + t2
  174. t = t1 + t2
  175. t = struct.unpack('>f', binascii.unhexlify(t.replace(' ', '')))[0]
  176. return t
  177. def dec_to_hex(num):
  178. """
  179. 十进制转十六进制
  180. """
  181. t = hex(num)
  182. t = (6 - len(t)) * "0" + t[2:]
  183. return t
  184. # CRC16-MODBUS
  185. def crc16Add(read):
  186. """
  187. 生成CRC16校验位
  188. """
  189. crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
  190. data = read.replace(" ", "")
  191. read_crcout = hex(crc16(unhexlify(data))).upper()
  192. str_list = list(read_crcout)
  193. if len(str_list) < 6:
  194. str_list.insert(2, '0' * (6 - len(str_list))) # 位数不足补0
  195. crc_data = "".join(str_list)
  196. read = read.strip() + '' + crc_data[4:] + '' + crc_data[2:4]
  197. return read