123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- #!/usr/bin/env python
- '''
- interactively select accel and gyro data for FFT analysis
- '''
- from __future__ import print_function
- import numpy
- import pylab
- import matplotlib
- import matplotlib.pyplot as pyplot
- from argparse import ArgumentParser
- parser = ArgumentParser(description=__doc__)
- parser.add_argument("--condition", default=None, help="select packets by condition")
- parser.add_argument("logs", metavar="LOG", nargs="+")
- args = parser.parse_args()
- from pymavlink import mavutil
- try:
- raw_input # Python 2
- except NameError:
- raw_input = input # Python 3
- def plot_input(data, msg, prefix, start, end):
- preview = pylab.figure()
- preview.set_size_inches(12, 3, forward=True)
- for axis in ['X', 'Y', 'Z']:
- field = msg + '.' + prefix + axis
- d = numpy.array(data[field][start:end])
- pylab.plot( d, marker='.', label=field )
- pylab.legend(loc='upper right')
- # pylab.ylabel('m/sec/sec')
- pylab.subplots_adjust(left=0.06, right=0.95, top=0.95, bottom=0.16)
- preview.canvas.set_window_title('FFT input: ' + msg)
- pylab.show()
- def check_drops(data, msg, start, end):
- ts = 1e-6 * numpy.array(data[msg + '.TimeUS'])
- seqcnt = numpy.array(data[msg + '.SampleC'])
- deltas = numpy.diff(seqcnt[start:end])
- # print('ndeltas: ', len(deltas))
- duration = ts[end] - ts[start]
- print(msg + ' duration: {0:.3f} seconds'.format(duration))
- avg_rate = float(end - start - 1) / duration
- print('average logging rate: {0:.0f} Hz'.format(avg_rate))
- ts_mean = numpy.mean(deltas)
- dmin = numpy.min(deltas)
- dmax = numpy.max(deltas)
- print('sample count delta min: {0}, max: {1}'.format(dmin, dmax))
- if (dmin != dmax):
- print('sample count delta mean: ', '{0:.2f}, std: {0:.2f}'.format(ts_mean, numpy.std(deltas)))
- print('sensor sample rate: {0:.0f} Hz'.format(ts_mean * avg_rate))
- drop_lens = []
- drop_times = []
- intvl_count = [0]
- for i in range(0, len(deltas)):
- if (deltas[i] > 1.5 * ts_mean):
- drop_lens.append(deltas[i])
- drop_times.append(ts[start+i])
- print('dropout at sample {0}: length {1}'.format(start+i, deltas[i]))
-
- print('{0:d} sample intervals > {1:.3f}'.format(len(drop_lens), 1.5 * ts_mean))
- return avg_rate
-
- def fft(logfile):
- '''display fft for raw ACC data in logfile'''
- print("Processing log %s" % filename)
- mlog = mavutil.mavlink_connection(filename)
- data = {'ACC1.rate' : 1000,
- 'ACC2.rate' : 1600,
- 'ACC3.rate' : 1000,
- 'GYR1.rate' : 1000,
- 'GYR2.rate' : 800,
- 'GYR3.rate' : 1000}
- for acc in ['ACC1','ACC2','ACC3']:
- for ax in ['AccX', 'AccY', 'AccZ', 'SampleC', 'TimeUS']:
- data[acc+'.'+ax] = []
- for gyr in ['GYR1','GYR2','GYR3']:
- for ax in ['GyrX', 'GyrY', 'GyrZ', 'SampleC', 'TimeUS']:
- data[gyr+'.'+ax] = []
- # now gather all the data
- while True:
- m = mlog.recv_match(condition=args.condition)
- if m is None:
- break
- type = m.get_type()
- if type.startswith("ACC"):
- data[type+'.AccX'].append(m.AccX)
- data[type+'.AccY'].append(m.AccY)
- data[type+'.AccZ'].append(m.AccZ)
- data[type+'.SampleC'].append(m.SampleC)
- data[type+'.TimeUS'].append(m.TimeUS)
- if type.startswith("GYR"):
- data[type+'.GyrX'].append(m.GyrX)
- data[type+'.GyrY'].append(m.GyrY)
- data[type+'.GyrZ'].append(m.GyrZ)
- data[type+'.SampleC'].append(m.SampleC)
- data[type+'.TimeUS'].append(m.TimeUS)
- # SampleC is just a sample counter
- ts = 1e-6 * numpy.array(data['ACC1.TimeUS'])
- seqcnt = numpy.array(data['ACC1.SampleC'])
- print("Extracted %u data points" % len(data['ACC1.AccX']))
-
- # interactive selection of analysis window
- preview = pylab.figure()
- preview.set_size_inches(12, 3, forward=True)
- msg = 'ACC1'
- for axis in ['X', 'Y', 'Z']:
- field = msg + '.Acc' + axis
- d = numpy.array(data[field])
- pylab.plot( d, marker='.', label=field )
- pylab.legend(loc='upper right')
- pylab.ylabel('m/sec/sec')
- pylab.subplots_adjust(left=0.06, right=0.95, top=0.95, bottom=0.16)
- pylab.show()
- currentAxes = preview.gca()
- s_start = 0
- s_end = len(ts)-1
- n_samp = s_end - s_start
- currentAxes.set_xlim(s_start, s_end)
- # outer loop for repeating time window selection
- while True:
-
- while True:
- print('select sample range for fft analysis')
- preview.canvas.set_window_title('select sample range')
- try:
- s_start = input('start sample: ')
- s_end = input('end sample: ')
- currentAxes.set_xlim(s_start, s_end)
- except:
- break
-
- # process selected samples
- s_start = int(currentAxes.get_xlim()[0])
- s_end = int(currentAxes.get_xlim()[1])
- n_samp = s_end - s_start
- print('sample range: ', s_start, s_end)
- print('N samples: ', n_samp)
-
- # check for dropouts: (delta > 1)
- avg_rate = check_drops(data, 'ACC1', s_start, s_end)
-
- title = 'FFT input: {0:s} ACC1[{1:d}:{2:d}], {3:d} samples'.format(logfile, s_start, s_end, n_samp)
- currentAxes.set_xlabel('sample index : nsamples: {0:d}, avg rate: {1:.0f} Hz'.format(n_samp, avg_rate))
- preview.canvas.set_window_title(title)
- preview.savefig('acc1z.png')
-
- for msg in ['ACC1', 'GYR1', 'ACC2', 'GYR2']:
- if msg.startswith('ACC'):
- prefix = 'Acc'
- title = '{2} FFT [{0:d}:{1:d}]'.format(s_start, s_end, msg)
- else:
- prefix = 'Gyr'
- title = '{2} FFT [{0:d}:{1:d}]'.format(s_start, s_end, msg)
-
- # check for dropouts
- data[msg+'.rate'] = check_drops(data, msg, s_start, s_end)
- plot_input(data, msg, prefix, s_start, s_end)
-
- fftwin = pylab.figure()
- fftwin.set_size_inches(12, 3, forward=True)
- f_res = float(data[msg+'.rate']) / n_samp
-
- max_fft = 0
- abs_fft = {}
- index = 0
- avg = {'X':0, 'Y':0, 'Z':0}
- for axis in ['X', 'Y', 'Z']:
- field = msg + '.' + prefix + axis
- d = data[field][s_start:s_end]
- if len(d) == 0:
- continue
-
- d = numpy.array(d)
- freq = numpy.fft.rfftfreq(len(d), 1.0 / data[msg+'.rate'])
- # remove mean
- avg[axis] = numpy.mean(d)
- d -= avg[axis]
- print('{1} DC component: {0:.3f}'.format(avg[axis], field))
- # transform
- d_fft = numpy.fft.rfft(d)
- abs_fft[axis] = numpy.abs(d_fft)
- # remember the max over all axes
- thismax = numpy.max(abs_fft[axis])
- if (max_fft < thismax):
- max_fft = thismax
- index += 1
-
- for axis in ['X', 'Y', 'Z']:
- # scale to 0dB = max
- field = msg + '.' + prefix + axis
- db_fft = 20 * numpy.log10(abs_fft[axis] / max_fft)
- pylab.plot( freq, db_fft, label=field )
-
- fftwin.canvas.set_window_title(title)
- fftwin.gca().set_ylim(-90, 0)
- pylab.legend(loc='upper right')
- pylab.xlabel('Hz : res: ' + '{0:.3f}'.format(f_res))
- pylab.ylabel('dB X {0:.3f} Y {1:.3f} Z {2:.3f}\n'.format(avg['X'], avg['Y'], avg['Z']))
- pylab.subplots_adjust(left=0.1, right=0.95, top=0.95, bottom=0.16)
- fftwin.savefig(msg + '_fft.png')
-
- try:
- selection = raw_input('q to proceed to next file, anything else to select a new range: ')
- print(selection)
- except:
- continue
- if (selection == 'q'):
- break
- pylab.ion()
- for filename in args.logs:
- fft(filename)
- print('type ctrl-c to close windows and exit')
- pylab.ioff()
- pylab.show()
|