node.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. #
  2. # Copyright (C) 2014-2015 UAVCAN Development Team <uavcan.org>
  3. #
  4. # This software is distributed under the terms of the MIT License.
  5. #
  6. # Author: Ben Dyer <ben_dyer@mac.com>
  7. # Pavel Kirienko <pavel.kirienko@zubax.com>
  8. #
  9. from __future__ import division, absolute_import, print_function, unicode_literals
  10. import time
  11. import collections
  12. import sched
  13. import sys
  14. import inspect
  15. from logging import getLogger
  16. import uavcan
  17. import uavcan.driver as driver
  18. import uavcan.transport as transport
  19. from uavcan.transport import get_uavcan_data_type
  20. from uavcan import UAVCANException
  21. DEFAULT_NODE_STATUS_INTERVAL = 1.0
  22. DEFAULT_SERVICE_TIMEOUT = 1.0
  23. DEFAULT_TRANSFER_PRIORITY = 20
  24. logger = getLogger(__name__)
  25. class Scheduler(object):
  26. """This class implements a simple non-blocking event scheduler.
  27. It supports one-shot and periodic events.
  28. """
  29. def __init__(self):
  30. if sys.version_info[0] > 2:
  31. # Nice and easy.
  32. self._scheduler = sched.scheduler()
  33. # The documentation says that run() returns the next deadline,
  34. # but it's not true - it returns the remaining time.
  35. self._run_scheduler = lambda: self._scheduler.run(blocking=False) + self._scheduler.timefunc()
  36. else:
  37. # Nightmare inducing hacks
  38. class SayNoToBlockingSchedulingException(uavcan.UAVCANException):
  39. pass
  40. def delayfunc_impostor(duration):
  41. if duration > 0:
  42. raise SayNoToBlockingSchedulingException('No!')
  43. self._scheduler = sched.scheduler(time.monotonic, delayfunc_impostor)
  44. def run_scheduler():
  45. try:
  46. self._scheduler.run()
  47. except SayNoToBlockingSchedulingException:
  48. q = self._scheduler.queue
  49. return q[0][0] if q else None
  50. self._run_scheduler = run_scheduler
  51. def _make_sched_handle(self, get_event):
  52. class EventHandle(object):
  53. @staticmethod
  54. def remove():
  55. self._scheduler.cancel(get_event())
  56. @staticmethod
  57. def try_remove():
  58. try:
  59. self._scheduler.cancel(get_event())
  60. return True
  61. except ValueError:
  62. return False
  63. return EventHandle()
  64. def _poll_scheduler_and_get_next_deadline(self):
  65. return self._run_scheduler()
  66. def defer(self, timeout_seconds, callback):
  67. """This method allows to invoke the callback with specified arguments once the specified amount of time.
  68. :returns: EventHandle object. Call .remove() on it to cancel the event.
  69. """
  70. priority = 1
  71. event = self._scheduler.enter(timeout_seconds, priority, callback, ())
  72. return self._make_sched_handle(lambda: event)
  73. def periodic(self, period_seconds, callback):
  74. """This method allows to invoke the callback periodically, with specified time intervals.
  75. Note that the scheduler features zero phase drift.
  76. :returns: EventHandle object. Call .remove() on it to cancel the event.
  77. """
  78. priority = 0
  79. def caller(scheduled_deadline):
  80. # Event MUST be re-registered first in order to ensure that it can be cancelled from the callback
  81. scheduled_deadline += period_seconds
  82. event_holder[0] = self._scheduler.enterabs(scheduled_deadline, priority, caller, (scheduled_deadline,))
  83. callback()
  84. first_deadline = self._scheduler.timefunc() + period_seconds
  85. event_holder = [self._scheduler.enterabs(first_deadline, priority, caller, (first_deadline,))]
  86. return self._make_sched_handle(lambda: event_holder[0])
  87. def has_pending_events(self):
  88. """Returns true if there is at least one pending event in the queue.
  89. """
  90. return not self._scheduler.empty()
  91. class TransferEvent(object):
  92. def __init__(self, transfer, node, payload_attr_name):
  93. setattr(self, payload_attr_name, transfer.payload)
  94. self.transfer = transfer
  95. self.node = node
  96. def __str__(self):
  97. return str(self.transfer)
  98. def __repr__(self):
  99. return repr(self.transfer)
  100. class HandleRemover:
  101. def __init__(self, remover):
  102. self._remover = remover
  103. def remove(self):
  104. self._remover()
  105. def try_remove(self):
  106. try:
  107. self._remover()
  108. return True
  109. except ValueError:
  110. return False
  111. class HandlerDispatcher(object):
  112. def __init__(self, node):
  113. self._handlers = [] # type, callable
  114. self._node = node
  115. def add_handler(self, uavcan_type, handler, **kwargs):
  116. service = {
  117. uavcan_type.KIND_SERVICE: True,
  118. uavcan_type.KIND_MESSAGE: False
  119. }[uavcan_type.kind]
  120. # If handler is a class, create a wrapper function and register it as a regular callback
  121. if inspect.isclass(handler):
  122. def class_handler_adapter(event):
  123. h = handler(event, **kwargs)
  124. if service:
  125. h.on_request()
  126. return h.response
  127. else:
  128. h.on_message()
  129. return self.add_handler(uavcan_type, class_handler_adapter)
  130. # At this point process the handler as a regular callback
  131. def call(transfer):
  132. event = TransferEvent(transfer, self._node, 'request' if service else 'message')
  133. result = handler(event, **kwargs)
  134. if service:
  135. if result is None:
  136. raise UAVCANException('Service request handler did not return a response [%r, %r]' %
  137. (uavcan_type, handler))
  138. self._node.respond(result,
  139. transfer.source_node_id,
  140. transfer.transfer_id,
  141. transfer.transfer_priority)
  142. else:
  143. if result is not None:
  144. raise UAVCANException('Message request handler did not return None [%r, %r]' %
  145. (uavcan_type, handler))
  146. entry = uavcan_type, call
  147. self._handlers.append(entry)
  148. return HandleRemover(lambda: self._handlers.remove(entry))
  149. def remove_handlers(self, uavcan_type):
  150. self._handlers = list(filter(lambda x: x[0] != uavcan_type, self._handlers))
  151. def call_handlers(self, transfer):
  152. for uavcan_type, wrapper in self._handlers:
  153. if uavcan_type == get_uavcan_data_type(transfer.payload):
  154. # noinspection PyBroadException
  155. try:
  156. wrapper(transfer)
  157. except Exception:
  158. logger.error('Transfer handler exception', exc_info=True)
  159. class TransferHookDispatcher(object):
  160. TRANSFER_DIRECTION_INCOMING = 'rx'
  161. TRANSFER_DIRECTION_OUTGOING = 'tx'
  162. def __init__(self):
  163. self._hooks = []
  164. def add_hook(self, hook, **kwargs):
  165. def proxy(transfer):
  166. hook(transfer, **kwargs)
  167. self._hooks.append(proxy)
  168. return HandleRemover(lambda: self._hooks.remove(proxy))
  169. def call_hooks(self, direction, transfer):
  170. setattr(transfer, 'direction', direction)
  171. for hook in self._hooks:
  172. # noinspection PyBroadException
  173. try:
  174. hook(transfer)
  175. except Exception:
  176. logger.error('Transfer hook exception', exc_info=True)
  177. class Node(Scheduler):
  178. def __init__(self, can_driver, node_id=None, node_status_interval=None,
  179. mode=None, node_info=None, **_extras):
  180. """
  181. It is recommended to use make_node() rather than instantiating this type directly.
  182. :param can_driver: CAN bus driver object. Calling close() on a node object closes its driver instance.
  183. :param node_id: Node ID of the current instance. Defaults to None, which enables passive mode.
  184. :param node_status_interval: NodeStatus broadcasting interval. Defaults to DEFAULT_NODE_STATUS_INTERVAL.
  185. :param mode: Initial operating mode (INITIALIZATION, OPERATIONAL, etc.); defaults to INITIALIZATION.
  186. :param node_info: Structure of type uavcan.protocol.GetNodeInfo.Response, responded with when the local
  187. node is queried for its node info.
  188. """
  189. super(Node, self).__init__()
  190. self._handler_dispatcher = HandlerDispatcher(self)
  191. self._can_driver = can_driver
  192. self._node_id = node_id
  193. self._transfer_manager = transport.TransferManager()
  194. self._outstanding_requests = {}
  195. self._outstanding_request_callbacks = {}
  196. self._next_transfer_ids = collections.defaultdict(int)
  197. self.start_time_monotonic = time.monotonic()
  198. # Hooks
  199. self._transfer_hook_dispatcher = TransferHookDispatcher()
  200. # NodeStatus publisher
  201. self.health = uavcan.protocol.NodeStatus().HEALTH_OK # @UndefinedVariable
  202. self.mode = uavcan.protocol.NodeStatus().MODE_INITIALIZATION if mode is None else mode # @UndefinedVariable
  203. self.vendor_specific_status_code = 0
  204. node_status_interval = node_status_interval or DEFAULT_NODE_STATUS_INTERVAL
  205. self.periodic(node_status_interval, self._send_node_status)
  206. # GetNodeInfo server
  207. def on_get_node_info(e):
  208. logger.debug('GetNodeInfo request from %r', e.transfer.source_node_id)
  209. self._fill_node_status(self.node_info.status)
  210. return self.node_info
  211. self.node_info = node_info or uavcan.protocol.GetNodeInfo.Response() # @UndefinedVariable
  212. self.add_handler(uavcan.protocol.GetNodeInfo, on_get_node_info) # @UndefinedVariable
  213. @property
  214. def is_anonymous(self):
  215. return (self._node_id or 0) == 0
  216. @property
  217. def node_id(self):
  218. return self._node_id
  219. @node_id.setter
  220. def node_id(self, value):
  221. if self.is_anonymous:
  222. value = int(value)
  223. if not (1 <= value <= 127):
  224. raise ValueError('Invalid Node ID [%d]' % value)
  225. self._node_id = value
  226. else:
  227. raise UAVCANException('Node ID can be set only once')
  228. @property
  229. def can_driver(self):
  230. return self._can_driver
  231. def _recv_frame(self, raw_frame):
  232. if not raw_frame.extended:
  233. return
  234. frame = transport.Frame(raw_frame.id, raw_frame.data, raw_frame.ts_monotonic, raw_frame.ts_real)
  235. transfer_frames = self._transfer_manager.receive_frame(frame)
  236. if not transfer_frames:
  237. return
  238. transfer = transport.Transfer()
  239. transfer.from_frames(transfer_frames)
  240. self._transfer_hook_dispatcher.call_hooks(self._transfer_hook_dispatcher.TRANSFER_DIRECTION_INCOMING, transfer)
  241. if (transfer.service_not_message and not transfer.request_not_response) and \
  242. transfer.dest_node_id == self._node_id:
  243. # This is a reply to a request we sent. Look up the original request and call the appropriate callback
  244. requests = self._outstanding_requests.keys()
  245. for key in requests:
  246. if transfer.is_response_to(self._outstanding_requests[key]):
  247. # Call the request's callback and remove it from the active list
  248. event = TransferEvent(transfer, self, 'response')
  249. self._outstanding_request_callbacks[key](event)
  250. del self._outstanding_requests[key]
  251. del self._outstanding_request_callbacks[key]
  252. break
  253. elif not transfer.service_not_message or transfer.dest_node_id == self._node_id:
  254. # This is a request or a broadcast; look up the appropriate handler by data type ID
  255. self._handler_dispatcher.call_handlers(transfer)
  256. def _next_transfer_id(self, key):
  257. transfer_id = self._next_transfer_ids[key]
  258. self._next_transfer_ids[key] = (transfer_id + 1) & 0x1F
  259. return transfer_id
  260. def _throw_if_anonymous(self):
  261. if not self._node_id:
  262. raise uavcan.UAVCANException('The local node is configured in anonymous mode')
  263. def _fill_node_status(self, msg):
  264. msg.uptime_sec = int(time.monotonic() - self.start_time_monotonic + 0.5)
  265. msg.health = self.health
  266. msg.mode = self.mode
  267. msg.vendor_specific_status_code = self.vendor_specific_status_code
  268. def _send_node_status(self):
  269. self._fill_node_status(self.node_info.status)
  270. if self._node_id:
  271. # TODO: transmit self.node_info.status instead of creating a new object
  272. msg = uavcan.protocol.NodeStatus() # @UndefinedVariable
  273. self._fill_node_status(msg)
  274. self.broadcast(msg)
  275. def add_transfer_hook(self, hook, **kwargs):
  276. """
  277. :param hook: Callable hook; expected signature: hook(transfer).
  278. :param kwargs: Extra arguments for the hook.
  279. :return: A handle object that can be used to unregister the hook by calling remove() on it.
  280. """
  281. return self._transfer_hook_dispatcher.add_hook(hook, **kwargs)
  282. def add_handler(self, uavcan_type, handler, **kwargs):
  283. """
  284. Adds a handler for the specified data type.
  285. :param uavcan_type: DSDL data type. Only transfers of this type will be accepted for this handler.
  286. :param handler: The handler. This must be either a callable or a class.
  287. :param **kwargs: Extra arguments for the handler.
  288. :return: A remover object that can be used to unregister the handler as follows:
  289. x = node.add_handler(...)
  290. # Remove the handler like this:
  291. x.remove()
  292. # Or like this:
  293. if x.try_remove():
  294. print('The handler has been removed successfully')
  295. else:
  296. print('There is no such handler')
  297. """
  298. return self._handler_dispatcher.add_handler(uavcan_type, handler, **kwargs)
  299. def remove_handlers(self, uavcan_type):
  300. """Removes all handlers for the specified DSDL data type.
  301. """
  302. self._handler_dispatcher.remove_handlers(uavcan_type)
  303. def spin(self, timeout=None):
  304. """
  305. Runs background processes until timeout expires.
  306. Note that all processing is implemented in one thread.
  307. :param timeout: The method will return once this amount of time expires.
  308. If None, the method will never return.
  309. If zero, the method will handle only those events that are ready, then return immediately.
  310. """
  311. if timeout != 0:
  312. deadline = (time.monotonic() + timeout) if timeout is not None else sys.float_info.max
  313. def execute_once():
  314. next_event_at = self._poll_scheduler_and_get_next_deadline()
  315. if next_event_at is None:
  316. next_event_at = sys.float_info.max
  317. read_timeout = min(next_event_at, deadline) - time.monotonic()
  318. read_timeout = max(read_timeout, 0)
  319. read_timeout = min(read_timeout, 1)
  320. frame = self._can_driver.receive(read_timeout)
  321. if frame:
  322. self._recv_frame(frame)
  323. execute_once()
  324. while time.monotonic() < deadline:
  325. execute_once()
  326. else:
  327. while True:
  328. frame = self._can_driver.receive(0)
  329. if frame:
  330. self._recv_frame(frame)
  331. else:
  332. break
  333. self._poll_scheduler_and_get_next_deadline()
  334. def request(self, payload, dest_node_id, callback, priority=None, timeout=None):
  335. self._throw_if_anonymous()
  336. # Preparing the transfer
  337. transfer_id = self._next_transfer_id((get_uavcan_data_type(payload).default_dtid, dest_node_id))
  338. transfer = transport.Transfer(payload=payload,
  339. source_node_id=self._node_id,
  340. dest_node_id=dest_node_id,
  341. transfer_id=transfer_id,
  342. transfer_priority=priority or DEFAULT_TRANSFER_PRIORITY,
  343. service_not_message=True,
  344. request_not_response=True)
  345. # Calling hooks
  346. self._transfer_hook_dispatcher.call_hooks(self._transfer_hook_dispatcher.TRANSFER_DIRECTION_OUTGOING, transfer)
  347. # Sending the transfer
  348. for frame in transfer.to_frames():
  349. self._can_driver.send(frame.message_id, frame.bytes, extended=True)
  350. # Registering a callback that will be invoked if there was no response after 'timeout' seconds
  351. def on_timeout():
  352. try:
  353. del self._outstanding_requests[transfer.key]
  354. except KeyError:
  355. pass
  356. try:
  357. del self._outstanding_request_callbacks[transfer.key]
  358. except KeyError:
  359. pass
  360. callback(None)
  361. timeout = timeout or DEFAULT_SERVICE_TIMEOUT
  362. timeout_caller_handle = self.defer(timeout, on_timeout)
  363. # This wrapper will automatically cancel the timeout callback if there was a response
  364. def timeout_cancelling_wrapper(event):
  365. timeout_caller_handle.try_remove()
  366. callback(event)
  367. # Registering the pending request using the wrapper above instead of the callback
  368. self._outstanding_requests[transfer.key] = transfer
  369. self._outstanding_request_callbacks[transfer.key] = timeout_cancelling_wrapper
  370. logger.debug("Node.request(dest_node_id={0:d}): sent {1!r}".format(dest_node_id, payload))
  371. def respond(self, payload, dest_node_id, transfer_id, priority):
  372. self._throw_if_anonymous()
  373. transfer = transport.Transfer(
  374. payload=payload,
  375. source_node_id=self._node_id,
  376. dest_node_id=dest_node_id,
  377. transfer_id=transfer_id,
  378. transfer_priority=priority,
  379. service_not_message=True,
  380. request_not_response=False
  381. )
  382. self._transfer_hook_dispatcher.call_hooks(self._transfer_hook_dispatcher.TRANSFER_DIRECTION_OUTGOING, transfer)
  383. for frame in transfer.to_frames():
  384. self._can_driver.send(frame.message_id, frame.bytes, extended=True)
  385. logger.debug("Node.respond(dest_node_id={0:d}, transfer_id={0:d}, priority={0:d}): sent {1!r}"
  386. .format(dest_node_id, transfer_id, priority, payload))
  387. def broadcast(self, payload, priority=None):
  388. self._throw_if_anonymous()
  389. transfer_id = self._next_transfer_id(get_uavcan_data_type(payload).default_dtid)
  390. transfer = transport.Transfer(payload=payload,
  391. source_node_id=self._node_id,
  392. transfer_id=transfer_id,
  393. transfer_priority=priority or DEFAULT_TRANSFER_PRIORITY,
  394. service_not_message=False)
  395. self._transfer_hook_dispatcher.call_hooks(self._transfer_hook_dispatcher.TRANSFER_DIRECTION_OUTGOING, transfer)
  396. for frame in transfer.to_frames():
  397. self._can_driver.send(frame.message_id, frame.bytes, extended=True)
  398. def close(self):
  399. self._can_driver.close()
  400. def make_node(can_device_name, **kwargs):
  401. """Constructs a node instance with specified CAN device.
  402. :param can_device_name: CAN device name, e.g. "/dev/ttyACM0", "COM9", "can0".
  403. :param kwargs: These arguments will be supplied to the CAN driver factory and to the node constructor.
  404. """
  405. can = driver.make_driver(can_device_name, **kwargs)
  406. return Node(can, **kwargs)
  407. class Monitor(object):
  408. def __init__(self, event):
  409. self.message = event.message
  410. self.transfer = event.transfer
  411. self.node = event.node
  412. def on_message(self):
  413. pass
  414. class Service(object):
  415. def __init__(self, event):
  416. self.request = event.request
  417. self.transfer = event.transfer
  418. self.node = event.node
  419. self.response = get_uavcan_data_type(self.request).Response()
  420. def on_request(self):
  421. pass