channel.hpp 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern ([email protected])
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_CHANNEL_HPP
  8. #define BOOST_COBALT_CHANNEL_HPP
  9. #include <boost/cobalt/this_thread.hpp>
  10. #include <boost/cobalt/unique_handle.hpp>
  11. #include <boost/cobalt/detail/util.hpp>
  12. #include <boost/asio/cancellation_signal.hpp>
  13. #include <boost/asio/cancellation_type.hpp>
  14. #include <boost/circular_buffer.hpp>
  15. #include <boost/config.hpp>
  16. #include <boost/intrusive/list.hpp>
  17. #include <boost/variant2/variant.hpp>
  18. #include <optional>
  19. namespace boost::cobalt
  20. {
  21. template<typename T>
  22. struct channel_reader;
  23. // tag::outline[]
  24. template<typename T>
  25. struct channel
  26. {
  27. // end::outline[]
  28. #if defined(BOOST_COBALT_NO_PMR)
  29. channel(std::size_t limit = 0u,
  30. executor executor = this_thread::get_executor());
  31. #else
  32. // tag::outline[]
  33. // create a channel with a buffer limit, executor & resource.
  34. explicit
  35. channel(std::size_t limit = 0u,
  36. executor executor = this_thread::get_executor(),
  37. pmr::memory_resource * resource = this_thread::get_default_resource());
  38. // end::outline[]
  39. #endif
  40. // tag::outline[]
  41. // not movable.
  42. channel(channel && rhs) noexcept = delete;
  43. channel & operator=(channel && lhs) noexcept = delete;
  44. using executor_type = executor;
  45. const executor_type & get_executor();
  46. // Closes the channel
  47. ~channel();
  48. bool is_open() const;
  49. // close the operation, will cancel all pending ops, too
  50. void close();
  51. // end::outline[]
  52. private:
  53. #if !defined(BOOST_COBALT_NO_PMR)
  54. boost::circular_buffer<T, pmr::polymorphic_allocator<T>> buffer_;
  55. #else
  56. boost::circular_buffer<T> buffer_;
  57. #endif
  58. executor_type executor_;
  59. bool is_closed_{false};
  60. struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  61. {
  62. channel * chn;
  63. boost::source_location loc;
  64. bool cancelled = false;
  65. std::optional<T> direct{};
  66. asio::cancellation_slot cancel_slot{};
  67. unique_handle<void> awaited_from{nullptr};
  68. void (*begin_transaction)(void*) = nullptr;
  69. void transactional_unlink()
  70. {
  71. if (begin_transaction)
  72. begin_transaction(awaited_from.get());
  73. this->unlink();
  74. }
  75. struct cancel_impl;
  76. bool await_ready() { return !chn->buffer_.empty(); }
  77. template<typename Promise>
  78. BOOST_NOINLINE
  79. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  80. T await_resume();
  81. std::tuple<system::error_code, T> await_resume(const struct as_tuple_tag & );
  82. system::result<T> await_resume(const struct as_result_tag &);
  83. explicit operator bool() const {return chn && chn->is_open();}
  84. };
  85. struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  86. {
  87. channel * chn;
  88. variant2::variant<T*, const T*> ref;
  89. boost::source_location loc;
  90. bool cancelled = false, direct = false;
  91. asio::cancellation_slot cancel_slot{};
  92. unique_handle<void> awaited_from{nullptr};
  93. void (*begin_transaction)(void*) = nullptr;
  94. void transactional_unlink()
  95. {
  96. if (begin_transaction)
  97. begin_transaction(awaited_from.get());
  98. this->unlink();
  99. }
  100. struct cancel_impl;
  101. bool await_ready() { return !chn->buffer_.full(); }
  102. template<typename Promise>
  103. BOOST_NOINLINE
  104. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  105. void await_resume();
  106. std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  107. system::result<void> await_resume(const struct as_result_tag &);
  108. explicit operator bool() const {return chn && chn->is_open();}
  109. };
  110. boost::intrusive::list<read_op, intrusive::constant_time_size<false> > read_queue_;
  111. boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
  112. public:
  113. read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; }
  114. write_op write(const T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  115. {
  116. return write_op{{}, this, &value, loc};
  117. }
  118. write_op write(const T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  119. {
  120. return write_op{{}, this, &value, loc};
  121. }
  122. write_op write( T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  123. {
  124. return write_op{{}, this, &value, loc};
  125. }
  126. write_op write( T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  127. {
  128. return write_op{{}, this, &value, loc};
  129. }
  130. /*
  131. // tag::outline[]
  132. // an awaitable that yields T
  133. using __read_op__ = __unspecified__;
  134. // an awaitable that yields void
  135. using __write_op__ = __unspecified__;
  136. // read a value to a channel
  137. __read_op__ read();
  138. // write a value to the channel
  139. __write_op__ write(const T && value);
  140. __write_op__ write(const T & value);
  141. __write_op__ write( T && value);
  142. __write_op__ write( T & value);
  143. // write a value to the channel if T is void
  144. __write_op__ write(); // end::outline[]
  145. */
  146. // tag::outline[]
  147. };
  148. // end::outline[]
  149. template<>
  150. struct channel<void>
  151. {
  152. explicit
  153. channel(std::size_t limit = 0u,
  154. executor executor = this_thread::get_executor())
  155. : limit_(limit), executor_(executor) {}
  156. channel(channel &&) noexcept = delete;
  157. channel & operator=(channel && lhs) noexcept = delete;
  158. using executor_type = executor;
  159. const executor_type & get_executor() {return executor_;}
  160. BOOST_COBALT_DECL ~channel();
  161. bool is_open() const {return !is_closed_;}
  162. BOOST_COBALT_DECL void close();
  163. private:
  164. std::size_t limit_;
  165. std::size_t n_{0u};
  166. executor_type executor_;
  167. bool is_closed_{false};
  168. struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  169. {
  170. channel * chn;
  171. boost::source_location loc;
  172. bool cancelled = false, direct = false;
  173. asio::cancellation_slot cancel_slot{};
  174. unique_handle<void> awaited_from{nullptr};
  175. void (*begin_transaction)(void*) = nullptr;
  176. void transactional_unlink()
  177. {
  178. if (begin_transaction)
  179. begin_transaction(awaited_from.get());
  180. this->unlink();
  181. }
  182. struct cancel_impl;
  183. bool await_ready() { return (chn->n_ > 0); }
  184. template<typename Promise>
  185. BOOST_NOINLINE
  186. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  187. BOOST_COBALT_DECL void await_resume();
  188. BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  189. BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
  190. explicit operator bool() const {return chn && chn->is_open();}
  191. };
  192. struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  193. {
  194. channel * chn;
  195. boost::source_location loc;
  196. bool cancelled = false, direct = false;
  197. asio::cancellation_slot cancel_slot{};
  198. unique_handle<void> awaited_from{nullptr};
  199. void (*begin_transaction)(void*) = nullptr;
  200. void transactional_unlink()
  201. {
  202. if (begin_transaction)
  203. begin_transaction(awaited_from.get());
  204. this->unlink();
  205. }
  206. struct cancel_impl;
  207. bool await_ready()
  208. {
  209. return chn->n_ < chn->limit_;
  210. }
  211. template<typename Promise>
  212. BOOST_NOINLINE
  213. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  214. BOOST_COBALT_DECL void await_resume();
  215. BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  216. BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
  217. explicit operator bool() const {return chn && chn->is_open();}
  218. };
  219. boost::intrusive::list<read_op, intrusive::constant_time_size<false> > read_queue_;
  220. boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
  221. public:
  222. read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; }
  223. write_op write(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return write_op{{}, this, loc}; }
  224. };
  225. template<typename T>
  226. struct channel_reader
  227. {
  228. channel_reader(channel<T> & chan,
  229. const boost::source_location & loc = BOOST_CURRENT_LOCATION) : chan_(&chan), loc_(loc) {}
  230. auto operator co_await ()
  231. {
  232. return chan_->read(loc_);
  233. }
  234. explicit operator bool () const {return chan_ && chan_->is_open();}
  235. private:
  236. channel<T> * chan_;
  237. boost::source_location loc_;
  238. };
  239. }
  240. #include <boost/cobalt/impl/channel.hpp>
  241. #endif //BOOST_COBALT_CHANNEL_HPP