connector.hpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. /* Copyright (c) 2018-2023 Marcelo Zimbres Silva ([email protected])
  2. *
  3. * Distributed under the Boost Software License, Version 1.0. (See
  4. * accompanying file LICENSE.txt)
  5. */
  6. #ifndef BOOST_REDIS_CONNECTOR_HPP
  7. #define BOOST_REDIS_CONNECTOR_HPP
  8. #include <boost/redis/detail/helper.hpp>
  9. #include <boost/redis/error.hpp>
  10. #include <boost/asio/compose.hpp>
  11. #include <boost/asio/connect.hpp>
  12. #include <boost/asio/coroutine.hpp>
  13. #include <boost/asio/experimental/parallel_group.hpp>
  14. #include <boost/asio/ip/tcp.hpp>
  15. #include <boost/asio/steady_timer.hpp>
  16. #include <string>
  17. #include <chrono>
  18. namespace boost::redis::detail
  19. {
  20. template <class Connector, class Stream>
  21. struct connect_op {
  22. Connector* ctor_ = nullptr;
  23. Stream* stream = nullptr;
  24. asio::ip::tcp::resolver::results_type const* res_ = nullptr;
  25. asio::coroutine coro{};
  26. template <class Self>
  27. void operator()( Self& self
  28. , std::array<std::size_t, 2> const& order = {}
  29. , system::error_code const& ec1 = {}
  30. , asio::ip::tcp::endpoint const& ep= {}
  31. , system::error_code const& ec2 = {})
  32. {
  33. BOOST_ASIO_CORO_REENTER (coro)
  34. {
  35. ctor_->timer_.expires_after(ctor_->timeout_);
  36. BOOST_ASIO_CORO_YIELD
  37. asio::experimental::make_parallel_group(
  38. [this](auto token)
  39. {
  40. auto f = [](system::error_code const&, auto const&) { return true; };
  41. return asio::async_connect(*stream, *res_, f, token);
  42. },
  43. [this](auto token) { return ctor_->timer_.async_wait(token);}
  44. ).async_wait(
  45. asio::experimental::wait_for_one(),
  46. std::move(self));
  47. if (is_cancelled(self)) {
  48. self.complete(asio::error::operation_aborted);
  49. return;
  50. }
  51. switch (order[0]) {
  52. case 0: {
  53. ctor_->endpoint_ = ep;
  54. self.complete(ec1);
  55. } break;
  56. case 1:
  57. {
  58. if (ec2) {
  59. self.complete(ec2);
  60. } else {
  61. self.complete(error::connect_timeout);
  62. }
  63. } break;
  64. default: BOOST_ASSERT(false);
  65. }
  66. }
  67. }
  68. };
  69. template <class Executor>
  70. class connector {
  71. public:
  72. using timer_type =
  73. asio::basic_waitable_timer<
  74. std::chrono::steady_clock,
  75. asio::wait_traits<std::chrono::steady_clock>,
  76. Executor>;
  77. connector(Executor ex)
  78. : timer_{ex}
  79. {}
  80. void set_config(config const& cfg)
  81. { timeout_ = cfg.connect_timeout; }
  82. template <class Stream, class CompletionToken>
  83. auto
  84. async_connect(
  85. Stream& stream,
  86. asio::ip::tcp::resolver::results_type const& res,
  87. CompletionToken&& token)
  88. {
  89. return asio::async_compose
  90. < CompletionToken
  91. , void(system::error_code)
  92. >(connect_op<connector, Stream>{this, &stream, &res}, token, timer_);
  93. }
  94. std::size_t cancel(operation op)
  95. {
  96. switch (op) {
  97. case operation::connect:
  98. case operation::all:
  99. timer_.cancel();
  100. break;
  101. default: /* ignore */;
  102. }
  103. return 0;
  104. }
  105. auto const& endpoint() const noexcept { return endpoint_;}
  106. private:
  107. template <class, class> friend struct connect_op;
  108. timer_type timer_;
  109. std::chrono::steady_clock::duration timeout_ = std::chrono::seconds{2};
  110. asio::ip::tcp::endpoint endpoint_;
  111. };
  112. } // boost::redis::detail
  113. #endif // BOOST_REDIS_CONNECTOR_HPP