tcp_connector.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. """
  2. @Date :2021/5/21/00219:10:57
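  3. @Desc : TCP connector thread: connects to a remote TCP endpoint, polls it for data, converts each received chunk and stores the result.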
  4. """
  5. import time
  6. import threading
  7. import struct
  8. import socket
  9. import queue
  10. import traceback
  11. from log import Log
  12. from connector import Connector
  13. from event_storage import EventStorage
  14. class TcpConnector(Connector, threading.Thread):
  def __init__(self, name, config, converter):
      """Initialise connector state; the socket itself is created later.

      Args:
          name: Name given to this thread via ``setName``; also passed to
              ``EventStorage.get_station_info``.
          config: Mapping that provides the ``'ip'``, ``'port'`` and
              ``'save_frequency'`` entries.
          converter: Object whose ``convert(data_point_config, raw)`` method
              is applied to every chunk received from the socket.
      """
      super().__init__()
      self.__log = Log()

      # Connection state: no socket yet, not connected, not stopped.
      self.__sock, self.__connected, self.__stopped = None, False, False
      # Maximum number of bytes requested per recv() call.
      self.__size = 1024

      # Remote endpoint.
      self.__ip, self.__port = config['ip'], config['port']

      self.__converter = converter
      self.__storager = EventStorage()
      self.__save_frequency = config['save_frequency']
      self.__command_queue = queue.Queue(maxsize=50)

      self.setName(name)
      # NOTE(review): "seve" looks like a typo for "save"; the attribute name
      # is kept as-is because code outside this method may reference it.
      self.__last_seve_time = 0
      self.__data_point_config = self.__storager.get_station_info(name)
  def open(self):
      """Clear the stop flag and start the connector thread."""
      # The flag is reset before start() so the loop in run() sees it cleared.
      self.__stopped = False
      self.start()
  def run(self):
      """Thread body: connect, then poll once per second until stopped.

      The connected flag is left entirely to ``__connect`` / ``__reconnect``;
      it is no longer forced to True here, so it cannot claim a connection
      that was never established.
      """
      self.__connect()
      while not self.__stopped:
          time.sleep(1)
          # Re-check after the sleep: close() may have been called meanwhile,
          # and polling after that would reconnect a connector that was just
          # shut down and leave the new socket open.
          if self.__stopped:
              break
          self.command_polling()
  42. # Establish the socket connection
  def __connect(self):
      """Open the initial TCP connection to the configured ip/port.

      A socket left over from an earlier connection is closed first. When the
      connection attempt fails, the failure is logged, the unusable socket is
      closed, and ``__reconnect`` takes over.
      """
      stale = self.__sock
      if stale:
          # Close only the socket. Going through self.close() would also set
          # the stop flag and terminate the polling loop in run().
          try:
              stale.close()
          except OSError:
              pass  # best-effort cleanup of a socket that is being replaced
          self.__sock = None
          self.__connected = False

      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow reuse of the local address and port
      sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # enable keep-alive probes on the client side
      sock.settimeout(180)  # 3-minute timeout
      self.__sock = sock
      try:
          sock.connect((self.__ip, self.__port))
          self.__log.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
          self.__connected = True
      except Exception as e:
          self.__log.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
          self.__connected = False
          # Release the socket that failed to connect before retrying.
          try:
              sock.close()
          except OSError:
              pass
          self.__sock = None
          self.__reconnect()
  def __reconnect(self):
      """Retry the TCP connection every 5 seconds until it succeeds.

      The loop also ends, without connecting, once the stop flag is set, so
      a closed connector does not keep retrying forever. The previous socket
      and every socket whose connect attempt failed are closed explicitly
      instead of being abandoned.
      """
      while not self.__stopped:
          # Drop whatever socket is still registered before creating a new one.
          stale, self.__sock = self.__sock, None
          if stale is not None:
              try:
                  stale.close()
              except OSError:
                  pass  # best-effort cleanup of a socket that is being replaced

          sock = None
          try:
              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
              sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
              sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # enable keep-alive probes on the client side
              sock.settimeout(180)  # 3-minute timeout
              sock.connect((self.__ip, self.__port))
          except Exception as e:
              self.__log.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
              self.__connected = False
              if sock is not None:
                  try:
                      sock.close()
                  except OSError:
                      pass
              time.sleep(5)
          else:
              self.__sock = sock
              self.__connected = True
              self.__log.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
              break
  def close(self):
      """Stop the connector and close the TCP connection, if there is one.

      Returns:
          None.

      Raises:
          OSError: If closing the underlying socket fails. The stop and
              connected flags have already been updated by then.
      """
      # Update the flags and detach the socket first, so the state is
      # consistent even when closing the socket raises.
      self.__stopped = True
      self.__connected = False
      sock, self.__sock = self.__sock, None
      if sock is None:
          return None

      try:
          # shutdown() wakes a thread that is blocked in recv() on this
          # socket; close() alone is not guaranteed to do that.
          sock.shutdown(socket.SHUT_RDWR)
      except OSError:
          pass  # the socket was never connected or is already shut down
      sock.close()
      return None
  def get_name(self):
      """Return this connector's ``name`` attribute."""
      connector_name = self.name
      return connector_name
  def is_connected(self):
      """Return the current value of the internal connected flag."""
      connected = self.__connected
      return connected
  def send_command(self, data):
      """Send a text command over the current socket.

      Nothing is sent when no socket exists. Failures are logged, not raised.

      Args:
          data: Command text; it is UTF-8 encoded before being sent.
      """
      # Work on a local reference: close() may set the attribute to None
      # between the check and the send.
      sock = self.__sock
      if not sock:
          return
      try:
          # sendall() keeps writing until the whole payload is transmitted;
          # send() may return after sending only part of it.
          sock.sendall(data.encode(encoding='utf-8'))
      except Exception as e:
          self.__log.info(f'Send command to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] error:{e}')
  def command_polling(self):
      """Receive one chunk from the socket, convert it and store the result.

      When the connector is not connected, a reconnect is attempted instead.
      Any error while receiving, converting or storing is logged and followed
      by a 5-second pause and a reconnect. No reconnect is attempted once the
      stop flag is set.
      """
      if not self.__connected:
          if not self.__stopped:
              self.__reconnect()
          return

      try:
          time.sleep(0.2)
          raw = self.__sock.recv(self.__size)
          if not raw:
              # recv() returning empty bytes means the peer closed the
              # connection; route it through the reconnect path below rather
              # than polling a dead socket forever.
              raise ConnectionError('connection closed by peer')
          data = self.__converter.convert(self.__data_point_config, raw)
          # "error" and 'pass' are converter results that must not be stored.
          if data and data != "error" and data != 'pass':
              self.__storager.real_time_data_storage(data)
      except Exception as e:
          # Report the connection as down for the whole back-off period.
          self.__connected = False
          self.__log.error(f'Other error occur [{self.get_name()}]:[{self.__ip}]:[{self.__port}]:{e}')
          if self.__stopped:
              # close() was called; reconnecting would reopen a socket that
              # nobody closes again.
              return
          time.sleep(5)
          self.__reconnect()