common.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. #
  2. # Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
  3. #
  4. # This software is distributed under the terms of the MIT License.
  5. #
  6. # Author: Ben Dyer <ben_dyer@mac.com>
  7. # Pavel Kirienko <pavel.kirienko@zubax.com>
  8. #
  9. import time
  10. import sys
  11. from logging import getLogger
  12. from .. import UAVCANException
  13. logger = getLogger(__name__)
  14. class DriverError(UAVCANException):
  15. pass
  16. class TxQueueFullError(DriverError):
  17. pass
  18. class CANFrame:
  19. MAX_DATA_LENGTH = 8
  20. def __init__(self, can_id, data, extended, ts_monotonic=None, ts_real=None):
  21. self.id = can_id
  22. self.data = data
  23. self.extended = extended
  24. self.ts_monotonic = ts_monotonic or time.monotonic()
  25. self.ts_real = ts_real or time.time()
  26. def __str__(self):
  27. if sys.version_info[0] > 2:
  28. b2int = lambda x: x
  29. else:
  30. b2int = ord
  31. id_str = ('%0*x' % (8 if self.extended else 3, self.id)).rjust(8)
  32. hex_data = ' '.join(['%02x' % b2int(x) for x in self.data]).ljust(3 * self.MAX_DATA_LENGTH)
  33. ascii_data = ''.join([(chr(x) if 32 <= x <= 126 else '.') for x in self.data])
  34. return "%12.6f %12.6f %s %s '%s'" % \
  35. (self.ts_monotonic, self.ts_real, id_str, hex_data, ascii_data)
  36. __repr__ = __str__
  37. class AbstractDriver(object):
  38. FRAME_DIRECTION_INCOMING = 'rx'
  39. FRAME_DIRECTION_OUTGOING = 'tx'
  40. class HookRemover:
  41. def __init__(self, remover):
  42. self.remove = remover
  43. def __init__(self):
  44. self._io_hooks = []
  45. def add_io_hook(self, hook):
  46. """
  47. Args:
  48. hook: This hook will be invoked for every incoming and outgoing CAN frame.
  49. Hook arguments: (direction, frame)
  50. See FRAME_DIRECTION_*, CANFrame.
  51. """
  52. def proxy(*args):
  53. hook(*args)
  54. self._io_hooks.append(proxy)
  55. return self.HookRemover(lambda: self._io_hooks.remove(proxy))
  56. def _call_io_hooks(self, direction, frame):
  57. for h in self._io_hooks:
  58. try:
  59. h(direction, frame)
  60. except Exception as ex:
  61. logger.error('Uncaught exception from CAN IO hook: %r', ex, exc_info=True)
  62. def _tx_hook(self, frame):
  63. self._call_io_hooks(self.FRAME_DIRECTION_OUTGOING, frame)
  64. def _rx_hook(self, frame):
  65. self._call_io_hooks(self.FRAME_DIRECTION_INCOMING, frame)