message_collector.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. #
  2. # Copyright (C) 2014-2015 UAVCAN Development Team <uavcan.org>
  3. #
  4. # This software is distributed under the terms of the MIT License.
  5. #
  6. # Author: Ben Dyer <ben_dyer@mac.com>
  7. # Pavel Kirienko <pavel.kirienko@zubax.com>
  8. #
  9. from __future__ import division, absolute_import, print_function, unicode_literals
  10. import time
  11. from logging import getLogger
  12. import collections
  13. try:
  14. # noinspection PyUnresolvedReferences
  15. collections_abc = collections.abc
  16. except AttributeError:
  17. collections_abc = collections
  18. logger = getLogger(__name__)
  19. class MessageCollector(collections_abc.Mapping):
  20. """This class keeps the latest TransferEvent of a given message type categorized by specified key.
  21. The stored items can be automatically removed if they were not updated in a specified time interval.
  22. Defaults are as follows:
  23. - Categorization key: source node ID
  24. - Entry timeout: infinite
  25. """
  26. def __init__(self, node, data_type, key=None, timeout=None):
  27. """
  28. :param node: Node instance.
  29. :param data_type: Data type to subscribe to.
  30. :param key: A callable that accepts a TransferEvent instance and returns a hashable.
  31. The returned hashable will be used as categorization key.
  32. If this argument is not provided, the messages will be categorized by source node ID.
  33. :param timeout: Entry timeout. If an entry was not updated in this time, it will be removed.
  34. By default entry lifetime is not limited.
  35. """
  36. self._handle = node.add_handler(data_type, lambda e: self._storage.update({self._key_function(e): e}))
  37. self._storage = {}
  38. self._key_function = key or (lambda e: e.transfer.source_node_id)
  39. self._timeout = timeout
  40. def close(self):
  41. self._handle.remove()
  42. def __getitem__(self, key):
  43. if self._timeout is not None:
  44. if (self._storage[key].transfer.ts_monotonic + self._timeout) < time.monotonic():
  45. del self._storage[key]
  46. return self._storage[key]
  47. def __iter__(self):
  48. for x in list(self._storage.keys())[:]:
  49. try:
  50. self[x] # __getitem__ is mutating here - it removes outdated entries
  51. except KeyError:
  52. pass
  53. return iter(self._storage)
  54. def __len__(self):
  55. return len(self._storage)