123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- """
- @Date :2021/5/21/00219:10:57
- @Desc : 原本用于insitu水质传感器的读取和解析,目前此连接器未启动,insitu读取和解析均为mudbus_rtu_over_tcp。
- """
- import binascii
- import socket
- import threading
- import time
- from binascii import *
- import struct
- from crcmod import *
- from connector import Connector
- from event_storage import EventStorage
- from logging_config import shuizhi_tcp_connector as logger
- from tools.format_value import format_value
- sendFlag = 0
class ShuizhiTcpConnector(Connector, threading.Thread):
    """Poll a water-quality sensor with Modbus-RTU style frames over a TCP socket.

    The connector is a thread. ``run`` connects to the device, builds one read
    command per configured parameter and sends the commands in a loop, while a
    second thread (``SocketReceive``) reads the replies, decodes the value and
    hands it to ``EventStorage``.

    Example frames kept from the original notes (request -> reply):
        dissolved oxygen: 01 03 15 CF 00 05 B1 FA
        temperature:      01 03 15 4A 00 05 A0 13 -> 01 03 0A 41 C0 BC 1C 00 00 00 01 00 01 61 42
        salinity:         01 03 15 97 00 05 30 29 -> 01 03 0A 00 00 00 00 00 00 00 61 00 0C 75 6D
        pH:               01 03 15 BA 00 05 A0 20 -> 01 03 0A 40 F9 2B 79 00 00 00 91 00 11 72 BB
        chlorophyll:      01 03 16 A8 00 05 00 61 -> 01 03 0A 00 00 00 00 00 00 01 01 00 33 34 9F
        depth:            01 03 15 66 00 05 61 DA -> 01 03 0A 42 08 1B F4 00 00 00 26 00 05 34 D0
    """

    def __init__(self, name, config, converter):
        """Store the connection settings and load the station's data points.

        Args:
            name: Connector name; used as the thread name and passed to
                ``EventStorage.get_station_info``.
            config: Mapping providing ``'ip'``, ``'port'`` and
                ``'save_frequency'``.
            converter: Converter object; stored but not used by this class.
        """
        super().__init__()
        self._param_id = {}  # parameter_id -> parameter name, filled in run()
        self._len_param = None  # number of command frames in one polling cycle
        self.__sock = None
        self.__connected = False
        self.__stopped = False
        self.__size = 1024  # receive buffer size in bytes
        self.__ip = config['ip']
        self.__port = config['port']
        self.__converter = converter
        self.__storager = EventStorage()
        self.__save_frequency = config['save_frequency']
        # Thread.setName() is deprecated since Python 3.10; the property does the same.
        self.name = name
        self.__last_seve_time = 0
        self.__data_point_config = self.__storager.get_station_info(name)
        self._storage = EventStorage()

    def open(self):
        """Clear the stop flag and start the polling thread."""
        self.__stopped = False
        self.start()

    def __open_socket(self):
        """Create a TCP socket, connect it and return it.

        The socket is closed again when any step fails, so a failed attempt
        never leaks a file descriptor.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Enable TCP keep-alive on the client side.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            sock.connect((self.__ip, self.__port))
        except Exception:
            sock.close()
            raise
        return sock

    def __drop_socket(self):
        """Shut down and close the current socket without touching the stop flag."""
        sock, self.__sock = self.__sock, None
        self.__connected = False
        if sock is None:
            return
        try:
            # shutdown() lets a receiver thread blocked in recv() return;
            # close() on its own is not guaranteed to wake it.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # never connected, or the peer already went away
        sock.close()

    def __connect(self):
        """Establish the socket connection, falling back to the retry loop on failure."""
        # Only the socket is discarded here; calling close() would also set the
        # stop flag and end the polling loop.
        self.__drop_socket()
        try:
            self.__sock = self.__open_socket()
        except OSError as e:
            logger.error(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!!')
            self.__connected = False
            self.__reconnect()
        else:
            logger.info(f'Connect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
            self.__connected = True

    def __reconnect(self):
        """Retry the connection every 5 seconds until it succeeds or the connector is stopped."""
        while not self.__stopped:
            try:
                self.__sock = self.__open_socket()
            except Exception as e:
                logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e} !!! Continue reconnect in 5s..')
                self.__connected = False
                time.sleep(5)
            else:
                self.__connected = True
                logger.info(f'Reconnect to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] success !')
                break

    def close(self):
        """Close the connection with the TCP Slave and ask the polling thread to stop."""
        self.__stopped = True
        self.__drop_socket()
        return None

    def get_name(self):
        """Return the connector (thread) name."""
        return self.name

    def is_connected(self):
        """Return True while the socket is believed to be connected."""
        return self.__connected

    def send_command(self, command_list):
        """Not implemented for this connector."""
        pass

    def command_polling(self):
        """Not implemented for this connector."""
        pass

    def __encode_read_command(self, each):
        """Build the request frame (with CRC) that reads 5 registers for one parameter row."""
        request = each['station_code'] + each["function_code"] + dec_to_hex(each["address"]) + "0005"
        return bytes.fromhex(crc16Add(request))

    def __build_instructions(self):
        """Return the list of request frames sent in one polling cycle.

        Also fills ``self._param_id``. When a parameter named "深度" (depth) is
        configured, its frame is removed from the normal sequence and sent
        before and after every other parameter's frame instead.
        """
        param_list = self._storage.get_in_situ_command()
        depth_index = None
        for index, each in enumerate(param_list):
            self._param_id[each["parameter_id"]] = each["name"]
            if each["name"] == "深度":
                depth_index = index
        frames = [self.__encode_read_command(each) for each in param_list]
        if depth_index is None:
            return frames
        depth = frames.pop(depth_index)
        instruct_list = []
        for frame in frames:
            instruct_list.extend((depth, frame, depth))
        return instruct_list

    def __start_receiver(self):
        """Start a receive thread bound to the current socket, if there is one."""
        sock = self.__sock
        if sock is not None:
            threading.Thread(target=self.SocketReceive, args=(sock,)).start()

    def __send_cycle(self, sock, instruct_list):
        """Send every frame twice, one second apart, stopping early on close()."""
        for command in instruct_list:
            for _ in range(2):
                if self.__stopped:
                    return
                time.sleep(1)
                # sendall() retries until the whole frame is written; send() may not.
                sock.sendall(command)

    def run(self):
        """Thread body: connect, build the command frames and poll until stopped."""
        self.__connect()
        if self.__stopped:
            self.__drop_socket()
            return
        instruct_list = self.__build_instructions()
        self._len_param = len(instruct_list)
        self.__start_receiver()
        # The stop flag is tested on every iteration, so close() always ends the thread.
        while not self.__stopped:
            time.sleep(0.5)
            sock = self.__sock
            if sock is None or not self.__connected:
                continue
            try:
                self.__send_cycle(sock, instruct_list)
            except Exception as e:  # thread boundary: any send failure triggers a reconnect
                if self.__stopped:
                    break
                logger.error(f'Send to [{self.get_name()}]:[{self.__ip}]:[{self.__port}] failed:{e}, reconnecting.')
                # Closing the old socket also makes its receiver thread exit.
                self.__drop_socket()
                self.__reconnect()
                if not self.__stopped:
                    self.__start_receiver()
        self.__drop_socket()

    def save_format_data(self, t, name):
        """Format value *t* for the data point called *name* and store it.

        Args:
            t: Decoded measurement value.
            name: Parameter name, matched against ``io_point_name`` of the
                station's data-point configuration.
        """
        data = {}
        for index in self.__data_point_config:
            if index["io_point_name"] == name:
                data = {'c' + str(index['serial_number']): format_value(index, t)}
        self.__storager.real_time_data_storage(data)

    def SocketReceive(self, clientSocket):
        """Receive loop run in its own thread.

        Only replies that arrive as exactly 15 bytes are decoded: bytes 3-6
        hold the value as a big-endian float and byte 12 is looked up in
        ``self._param_id``. Anything else is logged and skipped. The loop ends
        when ``recv`` fails or returns no data (connection closed), and the
        socket is always closed on the way out.

        Args:
            clientSocket: Connected socket to read from.
        """
        try:
            while True:
                time.sleep(0.2)
                try:
                    recvData = clientSocket.recv(self.__size)
                except Exception as e:
                    logger.info(f"Socket receive error:{e}")
                    break
                if not recvData:
                    # An empty read means the connection is closed; looping on
                    # it would spin forever.
                    logger.info("Socket closed by peer.")
                    break
                if len(recvData) != 15:
                    logger.info(f"读取错误:{recvData},跳过。")
                    continue
                param_name = self._param_id.get(recvData[12])
                if param_name is None:
                    logger.info(f"Unknown parameter id {recvData[12]} in {recvData}, skipped.")
                    continue
                t = int_to_hex(recvData[3], recvData[4], recvData[5], recvData[6])
                logger.info(f" {param_name}, t= {t}")
                self.save_format_data(t, param_name)
        finally:
            clientSocket.close()
            logger.info("Client closed.")
def int_to_hex(a1, a2, b1, b2):
    """Decode four bytes as a big-endian IEEE-754 single-precision float.

    Despite the name, the result is a float, not hex text.

    Args:
        a1: Most significant byte (0-255).
        a2: Second byte.
        b1: Third byte.
        b2: Least significant byte.

    Returns:
        The float encoded by the byte sequence ``a1 a2 b1 b2``.

    Raises:
        ValueError: If any argument is outside 0-255.
    """
    # Pack the bytes directly instead of going through a padded hex string.
    return struct.unpack('>f', bytes((a1, a2, b1, b2)))[0]
def dec_to_hex(num):
    """Convert a non-negative integer to lowercase hex text, zero-padded to 4 digits.

    Values above 0xFFFF are returned unpadded with as many digits as needed.

    Args:
        num: Non-negative integer (used in this file for register addresses).

    Returns:
        Hex digits without the ``0x`` prefix, e.g. ``5`` -> ``"0005"``.

    Raises:
        ValueError: If *num* is negative; previously this silently produced
            non-hex text such as ``"00x5"``.
        TypeError: If *num* is not an integer.
    """
    if num < 0:
        raise ValueError(f"dec_to_hex() needs a non-negative integer, got {num!r}")
    return hex(num)[2:].zfill(4)
# CRC16-MODBUS
def _crc16_modbus(data):
    """Return the CRC-16/MODBUS checksum of *data* (reflected poly 0xA001, init 0xFFFF)."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
    return crc


def crc16Add(read):
    """Append the CRC-16/MODBUS check bytes to a hex-encoded frame.

    Args:
        read: Frame as hex text; spaces between digits are allowed.

    Returns:
        ``read`` with surrounding whitespace stripped, followed by the CRC as
        four uppercase hex digits, low byte first.

    Raises:
        binascii.Error: If *read* (spaces removed) is not valid hex.
    """
    # Computed locally so the function does not rely on the name `crcmod`
    # leaking in through `from crcmod import *`, nor rebuild a CRC function
    # on every call.
    crc = _crc16_modbus(binascii.unhexlify(read.replace(" ", "")))
    return f"{read.strip()}{crc & 0xFF:02X}{crc >> 8:02X}"
|