#-*-coding=utf-8-*-
"""
@Date :2021/5/21/00219:10:57
@Desc : TCP client connector that polls a device on a background thread,
        converts the received frames and hands them to the event storage.
"""
import json
import queue
import socket
import threading
import time
from datetime import datetime

from log import OutPutLog
from connector import Connector
from event_storage import EventStorage


class TcpConnector(Connector, threading.Thread):
    """Daemon thread that keeps a TCP client connection to one device.

    Once started via :meth:`open`, the thread connects to ``config['ip']`` /
    ``config['port']`` and polls once per second.  If the storage returns a
    non-empty command list for this connector's name, each configured status
    command is sent and the reply converted and stored
    (:meth:`command_polling2`); otherwise the socket is simply read
    (:meth:`command_polling`).
    """

    def __init__(self, name, config, converter):
        """Create the connector.

        :param name: thread name; also the key used to look up the station
            and command configuration in the event storage.
        :param config: mapping that must contain ``'ip'``, ``'port'`` and
            ``'save_frequency'``.
        :param converter: object exposing ``convert(data_point_config, raw)``.
        """
        super().__init__()
        self._log = OutPutLog()
        self.__sock = None
        self.__connected = False
        self.__stopped = False
        self.__size = 1024  # max bytes read per passive poll
        self.__ip = config['ip']
        self.__port = config['port']
        self.__converter = converter
        self.__storager = EventStorage()
        self.__save_frequency = config['save_frequency']
        self.__command_queue = queue.Queue(50)
        # Attribute form of the deprecated setDaemon()/setName() calls.
        self.daemon = True
        self.name = name
        self.__last_seve_time = 0
        self.__data_point_config = self.__storager.get_station_info(name)
        # Example of the value seen in practice:
        # [{'command': '[{"size": 15, "status_command": "483a01530000000000000000d64544"}]'}]
        self.__command = self.__storager.get_command_info(name)

    def open(self):
        """Clear the stop flag and start the polling thread."""
        self.__stopped = False
        self.start()

    def run(self):
        """Thread body: connect, then poll once per second until stopped."""
        self.__connect()
        while not self.__stopped:
            time.sleep(1)
            print(datetime.now(), self.__command)
            # Check the type first so a non-sequence value (e.g. None) selects
            # the passive polling path instead of raising in len().
            if isinstance(self.__command, list) and len(self.__command) > 0:
                command_list = json.loads(self.__command[0]['command'])
                self.command_polling2(command_list)
            else:
                self.command_polling()

    def __open_socket(self):
        """Return a new socket connected to the configured endpoint.

        The socket is closed again before the exception propagates if any
        step fails, so failed attempts do not leak descriptors.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            sock.settimeout(1)  # applies to connect() and every later recv()/send()
            sock.connect((self.__ip, self.__port))
        except Exception:
            sock.close()
            raise
        return sock

    def __close_socket(self):
        """Close the current socket, if any, WITHOUT touching the stop flag."""
        sock, self.__sock = self.__sock, None
        self.__connected = False
        if sock is not None:
            try:
                sock.close()
            except OSError:
                # Best effort: the socket is being discarded anyway.
                pass

    def __connect(self):
        """Open the connection, falling back to the retry loop on failure."""
        # Drop any previous socket.  The public close() must not be used here
        # because it also sets the stop flag, which would end run().
        self.__close_socket()
        try:
            self.__sock = self.__open_socket()
        except Exception:
            self.__connected = False
            print(datetime.now(), self.__ip, self.__port, "connect failed")
            self.__reconnect()
        else:
            self.__connected = True
            print(datetime.now(), self.__ip, self.__port, "connect success!")

    def __reconnect(self):
        """Retry the connection every 5 seconds until it succeeds or close() is called."""
        while not self.__stopped:
            self.__close_socket()
            try:
                self.__sock = self.__open_socket()
            except Exception:
                print(datetime.now(), self.__ip, self.__port, "reconnect failed")
                self.__connected = False
                time.sleep(5)
            else:
                self.__connected = True
                print(datetime.now(), self.__ip, self.__port, "reconnect success!")
                break

    def close(self):
        """Close the connection with the TCP Slave and stop the polling loop."""
        self.__stopped = True
        self.__close_socket()
        return None

    def get_name(self):
        """Return the thread name given at construction."""
        return self.name

    def is_connected(self):
        """Return True while the last connect attempt succeeded and no error was seen since."""
        return self.__connected

    def __recv_exact(self, size):
        """Read exactly ``size`` bytes from the socket.

        Partial reads are accumulated.  Raises ``ConnectionError`` if the peer
        closes the connection first; a socket timeout propagates unchanged.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.__sock.recv(remaining)
            if not chunk:
                # recv() returning b'' means EOF; looping on it would spin forever.
                raise ConnectionError("connection closed by peer")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def send_command(self, data):
        """Send a command to the device and optionally wait for its reply.

        :param data: one of

            * ``str`` - sent UTF-8 encoded; no reply is read.
            * ``list`` - ``data[1]`` (bytes) is sent; when ``data[0]`` is
              ``"read_status"`` a 15-byte reply is read and returned.
            * ``dict`` - ``data["command"]`` (hex string) is sent and a reply
              of ``data["size"]`` bytes is read and returned.

        :return: the reply bytes where one is read; ``"connect failed"`` if
            the connector was not connected (a reconnect is attempted first);
            otherwise ``None``.
        """
        if not self.__sock:
            return None
        if not self.__connected:
            self.__reconnect()
            return "connect failed"
        try:
            if isinstance(data, str):
                self.__sock.sendall(data.encode(encoding='utf-8'))
            elif isinstance(data, list):
                self.__sock.sendall(data[1])
                if data[0] == "read_status":
                    return self.__recv_exact(15)
            elif isinstance(data, dict):
                command = data["command"]
                size = data["size"]  # expected length of the reply
                try:
                    self.__sock.sendall(bytes.fromhex(command))
                    return self.__recv_exact(size)
                except Exception:
                    # Any I/O or hex-decoding failure marks the link as down;
                    # the next poll will reconnect.
                    self.__connected = False
                    return None
        except socket.error:
            print(datetime.now(), self.__ip, "\r\nsocket error,do reconnect ")
            time.sleep(3)
            self.__connected = False
            self.__connect()
        except Exception as e:
            print(datetime.now(), self.__ip, "[tcp_connector]send_command error: ", e)
            self.__connected = False
        return None

    def __store_converted(self, raw):
        """Convert a raw frame and store it unless the converter rejected it."""
        converted = self.__converter.convert(self.__data_point_config, raw)
        # The converter signals "nothing to store" with a falsy value or the
        # sentinel strings "error" / 'pass'.
        if converted and converted != "error" and converted != 'pass':
            self.__storager.real_time_data_storage(converted)

    def __drain_once(self, size):
        """Read and discard up to ``size`` bytes; errors and timeouts are ignored.

        NOTE(review): the original code looped here sending a
        ``close_query_status`` command that is not defined anywhere in this
        file, so the loop always ended after the first read.  Only that
        effective behaviour is kept - TODO supply the real command if the
        device needs one.
        """
        try:
            self.__sock.recv(size)
        except Exception:
            pass

    def command_polling(self):
        """Passive poll: read whatever the device sent, convert and store it.

        Any failure (including the 1 s read timeout) triggers a reconnect
        after a 5 s pause; when not connected a reconnect is attempted instead.
        """
        if not self.__connected:
            self.__reconnect()
            return
        try:
            time.sleep(0.2)
            data = self.__sock.recv(self.__size)
            self.__store_converted(data)
        except Exception as e:
            print(datetime.now(), " [command_polling] ", self.__ip, e)
            time.sleep(5)
            self.__reconnect()

    def command_polling2(self, command_list):
        """Active poll: send each configured status command and store the reply.

        :param command_list: list of dicts such as
            ``[{'size': 15, 'status_command': '483a01530000000000000000d64544'}]``.

        For every entry, a pending write command from the internal queue takes
        priority over the status query of that entry.
        """
        if not self.__connected:
            self.__reconnect()
            return
        for command_item in command_list:
            if not self.__command_queue.empty():
                try:
                    write_command = self.__command_queue.get_nowait()
                    res = self.send_command(write_command)
                    print(res)
                except Exception as e:
                    print("mofbus_rtu,write[ERROR]:" + str(e))
                continue
            try:
                time.sleep(0.2)
                if isinstance(command_item, dict) and "status_command" in command_item:
                    status_command = command_item["status_command"]
                    size = command_item["size"]
                    result = self.send_command({"size": size, "command": status_command})
                    # Only real device bytes are converted; the
                    # "connect failed" sentinel string is not device data.
                    if result and isinstance(result, (bytes, bytearray)):
                        self.__store_converted(result)
                        self.__drain_once(size)
            except Exception as e:
                print(datetime.now(), " [command_polling2] ", self.__ip, e)
                time.sleep(5)