buffered_channel.hpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. // Copyright Oliver Kowalke 2016.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
  7. #define BOOST_FIBERS_BUFFERED_CHANNEL_H
  8. #include <atomic>
  9. #include <chrono>
  10. #include <cstddef>
  11. #include <cstdint>
  12. #include <memory>
  13. #include <type_traits>
  14. #include <boost/config.hpp>
  15. #include <boost/fiber/channel_op_status.hpp>
  16. #include <boost/fiber/context.hpp>
  17. #include <boost/fiber/waker.hpp>
  18. #include <boost/fiber/detail/config.hpp>
  19. #include <boost/fiber/detail/convert.hpp>
  20. #include <boost/fiber/detail/spinlock.hpp>
  21. #include <boost/fiber/exceptions.hpp>
  22. #ifdef BOOST_HAS_ABI_HEADERS
  23. # include BOOST_ABI_PREFIX
  24. #endif
  25. namespace boost {
  26. namespace fibers {
  27. template< typename T >
  28. class buffered_channel {
  29. public:
  30. using value_type = typename std::remove_reference<T>::type;
  31. private:
  32. using slot_type = value_type;
  33. mutable detail::spinlock splk_{};
  34. wait_queue waiting_producers_{};
  35. wait_queue waiting_consumers_{};
  36. slot_type * slots_;
  37. std::size_t pidx_{ 0 };
  38. std::size_t cidx_{ 0 };
  39. std::size_t capacity_;
  40. bool closed_{ false };
  41. bool is_full_() const noexcept {
  42. return cidx_ == ((pidx_ + 1) % capacity_);
  43. }
  44. bool is_empty_() const noexcept {
  45. return cidx_ == pidx_;
  46. }
  47. bool is_closed_() const noexcept {
  48. return closed_;
  49. }
  50. public:
  51. explicit buffered_channel( std::size_t capacity) :
  52. capacity_{ capacity } {
  53. if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
  54. throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
  55. "boost fiber: buffer capacity is invalid" };
  56. }
  57. slots_ = new slot_type[capacity_];
  58. }
  59. ~buffered_channel() {
  60. close();
  61. delete [] slots_;
  62. }
  63. buffered_channel( buffered_channel const&) = delete;
  64. buffered_channel & operator=( buffered_channel const&) = delete;
  65. bool is_closed() const noexcept {
  66. detail::spinlock_lock lk{splk_, std::defer_lock};
  67. for(;;) {
  68. if(lk.try_lock())
  69. break;
  70. context::active()->yield();
  71. }
  72. return is_closed_();
  73. }
  74. void close() noexcept {
  75. detail::spinlock_lock lk{splk_, std::defer_lock};
  76. for(;;) {
  77. if(lk.try_lock())
  78. break;
  79. context::active()->yield();
  80. }
  81. if ( ! closed_) {
  82. closed_ = true;
  83. waiting_producers_.notify_all();
  84. waiting_consumers_.notify_all();
  85. }
  86. }
  87. channel_op_status try_push( value_type const& value) {
  88. detail::spinlock_lock lk{splk_, std::defer_lock};
  89. for(;;) {
  90. if(lk.try_lock())
  91. break;
  92. context::active()->yield();
  93. }
  94. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  95. return channel_op_status::closed;
  96. }
  97. if ( is_full_() ) {
  98. return channel_op_status::full;
  99. }
  100. slots_[pidx_] = value;
  101. pidx_ = (pidx_ + 1) % capacity_;
  102. waiting_consumers_.notify_one();
  103. return channel_op_status::success;
  104. }
  105. channel_op_status try_push( value_type && value) {
  106. detail::spinlock_lock lk{splk_, std::defer_lock};
  107. for(;;) {
  108. if(lk.try_lock())
  109. break;
  110. context::active()->yield();
  111. }
  112. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  113. return channel_op_status::closed;
  114. }
  115. if ( is_full_() ) {
  116. return channel_op_status::full;
  117. }
  118. slots_[pidx_] = std::move( value);
  119. pidx_ = (pidx_ + 1) % capacity_;
  120. waiting_consumers_.notify_one();
  121. return channel_op_status::success;
  122. }
  123. channel_op_status push( value_type const& value) {
  124. context * active_ctx = context::active();
  125. for (;;) {
  126. detail::spinlock_lock lk{splk_, std::try_to_lock};
  127. if (!lk) {
  128. active_ctx->yield();
  129. continue;
  130. }
  131. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  132. return channel_op_status::closed;
  133. }
  134. if ( is_full_() ) {
  135. waiting_producers_.suspend_and_wait( lk, active_ctx);
  136. } else {
  137. slots_[pidx_] = value;
  138. pidx_ = (pidx_ + 1) % capacity_;
  139. waiting_consumers_.notify_one();
  140. return channel_op_status::success;
  141. }
  142. }
  143. }
  144. channel_op_status push( value_type && value) {
  145. context * active_ctx = context::active();
  146. for (;;) {
  147. detail::spinlock_lock lk{splk_, std::try_to_lock};
  148. if (!lk) {
  149. active_ctx->yield();
  150. continue;
  151. }
  152. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  153. return channel_op_status::closed;
  154. }
  155. if ( is_full_() ) {
  156. waiting_producers_.suspend_and_wait( lk, active_ctx);
  157. } else {
  158. slots_[pidx_] = std::move( value);
  159. pidx_ = (pidx_ + 1) % capacity_;
  160. waiting_consumers_.notify_one();
  161. return channel_op_status::success;
  162. }
  163. }
  164. }
  165. template< typename Rep, typename Period >
  166. channel_op_status push_wait_for( value_type const& value,
  167. std::chrono::duration< Rep, Period > const& timeout_duration) {
  168. return push_wait_until( value,
  169. std::chrono::steady_clock::now() + timeout_duration);
  170. }
  171. template< typename Rep, typename Period >
  172. channel_op_status push_wait_for( value_type && value,
  173. std::chrono::duration< Rep, Period > const& timeout_duration) {
  174. return push_wait_until( std::forward< value_type >( value),
  175. std::chrono::steady_clock::now() + timeout_duration);
  176. }
  177. template< typename Clock, typename Duration >
  178. channel_op_status push_wait_until( value_type const& value,
  179. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  180. context * active_ctx = context::active();
  181. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  182. for (;;) {
  183. detail::spinlock_lock lk{splk_, std::try_to_lock};
  184. if (!lk) {
  185. active_ctx->yield();
  186. continue;
  187. }
  188. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  189. return channel_op_status::closed;
  190. }
  191. if ( is_full_() ) {
  192. if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
  193. return channel_op_status::timeout;
  194. }
  195. } else {
  196. slots_[pidx_] = value;
  197. pidx_ = (pidx_ + 1) % capacity_;
  198. waiting_consumers_.notify_one();
  199. return channel_op_status::success;
  200. }
  201. }
  202. }
  203. template< typename Clock, typename Duration >
  204. channel_op_status push_wait_until( value_type && value,
  205. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  206. context * active_ctx = context::active();
  207. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  208. for (;;) {
  209. detail::spinlock_lock lk{splk_, std::try_to_lock};
  210. if (!lk) {
  211. active_ctx->yield();
  212. continue;
  213. }
  214. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  215. return channel_op_status::closed;
  216. }
  217. if ( is_full_() ) {
  218. if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
  219. return channel_op_status::timeout;
  220. }
  221. } else {
  222. slots_[pidx_] = std::move( value);
  223. pidx_ = (pidx_ + 1) % capacity_;
  224. // notify one waiting consumer
  225. waiting_consumers_.notify_one();
  226. return channel_op_status::success;
  227. }
  228. }
  229. }
  230. channel_op_status try_pop( value_type & value) {
  231. detail::spinlock_lock lk{splk_, std::defer_lock};
  232. for(;;) {
  233. if(lk.try_lock())
  234. break;
  235. context::active()->yield();
  236. }
  237. if ( is_empty_() ) {
  238. return is_closed_()
  239. ? channel_op_status::closed
  240. : channel_op_status::empty;
  241. }
  242. value = std::move( slots_[cidx_]);
  243. cidx_ = (cidx_ + 1) % capacity_;
  244. waiting_producers_.notify_one();
  245. return channel_op_status::success;
  246. }
  247. channel_op_status pop( value_type & value) {
  248. context * active_ctx = context::active();
  249. for (;;) {
  250. detail::spinlock_lock lk{splk_, std::try_to_lock};
  251. if (!lk) {
  252. active_ctx->yield();
  253. continue;
  254. }
  255. if ( is_empty_() ) {
  256. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  257. return channel_op_status::closed;
  258. }
  259. waiting_consumers_.suspend_and_wait( lk, active_ctx);
  260. } else {
  261. value = std::move( slots_[cidx_]);
  262. cidx_ = (cidx_ + 1) % capacity_;
  263. waiting_producers_.notify_one();
  264. return channel_op_status::success;
  265. }
  266. }
  267. }
  268. value_type value_pop() {
  269. context * active_ctx = context::active();
  270. for (;;) {
  271. detail::spinlock_lock lk{splk_, std::try_to_lock};
  272. if (!lk) {
  273. active_ctx->yield();
  274. continue;
  275. }
  276. if ( is_empty_() ) {
  277. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  278. throw fiber_error{
  279. std::make_error_code( std::errc::operation_not_permitted),
  280. "boost fiber: channel is closed" };
  281. }
  282. waiting_consumers_.suspend_and_wait( lk, active_ctx);
  283. } else {
  284. value_type value = std::move( slots_[cidx_]);
  285. cidx_ = (cidx_ + 1) % capacity_;
  286. waiting_producers_.notify_one();
  287. return value;
  288. }
  289. }
  290. }
  291. template< typename Rep, typename Period >
  292. channel_op_status pop_wait_for( value_type & value,
  293. std::chrono::duration< Rep, Period > const& timeout_duration) {
  294. return pop_wait_until( value,
  295. std::chrono::steady_clock::now() + timeout_duration);
  296. }
  297. template< typename Clock, typename Duration >
  298. channel_op_status pop_wait_until( value_type & value,
  299. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  300. context * active_ctx = context::active();
  301. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  302. for (;;) {
  303. detail::spinlock_lock lk{splk_, std::try_to_lock};
  304. if (!lk) {
  305. active_ctx->yield();
  306. continue;
  307. }
  308. if ( is_empty_() ) {
  309. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  310. return channel_op_status::closed;
  311. }
  312. if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
  313. return channel_op_status::timeout;
  314. }
  315. } else {
  316. value = std::move( slots_[cidx_]);
  317. cidx_ = (cidx_ + 1) % capacity_;
  318. waiting_producers_.notify_one();
  319. return channel_op_status::success;
  320. }
  321. }
  322. }
  323. class iterator {
  324. private:
  325. typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
  326. buffered_channel * chan_{ nullptr };
  327. storage_type storage_;
  328. void increment_( bool initial = false) {
  329. BOOST_ASSERT( nullptr != chan_);
  330. try {
  331. if ( ! initial) {
  332. reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
  333. }
  334. ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
  335. } catch ( fiber_error const&) {
  336. chan_ = nullptr;
  337. }
  338. }
  339. public:
  340. using iterator_category = std::input_iterator_tag;
  341. using difference_type = std::ptrdiff_t;
  342. using pointer = value_type *;
  343. using reference = value_type &;
  344. using pointer_t = pointer;
  345. using reference_t = reference;
  346. iterator() = default;
  347. explicit iterator( buffered_channel< T > * chan) noexcept :
  348. chan_{ chan } {
  349. increment_( true);
  350. }
  351. iterator( iterator const& other) noexcept :
  352. chan_{ other.chan_ } {
  353. }
  354. iterator & operator=( iterator const& other) noexcept {
  355. if ( BOOST_LIKELY( this != & other) ) {
  356. chan_ = other.chan_;
  357. }
  358. return * this;
  359. }
  360. bool operator==( iterator const& other) const noexcept {
  361. return other.chan_ == chan_;
  362. }
  363. bool operator!=( iterator const& other) const noexcept {
  364. return other.chan_ != chan_;
  365. }
  366. iterator & operator++() {
  367. reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
  368. increment_();
  369. return * this;
  370. }
  371. const iterator operator++( int) = delete;
  372. reference_t operator*() noexcept {
  373. return * reinterpret_cast< value_type * >( std::addressof( storage_) );
  374. }
  375. pointer_t operator->() noexcept {
  376. return reinterpret_cast< value_type * >( std::addressof( storage_) );
  377. }
  378. };
  379. friend class iterator;
  380. };
  381. template< typename T >
  382. typename buffered_channel< T >::iterator
  383. begin( buffered_channel< T > & chan) {
  384. return typename buffered_channel< T >::iterator( & chan);
  385. }
  386. template< typename T >
  387. typename buffered_channel< T >::iterator
  388. end( buffered_channel< T > &) {
  389. return typename buffered_channel< T >::iterator();
  390. }
  391. }}
  392. #ifdef BOOST_HAS_ABI_HEADERS
  393. # include BOOST_ABI_SUFFIX
  394. #endif
  395. #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H