runner.hpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. /* Copyright (c) 2018-2023 Marcelo Zimbres Silva ([email protected])
  2. *
  3. * Distributed under the Boost Software License, Version 1.0. (See
  4. * accompanying file LICENSE.txt)
  5. */
  6. #ifndef BOOST_REDIS_RUNNER_HPP
  7. #define BOOST_REDIS_RUNNER_HPP
  8. #include <boost/redis/detail/health_checker.hpp>
  9. #include <boost/redis/config.hpp>
  10. #include <boost/redis/response.hpp>
  11. #include <boost/redis/detail/helper.hpp>
  12. #include <boost/redis/error.hpp>
  13. #include <boost/redis/logger.hpp>
  14. #include <boost/redis/operation.hpp>
  15. #include <boost/redis/detail/connector.hpp>
  16. #include <boost/redis/detail/resolver.hpp>
  17. #include <boost/redis/detail/handshaker.hpp>
  18. #include <boost/asio/compose.hpp>
  19. #include <boost/asio/connect.hpp>
  20. #include <boost/asio/coroutine.hpp>
  21. #include <boost/asio/experimental/parallel_group.hpp>
  22. #include <boost/asio/ip/tcp.hpp>
  23. #include <boost/asio/steady_timer.hpp>
  24. #include <string>
  25. #include <memory>
  26. #include <chrono>
  27. namespace boost::redis::detail
  28. {
  29. void push_hello(config const& cfg, request& req);
  30. template <class Runner, class Connection, class Logger>
  31. struct hello_op {
  32. Runner* runner_ = nullptr;
  33. Connection* conn_ = nullptr;
  34. Logger logger_;
  35. asio::coroutine coro_{};
  36. template <class Self>
  37. void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
  38. {
  39. BOOST_ASIO_CORO_REENTER (coro_)
  40. {
  41. runner_->add_hello();
  42. BOOST_ASIO_CORO_YIELD
  43. conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
  44. logger_.on_hello(ec, runner_->hello_resp_);
  45. if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
  46. logger_.trace("hello-op: error/canceled. Exiting ...");
  47. conn_->cancel(operation::run);
  48. self.complete(!!ec ? ec : asio::error::operation_aborted);
  49. return;
  50. }
  51. self.complete({});
  52. }
  53. }
  54. };
  55. template <class Runner, class Connection, class Logger>
  56. class runner_op {
  57. private:
  58. Runner* runner_ = nullptr;
  59. Connection* conn_ = nullptr;
  60. Logger logger_;
  61. asio::coroutine coro_{};
  62. public:
  63. runner_op(Runner* runner, Connection* conn, Logger l)
  64. : runner_{runner}
  65. , conn_{conn}
  66. , logger_{l}
  67. {}
  68. template <class Self>
  69. void operator()( Self& self
  70. , std::array<std::size_t, 3> order = {}
  71. , system::error_code ec0 = {}
  72. , system::error_code ec1 = {}
  73. , system::error_code ec2 = {}
  74. , std::size_t = 0)
  75. {
  76. BOOST_ASIO_CORO_REENTER (coro_)
  77. {
  78. BOOST_ASIO_CORO_YIELD
  79. asio::experimental::make_parallel_group(
  80. [this](auto token) { return runner_->async_run_all(*conn_, logger_, token); },
  81. [this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
  82. [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }
  83. ).async_wait(
  84. asio::experimental::wait_for_all(),
  85. std::move(self));
  86. logger_.on_runner(ec0, ec1, ec2);
  87. if (is_cancelled(self)) {
  88. self.complete(asio::error::operation_aborted);
  89. return;
  90. }
  91. if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout) {
  92. self.complete(ec0);
  93. return;
  94. }
  95. if (order[0] == 2 && !!ec2) {
  96. self.complete(ec2);
  97. return;
  98. }
  99. if (order[0] == 1 && ec1 == error::pong_timeout) {
  100. self.complete(ec1);
  101. return;
  102. }
  103. self.complete(ec0);
  104. }
  105. }
  106. };
  107. template <class Runner, class Connection, class Logger>
  108. struct run_all_op {
  109. Runner* runner_ = nullptr;
  110. Connection* conn_ = nullptr;
  111. Logger logger_;
  112. asio::coroutine coro_{};
  113. template <class Self>
  114. void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
  115. {
  116. BOOST_ASIO_CORO_REENTER (coro_)
  117. {
  118. BOOST_ASIO_CORO_YIELD
  119. runner_->resv_.async_resolve(std::move(self));
  120. logger_.on_resolve(ec, runner_->resv_.results());
  121. BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
  122. BOOST_ASIO_CORO_YIELD
  123. runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
  124. logger_.on_connect(ec, runner_->ctor_.endpoint());
  125. BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
  126. if (conn_->use_ssl()) {
  127. BOOST_ASIO_CORO_YIELD
  128. runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
  129. logger_.on_ssl_handshake(ec);
  130. BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
  131. }
  132. BOOST_ASIO_CORO_YIELD
  133. conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
  134. BOOST_REDIS_CHECK_OP0(;)
  135. self.complete(ec);
  136. }
  137. }
  138. };
  139. template <class Executor>
  140. class runner {
  141. public:
  142. runner(Executor ex, config cfg)
  143. : resv_{ex}
  144. , ctor_{ex}
  145. , hsher_{ex}
  146. , health_checker_{ex}
  147. , cfg_{cfg}
  148. { }
  149. std::size_t cancel(operation op)
  150. {
  151. resv_.cancel(op);
  152. ctor_.cancel(op);
  153. hsher_.cancel(op);
  154. health_checker_.cancel(op);
  155. return 0U;
  156. }
  157. void set_config(config const& cfg)
  158. {
  159. cfg_ = cfg;
  160. resv_.set_config(cfg);
  161. ctor_.set_config(cfg);
  162. hsher_.set_config(cfg);
  163. health_checker_.set_config(cfg);
  164. }
  165. template <class Connection, class Logger, class CompletionToken>
  166. auto async_run(Connection& conn, Logger l, CompletionToken token)
  167. {
  168. return asio::async_compose
  169. < CompletionToken
  170. , void(system::error_code)
  171. >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
  172. }
  173. config const& get_config() const noexcept {return cfg_;}
  174. private:
  175. using resolver_type = resolver<Executor>;
  176. using connector_type = connector<Executor>;
  177. using handshaker_type = detail::handshaker<Executor>;
  178. using health_checker_type = health_checker<Executor>;
  179. using timer_type = typename connector_type::timer_type;
  180. template <class, class, class> friend struct run_all_op;
  181. template <class, class, class> friend class runner_op;
  182. template <class, class, class> friend struct hello_op;
  183. template <class Connection, class Logger, class CompletionToken>
  184. auto async_run_all(Connection& conn, Logger l, CompletionToken token)
  185. {
  186. return asio::async_compose
  187. < CompletionToken
  188. , void(system::error_code)
  189. >(run_all_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
  190. }
  191. template <class Connection, class Logger, class CompletionToken>
  192. auto async_hello(Connection& conn, Logger l, CompletionToken token)
  193. {
  194. return asio::async_compose
  195. < CompletionToken
  196. , void(system::error_code)
  197. >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
  198. }
  199. void add_hello()
  200. {
  201. hello_req_.clear();
  202. if (hello_resp_.has_value())
  203. hello_resp_.value().clear();
  204. push_hello(cfg_, hello_req_);
  205. }
  206. bool has_error_in_response() const noexcept
  207. {
  208. if (!hello_resp_.has_value())
  209. return true;
  210. auto f = [](auto const& e)
  211. {
  212. switch (e.data_type) {
  213. case resp3::type::simple_error:
  214. case resp3::type::blob_error: return true;
  215. default: return false;
  216. }
  217. };
  218. return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
  219. }
  220. resolver_type resv_;
  221. connector_type ctor_;
  222. handshaker_type hsher_;
  223. health_checker_type health_checker_;
  224. request hello_req_;
  225. generic_response hello_resp_;
  226. config cfg_;
  227. };
  228. } // boost::redis::detail
  229. #endif // BOOST_REDIS_RUNNER_HPP