dynamic_node_id.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. #
  2. # Copyright (C) 2014-2015 UAVCAN Development Team <uavcan.org>
  3. #
  4. # This software is distributed under the terms of the MIT License.
  5. #
  6. # Author: Ben Dyer <ben_dyer@mac.com>
  7. # Pavel Kirienko <pavel.kirienko@zubax.com>
  8. #
  9. from __future__ import division, absolute_import, print_function, unicode_literals
  10. import time
  11. import sqlite3
  12. from logging import getLogger
  13. import uavcan
  14. from uavcan import UAVCANException
  15. logger = getLogger(__name__)
  16. def _unique_id_to_string(uid):
  17. return ' '.join(['%02X' % x for x in bytearray(uid)]) if uid else None
  18. class CentralizedServer(object):
  19. QUERY_TIMEOUT = uavcan.protocol.dynamic_node_id.Allocation().FOLLOWUP_TIMEOUT_MS / 1000 # @UndefinedVariable
  20. DEFAULT_NODE_ID_RANGE = 1, 125
  21. DATABASE_STORAGE_MEMORY = ':memory:'
  22. class AllocationTable(object):
  23. def __init__(self, path):
  24. # Disabling same thread check on the assumption that the developer knows what they are doing.
  25. self.db = sqlite3.connect(path, check_same_thread=False) # @UndefinedVariable
  26. self._modify('''CREATE TABLE IF NOT EXISTS `allocation` (
  27. `node_id` INTEGER NOT NULL UNIQUE,
  28. `unique_id` blob,
  29. `ts` time NOT NULL DEFAULT CURRENT_TIMESTAMP,
  30. PRIMARY KEY(node_id));''')
  31. def _modify(self, what, *args):
  32. c = self.db.cursor()
  33. c.execute(what, args) # Tuple!
  34. self.db.commit()
  35. def close(self):
  36. self.db.close()
  37. def set(self, unique_id, node_id):
  38. if unique_id is not None and unique_id == bytes([0] * len(unique_id)):
  39. unique_id = None
  40. if unique_id is not None:
  41. unique_id = sqlite3.Binary(unique_id)
  42. logger.debug('[CentralizedServer] AllocationTable update: %d %s', node_id, _unique_id_to_string(unique_id))
  43. self._modify('''insert or replace into allocation (node_id, unique_id) values (?, ?);''',
  44. node_id, unique_id)
  45. def get_node_id(self, unique_id):
  46. assert isinstance(unique_id, bytes)
  47. c = self.db.cursor()
  48. c.execute('''select node_id from allocation where unique_id = ? order by ts desc limit 1''',
  49. (unique_id,))
  50. res = c.fetchone()
  51. return res[0] if res else None
  52. def get_unique_id(self, node_id):
  53. assert isinstance(node_id, int)
  54. c = self.db.cursor()
  55. c.execute('''select unique_id from allocation where node_id = ?''', (node_id,))
  56. res = c.fetchone()
  57. return res[0] if res else None
  58. def is_known_node_id(self, node_id):
  59. assert isinstance(node_id, int)
  60. c = self.db.cursor()
  61. c.execute('''select count(*) from allocation where node_id = ?''', (node_id,))
  62. return c.fetchone()[0] > 0
  63. def get_entries(self):
  64. c = self.db.cursor()
  65. c.execute('''select unique_id, node_id from allocation order by ts desc''')
  66. return list(c.fetchall())
  67. def __init__(self, node, node_monitor, database_storage=None, dynamic_node_id_range=None):
  68. """
  69. :param node: Node instance.
  70. :param node_monitor: Instance of NodeMonitor.
  71. :param database_storage: Path to the file where the instance will keep the allocation table.
  72. If not provided, the allocation table will be kept in memory.
  73. :param dynamic_node_id_range: Range of node ID available for dynamic allocation; defaults to [1, 125].
  74. """
  75. if node.is_anonymous:
  76. raise UAVCANException('Dynamic node ID server cannot be launched on an anonymous node')
  77. self._node_monitor = node_monitor
  78. self._allocation_table = CentralizedServer.AllocationTable(database_storage or self.DATABASE_STORAGE_MEMORY)
  79. self._query = bytes()
  80. self._query_timestamp = 0
  81. self._node_monitor_event_handle = node_monitor.add_update_handler(self._handle_monitor_event)
  82. self._dynamic_node_id_range = dynamic_node_id_range or CentralizedServer.DEFAULT_NODE_ID_RANGE
  83. self._handle = node.add_handler(uavcan.protocol.dynamic_node_id.Allocation, # @UndefinedVariable
  84. self._on_allocation_message)
  85. self._allocation_table.set(node.node_info.hardware_version.unique_id.to_bytes(), node.node_id)
  86. # Initializing the table
  87. for entry in node_monitor.find_all(lambda _: True):
  88. unique_id = entry.info.hardware_version.unique_id.to_bytes() if entry.info else None
  89. self._allocation_table.set(unique_id, entry.node_id)
  90. def get_allocation_table(self):
  91. return self._allocation_table.get_entries()
  92. def _handle_monitor_event(self, event):
  93. unique_id = event.entry.info.hardware_version.unique_id.to_bytes() if event.entry.info else None
  94. self._allocation_table.set(unique_id, event.entry.node_id)
  95. def close(self):
  96. """Stops the instance and closes the allocation table storage.
  97. """
  98. self._handle.remove()
  99. self._node_monitor_event_handle.remove()
  100. self._allocation_table.close()
  101. def _on_allocation_message(self, e):
  102. # TODO: request validation
  103. # Centralized allocator cannot co-exist with other allocators; this is a network configuration error.
  104. if e.transfer.source_node_id != 0:
  105. logger.warning('[CentralizedServer] Message from another allocator ignored: %r', e)
  106. return
  107. # We can't grant allocations as long as there are undiscovered nodes - see specification
  108. if not self._node_monitor.are_all_nodes_discovered():
  109. logger.info('[CentralizedServer] Request ignored: not all nodes are discovered')
  110. return
  111. # The local state must be reset after the specified timeout
  112. if len(self._query) and time.monotonic() - self._query_timestamp > CentralizedServer.QUERY_TIMEOUT:
  113. self._query = bytes()
  114. logger.info("[CentralizedServer] Query timeout, resetting query")
  115. # Handling the message
  116. if e.message.first_part_of_unique_id:
  117. # First-phase messages trigger a second-phase query
  118. self._query = e.message.unique_id.to_bytes()
  119. self._query_timestamp = e.transfer.ts_monotonic
  120. response = uavcan.protocol.dynamic_node_id.Allocation() # @UndefinedVariable
  121. response.first_part_of_unique_id = 0
  122. response.node_id = 0
  123. response.unique_id.from_bytes(self._query)
  124. e.node.broadcast(response)
  125. logger.debug("[CentralizedServer] Got first-stage dynamic ID request for %s",
  126. _unique_id_to_string(self._query))
  127. elif len(e.message.unique_id) == 6 and len(self._query) == 6:
  128. # Second-phase messages trigger a third-phase query
  129. self._query += e.message.unique_id.to_bytes()
  130. self._query_timestamp = e.transfer.ts_monotonic
  131. response = uavcan.protocol.dynamic_node_id.Allocation() # @UndefinedVariable
  132. response.first_part_of_unique_id = 0
  133. response.node_id = 0
  134. response.unique_id.from_bytes(self._query)
  135. e.node.broadcast(response)
  136. logger.debug("[CentralizedServer] Got second-stage dynamic ID request for %s",
  137. _unique_id_to_string(self._query))
  138. elif len(e.message.unique_id) == 4 and len(self._query) == 12:
  139. # Third-phase messages trigger an allocation
  140. self._query += e.message.unique_id.to_bytes()
  141. self._query_timestamp = e.transfer.ts_monotonic
  142. logger.debug("[CentralizedServer] Got third-stage dynamic ID request for %s",
  143. _unique_id_to_string(self._query))
  144. node_requested_id = e.message.node_id
  145. node_allocated_id = self._allocation_table.get_node_id(self._query)
  146. # If an ID was requested but not allocated yet, allocate the first
  147. # ID equal to or higher than the one that was requested
  148. if node_requested_id and not node_allocated_id:
  149. for node_id in range(node_requested_id, self._dynamic_node_id_range[1]):
  150. if not self._allocation_table.is_known_node_id(node_id):
  151. node_allocated_id = node_id
  152. break
  153. # If no ID was allocated in the above step (also if the requested
  154. # ID was zero), allocate the highest unallocated node ID
  155. if not node_allocated_id:
  156. for node_id in range(self._dynamic_node_id_range[1], self._dynamic_node_id_range[0], -1):
  157. if not self._allocation_table.is_known_node_id(node_id):
  158. node_allocated_id = node_id
  159. break
  160. if node_allocated_id:
  161. self._allocation_table.set(self._query, node_allocated_id)
  162. response = uavcan.protocol.dynamic_node_id.Allocation() # @UndefinedVariable
  163. response.first_part_of_unique_id = 0
  164. response.node_id = node_allocated_id
  165. response.unique_id.from_bytes(self._query)
  166. e.node.broadcast(response)
  167. logger.info("[CentralizedServer] Allocated node ID %d to node with unique ID %s",
  168. node_allocated_id, _unique_id_to_string(self._query))
  169. self._query = bytes() # Resetting the state
  170. else:
  171. logger.error("[CentralizedServer] Couldn't allocate dynamic node ID")