node_monitor.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #
  2. # Copyright (C) 2014-2015 UAVCAN Development Team <uavcan.org>
  3. #
  4. # This software is distributed under the terms of the MIT License.
  5. #
  6. # Author: Ben Dyer <ben_dyer@mac.com>
  7. # Pavel Kirienko <pavel.kirienko@zubax.com>
  8. #
  9. from __future__ import division, absolute_import, print_function, unicode_literals
  10. import time
  11. from logging import getLogger
  12. import uavcan
  13. logger = getLogger(__name__)
  14. class NodeMonitor(object):
  15. TIMEOUT = uavcan.protocol.NodeStatus().OFFLINE_TIMEOUT_MS / 1000 # @UndefinedVariable
  16. TRANSFER_PRIORITY = uavcan.TRANSFER_PRIORITY_LOWEST - 1
  17. MIN_RETRY_INTERVAL = 0.5
  18. MAX_RETRIES = 10
  19. class Entry:
  20. def __init__(self):
  21. self.node_id = None
  22. self.status = None
  23. self.info = None
  24. self.monotonic_timestamp = None
  25. self._remaining_retries = NodeMonitor.MAX_RETRIES
  26. @property
  27. def discovered(self):
  28. return self.info is not None or self._remaining_retries <= 0
  29. def _update_from_status(self, e):
  30. self.monotonic_timestamp = e.transfer.ts_monotonic
  31. self.node_id = e.transfer.source_node_id
  32. if self.status and e.message.uptime_sec < self.status.uptime_sec:
  33. self._remaining_retries = NodeMonitor.MAX_RETRIES
  34. self.info = None
  35. self.status = e.message
  36. if self.info:
  37. self.info.status = self.status
  38. def _update_from_info(self, e):
  39. self._remaining_retries = NodeMonitor.MAX_RETRIES
  40. self.monotonic_timestamp = e.transfer.ts_monotonic
  41. self.node_id = e.transfer.source_node_id
  42. self.status = e.response.status
  43. self.info = e.response
  44. def _register_retry(self):
  45. assert self._remaining_retries > 0
  46. self._remaining_retries -= 1
  47. def __str__(self):
  48. return '%d:%s' % (self.node_id, self.info if self.info else self.status)
  49. __repr__ = __str__
  50. class UpdateEvent:
  51. EVENT_ID_NEW = 'new'
  52. EVENT_ID_INFO_UPDATE = 'info_update'
  53. EVENT_ID_OFFLINE = 'offline'
  54. def __init__(self, entry, event_id):
  55. self.entry = entry
  56. self.event_id = event_id
  57. def __str__(self):
  58. return self.event_id + ':' + str(self.entry)
  59. __repr__ = __str__
  60. class UpdateHandlerRemover:
  61. def __init__(self, remover):
  62. self._remover = remover
  63. def remove(self):
  64. self._remover()
  65. def try_remove(self):
  66. try:
  67. self._remover()
  68. except ValueError:
  69. pass
  70. def __init__(self, node):
  71. self._update_callbacks = []
  72. self._handle = node.add_handler(uavcan.protocol.NodeStatus, self._on_node_status) # @UndefinedVariable
  73. self._registry = {} # {node_id: Entry}
  74. self._timer = node.periodic(1, self._remove_stale)
  75. def add_update_handler(self, callback):
  76. """
  77. Args:
  78. callback: The specified callback will be invoked when:
  79. - A new node appears
  80. - Node info for an existing node gets updated
  81. - Node goes offline
  82. Returns: Call remove() or try_remove() on the returned object to unregister the handler.
  83. """
  84. self._update_callbacks.append(callback)
  85. return self.UpdateHandlerRemover(lambda: self._update_callbacks.remove(callback))
  86. def _call_event_handlers(self, event):
  87. for cb in self._update_callbacks:
  88. cb(event)
  89. def exists(self, node_id):
  90. """
  91. Args:
  92. node_id: Returns True if the given node ID exists, false otherwise
  93. """
  94. return node_id in self._registry
  95. def get(self, node_id):
  96. """
  97. Args:
  98. node_id: Returns an Entry instance for the given node ID.
  99. If the requested node ID does not exist, throws KeyError.
  100. """
  101. if (self._registry[node_id].monotonic_timestamp + self.TIMEOUT) < time.monotonic():
  102. self._call_event_handlers(self.UpdateEvent(self._registry[node_id],
  103. self.UpdateEvent.EVENT_ID_OFFLINE))
  104. del self._registry[node_id]
  105. return self._registry[node_id]
  106. def get_all_node_id(self):
  107. """Returns a generator or an iterable containing all currently active node ID."""
  108. return self._registry.keys()
  109. def find_all(self, predicate):
  110. """Returns a generator that produces a sequence of Entry objects for which the predicate returned True.
  111. Args:
  112. predicate: A callable that returns a value coercible to bool.
  113. """
  114. for _nid, entry in self._registry.items():
  115. if predicate(entry):
  116. yield entry
  117. def are_all_nodes_discovered(self):
  118. """Reports whether there are nodes whose node info is still unknown."""
  119. undiscovered = self.find_all(lambda e: not e.discovered)
  120. return len(list(undiscovered)) == 0
  121. def close(self):
  122. """Stops the instance. The registry will not be cleared."""
  123. self._handle.remove()
  124. self._timer.remove()
  125. def _remove_stale(self):
  126. for nid, e in list(self._registry.items())[:]:
  127. if (e.monotonic_timestamp + self.TIMEOUT) < time.monotonic():
  128. del self._registry[nid]
  129. self._call_event_handlers(self.UpdateEvent(e, self.UpdateEvent.EVENT_ID_OFFLINE))
  130. def _on_node_status(self, e):
  131. node_id = e.transfer.source_node_id
  132. try:
  133. entry = self.get(node_id)
  134. new_entry = False
  135. except KeyError:
  136. entry = self.Entry()
  137. entry._info_requested_at = 0
  138. self._registry[node_id] = entry
  139. new_entry = True
  140. # noinspection PyProtectedMember
  141. entry._update_from_status(e)
  142. if new_entry:
  143. self._call_event_handlers(self.UpdateEvent(entry, self.UpdateEvent.EVENT_ID_NEW))
  144. should_retry_now = entry.monotonic_timestamp - entry._info_requested_at > self.MIN_RETRY_INTERVAL
  145. if not entry.discovered and should_retry_now and not e.node.is_anonymous:
  146. entry._info_requested_at = entry.monotonic_timestamp
  147. # noinspection PyProtectedMember
  148. entry._register_retry()
  149. e.node.request(uavcan.protocol.GetNodeInfo.Request(), node_id, # @UndefinedVariable
  150. priority=self.TRANSFER_PRIORITY, callback=self._on_info_response)
  151. def _on_info_response(self, e):
  152. if not e:
  153. return
  154. try:
  155. entry = self.get(e.transfer.source_node_id)
  156. except KeyError:
  157. entry = self.Entry()
  158. self._registry[e.transfer.source_node_id] = entry
  159. # noinspection PyProtectedMember
  160. entry._update_from_info(e)
  161. hw_unique_id = "".join(format(c, "02X") for c in e.response.hardware_version.unique_id)
  162. msg = (
  163. "[#{0:03d}:uavcan.protocol.GetNodeInfo] " +
  164. "software_version.major={1:d} " +
  165. "software_version.minor={2:d} " +
  166. "software_version.vcs_commit={3:08x} " +
  167. "software_version.image_crc={4:016X} " +
  168. "hardware_version.major={5:d} " +
  169. "hardware_version.minor={6:d} " +
  170. "hardware_version.unique_id={7!s} " +
  171. "name={8!r}"
  172. ).format(
  173. e.transfer.source_node_id,
  174. e.response.software_version.major,
  175. e.response.software_version.minor,
  176. e.response.software_version.vcs_commit,
  177. e.response.software_version.image_crc,
  178. e.response.hardware_version.major,
  179. e.response.hardware_version.minor,
  180. hw_unique_id,
  181. e.response.name.decode()
  182. )
  183. logger.info(msg)
  184. self._call_event_handlers(self.UpdateEvent(entry, self.UpdateEvent.EVENT_ID_INFO_UPDATE))