# tcp_connector.py (9.2 KB)
  1. #-*-coding=utf-8-*-
  2. """
  3. @Date :2021/5/21/00219:10:57
  4. @Desc :
  5. """
  6. import json
  7. import time
  8. import threading
  9. import socket
  10. import queue
  11. from log import OutPutLog
  12. from connector import Connector
  13. from event_storage import EventStorage
  14. from datetime import datetime
  15. class TcpConnector(Connector, threading.Thread):
  16. def __init__(self, name, config, converter):
  17. super().__init__()
  18. self._log = OutPutLog()
  19. self.__sock = None
  20. self.__connected = False
  21. self.__stopped = False
  22. self.__size = 1024
  23. self.__ip = config['ip']
  24. self.__port = config['port']
  25. self.__converter = converter
  26. self.__storager = EventStorage()
  27. self.__save_frequency = config['save_frequency']
  28. self.__command_queue = queue.Queue(50)
  29. self.setDaemon(True)
  30. self.setName(name)
  31. self.__last_seve_time = 0
  32. self.__data_point_config = self.__storager.get_station_info(name)
  33. self.__command = self.__storager.get_command_info(name) # [{'command': '[{"size": 15, "status_command": "483a01530000000000000000d64544"}]'}]
  34. def open(self):
  35. self.__stopped = False
  36. self.start()
  37. def run(self):
  38. self.__connect()
  39. # self.__connected = True
  40. while True:
  41. time.sleep(1)
  42. print(datetime.now(), self.__command)
  43. if len(self.__command) > 0 and isinstance(self.__command, list):
  44. command_list = json.loads(self.__command[0]['command'])
  45. self.command_polling2(command_list)
  46. else:
  47. self.command_polling()
  48. if self.__stopped:
  49. break
  50. # ����socket����
  51. def __connect(self):
  52. if self.__sock:
  53. self.close()
  54. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  55. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # �������ñ��ص�ַ�Ͷ˿�
  56. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # �ڿͻ��˿�������ά��
  57. self.__sock.settimeout(1) # ���ó�ʱʱ��3mins
  58. try:
  59. self.__sock.connect((self.__ip, self.__port))
  60. self.__connected = True
  61. print(datetime.now(), self.__ip, self.__port, "connect success!")
  62. except Exception as e:
  63. self.__connected = False
  64. self.__reconnect()
  65. print(datetime.now(), self.__ip, self.__port, "connect failed")
  66. def __reconnect(self):
  67. while True:
  68. try:
  69. self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  70. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  71. self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # �ڿͻ��˿�������ά��
  72. self.__sock.settimeout(1) # ���ó�ʱʱ��3mins
  73. self.__sock.connect((self.__ip, self.__port))
  74. self.__connected = True
  75. print(datetime.now(), self.__ip, self.__port, "reconnect success!")
  76. break
  77. except Exception as e:
  78. print(datetime.now(), self.__ip, self.__port, "reconnect failed")
  79. self.__connected = False
  80. time.sleep(5)
  81. def close(self):
  82. """Close the connection with the TCP Slave"""
  83. if self.__sock:
  84. self.__sock.close()
  85. self.__stopped = True
  86. self.__sock = None
  87. self.__connected = False
  88. return None
  89. def get_name(self):
  90. return self.name
  91. def is_connected(self):
  92. return self.__connected
  93. """
  94. def send_command(self, data):
  95. if self.__sock:
  96. try:
  97. self.__sock.send(data.encode(encoding='utf-8'))
  98. print("--------------------------[tcp_connector]send_command: ", self.__ip,data.encode(encoding='utf-8'))
  99. except Exception as e:
  100. pass
  101. """
  102. def send_command(self, data):
  103. if self.__sock:
  104. # print("**************************", self.__ip, type(data),data)
  105. # send = True
  106. # while send:
  107. if self.__connected:
  108. try:
  109. if isinstance(data, str):
  110. restult_command = self.__sock.send(data.encode(encoding='utf-8'))
  111. elif isinstance(data, list):
  112. restult_command = self.__sock.send(data[1])
  113. if data[0] == "read_status":
  114. return_command = ""
  115. while len(return_command) != 15:
  116. return_command = self.__sock.recv(15)
  117. return return_command
  118. elif isinstance(data, dict):
  119. command = data["command"]
  120. size = data["size"] # ���յ�ָ��ij���
  121. try:
  122. self.__sock.send(bytes.fromhex(command))
  123. return_command = ""
  124. while len(return_command) != size:
  125. return_command = self.__sock.recv(size)
  126. # return_command = b'\xaa\x88\x00\x00\x01\x00U'
  127. return return_command
  128. except:
  129. self.__connected = False
  130. return None
  131. # send = False
  132. except socket.error :
  133. print(datetime.now(), self.__ip, "\r\nsocket error,do reconnect ")
  134. time.sleep(3)
  135. self.__connected = False
  136. self.__connect()
  137. # send = True
  138. except Exception as e:
  139. print(datetime.now(), self.__ip, "[tcp_connector]send_command error: ", e)
  140. self.__connected = False
  141. # send = False
  142. else:
  143. self.__reconnect()
  144. return "connect failed"
  145. def command_polling(self):
  146. if self.__connected:
  147. try:
  148. time.sleep(0.2)
  149. data = self.__sock.recv(self.__size)
  150. data = self.__converter.convert(self.__data_point_config, data)
  151. # print(data)
  152. if data:
  153. if data != "error" and data != 'pass':
  154. self.__storager.real_time_data_storage(data)
  155. except Exception as e:
  156. time.sleep(5)
  157. self.__reconnect()
  158. else:
  159. self.__reconnect()
  160. def command_polling2(self, command_list):
  161. # command_list = [{'size': 15, 'status_command': '483a01530000000000000000d64544'}]
  162. if self.__connected:
  163. for i in range(len(command_list)):
  164. command_item = command_list[i]
  165. if not self.__command_queue.empty():
  166. write_command = self.__command_queue.get() # �����������
  167. try:
  168. res = self.send_command(command=write_command)
  169. print(res)
  170. except Exception as e:
  171. print("mofbus_rtu,write[ERROR]:" + str(e))
  172. # if not res:
  173. # # ���ݣ���ѯԤ����ʱ��ʣ�����ʱ�䣩�����ط�����
  174. # sent_times = 1
  175. # while sent_times < resend_times:
  176. # res = self.exec_command(write_command)
  177. # sent_times += 1
  178. # if res:
  179. # break
  180. else:
  181. try:
  182. time.sleep(0.2)
  183. if isinstance(command_item, dict) and "status_command" in command_item.keys():
  184. # �鿴ˮ����̨�Ƶ�״̬
  185. status_command = command_item["status_command"]
  186. size = command_item["size"]
  187. result = self.send_command({"size": size, "command": status_command})
  188. if result:
  189. # print("result = ", result)
  190. format_data = self.__converter.convert(self.__data_point_config, result)
  191. if format_data:
  192. if format_data != "error" and format_data != 'pass':
  193. # ��redis�洢����
  194. self.__storager.real_time_data_storage(format_data)
  195. status = 1
  196. while status:
  197. try:
  198. self.__sock.recv(size)
  199. self.send_command(close_query_status)
  200. except:
  201. status = 0
  202. except Exception as e:
  203. print(datetime.now(), " [command_polling2] ", self.__ip, e)
  204. time.sleep(5)
  205. # self.__reconnect()
  206. else:
  207. self.__reconnect()