// channel.hpp (9.4 KB)
  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern ([email protected])
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_IMPL_CHANNEL_HPP
  8. #define BOOST_COBALT_IMPL_CHANNEL_HPP
  9. #include <boost/cobalt/channel.hpp>
  10. #include <boost/cobalt/result.hpp>
  11. #include <boost/asio/post.hpp>
  12. namespace boost::cobalt
  13. {
  14. #if !defined(BOOST_COBALT_NO_PMR)
  15. template<typename T>
  16. inline channel<T>::channel(
  17. std::size_t limit,
  18. executor executor,
  19. pmr::memory_resource * resource)
  20. : buffer_(limit, resource), executor_(executor) {}
  21. #else
  22. template<typename T>
  23. inline channel<T>::channel(
  24. std::size_t limit,
  25. executor executor)
  26. : buffer_(limit), executor_(executor) {}
  27. #endif
  28. template<typename T>
  29. auto channel<T>::get_executor() -> const executor_type & {return executor_;}
  30. template<typename T>
  31. bool channel<T>::is_open() const {return !is_closed_;}
  32. template<typename T>
  33. channel<T>::~channel()
  34. {
  35. while (!read_queue_.empty())
  36. read_queue_.front().awaited_from.reset();
  37. while (!write_queue_.empty())
  38. write_queue_.front().awaited_from.reset();
  39. }
  40. template<typename T>
  41. void channel<T>::close()
  42. {
  43. is_closed_ = true;
  44. while (!read_queue_.empty())
  45. {
  46. auto & op = read_queue_.front();
  47. op.unlink();
  48. op.cancelled = true;
  49. op.cancel_slot.clear();
  50. if (op.awaited_from)
  51. asio::post(executor_, std::move(op.awaited_from));
  52. }
  53. while (!write_queue_.empty())
  54. {
  55. auto & op = write_queue_.front();
  56. op.unlink();
  57. op.cancelled = true;
  58. op.cancel_slot.clear();
  59. if (op.awaited_from)
  60. asio::post(executor_, std::move(op.awaited_from));
  61. }
  62. }
  63. template<typename T>
  64. struct channel<T>::read_op::cancel_impl
  65. {
  66. read_op * op;
  67. cancel_impl(read_op * op) : op(op) {}
  68. void operator()(asio::cancellation_type)
  69. {
  70. op->cancelled = true;
  71. op->unlink();
  72. if (op->awaited_from)
  73. asio::post(
  74. op->chn->executor_,
  75. std::move(op->awaited_from));
  76. op->cancel_slot.clear();
  77. }
  78. };
  79. template<typename T>
  80. template<typename Promise>
  81. std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
  82. {
  83. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  84. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  85. cancel_slot.emplace<cancel_impl>(this);
  86. if (awaited_from)
  87. boost::throw_exception(std::runtime_error("already-awaited"), loc);
  88. awaited_from.reset(h.address());
  89. // currently nothing to read
  90. if constexpr (requires (Promise p) {p.begin_transaction();})
  91. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  92. if (chn->write_queue_.empty())
  93. {
  94. chn->read_queue_.push_back(*this);
  95. return std::noop_coroutine();
  96. }
  97. else
  98. {
  99. cancel_slot.clear();
  100. auto & op = chn->write_queue_.front();
  101. op.transactional_unlink();
  102. op.direct = true;
  103. if (op.ref.index() == 0)
  104. direct = std::move(*variant2::get<0>(op.ref));
  105. else
  106. direct = *variant2::get<1>(op.ref);
  107. BOOST_ASSERT(op.awaited_from);
  108. asio::post(chn->executor_, std::move(awaited_from));
  109. return op.awaited_from.release();
  110. }
  111. }
  112. template<typename T>
  113. T channel<T>::read_op::await_resume()
  114. {
  115. return await_resume(as_result_tag{}).value(loc);
  116. }
  117. template<typename T>
  118. std::tuple<system::error_code, T> channel<T>::read_op::await_resume(const struct as_tuple_tag &)
  119. {
  120. auto res = await_resume(as_result_tag{});
  121. if (res.has_error())
  122. return {res.error(), T{}};
  123. else
  124. return {system::error_code{}, std::move(*res)};
  125. }
  126. template<typename T>
  127. system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &)
  128. {
  129. if (cancel_slot.is_connected())
  130. cancel_slot.clear();
  131. if (cancelled)
  132. return {system::in_place_error, asio::error::operation_aborted};
  133. T value = direct ? std::move(*direct) : std::move(chn->buffer_.front());
  134. if (!direct)
  135. chn->buffer_.pop_front();
  136. if (!chn->write_queue_.empty())
  137. {
  138. auto &op = chn->write_queue_.front();
  139. BOOST_ASSERT(chn->read_queue_.empty());
  140. if (op.await_ready())
  141. {
  142. op.transactional_unlink();
  143. BOOST_ASSERT(op.awaited_from);
  144. asio::post(chn->executor_, std::move(op.awaited_from));
  145. }
  146. }
  147. return {system::in_place_value, value};
  148. }
  149. template<typename T>
  150. struct channel<T>::write_op::cancel_impl
  151. {
  152. write_op * op;
  153. cancel_impl(write_op * op) : op(op) {}
  154. void operator()(asio::cancellation_type)
  155. {
  156. op->cancelled = true;
  157. op->unlink();
  158. if (op->awaited_from)
  159. asio::post(
  160. op->chn->executor_, std::move(op->awaited_from));
  161. op->cancel_slot.clear();
  162. }
  163. };
  164. template<typename T>
  165. template<typename Promise>
  166. std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
  167. {
  168. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  169. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  170. cancel_slot.emplace<cancel_impl>(this);
  171. awaited_from.reset(h.address());
  172. if constexpr (requires (Promise p) {p.begin_transaction();})
  173. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  174. // currently nothing to read
  175. if (chn->read_queue_.empty())
  176. {
  177. chn->write_queue_.push_back(*this);
  178. return std::noop_coroutine();
  179. }
  180. else
  181. {
  182. cancel_slot.clear();
  183. auto & op = chn->read_queue_.front();
  184. op.transactional_unlink();
  185. if (ref.index() == 0)
  186. op.direct = std::move(*variant2::get<0>(ref));
  187. else
  188. op.direct = *variant2::get<1>(ref);
  189. BOOST_ASSERT(op.awaited_from);
  190. direct = true;
  191. asio::post(chn->executor_, std::move(awaited_from));
  192. return op.awaited_from.release();
  193. }
  194. }
  195. template<typename T>
  196. std::tuple<system::error_code> channel<T>::write_op::await_resume(const struct as_tuple_tag &)
  197. {
  198. return await_resume(as_result_tag{}).error();
  199. }
  200. template<typename T>
  201. void channel<T>::write_op::await_resume()
  202. {
  203. await_resume(as_result_tag{}).value(loc);
  204. }
  205. template<typename T>
  206. system::result<void> channel<T>::write_op::await_resume(const struct as_result_tag &)
  207. {
  208. if (cancel_slot.is_connected())
  209. cancel_slot.clear();
  210. if (cancelled)
  211. boost::throw_exception(system::system_error(asio::error::operation_aborted), loc);
  212. if (!direct)
  213. {
  214. BOOST_ASSERT(!chn->buffer_.full());
  215. if (ref.index() == 0)
  216. chn->buffer_.push_back(std::move(*variant2::get<0>(ref)));
  217. else
  218. chn->buffer_.push_back(*variant2::get<1>(ref));
  219. }
  220. if (!chn->read_queue_.empty())
  221. {
  222. auto & op = chn->read_queue_.front();
  223. BOOST_ASSERT(chn->write_queue_.empty());
  224. if (op.await_ready())
  225. {
  226. op.transactional_unlink();
  227. BOOST_ASSERT(op.awaited_from);
  228. asio::post(chn->executor_, std::move(op.awaited_from));
  229. }
  230. }
  231. return system::in_place_value;
  232. }
  233. struct channel<void>::read_op::cancel_impl
  234. {
  235. read_op * op;
  236. cancel_impl(read_op * op) : op(op) {}
  237. void operator()(asio::cancellation_type)
  238. {
  239. op->cancelled = true;
  240. op->unlink();
  241. asio::post(op->chn->executor_, std::move(op->awaited_from));
  242. op->cancel_slot.clear();
  243. }
  244. };
  245. struct channel<void>::write_op::cancel_impl
  246. {
  247. write_op * op;
  248. cancel_impl(write_op * op) : op(op) {}
  249. void operator()(asio::cancellation_type)
  250. {
  251. op->cancelled = true;
  252. op->unlink();
  253. asio::post(op->chn->executor_, std::move(op->awaited_from));
  254. op->cancel_slot.clear();
  255. }
  256. };
  257. template<typename Promise>
  258. std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
  259. {
  260. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  261. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  262. cancel_slot.emplace<cancel_impl>(this);
  263. awaited_from.reset(h.address());
  264. if constexpr (requires (Promise p) {p.begin_transaction();})
  265. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  266. // nothing to read currently, enqueue
  267. if (chn->write_queue_.empty())
  268. {
  269. chn->read_queue_.push_back(*this);
  270. return std::noop_coroutine();
  271. }
  272. else // we're good, we can read, so we'll do that, but we need to post, so we need to initialize a transactin.
  273. {
  274. cancel_slot.clear();
  275. auto & op = chn->write_queue_.front();
  276. op.unlink();
  277. op.direct = true;
  278. BOOST_ASSERT(op.awaited_from);
  279. direct = true;
  280. asio::post(chn->executor_, std::move(awaited_from));
  281. return op.awaited_from.release();
  282. }
  283. }
  284. template<typename Promise>
  285. std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
  286. {
  287. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  288. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  289. cancel_slot.emplace<cancel_impl>(this);
  290. awaited_from.reset(h.address());
  291. // currently nothing to read
  292. if constexpr (requires (Promise p) {p.begin_transaction();})
  293. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  294. if (chn->read_queue_.empty())
  295. {
  296. chn->write_queue_.push_back(*this);
  297. return std::noop_coroutine();
  298. }
  299. else
  300. {
  301. cancel_slot.clear();
  302. auto & op = chn->read_queue_.front();
  303. op.unlink();
  304. op.direct = true;
  305. BOOST_ASSERT(op.awaited_from);
  306. direct = true;
  307. asio::post(chn->executor_, std::move(awaited_from));
  308. return op.awaited_from.release();
  309. }
  310. }
  311. }
  312. #endif //BOOST_COBALT_IMPL_CHANNEL_HPP