test_multithreading.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. /*
  2. * Copyright (C) 2015 Pavel Kirienko <pavel.kirienko@gmail.com>
  3. */
  4. #ifndef NDEBUG
  5. # define UAVCAN_DEBUG 1
  6. #endif
  7. #include <iostream>
  8. #include <thread>
  9. #include <condition_variable>
  10. #include <uavcan_linux/uavcan_linux.hpp>
  11. #include <uavcan/node/sub_node.hpp>
  12. #include <uavcan/protocol/node_status_monitor.hpp>
  13. #include <uavcan/protocol/debug/KeyValue.hpp>
  14. #include "debug.hpp"
  15. /**
  16. * Generic queue based on the linked list class defined in libuavcan.
  17. * This class does not use heap memory.
  18. */
  19. template <typename T>
  20. class Queue
  21. {
  22. struct Item : public uavcan::LinkedListNode<Item>
  23. {
  24. T payload;
  25. template <typename... Args>
  26. Item(Args... args) : payload(args...) { }
  27. };
  28. uavcan::LimitedPoolAllocator allocator_;
  29. uavcan::LinkedListRoot<Item> list_;
  30. public:
  31. Queue(uavcan::IPoolAllocator& arg_allocator, std::size_t block_allocation_quota) :
  32. allocator_(arg_allocator, block_allocation_quota)
  33. {
  34. uavcan::IsDynamicallyAllocatable<Item>::check();
  35. }
  36. bool isEmpty() const { return list_.isEmpty(); }
  37. /**
  38. * Creates one item in-place at the end of the list.
  39. * Returns true if the item was appended successfully, false if there's not enough memory.
  40. * Complexity is O(N) where N is queue length.
  41. */
  42. template <typename... Args>
  43. bool tryEmplace(Args... args)
  44. {
  45. // Allocating memory
  46. void* const ptr = allocator_.allocate(sizeof(Item));
  47. if (ptr == nullptr)
  48. {
  49. return false;
  50. }
  51. // Constructing the new item
  52. Item* const item = new (ptr) Item(args...);
  53. assert(item != nullptr);
  54. // Inserting the new item at the end of the list
  55. Item* p = list_.get();
  56. if (p == nullptr)
  57. {
  58. list_.insert(item);
  59. }
  60. else
  61. {
  62. while (p->getNextListNode() != nullptr)
  63. {
  64. p = p->getNextListNode();
  65. }
  66. assert(p->getNextListNode() == nullptr);
  67. p->setNextListNode(item);
  68. assert(p->getNextListNode()->getNextListNode() == nullptr);
  69. }
  70. return true;
  71. }
  72. /**
  73. * Accesses the first element.
  74. * Nullptr will be returned if the queue is empty.
  75. * Complexity is O(1).
  76. */
  77. T* peek() { return isEmpty() ? nullptr : &list_.get()->payload; }
  78. const T* peek() const { return isEmpty() ? nullptr : &list_.get()->payload; }
  79. /**
  80. * Removes the first element.
  81. * If the queue is empty, nothing will be done and assertion failure will be triggered.
  82. * Complexity is O(1).
  83. */
  84. void pop()
  85. {
  86. Item* const item = list_.get();
  87. assert(item != nullptr);
  88. if (item != nullptr)
  89. {
  90. list_.remove(item);
  91. item->~Item();
  92. allocator_.deallocate(item);
  93. }
  94. }
  95. };
  96. /**
  97. * Feel free to remove.
  98. */
  99. static void testQueue()
  100. {
  101. uavcan::PoolAllocator<1024, uavcan::MemPoolBlockSize> allocator;
  102. Queue<typename uavcan::MakeString<50>::Type> q(allocator, 4);
  103. ENFORCE(q.isEmpty());
  104. ENFORCE(q.peek() == nullptr);
  105. ENFORCE(q.tryEmplace("One"));
  106. ENFORCE(q.tryEmplace("Two"));
  107. ENFORCE(q.tryEmplace("Three"));
  108. ENFORCE(q.tryEmplace("Four"));
  109. ENFORCE(!q.tryEmplace("Five"));
  110. ENFORCE(*q.peek() == "One");
  111. q.pop();
  112. ENFORCE(*q.peek() == "Two");
  113. q.pop();
  114. ENFORCE(*q.peek() == "Three");
  115. q.pop();
  116. ENFORCE(*q.peek() == "Four");
  117. q.pop();
  118. ENFORCE(q.isEmpty());
  119. ENFORCE(q.peek() == nullptr);
  120. }
  121. /**
  122. * Objects of this class are owned by the sub-node thread.
  123. * This class does not use heap memory.
  124. */
  125. class VirtualCanIface : public uavcan::ICanIface,
  126. uavcan::Noncopyable
  127. {
  128. struct RxItem
  129. {
  130. const uavcan::CanRxFrame frame;
  131. const uavcan::CanIOFlags flags;
  132. RxItem(const uavcan::CanRxFrame& arg_frame, uavcan::CanIOFlags arg_flags) :
  133. frame(arg_frame),
  134. flags(arg_flags)
  135. { }
  136. };
  137. std::mutex& mutex_;
  138. uavcan::CanTxQueue prioritized_tx_queue_;
  139. Queue<RxItem> rx_queue_;
  140. int16_t send(const uavcan::CanFrame& frame, uavcan::MonotonicTime tx_deadline, uavcan::CanIOFlags flags) override
  141. {
  142. std::lock_guard<std::mutex> lock(mutex_);
  143. prioritized_tx_queue_.push(frame, tx_deadline, uavcan::CanTxQueue::Volatile, flags);
  144. return 1;
  145. }
  146. int16_t receive(uavcan::CanFrame& out_frame, uavcan::MonotonicTime& out_ts_monotonic,
  147. uavcan::UtcTime& out_ts_utc, uavcan::CanIOFlags& out_flags) override
  148. {
  149. std::lock_guard<std::mutex> lock(mutex_);
  150. if (rx_queue_.isEmpty())
  151. {
  152. return 0;
  153. }
  154. const auto item = *rx_queue_.peek();
  155. rx_queue_.pop();
  156. out_frame = item.frame;
  157. out_ts_monotonic = item.frame.ts_mono;
  158. out_ts_utc = item.frame.ts_utc;
  159. out_flags = item.flags;
  160. return 1;
  161. }
  162. int16_t configureFilters(const uavcan::CanFilterConfig*, std::uint16_t) override { return -uavcan::ErrDriver; }
  163. uint16_t getNumFilters() const override { return 0; }
  164. uint64_t getErrorCount() const override { return 0; }
  165. public:
  166. VirtualCanIface(uavcan::IPoolAllocator& allocator, uavcan::ISystemClock& clock,
  167. std::mutex& arg_mutex, unsigned quota_per_queue) :
  168. mutex_(arg_mutex),
  169. prioritized_tx_queue_(allocator, clock, quota_per_queue),
  170. rx_queue_(allocator, quota_per_queue)
  171. { }
  172. /**
  173. * Note that RX queue overwrites oldest items when overflowed.
  174. * Call this from the main thread only.
  175. * No additional locking is required.
  176. */
  177. void addRxFrame(const uavcan::CanRxFrame& frame, uavcan::CanIOFlags flags)
  178. {
  179. std::lock_guard<std::mutex> lock(mutex_);
  180. if (!rx_queue_.tryEmplace(frame, flags) && !rx_queue_.isEmpty())
  181. {
  182. rx_queue_.pop();
  183. (void)rx_queue_.tryEmplace(frame, flags);
  184. }
  185. }
  186. /**
  187. * Call this from the main thread only.
  188. * No additional locking is required.
  189. */
  190. void flushTxQueueTo(uavcan::INode& main_node, std::uint8_t iface_index)
  191. {
  192. std::lock_guard<std::mutex> lock(mutex_);
  193. const std::uint8_t iface_mask = static_cast<std::uint8_t>(1U << iface_index);
  194. while (auto e = prioritized_tx_queue_.peek())
  195. {
  196. UAVCAN_TRACE("VirtualCanIface", "TX injection [iface=0x%02x]: %s",
  197. unsigned(iface_mask), e->toString().c_str());
  198. const int res = main_node.injectTxFrame(e->frame, e->deadline, iface_mask,
  199. uavcan::CanTxQueue::Qos(e->qos), e->flags);
  200. prioritized_tx_queue_.remove(e);
  201. if (res <= 0)
  202. {
  203. break;
  204. }
  205. }
  206. }
  207. /**
  208. * Call this from the sub-node thread only.
  209. * No additional locking is required.
  210. */
  211. bool hasDataInRxQueue() const
  212. {
  213. std::lock_guard<std::mutex> lock(mutex_);
  214. return !rx_queue_.isEmpty();
  215. }
  216. };
  217. /**
  218. * This interface defines one method that will be called by the main node thread periodically in order to
  219. * transfer contents of TX queue of the sub-node into the TX queue of the main node.
  220. */
  221. class ITxQueueInjector
  222. {
  223. public:
  224. virtual ~ITxQueueInjector() { }
  225. /**
  226. * Flush contents of TX queues into the main node.
  227. * @param main_node Reference to the main node.
  228. */
  229. virtual void injectTxFramesInto(uavcan::INode& main_node) = 0;
  230. };
  231. /**
  232. * Objects of this class are owned by the sub-node thread.
  233. * This class does not use heap memory.
  234. * @tparam SharedMemoryPoolSize Amount of memory, in bytes, that will be statically allocated for the
  235. * memory pool that will be shared across all interfaces for RX/TX queues.
  236. * Typically this value should be no less than 4K per interface.
  237. */
  238. template <unsigned SharedMemoryPoolSize>
  239. class VirtualCanDriver : public uavcan::ICanDriver,
  240. public uavcan::IRxFrameListener,
  241. public ITxQueueInjector,
  242. uavcan::Noncopyable
  243. {
  244. class Event
  245. {
  246. std::mutex m_;
  247. std::condition_variable cv_;
  248. public:
  249. /**
  250. * Note that this method may return spuriously.
  251. */
  252. void waitFor(uavcan::MonotonicDuration duration)
  253. {
  254. std::unique_lock<std::mutex> lk(m_);
  255. (void)cv_.wait_for(lk, std::chrono::microseconds(duration.toUSec()));
  256. }
  257. void signal() { cv_.notify_all(); }
  258. };
  259. Event event_; ///< Used to unblock the select() call when IO happens.
  260. std::mutex mutex_; ///< Shared across all ifaces
  261. uavcan::PoolAllocator<SharedMemoryPoolSize, uavcan::MemPoolBlockSize> allocator_; ///< Shared across all ifaces
  262. uavcan::LazyConstructor<VirtualCanIface> ifaces_[uavcan::MaxCanIfaces];
  263. const unsigned num_ifaces_;
  264. uavcan_linux::SystemClock clock_;
  265. uavcan::ICanIface* getIface(uint8_t iface_index) override
  266. {
  267. return (iface_index < num_ifaces_) ? ifaces_[iface_index].operator VirtualCanIface*() : nullptr;
  268. }
  269. uint8_t getNumIfaces() const override { return num_ifaces_; }
  270. /**
  271. * This and other methods of ICanDriver will be invoked by the sub-node thread.
  272. */
  273. int16_t select(uavcan::CanSelectMasks& inout_masks,
  274. const uavcan::CanFrame* (&)[uavcan::MaxCanIfaces],
  275. uavcan::MonotonicTime blocking_deadline) override
  276. {
  277. bool need_block = (inout_masks.write == 0); // Write queue is infinite
  278. for (unsigned i = 0; need_block && (i < num_ifaces_); i++)
  279. {
  280. const bool need_read = inout_masks.read & (1U << i);
  281. if (need_read && ifaces_[i]->hasDataInRxQueue())
  282. {
  283. need_block = false;
  284. }
  285. }
  286. if (need_block)
  287. {
  288. event_.waitFor(blocking_deadline - clock_.getMonotonic());
  289. }
  290. inout_masks = uavcan::CanSelectMasks();
  291. for (unsigned i = 0; i < num_ifaces_; i++)
  292. {
  293. const std::uint8_t iface_mask = 1U << i;
  294. inout_masks.write |= iface_mask; // Always ready to write
  295. if (ifaces_[i]->hasDataInRxQueue())
  296. {
  297. inout_masks.read |= iface_mask;
  298. }
  299. }
  300. return num_ifaces_; // We're always ready to write, hence > 0.
  301. }
  302. /**
  303. * This handler will be invoked by the main node thread.
  304. */
  305. void handleRxFrame(const uavcan::CanRxFrame& frame, uavcan::CanIOFlags flags) override
  306. {
  307. UAVCAN_TRACE("VirtualCanDriver", "RX [flags=%u]: %s", unsigned(flags), frame.toString().c_str());
  308. if (frame.iface_index < num_ifaces_)
  309. {
  310. ifaces_[frame.iface_index]->addRxFrame(frame, flags);
  311. event_.signal();
  312. }
  313. }
  314. /**
  315. * This method will be invoked by the main node thread.
  316. */
  317. void injectTxFramesInto(uavcan::INode& main_node) override
  318. {
  319. for (unsigned i = 0; i < num_ifaces_; i++)
  320. {
  321. ifaces_[i]->flushTxQueueTo(main_node, i);
  322. }
  323. event_.signal();
  324. }
  325. public:
  326. VirtualCanDriver(unsigned arg_num_ifaces) : num_ifaces_(arg_num_ifaces)
  327. {
  328. assert(num_ifaces_ > 0 && num_ifaces_ <= uavcan::MaxCanIfaces);
  329. const unsigned quota_per_iface = allocator_.getBlockCapacity() / num_ifaces_;
  330. const unsigned quota_per_queue = quota_per_iface; // 2x overcommit
  331. UAVCAN_TRACE("VirtualCanDriver", "Total blocks: %u, quota per queue: %u",
  332. unsigned(allocator_.getBlockCapacity()), unsigned(quota_per_queue));
  333. for (unsigned i = 0; i < num_ifaces_; i++)
  334. {
  335. ifaces_[i].template construct<uavcan::IPoolAllocator&, uavcan::ISystemClock&,
  336. std::mutex&, unsigned>(allocator_, clock_, mutex_, quota_per_queue);
  337. }
  338. }
  339. };
  340. static uavcan_linux::NodePtr initMainNode(const std::vector<std::string>& ifaces, uavcan::NodeID nid,
  341. const std::string& name)
  342. {
  343. std::cout << "Initializing main node" << std::endl;
  344. auto node = uavcan_linux::makeNode(ifaces, name.c_str(),
  345. uavcan::protocol::SoftwareVersion(), uavcan::protocol::HardwareVersion(), nid);
  346. node->getLogger().setLevel(uavcan::protocol::debug::LogLevel::DEBUG);
  347. node->setModeOperational();
  348. return node;
  349. }
  350. template <unsigned QueuePoolSize>
  351. static uavcan_linux::SubNodePtr initSubNode(unsigned num_ifaces, uavcan::INode& main_node)
  352. {
  353. std::cout << "Initializing sub node" << std::endl;
  354. typedef VirtualCanDriver<QueuePoolSize> Driver;
  355. std::shared_ptr<Driver> driver(new Driver(num_ifaces));
  356. auto node = uavcan_linux::makeSubNode(driver, main_node.getNodeID());
  357. main_node.getDispatcher().installRxFrameListener(driver.get());
  358. return node;
  359. }
  360. static void runMainNode(const uavcan_linux::NodePtr& node)
  361. {
  362. std::cout << "Running main node" << std::endl;
  363. auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(10000), [&node](const uavcan::TimerEvent&)
  364. {
  365. node->logInfo("timer", "Your time is running out.");
  366. // coverity[dont_call]
  367. node->setVendorSpecificStatusCode(static_cast<std::uint16_t>(std::rand()));
  368. });
  369. /*
  370. * We know that in this implementation, VirtualCanDriver inherits uavcan::IRxFrameListener, so we can simply
  371. * restore the reference to ITxQueueInjector using dynamic_cast. In other implementations this may be
  372. * unacceptable, so a reference to ITxQueueInjector will have to be passed using some other means.
  373. */
  374. if (node->getDispatcher().getRxFrameListener() == nullptr)
  375. {
  376. throw std::logic_error("RX frame listener is not configured");
  377. }
  378. ITxQueueInjector& tx_injector = dynamic_cast<ITxQueueInjector&>(*node->getDispatcher().getRxFrameListener());
  379. while (true)
  380. {
  381. const int res = node->spin(uavcan::MonotonicDuration::fromMSec(1));
  382. if (res < 0)
  383. {
  384. node->logError("spin", "Error %*", res);
  385. }
  386. // TX queue transfer occurs here.
  387. tx_injector.injectTxFramesInto(*node);
  388. }
  389. }
  390. static void runSubNode(const uavcan_linux::SubNodePtr& node)
  391. {
  392. std::cout << "Running sub node" << std::endl;
  393. /*
  394. * Log subscriber
  395. */
  396. auto log_sub = node->makeSubscriber<uavcan::protocol::debug::LogMessage>(
  397. [](const uavcan::ReceivedDataStructure<uavcan::protocol::debug::LogMessage>& msg)
  398. {
  399. std::cout << msg << std::endl;
  400. });
  401. /*
  402. * Node status monitor
  403. */
  404. struct NodeStatusMonitor : public uavcan::NodeStatusMonitor
  405. {
  406. explicit NodeStatusMonitor(uavcan::INode& node) : uavcan::NodeStatusMonitor(node) { }
  407. virtual void handleNodeStatusChange(const NodeStatusChangeEvent& event) override
  408. {
  409. std::cout << "Remote node NID " << int(event.node_id.get()) << " changed status: "
  410. << event.old_status.toString() << " --> " << event.status.toString() << std::endl;
  411. }
  412. };
  413. NodeStatusMonitor nsm(*node);
  414. ENFORCE(0 == nsm.start());
  415. /*
  416. * KV subscriber
  417. */
  418. auto kv_sub = node->makeSubscriber<uavcan::protocol::debug::KeyValue>(
  419. [](const uavcan::ReceivedDataStructure<uavcan::protocol::debug::KeyValue>& msg)
  420. {
  421. std::cout << msg << std::endl;
  422. });
  423. /*
  424. * KV publisher
  425. */
  426. unsigned kv_value = 0;
  427. auto kv_pub = node->makePublisher<uavcan::protocol::debug::KeyValue>();
  428. auto timer = node->makeTimer(uavcan::MonotonicDuration::fromMSec(5000), [&](const uavcan::TimerEvent&)
  429. {
  430. uavcan::protocol::debug::KeyValue kv;
  431. kv.key = "five_seconds";
  432. kv.value = kv_value++;
  433. const int res = kv_pub->broadcast(kv);
  434. if (res < 0)
  435. {
  436. std::cerr << "Sub KV pub err " << res << std::endl;
  437. }
  438. });
  439. while (true)
  440. {
  441. const int res = node->spin(uavcan::MonotonicDuration::fromMSec(1000));
  442. if (res < 0)
  443. {
  444. std::cerr << "SubNode spin error: " << res << std::endl;
  445. }
  446. }
  447. }
  448. int main(int argc, const char** argv)
  449. {
  450. try
  451. {
  452. testQueue();
  453. constexpr unsigned VirtualIfacePoolSize = 32768;
  454. if (argc < 3)
  455. {
  456. std::cerr << "Usage:\n\t" << argv[0] << " <node-id> <can-iface-name-1> [can-iface-name-N...]" << std::endl;
  457. return 1;
  458. }
  459. const int self_node_id = std::stoi(argv[1]);
  460. std::vector<std::string> iface_names(argv + 2, argv + argc);
  461. auto node = initMainNode(iface_names, self_node_id, "org.uavcan.linux_test_node");
  462. auto sub_node = initSubNode<VirtualIfacePoolSize>(iface_names.size(), *node);
  463. std::thread sub_thread([&sub_node](){ runSubNode(sub_node); });
  464. runMainNode(node);
  465. if (sub_thread.joinable())
  466. {
  467. std::cout << "Waiting for the sub thread to join" << std::endl;
  468. sub_thread.join();
  469. }
  470. return 0;
  471. }
  472. catch (const std::exception& ex)
  473. {
  474. std::cerr << "Exception: " << ex.what() << std::endl;
  475. return 1;
  476. }
  477. }