connection.hpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /* Copyright (c) 2018-2023 Marcelo Zimbres Silva ([email protected])
  2. *
  3. * Distributed under the Boost Software License, Version 1.0. (See
  4. * accompanying file LICENSE.txt)
  5. */
  6. #ifndef BOOST_REDIS_CONNECTION_HPP
  7. #define BOOST_REDIS_CONNECTION_HPP
  8. #include <boost/redis/detail/connection_base.hpp>
  9. #include <boost/redis/logger.hpp>
  10. #include <boost/redis/config.hpp>
  11. #include <boost/asio/io_context.hpp>
  12. #include <boost/asio/coroutine.hpp>
  13. #include <boost/asio/steady_timer.hpp>
  14. #include <boost/asio/any_io_executor.hpp>
  15. #include <boost/asio/any_completion_handler.hpp>
  16. #include <chrono>
  17. #include <memory>
  18. #include <limits>
  19. namespace boost::redis {
  20. namespace detail
  21. {
  22. template <class Connection, class Logger>
  23. struct reconnection_op {
  24. Connection* conn_ = nullptr;
  25. Logger logger_;
  26. asio::coroutine coro_{};
  27. template <class Self>
  28. void operator()(Self& self, system::error_code ec = {})
  29. {
  30. BOOST_ASIO_CORO_REENTER (coro_) for (;;)
  31. {
  32. BOOST_ASIO_CORO_YIELD
  33. conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self));
  34. conn_->cancel(operation::receive);
  35. logger_.on_connection_lost(ec);
  36. if (!conn_->will_reconnect() || is_cancelled(self)) {
  37. conn_->cancel(operation::reconnection);
  38. self.complete(!!ec ? ec : asio::error::operation_aborted);
  39. return;
  40. }
  41. conn_->timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
  42. BOOST_ASIO_CORO_YIELD
  43. conn_->timer_.async_wait(std::move(self));
  44. BOOST_REDIS_CHECK_OP0(;)
  45. if (!conn_->will_reconnect()) {
  46. self.complete(asio::error::operation_aborted);
  47. return;
  48. }
  49. conn_->reset_stream();
  50. }
  51. }
  52. };
  53. } // detail
  54. /** @brief A SSL connection to the Redis server.
  55. * @ingroup high-level-api
  56. *
  57. * This class keeps a healthy connection to the Redis instance where
  58. * commands can be sent at any time. For more details, please see the
  59. * documentation of each individual function.
  60. *
  61. * @tparam Socket The socket type e.g. asio::ip::tcp::socket.
  62. *
  63. */
  64. template <class Executor>
  65. class basic_connection {
  66. public:
  67. /// Executor type.
  68. using executor_type = Executor;
  69. /// Returns the underlying executor.
  70. executor_type get_executor() noexcept
  71. { return impl_.get_executor(); }
  72. /// Rebinds the socket type to another executor.
  73. template <class Executor1>
  74. struct rebind_executor
  75. {
  76. /// The connection type when rebound to the specified executor.
  77. using other = basic_connection<Executor1>;
  78. };
  79. /** @brief Constructor
  80. *
  81. * @param ex Executor on which connection operation will run.
  82. * @param ctx SSL context.
  83. * @param max_read_size Maximum read size that is passed to
  84. * the internal `asio::dynamic_buffer` constructor.
  85. */
  86. explicit
  87. basic_connection(
  88. executor_type ex,
  89. asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
  90. std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
  91. : impl_{ex, std::move(ctx), max_read_size}
  92. , timer_{ex}
  93. { }
  94. /// Contructs from a context.
  95. explicit
  96. basic_connection(
  97. asio::io_context& ioc,
  98. asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
  99. std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
  100. : basic_connection(ioc.get_executor(), std::move(ctx), max_read_size)
  101. { }
  102. /** @brief Starts underlying connection operations.
  103. *
  104. * This member function provides the following functionality
  105. *
  106. * 1. Resolve the address passed on `boost::redis::config::addr`.
  107. * 2. Connect to one of the results obtained in the resolve operation.
  108. * 3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`.
  109. * 4. Start a health-check operation where ping commands are sent
  110. * at intervals specified in
  111. * `boost::redis::config::health_check_interval`. The message passed to
  112. * `PING` will be `boost::redis::config::health_check_id`. Passing a
  113. * timeout with value zero will disable health-checks. If the Redis
  114. * server does not respond to a health-check within two times the value
  115. * specified here, it will be considered unresponsive and the connection
  116. * will be closed and a new connection will be stablished.
  117. * 5. Starts read and write operations with the Redis
  118. * server. More specifically it will trigger the write of all
  119. * requests i.e. calls to `async_exec` that happened prior to this
  120. * call.
  121. *
  122. * When a connection is lost for any reason, a new one is
  123. * stablished automatically. To disable reconnection call
  124. * `boost::redis::connection::cancel(operation::reconnection)`.
  125. *
  126. * @param cfg Configuration paramters.
  127. * @param l Logger object. The interface expected is specified in the class `boost::redis::logger`.
  128. * @param token Completion token.
  129. *
  130. * The completion token must have the following signature
  131. *
  132. * @code
  133. * void f(system::error_code);
  134. * @endcode
  135. *
  136. * For example on how to call this function refer to
  137. * cpp20_intro.cpp or any other example.
  138. */
  139. template <
  140. class Logger = logger,
  141. class CompletionToken = asio::default_completion_token_t<executor_type>>
  142. auto
  143. async_run(
  144. config const& cfg = {},
  145. Logger l = Logger{},
  146. CompletionToken token = CompletionToken{})
  147. {
  148. using this_type = basic_connection<executor_type>;
  149. cfg_ = cfg;
  150. l.set_prefix(cfg_.log_prefix);
  151. return asio::async_compose
  152. < CompletionToken
  153. , void(system::error_code)
  154. >(detail::reconnection_op<this_type, Logger>{this, l}, token, timer_);
  155. }
  156. /** @brief Receives server side pushes asynchronously.
  157. *
  158. * When pushes arrive and there is no `async_receive` operation in
  159. * progress, pushed data, requests, and responses will be paused
  160. * until `async_receive` is called again. Apps will usually want
  161. * to call `async_receive` in a loop.
  162. *
  163. * To cancel an ongoing receive operation apps should call
  164. * `connection::cancel(operation::receive)`.
  165. *
  166. * @param token Completion token.
  167. *
  168. * For an example see cpp20_subscriber.cpp. The completion token must
  169. * have the following signature
  170. *
  171. * @code
  172. * void f(system::error_code, std::size_t);
  173. * @endcode
  174. *
  175. * Where the second parameter is the size of the push received in
  176. * bytes.
  177. */
  178. template <class CompletionToken = asio::default_completion_token_t<executor_type>>
  179. auto async_receive(CompletionToken token = CompletionToken{})
  180. { return impl_.async_receive(std::move(token)); }
  181. /** @brief Receives server pushes synchronously without blocking.
  182. *
  183. * Receives a server push synchronously by calling `try_receive` on
  184. * the underlying channel. If the operation fails because
  185. * `try_receive` returns `false`, `ec` will be set to
  186. * `boost::redis::error::sync_receive_push_failed`.
  187. *
  188. * @param ec Contains the error if any occurred.
  189. *
  190. * @returns The number of bytes read from the socket.
  191. */
  192. std::size_t receive(system::error_code& ec)
  193. {
  194. return impl_.receive(ec);
  195. }
  196. template <
  197. class Response = ignore_t,
  198. class CompletionToken = asio::default_completion_token_t<executor_type>
  199. >
  200. [[deprecated("Set the response with set_receive_response and use the other overload.")]]
  201. auto
  202. async_receive(
  203. Response& response,
  204. CompletionToken token = CompletionToken{})
  205. {
  206. return impl_.async_receive(response, token);
  207. }
  208. /** @brief Executes commands on the Redis server asynchronously.
  209. *
  210. * This function sends a request to the Redis server and waits for
  211. * the responses to each individual command in the request. If the
  212. * request contains only commands that don't expect a response,
  213. * the completion occurs after it has been written to the
  214. * underlying stream. Multiple concurrent calls to this function
  215. * will be automatically queued by the implementation.
  216. *
  217. * @param req Request.
  218. * @param resp Response.
  219. * @param token Completion token.
  220. *
  221. * For an example see cpp20_echo_server.cpp. The completion token must
  222. * have the following signature
  223. *
  224. * @code
  225. * void f(system::error_code, std::size_t);
  226. * @endcode
  227. *
  228. * Where the second parameter is the size of the response received
  229. * in bytes.
  230. */
  231. template <
  232. class Response = ignore_t,
  233. class CompletionToken = asio::default_completion_token_t<executor_type>
  234. >
  235. auto
  236. async_exec(
  237. request const& req,
  238. Response& resp = ignore,
  239. CompletionToken&& token = CompletionToken{})
  240. {
  241. return impl_.async_exec(req, resp, std::forward<CompletionToken>(token));
  242. }
  243. /** @brief Cancel operations.
  244. *
  245. * @li `operation::exec`: Cancels operations started with
  246. * `async_exec`. Affects only requests that haven't been written
  247. * yet.
  248. * @li operation::run: Cancels the `async_run` operation.
  249. * @li operation::receive: Cancels any ongoing calls to `async_receive`.
  250. * @li operation::all: Cancels all operations listed above.
  251. *
  252. * @param op: The operation to be cancelled.
  253. */
  254. void cancel(operation op = operation::all)
  255. {
  256. switch (op) {
  257. case operation::reconnection:
  258. case operation::all:
  259. cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
  260. timer_.cancel();
  261. break;
  262. default: /* ignore */;
  263. }
  264. impl_.cancel(op);
  265. }
  266. /// Returns true if the connection was canceled.
  267. bool will_reconnect() const noexcept
  268. { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
  269. /// Returns the ssl context.
  270. auto const& get_ssl_context() const noexcept
  271. { return impl_.get_ssl_context();}
  272. /// Resets the underlying stream.
  273. void reset_stream()
  274. { impl_.reset_stream(); }
  275. /// Returns a reference to the next layer.
  276. auto& next_layer() noexcept
  277. { return impl_.next_layer(); }
  278. /// Returns a const reference to the next layer.
  279. auto const& next_layer() const noexcept
  280. { return impl_.next_layer(); }
  281. /// Sets the response object of `async_receive` operations.
  282. template <class Response>
  283. void set_receive_response(Response& response)
  284. { impl_.set_receive_response(response); }
  285. /// Returns connection usage information.
  286. usage get_usage() const noexcept
  287. { return impl_.get_usage(); }
  288. private:
  289. using timer_type =
  290. asio::basic_waitable_timer<
  291. std::chrono::steady_clock,
  292. asio::wait_traits<std::chrono::steady_clock>,
  293. Executor>;
  294. template <class, class> friend struct detail::reconnection_op;
  295. config cfg_;
  296. detail::connection_base<executor_type> impl_;
  297. timer_type timer_;
  298. };
  299. /** \brief A basic_connection that type erases the executor.
  300. * \ingroup high-level-api
  301. *
  302. * This connection type uses the asio::any_io_executor and
  303. * asio::any_completion_token to reduce compilation times.
  304. *
  305. * For documentaiton of each member function see
  306. * `boost::redis::basic_connection`.
  307. */
  308. class connection {
  309. public:
  310. /// Executor type.
  311. using executor_type = asio::any_io_executor;
  312. /// Contructs from an executor.
  313. explicit
  314. connection(
  315. executor_type ex,
  316. asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
  317. std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
  318. /// Contructs from a context.
  319. explicit
  320. connection(
  321. asio::io_context& ioc,
  322. asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
  323. std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
  324. /// Returns the underlying executor.
  325. executor_type get_executor() noexcept
  326. { return impl_.get_executor(); }
  327. /// Calls `boost::redis::basic_connection::async_run`.
  328. template <class CompletionToken>
  329. auto async_run(config const& cfg, logger l, CompletionToken token)
  330. {
  331. return asio::async_initiate<
  332. CompletionToken, void(boost::system::error_code)>(
  333. [](auto handler, connection* self, config const* cfg, logger l)
  334. {
  335. self->async_run_impl(*cfg, l, std::move(handler));
  336. }, token, this, &cfg, l);
  337. }
  338. /// Calls `boost::redis::basic_connection::async_receive`.
  339. template <class Response, class CompletionToken>
  340. [[deprecated("Set the response with set_receive_response and use the other overload.")]]
  341. auto async_receive(Response& response, CompletionToken token)
  342. {
  343. return impl_.async_receive(response, std::move(token));
  344. }
  345. /// Calls `boost::redis::basic_connection::async_receive`.
  346. template <class CompletionToken>
  347. auto async_receive(CompletionToken token)
  348. { return impl_.async_receive(std::move(token)); }
  349. /// Calls `boost::redis::basic_connection::receive`.
  350. std::size_t receive(system::error_code& ec)
  351. {
  352. return impl_.receive(ec);
  353. }
  354. /// Calls `boost::redis::basic_connection::async_exec`.
  355. template <class Response, class CompletionToken>
  356. auto async_exec(request const& req, Response& resp, CompletionToken token)
  357. {
  358. return impl_.async_exec(req, resp, std::move(token));
  359. }
  360. /// Calls `boost::redis::basic_connection::cancel`.
  361. void cancel(operation op = operation::all);
  362. /// Calls `boost::redis::basic_connection::will_reconnect`.
  363. bool will_reconnect() const noexcept
  364. { return impl_.will_reconnect();}
  365. /// Calls `boost::redis::basic_connection::next_layer`.
  366. auto& next_layer() noexcept
  367. { return impl_.next_layer(); }
  368. /// Calls `boost::redis::basic_connection::next_layer`.
  369. auto const& next_layer() const noexcept
  370. { return impl_.next_layer(); }
  371. /// Calls `boost::redis::basic_connection::reset_stream`.
  372. void reset_stream()
  373. { impl_.reset_stream();}
  374. /// Sets the response object of `async_receive` operations.
  375. template <class Response>
  376. void set_receive_response(Response& response)
  377. { impl_.set_receive_response(response); }
  378. /// Returns connection usage information.
  379. usage get_usage() const noexcept
  380. { return impl_.get_usage(); }
  381. /// Returns the ssl context.
  382. auto const& get_ssl_context() const noexcept
  383. { return impl_.get_ssl_context();}
  384. private:
  385. void
  386. async_run_impl(
  387. config const& cfg,
  388. logger l,
  389. asio::any_completion_handler<void(boost::system::error_code)> token);
  390. basic_connection<executor_type> impl_;
  391. };
  392. } // boost::redis
  393. #endif // BOOST_REDIS_CONNECTION_HPP