#!/usr/bin/env python ''' mavlink python utility functions Copyright Andrew Tridgell 2011 Released under GNU GPL version 3 or later ''' from __future__ import print_function from builtins import object import socket, math, struct, time, os, fnmatch, array, sys, errno import select from pymavlink import mavexpression # adding these extra imports allows pymavlink to be used directly with pyinstaller # without having complex spec files. To allow for installs that don't have ardupilotmega # at all we avoid throwing an exception if it isn't installed try: import json from pymavlink.dialects.v10 import ardupilotmega except Exception: pass # maximum packet length for a single receive call - use the UDP limit UDP_MAX_PACKET_LEN = 65535 # Store the MAVLink library for the currently-selected dialect # (set by set_dialect()) mavlink = None # Store the mavlink file currently being operated on # (set by mavlink_connection()) mavfile_global = None # If the caller hasn't specified a particular native/legacy version, use this default_native = False # link_id used for signing global_link_id = 0 # Use a globally-set MAVLink dialect if one has been specified as an environment variable. if not 'MAVLINK_DIALECT' in os.environ: os.environ['MAVLINK_DIALECT'] = 'ardupilotmega' def mavlink10(): '''return True if using MAVLink 1.0 or later''' return not 'MAVLINK09' in os.environ def mavlink20(): '''return True if using MAVLink 2.0''' return 'MAVLINK20' in os.environ def evaluate_expression(expression, vars): '''evaluation an expression''' return mavexpression.evaluate_expression(expression, vars) def evaluate_condition(condition, vars): '''evaluation a conditional (boolean) statement''' if condition is None: return True v = evaluate_expression(condition, vars) if v is None: return False return v def u_ord(c): return ord(c) if sys.version_info.major < 3 else c class location(object): '''represent a GPS coordinate''' def __init__(self, lat, lng, alt=0, heading=0): self.lat = lat self.lng = lng self.alt = alt self.heading = heading def __str__(self): return "lat=%.6f,lon=%.6f,alt=%.1f" % (self.lat, self.lng, self.alt) def set_dialect(dialect): '''set the MAVLink dialect to work with. For example, set_dialect("ardupilotmega") ''' global mavlink, current_dialect from .generator import mavparse if 'MAVLINK20' in os.environ: wire_protocol = mavparse.PROTOCOL_2_0 modname = "pymavlink.dialects.v20." + dialect elif mavlink is None or mavlink.WIRE_PROTOCOL_VERSION == "1.0" or not 'MAVLINK09' in os.environ: wire_protocol = mavparse.PROTOCOL_1_0 modname = "pymavlink.dialects.v10." + dialect else: wire_protocol = mavparse.PROTOCOL_0_9 modname = "pymavlink.dialects.v09." + dialect try: mod = __import__(modname) except Exception: # auto-generate the dialect module from .generator.mavgen import mavgen_python_dialect mavgen_python_dialect(dialect, wire_protocol) mod = __import__(modname) components = modname.split('.') for comp in components[1:]: mod = getattr(mod, comp) current_dialect = dialect mavlink = mod # Set the default dialect. This is done here as it needs to be after the function declaration set_dialect(os.environ['MAVLINK_DIALECT']) class mavfile_state(object): '''state for a particular system id''' def __init__(self): self.messages = { 'MAV' : self } self.flightmode = "UNKNOWN" self.vehicle_type = "UNKNOWN" self.mav_type = mavlink.MAV_TYPE_FIXED_WING self.base_mode = 0 self.armed = False # canonical arm state for the vehicle as a whole if float(mavlink.WIRE_PROTOCOL_VERSION) >= 1: self.messages['HOME'] = mavlink.MAVLink_gps_raw_int_message(0,0,0,0,0,0,0,0,0,0) mavlink.MAVLink_waypoint_message = mavlink.MAVLink_mission_item_message else: self.messages['HOME'] = mavlink.MAVLink_gps_raw_message(0,0,0,0,0,0,0,0,0) class param_state(object): '''state for a particular system id/component id pair''' def __init__(self): self.params = {} class mavfile(object): '''a generic mavlink port''' def __init__(self, fd, address, source_system=255, source_component=0, notimestamps=False, input=True, use_native=default_native): global mavfile_global if input: mavfile_global = self self.fd = fd self.sysid = 0 self.param_sysid = (0,0) self.address = address self.timestamp = 0 self.last_seq = {} self.mav_loss = 0 self.mav_count = 0 self.param_fetch_start = 0 # state for each sysid self.sysid_state = {} self.sysid_state[self.sysid] = mavfile_state() # param state for each sysid/compid tuple self.param_state = {} self.param_state[self.param_sysid] = param_state() # status of param fetch, indexed by sysid,compid tuple self.source_system = source_system self.source_component = source_component self.first_byte = True self.robust_parsing = True self.mav = mavlink.MAVLink(self, srcSystem=self.source_system, srcComponent=self.source_component, use_native=use_native) self.mav.robust_parsing = self.robust_parsing self.logfile = None self.logfile_raw = None self.start_time = time.time() self.message_hooks = [] self.idle_hooks = [] self.uptime = 0.0 self.notimestamps = notimestamps self._timestamp = None self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION self.stop_on_EOF = False self.portdead = False @property def target_system(self): return self.sysid @property def target_component(self): return self.param_sysid[1] @target_system.setter def target_system(self, value): self.sysid = value if not self.sysid in self.sysid_state: self.sysid_state[self.sysid] = mavfile_state() if self.sysid != self.param_sysid[0]: self.param_sysid = (self.sysid, self.param_sysid[1]) if not self.param_sysid in self.param_state: self.param_state[self.param_sysid] = param_state() @target_component.setter def target_component(self, value): if value != self.param_sysid[1]: self.param_sysid = (self.param_sysid[0], value) if not self.param_sysid in self.param_state: self.param_state[self.param_sysid] = param_state() @property def params(self): if self.param_sysid[1] == 0: eff_tuple = (self.sysid, 1) if eff_tuple in self.param_state: return getattr(self.param_state[eff_tuple],'params') return getattr(self.param_state[self.param_sysid],'params') @property def messages(self): return getattr(self.sysid_state[self.sysid],'messages') @property def flightmode(self): return getattr(self.sysid_state[self.sysid],'flightmode') @flightmode.setter def flightmode(self, value): setattr(self.sysid_state[self.sysid],'flightmode',value) @property def vehicle_type(self): return getattr(self.sysid_state[self.sysid],'vehicle_type') @vehicle_type.setter def vehicle_type(self, value): setattr(self.sysid_state[self.sysid],'vehicle_type',value) @property def mav_type(self): return getattr(self.sysid_state[self.sysid],'mav_type') @mav_type.setter def mav_type(self, value): setattr(self.sysid_state[self.sysid],'mav_type',value) @property def base_mode(self): return getattr(self.sysid_state[self.sysid],'base_mode') @base_mode.setter def base_mode(self, value): setattr(self.sysid_state[self.sysid],'base_mode',value) def auto_mavlink_version(self, buf): '''auto-switch mavlink protocol version''' global mavlink if len(buf) == 0: return try: magic = ord(buf[0]) except: magic = buf[0] if not magic in [ 85, 254, 253 ]: return self.first_byte = False if self.WIRE_PROTOCOL_VERSION == "0.9" and magic == 254: self.WIRE_PROTOCOL_VERSION = "1.0" set_dialect(current_dialect) elif self.WIRE_PROTOCOL_VERSION == "1.0" and magic == 85: self.WIRE_PROTOCOL_VERSION = "0.9" os.environ['MAVLINK09'] = '1' set_dialect(current_dialect) elif self.WIRE_PROTOCOL_VERSION != "2.0" and magic == 253: self.WIRE_PROTOCOL_VERSION = "2.0" os.environ['MAVLINK20'] = '1' set_dialect(current_dialect) else: return # switch protocol (callback, callback_args, callback_kwargs) = (self.mav.callback, self.mav.callback_args, self.mav.callback_kwargs) self.mav = mavlink.MAVLink(self, srcSystem=self.source_system, srcComponent=self.source_component) self.mav.robust_parsing = self.robust_parsing self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION (self.mav.callback, self.mav.callback_args, self.mav.callback_kwargs) = (callback, callback_args, callback_kwargs) def recv(self, n=None): '''default recv method''' raise RuntimeError('no recv() method supplied') def close(self, n=None): '''default close method''' raise RuntimeError('no close() method supplied') def write(self, buf): '''default write method''' raise RuntimeError('no write() method supplied') def select(self, timeout): '''wait for up to timeout seconds for more data''' if self.fd is None: time.sleep(min(timeout,0.5)) return True try: (rin, win, xin) = select.select([self.fd], [], [], timeout) except select.error: return False return len(rin) == 1 def pre_message(self): '''default pre message call''' return def set_rtscts(self, enable): '''enable/disable RTS/CTS if applicable''' return def probably_vehicle_heartbeat(self, msg): if msg.get_srcComponent() == mavlink.MAV_COMP_ID_GIMBAL: return False if msg.type in (mavlink.MAV_TYPE_GCS, mavlink.MAV_TYPE_GIMBAL, mavlink.MAV_TYPE_ADSB, mavlink.MAV_TYPE_ONBOARD_CONTROLLER): return False return True def post_message(self, msg): '''default post message call''' if '_posted' in msg.__dict__: return msg._posted = True msg._timestamp = time.time() type = msg.get_type() if 'usec' in msg.__dict__: self.uptime = msg.usec * 1.0e-6 if 'time_boot_ms' in msg.__dict__: self.uptime = msg.time_boot_ms * 1.0e-3 if self._timestamp is not None: if self.notimestamps: msg._timestamp = self.uptime else: msg._timestamp = self._timestamp src_system = msg.get_srcSystem() src_component = msg.get_srcComponent() src_tuple = (src_system, src_component) radio_tuple = (ord('3'), ord('D')) if not src_system in self.sysid_state: # we've seen a new system self.sysid_state[src_system] = mavfile_state() self.sysid_state[src_system].messages[type] = msg if src_tuple == radio_tuple: # as a special case radio msgs are added for all sysids for s in self.sysid_state.keys(): self.sysid_state[s].messages[type] = msg if not (src_tuple == radio_tuple or msg.get_type() == 'BAD_DATA'): if not src_tuple in self.last_seq: last_seq = -1 else: last_seq = self.last_seq[src_tuple] seq = (last_seq+1) % 256 seq2 = msg.get_seq() if seq != seq2 and last_seq != -1: diff = (seq2 - seq) % 256 self.mav_loss += diff #print("lost %u seq=%u seq2=%u last_seq=%u src_tupe=%s %s" % (diff, seq, seq2, last_seq, str(src_tuple), msg.get_type())) self.last_seq[src_tuple] = seq2 self.mav_count += 1 self.timestamp = msg._timestamp if type == 'HEARTBEAT' and self.probably_vehicle_heartbeat(msg): if self.sysid == 0: # lock onto id tuple of first vehicle heartbeat self.sysid = src_system if float(mavlink.WIRE_PROTOCOL_VERSION) >= 1: self.flightmode = mode_string_v10(msg) self.mav_type = msg.type self.base_mode = msg.base_mode self.sysid_state[self.sysid].armed = (msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED) elif type == 'PARAM_VALUE': if not src_tuple in self.param_state: self.param_state[src_tuple] = param_state() self.param_state[src_tuple].params[msg.param_id] = msg.param_value elif type == 'SYS_STATUS' and mavlink.WIRE_PROTOCOL_VERSION == '0.9': self.flightmode = mode_string_v09(msg) elif type == 'GPS_RAW': if self.sysid_state[src_system].messages['HOME'].fix_type < 2: self.sysid_state[src_system].messages['HOME'] = msg elif type == 'GPS_RAW_INT': if self.sysid_state[src_system].messages['HOME'].fix_type < 3: self.sysid_state[src_system].messages['HOME'] = msg for hook in self.message_hooks: hook(self, msg) if (msg.get_signed() and self.mav.signing.link_id == 0 and msg.get_link_id() != 0 and self.target_system == msg.get_srcSystem() and self.target_component == msg.get_srcComponent()): # change to link_id from incoming packet self.mav.signing.link_id = msg.get_link_id() def packet_loss(self): '''packet loss as a percentage''' if self.mav_count == 0: return 0 return (100.0*self.mav_loss)/(self.mav_count+self.mav_loss) def recv_msg(self): '''message receive routine''' self.pre_message() while True: n = self.mav.bytes_needed() s = self.recv(n) numnew = len(s) if numnew != 0: if self.logfile_raw: self.logfile_raw.write(str(s)) if self.first_byte: self.auto_mavlink_version(s) # We always call parse_char even if the new string is empty, because the existing message buf might already have some valid packet # we can extract msg = self.mav.parse_char(s) if msg: if self.logfile and msg.get_type() != 'BAD_DATA' : usec = int(time.time() * 1.0e6) & ~3 self.logfile.write(str(struct.pack('>Q', usec) + msg.get_msgbuf())) self.post_message(msg) return msg else: # if we failed to parse any messages _and_ no new bytes arrived, return immediately so the client has the option to # timeout if numnew == 0: return None def recv_match(self, condition=None, type=None, blocking=False, timeout=None): '''recv the next MAVLink message that matches the given condition type can be a string or a list of strings''' if type is not None and not isinstance(type, list) and not isinstance(type, set): type = [type] start_time = time.time() while True: if timeout is not None: now = time.time() if now < start_time: start_time = now # If an external process rolls back system time, we should not spin forever. if start_time + timeout < time.time(): return None m = self.recv_msg() if m is None: if blocking: for hook in self.idle_hooks: hook(self) if timeout is None: self.select(0.05) else: self.select(timeout/2) continue return None if type is not None and not m.get_type() in type: continue if not evaluate_condition(condition, self.messages): continue return m def check_condition(self, condition): '''check if a condition is true''' return evaluate_condition(condition, self.messages) def mavlink10(self): '''return True if using MAVLink 1.0 or later''' return float(self.WIRE_PROTOCOL_VERSION) >= 1 def mavlink20(self): '''return True if using MAVLink 2.0 or later''' return float(self.WIRE_PROTOCOL_VERSION) >= 2 def setup_logfile(self, logfile, mode='w'): '''start logging to the given logfile, with timestamps''' self.logfile = open(logfile, mode=mode) def setup_logfile_raw(self, logfile, mode='w'): '''start logging raw bytes to the given logfile, without timestamps''' self.logfile_raw = open(logfile, mode=mode) def wait_heartbeat(self, blocking=True, timeout=None): '''wait for a heartbeat so we know the target system IDs''' return self.recv_match(type='HEARTBEAT', blocking=blocking, timeout=timeout) def param_fetch_all(self): '''initiate fetch of all parameters''' if time.time() - self.param_fetch_start < 2.0: # don't fetch too often return self.param_fetch_start = time.time() self.mav.param_request_list_send(self.target_system, self.target_component) def param_fetch_one(self, name): '''initiate fetch of one parameter''' try: idx = int(name) self.mav.param_request_read_send(self.target_system, self.target_component, b"", idx) except Exception: if sys.version_info.major >= 3 and not isinstance(name, bytes): name = bytes(name,'ascii') self.mav.param_request_read_send(self.target_system, self.target_component, name, -1) def time_since(self, mtype): '''return the time since the last message of type mtype was received''' if not mtype in self.messages: return time.time() - self.start_time return time.time() - self.messages[mtype]._timestamp def param_set_send(self, parm_name, parm_value, parm_type=None): '''wrapper for parameter set''' if self.mavlink10(): if parm_type is None: parm_type = mavlink.MAVLINK_TYPE_FLOAT self.mav.param_set_send(self.target_system, self.target_component, parm_name.encode('utf8'), parm_value, parm_type) else: self.mav.param_set_send(self.target_system, self.target_component, parm_name.encode('utf8'), parm_value) def waypoint_request_list_send(self): '''wrapper for waypoint_request_list_send''' if self.mavlink10(): self.mav.mission_request_list_send(self.target_system, self.target_component) else: self.mav.waypoint_request_list_send(self.target_system, self.target_component) def waypoint_clear_all_send(self): '''wrapper for waypoint_clear_all_send''' if self.mavlink10(): self.mav.mission_clear_all_send(self.target_system, self.target_component) else: self.mav.waypoint_clear_all_send(self.target_system, self.target_component) def waypoint_request_send(self, seq): '''wrapper for waypoint_request_send''' if self.mavlink10(): self.mav.mission_request_send(self.target_system, self.target_component, seq) else: self.mav.waypoint_request_send(self.target_system, self.target_component, seq) def waypoint_set_current_send(self, seq): '''wrapper for waypoint_set_current_send''' if self.mavlink10(): self.mav.mission_set_current_send(self.target_system, self.target_component, seq) else: self.mav.waypoint_set_current_send(self.target_system, self.target_component, seq) def waypoint_current(self): '''return current waypoint''' if self.mavlink10(): m = self.recv_match(type='MISSION_CURRENT', blocking=True) else: m = self.recv_match(type='WAYPOINT_CURRENT', blocking=True) return m.seq def waypoint_count_send(self, seq): '''wrapper for waypoint_count_send''' if self.mavlink10(): self.mav.mission_count_send(self.target_system, self.target_component, seq) else: self.mav.waypoint_count_send(self.target_system, self.target_component, seq) def set_mode_flag(self, flag, enable): ''' Enables/ disables MAV_MODE_FLAG @param flag The mode flag, see MAV_MODE_FLAG enum @param enable Enable the flag, (True/False) ''' if self.mavlink10(): mode = self.base_mode if enable: mode = mode | flag elif not enable: mode = mode & ~flag self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_DO_SET_MODE, 0, mode, 0, 0, 0, 0, 0, 0) else: print("Set mode flag not supported") def set_mode_auto(self): '''enter auto mode''' if self.mavlink10(): self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_MISSION_START, 0, 0, 0, 0, 0, 0, 0, 0) else: MAV_ACTION_SET_AUTO = 13 self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_AUTO) def mode_mapping(self): '''return dictionary mapping mode names to numbers, or None if unknown''' mav_type = self.field('HEARTBEAT', 'type', self.mav_type) mav_autopilot = self.field('HEARTBEAT', 'autopilot', None) if mav_autopilot == mavlink.MAV_AUTOPILOT_PX4: return px4_map if mav_type is None: return None map = None if mav_type in [mavlink.MAV_TYPE_QUADROTOR, mavlink.MAV_TYPE_HELICOPTER, mavlink.MAV_TYPE_HEXAROTOR, mavlink.MAV_TYPE_OCTOROTOR, mavlink.MAV_TYPE_DODECAROTOR, mavlink.MAV_TYPE_COAXIAL, mavlink.MAV_TYPE_TRICOPTER]: map = mode_mapping_acm if mav_type == mavlink.MAV_TYPE_FIXED_WING: map = mode_mapping_apm if mav_type == mavlink.MAV_TYPE_GROUND_ROVER: map = mode_mapping_rover if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT: map = mode_mapping_rover # for the time being if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER: map = mode_mapping_tracker if mav_type == mavlink.MAV_TYPE_SUBMARINE: map = mode_mapping_sub if map is None: return None inv_map = dict((a, b) for (b, a) in map.items()) return inv_map def set_mode_apm(self, mode, custom_mode = 0, custom_sub_mode = 0): '''enter arbitrary mode''' if isinstance(mode, str): mode_map = self.mode_mapping() if mode_map is None or mode not in mode_map: print("Unknown mode '%s'" % mode) return mode = mode_map[mode] # set mode by integer mode number for ArduPilot self.mav.set_mode_send(self.target_system, mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, mode) def set_mode_px4(self, mode, custom_mode, custom_sub_mode): '''enter arbitrary mode''' if isinstance(mode, str): mode_map = self.mode_mapping() if mode_map is None or mode not in mode_map: print("Unknown mode '%s'" % mode) return # PX4 uses two fields to define modes mode, custom_mode, custom_sub_mode = px4_map[mode] self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_DO_SET_MODE, 0, mode, custom_mode, custom_sub_mode, 0, 0, 0, 0) def set_mode(self, mode, custom_mode = 0, custom_sub_mode = 0): '''set arbitrary flight mode''' mav_autopilot = self.field('HEARTBEAT', 'autopilot', None) if mav_autopilot == mavlink.MAV_AUTOPILOT_PX4: self.set_mode_px4(mode, custom_mode, custom_sub_mode) else: self.set_mode_apm(mode) def set_mode_rtl(self): '''enter RTL mode''' if self.mavlink10(): self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH, 0, 0, 0, 0, 0, 0, 0, 0) else: MAV_ACTION_RETURN = 3 self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_RETURN) def set_mode_manual(self): '''enter MANUAL mode''' if self.mavlink10(): self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_DO_SET_MODE, 0, mavlink.MAV_MODE_MANUAL_ARMED, 0, 0, 0, 0, 0, 0) else: MAV_ACTION_SET_MANUAL = 12 self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_MANUAL) def set_mode_fbwa(self): '''enter FBWA mode''' if self.mavlink10(): self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_DO_SET_MODE, 0, mavlink.MAV_MODE_STABILIZE_ARMED, 0, 0, 0, 0, 0, 0) else: print("Forcing FBWA not supported") def set_mode_loiter(self): '''enter LOITER mode''' if self.mavlink10(): self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_NAV_LOITER_UNLIM, 0, 0, 0, 0, 0, 0, 0, 0) else: MAV_ACTION_LOITER = 27 self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_LOITER) def set_servo(self, channel, pwm): '''set a servo value''' self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_DO_SET_SERVO, 0, channel, pwm, 0, 0, 0, 0, 0) def set_relay(self, relay_pin=0, state=True): '''Set relay_pin to value of state''' if self.mavlink10(): self.mav.command_long_send( self.target_system, # target_system self.target_component, # target_component mavlink.MAV_CMD_DO_SET_RELAY, # command 0, # Confirmation relay_pin, # Relay Number int(state), # state (1 to indicate arm) 0, # param3 (all other params meaningless) 0, # param4 0, # param5 0, # param6 0) # param7 else: print("Setting relays not supported.") def calibrate_level(self): '''calibrate accels (1D version)''' self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0, 1, 1, 0, 0, 0, 0, 0) def calibrate_pressure(self): '''calibrate pressure''' if self.mavlink10(): self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0, 0, 0, 1, 0, 0, 0, 0) else: MAV_ACTION_CALIBRATE_PRESSURE = 20 self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_CALIBRATE_PRESSURE) def reboot_autopilot(self, hold_in_bootloader=False): '''reboot the autopilot''' if self.mavlink10(): if hold_in_bootloader: param1 = 3 else: param1 = 1 self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, 0, param1, 0, 0, 0, 0, 0, 0) # send an old style reboot immediately afterwards in case it is an older firmware # that doesn't understand the new convention self.mav.command_long_send(self.target_system, self.target_component, mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, 0, 1, 0, 0, 0, 0, 0, 0) def wait_gps_fix(self): self.recv_match(type='VFR_HUD', blocking=True) if self.mavlink10(): self.recv_match(type='GPS_RAW_INT', blocking=True, condition='GPS_RAW_INT.fix_type>=3 and GPS_RAW_INT.lat != 0') else: self.recv_match(type='GPS_RAW', blocking=True, condition='GPS_RAW.fix_type>=2 and GPS_RAW.lat != 0') def location(self, relative_alt=False): '''return current location''' self.wait_gps_fix() # wait for another VFR_HUD, to ensure we have correct altitude self.recv_match(type='VFR_HUD', blocking=True) self.recv_match(type='GLOBAL_POSITION_INT', blocking=True) if relative_alt: alt = self.messages['GLOBAL_POSITION_INT'].relative_alt*0.001 else: alt = self.messages['VFR_HUD'].alt return location(self.messages['GPS_RAW_INT'].lat*1.0e-7, self.messages['GPS_RAW_INT'].lon*1.0e-7, alt, self.messages['VFR_HUD'].heading) def arducopter_arm(self): '''arm motors (arducopter only)''' if self.mavlink10(): self.mav.command_long_send( self.target_system, # target_system self.target_component, mavlink.MAV_CMD_COMPONENT_ARM_DISARM, # command 0, # confirmation 1, # param1 (1 to indicate arm) 0, # param2 (all other params meaningless) 0, # param3 0, # param4 0, # param5 0, # param6 0) # param7 def arducopter_disarm(self): '''calibrate pressure''' if self.mavlink10(): self.mav.command_long_send( self.target_system, # target_system self.target_component, mavlink.MAV_CMD_COMPONENT_ARM_DISARM, # command 0, # confirmation 0, # param1 (0 to indicate disarm) 0, # param2 (all other params meaningless) 0, # param3 0, # param4 0, # param5 0, # param6 0) # param7 def motors_armed(self): '''return true if motors armed''' return self.sysid_state[self.sysid].armed def motors_armed_wait(self): '''wait for motors to be armed''' while True: m = self.wait_heartbeat() if self.motors_armed(): return def motors_disarmed_wait(self): '''wait for motors to be disarmed''' while True: m = self.wait_heartbeat() if not self.motors_armed(): return def field(self, type, field, default=None): '''convenient function for returning an arbitrary MAVLink field with a default''' if not type in self.messages: return default return getattr(self.messages[type], field, default) def param(self, name, default=None): '''convenient function for returning an arbitrary MAVLink parameter with a default''' if not name in self.params: return default return self.params[name] def setup_signing(self, secret_key, sign_outgoing=True, allow_unsigned_callback=None, initial_timestamp=None, link_id=None): '''setup for MAVLink2 signing''' self.mav.signing.secret_key = secret_key self.mav.signing.sign_outgoing = sign_outgoing self.mav.signing.allow_unsigned_callback = allow_unsigned_callback if link_id is None: # auto-increment the link_id for each link global global_link_id link_id = global_link_id global_link_id = min(global_link_id + 1, 255) self.mav.signing.link_id = link_id if initial_timestamp is None: # timestamp is time since 1/1/2015 epoch_offset = 1420070400 now = max(time.time(), epoch_offset) initial_timestamp = now - epoch_offset initial_timestamp = int(initial_timestamp * 100 * 1000) # initial_timestamp is in 10usec units self.mav.signing.timestamp = initial_timestamp def disable_signing(self): '''disable MAVLink2 signing''' self.mav.signing.secret_key = None self.mav.signing.sign_outgoing = False self.mav.signing.allow_unsigned_callback = None self.mav.signing.link_id = 0 self.mav.signing.timestamp = 0 def set_close_on_exec(fd): '''set the clone on exec flag on a file descriptor. Ignore exceptions''' try: import fcntl flags = fcntl.fcntl(fd, fcntl.F_GETFD) flags |= fcntl.FD_CLOEXEC fcntl.fcntl(fd, fcntl.F_SETFD, flags) except Exception: pass class FakeSerial(): def __init__(self): pass def read(self, len): return "" def write(self, buf): raise Exception("write always fails") def inWaiting(self): return 0 def close(self): pass class mavserial(mavfile): '''a serial mavlink port''' def __init__(self, device, baud=115200, autoreconnect=False, source_system=255, source_component=0, use_native=default_native, force_connected=False): import serial if ',' in device and not os.path.exists(device): device, baud = device.split(',') self.baud = baud self.device = device self.autoreconnect = autoreconnect self.force_connected = force_connected # we rather strangely set the baudrate initially to 1200, then change to the desired # baudrate. This works around a kernel bug on some Linux kernels where the baudrate # is not set correctly try: self.port = serial.Serial(self.device, 1200, timeout=0, dsrdtr=False, rtscts=False, xonxoff=False) except serial.SerialException as e: if not force_connected: raise e self.port = FakeSerial() try: fd = self.port.fileno() set_close_on_exec(fd) except Exception: fd = None self.set_baudrate(self.baud) mavfile.__init__(self, fd, device, source_system=source_system, source_component=source_component, use_native=use_native) self.rtscts = False def set_rtscts(self, enable): '''enable/disable RTS/CTS if applicable''' try: self.port.setRtsCts(enable) except Exception: self.port.rtscts = enable self.rtscts = enable def set_baudrate(self, baudrate): '''set baudrate''' try: self.port.setBaudrate(baudrate) except Exception: # for pySerial 3.0, which doesn't have setBaudrate() self.port.baudrate = baudrate def close(self): self.port.close() def recv(self,n=None): if n is None: n = self.mav.bytes_needed() if self.fd is None: waiting = self.port.inWaiting() if waiting < n: n = waiting ret = self.port.read(n) return ret def write(self, buf): try: return self.port.write(bytes(buf)) except Exception: if not self.portdead: print("Device %s is dead" % self.device) self.portdead = True if self.autoreconnect: self.reset() return -1 def reset(self): import serial try: try: newport = serial.Serial(self.device, self.baud, timeout=0, dsrdtr=False, rtscts=False, xonxoff=False) except serial.SerialException as e: if not self.force_connected: raise e newport = FakeSerial() return False self.port.close() self.port = newport print("Device %s reopened OK" % self.device) self.portdead = False try: self.fd = self.port.fileno() except Exception: self.fd = None self.set_baudrate(self.baud) if self.rtscts: self.set_rtscts(self.rtscts) return True except Exception: return False class mavudp(mavfile): '''a UDP mavlink socket''' def __init__(self, device, input=True, broadcast=False, source_system=255, source_component=0, use_native=default_native): a = device.split(':') if len(a) != 2: print("UDP ports must be specified as host:port") sys.exit(1) self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_server = input self.broadcast = False if input: self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.port.bind((a[0], int(a[1]))) else: self.destination_addr = (a[0], int(a[1])) if broadcast: self.port.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.broadcast = True set_close_on_exec(self.port.fileno()) self.port.setblocking(0) self.last_address = None self.resolved_destination_addr = None mavfile.__init__(self, self.port.fileno(), device, source_system=source_system, source_component=source_component, input=input, use_native=use_native) def close(self): self.port.close() def recv(self,n=None): try: data, new_addr = self.port.recvfrom(UDP_MAX_PACKET_LEN) except socket.error as e: if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]: return "" raise if self.udp_server or self.broadcast: self.last_address = new_addr return data def write(self, buf): try: if self.udp_server: if self.last_address: self.port.sendto(buf, self.last_address) else: if self.last_address and self.broadcast: self.destination_addr = self.last_address self.broadcast = False self.port.connect(self.destination_addr) # turn a (possible) hostname into an IP address to # avoid resolving the hostname for every packet sent: if self.destination_addr[0] != self.resolved_destination_addr: self.resolved_destination_addr = self.destination_addr[0] self.destination_addr = (socket.gethostbyname(self.destination_addr[0]), self.destination_addr[1]) self.port.sendto(buf, self.destination_addr) except socket.error: pass def recv_msg(self): '''message receive routine for UDP link''' self.pre_message() s = self.recv() if len(s) > 0: if self.first_byte: self.auto_mavlink_version(s) m = self.mav.parse_char(s) if m is not None: self.post_message(m) return m class mavmcast(mavfile): '''a UDP multicast mavlink socket''' def __init__(self, device, broadcast=False, source_system=255, source_component=0, use_native=default_native): a = device.split(':') mcast_ip = "239.255.145.50" mcast_port = 14550 if len(a) == 1 and len(a[0]) > 0: mcast_port = int(a[0]) elif len(a) > 1: mcast_ip = a[0] mcast_port = int(a[1]) # first the receiving socket. We use separate sending and receiving # sockets so we can use the port number of the sending socket to detect # packets from ourselves self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.port.bind((mcast_ip, mcast_port)) mreq = struct.pack("4sl", socket.inet_aton(mcast_ip), socket.INADDR_ANY) self.port.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) self.port.setblocking(0) set_close_on_exec(self.port.fileno()) # now the sending socket self.port_out = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.port_out.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.port_out.setblocking(0) self.port_out.connect((mcast_ip, mcast_port)) set_close_on_exec(self.port_out.fileno()) self.myport = None mavfile.__init__(self, self.port.fileno(), device, source_system=source_system, source_component=source_component, input=False, use_native=use_native) def close(self): self.port.close() self.port_out.close() def recv(self,n=None): try: data, new_addr = self.port.recvfrom(UDP_MAX_PACKET_LEN) if self.myport is None: try: (myaddr,self.myport) = self.port_out.getsockname() except Exception: pass except socket.error as e: if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]: return "" raise if self.myport == new_addr[1]: # data from ourselves, discard return '' return data def write(self, buf): try: self.port_out.send(buf) except socket.error as e: pass def recv_msg(self): '''message receive routine for UDP link''' self.pre_message() s = self.recv() if len(s) > 0: if self.first_byte: self.auto_mavlink_version(s) m = self.mav.parse_char(s) if m is not None: self.post_message(m) return m class mavtcp(mavfile): '''a TCP mavlink socket''' def __init__(self, device, autoreconnect=False, source_system=255, source_component=0, retries=6, use_native=default_native): a = device.split(':') if len(a) != 2: print("TCP ports must be specified as host:port") sys.exit(1) self.destination_addr = (a[0], int(a[1])) self.autoreconnect = autoreconnect self.retries = retries self.do_connect() mavfile.__init__(self, self.port.fileno(), "tcp:" + device, source_system=source_system, source_component=source_component, use_native=use_native) def do_connect(self): self.port = socket.socket(socket.AF_INET, socket.SOCK_STREAM) retries = self.retries if retries <= 0: # try to connect at least once: retries = 1 while retries >= 0: retries -= 1 try: self.port.connect(self.destination_addr) break except Exception as e: if retries == 0: if self.port is not None: self.port.close() self.port = None raise e print(e, "sleeping") time.sleep(1) self.port.setblocking(0) set_close_on_exec(self.port.fileno()) self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) def close(self): self.port.close() def handle_disconnect(self): print("Connection reset or closed by peer on TCP socket") self.reconnect() def handle_eof(self): # EOF print("EOF on TCP socket") self.reconnect() def recv(self,n=None): if self.port is None: self.reconnect() if n is None: n = self.mav.bytes_needed() try: data = self.port.recv(n) except socket.error as e: if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]: return "" if e.errno in [ errno.ECONNRESET, errno.EPIPE ]: self.handle_disconnect() raise if len(data) == 0: self.handle_eof() return data def write(self, buf): if self.port is None: try: self.reconnect() except socket.error as e: pass try: self.port.send(buf) except socket.error as e: if e.errno in [ errno.ECONNRESET, errno.EPIPE ]: self.handle_disconnect() pass def reconnect(self): if self.autoreconnect: print("Attempting reconnect") if self.port is not None: self.port.close() self.port = None self.do_connect() class mavtcpin(mavfile): '''a TCP input mavlink socket''' def __init__(self, device, source_system=255, source_component=0, retries=3, use_native=default_native): a = device.split(':') if len(a) != 2: print("TCP ports must be specified as host:port") sys.exit(1) self.listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.listen_addr = (a[0], int(a[1])) self.listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.listen.bind(self.listen_addr) self.listen.listen(1) self.listen.setblocking(0) set_close_on_exec(self.listen.fileno()) self.listen.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) mavfile.__init__(self, self.listen.fileno(), "tcpin:" + device, source_system=source_system, source_component=source_component, use_native=use_native) self.port = None def close(self): self.listen.close() def recv(self,n=None): if not self.port: try: (self.port, addr) = self.listen.accept() except Exception: return '' self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) self.port.setblocking(0) set_close_on_exec(self.port.fileno()) self.fd = self.port.fileno() if n is None: n = self.mav.bytes_needed() try: data = self.port.recv(n) except socket.error as e: if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]: return "" self.port.close() self.port = None self.fd = self.listen.fileno() return '' return data def write(self, buf): if self.port is None: return try: self.port.send(buf) except socket.error as e: if e.errno in [ errno.EPIPE ]: self.port.close() self.port = None self.fd = self.listen.fileno() pass class mavlogfile(mavfile): '''a MAVLink logfile reader/writer''' def __init__(self, filename, planner_format=None, write=False, append=False, robust_parsing=True, notimestamps=False, source_system=255, source_component=0, use_native=default_native): self.filename = filename self.writeable = write self.robust_parsing = robust_parsing self.planner_format = planner_format self._two64 = math.pow(2.0, 63) mode = 'rb' if self.writeable: if append: mode = 'ab' else: mode = 'wb' self.f = open(filename, mode) self.filesize = os.path.getsize(filename) self.percent = 0 mavfile.__init__(self, None, filename, source_system=source_system, source_component=source_component, notimestamps=notimestamps, use_native=use_native) if self.notimestamps: self._timestamp = 0 else: self._timestamp = time.time() self.stop_on_EOF = True self._last_message = None self._last_timestamp = None self._link = 0 def close(self): self.f.close() def recv(self,n=None): if n is None: n = self.mav.bytes_needed() return self.f.read(n) def write(self, buf): self.f.write(buf) def scan_timestamp(self, tbuf): '''scan forward looking in a tlog for a timestamp in a reasonable range''' while True: (tusec,) = struct.unpack('>Q', tbuf) t = tusec * 1.0e-6 if abs(t - self._last_timestamp) <= 3*24*60*60: break c = self.f.read(1) if len(c) != 1: break tbuf = tbuf[1:] + c return t def pre_message(self): '''read timestamp if needed''' # read the timestamp if self.filesize != 0: self.percent = (100.0 * self.f.tell()) / self.filesize if self.notimestamps: return if self.planner_format: tbuf = self.f.read(21) if len(tbuf) != 21 or tbuf[0] != '-' or tbuf[20] != ':': raise RuntimeError('bad planner timestamp %s' % tbuf) hnsec = self._two64 + float(tbuf[0:20]) t = hnsec * 1.0e-7 # convert to seconds t -= 719163 * 24 * 60 * 60 # convert to 1970 base self._link = 0 else: tbuf = self.f.read(8) if len(tbuf) != 8: return (tusec,) = struct.unpack('>Q', tbuf) t = tusec * 1.0e-6 if (self._last_timestamp is not None and self._last_message.get_type() == "BAD_DATA" and abs(t - self._last_timestamp) > 3*24*60*60): t = self.scan_timestamp(tbuf) self._link = tusec & 0x3 self._timestamp = t def post_message(self, msg): '''add timestamp to message''' # read the timestamp super(mavlogfile, self).post_message(msg) if self.planner_format: self.f.read(1) # trailing newline self.timestamp = msg._timestamp self._last_message = msg if msg.get_type() != "BAD_DATA": self._last_timestamp = msg._timestamp msg._link = self._link class mavmmaplog(mavlogfile): '''a MAVLink log file accessed via mmap. Used for fast read-only access with low memory overhead where particular message types are wanted''' def __init__(self, filename, progress_callback=None): import platform, mmap mavlogfile.__init__(self, filename) self.f.seek(0, 2) self.data_len = self.f.tell() self.f.seek(0) if platform.system() == "Windows": self.data_map = mmap.mmap(self.f.fileno(), self.data_len, None, mmap.ACCESS_READ) else: self.data_map = mmap.mmap(self.f.fileno(), self.data_len, mmap.MAP_PRIVATE, mmap.PROT_READ) self._rewind() self.init_arrays(progress_callback) self._flightmodes = None def _rewind(self): '''rewind to start of log''' self.flightmode = "UNKNOWN" self.offset = 0 self.type_nums = None self.f.seek(0) def rewind(self): '''rewind to start of log''' self._rewind() def init_arrays(self, progress_callback=None): '''initialise arrays for fast recv_match()''' # dictionary indexed by msgid, mapping to arrays of file offsets where # each instance of a msg type is found self.offsets = {} # number of msgs of each msg type self.counts = {} self._count = 0 # mapping from msg name to msg id self.name_to_id = {} # mapping from msg id to name self.id_to_name = {} self.type_nums = None ofs = 0 pct = 0 MARKER_V1 = 0xFE MARKER_V2 = 0xFD while ofs+8+6 < self.data_len: marker = u_ord(self.data_map[ofs+8]) mlen = u_ord(self.data_map[ofs+9]) + 8 if marker == MARKER_V1: mtype = u_ord(self.data_map[ofs+13]) mlen += 8 elif marker == MARKER_V2: mtype = u_ord(self.data_map[ofs+15]) | (u_ord(self.data_map[ofs+16])<<8) | (u_ord(self.data_map[ofs+17])<<16) mlen += 12 incompat_flags = u_ord(self.data_map[ofs+10]) if incompat_flags & mavlink.MAVLINK_IFLAG_SIGNED: mlen += mavlink.MAVLINK_SIGNATURE_BLOCK_LEN else: # unrecognised marker; probably a malformed log ofs += 1 continue if not mtype in self.offsets: if not mtype in mavlink.mavlink_map: ofs += mlen continue self.offsets[mtype] = [] self.counts[mtype] = 0 msg = mavlink.mavlink_map[mtype] self.name_to_id[msg.name] = mtype self.id_to_name[mtype] = msg.name self.f.seek(ofs) m = self.recv_msg() self.messages[msg.name] = m self.offsets[mtype].append(ofs) self.counts[mtype] += 1 ofs += mlen new_pct = (100 * ofs) // self.data_len if progress_callback is not None and new_pct != pct: progress_callback(new_pct) pct = new_pct for mtype in self.counts: self._count += self.counts[mtype] self.offset = 0 self._rewind() def skip_to_type(self, type): '''skip fwd to next msg matching given type set''' if self.type_nums is None: # always add some key msg types so we can track flightmode, params etc type = type.copy() type.update(set(['HEARTBEAT','PARAM_VALUE'])) self.indexes = [] self.type_nums = [] for t in type: if not t in self.name_to_id: continue self.type_nums.append(self.name_to_id[t]) self.indexes.append(0) smallest_index = -1 smallest_offset = self.data_len for i in range(len(self.type_nums)): mtype = self.type_nums[i] if self.indexes[i] >= self.counts[mtype]: continue ofs = self.offsets[mtype][self.indexes[i]] if ofs < smallest_offset: smallest_offset = ofs smallest_index = i if smallest_index >= 0: self.indexes[smallest_index] += 1 self.offset = smallest_offset self.f.seek(smallest_offset) def recv_match(self, condition=None, type=None, blocking=False, timeout=None): '''recv the next message that matches the given condition type can be a string or a list of strings''' if type is not None: if isinstance(type, str): type = set([type]) elif isinstance(type, list): type = set(type) while True: if type is not None: self.skip_to_type(type) m = self.recv_msg() if m is None: if blocking: for hook in self.idle_hooks: hook(self) if timeout is None: self.select(0.05) else: self.select(timeout/2) continue return None if type is not None and not m.get_type() in type: continue if not evaluate_condition(condition, self.messages): continue return m def flightmode_list(self): '''return an array of tuples for all flightmodes in log. Tuple is (modestring, t0, t1)''' tstamp = None fmode = None if self._flightmodes is None: self._rewind() self._flightmodes = [] types = set(['HEARTBEAT']) while True: m = self.recv_match(type=types) if m is None: break tstamp = m._timestamp if self.flightmode == fmode: continue if len(self._flightmodes) > 0: (mode, t0, t1) = self._flightmodes[-1] self._flightmodes[-1] = (mode, t0, tstamp) self._flightmodes.append((self.flightmode, tstamp, None)) fmode = self.flightmode if tstamp is not None: (mode, t0, t1) = self._flightmodes[-1] self._flightmodes[-1] = (mode, t0, tstamp) self._rewind() return self._flightmodes class mavchildexec(mavfile): '''a MAVLink child processes reader/writer''' def __init__(self, filename, source_system=255, source_component=0, use_native=default_native): from subprocess import Popen, PIPE import fcntl self.filename = filename self.child = Popen(filename, shell=False, stdout=PIPE, stdin=PIPE, bufsize=0) self.fd = self.child.stdout.fileno() fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) fl = fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_GETFL) fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK) mavfile.__init__(self, self.fd, filename, source_system=source_system, source_component=source_component, use_native=use_native) def close(self): self.child.close() def recv(self,n=None): try: x = self.child.stdout.read(1) except Exception: return '' return x def write(self, buf): self.child.stdin.write(buf) def mavlink_connection(device, baud=115200, source_system=255, source_component=0, planner_format=None, write=False, append=False, robust_parsing=True, notimestamps=False, input=True, dialect=None, autoreconnect=False, zero_time_base=False, retries=3, use_native=default_native, force_connected=False, progress_callback=None): '''open a serial, UDP, TCP or file mavlink connection''' global mavfile_global if force_connected: # force_connected implies autoreconnect autoreconnect = True if dialect is not None: set_dialect(dialect) if device.startswith('tcp:'): return mavtcp(device[4:], autoreconnect=autoreconnect, source_system=source_system, source_component=source_component, retries=retries, use_native=use_native) if device.startswith('tcpin:'): return mavtcpin(device[6:], source_system=source_system, source_component=source_component, retries=retries, use_native=use_native) if device.startswith('udpin:'): return mavudp(device[6:], input=True, source_system=source_system, source_component=source_component, use_native=use_native) if device.startswith('udpout:'): return mavudp(device[7:], input=False, source_system=source_system, source_component=source_component, use_native=use_native) if device.startswith('udpbcast:'): return mavudp(device[9:], input=False, source_system=source_system, source_component=source_component, use_native=use_native, broadcast=True) # For legacy purposes we accept the following syntax and let the caller to specify direction if device.startswith('udp:'): return mavudp(device[4:], input=input, source_system=source_system, source_component=source_component, use_native=use_native) if device.startswith('mcast:'): return mavmcast(device[6:], source_system=source_system, source_component=source_component, use_native=use_native) if device.lower().endswith('.bin') or device.lower().endswith('.px4log'): # support dataflash logs from pymavlink import DFReader m = DFReader.DFReader_binary(device, zero_time_base=zero_time_base, progress_callback=progress_callback) mavfile_global = m return m if device.endswith('.log'): # support dataflash text logs from pymavlink import DFReader if DFReader.DFReader_is_text_log(device): m = DFReader.DFReader_text(device, zero_time_base=zero_time_base, progress_callback=progress_callback) mavfile_global = m return m # list of suffixes to prevent setting DOS paths as UDP sockets logsuffixes = ['mavlink', 'log', 'raw', 'tlog' ] suffix = device.split('.')[-1].lower() if device.find(':') != -1 and not suffix in logsuffixes: return mavudp(device, source_system=source_system, source_component=source_component, input=input, use_native=use_native) if os.path.isfile(device): if device.endswith(".elf") or device.find("/bin/") != -1: print("executing '%s'" % device) return mavchildexec(device, source_system=source_system, source_component=source_component, use_native=use_native) elif not write and not append and not notimestamps: return mavmmaplog(device, progress_callback=progress_callback) else: return mavlogfile(device, planner_format=planner_format, write=write, append=append, robust_parsing=robust_parsing, notimestamps=notimestamps, source_system=source_system, source_component=source_component, use_native=use_native) return mavserial(device, baud=baud, source_system=source_system, source_component=source_component, autoreconnect=autoreconnect, use_native=use_native, force_connected=force_connected) class periodic_event(object): '''a class for fixed frequency events''' def __init__(self, frequency): self.frequency = float(frequency) self.last_time = time.time() def force(self): '''force immediate triggering''' self.last_time = 0 def trigger(self): '''return True if we should trigger now''' tnow = time.time() if tnow < self.last_time: print("Warning, time moved backwards. Restarting timer.") self.last_time = tnow if self.last_time + (1.0/self.frequency) <= tnow: self.last_time = tnow return True return False try: from curses import ascii have_ascii = True except: have_ascii = False def is_printable(c): '''see if a character is printable''' global have_ascii if have_ascii: return ascii.isprint(c) if isinstance(c, int): ic = c else: ic = ord(c) return ic >= 32 and ic <= 126 def all_printable(buf): '''see if a string is all printable''' for c in buf: if not is_printable(c) and not c in ['\r', '\n', '\t'] and not c in [ord('\r'), ord('\n'), ord('\t')]: return False return True class SerialPort(object): '''auto-detected serial port''' def __init__(self, device, description=None, hwid=None): self.device = device self.description = description self.hwid = hwid def __str__(self): ret = self.device if self.description is not None: ret += " : " + self.description if self.hwid is not None: ret += " : " + self.hwid return ret def auto_detect_serial_win32(preferred_list=['*']): '''try to auto-detect serial ports on win32''' try: from serial.tools.list_ports_windows import comports list = sorted(comports()) except: return [] ret = [] others = [] for port, description, hwid in list: matches = False p = SerialPort(port, description=description, hwid=hwid) for preferred in preferred_list: if fnmatch.fnmatch(description, preferred) or fnmatch.fnmatch(hwid, preferred): matches = True if matches: ret.append(p) else: others.append(p) if len(ret) > 0: return ret # now the rest ret.extend(others) return ret def auto_detect_serial_unix(preferred_list=['*']): '''try to auto-detect serial ports on unix''' import glob glist = glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob('/dev/serial/by-id/*') ret = [] others = [] # try preferred ones first for d in glist: matches = False for preferred in preferred_list: if fnmatch.fnmatch(d, preferred): matches = True if matches: ret.append(SerialPort(d)) else: others.append(SerialPort(d)) if len(ret) > 0: return ret ret.extend(others) return ret def auto_detect_serial(preferred_list=['*']): '''try to auto-detect serial port''' # see if if os.name == 'nt': return auto_detect_serial_win32(preferred_list=preferred_list) return auto_detect_serial_unix(preferred_list=preferred_list) def mode_string_v09(msg): '''mode string for 0.9 protocol''' mode = msg.mode nav_mode = msg.nav_mode MAV_MODE_UNINIT = 0 MAV_MODE_MANUAL = 2 MAV_MODE_GUIDED = 3 MAV_MODE_AUTO = 4 MAV_MODE_TEST1 = 5 MAV_MODE_TEST2 = 6 MAV_MODE_TEST3 = 7 MAV_NAV_GROUNDED = 0 MAV_NAV_LIFTOFF = 1 MAV_NAV_HOLD = 2 MAV_NAV_WAYPOINT = 3 MAV_NAV_VECTOR = 4 MAV_NAV_RETURNING = 5 MAV_NAV_LANDING = 6 MAV_NAV_LOST = 7 MAV_NAV_LOITER = 8 cmode = (mode, nav_mode) mapping = { (MAV_MODE_UNINIT, MAV_NAV_GROUNDED) : "INITIALISING", (MAV_MODE_MANUAL, MAV_NAV_VECTOR) : "MANUAL", (MAV_MODE_TEST3, MAV_NAV_VECTOR) : "CIRCLE", (MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED", (MAV_MODE_TEST1, MAV_NAV_VECTOR) : "STABILIZE", (MAV_MODE_TEST2, MAV_NAV_LIFTOFF) : "FBWA", (MAV_MODE_AUTO, MAV_NAV_WAYPOINT) : "AUTO", (MAV_MODE_AUTO, MAV_NAV_RETURNING) : "RTL", (MAV_MODE_AUTO, MAV_NAV_LOITER) : "LOITER", (MAV_MODE_AUTO, MAV_NAV_LIFTOFF) : "TAKEOFF", (MAV_MODE_AUTO, MAV_NAV_LANDING) : "LANDING", (MAV_MODE_AUTO, MAV_NAV_HOLD) : "LOITER", (MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED", (MAV_MODE_GUIDED, MAV_NAV_WAYPOINT) : "GUIDED", (100, MAV_NAV_VECTOR) : "STABILIZE", (101, MAV_NAV_VECTOR) : "ACRO", (102, MAV_NAV_VECTOR) : "ALT_HOLD", (107, MAV_NAV_VECTOR) : "CIRCLE", (109, MAV_NAV_VECTOR) : "LAND", } if cmode in mapping: return mapping[cmode] return "Mode(%s,%s)" % cmode mode_mapping_apm = { 0 : 'MANUAL', 1 : 'CIRCLE', 2 : 'STABILIZE', 3 : 'TRAINING', 4 : 'ACRO', 5 : 'FBWA', 6 : 'FBWB', 7 : 'CRUISE', 8 : 'AUTOTUNE', 10 : 'AUTO', 11 : 'RTL', 12 : 'LOITER', 14 : 'LAND', 15 : 'GUIDED', 16 : 'INITIALISING', 17 : 'QSTABILIZE', 18 : 'QHOVER', 19 : 'QLOITER', 20 : 'QLAND', 21 : 'QRTL', 22 : 'QAUTOTUNE', } mode_mapping_acm = { 0 : 'STABILIZE', 1 : 'ACRO', 2 : 'ALT_HOLD', 3 : 'AUTO', 4 : 'GUIDED', 5 : 'LOITER', 6 : 'RTL', 7 : 'CIRCLE', 8 : 'POSITION', 9 : 'LAND', 10 : 'OF_LOITER', 11 : 'DRIFT', 13 : 'SPORT', 14 : 'FLIP', 15 : 'AUTOTUNE', 16 : 'POSHOLD', 17 : 'BRAKE', 18 : 'THROW', 19 : 'AVOID_ADSB', 20 : 'GUIDED_NOGPS', 21 : 'SMART_RTL', 22 : 'FLOWHOLD', 23 : 'FOLLOW', } mode_mapping_rover = { 0 : 'MANUAL', 1 : 'ACRO', 2 : 'LEARNING', 3 : 'STEERING', 4 : 'HOLD', 5 : 'LOITER', 10 : 'AUTO', 11 : 'RTL', 12 : 'SMART_RTL', 15 : 'GUIDED', 16 : 'INITIALISING' } mode_mapping_tracker = { 0 : 'MANUAL', 1 : 'STOP', 2 : 'SCAN', 10 : 'AUTO', 16 : 'INITIALISING' } mode_mapping_sub = { 0: 'STABILIZE', 1: 'ACRO', 2: 'ALT_HOLD', 3: 'AUTO', 4: 'GUIDED', 7: 'CIRCLE', 9: 'SURFACE', 16: 'POSHOLD', 19: 'MANUAL', } # map from a PX4 "main_state" to a string; see msg/commander_state.msg # This allows us to map sdlog STAT.MainState to a simple "mode" # string, used in DFReader and possibly other places. These are # related but distict from what is found in mavlink messages; see # "Custom mode definitions", below. mainstate_mapping_px4 = { 0 : 'MANUAL', 1 : 'ALTCTL', 2 : 'POSCTL', 3 : 'AUTO_MISSION', 4 : 'AUTO_LOITER', 5 : 'AUTO_RTL', 6 : 'ACRO', 7 : 'OFFBOARD', 8 : 'STAB', 9 : 'RATTITUDE', 10 : 'AUTO_TAKEOFF', 11 : 'AUTO_LAND', 12 : 'AUTO_FOLLOW_TARGET', 13 : 'MAX', } def mode_string_px4(MainState): return mainstate_mapping_px4.get(MainState, "Unknown") # Custom mode definitions from PX4 PX4_CUSTOM_MAIN_MODE_MANUAL = 1 PX4_CUSTOM_MAIN_MODE_ALTCTL = 2 PX4_CUSTOM_MAIN_MODE_POSCTL = 3 PX4_CUSTOM_MAIN_MODE_AUTO = 4 PX4_CUSTOM_MAIN_MODE_ACRO = 5 PX4_CUSTOM_MAIN_MODE_OFFBOARD = 6 PX4_CUSTOM_MAIN_MODE_STABILIZED = 7 PX4_CUSTOM_MAIN_MODE_RATTITUDE = 8 PX4_CUSTOM_SUB_MODE_OFFBOARD = 0 PX4_CUSTOM_SUB_MODE_AUTO_READY = 1 PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF = 2 PX4_CUSTOM_SUB_MODE_AUTO_LOITER = 3 PX4_CUSTOM_SUB_MODE_AUTO_MISSION = 4 PX4_CUSTOM_SUB_MODE_AUTO_RTL = 5 PX4_CUSTOM_SUB_MODE_AUTO_LAND = 6 PX4_CUSTOM_SUB_MODE_AUTO_RTGS = 7 PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET = 8 auto_mode_flags = mavlink.MAV_MODE_FLAG_AUTO_ENABLED \ | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED \ | mavlink.MAV_MODE_FLAG_GUIDED_ENABLED px4_map = { "MANUAL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_MANUAL, 0 ), "STABILIZED": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_STABILIZED, 0 ), "ACRO": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_ACRO, 0 ), "RATTITUDE": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_RATTITUDE, 0 ), "ALTCTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_ALTCTL, 0 ), "POSCTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_POSCTL, 0 ), "LOITER": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_LOITER ), "MISSION": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_MISSION ), "RTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_RTL ), "LAND": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_LAND ), "RTGS": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_RTGS ), "FOLLOWME": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET ), "OFFBOARD": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_OFFBOARD, 0 ), "TAKEOFF": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF )} def interpret_px4_mode(base_mode, custom_mode): custom_main_mode = (custom_mode & 0xFF0000) >> 16 custom_sub_mode = (custom_mode & 0xFF000000) >> 24 if base_mode & mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED != 0: #manual modes if custom_main_mode == PX4_CUSTOM_MAIN_MODE_MANUAL: return "MANUAL" elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_ACRO: return "ACRO" elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_RATTITUDE: return "RATTITUDE" elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_STABILIZED: return "STABILIZED" elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_ALTCTL: return "ALTCTL" elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_POSCTL: return "POSCTL" elif (base_mode & auto_mode_flags) == auto_mode_flags: #auto modes if custom_main_mode & PX4_CUSTOM_MAIN_MODE_AUTO != 0: if custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_MISSION: return "MISSION" elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF: return "TAKEOFF" elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_LOITER: return "LOITER" elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET: return "FOLLOWME" elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_RTL: return "RTL" elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_LAND: return "LAND" elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_RTGS: return "RTGS" elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_OFFBOARD: return "OFFBOARD" return "UNKNOWN" def mode_mapping_byname(mav_type): '''return dictionary mapping mode names to numbers, or None if unknown''' map = None if mav_type in [mavlink.MAV_TYPE_QUADROTOR, mavlink.MAV_TYPE_HELICOPTER, mavlink.MAV_TYPE_HEXAROTOR, mavlink.MAV_TYPE_OCTOROTOR, mavlink.MAV_TYPE_COAXIAL, mavlink.MAV_TYPE_TRICOPTER]: map = mode_mapping_acm if mav_type == mavlink.MAV_TYPE_FIXED_WING: map = mode_mapping_apm if mav_type == mavlink.MAV_TYPE_GROUND_ROVER: map = mode_mapping_rover if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT: map = mode_mapping_rover # for the time being if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER: map = mode_mapping_tracker if mav_type == mavlink.MAV_TYPE_SUBMARINE: map = mode_mapping_sub if map is None: return None inv_map = dict((a, b) for (b, a) in map.items()) return inv_map def mode_mapping_bynumber(mav_type): '''return dictionary mapping mode numbers to name, or None if unknown''' map = None if mav_type in [mavlink.MAV_TYPE_QUADROTOR, mavlink.MAV_TYPE_HELICOPTER, mavlink.MAV_TYPE_HEXAROTOR, mavlink.MAV_TYPE_OCTOROTOR, mavlink.MAV_TYPE_DODECAROTOR, mavlink.MAV_TYPE_COAXIAL, mavlink.MAV_TYPE_TRICOPTER]: map = mode_mapping_acm if mav_type == mavlink.MAV_TYPE_FIXED_WING: map = mode_mapping_apm if mav_type == mavlink.MAV_TYPE_GROUND_ROVER: map = mode_mapping_rover if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT: map = mode_mapping_rover # for the time being if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER: map = mode_mapping_tracker if mav_type == mavlink.MAV_TYPE_SUBMARINE: map = mode_mapping_sub if map is None: return None return map def mode_string_v10(msg): '''mode string for 1.0 protocol, from heartbeat''' if msg.autopilot == mavlink.MAV_AUTOPILOT_PX4: return interpret_px4_mode(msg.base_mode, msg.custom_mode) if not msg.base_mode & mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED: return "Mode(0x%08x)" % msg.base_mode if msg.type in [ mavlink.MAV_TYPE_QUADROTOR, mavlink.MAV_TYPE_HEXAROTOR, mavlink.MAV_TYPE_OCTOROTOR, mavlink.MAV_TYPE_TRICOPTER, mavlink.MAV_TYPE_COAXIAL, mavlink.MAV_TYPE_HELICOPTER ]: if msg.custom_mode in mode_mapping_acm: return mode_mapping_acm[msg.custom_mode] if msg.type == mavlink.MAV_TYPE_FIXED_WING: if msg.custom_mode in mode_mapping_apm: return mode_mapping_apm[msg.custom_mode] if msg.type == mavlink.MAV_TYPE_GROUND_ROVER: if msg.custom_mode in mode_mapping_rover: return mode_mapping_rover[msg.custom_mode] if msg.type == mavlink.MAV_TYPE_SURFACE_BOAT: if msg.custom_mode in mode_mapping_rover: return mode_mapping_rover[msg.custom_mode] if msg.type == mavlink.MAV_TYPE_ANTENNA_TRACKER: if msg.custom_mode in mode_mapping_tracker: return mode_mapping_tracker[msg.custom_mode] if msg.type == mavlink.MAV_TYPE_SUBMARINE: if msg.custom_mode in mode_mapping_sub: return mode_mapping_sub[msg.custom_mode] return "Mode(%u)" % msg.custom_mode def mode_string_apm(mode_number): '''return mode string for APM:Plane''' if mode_number in mode_mapping_apm: return mode_mapping_apm[mode_number] return "Mode(%u)" % mode_number def mode_string_acm(mode_number): '''return mode string for APM:Copter''' if mode_number in mode_mapping_acm: return mode_mapping_acm[mode_number] return "Mode(%u)" % mode_number class x25crc(object): '''x25 CRC - based on checksum.h from mavlink library''' def __init__(self, buf=''): self.crc = 0xffff self.accumulate(buf) def accumulate(self, buf): '''add in some more bytes''' byte_buf = array.array('B') if isinstance(buf, array.array): byte_buf.extend(buf) else: byte_buf.fromstring(buf) accum = self.crc for b in byte_buf: tmp = b ^ (accum & 0xff) tmp = (tmp ^ (tmp<<4)) & 0xFF accum = (accum>>8) ^ (tmp<<8) ^ (tmp<<3) ^ (tmp>>4) accum = accum & 0xFFFF self.crc = accum class MavlinkSerialPort(object): '''an object that looks like a serial port, but transmits using mavlink SERIAL_CONTROL packets''' def __init__(self, portname, baudrate, devnum=0, devbaud=0, timeout=3, debug=0): from . import mavutil self.baudrate = 0 self.timeout = timeout self._debug = debug self.buf = '' self.port = devnum self.debug("Connecting with MAVLink to %s ..." % portname) self.mav = mavutil.mavlink_connection(portname, autoreconnect=True, baud=baudrate) self.mav.wait_heartbeat() self.debug("HEARTBEAT OK\n") if devbaud != 0: self.setBaudrate(devbaud) self.debug("Locked serial device\n") def debug(self, s, level=1): '''write some debug text''' if self._debug >= level: print(s) def write(self, b): '''write some bytes''' from . import mavutil self.debug("sending '%s' (0x%02x) of len %u\n" % (b, ord(b[0]), len(b)), 2) while len(b) > 0: n = len(b) if n > 70: n = 70 buf = [ord(x) for x in b[:n]] buf.extend([0]*(70-len(buf))) self.mav.mav.serial_control_send(self.port, mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE | mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND, 0, 0, n, buf) b = b[n:] def _recv(self): '''read some bytes into self.buf''' from . import mavutil start_time = time.time() while time.time() < start_time + self.timeout: m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0', type='SERIAL_CONTROL', blocking=False, timeout=0) if m is not None and m.count != 0: break self.mav.mav.serial_control_send(self.port, mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE | mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND, 0, 0, 0, [0]*70) m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0', type='SERIAL_CONTROL', blocking=True, timeout=0.01) if m is not None and m.count != 0: break if m is not None: if self._debug > 2: print(m) data = m.data[:m.count] self.buf += ''.join(str(chr(x)) for x in data) def read(self, n): '''read some bytes''' if len(self.buf) == 0: self._recv() if len(self.buf) > 0: if n > len(self.buf): n = len(self.buf) ret = self.buf[:n] self.buf = self.buf[n:] if self._debug >= 2: for b in ret: self.debug("read 0x%x" % ord(b), 2) return ret return '' def flushInput(self): '''flush any pending input''' self.buf = '' saved_timeout = self.timeout self.timeout = 0.5 self._recv() self.timeout = saved_timeout self.buf = '' self.debug("flushInput") def setBaudrate(self, baudrate): '''set baudrate''' from . import mavutil if self.baudrate == baudrate: return self.baudrate = baudrate self.mav.mav.serial_control_send(self.port, mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE, 0, self.baudrate, 0, [0]*70) self.flushInput() self.debug("Changed baudrate %u" % self.baudrate) def decode_bitmask(messagetype, field, value): try: _class = eval("mavlink.MAVLink_%s_message" % messagetype.lower()) except AttributeError as e: raise AttributeError("No such message type") try: display = _class.fielddisplays_by_name[field] except KeyError as e: raise AttributeError("Not a bitmask field (none specified)") if display != "bitmask": raise ValueError("Not a bitmask field") try: enum_name = _class.fieldenums_by_name[field] except KeyError as e: raise AttributeError("No enum found for bitmask field") try: enum = mavlink.enums[enum_name] except KeyError as e: raise AttributeError("Did not find specified enumeration (%s)" % enum_name) class EnumBitInfo(object): def __init__(self, offset, value, name): self.offset = offset self.value = value self.name = name ret = [] for i in range(0, 64): bit_value = (1 << i) try: enum_entry = enum[bit_value] enum_entry_name = enum_entry.name except KeyError as e: enum_entry_name = None if value == 0: continue if value & bit_value: ret.append( EnumBitInfo(i, True, enum_entry_name) ) value = value & ~bit_value else: ret.append( EnumBitInfo(i, False, enum_entry_name) ) return ret def dump_message_verbose(f, m): '''write an excruciatingly detailed dump of message m to file descriptor f''' try: # __getattr__ may be overridden on m, thus this try/except timestamp = m._timestamp except IOError as e: timestamp = "" if timestamp != "": timestamp = "%s.%02u: " % ( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)), int(timestamp*100.0)%100) f.write("%s%s (link=%s) (signed=%s) (seq=%u) (src=%u/%u)\n" % (timestamp, m.get_type(), str(m.get_link_id()), str(m.get_signed()), m.get_seq(), m.get_srcSystem(), m.get_srcComponent())) for fieldname in m.get_fieldnames(): # format in those most boring way possible: value = m.format_attr(fieldname) # try to add units: try: units = m.fieldunits_by_name[fieldname] # perform simple unit conversions: if units == "d%": value = value / 10.0 units = "%" if units == "c%": value = value / 100.0 units = "%" if units == "cA": value = value / 100.0 units = "A" elif units == "cdegC": value = value / 100.0 units = "degC" elif units == "cdeg": value = value / 100.0 units = "deg" elif units == "degE7": value = value / 10000000.0 units = "deg" elif units == "mG": value = value / 1000.0 units = "G" elif units == "mrad/s": value = value / 1000.0 units = "rad/s" elif units == "mV": value = value / 1000.0 units = "V" # and give radians in degrees too: if units == "rad": value = "%s%s (%sdeg)" % (value, units, math.degrees(value)) elif units == "rad/s": value = "%s%s (%sdeg/s)" % (value, units, math.degrees(value)) elif units == "rad/s/s": value = "%s%s (%sdeg/s/s)" % (value, units, math.degrees(value)) else: value = "%s%s" % (value, units) except AttributeError as e: # e.g. BAD_DATA pass except KeyError as e: pass # format any bitmask enumerations: try: enum_name = m.fieldenums_by_name[fieldname] display = m.fielddisplays_by_name[fieldname] if enum_name is not None and display == "bitmask": bits = decode_bitmask(m.get_type(), fieldname, value) f.write(" %s: %s\n" % (fieldname, value)) for bit in bits: value = bit.value name = bit.name svalue = " " if not value: svalue = "!" if name is None: name = "[UNKNOWN]" f.write(" %s %s\n" % (svalue, name)) continue # except NameError as e: # pass except AttributeError as e: # e.g. BAD_DATA pass except KeyError as e: pass # add any enumeration name: try: enum_name = m.fieldenums_by_name[fieldname] try: enum_value = mavlink.enums[enum_name][value].name value = "%s (%s)" % (value, enum_value) except KeyError as e: value = "%s (%s)" % (value, "[UNKNOWN]") except AttributeError as e: # e.g. BAD_DATA pass except KeyError as e: pass f.write(" %s: %s\n" % (fieldname, value)) if __name__ == '__main__': serial_list = auto_detect_serial(preferred_list=['*FTDI*',"*Arduino_Mega_2560*", "*3D_Robotics*", "*USB_to_UART*", '*PX4*', '*FMU*']) for port in serial_list: print("%s" % port)