123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- #!/usr/bin/env python
- '''
- extract ISBH and ISBD messages from AP_Logging files and produce FFT plots
- '''
- from __future__ import print_function
- import numpy
- import os
- import pylab
- import sys
- import time
- from argparse import ArgumentParser
- parser = ArgumentParser(description=__doc__)
- parser.add_argument("--condition", default=None, help="select packets by condition")
- parser.add_argument("logs", metavar="LOG", nargs="+")
- args = parser.parse_args()
- from pymavlink import mavutil
- def mavfft_fttd(logfile):
- '''display fft for raw ACC data in logfile'''
- '''object to store data about a single FFT plot'''
- class PlotData(object):
- def __init__(self, ffth):
- self.seqno = -1
- self.fftnum = ffth.N
- self.sensor_type = ffth.type
- self.instance = ffth.instance
- self.sample_rate_hz = ffth.smp_rate
- self.multiplier = ffth.mul
- self.data = {}
- self.data["X"] = []
- self.data["Y"] = []
- self.data["Z"] = []
- self.holes = False
- self.freq = None
- def add_fftd(self, fftd):
- if fftd.N != self.fftnum:
- print("Skipping ISBD with wrong fftnum (%u vs %u)\n" % (fftd.fftnum, self.fftnum))
- return
- if self.holes:
- print("Skipping ISBD(%u) for ISBH(%u) with holes in it" % (fftd.seqno, self.fftnum))
- return
- if fftd.seqno != self.seqno+1:
- print("ISBH(%u) has holes in it" % fftd.N)
- self.holes = True
- return
- self.seqno += 1
- self.data["X"].extend(fftd.x)
- self.data["Y"].extend(fftd.y)
- self.data["Z"].extend(fftd.z)
- def prefix(self):
- if self.sensor_type == 0:
- return "Accel"
- elif self.sensor_type == 1:
- return "Gyro"
- else:
- return "?Unknown Sensor Type?"
- def tag(self):
- return str(self)
- def __str__(self):
- return "%s[%u]" % (self.prefix(), self.instance)
- print("Processing log %s" % filename)
- mlog = mavutil.mavlink_connection(filename)
- things_to_plot = []
- plotdata = None
- msgcount = 0
- start_time = time.time()
- while True:
- m = mlog.recv_match(condition=args.condition)
- if m is None:
- break
- msgcount += 1
- if msgcount % 1000 == 0:
- sys.stderr.write(".")
- msg_type = m.get_type()
- if msg_type == "ISBH":
- if plotdata is not None:
- # close off previous data collection
- things_to_plot.append(plotdata)
- # initialise plot-data collection object
- plotdata = PlotData(m)
- continue
- if msg_type == "ISBD":
- if plotdata is None:
- print("Skipping ISBD outside ISBH (fftnum=%u)\n" % m.N)
- continue
- plotdata.add_fftd(m)
- print("", file=sys.stderr)
- time_delta = time.time() - start_time
- print("%us messages %u messages/second %u kB/second" % (msgcount, msgcount/time_delta, os.stat(filename).st_size/time_delta))
- print("Extracted %u fft data sets" % len(things_to_plot), file=sys.stderr)
- sum_fft = {}
- freqmap = {}
- count = 0
- first_freq = None
- for thing_to_plot in things_to_plot:
- for axis in [ "X","Y","Z" ]:
- d = numpy.array(thing_to_plot.data[axis])/float(thing_to_plot.multiplier)
- if len(d) == 0:
- print("No data?!?!?!")
- continue
-
- avg = numpy.sum(d) / len(d)
- d -= avg
- d_fft = numpy.fft.rfft(d)
- if thing_to_plot.tag() not in sum_fft:
- sum_fft[thing_to_plot.tag()] = {
- "X": 0,
- "Y": 0,
- "Z": 0
- }
- sum_fft[thing_to_plot.tag()][axis] = numpy.add(sum_fft[thing_to_plot.tag()][axis], d_fft)
- count += 1
- freq = numpy.fft.rfftfreq(len(d), 1.0/thing_to_plot.sample_rate_hz)
- freqmap[thing_to_plot.tag()] = freq
- for sensor in sum_fft:
- print("Sensor: %s" % str(sensor))
- pylab.figure(str(sensor))
- for axis in [ "X","Y","Z" ]:
- pylab.plot(freqmap[sensor], numpy.abs(sum_fft[sensor][axis]/count), label=axis)
- pylab.legend(loc='upper right')
- pylab.xlabel('Hz')
- for filename in args.logs:
- mavfft_fttd(filename)
- pylab.show()
|