slcan.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795
  1. #
  2. # Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
  3. #
  4. # This software is distributed under the terms of the MIT License.
  5. #
  6. # Author: Ben Dyer <ben_dyer@mac.com>
  7. # Pavel Kirienko <pavel.kirienko@zubax.com>
  8. #
  9. from __future__ import division, absolute_import, print_function, unicode_literals
  10. import os
  11. import sys
  12. import time
  13. import inspect
  14. import binascii
  15. import select
  16. import multiprocessing
  17. import threading
  18. import copy
  19. from logging import getLogger
  20. from .common import DriverError, TxQueueFullError, CANFrame, AbstractDriver
  21. from .timestamp_estimator import TimestampEstimator
  22. try:
  23. import queue
  24. except ImportError:
  25. # noinspection PyPep8Naming,PyUnresolvedReferences
  26. import Queue as queue
  27. logger = getLogger(__name__)
  28. # If PySerial isn't available, we can't support SLCAN
  29. try:
  30. import serial
  31. except ImportError:
  32. serial = None
  33. logger.info("Cannot import PySerial; SLCAN will not be available.")
  34. try:
  35. # noinspection PyUnresolvedReferences
  36. sys.getwindowsversion()
  37. RUNNING_ON_WINDOWS = True
  38. except AttributeError:
  39. RUNNING_ON_WINDOWS = False
  40. #
  41. # Constants and defaults
  42. #
  43. if 'darwin' in sys.platform:
  44. RX_QUEUE_SIZE = 32767 # http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767
  45. else:
  46. RX_QUEUE_SIZE = 1000000
  47. TX_QUEUE_SIZE = 1000
  48. TIMESTAMP_OVERFLOW_PERIOD = 60 # Defined by SLCAN protocol
  49. DEFAULT_BITRATE = 1000000
  50. DEFAULT_BAUDRATE = 3000000
  51. ACK_TIMEOUT = 0.5
  52. ACK = b'\r'
  53. NACK = b'\x07'
  54. CLI_END_OF_LINE = b'\r\n'
  55. CLI_END_OF_TEXT = b'\x03'
  56. DEFAULT_MAX_ADAPTER_CLOCK_RATE_ERROR_PPM = 200 # Suits virtually all adapters
  57. DEFAULT_FIXED_RX_DELAY = 0.0002 # Good for USB, could be higher for UART
  58. DEFAULT_MAX_ESTIMATED_RX_DELAY_TO_RESYNC = 0.1 # When clock divergence exceeds this value, resync
  59. IO_PROCESS_INIT_TIMEOUT = 10
  60. IO_PROCESS_NICENESS_INCREMENT = -18
  61. MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP = 1000
  62. #
  63. # IPC entities
  64. #
  65. IPC_SIGNAL_INIT_OK = 'init_ok' # Sent from IO process to the parent process when init is done
  66. IPC_COMMAND_STOP = 'stop' # Sent from parent process to the IO process when it's time to exit
  67. class IPCCommandLineExecutionRequest:
  68. DEFAULT_TIMEOUT = 1
  69. def __init__(self, command, timeout=None):
  70. if isinstance(command, bytes):
  71. command = command.decode('utf8')
  72. self.command = command.lstrip()
  73. self.monotonic_deadline = time.monotonic() + (timeout or self.DEFAULT_TIMEOUT)
  74. @property
  75. def expired(self):
  76. return time.monotonic() >= self.monotonic_deadline
  77. class IPCCommandLineExecutionResponse:
  78. def __init__(self, command, lines=None, expired=False):
  79. def try_decode(what):
  80. if isinstance(what, bytes):
  81. return what.decode('utf8')
  82. return what
  83. self.command = try_decode(command)
  84. self.lines = [try_decode(ln) for ln in (lines or [])]
  85. self.expired = expired
  86. def __str__(self):
  87. if not self.expired:
  88. return '%r %r' % (self.command, self.lines)
  89. else:
  90. return '%r EXPIRED' % self.command
  91. __repr__ = __str__
  92. _pending_command_line_execution_requests = queue.Queue()
  93. #
  94. # Logic of the IO process
  95. #
  96. class RxWorker:
  97. PY2_COMPAT = sys.version_info[0] < 3
  98. SELECT_TIMEOUT = 0.1
  99. READ_BUFFER_SIZE = 1024 * 8 # Arbitrary large number
  100. def __init__(self, conn, rx_queue, ts_estimator_mono, ts_estimator_real, termination_condition):
  101. self._conn = conn
  102. self._output_queue = rx_queue
  103. self._ts_estimator_mono = ts_estimator_mono
  104. self._ts_estimator_real = ts_estimator_real
  105. self._termination_condition = termination_condition
  106. if RUNNING_ON_WINDOWS:
  107. # select() doesn't work on serial ports under Windows, so we have to resort to workarounds. :(
  108. self._conn.timeout = self.SELECT_TIMEOUT
  109. else:
  110. self._conn.timeout = 0
  111. def _read_port(self):
  112. if RUNNING_ON_WINDOWS:
  113. data = self._conn.read(max(1, self._conn.inWaiting()))
  114. # Timestamping as soon as possible after unblocking
  115. ts_mono = time.monotonic()
  116. ts_real = time.time()
  117. else:
  118. select.select([self._conn.fileno()], [], [], self.SELECT_TIMEOUT)
  119. # Timestamping as soon as possible after unblocking
  120. ts_mono = time.monotonic()
  121. ts_real = time.time()
  122. # Read as much data as possible in order to avoid RX overrun
  123. data = self._conn.read(self.READ_BUFFER_SIZE)
  124. return data, ts_mono, ts_real
  125. def _process_slcan_line(self, line, local_ts_mono, local_ts_real):
  126. line = line.strip().strip(NACK).strip(CLI_END_OF_TEXT)
  127. line_len = len(line)
  128. if line_len < 1:
  129. return
  130. # Checking the header, ignore all irrelevant lines
  131. if line[0] == b'T'[0]:
  132. id_len = 8
  133. elif line[0] == b't'[0]:
  134. id_len = 3
  135. else:
  136. return
  137. # Parsing ID and DLC
  138. packet_id = int(line[1:1 + id_len], 16)
  139. if self.PY2_COMPAT:
  140. packet_len = int(line[1 + id_len]) # This version is horribly slow
  141. else:
  142. packet_len = line[1 + id_len] - 48 # Py3 version is faster
  143. if packet_len > 8 or packet_len < 0:
  144. raise DriverError('Invalid packet length')
  145. # Parsing the payload, detecting timestamp
  146. # <type> <id> <dlc> <data> [timestamp]
  147. # 1 3|8 1 packet_len * 2 [4]
  148. with_timestamp = line_len > (2 + id_len + packet_len * 2)
  149. packet_data = binascii.a2b_hex(line[2 + id_len:2 + id_len + packet_len * 2])
  150. # Handling the timestamp, if present
  151. if with_timestamp:
  152. ts_hardware = int(line[-4:], 16) * 1e-3
  153. ts_mono = self._ts_estimator_mono.update(ts_hardware, local_ts_mono)
  154. ts_real = self._ts_estimator_real.update(ts_hardware, local_ts_real)
  155. else:
  156. ts_mono = local_ts_mono
  157. ts_real = local_ts_real
  158. frame = CANFrame(packet_id, packet_data, (id_len == 8), ts_monotonic=ts_mono, ts_real=ts_real)
  159. self._output_queue.put_nowait(frame)
  160. def _process_many_slcan_lines(self, lines, ts_mono, ts_real):
  161. for slc in lines:
  162. # noinspection PyBroadException
  163. try:
  164. self._process_slcan_line(slc, local_ts_mono=ts_mono, local_ts_real=ts_real)
  165. except Exception:
  166. logger.error('Could not process SLCAN line %r', slc, exc_info=True)
  167. # noinspection PyBroadException
  168. def run(self):
  169. logger.info('RX worker started')
  170. successive_errors = 0
  171. data = bytes()
  172. outstanding_command = None
  173. outstanding_command_response_lines = []
  174. while not self._termination_condition():
  175. try:
  176. new_data, ts_mono, ts_real = self._read_port()
  177. data += new_data
  178. # Checking the command queue and handling command timeouts
  179. while True:
  180. if outstanding_command is None:
  181. try:
  182. outstanding_command = _pending_command_line_execution_requests.get_nowait()
  183. outstanding_command_response_lines = []
  184. except queue.Empty:
  185. break
  186. if outstanding_command.expired:
  187. self._output_queue.put(IPCCommandLineExecutionResponse(outstanding_command.command,
  188. expired=True))
  189. outstanding_command = None
  190. else:
  191. break
  192. # Processing in normal mode if there's no outstanding command; using much slower CLI mode otherwise
  193. if outstanding_command is None:
  194. slcan_lines = data.split(ACK)
  195. slcan_lines, data = slcan_lines[:-1], slcan_lines[-1]
  196. self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)
  197. del slcan_lines
  198. else:
  199. # TODO This branch contains dirty and poorly tested code. Refactor once the protocol matures.
  200. split_lines = data.split(CLI_END_OF_LINE)
  201. split_lines, data = split_lines[:-1], split_lines[-1]
  202. # Processing the mix of SLCAN and CLI lines
  203. for ln in split_lines:
  204. tmp = ln.split(ACK)
  205. slcan_lines, cli_line = tmp[:-1], tmp[-1]
  206. self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)
  207. # Processing the CLI line
  208. logger.debug('Processing CLI response line %r as...', cli_line)
  209. if len(outstanding_command_response_lines) == 0:
  210. if outstanding_command is not None and \
  211. cli_line == outstanding_command.command.encode('utf8'):
  212. logger.debug('...echo')
  213. outstanding_command_response_lines.append(cli_line)
  214. else:
  215. # Otherwise we're receiving some CLI garbage before or after the command output, e.g.
  216. # end of the previous command output if it was missed
  217. logger.debug('...garbage')
  218. else:
  219. if cli_line == CLI_END_OF_TEXT:
  220. logger.debug('...end-of-text')
  221. # Shipping
  222. response = IPCCommandLineExecutionResponse(outstanding_command.command,
  223. lines=outstanding_command_response_lines[1:])
  224. self._output_queue.put(response)
  225. # Immediately fetching the next command, expiration is not checked
  226. try:
  227. outstanding_command = _pending_command_line_execution_requests.get_nowait()
  228. except queue.Empty:
  229. outstanding_command = None
  230. outstanding_command_response_lines = []
  231. else:
  232. logger.debug('...mid response')
  233. outstanding_command_response_lines.append(cli_line)
  234. del split_lines
  235. # The remainder may contain SLCAN and CLI lines as well;
  236. # there is no reason not to process SLCAN ones immediately.
  237. # The last byte could be beginning of an \r\n sequence, so it's excluded from parsing.
  238. data, last_byte = data[:-1], data[-1:]
  239. slcan_lines = data.split(ACK)
  240. slcan_lines, data = slcan_lines[:-1], slcan_lines[-1] + last_byte
  241. self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)
  242. successive_errors = 0
  243. except Exception as ex:
  244. # TODO: handle the case when the port is closed
  245. logger.error('RX thread error %d of %d',
  246. successive_errors, MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP, exc_info=True)
  247. try:
  248. self._output_queue.put_nowait(ex)
  249. except Exception:
  250. pass
  251. successive_errors += 1
  252. if successive_errors >= MAX_SUCCESSIVE_ERRORS_TO_GIVE_UP:
  253. break
  254. logger.info('RX worker is stopping')
  255. class TxWorker:
  256. QUEUE_BLOCK_TIMEOUT = 0.1
  257. def __init__(self, conn, rx_queue, tx_queue, termination_condition):
  258. self._conn = conn
  259. self._rx_queue = rx_queue
  260. self._tx_queue = tx_queue
  261. self._termination_condition = termination_condition
  262. def _send_frame(self, frame):
  263. line = '%s%d%s\r' % (('T%08X' if frame.extended else 't%03X') % frame.id,
  264. len(frame.data),
  265. binascii.b2a_hex(frame.data).decode('ascii'))
  266. self._conn.write(line.encode('ascii'))
  267. self._conn.flush()
  268. def _execute_command(self, command):
  269. logger.info('Executing command line %r', command.command)
  270. # It is extremely important to write into the queue first, in order to make the RX worker expect the response!
  271. _pending_command_line_execution_requests.put(command)
  272. self._conn.write(command.command.encode('ascii') + CLI_END_OF_LINE)
  273. self._conn.flush()
  274. def run(self):
  275. while True:
  276. try:
  277. command = self._tx_queue.get(True, self.QUEUE_BLOCK_TIMEOUT)
  278. if isinstance(command, CANFrame):
  279. self._send_frame(command)
  280. elif isinstance(command, IPCCommandLineExecutionRequest):
  281. self._execute_command(command)
  282. elif command == IPC_COMMAND_STOP:
  283. break
  284. else:
  285. raise DriverError('IO process received unknown IPC command: %r' % command)
  286. except queue.Empty:
  287. # Checking in this handler in order to avoid interference with traffic
  288. if self._termination_condition():
  289. break
  290. except Exception as ex:
  291. logger.error('TX thread exception', exc_info=True)
  292. # Propagating the exception to the parent process
  293. # noinspection PyBroadException
  294. try:
  295. self._rx_queue.put_nowait(ex)
  296. except Exception:
  297. pass
  298. # noinspection PyUnresolvedReferences
  299. def _raise_self_process_priority():
  300. if RUNNING_ON_WINDOWS:
  301. import win32api
  302. import win32process
  303. import win32con
  304. handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, win32api.GetCurrentProcessId())
  305. win32process.SetPriorityClass(handle, win32process.REALTIME_PRIORITY_CLASS)
  306. else:
  307. import os
  308. os.nice(IO_PROCESS_NICENESS_INCREMENT)
  309. def _init_adapter(conn, bitrate):
  310. def wait_for_ack():
  311. logger.info('Init: Waiting for ACK...')
  312. conn.timeout = ACK_TIMEOUT
  313. while True:
  314. b = conn.read(1)
  315. if not b:
  316. raise DriverError('SLCAN ACK timeout')
  317. if b == NACK:
  318. raise DriverError('SLCAN NACK in response')
  319. if b == ACK:
  320. break
  321. logger.info('Init: Ignoring byte %r while waiting for ACK', b)
  322. def send_command(cmd):
  323. logger.info('Init: Sending command %r', cmd)
  324. conn.write(cmd + b'\r')
  325. speed_code = {
  326. 1000000: 8,
  327. 800000: 7,
  328. 500000: 6,
  329. 250000: 5,
  330. 125000: 4,
  331. 100000: 3,
  332. 50000: 2,
  333. 20000: 1,
  334. 10000: 0
  335. }[bitrate if bitrate is not None else DEFAULT_BITRATE]
  336. num_retries = 3
  337. while True:
  338. try:
  339. # Sending an empty command in order to reset the adapter's command parser, then discarding all output
  340. send_command(b'')
  341. try:
  342. wait_for_ack()
  343. except DriverError:
  344. pass
  345. time.sleep(0.1)
  346. conn.flushInput()
  347. # Making sure the channel is closed - some adapters may refuse to re-open if the channel is already open
  348. send_command(b'C')
  349. try:
  350. wait_for_ack()
  351. except DriverError:
  352. pass
  353. # Setting speed code
  354. send_command(('S%d' % speed_code).encode())
  355. conn.flush()
  356. wait_for_ack()
  357. # Opening the channel
  358. send_command(b'O')
  359. conn.flush()
  360. wait_for_ack()
  361. # Clearing error flags
  362. send_command(b'F')
  363. conn.flush()
  364. try:
  365. wait_for_ack()
  366. except DriverError as ex:
  367. logger.warning('Init: Could not clear error flags (command not supported by the CAN adapter?): %s', ex)
  368. except Exception as ex:
  369. if num_retries > 0:
  370. logger.error('Could not init SLCAN adapter, will retry; error was: %s', ex, exc_info=True)
  371. else:
  372. raise ex
  373. num_retries -= 1
  374. else:
  375. break
  376. # Discarding all input again
  377. time.sleep(0.1)
  378. conn.flushInput()
  379. def _stop_adapter(conn):
  380. conn.write(b'C\r' * 10)
  381. conn.flush()
  382. # noinspection PyBroadException
  383. def _io_process(device,
  384. tx_queue,
  385. rx_queue,
  386. log_queue,
  387. parent_pid,
  388. bitrate=None,
  389. baudrate=None,
  390. max_adapter_clock_rate_error_ppm=None,
  391. fixed_rx_delay=None,
  392. max_estimated_rx_delay_to_resync=None):
  393. try:
  394. # noinspection PyUnresolvedReferences
  395. from logging.handlers import QueueHandler
  396. except ImportError:
  397. pass # Python 2.7, no logging for you
  398. else:
  399. getLogger().addHandler(QueueHandler(log_queue))
  400. getLogger().setLevel('INFO')
  401. logger.info('IO process started with PID %r', os.getpid())
  402. # We don't need stdin
  403. try:
  404. stdin_fileno = sys.stdin.fileno()
  405. sys.stdin.close()
  406. os.close(stdin_fileno)
  407. except Exception:
  408. pass
  409. def is_parent_process_alive():
  410. if RUNNING_ON_WINDOWS:
  411. return True # TODO: Find a working solution for Windows (os.kill(ppid, 0) doesn't work)
  412. else:
  413. return os.getppid() == parent_pid
  414. try:
  415. _raise_self_process_priority()
  416. except Exception as ex:
  417. logger.warning('Could not adjust priority of the IO process: %r', ex)
  418. #
  419. # This is needed to convert timestamps from hardware clock to local clocks
  420. #
  421. if max_adapter_clock_rate_error_ppm is None:
  422. max_adapter_clock_rate_error = DEFAULT_MAX_ADAPTER_CLOCK_RATE_ERROR_PPM / 1e6
  423. else:
  424. max_adapter_clock_rate_error = max_adapter_clock_rate_error_ppm / 1e6
  425. fixed_rx_delay = fixed_rx_delay if fixed_rx_delay is not None else DEFAULT_FIXED_RX_DELAY
  426. max_estimated_rx_delay_to_resync = max_estimated_rx_delay_to_resync or DEFAULT_MAX_ESTIMATED_RX_DELAY_TO_RESYNC
  427. ts_estimator_mono = TimestampEstimator(max_rate_error=max_adapter_clock_rate_error,
  428. source_clock_overflow_period=TIMESTAMP_OVERFLOW_PERIOD,
  429. fixed_delay=fixed_rx_delay,
  430. max_phase_error_to_resync=max_estimated_rx_delay_to_resync)
  431. ts_estimator_real = copy.deepcopy(ts_estimator_mono)
  432. #
  433. # Preparing the RX thread
  434. #
  435. should_exit = False
  436. def rx_thread_wrapper():
  437. rx_worker = RxWorker(conn=conn,
  438. rx_queue=rx_queue,
  439. ts_estimator_mono=ts_estimator_mono,
  440. ts_estimator_real=ts_estimator_real,
  441. termination_condition=lambda: should_exit)
  442. try:
  443. rx_worker.run()
  444. except Exception as ex:
  445. logger.error('RX thread failed, exiting', exc_info=True)
  446. # Propagating the exception to the parent process
  447. rx_queue.put(ex)
  448. rxthd = threading.Thread(target=rx_thread_wrapper, name='slcan_rx')
  449. rxthd.daemon = True
  450. try:
  451. conn = serial.Serial(device, baudrate or DEFAULT_BAUDRATE)
  452. except Exception as ex:
  453. logger.error('Could not open port', exc_info=True)
  454. rx_queue.put(ex)
  455. return
  456. #
  457. # Actual work is here
  458. #
  459. try:
  460. _init_adapter(conn, bitrate)
  461. rxthd.start()
  462. logger.info('IO process initialization complete')
  463. rx_queue.put(IPC_SIGNAL_INIT_OK)
  464. tx_worker = TxWorker(conn=conn,
  465. rx_queue=rx_queue,
  466. tx_queue=tx_queue,
  467. termination_condition=lambda: (should_exit or
  468. not rxthd.is_alive() or
  469. not is_parent_process_alive()))
  470. tx_worker.run()
  471. except Exception as ex:
  472. logger.error('IO process failed', exc_info=True)
  473. rx_queue.put(ex)
  474. finally:
  475. logger.info('IO process is terminating...')
  476. should_exit = True
  477. if rxthd.is_alive():
  478. rxthd.join()
  479. _stop_adapter(conn)
  480. conn.close()
  481. logger.info('IO process is now ready to die, goodbye')
  482. #
  483. # Logic of the main process
  484. #
  485. class SLCAN(AbstractDriver):
  486. """
  487. Driver for SLCAN-compatible CAN bus adapters, with extension to support CLI commands.
  488. Some info on SLCAN can be found here:
  489. - Linux tree: drivers/net/can/slcan.c (http://lxr.free-electrons.com/source/drivers/net/can/slcan.c)
  490. - https://files.zubax.com/docs/Generic_SLCAN_API.pdf
  491. - http://www.can232.com/docs/canusb_manual.pdf
  492. - http://www.fischl.de/usbtin/
  493. The CLI extension allows to execute arbitrary CLI commands on the adapter. The commands differ from regular SLCAN
  494. exchange in the following ways:
  495. - CLI commands are echoed back.
  496. - Every output line of a CLI command, including echo, is terminated with CR LF (\r\n).
  497. - After the last line follows the ASCII End Of Text character (ETX, ^C, ASCII code 0x03) on a separate
  498. line (terminated with CR LF).
  499. - CLI commands must not begin with whitespace characters.
  500. Example:
  501. Input command "stat\r\n" may produce the following output lines:
  502. - Echo: "stat\r\n"
  503. - Data: "First line\r\n", "Second line\r\n", ...
  504. - End Of Text marker: "\x03\r\n"
  505. Refer to https://kb.zubax.com for more info.
  506. """
  507. def __init__(self, device_name, **kwargs):
  508. if not serial:
  509. raise RuntimeError("PySerial not imported; SLCAN is not available. Please install PySerial.")
  510. super(SLCAN, self).__init__()
  511. self._stopping = False
  512. self._rx_queue = multiprocessing.Queue(maxsize=RX_QUEUE_SIZE)
  513. self._tx_queue = multiprocessing.Queue(maxsize=TX_QUEUE_SIZE)
  514. self._log_queue = multiprocessing.Queue()
  515. self._cli_command_requests = [] # List of tuples: (command, callback)
  516. # https://docs.python.org/3/howto/logging-cookbook.html
  517. self._logging_thread = threading.Thread(target=self._logging_proxy_loop, name='slcan_log_proxy')
  518. self._logging_thread.daemon = True
  519. # Removing all unused stuff, because it breaks inter process communications.
  520. kwargs = copy.copy(kwargs)
  521. keep_keys = inspect.getargspec(_io_process).args
  522. for key in list(kwargs.keys()):
  523. if key not in keep_keys:
  524. del kwargs[key]
  525. kwargs['rx_queue'] = self._rx_queue
  526. kwargs['tx_queue'] = self._tx_queue
  527. kwargs['log_queue'] = self._log_queue
  528. kwargs['parent_pid'] = os.getpid()
  529. self._proc = multiprocessing.Process(target=_io_process, name='slcan_io_process',
  530. args=(device_name,), kwargs=kwargs)
  531. self._proc.daemon = True
  532. self._proc.start()
  533. # The logging thread should be started immediately AFTER the IO process is started
  534. self._logging_thread.start()
  535. deadline = time.monotonic() + IO_PROCESS_INIT_TIMEOUT
  536. while True:
  537. try:
  538. sig = self._rx_queue.get(timeout=IO_PROCESS_INIT_TIMEOUT)
  539. if sig == IPC_SIGNAL_INIT_OK:
  540. break
  541. if isinstance(sig, Exception):
  542. self._tx_queue.put(IPC_COMMAND_STOP, timeout=IO_PROCESS_INIT_TIMEOUT)
  543. raise sig
  544. except queue.Empty:
  545. pass
  546. if time.monotonic() > deadline:
  547. self._tx_queue.put(IPC_COMMAND_STOP, timeout=IO_PROCESS_INIT_TIMEOUT)
  548. raise DriverError('IO process did not confirm initialization')
  549. self._check_alive()
  550. # noinspection PyBroadException
  551. def _logging_proxy_loop(self):
  552. while self._proc.is_alive() and not self._stopping:
  553. try:
  554. try:
  555. record = self._log_queue.get(timeout=0.5)
  556. except queue.Empty:
  557. continue
  558. getLogger(record.name).handle(record)
  559. except Exception as ex:
  560. try:
  561. print('SLCAN logging proxy failed:', ex, file=sys.stderr)
  562. except Exception:
  563. pass
  564. logger.info('Logging proxy thread is stopping')
  565. def close(self):
  566. if self._proc.is_alive():
  567. self._tx_queue.put(IPC_COMMAND_STOP)
  568. self._proc.join(10)
  569. # Sometimes the child process stucks at exit, this is a workaround
  570. if self._proc.is_alive() or self._proc.exitcode is None:
  571. logger.warning('IO process refused to exit and will be terminated')
  572. try:
  573. self._proc.terminate()
  574. except Exception as ex:
  575. logger.error('Failed to terminate the IO process [%r]', ex, exc_info=True)
  576. try:
  577. if self._proc.is_alive():
  578. logger.error('IO process refused to terminate, escalating to SIGKILL')
  579. import signal
  580. os.kill(self._proc.pid, signal.SIGKILL)
  581. except Exception as ex:
  582. logger.critical('Failed to kill the IO process [%r]', ex, exc_info=True)
  583. self._stopping = True
  584. self._logging_thread.join()
  585. def __del__(self):
  586. self.close()
  587. def _check_alive(self):
  588. if not self._proc.is_alive():
  589. raise DriverError('IO process is dead :(')
  590. def receive(self, timeout=None):
  591. self._check_alive()
  592. if timeout is None:
  593. deadline = None
  594. elif timeout == 0:
  595. deadline = 0
  596. else:
  597. deadline = time.monotonic() + timeout
  598. while True:
  599. # Blockingly reading the queue
  600. try:
  601. if deadline is None:
  602. get_timeout = None
  603. elif deadline == 0:
  604. # TODO this is a workaround. Zero timeout causes the IPC queue to ALWAYS throw queue.Empty!
  605. get_timeout = 1e-3
  606. else:
  607. # TODO this is a workaround. Zero timeout causes the IPC queue to ALWAYS throw queue.Empty!
  608. get_timeout = max(1e-3, deadline - time.monotonic())
  609. obj = self._rx_queue.get(timeout=get_timeout)
  610. except queue.Empty:
  611. return
  612. # Handling the received thing
  613. if isinstance(obj, CANFrame):
  614. self._rx_hook(obj)
  615. return obj
  616. elif isinstance(obj, Exception): # Propagating exceptions from the IO process to the main process
  617. raise obj
  618. elif isinstance(obj, IPCCommandLineExecutionResponse):
  619. while len(self._cli_command_requests):
  620. (stored_command, stored_callback), self._cli_command_requests = \
  621. self._cli_command_requests[0], self._cli_command_requests[1:]
  622. if stored_command == obj.command:
  623. stored_callback(obj)
  624. break
  625. else:
  626. logger.error('Mismatched CLI response: expected %r, got %r', stored_command, obj.command)
  627. else:
  628. raise DriverError('Unexpected entity in IPC channel: %r' % obj)
  629. # Termination condition
  630. if deadline == 0:
  631. break
  632. elif deadline is not None:
  633. if time.monotonic() >= deadline:
  634. return
  635. def send(self, message_id, message, extended=False):
  636. self._check_alive()
  637. frame = CANFrame(message_id, message, extended)
  638. try:
  639. self._tx_queue.put_nowait(frame)
  640. except queue.Full:
  641. raise TxQueueFullError()
  642. self._tx_hook(frame)
  643. def execute_cli_command(self, command, callback, timeout=None):
  644. """
  645. Executes an arbitrary CLI command on the SLCAN adapter, assuming that the adapter supports CLI commands.
  646. The callback will be invoked from the method receive() using same thread.
  647. If the command times out, the callback will be invoked anyway, with 'expired' flag set.
  648. Args:
  649. command: Command as unicode string or bytes
  650. callback: A callable that accepts one argument.
  651. The argument is an instance of IPCCommandLineExecutionResponse
  652. timeout: Timeout in seconds. None to use default timeout.
  653. """
  654. self._check_alive()
  655. request = IPCCommandLineExecutionRequest(command, timeout)
  656. try:
  657. self._tx_queue.put(request, timeout=timeout)
  658. except queue.Full:
  659. raise TxQueueFullError()
  660. # The command could be modified by the IPCCommandLineExecutionRequest
  661. self._cli_command_requests.append((request.command, callback))