# tcp_connector.py
"""
@Date :2021/5/21/00219:10:57
@Desc : TCP connector thread that polls a device over a socket, converts
        the replies and stores the resulting readings.
"""
import json
import queue
import socket
import threading
import time

from modbus_tk import utils

from connector import Connector
from event_storage import EventStorage
from logging_config import logger
  14. class TcpConnector(Connector, threading.Thread):
  15. def __init__(self, name, config, converter):
  16. super().__init__()
  17. self.__sock = None
  18. self.__connected = False
  19. self.__stopped = False
  20. self.__size = 1024
  21. self.__ip = config['ip']
  22. self.__port = config['port']
  23. self.__converter = converter
  24. self.__storager = EventStorage()
  25. self.__save_frequency = config['save_frequency']
  26. self.__command_queue = queue.Queue(50)
  27. self.setName(name)
  28. self.__data_point_config = self.__storager.get_station_info(name)
  29. self.__command = self.__storager.get_command_info(name)
  30. def open(self):
  31. self.__stopped = False
  32. self.start()
  33. def run(self):
  34. self.__connect()
  35. self.__connected = True
  36. while True:
  37. if isinstance(self.__command, list):
  38. for i in self.__command:
  39. command_list = json.loads(i['command'])
  40. self.command_polling(command_list=command_list)
  41. time.sleep(1)
  42. else:
  43. self.command_polling()
  44. time.sleep(1)
  45. if self.__stopped:
  46. break
  47. # 建立socket连接
  48. def __connect(self):
  49. if self.__sock:
  50. self.__sock.close()
  51. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  52. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 允许重用本地地址和端口
  53. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  54. self.__sock.settimeout(180) # 设置超时时间3mins
  55. try:
  56. self.__sock.connect((self.__ip, self.__port))
  57. logger.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  58. self.__connected = True
  59. except Exception as e:
  60. logger.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
  61. self.__connected = False
  62. self.__reconnect()
  63. def __reconnect(self):
  64. while True:
  65. try:
  66. if self.__sock:
  67. self.__sock.close()
  68. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  69. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  70. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳维护
  71. self.__sock.settimeout(180) # 设置超时时间3mins
  72. self.__sock.connect((self.__ip, self.__port))
  73. self.__connected = True
  74. logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
  75. break
  76. except Exception as e:
  77. logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
  78. self.__connected = False
  79. time.sleep(5)
  80. def close(self):
  81. """Close the connection with the TCP Slave"""
  82. if self.__sock:
  83. self.__sock.close()
  84. self.__stopped = True
  85. self.__sock = None
  86. self.__connected = False
  87. def get_name(self):
  88. return self.name
  89. def is_connected(self):
  90. return self.__connected
  91. def send_command(self, data):
  92. if self.__sock:
  93. try:
  94. self.__sock.send(data.encode(encoding='utf-8'))
  95. except Exception as e:
  96. logger.info(f'Send command to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] error:{e}')
  97. def exec_command(self, command):
  98. try:
  99. com = bytes.fromhex(command['instruct'])
  100. self.__sock.send(com)
  101. recv_data = self.__sock.recv(self.__size)
  102. return recv_data
  103. except Exception as e:
  104. logger.error(f"{self.name}: {e}")
  105. def command_polling(self, command_list=None):
  106. if command_list:
  107. try:
  108. for i in range(len(command_list)):
  109. command_item = command_list[i]
  110. recv_data = self.exec_command(command=command_item)
  111. format_data = self.__converter.convert(self.__data_point_config, recv_data)
  112. if format_data and format_data != "error" and format_data != 'pass':
  113. self.__storager.real_time_data_storage(format_data)
  114. except Exception as e:
  115. logger.error(f'Other error occur [{self.get_name()}]:[{self.__ip}]:[{self.__port}]:{e}')
  116. time.sleep(5)
  117. self.__reconnect()
  118. else:
  119. try:
  120. recv_data = self.__sock.recv(self.__size)
  121. format_data = self.__converter.convert(self.__data_point_config, recv_data)
  122. if format_data and format_data != "error" and format_data != 'pass':
  123. self.__storager.real_time_data_storage(format_data)
  124. except socket.timeout as e:
  125. logger.error(f"{self.name}: {e}")
  126. except Exception as e:
  127. logger.error(f"{self.name}: {e}")
  128. time.sleep(5)
  129. self.__reconnect()