123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #-*-coding=utf-8-*-
- """
- @Date :2021/5/21/00219:10:57
- @Desc : TCP client connector thread: polls a device over a socket, converts the replies and stores them.
- """
- import json
- import time
- import threading
- import socket
- import queue
- from log import OutPutLog
- from connector import Connector
- from event_storage import EventStorage
- from datetime import datetime
class TcpConnector(Connector, threading.Thread):
    """Daemon thread that talks to one device through a TCP client socket.

    After ``open()`` the thread connects to ``config['ip']:config['port']``
    and then, once per second, either

    * runs the configured request/response commands (``command_polling2``)
      when ``EventStorage.get_command_info(name)`` returned a non-empty list, or
    * passively reads whatever the peer sends (``command_polling``).

    Every reply is handed to ``converter.convert`` and, unless the converter
    answers with a falsy value, ``"error"`` or ``"pass"``, stored through
    ``EventStorage.real_time_data_storage``.
    """

    def __init__(self, name, config, converter):
        """Prepare the connector; no network activity happens here.

        Args:
            name: Thread name; also the key used to look up the station and
                command configuration in ``EventStorage``.
            config: Mapping providing ``'ip'``, ``'port'`` and
                ``'save_frequency'``.
            converter: Object exposing ``convert(data_point_config, raw)``.
        """
        super().__init__()
        self._log = OutPutLog()
        self.__sock = None
        self.__connected = False
        self.__stopped = False
        self.__size = 1024  # receive buffer size for passive polling
        self.__ip = config['ip']
        self.__port = config['port']
        self.__converter = converter
        self.__storager = EventStorage()
        self.__save_frequency = config['save_frequency']
        self.__command_queue = queue.Queue(50)
        # Attribute form of the deprecated setDaemon()/setName() calls.
        self.daemon = True
        self.name = name
        self.__last_seve_time = 0
        self.__data_point_config = self.__storager.get_station_info(name)
        # Example value:
        # [{'command': '[{"size": 15, "status_command": "483a01530000000000000000d64544"}]'}]
        self.__command = self.__storager.get_command_info(name)

    def open(self):
        """Clear the stop flag and start the polling thread."""
        self.__stopped = False
        self.start()

    def run(self):
        """Thread body: connect, then poll once per second until stopped."""
        self.__connect()

        # The command configuration never changes after __init__, so decode it
        # once. The type check comes first so a None/non-list value selects
        # passive polling instead of crashing the thread on len().
        command_list = None
        if isinstance(self.__command, list) and self.__command:
            command_list = json.loads(self.__command[0]['command'])

        while not self.__stopped:
            time.sleep(1)
            print(datetime.now(), self.__command)
            if command_list is not None:
                self.command_polling2(command_list)
            else:
                self.command_polling()

    def __open_socket(self):
        """Create, configure and connect a new socket.

        Returns:
            The connected socket.

        Raises:
            Exception: Whatever configuring or connecting raised; the
                half-built socket is closed before re-raising.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            sock.settimeout(1)  # seconds; applies to connect, send and recv
            sock.connect((self.__ip, self.__port))
        except Exception:
            sock.close()
            raise
        return sock

    def __close_socket(self):
        """Drop the current socket (if any) without touching the stop flag."""
        sock, self.__sock = self.__sock, None
        self.__connected = False
        if sock is not None:
            try:
                sock.close()
            except OSError:
                # Best effort: the socket is being discarded anyway.
                pass

    def __connect(self):
        """Replace the current socket with a freshly connected one.

        Falls back to ``__reconnect`` (which retries) when the first attempt
        fails.
        """
        # Must not go through close(): that also sets the stop flag, which
        # would end the polling thread after a routine reconnect.
        self.__close_socket()
        try:
            self.__sock = self.__open_socket()
        except Exception as e:
            self.__connected = False
            print(datetime.now(), self.__ip, self.__port, "connect failed", e)
            self.__reconnect()
        else:
            self.__connected = True
            print(datetime.now(), self.__ip, self.__port, "connect success!")

    def __reconnect(self):
        """Retry connecting every 5 seconds until it works or close() is called."""
        while not self.__stopped:
            # Release the previous socket before creating a new one.
            self.__close_socket()
            try:
                self.__sock = self.__open_socket()
            except Exception as e:
                print(datetime.now(), self.__ip, self.__port, "reconnect failed", e)
                self.__connected = False
                time.sleep(5)
            else:
                self.__connected = True
                print(datetime.now(), self.__ip, self.__port, "reconnect success!")
                break

    def close(self):
        """Close the connection with the TCP Slave and stop the polling loop."""
        self.__stopped = True
        self.__close_socket()
        return None

    def get_name(self):
        """Return the thread name given at construction."""
        return self.name

    def is_connected(self):
        """Return True while the last connect/reconnect attempt succeeded."""
        return self.__connected

    @staticmethod
    def __recv_exact(sock, size):
        """Read exactly ``size`` bytes from ``sock``.

        Partial reads are accumulated instead of discarded.

        Raises:
            ConnectionError: The peer closed the connection before ``size``
                bytes arrived (``recv`` returned an empty result).
            OSError: Propagated from ``recv`` (including the socket timeout).
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError("peer closed the connection")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def send_command(self, data):
        """Send one command and, for request/response forms, return the reply.

        Args:
            data: One of
                * ``str`` - UTF-8 encoded and sent; no reply is read.
                * ``list`` - ``data[1]`` (bytes) is sent; when ``data[0]`` is
                  ``"read_status"`` a 15-byte reply is read and returned.
                * ``dict`` - ``data["command"]`` (hex string) is sent and a
                  reply of ``data["size"]`` bytes is read and returned.

        Returns:
            The reply bytes for the request/response forms; ``None`` when
            nothing is read, there is no socket, or the exchange failed; the
            string ``"connect failed"`` when the connector was disconnected
            at call time (a reconnect is attempted first).
        """
        sock = self.__sock
        if sock is None:
            return None
        if not self.__connected:
            self.__reconnect()
            return "connect failed"

        try:
            if isinstance(data, str):
                # sendall() keeps writing until the whole buffer is out.
                sock.sendall(data.encode(encoding='utf-8'))
            elif isinstance(data, list):
                sock.sendall(data[1])
                if data[0] == "read_status":
                    return self.__recv_exact(sock, 15)
            elif isinstance(data, dict):
                command = data["command"]
                size = data["size"]  # expected length of the reply in bytes
                try:
                    sock.sendall(bytes.fromhex(command))
                    return self.__recv_exact(sock, size)
                except Exception:
                    # Any failure of the exchange marks the link as down; the
                    # next polling round will reconnect.
                    self.__connected = False
                    return None
        except socket.error:
            print(datetime.now(), self.__ip, "\r\nsocket error,do reconnect ")
            time.sleep(3)
            self.__connected = False
            self.__connect()
        except Exception as e:
            print(datetime.now(), self.__ip, "[tcp_connector]send_command error: ", e)
            self.__connected = False
        return None

    def command_polling(self):
        """Passively read one chunk from the peer, convert it and store it.

        Any failure (including the 1-second receive timeout) waits 5 seconds
        and reconnects.
        """
        if not self.__connected:
            self.__reconnect()
            return

        try:
            time.sleep(0.2)
            raw = self.__sock.recv(self.__size)
            if not raw:
                # An empty read on a stream socket means the peer closed it.
                raise ConnectionError("peer closed the connection")
            data = self.__converter.convert(self.__data_point_config, raw)
            if data and data != "error" and data != 'pass':
                self.__storager.real_time_data_storage(data)
        except Exception as e:
            # NOTE(review): a plain receive timeout also lands here and forces
            # a reconnect; kept as in the original - confirm this is intended.
            print(datetime.now(), " [command_polling] ", self.__ip, e)
            time.sleep(5)
            self.__reconnect()

    def __poll_status(self, command_item):
        """Run one status request and store the converted reply.

        Items that are not dicts or lack ``"status_command"`` are ignored.
        """
        if not (isinstance(command_item, dict) and "status_command" in command_item):
            return

        size = command_item["size"]
        result = self.send_command(
            {"size": size, "command": command_item["status_command"]}
        )
        # Only real reply bytes are data; send_command may also hand back
        # None or the "connect failed" marker string.
        if not (isinstance(result, bytes) and result):
            return

        format_data = self.__converter.convert(self.__data_point_config, result)
        if format_data and format_data != "error" and format_data != 'pass':
            self.__storager.real_time_data_storage(format_data)

        # Discard up to `size` leftover bytes so the next reply starts clean.
        # The original looped here and referenced an undefined
        # `close_query_status`; the resulting NameError was swallowed, so in
        # practice exactly one recv was attempted. That is what is kept.
        # NOTE(review): the nesting of this step was not recoverable from the
        # source; it now runs after any received reply - confirm.
        sock = self.__sock
        if sock is not None:
            try:
                sock.recv(size)
            except OSError:
                # Nothing pending (timeout) or link error: nothing to drain.
                pass

    def command_polling2(self, command_list):
        """Run one polling round over the configured status commands.

        For each entry a pending write command from the internal queue takes
        priority; otherwise the entry's status request is executed.

        Args:
            command_list: Iterable of dicts such as
                ``{'size': 15, 'status_command': '483a0153...d64544'}``.
        """
        if not self.__connected:
            self.__reconnect()
            return

        for command_item in command_list:
            try:
                write_command = self.__command_queue.get_nowait()
            except queue.Empty:
                try:
                    time.sleep(0.2)
                    self.__poll_status(command_item)
                except Exception as e:
                    print(datetime.now(), " [command_polling2] ", self.__ip, e)
                    time.sleep(5)
                continue

            try:
                # Positional: the parameter of send_command is `data`, so the
                # former `command=` keyword always raised TypeError.
                res = self.send_command(write_command)
                print(res)
            except Exception as e:
                print("mofbus_rtu,write[ERROR]:" + str(e))
|