123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- #
- # Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
- #
- # This software is distributed under the terms of the MIT License.
- #
- # Author: Ben Dyer <ben_dyer@mac.com>
- # Pavel Kirienko <pavel.kirienko@zubax.com>
- #
- from __future__ import division, absolute_import, print_function, unicode_literals
- import os
- import errno
- import fcntl
- import socket
- import struct
- import select
- import time
- import threading
- from logging import getLogger
- from .common import DriverError, TxQueueFullError, CANFrame, AbstractDriver
- from .timestamp_estimator import TimestampEstimator
- try:
- import queue
- except ImportError:
- # noinspection PyPep8Naming,PyUnresolvedReferences
- import Queue as queue
- logger = getLogger(__name__)
- # Python 3.3+'s socket module has support for SocketCAN when running on Linux. Use that if possible.
- # noinspection PyBroadException
- try:
- # noinspection PyStatementEffect
- socket.CAN_RAW
- def get_socket(ifname):
- s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
- s.bind((ifname, ))
- return s
- except Exception:
- import ctypes
- import ctypes.util
- libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
- # from linux/can.h
- CAN_RAW = 1
- # from linux/socket.h
- AF_CAN = 29
- from socket import SOL_SOCKET
- SOL_CAN_BASE = 100
- CAN_RAW_FILTER = 1 # set 0 .. n can_filter(s)
- CAN_RAW_ERR_FILTER = 2 # set filter for error frames
- CAN_RAW_LOOPBACK = 3 # local loopback (default:on)
- CAN_RAW_RECV_OWN_MSGS = 4 # receive my own msgs (default:off)
- CAN_RAW_FD_FRAMES = 5 # allow CAN FD frames (default:off)
- # noinspection PyPep8Naming
- class sockaddr_can(ctypes.Structure):
- """
- typedef __u32 canid_t;
- struct sockaddr_can {
- sa_family_t can_family;
- int can_ifindex;
- union {
- struct { canid_t rx_id, tx_id; } tp;
- } can_addr;
- };
- """
- _fields_ = [
- ("can_family", ctypes.c_uint16),
- ("can_ifindex", ctypes.c_int),
- ("can_addr_tp_rx_id", ctypes.c_uint32),
- ("can_addr_tp_tx_id", ctypes.c_uint32)
- ]
- # noinspection PyPep8Naming
- class can_frame(ctypes.Structure):
- """
- typedef __u32 canid_t;
- struct can_frame {
- canid_t can_id;
- __u8 can_dlc;
- __u8 data[8] __attribute__((aligned(8)));
- };
- """
- _fields_ = [
- ("can_id", ctypes.c_uint32),
- ("can_dlc", ctypes.c_uint8),
- ("_pad", ctypes.c_ubyte * 3),
- ("data", ctypes.c_uint8 * 8)
- ]
- class CANSocket(object):
- def __init__(self, fd):
- if fd < 0:
- raise DriverError('Invalid socket fd')
- self.fd = fd
- def recv(self, bufsize):
- buf = ctypes.create_string_buffer(bufsize)
- nbytes = libc.read(self.fd, ctypes.byref(buf), bufsize)
- return buf[0:nbytes]
- def send(self, data):
- frame = can_frame()
- ctypes.memmove(ctypes.byref(frame), data, ctypes.sizeof(frame))
- return libc.write(self.fd, ctypes.byref(frame), ctypes.sizeof(frame))
- def fileno(self):
- return self.fd
- def close(self):
- libc.close(self.fd)
- def get_socket(ifname):
- socket_fd = libc.socket(AF_CAN, socket.SOCK_RAW, CAN_RAW)
- if socket_fd < 0:
- raise DriverError('Could not open socket')
- libc.fcntl(socket_fd, fcntl.F_SETFL, os.O_NONBLOCK)
- ifidx = libc.if_nametoindex(ifname)
- if ctypes.get_errno() != 0:
- raise DriverError('Could not determine iface index [errno %s]' % ctypes.get_errno())
- addr = sockaddr_can(AF_CAN, ifidx)
- error = libc.bind(socket_fd, ctypes.byref(addr), ctypes.sizeof(addr))
- if error != 0:
- raise DriverError('Could not bind socket [errno %s]' % ctypes.get_errno())
- return CANSocket(socket_fd)
- # from linux/can.h
- CAN_EFF_FLAG = 0x80000000
- class SocketCAN(AbstractDriver):
- FRAME_FORMAT = '=IB3x8s'
- TX_QUEUE_SIZE = 1000
- def __init__(self, interface, **_extras):
- super(SocketCAN, self).__init__()
- self.socket = get_socket(interface)
- self._poll_rx = select.poll()
- self._poll_rx.register(self.socket.fileno(), select.POLLIN | select.POLLPRI | select.POLLERR)
- self._writer_thread_should_stop = False
- self._write_queue = queue.Queue(self.TX_QUEUE_SIZE)
- self._write_feedback_queue = queue.Queue()
- self._writer_thread = threading.Thread(target=self._writer_thread_loop, name='socketcan_writer')
- self._writer_thread.daemon = True
- self._writer_thread.start()
- # Timestamping
- self.socket.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMP, 1)
- ppm = lambda x: x / 1e6
- milliseconds = lambda x: x * 1e-3
- # We're using this thing to estimate the difference between monotonic and real clocks
- # See http://stackoverflow.com/questions/35426864 (at the time of writing the question was unanswered)
- self._mono_to_real_estimator = TimestampEstimator(max_rate_error=ppm(100),
- fixed_delay=milliseconds(0.001),
- max_phase_error_to_resync=milliseconds(50))
- def _convert_real_to_monotonic(self, value):
- mono = time.monotonic() # est_real is the best guess about real timestamp here
- real = time.time()
- est_real = self._mono_to_real_estimator.update(mono, real)
- mono_to_real_offset = est_real - mono
- return value - mono_to_real_offset
- def _writer_thread_loop(self):
- while not self._writer_thread_should_stop:
- try:
- frame = self._write_queue.get(timeout=0.1)
- except queue.Empty:
- continue
- try:
- while not self._writer_thread_should_stop:
- try:
- message_id = frame.id | (CAN_EFF_FLAG if frame.extended else 0)
- message_pad = bytes(frame.data) + b'\x00' * (8 - len(frame.data))
- raw_message = struct.pack(self.FRAME_FORMAT, message_id, len(frame.data), message_pad)
- self.socket.send(raw_message)
- frame.ts_monotonic = time.monotonic()
- frame.ts_real = time.time()
- self._write_feedback_queue.put(frame)
- except OSError as ex:
- if ex.errno == errno.ENOBUFS:
- time.sleep(0.0002)
- else:
- raise
- else:
- break
- except Exception as ex:
- self._write_feedback_queue.put(ex)
- def _check_write_feedback(self):
- try:
- item = self._write_feedback_queue.get_nowait()
- except queue.Empty:
- return
- if isinstance(item, Exception):
- raise item
- if isinstance(item, CANFrame):
- self._tx_hook(item)
- else:
- raise DriverError('Unexpected item in write feedback queue: %r' % item)
- def close(self):
- self._writer_thread_should_stop = True
- self._writer_thread.join()
- self.socket.close()
- def receive(self, timeout=None):
- self._check_write_feedback()
- timeout = -1 if timeout is None else (timeout * 1000)
- if self._poll_rx.poll(timeout):
- ts_real = None
- ts_mono = None
- # Reading the frame together with timestamps in the ancillary data structures
- ancillary_len = 64 # Arbitrary value, must be large enough to accommodate all ancillary data
- packet_raw, ancdata, _flags, _addr = self.socket.recvmsg(self.FRAME_SIZE,
- socket.CMSG_SPACE(ancillary_len))
- # Parsing the timestamps
- for cmsg_level, cmsg_type, cmsg_data in ancdata:
- if cmsg_level == socket.SOL_SOCKET and cmsg_type == SO_TIMESTAMP:
- sec, usec = struct.unpack(self.TIMEVAL_FORMAT, cmsg_data)
- ts_real = sec + usec * 1e-6
- else:
- packet_raw = self.socket.recv(self.FRAME_SIZE)
- # Parsing the frame
- can_id, can_dlc, can_data = struct.unpack(self.FRAME_FORMAT, packet_raw)
- # TODO: receive timestamps directly from hardware
- # TODO: ...or at least obtain timestamps from the socket layer in local monotonic domain
- if ts_real and not ts_mono:
- ts_mono = self._convert_real_to_monotonic(ts_real)
- frame = CANFrame(can_id & CAN_EFF_MASK, can_data[0:can_dlc], bool(can_id & CAN_EFF_FLAG),
- ts_monotonic=ts_mono, ts_real=ts_real)
- self._rx_hook(frame)
- return frame
- def send(self, message_id, message, extended=False):
- self._check_write_feedback()
- try:
- self._write_queue.put_nowait(CANFrame(message_id, message, extended))
- except queue.Full:
- raise TxQueueFullError()