shuizhi_tcp_connector.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. """
  2. @Date :2021/5/21/00219:10:57
  3. @Desc : 原本用于insitu水质传感器的读取和解析,目前此连接器未启动,insitu读取和解析均为mudbus_rtu_over_tcp。
  4. """
import binascii
import logging
import socket
import struct
import threading
import time

from connector import Connector
from event_storage import EventStorage
# from logging_config import shuizhi_converter as logger
from binascii import *
from crcmod import *
# Presumably the index of the polling instruction currently being sent.
# NOTE(review): nothing visible in this file assigns to it after this
# initialisation -- confirm no other module imports it before removing.
sendFlag = 0
  16. class ShuizhiTcpConnector(Connector, threading.Thread):
  17. def __init__(self, name, config, converter):
  18. super().__init__()
  19. self._param_id = {}
  20. self._len_param = None
  21. self.__sock = None
  22. self.__connected = False
  23. self.__stopped = False
  24. self.__size = 1024
  25. self.__ip = config['ip']
  26. self.__port = config['port']
  27. self.__converter = converter
  28. self.__storager = EventStorage()
  29. self.__save_frequency = config['save_frequency']
  30. self.setName(name)
  31. self.__last_seve_time = 0
  32. self.__data_point_config = self.__storager.get_station_info(name)
  33. self._storage = EventStorage()
  34. def open(self):
  35. self.__stopped = False
  36. self.start()
  37. # 建立socket连接
  38. def __connect(self):
  39. if self.__sock:
  40. self.close()
  41. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  42. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  43. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  44. try:
  45. self.__sock.connect((self.__ip, self.__port))
  46. logger.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  47. self.__connected = True
  48. except socket.error as e:
  49. logger.error(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
  50. self.__connected = False
  51. self.__reconnect()
  52. def __reconnect(self):
  53. while True:
  54. try:
  55. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  56. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  57. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  58. self.__sock.connect((self.__ip, self.__port))
  59. self.__connected = True
  60. logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  61. break
  62. except Exception as e:
  63. logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
  64. self.__connected = False
  65. time.sleep(5)
  66. def close(self):
  67. """Close the connection with the TCP Slave"""
  68. if self.__sock:
  69. self.__sock.close()
  70. self.__stopped = True
  71. self.__sock = None
  72. self.__connected = False
  73. return None
  74. def get_name(self):
  75. return self.name
  76. def is_connected(self):
  77. return self.__connected
  78. def send_command(self, command_list):
  79. pass
  80. def command_polling(self):
  81. pass
  82. def run(self):
  83. self.__connect()
  84. self.__connected = True
  85. # dissolved_oxygen = bytes.fromhex('01 03 15 CF 00 05 B1 FA') # 读溶解氧发送指令,接收数据:
  86. # temperature = bytes.fromhex('01 03 15 4A 00 05 A0 13') # 读温度发送指令,接收数据:01 03 0A 41 C0 BC 1C 00 00 00 01 00 01 61 42
  87. # salinity = bytes.fromhex('01 03 15 97 00 05 30 29') # 读盐度发送指令,接收数据:01 03 0A 00 00 00 00 00 00 00 61 00 0C 75 6D
  88. # PH = bytes.fromhex('01 03 15 BA 00 05 A0 20') # 读 PH 发送指令,接收数据:01 03 0A 40 F9 2B 79 00 00 00 91 00 11 72 BB
  89. # chlorophyll = bytes.fromhex('01 03 16 A8 00 05 00 61') # 读叶绿素发送指令,接收数据:01 03 0A 00 00 00 00 00 00 01 01 00 33 34 9F
  90. # depth = bytes.fromhex('01 03 15 66 00 05 61 DA') # 读深度发送指令,接收数据:01 03 0A 42 08 1B F4 00 00 00 26 00 05 34 D0
  91. # 获取需要读取的参数的相关信息
  92. param_list = self._storage.get_in_situ_command()
  93. depth_index = None
  94. depth = ""
  95. # ①判断参数中是否有 深度, ②处理 寄存器偏移量 的位置
  96. for each in param_list:
  97. self._param_id[each["parameter_id"]] = each["name"]
  98. if each["name"] == "深度":
  99. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  100. depth = bytes.fromhex(crc_check)
  101. depth_index = param_list.index(each)
  102. if depth_index is not None:
  103. param_list.pop(depth_index)
  104. instruct_list = []
  105. if len(depth) > 0:
  106. for each in param_list:
  107. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  108. instruct_list.append(depth)
  109. instruct_list.append(bytes.fromhex(crc_check))
  110. instruct_list.append(depth)
  111. else:
  112. for each in param_list:
  113. crc_check = crc16Add(each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005")
  114. instruct_list.append(bytes.fromhex(crc_check))
  115. self._len_param = len(instruct_list)
  116. # 创建接收线程
  117. threading.Thread(target=self.SocketReceive, args=(self.__sock,)).start()
  118. # 循环发送指令
  119. while 1:
  120. time.sleep(0.5)
  121. if not self.__connected:
  122. continue
  123. try:
  124. for i in instruct_list:
  125. # self.__sock.send(instruct_list[sendFlag])\
  126. time.sleep(1)
  127. self.__sock.send(i)
  128. time.sleep(1)
  129. self.__sock.send(i)
  130. except Exception as e:
  131. self.__connected = False
  132. self.__reconnect()
  133. threading.Thread(target=self.SocketReceive, args=(self.__sock,)).start()
  134. if self.__stopped:
  135. break
  136. def save_format_data(self, t, name):
  137. data = {}
  138. for index in self.__data_point_config:
  139. if index["io_point_name"] == name:
  140. if index['divisor'] is not None:
  141. t = t / index['divisor']
  142. if index['offset'] is not None:
  143. t = t - index['offset']
  144. data = {'c' + str(index['serial_number']): t}
  145. self.__storager.real_time_data_storage(data)
  146. def SocketReceive(self, clientSocket):
  147. global sendFlag
  148. ''' Socket 接收线程。'''
  149. while 1:
  150. time.sleep(0.2)
  151. try:
  152. recvData = clientSocket.recv(1024)
  153. except Exception as e:
  154. logger.info(f"Socket receive error:{e}")
  155. break
  156. length = len(recvData)
  157. if length == 15:
  158. fmt = str(length) + 'B'
  159. res = struct.unpack(fmt, recvData)
  160. t = int_to_hex(res[3], res[4], res[5], res[6])
  161. logger.info(f" {self._param_id[res[12]]}, t= {t}")
  162. self.save_format_data(t, self._param_id[res[12]])
  163. else:
  164. logger.info(f"读取错误:{recvData},跳过。")
  165. # if sendFlag == self._len_param - 1:
  166. # logger.info("-------------------")
  167. # sendFlag = 0
  168. # else:
  169. # sendFlag = sendFlag + 1
  170. clientSocket.close()
  171. logger.info("Client closed.")
  172. def int_to_hex(a1, a2, b1, b2):
  173. t1 = hex(a1 * 256 + a2)[2:]
  174. t2 = hex(b1 * 256 + b2)[2:]
  175. if len(t1) != 4: t1 = (4 - len(t1)) * '0' + t1
  176. if len(t2) != 4: t2 = (4 - len(t2)) * '0' + t2
  177. t = t1 + t2
  178. t = struct.unpack('>f', binascii.unhexlify(t.replace(' ', '')))[0]
  179. return t
  180. def dec_to_hex(num):
  181. """
  182. 十进制转十六进制
  183. """
  184. t = hex(num)
  185. t = (6 - len(t)) * "0" + t[2:]
  186. return t
  187. # CRC16-MODBUS
  188. def crc16Add(read):
  189. """
  190. 生成CRC16校验位
  191. """
  192. crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
  193. data = read.replace(" ", "")
  194. read_crcout = hex(crc16(unhexlify(data))).upper()
  195. str_list = list(read_crcout)
  196. if len(str_list) < 6:
  197. str_list.insert(2, '0' * (6 - len(str_list))) # 位数不足补0
  198. crc_data = "".join(str_list)
  199. read = read.strip() + '' + crc_data[4:] + '' + crc_data[2:4]
  200. return read