#!/usr/bin/env python ''' APM DataFlash log file reader Copyright Andrew Tridgell 2011 Released under GNU GPL version 3 or later Partly based on SDLog2Parser by Anton Babushkin ''' from __future__ import print_function from builtins import range from builtins import object import array import math import sys import os import mmap import platform import struct import sys from . import mavutil try: long # Python 2 has long except NameError: long = int # But Python 3 does not FORMAT_TO_STRUCT = { "a": ("64s", None, str), "b": ("b", None, int), "B": ("B", None, int), "h": ("h", None, int), "H": ("H", None, int), "i": ("i", None, int), "I": ("I", None, int), "f": ("f", None, float), "n": ("4s", None, str), "N": ("16s", None, str), "Z": ("64s", None, str), "c": ("h", 0.01, float), "C": ("H", 0.01, float), "e": ("i", 0.01, float), "E": ("I", 0.01, float), "L": ("i", 1.0e-7, float), "d": ("d", None, float), "M": ("b", None, int), "q": ("q", None, long), # Backward compat "Q": ("Q", None, long), # Backward compat } def u_ord(c): return ord(c) if sys.version_info.major < 3 else c class DFFormat(object): def __init__(self, type, name, flen, format, columns): self.type = type self.name = null_term(name) self.len = flen self.format = format self.columns = columns.split(',') if self.columns == ['']: self.columns = [] msg_struct = "<" msg_mults = [] msg_types = [] msg_fmts = [] for c in format: if u_ord(c) == 0: break try: msg_fmts.append(c) (s, mul, type) = FORMAT_TO_STRUCT[c] msg_struct += s msg_mults.append(mul) if c == "a": msg_types.append(array.array) else: msg_types.append(type) except KeyError as e: print("DFFormat: Unsupported format char: '%s' in message %s" % (c, name)) raise Exception("Unsupported format char: '%s' in message %s" % (c, name)) self.msg_struct = msg_struct self.msg_types = msg_types self.msg_mults = msg_mults self.msg_fmts = msg_fmts self.colhash = {} for i in range(len(self.columns)): self.colhash[self.columns[i]] = i self.a_indexes = [] for i in range(0, len(self.msg_fmts)): if self.msg_fmts[i] == 'a': self.a_indexes.append(i) def __str__(self): return ("DFFormat(%s,%s,%s,%s)" % (self.type, self.name, self.format, self.columns)) def to_string(s): '''desperate attempt to convert a string regardless of what garbage we get''' try: return s.decode("utf-8") except Exception as e: pass try: s2 = s.encode('utf-8', 'ignore') x = u"%s" % s2 return s2 except Exception: pass # so its a nasty one. Let's grab as many characters as we can r = '' while s != '': try: r2 = r + s[0] s = s[1:] r2 = r2.encode('ascii', 'ignore') x = u"%s" % r2 r = r2 except Exception: break return r + '_XXX' def null_term(str): '''null terminate a string''' if isinstance(str, bytes): str = to_string(str) idx = str.find("\0") if idx != -1: str = str[:idx] return str class DFMessage(object): def __init__(self, fmt, elements, apply_multiplier): self.fmt = fmt self._elements = elements self._apply_multiplier = apply_multiplier self._fieldnames = fmt.columns def to_dict(self): d = {'mavpackettype': self.fmt.name} for field in self._fieldnames: d[field] = self.__getattr__(field) return d def __getattr__(self, field): '''override field getter''' try: i = self.fmt.colhash[field] except Exception: raise AttributeError(field) if isinstance(self._elements[i], bytes): v = self._elements[i].decode("utf-8") else: v = self._elements[i] if self.fmt.format[i] == 'a': pass elif self.fmt.format[i] != 'M' or self._apply_multiplier: v = self.fmt.msg_types[i](v) if self.fmt.msg_types[i] == str: v = null_term(v) if self.fmt.msg_mults[i] is not None and self._apply_multiplier: v *= self.fmt.msg_mults[i] return v def get_type(self): return self.fmt.name def __str__(self): ret = "%s {" % self.fmt.name col_count = 0 for c in self.fmt.columns: val = self.__getattr__(c) if isinstance(val, float) and math.isnan(val): # quiet nans have more non-zero values: noisy_nan = "\x7f\xf8\x00\x00\x00\x00\x00\x00" if struct.pack(">d", val) != noisy_nan: val = "qnan" ret += "%s : %s, " % (c, val) col_count += 1 if col_count != 0: ret = ret[:-2] return ret + '}' def get_msgbuf(self): '''create a binary message buffer for a message''' values = [] is_py2 = sys.version_info < (3,0) for i in range(len(self.fmt.columns)): if i >= len(self.fmt.msg_mults): continue mul = self.fmt.msg_mults[i] name = self.fmt.columns[i] if name == 'Mode' and 'ModeNum' in self.fmt.columns: name = 'ModeNum' v = self.__getattr__(name) if is_py2: if isinstance(v,unicode): # NOQA v = str(v) else: if isinstance(v,str): v = bytes(v,'ascii') if isinstance(v, array.array): v = v.tostring() if mul is not None: v /= mul v = int(round(v)) values.append(v) ret1 = struct.pack("BBB", 0xA3, 0x95, self.fmt.type) try: ret2 = struct.pack(self.fmt.msg_struct, *values) except Exception as ex: return None return ret1 + ret2 def get_fieldnames(self): return self._fieldnames class DFReaderClock(object): '''base class for all the different ways we count time in logs''' def __init__(self): self.set_timebase(0) self.timestamp = 0 def _gpsTimeToTime(self, week, msec): '''convert GPS week and TOW to a time in seconds since 1970''' epoch = 86400*(10*365 + int((1980-1969)/4) + 1 + 6 - 2) return epoch + 86400*7*week + msec*0.001 - 15 def set_timebase(self, base): self.timebase = base def message_arrived(self, m): pass def rewind_event(self): pass class DFReaderClock_usec(DFReaderClock): '''DFReaderClock_usec - use microsecond timestamps from messages''' def __init__(self): DFReaderClock.__init__(self) def find_time_base(self, gps, first_us_stamp): '''work out time basis for the log - even newer style''' t = self._gpsTimeToTime(gps.GWk, gps.GMS) self.set_timebase(t - gps.TimeUS*0.000001) # this ensures FMT messages get appropriate timestamp: self.timestamp = self.timebase + first_us_stamp*0.000001 def type_has_good_TimeMS(self, type): '''The TimeMS in some messages is not from *our* clock!''' if type.startswith('ACC'): return False if type.startswith('GYR'): return False return True def should_use_msec_field0(self, m): if not self.type_has_good_TimeMS(m.get_type()): return False if 'TimeMS' != m._fieldnames[0]: return False if self.timebase + m.TimeMS*0.001 < self.timestamp: return False return True def set_message_timestamp(self, m): if 'TimeUS' == m._fieldnames[0]: # only format messages don't have a TimeUS in them... m._timestamp = self.timebase + m.TimeUS*0.000001 elif self.should_use_msec_field0(m): # ... in theory. I expect there to be some logs which are not # "pure": m._timestamp = self.timebase + m.TimeMS*0.001 else: m._timestamp = self.timestamp self.timestamp = m._timestamp class DFReaderClock_msec(DFReaderClock): '''DFReaderClock_msec - a format where many messages have TimeMS in their formats, and GPS messages have a "T" field giving msecs''' def find_time_base(self, gps, first_ms_stamp): '''work out time basis for the log - new style''' t = self._gpsTimeToTime(gps.Week, gps.TimeMS) self.set_timebase(t - gps.T*0.001) self.timestamp = self.timebase + first_ms_stamp*0.001 def set_message_timestamp(self, m): if 'TimeMS' == m._fieldnames[0]: m._timestamp = self.timebase + m.TimeMS*0.001 elif m.get_type() in ['GPS', 'GPS2']: m._timestamp = self.timebase + m.T*0.001 else: m._timestamp = self.timestamp self.timestamp = m._timestamp class DFReaderClock_px4(DFReaderClock): '''DFReaderClock_px4 - a format where a starting time is explicitly given in a message''' def __init__(self): DFReaderClock.__init__(self) self.px4_timebase = 0 def find_time_base(self, gps): '''work out time basis for the log - PX4 native''' t = gps.GPSTime * 1.0e-6 self.timebase = t - self.px4_timebase def set_px4_timebase(self, time_msg): self.px4_timebase = time_msg.StartTime * 1.0e-6 def set_message_timestamp(self, m): m._timestamp = self.timebase + self.px4_timebase def message_arrived(self, m): type = m.get_type() if type == 'TIME' and 'StartTime' in m._fieldnames: self.set_px4_timebase(m) class DFReaderClock_gps_interpolated(DFReaderClock): '''DFReaderClock_gps_interpolated - for when the only real references in a message are GPS timestamps''' def __init__(self): DFReaderClock.__init__(self) self.msg_rate = {} self.counts = {} self.counts_since_gps = {} def rewind_event(self): '''reset counters on rewind''' self.counts = {} self.counts_since_gps = {} def message_arrived(self, m): type = m.get_type() if type not in self.counts: self.counts[type] = 1 else: self.counts[type] += 1 # this preserves existing behaviour - but should we be doing this # if type == 'GPS'? if type not in self.counts_since_gps: self.counts_since_gps[type] = 1 else: self.counts_since_gps[type] += 1 if type == 'GPS' or type == 'GPS2': self.gps_message_arrived(m) def gps_message_arrived(self, m): '''adjust time base from GPS message''' # msec-style GPS message? gps_week = getattr(m, 'Week', None) gps_timems = getattr(m, 'TimeMS', None) if gps_week is None: # usec-style GPS message? gps_week = getattr(m, 'GWk', None) gps_timems = getattr(m, 'GMS', None) if gps_week is None: if getattr(m, 'GPSTime', None) is not None: # PX4-style timestamp; we've only been called # because we were speculatively created in case no # better clock was found. return if gps_week is None: # AvA-style logs gps_week = getattr(m, 'Wk') gps_timems = getattr(m, 'TWk') if gps_week is None or gps_timems is None: return t = self._gpsTimeToTime(gps_week, gps_timems) deltat = t - self.timebase if deltat <= 0: return for type in self.counts_since_gps: rate = self.counts_since_gps[type] / deltat if rate > self.msg_rate.get(type, 0): self.msg_rate[type] = rate self.msg_rate['IMU'] = 50.0 self.timebase = t self.counts_since_gps = {} def set_message_timestamp(self, m): rate = self.msg_rate.get(m.fmt.name, 50.0) if int(rate) == 0: rate = 50 count = self.counts_since_gps.get(m.fmt.name, 0) m._timestamp = self.timebase + count/rate class DFReader(object): '''parse a generic dataflash file''' def __init__(self): # read the whole file into memory for simplicity self.clock = None self.timestamp = 0 self.mav_type = mavutil.mavlink.MAV_TYPE_FIXED_WING self.verbose = False self.params = {} self._flightmodes = None def _rewind(self): '''reset state on rewind''' self.messages = {'MAV': self} if self._flightmodes is not None and len(self._flightmodes) > 0: self.flightmode = self._flightmodes[0][0] else: self.flightmode = "UNKNOWN" self.percent = 0 if self.clock: self.clock.rewind_event() def init_clock_px4(self, px4_msg_time, px4_msg_gps): self.clock = DFReaderClock_px4() if not self._zero_time_base: self.clock.set_px4_timebase(px4_msg_time) self.clock.find_time_base(px4_msg_gps) return True def init_clock_msec(self): # it is a new style flash log with full timestamps self.clock = DFReaderClock_msec() def init_clock_usec(self): self.clock = DFReaderClock_usec() def init_clock_gps_interpolated(self, clock): self.clock = clock def init_clock(self): '''work out time basis for the log''' self._rewind() # speculatively create a gps clock in case we don't find anything # better gps_clock = DFReaderClock_gps_interpolated() self.clock = gps_clock px4_msg_time = None px4_msg_gps = None gps_interp_msg_gps1 = None first_us_stamp = None first_ms_stamp = None have_good_clock = False while True: m = self.recv_msg() if m is None: break type = m.get_type() if first_us_stamp is None: first_us_stamp = getattr(m, "TimeUS", None) if first_ms_stamp is None and (type != 'GPS' and type != 'GPS2'): # Older GPS messages use TimeMS for msecs past start # of gps week first_ms_stamp = getattr(m, "TimeMS", None) if type == 'GPS' or type == 'GPS2': if getattr(m, "TimeUS", 0) != 0 and \ getattr(m, "GWk", 0) != 0: # everything-usec-timestamped self.init_clock_usec() if not self._zero_time_base: self.clock.find_time_base(m, first_us_stamp) have_good_clock = True break if getattr(m, "T", 0) != 0 and \ getattr(m, "Week", 0) != 0: # GPS is msec-timestamped if first_ms_stamp is None: first_ms_stamp = m.T self.init_clock_msec() if not self._zero_time_base: self.clock.find_time_base(m, first_ms_stamp) have_good_clock = True break if getattr(m, "GPSTime", 0) != 0: # px4-style-only px4_msg_gps = m if getattr(m, "Week", 0) != 0: if (gps_interp_msg_gps1 is not None and (gps_interp_msg_gps1.TimeMS != m.TimeMS or gps_interp_msg_gps1.Week != m.Week)): # we've received two distinct, non-zero GPS # packets without finding a decent clock to # use; fall back to interpolation. Q: should # we wait a few more messages befoe doing # this? self.init_clock_gps_interpolated(gps_clock) have_good_clock = True break gps_interp_msg_gps1 = m elif type == 'TIME': '''only px4-style logs use TIME''' if getattr(m, "StartTime", None) is not None: px4_msg_time = m if px4_msg_time is not None and px4_msg_gps is not None: self.init_clock_px4(px4_msg_time, px4_msg_gps) have_good_clock = True break # print("clock is " + str(self.clock)) if not have_good_clock: # we failed to find any GPS messages to set a time # base for usec and msec clocks. Also, not a # PX4-style log if first_us_stamp is not None: self.init_clock_usec() elif first_ms_stamp is not None: self.init_clock_msec() self._rewind() return def _set_time(self, m): '''set time for a message''' # really just left here for profiling m._timestamp = self.timestamp if len(m._fieldnames) > 0 and self.clock is not None: self.clock.set_message_timestamp(m) def recv_msg(self): return self._parse_next() def _add_msg(self, m): '''add a new message''' type = m.get_type() self.messages[type] = m if self.clock: self.clock.message_arrived(m) if type == 'MSG': if m.Message.find("Rover") != -1: self.mav_type = mavutil.mavlink.MAV_TYPE_GROUND_ROVER elif m.Message.find("Plane") != -1: self.mav_type = mavutil.mavlink.MAV_TYPE_FIXED_WING elif m.Message.find("Copter") != -1: self.mav_type = mavutil.mavlink.MAV_TYPE_QUADROTOR elif m.Message.startswith("Antenna"): self.mav_type = mavutil.mavlink.MAV_TYPE_ANTENNA_TRACKER elif m.Message.find("ArduSub") != -1: self.mav_type = mavutil.mavlink.MAV_TYPE_SUBMARINE if type == 'MODE': if isinstance(m.Mode, str): self.flightmode = m.Mode.upper() elif 'ModeNum' in m._fieldnames: mapping = mavutil.mode_mapping_bynumber(self.mav_type) if mapping is not None and m.ModeNum in mapping: self.flightmode = mapping[m.ModeNum] else: self.flightmode = 'UNKNOWN' else: self.flightmode = mavutil.mode_string_acm(m.Mode) if type == 'STAT' and 'MainState' in m._fieldnames: self.flightmode = mavutil.mode_string_px4(m.MainState) if type == 'PARM' and getattr(m, 'Name', None) is not None: self.params[m.Name] = m.Value self._set_time(m) def recv_match(self, condition=None, type=None, blocking=False): '''recv the next message that matches the given condition type can be a string or a list of strings''' if type is not None: if isinstance(type, str): type = set([type]) elif isinstance(type, list): type = set(type) while True: if type is not None: self.skip_to_type(type) m = self.recv_msg() if m is None: return None if type is not None and not m.get_type() in type: continue if not mavutil.evaluate_condition(condition, self.messages): continue return m def check_condition(self, condition): '''check if a condition is true''' return mavutil.evaluate_condition(condition, self.messages) def param(self, name, default=None): '''convenient function for returning an arbitrary MAVLink parameter with a default''' if name not in self.params: return default return self.params[name] def flightmode_list(self): '''return an array of tuples for all flightmodes in log. Tuple is (modestring, t0, t1)''' tstamp = None fmode = None if self._flightmodes is None: self._rewind() self._flightmodes = [] types = set(['MODE']) while True: m = self.recv_match(type=types) if m is None: break tstamp = m._timestamp if self.flightmode == fmode: continue if len(self._flightmodes) > 0: (mode, t0, t1) = self._flightmodes[-1] self._flightmodes[-1] = (mode, t0, tstamp) self._flightmodes.append((self.flightmode, tstamp, None)) fmode = self.flightmode if tstamp is not None: (mode, t0, t1) = self._flightmodes[-1] self._flightmodes[-1] = (mode, t0, self.last_timestamp()) self._rewind() return self._flightmodes class DFReader_binary(DFReader): '''parse a binary dataflash file''' def __init__(self, filename, zero_time_base=False, progress_callback=None): DFReader.__init__(self) # read the whole file into memory for simplicity self.filehandle = open(filename, 'rb') self.filehandle.seek(0, 2) self.data_len = self.filehandle.tell() self.filehandle.seek(0) if platform.system() == "Windows": self.data_map = mmap.mmap(self.filehandle.fileno(), self.data_len, None, mmap.ACCESS_READ) else: self.data_map = mmap.mmap(self.filehandle.fileno(), self.data_len, mmap.MAP_PRIVATE, mmap.PROT_READ) self.HEAD1 = 0xA3 self.HEAD2 = 0x95 self.unpackers = {} if sys.version_info.major < 3: self.HEAD1 = chr(self.HEAD1) self.HEAD2 = chr(self.HEAD2) self.formats = { 0x80: DFFormat(0x80, 'FMT', 89, 'BBnNZ', "Type,Length,Name,Format,Columns") } self._zero_time_base = zero_time_base self.prev_type = None self.init_clock() self.prev_type = None self._rewind() self.init_arrays(progress_callback) def _rewind(self): '''rewind to start of log''' DFReader._rewind(self) self.offset = 0 self.remaining = self.data_len self.type_nums = None self.timestamp = 0 def rewind(self): '''rewind to start of log''' self._rewind() def init_arrays(self, progress_callback=None): '''initialise arrays for fast recv_match()''' self.offsets = [] self.counts = [] self._count = 0 self.name_to_id = {} self.id_to_name = {} for i in range(256): self.offsets.append([]) self.counts.append(0) fmt_type = 0x80 ofs = 0 pct = 0 HEAD1 = self.HEAD1 HEAD2 = self.HEAD2 lengths = [-1] * 256 while ofs+3 < self.data_len: hdr = self.data_map[ofs:ofs+3] if hdr[0] != HEAD1 or hdr[1] != HEAD2: print("bad header 0x%02x 0x%02x" % (u_ord(hdr[0]), u_ord(hdr[1])), file=sys.stderr) break mtype = u_ord(hdr[2]) self.offsets[mtype].append(ofs) if lengths[mtype] == -1: if not mtype in self.formats: print("unknown msg type 0x%02x (%u)" % (mtype, mtype), file=sys.stderr) break self.offset = ofs self._parse_next() fmt = self.formats[mtype] lengths[mtype] = fmt.len self.counts[mtype] += 1 mlen = lengths[mtype] if mtype == fmt_type: body = self.data_map[ofs+3:ofs+mlen] if len(body)+3 < mlen: break fmt = self.formats[mtype] elements = list(struct.unpack(fmt.msg_struct, body)) mfmt = DFFormat( elements[0], null_term(elements[2]), elements[1], null_term(elements[3]), null_term(elements[4])) self.formats[elements[0]] = mfmt self.name_to_id[mfmt.name] = mfmt.type self.id_to_name[mfmt.type] = mfmt.name ofs += mlen if progress_callback is not None: new_pct = (100 * ofs) // self.data_len if new_pct != pct: progress_callback(new_pct) pct = new_pct for i in range(256): self._count += self.counts[i] self.offset = 0 def last_timestamp(self): '''get the last timestamp in the log''' highest_offset = 0 second_highest_offset = 0 for i in range(256): if self.counts[i] == -1: continue if len(self.offsets[i]) == 0: continue ofs = self.offsets[i][-1] if ofs > highest_offset: second_highest_offset = highest_offset highest_offset = ofs elif ofs > second_highest_offset: second_highest_offset = ofs self.offset = highest_offset m = self.recv_msg() if m is None: self.offset = second_highest_offset m = self.recv_msg() return m._timestamp def skip_to_type(self, type): '''skip fwd to next msg matching given type set''' if self.type_nums is None: # always add some key msg types so we can track flightmode, params etc type = type.copy() type.update(set(['MODE','MSG','PARM','STAT'])) self.indexes = [] self.type_nums = [] for t in type: if not t in self.name_to_id: continue self.type_nums.append(self.name_to_id[t]) self.indexes.append(0) smallest_index = -1 smallest_offset = self.data_len for i in range(len(self.type_nums)): mtype = self.type_nums[i] if self.indexes[i] >= self.counts[mtype]: continue ofs = self.offsets[mtype][self.indexes[i]] if ofs < smallest_offset: smallest_offset = ofs smallest_index = i if smallest_index >= 0: self.indexes[smallest_index] += 1 self.offset = smallest_offset def _parse_next(self): '''read one message, returning it as an object''' # skip over bad messages; after this loop has run msg_type # indicates the message which starts at self.offset (including # signature bytes and msg_type itself) skip_type = None skip_start = 0 while True: if self.data_len - self.offset < 3: return None hdr = self.data_map[self.offset:self.offset+3] if hdr[0] == self.HEAD1 and hdr[1] == self.HEAD2: # signature found if skip_type is not None: # emit message about skipped bytes if self.remaining >= 528: # APM logs often contain garbage at end skip_bytes = self.offset - skip_start print("Skipped %u bad bytes in log at offset %u, type=%s (prev=%s)" % (skip_bytes, skip_start, skip_type, self.prev_type), file=sys.stderr) skip_type = None # check we recognise this message type: msg_type = u_ord(hdr[2]) if msg_type in self.formats: # recognised message found self.prev_type = msg_type break; # message was not recognised; fall through so these # bytes are considered "skipped". The signature bytes # are easily recognisable in the "Skipped bytes" # message. if skip_type is None: skip_type = (u_ord(hdr[0]), u_ord(hdr[1]), u_ord(hdr[2])) skip_start = self.offset self.offset += 1 self.remaining -= 1 self.offset += 3 self.remaining = self.data_len - self.offset fmt = self.formats[msg_type] if self.remaining < fmt.len-3: # out of data - can often happen half way through a message if self.verbose: print("out of data", file=sys.stderr) return None body = self.data_map[self.offset:self.offset+fmt.len-3] elements = None try: if not msg_type in self.unpackers: self.unpackers[msg_type] = struct.Struct(fmt.msg_struct).unpack elements = list(self.unpackers[msg_type](body)) except Exception as ex: print(ex) if self.remaining < 528: # we can have garbage at the end of an APM2 log return None # we should also cope with other corruption; logs # transfered via DataFlash_MAVLink may have blocks of 0s # in them, for example print("Failed to parse %s/%s with len %u (remaining %u)" % (fmt.name, fmt.msg_struct, len(body), self.remaining), file=sys.stderr) if elements is None: return self._parse_next() name = fmt.name # transform elements which can't be done at unpack time: for a_index in fmt.a_indexes: try: elements[a_index] = array.array('h', elements[a_index]) except Exception as e: print("Failed to transform array: %s" % str(e), file=sys.stderr) if name == 'FMT': # add to formats # name, len, format, headings try: self.formats[elements[0]] = DFFormat( elements[0], null_term(elements[2]), elements[1], null_term(elements[3]), null_term(elements[4])) except Exception: return self._parse_next() self.offset += fmt.len - 3 self.remaining = self.data_len - self.offset m = DFMessage(fmt, elements, True) try: self._add_msg(m) except Exception as ex: print("bad msg at offset %u" % self.offset, ex) pass self.percent = 100.0 * (self.offset / float(self.data_len)) return m def DFReader_is_text_log(filename): '''return True if a file appears to be a valid text log''' with open(filename, 'r') as f: ret = (f.read(8000).find('FMT, ') != -1) return ret class DFReader_text(DFReader): '''parse a text dataflash file''' def __init__(self, filename, zero_time_base=False, progress_callback=None): DFReader.__init__(self) # read the whole file into memory for simplicity self.filehandle = open(filename, 'r') self.filehandle.seek(0, 2) self.data_len = self.filehandle.tell() if platform.system() == "Windows": self.data_map = mmap.mmap(self.filehandle.fileno(), self.data_len, None, mmap.ACCESS_READ) else: self.data_map = mmap.mmap(self.filehandle.fileno(), self.data_len, mmap.MAP_PRIVATE, mmap.PROT_READ) self.offset = 0 self.formats = { 'FMT': DFFormat(0x80, 'FMT', 89, 'BBnNZ', "Type,Length,Name,Format,Columns") } self._rewind() self._zero_time_base = zero_time_base self.init_clock() self._rewind() self.init_arrays(progress_callback) def _rewind(self): '''rewind to start of log''' DFReader._rewind(self) # find the first valid line self.offset = self.data_map.find(b'FMT, ') self.type_list = None def rewind(self): '''rewind to start of log''' self._rewind() def init_arrays(self, progress_callback=None): '''initialise arrays for fast recv_match()''' self.offsets = {} self.counts = {} self._count = 0 ofs = self.offset pct = 0 while ofs+16 < self.data_len: mtype = self.data_map[ofs:ofs+4] if mtype[3] == ',': mtype = mtype[0:3] if not mtype in self.offsets: self.counts[mtype] = 0 self.offsets[mtype] = [] self.offset = ofs self._parse_next() self.offsets[mtype].append(ofs) self.counts[mtype] += 1 if mtype == "FMT": self.offset = ofs self._parse_next() ofs = self.data_map.find(b"\n", ofs) if ofs == -1: break ofs += 1 new_pct = (100 * ofs) // self.data_len if progress_callback is not None and new_pct != pct: progress_callback(new_pct) pct = new_pct for mtype in self.counts.keys(): self._count += self.counts[mtype] self.offset = 0 def skip_to_type(self, type): '''skip fwd to next msg matching given type set''' if self.type_list is None: # always add some key msg types so we can track flightmode, params etc self.type_list = type.copy() self.type_list.update(set(['MODE','MSG','PARM','STAT'])) self.type_list = list(self.type_list) self.indexes = [] self.type_nums = [] for t in self.type_list: self.indexes.append(0) smallest_index = -1 smallest_offset = self.data_len for i in range(len(self.type_list)): mtype = self.type_list[i] if not mtype in self.counts: continue if self.indexes[i] >= self.counts[mtype]: continue ofs = self.offsets[mtype][self.indexes[i]] if ofs < smallest_offset: smallest_offset = ofs smallest_index = i if smallest_index >= 0: self.indexes[smallest_index] += 1 self.offset = smallest_offset def _parse_next(self): '''read one message, returning it as an object''' while True: endline = self.data_map.find(b'\n',self.offset) if endline == -1: endline = self.data_len if endline < self.offset: break s = self.data_map[self.offset:endline].rstrip() if sys.version_info.major >= 3: s = s.decode('utf-8') elements = s.split(", ") self.offset = endline+1 if len(elements) >= 2: # this_line is good break if self.offset > self.data_len: return None # cope with empty structures if len(elements) == 5 and elements[-1] == ',': elements[-1] = '' elements.append('') self.percent = 100.0 * (self.offset / float(self.data_len)) msg_type = elements[0] if msg_type not in self.formats: return self._parse_next() fmt = self.formats[msg_type] if len(elements) < len(fmt.format)+1: # not enough columns return self._parse_next() elements = elements[1:] name = fmt.name.rstrip('\0') if name == 'FMT': # add to formats # name, len, format, headings if elements[2] == 'FMT' and elements[4] == 'Type,Length,Name,Format': # some logs have the 'Columns' column missing from text logs elements[4] = "Type,Length,Name,Format,Columns" new_fmt = DFFormat(int(elements[0]), elements[2], int(elements[1]), elements[3], elements[4]) self.formats[elements[2]] = new_fmt try: m = DFMessage(fmt, elements, False) except ValueError: return self._parse_next() self._add_msg(m) return m def last_timestamp(self): '''get the last timestamp in the log''' highest_offset = 0 for mtype in self.counts.keys(): if len(self.offsets[mtype]) == 0: continue ofs = self.offsets[mtype][-1] if ofs > highest_offset: highest_offset = ofs self.offset = highest_offset m = self.recv_msg() return m._timestamp if __name__ == "__main__": use_profiler = False if use_profiler: from line_profiler import LineProfiler profiler = LineProfiler() profiler.add_function(DFReader_binary._parse_next) profiler.add_function(DFReader_binary._add_msg) profiler.add_function(DFReader._set_time) profiler.enable_by_count() filename = sys.argv[1] if filename.endswith('.log'): log = DFReader_text(filename) else: log = DFReader_binary(filename) while True: m = log.recv_msg() if m is None: break if use_profiler: profiler.print_stats()