socketcan.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. #
  2. # Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
  3. #
  4. # This software is distributed under the terms of the MIT License.
  5. #
  6. # Author: Ben Dyer <ben_dyer@mac.com>
  7. # Pavel Kirienko <pavel.kirienko@zubax.com>
  8. #
  9. from __future__ import division, absolute_import, print_function, unicode_literals
  10. import os
  11. import errno
  12. import fcntl
  13. import socket
  14. import struct
  15. import select
  16. import time
  17. import threading
  18. from logging import getLogger
  19. from .common import DriverError, TxQueueFullError, CANFrame, AbstractDriver
  20. from .timestamp_estimator import TimestampEstimator
  21. try:
  22. import queue
  23. except ImportError:
  24. # noinspection PyPep8Naming,PyUnresolvedReferences
  25. import Queue as queue
  26. logger = getLogger(__name__)
  27. # Python 3.3+'s socket module has support for SocketCAN when running on Linux. Use that if possible.
  28. # noinspection PyBroadException
  29. try:
  30. # noinspection PyStatementEffect
  31. socket.CAN_RAW
  32. def get_socket(ifname):
  33. s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
  34. s.bind((ifname, ))
  35. return s
  36. NATIVE_SOCKETCAN = True
  37. except Exception:
  38. NATIVE_SOCKETCAN = False
  39. import ctypes
  40. import ctypes.util
  41. libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
  42. # from linux/can.h
  43. CAN_RAW = 1
  44. # from linux/socket.h
  45. AF_CAN = 29
  46. from socket import SOL_SOCKET
  47. SOL_CAN_BASE = 100
  48. SOL_CAN_RAW = SOL_CAN_BASE + CAN_RAW
  49. CAN_RAW_FILTER = 1 # set 0 .. n can_filter(s)
  50. CAN_RAW_ERR_FILTER = 2 # set filter for error frames
  51. CAN_RAW_LOOPBACK = 3 # local loopback (default:on)
  52. CAN_RAW_RECV_OWN_MSGS = 4 # receive my own msgs (default:off)
  53. CAN_RAW_FD_FRAMES = 5 # allow CAN FD frames (default:off)
  54. # noinspection PyPep8Naming
  55. class sockaddr_can(ctypes.Structure):
  56. """
  57. typedef __u32 canid_t;
  58. struct sockaddr_can {
  59. sa_family_t can_family;
  60. int can_ifindex;
  61. union {
  62. struct { canid_t rx_id, tx_id; } tp;
  63. } can_addr;
  64. };
  65. """
  66. _fields_ = [
  67. ("can_family", ctypes.c_uint16),
  68. ("can_ifindex", ctypes.c_int),
  69. ("can_addr_tp_rx_id", ctypes.c_uint32),
  70. ("can_addr_tp_tx_id", ctypes.c_uint32)
  71. ]
  72. # noinspection PyPep8Naming
  73. class can_frame(ctypes.Structure):
  74. """
  75. typedef __u32 canid_t;
  76. struct can_frame {
  77. canid_t can_id;
  78. __u8 can_dlc;
  79. __u8 data[8] __attribute__((aligned(8)));
  80. };
  81. """
  82. _fields_ = [
  83. ("can_id", ctypes.c_uint32),
  84. ("can_dlc", ctypes.c_uint8),
  85. ("_pad", ctypes.c_ubyte * 3),
  86. ("data", ctypes.c_uint8 * 8)
  87. ]
  88. class CANSocket(object):
  89. def __init__(self, fd):
  90. if fd < 0:
  91. raise DriverError('Invalid socket fd')
  92. self.fd = fd
  93. def recv(self, bufsize):
  94. buf = ctypes.create_string_buffer(bufsize)
  95. nbytes = libc.read(self.fd, ctypes.byref(buf), bufsize)
  96. return buf[0:nbytes]
  97. def send(self, data):
  98. frame = can_frame()
  99. ctypes.memmove(ctypes.byref(frame), data, ctypes.sizeof(frame))
  100. return libc.write(self.fd, ctypes.byref(frame), ctypes.sizeof(frame))
  101. def fileno(self):
  102. return self.fd
  103. def close(self):
  104. libc.close(self.fd)
  105. def get_socket(ifname):
  106. socket_fd = libc.socket(AF_CAN, socket.SOCK_RAW, CAN_RAW)
  107. if socket_fd < 0:
  108. raise DriverError('Could not open socket')
  109. libc.fcntl(socket_fd, fcntl.F_SETFL, os.O_NONBLOCK)
  110. ifidx = libc.if_nametoindex(ifname)
  111. if ctypes.get_errno() != 0:
  112. raise DriverError('Could not determine iface index [errno %s]' % ctypes.get_errno())
  113. addr = sockaddr_can(AF_CAN, ifidx)
  114. error = libc.bind(socket_fd, ctypes.byref(addr), ctypes.sizeof(addr))
  115. if error != 0:
  116. raise DriverError('Could not bind socket [errno %s]' % ctypes.get_errno())
  117. return CANSocket(socket_fd)
  118. # from linux/can.h
  119. CAN_EFF_FLAG = 0x80000000
  120. CAN_EFF_MASK = 0x1FFFFFFF
  121. SO_TIMESTAMP = 29
  122. class SocketCAN(AbstractDriver):
  123. FRAME_FORMAT = '=IB3x8s'
  124. FRAME_SIZE = 16
  125. TIMEVAL_FORMAT = '@LL'
  126. TX_QUEUE_SIZE = 1000
  127. def __init__(self, interface, **_extras):
  128. super(SocketCAN, self).__init__()
  129. self.socket = get_socket(interface)
  130. self._poll_rx = select.poll()
  131. self._poll_rx.register(self.socket.fileno(), select.POLLIN | select.POLLPRI | select.POLLERR)
  132. self._writer_thread_should_stop = False
  133. self._write_queue = queue.Queue(self.TX_QUEUE_SIZE)
  134. self._write_feedback_queue = queue.Queue()
  135. self._writer_thread = threading.Thread(target=self._writer_thread_loop, name='socketcan_writer')
  136. self._writer_thread.daemon = True
  137. self._writer_thread.start()
  138. # Timestamping
  139. if NATIVE_SOCKETCAN:
  140. self.socket.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMP, 1)
  141. ppm = lambda x: x / 1e6
  142. milliseconds = lambda x: x * 1e-3
  143. # We're using this thing to estimate the difference between monotonic and real clocks
  144. # See http://stackoverflow.com/questions/35426864 (at the time of writing the question was unanswered)
  145. self._mono_to_real_estimator = TimestampEstimator(max_rate_error=ppm(100),
  146. fixed_delay=milliseconds(0.001),
  147. max_phase_error_to_resync=milliseconds(50))
  148. def _convert_real_to_monotonic(self, value):
  149. mono = time.monotonic() # est_real is the best guess about real timestamp here
  150. real = time.time()
  151. est_real = self._mono_to_real_estimator.update(mono, real)
  152. mono_to_real_offset = est_real - mono
  153. return value - mono_to_real_offset
  154. def _writer_thread_loop(self):
  155. while not self._writer_thread_should_stop:
  156. try:
  157. frame = self._write_queue.get(timeout=0.1)
  158. except queue.Empty:
  159. continue
  160. try:
  161. while not self._writer_thread_should_stop:
  162. try:
  163. message_id = frame.id | (CAN_EFF_FLAG if frame.extended else 0)
  164. message_pad = bytes(frame.data) + b'\x00' * (8 - len(frame.data))
  165. raw_message = struct.pack(self.FRAME_FORMAT, message_id, len(frame.data), message_pad)
  166. self.socket.send(raw_message)
  167. frame.ts_monotonic = time.monotonic()
  168. frame.ts_real = time.time()
  169. self._write_feedback_queue.put(frame)
  170. except OSError as ex:
  171. if ex.errno == errno.ENOBUFS:
  172. time.sleep(0.0002)
  173. else:
  174. raise
  175. else:
  176. break
  177. except Exception as ex:
  178. self._write_feedback_queue.put(ex)
  179. def _check_write_feedback(self):
  180. try:
  181. item = self._write_feedback_queue.get_nowait()
  182. except queue.Empty:
  183. return
  184. if isinstance(item, Exception):
  185. raise item
  186. if isinstance(item, CANFrame):
  187. self._tx_hook(item)
  188. else:
  189. raise DriverError('Unexpected item in write feedback queue: %r' % item)
  190. def close(self):
  191. self._writer_thread_should_stop = True
  192. self._writer_thread.join()
  193. self.socket.close()
  194. def receive(self, timeout=None):
  195. self._check_write_feedback()
  196. timeout = -1 if timeout is None else (timeout * 1000)
  197. if self._poll_rx.poll(timeout):
  198. ts_real = None
  199. ts_mono = None
  200. if NATIVE_SOCKETCAN:
  201. # Reading the frame together with timestamps in the ancillary data structures
  202. ancillary_len = 64 # Arbitrary value, must be large enough to accommodate all ancillary data
  203. packet_raw, ancdata, _flags, _addr = self.socket.recvmsg(self.FRAME_SIZE,
  204. socket.CMSG_SPACE(ancillary_len))
  205. # Parsing the timestamps
  206. for cmsg_level, cmsg_type, cmsg_data in ancdata:
  207. if cmsg_level == socket.SOL_SOCKET and cmsg_type == SO_TIMESTAMP:
  208. sec, usec = struct.unpack(self.TIMEVAL_FORMAT, cmsg_data)
  209. ts_real = sec + usec * 1e-6
  210. else:
  211. packet_raw = self.socket.recv(self.FRAME_SIZE)
  212. # Parsing the frame
  213. can_id, can_dlc, can_data = struct.unpack(self.FRAME_FORMAT, packet_raw)
  214. # TODO: receive timestamps directly from hardware
  215. # TODO: ...or at least obtain timestamps from the socket layer in local monotonic domain
  216. if ts_real and not ts_mono:
  217. ts_mono = self._convert_real_to_monotonic(ts_real)
  218. frame = CANFrame(can_id & CAN_EFF_MASK, can_data[0:can_dlc], bool(can_id & CAN_EFF_FLAG),
  219. ts_monotonic=ts_mono, ts_real=ts_real)
  220. self._rx_hook(frame)
  221. return frame
  222. def send(self, message_id, message, extended=False):
  223. self._check_write_feedback()
  224. try:
  225. self._write_queue.put_nowait(CANFrame(message_id, message, extended))
  226. except queue.Full:
  227. raise TxQueueFullError()