unbuffered_channel.hpp 16 KB


  1. // Copyright Oliver Kowalke 2016.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
  6. #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
  7. #include <atomic>
  8. #include <chrono>
  9. #include <cstddef>
  10. #include <cstdint>
  11. #include <memory>
  12. #include <vector>
  13. #include <boost/config.hpp>
  14. #include <boost/fiber/channel_op_status.hpp>
  15. #include <boost/fiber/context.hpp>
  16. #include <boost/fiber/detail/config.hpp>
  17. #include <boost/fiber/detail/convert.hpp>
  18. #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
  19. #include <boost/fiber/detail/exchange.hpp>
  20. #endif
  21. #include <boost/fiber/detail/spinlock.hpp>
  22. #include <boost/fiber/exceptions.hpp>
  23. #include <boost/fiber/waker.hpp>
  24. #ifdef BOOST_HAS_ABI_HEADERS
  25. # include BOOST_ABI_PREFIX
  26. #endif
  27. namespace boost {
  28. namespace fibers {
  29. template< typename T >
  30. class unbuffered_channel {
  31. public:
  32. using value_type = typename std::remove_reference<T>::type;
  33. private:
  34. struct slot {
  35. value_type value;
  36. waker w;
  37. slot( value_type const& value_, waker && w) :
  38. value{ value_ },
  39. w{ std::move(w) } {
  40. }
  41. slot( value_type && value_, waker && w) :
  42. value{ std::move( value_) },
  43. w{ std::move(w) } {
  44. }
  45. };
  46. // shared cacheline
  47. std::atomic< slot * > slot_{ nullptr };
  48. // shared cacheline
  49. std::atomic_bool closed_{ false };
  50. mutable detail::spinlock splk_producers_{};
  51. wait_queue waiting_producers_{};
  52. mutable detail::spinlock splk_consumers_{};
  53. wait_queue waiting_consumers_{};
  54. char pad_[cacheline_length];
  55. bool is_empty_() {
  56. return nullptr == slot_.load( std::memory_order_acquire);
  57. }
  58. bool try_push_( slot * own_slot) {
  59. for (;;) {
  60. slot * s = slot_.load( std::memory_order_acquire);
  61. if ( nullptr == s) {
  62. if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
  63. continue;
  64. }
  65. return true;
  66. }
  67. return false;
  68. }
  69. }
  70. slot * try_pop_() {
  71. slot * nil_slot = nullptr;
  72. for (;;) {
  73. slot * s = slot_.load( std::memory_order_acquire);
  74. if ( nullptr != s) {
  75. if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
  76. continue;}
  77. }
  78. return s;
  79. }
  80. }
  81. public:
  82. unbuffered_channel() = default;
  83. ~unbuffered_channel() {
  84. close();
  85. }
  86. unbuffered_channel( unbuffered_channel const&) = delete;
  87. unbuffered_channel & operator=( unbuffered_channel const&) = delete;
  88. bool is_closed() const noexcept {
  89. return closed_.load( std::memory_order_acquire);
  90. }
  91. void close() noexcept {
  92. // set flag
  93. if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
  94. // notify current waiting
  95. slot * s = slot_.load( std::memory_order_acquire);
  96. if ( nullptr != s) {
  97. // notify context
  98. s->w.wake();
  99. }
  100. detail::spinlock_lock lk1{ splk_producers_ };
  101. waiting_producers_.notify_all();
  102. detail::spinlock_lock lk2{ splk_consumers_ };
  103. waiting_consumers_.notify_all();
  104. }
  105. }
  106. channel_op_status push( value_type const& value) {
  107. context * active_ctx = context::active();
  108. slot s{ value, {} };
  109. for (;;) {
  110. if ( BOOST_UNLIKELY( is_closed() ) ) {
  111. return channel_op_status::closed;
  112. }
  113. s.w = active_ctx->create_waker();
  114. if ( try_push_( & s) ) {
  115. detail::spinlock_lock lk{ splk_consumers_ };
  116. waiting_consumers_.notify_one();
  117. // suspend till value has been consumed
  118. active_ctx->suspend( lk);
  119. // resumed
  120. if ( BOOST_UNLIKELY( is_closed() ) ) {
  121. // channel was closed before value was consumed
  122. return channel_op_status::closed;
  123. }
  124. // value has been consumed
  125. return channel_op_status::success;
  126. }
  127. detail::spinlock_lock lk{ splk_producers_ };
  128. if ( BOOST_UNLIKELY( is_closed() ) ) {
  129. return channel_op_status::closed;
  130. }
  131. if ( is_empty_() ) {
  132. continue;
  133. }
  134. waiting_producers_.suspend_and_wait( lk, active_ctx);
  135. // resumed, slot mabye free
  136. }
  137. }
  138. channel_op_status push( value_type && value) {
  139. context * active_ctx = context::active();
  140. slot s{ std::move( value), {} };
  141. for (;;) {
  142. if ( BOOST_UNLIKELY( is_closed() ) ) {
  143. return channel_op_status::closed;
  144. }
  145. s.w = active_ctx->create_waker();
  146. if ( try_push_( & s) ) {
  147. detail::spinlock_lock lk{ splk_consumers_ };
  148. waiting_consumers_.notify_one();
  149. // suspend till value has been consumed
  150. active_ctx->suspend( lk);
  151. // resumed
  152. if ( BOOST_UNLIKELY( is_closed() ) ) {
  153. // channel was closed before value was consumed
  154. return channel_op_status::closed;
  155. }
  156. // value has been consumed
  157. return channel_op_status::success;
  158. }
  159. detail::spinlock_lock lk{ splk_producers_ };
  160. if ( BOOST_UNLIKELY( is_closed() ) ) {
  161. return channel_op_status::closed;
  162. }
  163. if ( is_empty_() ) {
  164. continue;
  165. }
  166. waiting_producers_.suspend_and_wait( lk, active_ctx);
  167. // resumed, slot mabye free
  168. }
  169. }
  170. template< typename Rep, typename Period >
  171. channel_op_status push_wait_for( value_type const& value,
  172. std::chrono::duration< Rep, Period > const& timeout_duration) {
  173. return push_wait_until( value,
  174. std::chrono::steady_clock::now() + timeout_duration);
  175. }
  176. template< typename Rep, typename Period >
  177. channel_op_status push_wait_for( value_type && value,
  178. std::chrono::duration< Rep, Period > const& timeout_duration) {
  179. return push_wait_until( std::forward< value_type >( value),
  180. std::chrono::steady_clock::now() + timeout_duration);
  181. }
  182. template< typename Clock, typename Duration >
  183. channel_op_status push_wait_until( value_type const& value,
  184. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  185. context * active_ctx = context::active();
  186. slot s{ value, {} };
  187. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  188. for (;;) {
  189. if ( BOOST_UNLIKELY( is_closed() ) ) {
  190. return channel_op_status::closed;
  191. }
  192. s.w = active_ctx->create_waker();
  193. if ( try_push_( & s) ) {
  194. detail::spinlock_lock lk{ splk_consumers_ };
  195. waiting_consumers_.notify_one();
  196. // suspend this producer
  197. if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
  198. // clear slot
  199. slot * nil_slot = nullptr, * own_slot = & s;
  200. slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
  201. // resumed, value has not been consumed
  202. return channel_op_status::timeout;
  203. }
  204. // resumed
  205. if ( BOOST_UNLIKELY( is_closed() ) ) {
  206. // channel was closed before value was consumed
  207. return channel_op_status::closed;
  208. }
  209. // value has been consumed
  210. return channel_op_status::success;
  211. }
  212. detail::spinlock_lock lk{ splk_producers_ };
  213. if ( BOOST_UNLIKELY( is_closed() ) ) {
  214. return channel_op_status::closed;
  215. }
  216. if ( is_empty_() ) {
  217. continue;
  218. }
  219. if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
  220. {
  221. return channel_op_status::timeout;
  222. }
  223. // resumed, slot maybe free
  224. }
  225. }
  226. template< typename Clock, typename Duration >
  227. channel_op_status push_wait_until( value_type && value,
  228. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  229. context * active_ctx = context::active();
  230. slot s{ std::move( value), {} };
  231. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  232. for (;;) {
  233. if ( BOOST_UNLIKELY( is_closed() ) ) {
  234. return channel_op_status::closed;
  235. }
  236. s.w = active_ctx->create_waker();
  237. if ( try_push_( & s) ) {
  238. detail::spinlock_lock lk{ splk_consumers_ };
  239. waiting_consumers_.notify_one();
  240. // suspend this producer
  241. if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
  242. // clear slot
  243. slot * nil_slot = nullptr, * own_slot = & s;
  244. slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
  245. // resumed, value has not been consumed
  246. return channel_op_status::timeout;
  247. }
  248. // resumed
  249. if ( BOOST_UNLIKELY( is_closed() ) ) {
  250. // channel was closed before value was consumed
  251. return channel_op_status::closed;
  252. }
  253. // value has been consumed
  254. return channel_op_status::success;
  255. }
  256. detail::spinlock_lock lk{ splk_producers_ };
  257. if ( BOOST_UNLIKELY( is_closed() ) ) {
  258. return channel_op_status::closed;
  259. }
  260. if ( is_empty_() ) {
  261. continue;
  262. }
  263. if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
  264. {
  265. return channel_op_status::timeout;
  266. }
  267. // resumed, slot maybe free
  268. }
  269. }
  270. channel_op_status pop( value_type & value) {
  271. context * active_ctx = context::active();
  272. slot * s = nullptr;
  273. for (;;) {
  274. if ( nullptr != ( s = try_pop_() ) ) {
  275. {
  276. detail::spinlock_lock lk{ splk_producers_ };
  277. waiting_producers_.notify_one();
  278. }
  279. value = std::move( s->value);
  280. // notify context
  281. s->w.wake();
  282. return channel_op_status::success;
  283. }
  284. detail::spinlock_lock lk{ splk_consumers_ };
  285. if ( BOOST_UNLIKELY( is_closed() ) ) {
  286. return channel_op_status::closed;
  287. }
  288. if ( ! is_empty_() ) {
  289. continue;
  290. }
  291. waiting_consumers_.suspend_and_wait( lk, active_ctx);
  292. // resumed, slot mabye set
  293. }
  294. }
  295. value_type value_pop() {
  296. context * active_ctx = context::active();
  297. slot * s = nullptr;
  298. for (;;) {
  299. if ( nullptr != ( s = try_pop_() ) ) {
  300. {
  301. detail::spinlock_lock lk{ splk_producers_ };
  302. waiting_producers_.notify_one();
  303. }
  304. // consume value
  305. value_type value = std::move( s->value);
  306. // notify context
  307. s->w.wake();
  308. return std::move( value);
  309. }
  310. detail::spinlock_lock lk{ splk_consumers_ };
  311. if ( BOOST_UNLIKELY( is_closed() ) ) {
  312. throw fiber_error{
  313. std::make_error_code( std::errc::operation_not_permitted),
  314. "boost fiber: channel is closed" };
  315. }
  316. if ( ! is_empty_() ) {
  317. continue;
  318. }
  319. waiting_consumers_.suspend_and_wait( lk, active_ctx);
  320. // resumed, slot mabye set
  321. }
  322. }
  323. template< typename Rep, typename Period >
  324. channel_op_status pop_wait_for( value_type & value,
  325. std::chrono::duration< Rep, Period > const& timeout_duration) {
  326. return pop_wait_until( value,
  327. std::chrono::steady_clock::now() + timeout_duration);
  328. }
  329. template< typename Clock, typename Duration >
  330. channel_op_status pop_wait_until( value_type & value,
  331. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  332. context * active_ctx = context::active();
  333. slot * s = nullptr;
  334. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  335. for (;;) {
  336. if ( nullptr != ( s = try_pop_() ) ) {
  337. {
  338. detail::spinlock_lock lk{ splk_producers_ };
  339. waiting_producers_.notify_one();
  340. }
  341. // consume value
  342. value = std::move( s->value);
  343. // notify context
  344. s->w.wake();
  345. return channel_op_status::success;
  346. }
  347. detail::spinlock_lock lk{ splk_consumers_ };
  348. if ( BOOST_UNLIKELY( is_closed() ) ) {
  349. return channel_op_status::closed;
  350. }
  351. if ( ! is_empty_() ) {
  352. continue;
  353. }
  354. if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
  355. return channel_op_status::timeout;
  356. }
  357. }
  358. }
  359. class iterator {
  360. private:
  361. typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
  362. unbuffered_channel * chan_{ nullptr };
  363. storage_type storage_;
  364. void increment_( bool initial = false) {
  365. BOOST_ASSERT( nullptr != chan_);
  366. try {
  367. if ( ! initial) {
  368. reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
  369. }
  370. ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
  371. } catch ( fiber_error const&) {
  372. chan_ = nullptr;
  373. }
  374. }
  375. public:
  376. using iterator_category = std::input_iterator_tag;
  377. using difference_type = std::ptrdiff_t;
  378. using pointer = value_type *;
  379. using reference = value_type &;
  380. using pointer_t = pointer;
  381. using reference_t = reference;
  382. iterator() = default;
  383. explicit iterator( unbuffered_channel< T > * chan) noexcept :
  384. chan_{ chan } {
  385. increment_( true);
  386. }
  387. iterator( iterator const& other) noexcept :
  388. chan_{ other.chan_ } {
  389. }
  390. iterator & operator=( iterator const& other) noexcept {
  391. if ( this == & other) return * this;
  392. chan_ = other.chan_;
  393. return * this;
  394. }
  395. bool operator==( iterator const& other) const noexcept {
  396. return other.chan_ == chan_;
  397. }
  398. bool operator!=( iterator const& other) const noexcept {
  399. return other.chan_ != chan_;
  400. }
  401. iterator & operator++() {
  402. reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
  403. increment_();
  404. return * this;
  405. }
  406. const iterator operator++( int) = delete;
  407. reference_t operator*() noexcept {
  408. return * reinterpret_cast< value_type * >( std::addressof( storage_) );
  409. }
  410. pointer_t operator->() noexcept {
  411. return reinterpret_cast< value_type * >( std::addressof( storage_) );
  412. }
  413. };
  414. friend class iterator;
  415. };
  416. template< typename T >
  417. typename unbuffered_channel< T >::iterator
  418. begin( unbuffered_channel< T > & chan) {
  419. return typename unbuffered_channel< T >::iterator( & chan);
  420. }
  421. template< typename T >
  422. typename unbuffered_channel< T >::iterator
  423. end( unbuffered_channel< T > &) {
  424. return typename unbuffered_channel< T >::iterator();
  425. }
  426. }}
  427. #ifdef BOOST_HAS_ABI_HEADERS
  428. # include BOOST_ABI_SUFFIX
  429. #endif
  430. #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H