123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- #!/usr/bin/python
- '''
- simple kml export for logfiles
- Thomas Gubler <thomasgubler@gmail.com>
- '''
- from __future__ import print_function
- from builtins import range
- from argparse import ArgumentParser
- import simplekml
- from pymavlink.mavextra import *
- from pymavlink import mavutil
- import time
- import re
- import os
- mainstate_field = 'STAT.MainState'
- position_field_types = [ # Order must be lon, lat, alt to match KML
- ['Lon', 'Lat', 'Alt'], # PX4
- ['Lng', 'Lat', 'Alt'] # APM > 3
- ]
- colors = [simplekml.Color.red, simplekml.Color.green, simplekml.Color.blue,
- simplekml.Color.violet, simplekml.Color.yellow, simplekml.Color.orange,
- simplekml.Color.burlywood, simplekml.Color.azure, simplekml.Color.lightblue,
- simplekml.Color.lawngreen, simplekml.Color.indianred, simplekml.Color.hotpink]
- kml = simplekml.Kml()
- kml_linestrings = []
- def add_to_linestring(position_data, kml_linestring):
- '''add a point to the kml file'''
- global kml
- # add altitude offset
- position_data[2] += float(args.aoff)
- kml_linestring.coords.addcoordinates([position_data])
- def save_kml(filename):
- '''saves the kml file'''
- global kml
- kml.save(filename)
- print("KML written to %s" % filename)
- def add_data(t, msg, msg_types, vars, fields, field_types, position_field_type):
- '''add some data'''
- mtype = msg.get_type()
- if mtype not in msg_types:
- return
- for i in range(0, len(fields)):
- if mtype not in field_types[i]:
- continue
- f = fields[i]
- v = mavutil.evaluate_expression(f, vars)
- if v is None:
- continue
- # Check if we have position or state information
- if f == mainstate_field:
- # Handle main state information
- # add_data.mainstate_current >= 0 : avoid starting a new linestring
- # when mainstate comes after the first position data in the log
- if (v != add_data.mainstate_current and
- add_data.mainstate_current >= 0):
- add_data.new_linestring = True
- add_data.mainstate_current = v
- else:
- # Handle position information
- # make sure lon, lat, alt is saved in the correct order in
- # position_data (the same as in position_field_types)
- add_data.position_data[i] = v
- # check if we have a full gps measurement
- gps_measurement_ready = True
- for v in add_data.position_data:
- if v is None:
- gps_measurement_ready = False
- if not gps_measurement_ready:
- return
- # if new line string is needed (because of state change): add previous
- # linestring to kml_linestrings list, add a new linestring to the kml
- # multigeometry and append to the new linestring
- # else: append to current linestring
- if add_data.new_linestring:
- if add_data.current_kml_linestring is not None:
- kml_linestrings.append(add_data.current_kml_linestring)
- name = "".join([args.source, ":", str(add_data.mainstate_current)])
- add_data.current_kml_linestring = \
- kml.newlinestring(name=name, altitudemode='absolute')
- # set rendering options
- if args.extrude:
- add_data.current_kml_linestring.extrude = 1
- add_data.current_kml_linestring.style.linestyle.color = \
- colors[max([add_data.mainstate_current, 0])]
- add_data.new_linestring = False
- add_to_linestring(add_data.position_data,
- add_data.current_kml_linestring)
- add_data.last_time = msg._timestamp
- else:
- if (msg._timestamp - add_data.last_time) > 0.1:
- add_to_linestring(add_data.position_data,
- add_data.current_kml_linestring)
- add_data.last_time = msg._timestamp
- # reset position_data
- add_data.position_data = [None for n in position_field_type]
- def process_file(filename, source):
- '''process one file'''
- print("Processing %s" % filename)
- mlog = mavutil.mavlink_connection(filename, notimestamps=args.notimestamps)
-
- position_field_type = sniff_field_spelling(mlog, source)
-
- # init fields and field_types lists
- fields = [args.source + "." + s for s in position_field_type]
- fields.append(mainstate_field)
- field_types = []
- msg_types = set()
- re_caps = re.compile('[A-Z_][A-Z0-9_]+')
- for f in fields:
- caps = set(re.findall(re_caps, f))
- msg_types = msg_types.union(caps)
- field_types.append(caps)
-
- add_data.new_linestring = True
- add_data.mainstate_current = -1
- add_data.current_kml_linestring = None
- add_data.position_data = [None for n in position_field_type]
- add_data.last_time = 0
- while True:
- msg = mlog.recv_match(args.condition)
- if msg is None:
- break
- tdays = (msg._timestamp - time.timezone) / (24 * 60 * 60)
- tdays += 719163 # pylab wants it since 0001-01-01
- add_data(tdays, msg, msg_types, mlog.messages, fields, field_types, position_field_type)
-
- def sniff_field_spelling(mlog, source):
- '''attempt to detect whether APM or PX4 attributes names are in use'''
- position_field_type_default = position_field_types[0] # Default to PX4 spelling
-
- msg = mlog.recv_match(source)
- mlog._rewind() # Unfortunately it's either call this or return a mutated object
-
- position_field_selection = [spelling for spelling in position_field_types if hasattr(msg, spelling[0])]
- return position_field_selection[0] if position_field_selection else position_field_type_default
- if __name__ == '__main__':
- parser = ArgumentParser(description=__doc__)
- parser.add_argument("--no-timestamps", dest="notimestamps",
- action='store_true', help="Log doesn't have timestamps")
- parser.add_argument("--condition", default=None,
- help="select packets by a condition [default: %(default)s]")
- parser.add_argument("--aoff", default=0.,
- help="Altitude offset for paths that go through the"
- "ground in google earth [default: %(default)s]")
- parser.add_argument("-o", "--output", dest="filename_out", default="mav.kml",
- help="Output filename [default: %(default)s] ")
- parser.add_argument("-s", "--source", default="GPOS",
- help="Select position data source"
- "(GPOS or GPS) [default: %(default)s]")
- parser.add_argument("-e", "--extrude", default=False,
- action='store_true',
- help="Extrude paths to ground [default: %(default)s]")
- parser.add_argument("logs", metavar="LOG", nargs="+")
- args = parser.parse_args()
- filenames = []
- for f in args.logs:
- if os.path.exists(f):
- filenames.append(f)
- if len(filenames) == 0:
- print("No files to process")
- sys.exit(1)
- for fi in range(0, len(filenames)):
- f = filenames[fi]
- process_file(f, args.source)
- save_kml(args.filename_out)
|