introspect.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. #
  2. # Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
  3. #
  4. # This software is distributed under the terms of the MIT License.
  5. #
  6. # Author: Pavel Kirienko <pavel.kirienko@zubax.com>
  7. # Ben Dyer <ben_dyer@mac.com>
  8. #
  9. from __future__ import division, absolute_import, print_function, unicode_literals
  10. import os
  11. import uavcan
  12. from uavcan.transport import CompoundValue, PrimitiveValue, ArrayValue, VoidValue
  13. try:
  14. from io import StringIO
  15. except ImportError:
  16. # noinspection PyUnresolvedReferences
  17. from StringIO import StringIO
  18. def _to_yaml_impl(obj, indent_level=0, parent=None, name=None, uavcan_type=None):
  19. buf = StringIO()
  20. def write(fmt, *args):
  21. buf.write((fmt % args) if len(args) else fmt)
  22. def indent_newline():
  23. buf.write(os.linesep + ' ' * 2 * indent_level)
  24. # Decomposing PrimitiveValue to value and type. This is ugly but it's by design...
  25. if isinstance(obj, PrimitiveValue):
  26. uavcan_type = uavcan.get_uavcan_data_type(obj)
  27. obj = obj.value
  28. # CompoundValue
  29. if isinstance(obj, CompoundValue):
  30. first_field = True
  31. # Rendering all fields than can be rendered
  32. for field_name, field in uavcan.get_fields(obj).items():
  33. if uavcan.is_union(obj) and uavcan.get_active_union_field(obj) != field_name:
  34. continue
  35. if isinstance(field, VoidValue):
  36. continue
  37. if (first_field and indent_level > 0) or not first_field:
  38. indent_newline()
  39. first_field = False
  40. rendered_field = _to_yaml_impl(field, indent_level=indent_level + 1, parent=obj, name=field_name)
  41. write('%s: %s', field_name, rendered_field)
  42. # Special case - empty non-union struct is rendered as empty map
  43. if first_field and not uavcan.is_union(obj):
  44. if indent_level > 0:
  45. indent_newline()
  46. write('{}')
  47. # ArrayValue
  48. elif isinstance(obj, ArrayValue):
  49. t = uavcan.get_uavcan_data_type(obj)
  50. if t.value_type.category == t.value_type.CATEGORY_PRIMITIVE:
  51. def is_nice_character(ch):
  52. if 32 <= ch <= 126:
  53. return True
  54. if ch in b'\n\r\t':
  55. return True
  56. return False
  57. as_bytes = '[%s]' % ', '.join([_to_yaml_impl(x, indent_level=indent_level + 1, uavcan_type=t.value_type)
  58. for x in obj])
  59. if t.is_string_like and all(map(is_nice_character, obj)):
  60. write('%r # ', obj.decode())
  61. write(as_bytes)
  62. else:
  63. if len(obj) == 0:
  64. write('[]')
  65. else:
  66. for x in obj:
  67. indent_newline()
  68. write('- %s', _to_yaml_impl(x, indent_level=indent_level + 1, uavcan_type=t.value_type))
  69. # Primitive types
  70. elif isinstance(obj, float):
  71. assert uavcan_type is not None
  72. float_fmt = {
  73. 16: '%.4f',
  74. 32: '%.6f',
  75. 64: '%.9f',
  76. }[uavcan_type.bitlen]
  77. write(float_fmt, obj)
  78. elif isinstance(obj, bool):
  79. write('%s', 'true' if obj else 'false')
  80. elif isinstance(obj, int):
  81. write('%s', obj)
  82. if parent is not None and name is not None:
  83. resolved_name = value_to_constant_name(parent, name)
  84. if isinstance(resolved_name, str):
  85. write(' # %s', resolved_name)
  86. # Non-printable types
  87. elif isinstance(obj, VoidValue):
  88. pass
  89. # Unknown types
  90. else:
  91. raise ValueError('Cannot generate YAML representation for %r' % type(obj))
  92. return buf.getvalue()
  93. def to_yaml(obj):
  94. """
  95. This function returns correct YAML representation of a UAVCAN structure (message, request, or response), or
  96. a DSDL entity (array or primitive), or a UAVCAN transfer, with comments for human benefit.
  97. Args:
  98. obj: Object to convert.
  99. Returns: Unicode string containing YAML representation of the object.
  100. """
  101. if not isinstance(obj, CompoundValue) and hasattr(obj, 'transfer'):
  102. if hasattr(obj, 'message'):
  103. payload = obj.message
  104. header = 'Message'
  105. elif hasattr(obj, 'request'):
  106. payload = obj.request
  107. header = 'Request'
  108. elif hasattr(obj, 'response'):
  109. payload = obj.response
  110. header = 'Response'
  111. else:
  112. raise ValueError('Cannot generate YAML representation for %r' % type(obj))
  113. prefix = '### %s from %s to %s ts_mono=%.6f ts_real=%.6f\n' % \
  114. (header,
  115. obj.transfer.source_node_id or 'Anon',
  116. obj.transfer.dest_node_id or 'All',
  117. obj.transfer.ts_monotonic, obj.transfer.ts_real)
  118. return prefix + _to_yaml_impl(payload)
  119. else:
  120. return _to_yaml_impl(obj)
  121. def value_to_constant_name(struct, field_name, keep_literal=False):
  122. """
  123. This function accepts a UAVCAN struct (message, request, or response), and a field name; and returns
  124. the name of constant or bit mask that match the value. If no match could be established, the literal
  125. value will be returned as is.
  126. Args:
  127. struct: UAVCAN struct to work with
  128. field_name: Name of the field to work with
  129. keep_literal: Whether to include the input integer value in the output string
  130. Returns: Name of the constant or flags if match could be detected, otherwise integer as is.
  131. """
  132. # Extracting constants
  133. uavcan_type = uavcan.get_uavcan_data_type(struct)
  134. if uavcan.is_request(struct):
  135. consts = uavcan_type.request_constants
  136. fields = uavcan_type.request_fields
  137. elif uavcan.is_response(struct):
  138. consts = uavcan_type.response_constants
  139. fields = uavcan_type.response_fields
  140. else:
  141. consts = uavcan_type.constants
  142. fields = uavcan_type.fields
  143. assert len(fields) > 0
  144. # noinspection PyShadowingNames
  145. def format_output(name, value, remove_common_prefix):
  146. if remove_common_prefix:
  147. num_seps = len(field_name.split('_'))
  148. parts = name.split('_')[num_seps:]
  149. name = '_'.join(parts)
  150. return ('%s (%r)' % (name, value)) if keep_literal else name
  151. # noinspection PyShadowingNames
  152. def match_one_prefix(prefix, value):
  153. matches = []
  154. for cname, cval in [(x.name, x.value) for x in consts if x.name.lower().startswith(prefix.lower())]:
  155. if cval == value:
  156. matches.append(cname)
  157. # Making sure we found exactly one match, otherwise it's not a correct result
  158. if len(matches) == 1:
  159. return matches[0]
  160. # noinspection PyShadowingNames
  161. def match_value(value):
  162. # Trying direct match
  163. match = match_one_prefix(field_name + '_', value)
  164. if match:
  165. return format_output(match, value, True)
  166. # Trying direct match without the terminal letter if it is 's' (plural). This works for 'flags'.
  167. # TODO: this is sketchy.
  168. if field_name[-1] == 's':
  169. match = match_one_prefix(field_name[:-1] + '_', value)
  170. if match:
  171. return format_output(match, value, True)
  172. # Trying match without prefix, only if there's just one field
  173. if len(fields) == 1:
  174. match = match_one_prefix('', value)
  175. if match:
  176. return format_output(match, value, False)
  177. # Trying single value first
  178. value = getattr(struct, field_name)
  179. match = match_value(value)
  180. if match:
  181. return match
  182. # Trying bit masks
  183. def extract_powers_of_2(x):
  184. i = 1
  185. while i <= x:
  186. if i & x:
  187. yield i
  188. i <<= 1
  189. matches = []
  190. for pow2 in extract_powers_of_2(value):
  191. match = match_value(pow2)
  192. if match:
  193. matches.append(match)
  194. else:
  195. matches = []
  196. break # If at least one couldn't be matched, we're on a wrong track, stop
  197. if len(matches) > 0:
  198. return ' | '.join(matches)
  199. # No match could be found, returning the value as is
  200. return value
  201. if __name__ == '__main__':
  202. # to_yaml()
  203. print(to_yaml(uavcan.protocol.NodeStatus()))
  204. info = uavcan.protocol.GetNodeInfo.Response(name='legion')
  205. info.hardware_version.certificate_of_authenticity = b'\x01\x02\x03\xff'
  206. print(to_yaml(info))
  207. lights = uavcan.equipment.indication.LightsCommand()
  208. lcmd = uavcan.equipment.indication.SingleLightCommand(light_id=123)
  209. lcmd.color.red = 1
  210. lcmd.color.green = 2
  211. lcmd.color.blue = 3
  212. lights.commands.append(lcmd)
  213. lcmd.light_id += 1
  214. lights.commands.append(lcmd)
  215. print(to_yaml(lights))
  216. print(to_yaml(uavcan.equipment.power.BatteryInfo()))
  217. print(to_yaml(uavcan.protocol.param.Empty()))
  218. getset = uavcan.protocol.param.GetSet.Response()
  219. print(to_yaml(getset))
  220. uavcan.switch_union_field(getset.value, 'empty')
  221. print(to_yaml(getset))
  222. # value_to_constant_name()
  223. print(value_to_constant_name(
  224. uavcan.protocol.NodeStatus(mode=uavcan.protocol.NodeStatus().MODE_OPERATIONAL),
  225. 'mode'
  226. ))
  227. print(value_to_constant_name(
  228. uavcan.protocol.NodeStatus(mode=uavcan.protocol.NodeStatus().HEALTH_OK),
  229. 'health'
  230. ))
  231. print(value_to_constant_name(
  232. uavcan.equipment.range_sensor.Measurement(reading_type=uavcan.equipment.range_sensor.Measurement()
  233. .READING_TYPE_TOO_FAR),
  234. 'reading_type'
  235. ))
  236. print(value_to_constant_name(
  237. uavcan.protocol.param.ExecuteOpcode.Request(opcode=uavcan.protocol.param.ExecuteOpcode.Request().OPCODE_ERASE),
  238. 'opcode'
  239. ))
  240. print(value_to_constant_name(
  241. uavcan.protocol.file.Error(value=uavcan.protocol.file.Error().ACCESS_DENIED),
  242. 'value'
  243. ))
  244. print(value_to_constant_name(
  245. uavcan.equipment.power.BatteryInfo(status_flags=
  246. uavcan.equipment.power.BatteryInfo().STATUS_FLAG_NEED_SERVICE),
  247. 'status_flags'
  248. ))
  249. print(value_to_constant_name(
  250. uavcan.equipment.power.BatteryInfo(status_flags=
  251. uavcan.equipment.power.BatteryInfo().STATUS_FLAG_NEED_SERVICE |
  252. uavcan.equipment.power.BatteryInfo().STATUS_FLAG_TEMP_HOT |
  253. uavcan.equipment.power.BatteryInfo().STATUS_FLAG_CHARGED),
  254. 'status_flags'
  255. ))
  256. print(value_to_constant_name(
  257. uavcan.protocol.AccessCommandShell.Response(flags=
  258. uavcan.protocol.AccessCommandShell.Response().FLAG_SHELL_ERROR |
  259. uavcan.protocol.AccessCommandShell.Response().
  260. FLAG_HAS_PENDING_STDOUT),
  261. 'flags'
  262. ))
  263. # Printing transfers
  264. node = uavcan.make_node('vcan0', node_id=42)
  265. node.request(uavcan.protocol.GetNodeInfo.Request(), 100, lambda e: print(to_yaml(e)))
  266. node.add_handler(uavcan.protocol.NodeStatus, lambda e: print(to_yaml(e)))
  267. node.spin()