health_checker.hpp 7.4 KB


  1. /* Copyright (c) 2018-2023 Marcelo Zimbres Silva ([email protected])
  2. *
  3. * Distributed under the Boost Software License, Version 1.0. (See
  4. * accompanying file LICENSE.txt)
  5. */
  6. #ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
  7. #define BOOST_REDIS_HEALTH_CHECKER_HPP
  8. // Has to be included before promise.hpp to build on msvc.
  9. #include <boost/redis/request.hpp>
  10. #include <boost/redis/response.hpp>
  11. #include <boost/redis/operation.hpp>
  12. #include <boost/redis/detail/helper.hpp>
  13. #include <boost/redis/config.hpp>
  14. #include <boost/asio/steady_timer.hpp>
  15. #include <boost/asio/compose.hpp>
  16. #include <boost/asio/consign.hpp>
  17. #include <boost/asio/coroutine.hpp>
  18. #include <boost/asio/post.hpp>
  19. #include <boost/asio/experimental/parallel_group.hpp>
  20. #include <memory>
  21. #include <chrono>
  22. namespace boost::redis::detail {
  23. template <class HealthChecker, class Connection, class Logger>
  24. class ping_op {
  25. public:
  26. HealthChecker* checker_ = nullptr;
  27. Connection* conn_ = nullptr;
  28. Logger logger_;
  29. asio::coroutine coro_{};
  30. template <class Self>
  31. void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
  32. {
  33. BOOST_ASIO_CORO_REENTER (coro_) for (;;)
  34. {
  35. if (checker_->checker_has_exited_) {
  36. logger_.trace("ping_op: checker has exited. Exiting ...");
  37. self.complete({});
  38. return;
  39. }
  40. BOOST_ASIO_CORO_YIELD
  41. conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
  42. if (ec || is_cancelled(self)) {
  43. logger_.trace("ping_op: error/cancelled (1).");
  44. checker_->wait_timer_.cancel();
  45. self.complete(!!ec ? ec : asio::error::operation_aborted);
  46. return;
  47. }
  48. // Wait before pinging again.
  49. checker_->ping_timer_.expires_after(checker_->ping_interval_);
  50. BOOST_ASIO_CORO_YIELD
  51. checker_->ping_timer_.async_wait(std::move(self));
  52. if (ec || is_cancelled(self)) {
  53. logger_.trace("ping_op: error/cancelled (2).");
  54. self.complete(!!ec ? ec : asio::error::operation_aborted);
  55. return;
  56. }
  57. }
  58. }
  59. };
  60. template <class HealthChecker, class Connection, class Logger>
  61. class check_timeout_op {
  62. public:
  63. HealthChecker* checker_ = nullptr;
  64. Connection* conn_ = nullptr;
  65. Logger logger_;
  66. asio::coroutine coro_{};
  67. template <class Self>
  68. void operator()(Self& self, system::error_code ec = {})
  69. {
  70. BOOST_ASIO_CORO_REENTER (coro_) for (;;)
  71. {
  72. checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
  73. BOOST_ASIO_CORO_YIELD
  74. checker_->wait_timer_.async_wait(std::move(self));
  75. if (ec || is_cancelled(self)) {
  76. logger_.trace("check-timeout-op: error/canceled. Exiting ...");
  77. self.complete(!!ec ? ec : asio::error::operation_aborted);
  78. return;
  79. }
  80. if (checker_->resp_.has_error()) {
  81. logger_.trace("check-timeout-op: Response error. Exiting ...");
  82. self.complete({});
  83. return;
  84. }
  85. if (checker_->resp_.value().empty()) {
  86. logger_.trace("check-timeout-op: Response has no value. Exiting ...");
  87. checker_->ping_timer_.cancel();
  88. conn_->cancel(operation::run);
  89. checker_->checker_has_exited_ = true;
  90. self.complete(error::pong_timeout);
  91. return;
  92. }
  93. if (checker_->resp_.has_value()) {
  94. checker_->resp_.value().clear();
  95. }
  96. }
  97. }
  98. };
  99. template <class HealthChecker, class Connection, class Logger>
  100. class check_health_op {
  101. public:
  102. HealthChecker* checker_ = nullptr;
  103. Connection* conn_ = nullptr;
  104. Logger logger_;
  105. asio::coroutine coro_{};
  106. template <class Self>
  107. void
  108. operator()(
  109. Self& self,
  110. std::array<std::size_t, 2> order = {},
  111. system::error_code ec1 = {},
  112. system::error_code ec2 = {})
  113. {
  114. BOOST_ASIO_CORO_REENTER (coro_)
  115. {
  116. if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
  117. logger_.trace("check-health-op: timeout disabled.");
  118. BOOST_ASIO_CORO_YIELD
  119. asio::post(std::move(self));
  120. self.complete({});
  121. return;
  122. }
  123. BOOST_ASIO_CORO_YIELD
  124. asio::experimental::make_parallel_group(
  125. [this](auto token) { return checker_->async_ping(*conn_, logger_, token); },
  126. [this](auto token) { return checker_->async_check_timeout(*conn_, logger_, token);}
  127. ).async_wait(
  128. asio::experimental::wait_for_one(),
  129. std::move(self));
  130. logger_.on_check_health(ec1, ec2);
  131. if (is_cancelled(self)) {
  132. logger_.trace("check-health-op: canceled. Exiting ...");
  133. self.complete(asio::error::operation_aborted);
  134. return;
  135. }
  136. switch (order[0]) {
  137. case 0: self.complete(ec1); return;
  138. case 1: self.complete(ec2); return;
  139. default: BOOST_ASSERT(false);
  140. }
  141. }
  142. }
  143. };
  144. template <class Executor>
  145. class health_checker {
  146. private:
  147. using timer_type =
  148. asio::basic_waitable_timer<
  149. std::chrono::steady_clock,
  150. asio::wait_traits<std::chrono::steady_clock>,
  151. Executor>;
  152. public:
  153. health_checker(Executor ex)
  154. : ping_timer_{ex}
  155. , wait_timer_{ex}
  156. {
  157. req_.push("PING", "Boost.Redis");
  158. }
  159. void set_config(config const& cfg)
  160. {
  161. req_.clear();
  162. req_.push("PING", cfg.health_check_id);
  163. ping_interval_ = cfg.health_check_interval;
  164. }
  165. template <
  166. class Connection,
  167. class Logger,
  168. class CompletionToken = asio::default_completion_token_t<Executor>
  169. >
  170. auto
  171. async_check_health(
  172. Connection& conn,
  173. Logger l,
  174. CompletionToken token = CompletionToken{})
  175. {
  176. checker_has_exited_ = false;
  177. return asio::async_compose
  178. < CompletionToken
  179. , void(system::error_code)
  180. >(check_health_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn);
  181. }
  182. std::size_t cancel(operation op)
  183. {
  184. switch (op) {
  185. case operation::health_check:
  186. case operation::all:
  187. ping_timer_.cancel();
  188. wait_timer_.cancel();
  189. break;
  190. default: /* ignore */;
  191. }
  192. return 0;
  193. }
  194. private:
  195. template <class Connection, class Logger, class CompletionToken>
  196. auto async_ping(Connection& conn, Logger l, CompletionToken token)
  197. {
  198. return asio::async_compose
  199. < CompletionToken
  200. , void(system::error_code)
  201. >(ping_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, ping_timer_);
  202. }
  203. template <class Connection, class Logger, class CompletionToken>
  204. auto async_check_timeout(Connection& conn, Logger l, CompletionToken token)
  205. {
  206. return asio::async_compose
  207. < CompletionToken
  208. , void(system::error_code)
  209. >(check_timeout_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, wait_timer_);
  210. }
  211. template <class, class, class> friend class ping_op;
  212. template <class, class, class> friend class check_timeout_op;
  213. template <class, class, class> friend class check_health_op;
  214. timer_type ping_timer_;
  215. timer_type wait_timer_;
  216. redis::request req_;
  217. redis::generic_response resp_;
  218. std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
  219. bool checker_has_exited_ = false;
  220. };
  221. } // boost::redis::detail
  222. #endif // BOOST_REDIS_HEALTH_CHECKER_HPP