modbus_rtu_over_tcp_connector.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import json
  2. import threading
  3. import queue
  4. import time
  5. from modbus_tk import modbus_rtu_over_tcp
  6. from connector import Connector
  7. from event_storage import EventStorage
  8. from logging_config import logger
  9. class ModbusRtuOverTcpConnector(Connector, threading.Thread):
  10. def __init__(self, name, config, converter):
  11. super().__init__()
  12. self._master = None
  13. self.__stopped = False
  14. self._connected = False
  15. self._ip = config['ip'] # ip
  16. self._port = config['port'] # 端口
  17. self._save_frequency = config['save_frequency'] # 数据存储时间间隔
  18. self.setDaemon(True)
  19. self.setName(name)
  20. self.__converter = converter
  21. self.__storager = EventStorage()
  22. self.__command_queue = queue.Queue(500)
  23. self.__last_save_time = 0
  24. self.__data_point_config = self.__storager.get_station_info(name)
  25. self._command = self.__storager.get_command_info(name)
  26. def open(self):
  27. self.__stopped = False
  28. self.start()
  29. def run(self):
  30. self._connect()
  31. self._connected = True
  32. while True:
  33. if isinstance(self._command, list):
  34. for i in self._command:
  35. command_list = json.loads(i['command'])
  36. self.command_polling(command_list, resend_times=5)
  37. time.sleep(1)
  38. time.sleep(1)
  39. if self.__stopped:
  40. break
  41. def _connect(self):
  42. try:
  43. self._master = modbus_rtu_over_tcp.RtuOverTcpMaster(host=self._ip, port=self._port)
  44. logger.info(f"{self._ip}:{self._port} connect success!")
  45. except Exception as e:
  46. logger.error(f'Error in modbus_tcp_connector.__connect: {e}')
  47. self._connected = False
  48. self._reconnect()
  49. def _reconnect(self):
  50. while True:
  51. try:
  52. self._master = modbus_rtu_over_tcp.RtuOverTcpMaster(host=self._ip, port=self._port)
  53. logger.error('client start connect to host/port:{}'.format(self._port))
  54. break
  55. except ConnectionRefusedError:
  56. logger.error('modbus server refused or not started, reconnect to server in 5s .... host/port:{}'.format(self._port))
  57. time.sleep(5)
  58. except Exception as e:
  59. logger.error('do connect error:{}'.format(str(e)))
  60. time.sleep(5)
  61. def close(self):
  62. pass
  63. def get_name(self):
  64. return self.name
  65. def is_connected(self):
  66. return self._connected
  67. def send_command(self, command):
  68. # command = {'device_id': 1, 'start_addr': 0, 'output_value': [0, 0, 0, 0], 'function_code': 15}
  69. # print(f"[send_command] {command}")
  70. try:
  71. if isinstance(command, dict):
  72. result = self.exec_command(command)
  73. elif isinstance(command, list):
  74. for each in command:
  75. result = self.exec_command(each)
  76. except Exception as e:
  77. print(f"[ModbusTcpConnector][send_command] error: {e}")
  78. result = False
  79. # print(f"[send_command][result] {result}")
  80. return result
  81. def exec_command(self, command):
  82. if isinstance(command, str):
  83. command = json.loads(command)
  84. device_id = int(command['device_id'])
  85. function_code = int(command['function_code'])
  86. start_addr = int(command['start_addr'])
  87. if function_code in (1, 2, 3, 4):
  88. # 读寄存器
  89. length = int(command['length'])
  90. try:
  91. self._master.set_timeout(2.0) # modbus读取数据超时时间设置
  92. self._master.set_verbose(True)
  93. # print(device_id, ' ', function_code, " ", start_addr, " ", length)
  94. receive_data = self._master.execute(device_id, function_code, start_addr, length)
  95. datadict = {}
  96. for i in range(len(receive_data)):
  97. addr = start_addr + i
  98. datadict[addr] = receive_data[i]
  99. result = [device_id, datadict]
  100. return result
  101. except Exception as e:
  102. logger.error(f'An error occurred while executing the read register command:{e}')
  103. elif function_code in (5, 6, 15, 16):
  104. # 写寄存器
  105. output_value = command['output_value']
  106. try:
  107. self._master.set_timeout(10.0)
  108. self._master.set_verbose(True)
  109. data = self._master.execute(device_id, function_code, start_addr, output_value=output_value)
  110. # print("data = ", data)
  111. # data = (0, 65280) or (0, 0)
  112. result = False
  113. if function_code == 5 and "res" in command.keys():
  114. res = command["res"]
  115. if start_addr == data[0] and res == data[1]:
  116. result = True
  117. return result
  118. except Exception as e:
  119. logger.error(f'An error occurred while executing the write register command:{e}')
  120. else:
  121. logger.error(f'Unsupported function code.')
  122. def command_polling(self, command_list, resend_times=None):
  123. # msg = str(time.strftime("%Y-%m-%d %H:%M:%S"))
  124. for i in range(len(command_list)):
  125. command_item = command_list[i]
  126. if not self.__command_queue.empty():
  127. write_command = self.__command_queue.get() # 写命令来自数组
  128. try:
  129. res = self.exec_command(command=write_command)
  130. except Exception as e:
  131. logger.error(f"modbus_rtu,write:{e}")
  132. else:
  133. result = self.exec_command(command=command_item)
  134. format_data = None
  135. if result:
  136. format_data = self.__converter.convert(self.__data_point_config, result)
  137. if format_data:
  138. if format_data != "error" and format_data != 'pass':
  139. # 往redis存储数据
  140. self.__storager.real_time_data_storage(format_data)