modbus_rtu_over_tcp_connector.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
"""
@Date :2021/5/21/00219:10:57
@Desc : Sensors currently served by this connector: In-Situ water-quality sensors, water-quality sensors, and weather sensors.
"""
  5. import json
  6. import queue
  7. import threading
  8. import time
  9. from modbus_tk import modbus_rtu_over_tcp
  10. from connector import Connector
  11. from event_storage import EventStorage
  12. from logging_config import modbus_connector as logger
  13. class ModbusRtuOverTcpConnector(Connector, threading.Thread):
  14. def __init__(self, name, config, converter):
  15. super().__init__()
  16. self._master = None
  17. self.__stopped = False
  18. self._connected = False
  19. self._ip = config['ip'] # ip
  20. self._port = config['port'] # 端口
  21. self._save_frequency = config['save_frequency'] # 数据存储时间间隔
  22. self.setDaemon(True)
  23. self.setName(name)
  24. self.__converter = converter
  25. self.__storager = EventStorage()
  26. self.__command_queue = queue.Queue(500)
  27. self.__last_save_time = 0
  28. self.__data_point_config = self.__storager.get_station_info(name)
  29. self._command = self.__storager.get_command_info(name)
  30. def open(self):
  31. self.__stopped = False
  32. self.start()
  33. def run(self):
  34. self._connect()
  35. self._connected = True
  36. while True:
  37. if isinstance(self._command, list):
  38. for i in self._command:
  39. command_list = json.loads(i['command'])
  40. self.command_polling(command_list, resend_times=5)
  41. time.sleep(1)
  42. time.sleep(1)
  43. if self.__stopped:
  44. break
  45. def _connect(self):
  46. try:
  47. self._master = modbus_rtu_over_tcp.RtuOverTcpMaster(host=self._ip, port=self._port)
  48. logger.info(f'Connect to [{self.name}]:[{self._ip}]:[{self._port}] success !')
  49. except Exception as e:
  50. logger.info(f'Connect to [{self.name}]:[{self._ip}]:[{self._port}] failed:{e} !!!')
  51. self._connected = False
  52. self._reconnect()
  53. def _reconnect(self):
  54. while True:
  55. try:
  56. self._master = modbus_rtu_over_tcp.RtuOverTcpMaster(host=self._ip, port=self._port)
  57. logger.info(f'Reconnect to [{self.name}]:[{self._ip}]:[{self._port}] success !')
  58. break
  59. except Exception as e:
  60. logger.error(f'Reconnect to [{self.name}]:[{self._ip}]:[{self._port}] failed:{e} !!! Continue reconnect in 5s..')
  61. time.sleep(5)
  62. def close(self):
  63. pass
  64. def get_name(self):
  65. return self.name
  66. def is_connected(self):
  67. return self._connected
  68. def send_command(self, command):
  69. # command = {'device_id': 1, 'start_addr': 0, 'output_value': [0, 0, 0, 0], 'function_code': 15}
  70. # print(f"[send_command] {command}")
  71. try:
  72. if isinstance(command, dict):
  73. result = self.exec_command(command)
  74. elif isinstance(command, list):
  75. for each in command:
  76. result = self.exec_command(each)
  77. except Exception as e:
  78. print(f"[ModbusTcpConnector][send_command] error: {e}")
  79. result = False
  80. # print(f"[send_command][result] {result}")
  81. return result
  82. def exec_command(self, command):
  83. if isinstance(command, str):
  84. command = json.loads(command)
  85. device_id = int(command['device_id'])
  86. function_code = int(command['function_code'])
  87. start_addr = int(command['start_addr'])
  88. if function_code in (1, 2, 3, 4):
  89. # 读寄存器
  90. length = int(command['length'])
  91. try:
  92. self._master.set_timeout(3.0) # modbus读取数据超时时间设置
  93. self._master.set_verbose(True)
  94. receive_data = self._master.execute(device_id, function_code, start_addr, length)
  95. datadict = {}
  96. for i in range(len(receive_data)):
  97. addr = start_addr + i
  98. datadict[addr] = receive_data[i]
  99. result = [device_id, datadict]
  100. return result
  101. except Exception as e:
  102. logger.error(f'[{self.name}]: read {device_id}, {function_code}, {start_addr}, {length}]:{repr(e)}')
  103. self._reconnect()
  104. elif function_code in (5, 6, 15, 16):
  105. # 写寄存器
  106. output_value = command['output_value']
  107. try:
  108. self._master.set_timeout(10.0)
  109. self._master.set_verbose(True)
  110. data = self._master.execute(device_id, function_code, start_addr, output_value=output_value)
  111. # print("data = ", data)
  112. # data = (0, 65280) or (0, 0)
  113. result = False
  114. if function_code == 5 and "res" in command.keys():
  115. res = command["res"]
  116. if start_addr == data[0] and res == data[1]:
  117. result = True
  118. return result
  119. except Exception as e:
  120. logger.error(f'[{self.name}]: An error occurred while executing the write register command:{e}')
  121. else:
  122. logger.error(f'[{self.name}]: Unsupported function code.')
  123. def command_polling(self, command_list, resend_times=None):
  124. # msg = str(time.strftime("%Y-%m-%d %H:%M:%S"))
  125. for i in range(len(command_list)):
  126. command_item = command_list[i]
  127. if not self.__command_queue.empty():
  128. write_command = self.__command_queue.get() # 写命令来自队列
  129. try:
  130. res = self.exec_command(command=write_command)
  131. except Exception as e:
  132. logger.error(f"[{self.name}]: modbus_rtu,write:{e}")
  133. else:
  134. try:
  135. result = self.exec_command(command=command_item)
  136. format_data = None
  137. if result:
  138. format_data = self.__converter.convert(self.__data_point_config, result)
  139. if format_data:
  140. if format_data != "error" and format_data != 'pass':
  141. # 往redis存储数据
  142. self.__storager.real_time_data_storage(format_data)
  143. except Exception as e:
  144. logger.error(f'[{self.name}]: {e}')
  145. self._reconnect()