shuizhi_tcp_connector_old.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. """
  2. @Date :2021/5/21/00219:10:57
  3. @Desc :
  4. """
  5. import binascii
  6. import threading
  7. import time
  8. import struct
  9. import socket
  10. from connector import Connector
  11. from event_storage import EventStorage
  12. from log import Log
  13. sendFlag = 0
  14. class ShuizhiTcpConnector(Connector, threading.Thread):
  15. def __init__(self, name, config, converter):
  16. super().__init__()
  17. self.__log = Log()
  18. self.__sock = None
  19. self.__connected = False
  20. self.__stopped = False
  21. self.__size = 1024
  22. self.__ip = config['ip']
  23. self.__port = config['port']
  24. self.__converter = converter
  25. self.__storager = EventStorage()
  26. self.__save_frequency = config['save_frequency']
  27. self.setName(name)
  28. self.__last_seve_time = 0
  29. self.__data_point_config = self.__storager.get_station_info(name)
  30. # for i in self.__data_point_config:
  31. # print(i)
  32. def open(self):
  33. self.__stopped = False
  34. self.start()
  35. # 建立socket连接
  36. def __connect(self):
  37. if self.__sock:
  38. self.close()
  39. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  40. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  41. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  42. try:
  43. self.__sock.connect((self.__ip, self.__port))
  44. self.__log.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  45. self.__connected = True
  46. except socket.error as e:
  47. self.__log.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
  48. self.__connected = False
  49. self.__reconnect()
  50. def __reconnect(self):
  51. while True:
  52. try:
  53. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  54. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  55. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  56. self.__sock.connect((self.__ip, self.__port))
  57. self.__connected = True
  58. self.__log.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  59. break
  60. except Exception as e:
  61. print("Continue reconnect in 5s..")
  62. self.__log.info(
  63. f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
  64. self.__connected = False
  65. time.sleep(5)
  66. def close(self):
  67. """Close the connection with the TCP Slave"""
  68. if self.__sock:
  69. self.__sock.close()
  70. self.__stopped = True
  71. self.__sock = None
  72. self.__connected = False
  73. return None
  74. def get_name(self):
  75. return self.name
  76. def is_connected(self):
  77. return self.__connected
  78. def send_command(self, command_list):
  79. pass
  80. def command_polling(self):
  81. pass
  82. def run(self):
  83. self.__connect()
  84. self.__connected = True
  85. # dissolved_oxygen = bytes.fromhex('01030025000AD406') # 读溶解氧发送指令,接收数据长度:25
  86. dissolved_oxygen = bytes.fromhex('010315CF0005B1FA') # 读溶解氧发送指令,接收数据长度:15
  87. temperature_salinity = bytes.fromhex('010300FF001AF431') # 读温度和盐度发送指令,接收数据长度:52 + 5 = 57
  88. PH = bytes.fromhex('010301D90002140C') # 读PH发送指令,数据接收长度:4 + 5 = 9
  89. # chlorophyll = bytes.fromhex('010316A1000411A3') # 读叶绿素发送指令,接收数据长度:8 + 5 = 13
  90. chlorophyll = bytes.fromhex('010316A80004C1A1') # 读叶绿素发送指令,接收数据长度:8 + 5 = 13
  91. # depth = bytes.fromhex('0103046F0012F4EA') # 读深度发送指令,接收数据长度:36 + 5 = 41
  92. # depth = bytes.fromhex('0103155100035016') # 读深度发送指令,接收数据长度:6 + 5 = 11
  93. depth = bytes.fromhex('010315660003E1D8') # 读深度发送指令,接收数据长度:6 + 5 = 11
  94. # 01 03 06 40 BA FD 82 00 00 67 EA 返回数据测试
  95. # 创建接收线程
  96. threading.Thread(target=self.SocketReceive, args=(self.__sock,)).start()
  97. while 1:
  98. time.sleep(0.2)
  99. if not self.__connected:
  100. continue
  101. try:
  102. if sendFlag == 0:
  103. self.__sock.send(depth)
  104. elif sendFlag == 1:
  105. self.__sock.send(dissolved_oxygen)
  106. elif sendFlag == 2:
  107. self.__sock.send(depth)
  108. elif sendFlag == 3:
  109. self.__sock.send(temperature_salinity)
  110. elif sendFlag == 4:
  111. self.__sock.send(depth)
  112. elif sendFlag == 5:
  113. self.__sock.send(PH)
  114. elif sendFlag == 6:
  115. self.__sock.send(depth)
  116. elif sendFlag == 7:
  117. self.__sock.send(chlorophyll)
  118. elif sendFlag == 8:
  119. self.__sock.send(depth)
  120. except Exception as e:
  121. self.__connected = False
  122. self.__reconnect()
  123. threading.Thread(target=self.SocketReceive, args=(self.__sock,)).start()
  124. if self.__stopped:
  125. break
  126. # 水质解析器
  127. def save_format_data(self, t, name):
  128. data = {}
  129. for index in self.__data_point_config:
  130. if index["io_point_name"] == name:
  131. if index['divisor'] is not None:
  132. t = t / index['divisor']
  133. if index['offset'] is not None:
  134. t = t - index['offset']
  135. if index['low_limit'] is not None and index['up_limit'] is not None and index['low_limit'] <= t <= \
  136. index['up_limit']:
  137. data = {'c' + str(index['serial_number']): t}
  138. self.__storager.real_time_data_storage(data)
  139. print(data)
  140. def SocketReceive(self, clientSocket):
  141. global sendFlag
  142. ''' Socket 接收线程。'''
  143. # global socket_flag, socket_msg # 通过全局变量,让外部可以控制线程的运行,也可以处理信息
  144. while 1:
  145. time.sleep(0.5)
  146. # print(sendFlag, time.time())
  147. # 深度
  148. if sendFlag == 0:
  149. try:
  150. # print(clientSocket)
  151. recvData = clientSocket.recv(1024)
  152. except Exception as e:
  153. # print("深度1", e)
  154. break
  155. length = len(recvData)
  156. fmt = str(length) + 'B'
  157. res = struct.unpack(fmt, recvData)
  158. if length == 11:
  159. # print(len(res), res)
  160. t = int_to_hex(res[3], res[4], res[5], res[6])
  161. print('----深度:', t, 'res:', len(res), 'length:', length, time.strftime('%Y-%m-%d %H:%M:%S'))
  162. self.save_format_data(t, "深度")
  163. sendFlag = 1
  164. # 溶解氧
  165. if sendFlag == 1:
  166. try:
  167. recvData = clientSocket.recv(1024)
  168. except Exception as e:
  169. break
  170. length = len(recvData)
  171. fmt = str(length) + 'B'
  172. res = struct.unpack(fmt, recvData)
  173. # print(time.strftime('%Y-%m-%d %H:%M:%S'))
  174. if length == 15:
  175. # print(len(res), res)
  176. t = int_to_hex(res[3], res[4], res[5], res[6])
  177. print('溶解氧:', t, 'res:', len(res), 'length:', length, time.strftime('%Y-%m-%d %H:%M:%S'))
  178. self.save_format_data(t, "溶解氧")
  179. sendFlag = 2
  180. # 深度
  181. if sendFlag == 2:
  182. try:
  183. recvData = clientSocket.recv(1024)
  184. except Exception as e: # 忽视掉超时
  185. break
  186. length = len(recvData)
  187. fmt = str(length) + 'B'
  188. res = struct.unpack(fmt, recvData)
  189. if length == 11:
  190. # print(len(res), res)
  191. t = int_to_hex(res[3], res[4], res[5], res[6])
  192. print('深度:', t, 'length:', length, time.strftime('%Y-%m-%d %H:%M:%S'))
  193. self.save_format_data(t, "深度")
  194. sendFlag = 3
  195. # 温度、盐度
  196. if sendFlag == 3:
  197. try:
  198. recvData = clientSocket.recv(1024)
  199. except Exception as e: # 忽视掉超时
  200. break
  201. length = len(recvData)
  202. fmt = str(length) + 'B'
  203. res = struct.unpack(fmt, recvData)
  204. if length == 57:
  205. # print(len(res), res)
  206. t = int_to_hex(res[3], res[4], res[5], res[6])
  207. print('温度:', t, 'len:', len(res), time.strftime('%Y-%m-%d %H:%M:%S'))
  208. self.save_format_data(t, "温度")
  209. q = int_to_hex(res[-6], res[-5], res[-4], res[-3])
  210. print('盐度:', q, 'len:', len(res), time.strftime('%Y-%m-%d %H:%M:%S'))
  211. self.save_format_data(q, "盐度")
  212. sendFlag = 4
  213. # 深度
  214. if sendFlag == 4:
  215. try:
  216. recvData = clientSocket.recv(1024)
  217. except Exception as e: # 忽视掉超时
  218. break
  219. length = len(recvData)
  220. fmt = str(length) + 'B'
  221. res = struct.unpack(fmt, recvData)
  222. if length == 11:
  223. t = int_to_hex(res[3], res[4], res[5], res[6])
  224. print('深度:', t, 'length:', length, time.strftime('%Y-%m-%d %H:%M:%S'))
  225. self.save_format_data(t, "深度")
  226. sendFlag = 5
  227. # PH
  228. if sendFlag == 5:
  229. try:
  230. recvData = clientSocket.recv(1024)
  231. except Exception as e: # 忽视掉超时
  232. break
  233. length = len(recvData)
  234. fmt = str(length) + 'B'
  235. res = struct.unpack(fmt, recvData)
  236. if length == 9:
  237. # print(len(res), res)
  238. t = int_to_hex(res[3], res[4], res[5], res[6])
  239. # print('PH:', t, 'len:', len(res), time.strftime('%Y-%m-%d %H:%M:%S'))
  240. self.save_format_data(t, "PH")
  241. sendFlag = 6
  242. # 深度
  243. if sendFlag == 6:
  244. try:
  245. recvData = clientSocket.recv(1024)
  246. except Exception as e: # 忽视掉超时
  247. break
  248. length = len(recvData)
  249. fmt = str(length) + 'B'
  250. res = struct.unpack(fmt, recvData)
  251. if length == 11:
  252. # print(len(res), res)
  253. t = int_to_hex(res[3], res[4], res[5], res[6])
  254. print('深度:', t, 'length:', length, time.strftime('%Y-%m-%d %H:%M:%S'))
  255. self.save_format_data(t, "深度")
  256. sendFlag = 7
  257. # 叶绿素
  258. if sendFlag == 7:
  259. try:
  260. recvData = clientSocket.recv(1024)
  261. except Exception as e: # 忽视掉超时
  262. break
  263. length = len(recvData)
  264. fmt = str(length) + 'B'
  265. res = struct.unpack(fmt, recvData)
  266. if length == 13:
  267. t = int_to_hex(res[3], res[4], res[5], res[6])
  268. # print('叶绿素:', t, 'len:', len(res), time.strftime('%Y-%m-%d %H:%M:%S'))
  269. self.save_format_data(t, "叶绿素")
  270. sendFlag = 8
  271. # 深度
  272. if sendFlag == 8:
  273. try:
  274. recvData = clientSocket.recv(1024)
  275. except Exception as e: # 忽视掉超时
  276. break
  277. length = len(recvData)
  278. fmt = str(length) + 'B'
  279. res = struct.unpack(fmt, recvData)
  280. if length == 11:
  281. # print(len(res), res)
  282. t = int_to_hex(res[3], res[4], res[5], res[6])
  283. print('深度:', t, 'len:', len(res), time.strftime('%Y-%m-%d %H:%M:%S'))
  284. self.save_format_data(t, "深度")
  285. sendFlag = 0
  286. clientSocket.close()
  287. self.__log.info("Client closed.")
  288. # print(time.strftime('%Y-%m-%d %H:%M:%S'), len(recvData))
  289. # socket_msg = recvData.decode() # 将接收到的字节数据转为 string
  290. # print("Socket receive: " + socket_msg)
  291. def int_to_hex(a1, a2, b1, b2):
  292. t1 = hex(a1 * 256 + a2)[2:]
  293. t2 = hex(b1 * 256 + b2)[2:]
  294. if len(t1) != 4: t1 = (4 - len(t1)) * '0' + t1
  295. if len(t2) != 4: t2 = (4 - len(t2)) * '0' + t2
  296. t = t1 + t2
  297. t = struct.unpack('>f', binascii.unhexlify(t.replace(' ', '')))[0]
  298. return t