fork.hpp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. // Copyright (c) 2023 Klemens D. Morgenstern
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef BOOST_COBALT_DETAIL_FORK_HPP
  6. #define BOOST_COBALT_DETAIL_FORK_HPP
  7. #include <boost/cobalt/config.hpp>
  8. #include <boost/cobalt/detail/await_result_helper.hpp>
  9. #include <boost/cobalt/detail/util.hpp>
  10. #include <boost/cobalt/this_thread.hpp>
  11. #include <boost/cobalt/unique_handle.hpp>
  12. #if defined(BOOST_COBALT_NO_PMR)
  13. #include <boost/cobalt/detail/monotonic_resource.hpp>
  14. #endif
  15. #include <boost/asio/cancellation_signal.hpp>
  16. #include <boost/intrusive_ptr.hpp>
  17. #include <coroutine>
  18. namespace boost::cobalt::detail
  19. {
  20. struct fork
  21. {
  22. fork() = default;
  23. struct shared_state
  24. {
  25. #if !defined(BOOST_COBALT_NO_PMR)
  26. pmr::monotonic_buffer_resource resource;
  27. template<typename ... Args>
  28. shared_state(Args && ... args)
  29. : resource(std::forward<Args>(args)...,
  30. this_thread::get_default_resource())
  31. {
  32. }
  33. #else
  34. detail::monotonic_resource resource;
  35. template<typename ... Args>
  36. shared_state(Args && ... args)
  37. : resource(std::forward<Args>(args)...)
  38. {
  39. }
  40. #endif
  41. // the coro awaiting the fork statement, e.g. awaiting race
  42. unique_handle<void> coro;
  43. std::size_t use_count = 0u;
  44. friend void intrusive_ptr_add_ref(shared_state * st) {st->use_count++;}
  45. friend void intrusive_ptr_release(shared_state * st)
  46. {
  47. if (st->use_count-- == 1u)
  48. st->coro.reset();
  49. }
  50. bool outstanding_work() {return use_count != 0u;}
  51. const executor * exec = nullptr;
  52. bool wired_up() {return exec != nullptr;}
  53. using executor_type = executor;
  54. const executor_type & get_executor() const
  55. {
  56. BOOST_ASSERT(exec != nullptr);
  57. return *exec;
  58. }
  59. #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  60. boost::source_location loc;
  61. #endif
  62. };
  63. template<typename std::size_t BufferSize>
  64. struct static_shared_state : private std::array<char, BufferSize>, shared_state
  65. {
  66. static_shared_state() : shared_state{std::array<char, BufferSize>::data(),
  67. std::array<char, BufferSize>::size()}
  68. {}
  69. };
  /// Tag type. A `co_await` on a value of this type inside a fork coroutine
  /// is routed to promise_type::await_transform(wired_up_t).
  struct wired_up_t
  {
  };

  /// The tag value to `co_await`.
  static constexpr wired_up_t wired_up{};
  /// Non-owning, type-erased reference to a nullary callable.
  ///
  /// `co_await`ing one inside a fork coroutine copies both pointers into the
  /// promise (see promise_type::await_transform(set_transaction_function)).
  /// Only the callable's address is stored, so it must outlive every use of
  /// the stored pointers.
  struct set_transaction_function
  {
    // Address of the wrapped callable.
    void * begin_transaction_this = nullptr;
    // Trampoline: casts its argument back to the callable's type and calls it.
    void (*begin_transaction_func)(void*) = nullptr;

    /// Wraps `transaction`, which must be callable without arguments.
    ///
    /// The defaulted second template parameter removes this constructor from
    /// overload resolution for types that are not callable that way. Without
    /// it, copying a non-const lvalue set_transaction_function would select
    /// this template over the copy constructor and fail to compile.
    template<typename BeginTransaction,
             typename = decltype((*static_cast<BeginTransaction*>(nullptr))())>
    set_transaction_function(BeginTransaction & transaction)
      : begin_transaction_this(&transaction)
      , begin_transaction_func(
          +[](void * ptr)
          {
            (*static_cast<BeginTransaction*>(ptr))();
          })
    {
    }
  };
  87. struct promise_type
  88. {
  89. template<typename State, typename ... Rest>
  90. void * operator new(const std::size_t size, State & st, Rest &&...)
  91. {
  92. return st.resource.allocate(size);
  93. }
  94. template<typename ... Rest>
  95. void operator delete(void * raw, const std::size_t size, Rest && ...) noexcept;
  96. void operator delete(void *, const std::size_t) noexcept {}
  97. template<typename ... Rest>
  98. promise_type(shared_state & st, Rest & ...)
  99. : state(&st)
  100. {
  101. }
  102. intrusive_ptr<shared_state> state;
  103. asio::cancellation_slot cancel;
  104. using executor_type = executor;
  105. const executor_type & get_executor() const { return state->get_executor(); }
  106. #if defined(BOOST_COBALT_NO_PMR)
  107. using allocator_type = detail::monotonic_allocator<void>;
  108. const allocator_type get_allocator() const { return &state->resource; }
  109. #else
  110. using allocator_type = pmr::polymorphic_allocator<void>;
  111. const allocator_type get_allocator() const { return &state->resource; }
  112. #endif
  113. using cancellation_slot_type = asio::cancellation_slot;
  114. cancellation_slot_type get_cancellation_slot() const { return cancel; }
  115. constexpr static std::suspend_never initial_suspend() noexcept {return {};}
  116. struct final_awaitable
  117. {
  118. promise_type * self;
  119. bool await_ready() noexcept
  120. {
  121. return self->state->use_count != 1u;
  122. }
  123. std::coroutine_handle<void> await_suspend(std::coroutine_handle<promise_type> h) noexcept
  124. {
  125. auto pp = h.promise().state.detach();
  126. #if defined(BOOST_COBALT_NO_SELF_DELETE)
  127. h.promise().~promise_type();
  128. #else
  129. // mem is in a monotonic_resource, this is fine on msvc- gcc doesn't like it though
  130. h.destroy();
  131. #endif
  132. pp->use_count--;
  133. BOOST_ASSERT(pp->use_count == 0u);
  134. if (pp->coro)
  135. return pp->coro.release();
  136. else
  137. return std::noop_coroutine();
  138. }
  139. constexpr static void await_resume() noexcept {}
  140. };
  141. final_awaitable final_suspend() noexcept
  142. {
  143. if (cancel.is_connected())
  144. cancel.clear();
  145. return final_awaitable{this};
  146. }
  147. void return_void()
  148. {
  149. }
  150. template<awaitable<promise_type> Aw>
  151. struct wrapped_awaitable
  152. {
  153. Aw & aw;
  154. constexpr static bool await_ready() noexcept
  155. {
  156. return false;
  157. }
  158. auto await_suspend(std::coroutine_handle<promise_type> h)
  159. {
  160. BOOST_ASSERT(h.promise().state->wired_up());
  161. #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  162. if constexpr (requires {aw.await_suspend(h, boost::source_location ());})
  163. return aw.await_suspend(h, h.promise().state->loc);
  164. #endif
  165. return aw.await_suspend(h);
  166. }
  167. auto await_resume()
  168. {
  169. return aw.await_resume();
  170. }
  171. };
  172. template<awaitable<promise_type> Aw>
  173. auto await_transform(Aw & aw)
  174. {
  175. return wrapped_awaitable<Aw>{aw};
  176. }
  177. struct wired_up_awaitable
  178. {
  179. promise_type * promise;
  180. bool await_ready() const noexcept
  181. {
  182. return promise->state->wired_up();
  183. }
  184. void await_suspend(std::coroutine_handle<promise_type>)
  185. {
  186. }
  187. constexpr static void await_resume() noexcept {}
  188. };
  189. auto await_transform(wired_up_t)
  190. {
  191. return wired_up_awaitable{this};
  192. }
  193. auto await_transform(set_transaction_function sf)
  194. {
  195. begin_transaction_this = sf.begin_transaction_this;
  196. begin_transaction_func = sf.begin_transaction_func;
  197. return std::suspend_never();
  198. }
  199. auto await_transform(asio::cancellation_slot slot)
  200. {
  201. this->cancel = slot;
  202. return std::suspend_never();
  203. }
  204. [[noreturn]] void unhandled_exception() noexcept {std::terminate();}
  205. void * begin_transaction_this = nullptr;
  206. void (*begin_transaction_func)(void*) = nullptr;
  207. void begin_transaction()
  208. {
  209. if (begin_transaction_this)
  210. begin_transaction_func(begin_transaction_this);
  211. }
  212. fork get_return_object()
  213. {
  214. return this;
  215. }
  216. };
  217. [[nodiscard]] bool done() const
  218. {
  219. return ! handle_ || handle_.done();
  220. }
  221. auto release() -> std::coroutine_handle<promise_type>
  222. {
  223. return handle_.release();
  224. }
  225. private:
  226. fork(promise_type * pt) : handle_(pt) {}
  227. unique_handle<promise_type> handle_;
  228. };
  229. }
  230. #endif //BOOST_COBALT_DETAIL_FORK_HPP