#!/usr/bin/env python ''' parse a MAVLink protocol XML file and generate a python implementation Copyright Andrew Tridgell 2011 Released under GNU GPL version 3 or later ''' from __future__ import print_function from builtins import range import os import textwrap from . import mavtemplate t = mavtemplate.MAVTemplate() def generate_preamble(outf, msgs, basename, args, xml): print("Generating preamble") t.write(outf, """ ''' MAVLink protocol implementation (auto-generated by mavgen.py) Generated from: ${FILELIST} Note: this file has been auto-generated. DO NOT EDIT ''' from __future__ import print_function from builtins import range from builtins import object import struct, array, time, json, os, sys, platform from ...generator.mavcrc import x25crc import hashlib WIRE_PROTOCOL_VERSION = '${WIRE_PROTOCOL_VERSION}' DIALECT = '${DIALECT}' PROTOCOL_MARKER_V1 = 0xFE PROTOCOL_MARKER_V2 = 0xFD HEADER_LEN_V1 = 6 HEADER_LEN_V2 = 10 MAVLINK_SIGNATURE_BLOCK_LEN = 13 MAVLINK_IFLAG_SIGNED = 0x01 native_supported = platform.system() != 'Windows' # Not yet supported on other dialects native_force = 'MAVNATIVE_FORCE' in os.environ # Will force use of native code regardless of what client app wants native_testing = 'MAVNATIVE_TESTING' in os.environ # Will force both native and legacy code to be used and their results compared if native_supported and float(WIRE_PROTOCOL_VERSION) <= 1: try: import mavnative except ImportError: print('ERROR LOADING MAVNATIVE - falling back to python implementation') native_supported = False else: # mavnative isn't supported for MAVLink2 yet native_supported = False # some base types from mavlink_types.h MAVLINK_TYPE_CHAR = 0 MAVLINK_TYPE_UINT8_T = 1 MAVLINK_TYPE_INT8_T = 2 MAVLINK_TYPE_UINT16_T = 3 MAVLINK_TYPE_INT16_T = 4 MAVLINK_TYPE_UINT32_T = 5 MAVLINK_TYPE_INT32_T = 6 MAVLINK_TYPE_UINT64_T = 7 MAVLINK_TYPE_INT64_T = 8 MAVLINK_TYPE_FLOAT = 9 MAVLINK_TYPE_DOUBLE = 10 class MAVLink_header(object): '''MAVLink message header''' def __init__(self, msgId, incompat_flags=0, compat_flags=0, mlen=0, seq=0, srcSystem=0, srcComponent=0): self.mlen = mlen self.seq = seq self.srcSystem = srcSystem self.srcComponent = srcComponent self.msgId = msgId self.incompat_flags = incompat_flags self.compat_flags = compat_flags def pack(self, force_mavlink1=False): if WIRE_PROTOCOL_VERSION == '2.0' and not force_mavlink1: return struct.pack('>16) return struct.pack(' 1 and payload[plen-1] == chr(0): plen -= 1 self._payload = payload[:plen] incompat_flags = 0 if mav.signing.sign_outgoing: incompat_flags |= MAVLINK_IFLAG_SIGNED self._header = MAVLink_header(self._header.msgId, incompat_flags=incompat_flags, compat_flags=0, mlen=len(self._payload), seq=mav.seq, srcSystem=mav.srcSystem, srcComponent=mav.srcComponent) self._msgbuf = self._header.pack(force_mavlink1=force_mavlink1) + self._payload crc = x25crc(self._msgbuf[1:]) if ${crc_extra}: # using CRC extra crc.accumulate_str(struct.pack('B', crc_extra)) self._crc = crc.crc self._msgbuf += struct.pack('= m.extensions_start: fdefault = m.fielddefaults[i] outf.write(", %s=%s" % (fname, fdefault)) else: outf.write(", %s" % fname) outf.write("):\n") outf.write(" MAVLink_message.__init__(self, %s.id, %s.name)\n" % (classname, classname)) outf.write(" self._fieldnames = %s.fieldnames\n" % (classname)) for f in m.fields: outf.write(" self.%s = %s\n" % (f.name, f.name)) outf.write(""" def pack(self, mav, force_mavlink1=False): return MAVLink_message.pack(self, mav, %u, struct.pack('%s'""" % (m.crc_extra, m.fmtstr)) for field in m.ordered_fields: if (field.type != "char" and field.array_length > 1): for i in range(field.array_length): outf.write(", self.{0:s}[{1:d}]".format(field.name, i)) else: outf.write(", self.{0:s}".format(field.name)) outf.write("), force_mavlink1=force_mavlink1)\n") def native_mavfmt(field): '''work out the struct format for a type (in a form expected by mavnative)''' map = { 'float': 'f', 'double': 'd', 'char': 'c', 'int8_t': 'b', 'uint8_t': 'B', 'uint8_t_mavlink_version': 'v', 'int16_t': 'h', 'uint16_t': 'H', 'int32_t': 'i', 'uint32_t': 'I', 'int64_t': 'q', 'uint64_t': 'Q', } return map[field.type] def mavfmt(field): '''work out the struct format for a type''' map = { 'float': 'f', 'double': 'd', 'char': 'c', 'int8_t': 'b', 'uint8_t': 'B', 'uint8_t_mavlink_version': 'B', 'int16_t': 'h', 'uint16_t': 'H', 'int32_t': 'i', 'uint32_t': 'I', 'int64_t': 'q', 'uint64_t': 'Q', } if field.array_length: if field.type == 'char': return str(field.array_length)+'s' return str(field.array_length)+map[field.type] return map[field.type] def mavdefault(field): '''returns default value for field (as string) for mavlink2 extensions''' if field.type == 'char': default_value = "''" else: default_value = "0" if field.array_length == 0: return default_value return "[" + ",".join([default_value] * field.array_length) + "]" def generate_mavlink_class(outf, msgs, xml): print("Generating MAVLink class") outf.write("\n\nmavlink_map = {\n") for m in msgs: outf.write(" MAVLINK_MSG_ID_%s : MAVLink_%s_message,\n" % ( m.name.upper(), m.name.lower())) outf.write("}\n\n") t.write(outf, """ class MAVError(Exception): '''MAVLink error class''' def __init__(self, msg): Exception.__init__(self, msg) self.message = msg class MAVString(str): '''NUL terminated string''' def __init__(self, s): str.__init__(self) def __str__(self): i = self.find(chr(0)) if i == -1: return self[:] return self[0:i] class MAVLink_bad_data(MAVLink_message): ''' a piece of bad data in a mavlink stream ''' def __init__(self, data, reason): MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, 'BAD_DATA') self._fieldnames = ['data', 'reason'] self.data = data self.reason = reason self._msgbuf = data def __str__(self): '''Override the __str__ function from MAVLink_messages because non-printable characters are common in to be the reason for this message to exist.''' return '%s {%s, data:%s}' % (self._type, self.reason, [('%x' % ord(i) if isinstance(i, str) else '%x' % i) for i in self.data]) class MAVLinkSigning(object): '''MAVLink signing state class''' def __init__(self): self.secret_key = None self.timestamp = 0 self.link_id = 0 self.sign_outgoing = False self.allow_unsigned_callback = None self.stream_timestamps = {} self.sig_count = 0 self.badsig_count = 0 self.goodsig_count = 0 self.unsigned_count = 0 self.reject_count = 0 class MAVLink(object): '''MAVLink protocol handling class''' def __init__(self, file, srcSystem=0, srcComponent=0, use_native=False): self.seq = 0 self.file = file self.srcSystem = srcSystem self.srcComponent = srcComponent self.callback = None self.callback_args = None self.callback_kwargs = None self.send_callback = None self.send_callback_args = None self.send_callback_kwargs = None self.buf = bytearray() self.buf_index = 0 self.expected_length = HEADER_LEN_V1+2 self.have_prefix_error = False self.robust_parsing = False self.protocol_marker = ${protocol_marker} self.little_endian = ${little_endian} self.crc_extra = ${crc_extra} self.sort_fields = ${sort_fields} self.total_packets_sent = 0 self.total_bytes_sent = 0 self.total_packets_received = 0 self.total_bytes_received = 0 self.total_receive_errors = 0 self.startup_time = time.time() self.signing = MAVLinkSigning() if native_supported and (use_native or native_testing or native_force): print("NOTE: mavnative is currently beta-test code") self.native = mavnative.NativeConnection(MAVLink_message, mavlink_map) else: self.native = None if native_testing: self.test_buf = bytearray() self.mav20_unpacker = struct.Struct('= 1 and self.buf[self.buf_index] == PROTOCOL_MARKER_V2: header_len = HEADER_LEN_V2 if self.buf_len() >= 1 and self.buf[self.buf_index] != PROTOCOL_MARKER_V1 and self.buf[self.buf_index] != PROTOCOL_MARKER_V2: magic = self.buf[self.buf_index] self.buf_index += 1 if self.robust_parsing: m = MAVLink_bad_data(bytearray([magic]), 'Bad prefix') self.expected_length = header_len+2 self.total_receive_errors += 1 return m if self.have_prefix_error: return None self.have_prefix_error = True self.total_receive_errors += 1 raise MAVError("invalid MAVLink prefix '%s'" % magic) self.have_prefix_error = False if self.buf_len() >= 3: sbuf = self.buf[self.buf_index:3+self.buf_index] if sys.version_info.major < 3: sbuf = str(sbuf) (magic, self.expected_length, incompat_flags) = self.mav20_h3_unpacker.unpack(sbuf) if magic == PROTOCOL_MARKER_V2 and (incompat_flags & MAVLINK_IFLAG_SIGNED): self.expected_length += MAVLINK_SIGNATURE_BLOCK_LEN self.expected_length += header_len + 2 if self.expected_length >= (header_len+2) and self.buf_len() >= self.expected_length: mbuf = array.array('B', self.buf[self.buf_index:self.buf_index+self.expected_length]) self.buf_index += self.expected_length self.expected_length = header_len+2 if self.robust_parsing: try: if magic == PROTOCOL_MARKER_V2 and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0: raise MAVError('invalid incompat_flags 0x%x 0x%x %u' % (incompat_flags, magic, self.expected_length)) m = self.decode(mbuf) except MAVError as reason: m = MAVLink_bad_data(mbuf, reason.message) self.total_receive_errors += 1 else: if magic == PROTOCOL_MARKER_V2 and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0: raise MAVError('invalid incompat_flags 0x%x 0x%x %u' % (incompat_flags, magic, self.expected_length)) m = self.decode(mbuf) return m return None def parse_buffer(self, s): '''input some data bytes, possibly returning a list of new messages''' m = self.parse_char(s) if m is None: return None ret = [m] while True: m = self.parse_char("") if m is None: return ret ret.append(m) return ret def check_signature(self, msgbuf, srcSystem, srcComponent): '''check signature on incoming message''' if isinstance(msgbuf, array.array): msgbuf = msgbuf.tostring() timestamp_buf = msgbuf[-12:-6] link_id = msgbuf[-13] (tlow, thigh) = self.mav_sign_unpacker.unpack(timestamp_buf) timestamp = tlow + (thigh<<32) # see if the timestamp is acceptable stream_key = (link_id,srcSystem,srcComponent) if stream_key in self.signing.stream_timestamps: if timestamp <= self.signing.stream_timestamps[stream_key]: # reject old timestamp # print('old timestamp') return False else: # a new stream has appeared. Accept the timestamp if it is at most # one minute behind our current timestamp if timestamp + 6000*1000 < self.signing.timestamp: # print('bad new stream ', timestamp/(100.0*1000*60*60*24*365), self.signing.timestamp/(100.0*1000*60*60*24*365)) return False self.signing.stream_timestamps[stream_key] = timestamp # print('new stream') h = hashlib.new('sha256') h.update(self.signing.secret_key) h.update(msgbuf[:-6]) sig1 = str(h.digest())[:6] sig2 = str(msgbuf)[-6:] if sig1 != sig2: # print('sig mismatch') return False # the timestamp we next send with is the max of the received timestamp and # our current timestamp self.signing.timestamp = max(self.signing.timestamp, timestamp) return True def decode(self, msgbuf): '''decode a buffer as a MAVLink message''' # decode the header if msgbuf[0] != PROTOCOL_MARKER_V1: headerlen = 10 try: magic, mlen, incompat_flags, compat_flags, seq, srcSystem, srcComponent, msgIdlow, msgIdhigh = self.mav20_unpacker.unpack(msgbuf[:headerlen]) except struct.error as emsg: raise MAVError('Unable to unpack MAVLink header: %s' % emsg) msgId = msgIdlow | (msgIdhigh<<16) mapkey = msgId else: headerlen = 6 try: magic, mlen, seq, srcSystem, srcComponent, msgId = self.mav10_unpacker.unpack(msgbuf[:headerlen]) incompat_flags = 0 compat_flags = 0 except struct.error as emsg: raise MAVError('Unable to unpack MAVLink header: %s' % emsg) mapkey = msgId if (incompat_flags & MAVLINK_IFLAG_SIGNED) != 0: signature_len = MAVLINK_SIGNATURE_BLOCK_LEN else: signature_len = 0 if ord(magic) != PROTOCOL_MARKER_V1 and ord(magic) != PROTOCOL_MARKER_V2: raise MAVError("invalid MAVLink prefix '%s'" % magic) if mlen != len(msgbuf)-(headerlen+2+signature_len): raise MAVError('invalid MAVLink message length. Got %u expected %u, msgId=%u headerlen=%u' % (len(msgbuf)-(headerlen+2+signature_len), mlen, msgId, headerlen)) if not mapkey in mavlink_map: raise MAVError('unknown MAVLink message ID %s' % str(mapkey)) # decode the payload type = mavlink_map[mapkey] fmt = type.format order_map = type.orders len_map = type.lengths crc_extra = type.crc_extra # decode the checksum try: crc, = self.mav_csum_unpacker.unpack(msgbuf[-(2+signature_len):][:2]) except struct.error as emsg: raise MAVError('Unable to unpack MAVLink CRC: %s' % emsg) crcbuf = msgbuf[1:-(2+signature_len)] if ${crc_extra}: # using CRC extra crcbuf.append(crc_extra) crc2 = x25crc(crcbuf) if crc != crc2.crc: raise MAVError('invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x' % (msgId, crc, crc2.crc)) sig_ok = False if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN: self.signing.sig_count += 1 if self.signing.secret_key is not None: accept_signature = False if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN: sig_ok = self.check_signature(msgbuf, srcSystem, srcComponent) accept_signature = sig_ok if sig_ok: self.signing.goodsig_count += 1 else: self.signing.badsig_count += 1 if not accept_signature and self.signing.allow_unsigned_callback is not None: accept_signature = self.signing.allow_unsigned_callback(self, msgId) if accept_signature: self.signing.unsigned_count += 1 else: self.signing.reject_count += 1 elif self.signing.allow_unsigned_callback is not None: accept_signature = self.signing.allow_unsigned_callback(self, msgId) if accept_signature: self.signing.unsigned_count += 1 else: self.signing.reject_count += 1 if not accept_signature: raise MAVError('Invalid signature') csize = type.unpacker.size mbuf = msgbuf[headerlen:-(2+signature_len)] if len(mbuf) < csize: # zero pad to give right size mbuf.extend([0]*(csize - len(mbuf))) if len(mbuf) < csize: raise MAVError('Bad message of type %s length %u needs %s' % ( type, len(mbuf), csize)) mbuf = mbuf[:csize] try: t = type.unpacker.unpack(mbuf) except struct.error as emsg: raise MAVError('Unable to unpack MAVLink payload type=%s fmt=%s payloadLength=%u: %s' % ( type, fmt, len(mbuf), emsg)) tlist = list(t) # handle sorted fields if ${sort_fields}: t = tlist[:] if sum(len_map) == len(len_map): # message has no arrays in it for i in range(0, len(tlist)): tlist[i] = t[order_map[i]] else: # message has some arrays tlist = [] for i in range(0, len(order_map)): order = order_map[i] L = len_map[order] tip = sum(len_map[:order]) field = t[tip] if L == 1 or isinstance(field, str): tlist.append(field) else: tlist.append(t[tip:(tip + L)]) # terminate any strings for i in range(0, len(tlist)): if type.fieldtypes[i] == 'char': if sys.version_info.major >= 3: tlist[i] = tlist[i].decode('utf-8') tlist[i] = str(MAVString(tlist[i])) t = tuple(tlist) # construct the message object try: m = type(*t) except Exception as emsg: raise MAVError('Unable to instantiate MAVLink message of type %s : %s' % (type, emsg)) m._signed = sig_ok if m._signed: m._link_id = msgbuf[-13] m._msgbuf = msgbuf m._payload = msgbuf[6:-(2+signature_len)] m._crc = crc m._header = MAVLink_header(msgId, incompat_flags, compat_flags, mlen, seq, srcSystem, srcComponent) return m """, xml) def generate_methods(outf, msgs): print("Generating methods") def field_descriptions(fields): ret = "" for f in fields: field_info = "" if f.units: field_info += "%s " % f.units field_info += "(type:%s" % f.type if f.enum: field_info += ", values:%s" % f.enum field_info += ")" ret += " %-18s : %s %s\n" % (f.name, f.description.strip(), field_info) return ret wrapper = textwrap.TextWrapper(initial_indent="", subsequent_indent=" ") for m in msgs: comment = "%s\n\n%s" % (wrapper.fill(m.description.strip()), field_descriptions(m.fields)) selffieldnames = 'self, ' for i in range(len(m.fields)): f = m.fields[i] if f.omit_arg: selffieldnames += '%s=%s, ' % (f.name, f.const_value) elif m.extensions_start is not None and i >= m.extensions_start: fdefault = m.fielddefaults[i] selffieldnames += "%s=%s, " % (f.name, fdefault) else: selffieldnames += '%s, ' % f.name selffieldnames = selffieldnames[:-2] sub = {'NAMELOWER': m.name.lower(), 'SELFFIELDNAMES': selffieldnames, 'COMMENT': comment, 'FIELDNAMES': ", ".join(m.fieldnames)} t.write(outf, """ def ${NAMELOWER}_encode(${SELFFIELDNAMES}): ''' ${COMMENT} ''' return MAVLink_${NAMELOWER}_message(${FIELDNAMES}) """, sub) t.write(outf, """ def ${NAMELOWER}_send(${SELFFIELDNAMES}, force_mavlink1=False): ''' ${COMMENT} ''' return self.send(self.${NAMELOWER}_encode(${FIELDNAMES}), force_mavlink1=force_mavlink1) """, sub) def generate(basename, xml): '''generate complete python implementation''' if basename.endswith('.py'): filename = basename else: filename = basename + '.py' msgs = [] enums = [] filelist = [] for x in xml: msgs.extend(x.message) enums.extend(x.enum) filelist.append(os.path.basename(x.filename)) for m in msgs: m.fielddefaults = [] if xml[0].little_endian: m.fmtstr = '<' else: m.fmtstr = '>' m.native_fmtstr = m.fmtstr for f in m.ordered_fields: m.fmtstr += mavfmt(f) m.fielddefaults.append(mavdefault(f)) m.native_fmtstr += native_mavfmt(f) m.order_map = [0] * len(m.fieldnames) m.len_map = [0] * len(m.fieldnames) m.array_len_map = [0] * len(m.fieldnames) for i in range(0, len(m.fieldnames)): m.order_map[i] = m.ordered_fieldnames.index(m.fieldnames[i]) m.array_len_map[i] = m.ordered_fields[i].array_length for i in range(0, len(m.fieldnames)): n = m.order_map[i] m.len_map[n] = m.fieldlengths[i] print("Generating %s" % filename) outf = open(filename, "w") generate_preamble(outf, msgs, basename, filelist, xml[0]) generate_enums(outf, enums) generate_message_ids(outf, msgs) generate_classes(outf, msgs) generate_mavlink_class(outf, msgs, xml[0]) generate_methods(outf, msgs) outf.close() print("Generated %s OK" % filename)