tcp_connector.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. """
  2. @Date :2021/5/21/00219:10:57
  3. @Desc : 此连接器用于wxt536气象传感器、td266单点流速仪、sm140波浪传感器、adcp流速流向仪、cec26单点流速仪
  4. """
  5. import json
  6. import time
  7. import threading
  8. import socket
  9. import queue
  10. from logging_config import tcp_connector as logger
  11. from connector import Connector
  12. from event_storage import EventStorage
  13. class TcpConnector(Connector, threading.Thread):
  14. def __init__(self, name, config, converter):
  15. super().__init__()
  16. self.__sock = None
  17. self.__connected = False
  18. self.__stopped = False
  19. self.__size = 1024
  20. self.__ip = config['ip']
  21. self.__port = config['port']
  22. self.__converter = converter
  23. self.__storager = EventStorage()
  24. self.__save_frequency = config['save_frequency']
  25. self.__command_queue = queue.Queue(50)
  26. self.setName(name)
  27. self.__data_point_config = self.__storager.get_station_info(name)
  28. self.__command = self.__storager.get_command_info(name)
  29. def open(self):
  30. self.__stopped = False
  31. self.start()
  32. def run(self):
  33. self.__connect()
  34. self.__connected = True
  35. while True:
  36. if isinstance(self.__command, list):
  37. for i in self.__command:
  38. command_list = json.loads(i['command'])
  39. self.command_polling(command_list=command_list)
  40. time.sleep(1)
  41. else:
  42. self.command_polling()
  43. time.sleep(1)
  44. if self.__stopped:
  45. break
  46. # 建立socket连接
  47. def __connect(self):
  48. if self.__sock:
  49. self.__sock.close()
  50. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  51. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 允许重用本地地址和端口
  52. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  53. self.__sock.settimeout(180) # 设置超时时间3mins
  54. try:
  55. self.__sock.connect((self.__ip, self.__port))
  56. logger.info(f'Connect to [{self.name}]:[{self.__ip}]:[{self.__port}] success !')
  57. self.__connected = True
  58. except Exception as e:
  59. logger.info(f'Connect to [{self.name}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
  60. self.__connected = False
  61. self.__reconnect()
  62. def __reconnect(self):
  63. while True:
  64. try:
  65. if self.__sock:
  66. self.__sock.close()
  67. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  68. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  69. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  70. self.__sock.settimeout(180) # 设置超时时间3mins
  71. self.__sock.connect((self.__ip, self.__port))
  72. self.__connected = True
  73. logger.info(f'Reconnect to [{self.name}]:[{self.__ip}]:[{self.__port}] success !')
  74. break
  75. except Exception as e:
  76. logger.info(f'Reconnect to [{self.name}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
  77. self.__connected = False
  78. time.sleep(5)
  79. def close(self):
  80. """Close the connection with the TCP Slave"""
  81. if self.__sock:
  82. self.__sock.close()
  83. self.__stopped = True
  84. self.__sock = None
  85. self.__connected = False
  86. def get_name(self):
  87. return self.name
  88. def is_connected(self):
  89. return self.__connected
  90. def send_command(self, data):
  91. if self.__sock:
  92. try:
  93. self.__sock.send(data.encode(encoding='utf-8'))
  94. except Exception as e:
  95. logger.info(f'Send command to [{self.name}]:[{self.__ip}]:[{self.__port}] error:{e}')
  96. def exec_command(self, command):
  97. try:
  98. com = bytes.fromhex(command['instruct'])
  99. self.__sock.send(com)
  100. recv_data = self.__sock.recv(self.__size)
  101. return recv_data
  102. except Exception as e:
  103. logger.error(f"{self.name}: {e}")
  104. def command_polling(self, command_list=None):
  105. if command_list:
  106. try:
  107. for i in range(len(command_list)):
  108. command_item = command_list[i]
  109. recv_data = self.exec_command(command=command_item)
  110. format_data = self.__converter.convert(self.__data_point_config, recv_data)
  111. if format_data and format_data != "error" and format_data != 'pass':
  112. self.__storager.real_time_data_storage(format_data)
  113. except Exception as e:
  114. logger.error(f'Other error occur [{self.name}]:[{self.__ip}]:[{self.__port}]:{e}')
  115. time.sleep(5)
  116. self.__reconnect()
  117. else:
  118. try:
  119. recv_data = self.__sock.recv(self.__size)
  120. format_data = self.__converter.convert(self.__data_point_config, recv_data)
  121. if format_data and format_data != "error" and format_data != 'pass':
  122. self.__storager.real_time_data_storage(format_data)
  123. except socket.timeout as e:
  124. logger.error(f"{self.name}: {e}")
  125. except Exception as e:
  126. logger.error(f"{self.name}: {e}")
  127. time.sleep(5)
  128. self.__reconnect()