/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) */ #ifndef BOOST_REDIS_CONNECTION_BASE_HPP #define BOOST_REDIS_CONNECTION_BASE_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace boost::redis::detail { template std::string_view buffer_view(DynamicBuffer buf) noexcept { char const* start = static_cast(buf.data(0, buf.size()).data()); return std::string_view{start, std::size(buf)}; } template class append_some_op { private: AsyncReadStream& stream_; DynamicBuffer buf_; std::size_t size_ = 0; std::size_t tmp_ = 0; asio::coroutine coro_{}; public: append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size) : stream_ {stream} , buf_ {std::move(buf)} , size_{size} { } template void operator()( Self& self , system::error_code ec = {} , std::size_t n = 0) { BOOST_ASIO_CORO_REENTER (coro_) { tmp_ = buf_.size(); buf_.grow(size_); BOOST_ASIO_CORO_YIELD stream_.async_read_some(buf_.data(tmp_, size_), std::move(self)); if (ec) { self.complete(ec, 0); return; } buf_.shrink(buf_.size() - tmp_ - n); self.complete({}, n); } } }; template auto async_append_some( AsyncReadStream& stream, DynamicBuffer buffer, std::size_t size, CompletionToken&& token) { return asio::async_compose < CompletionToken , void(system::error_code, std::size_t) >(append_some_op {stream, buffer, size}, token, stream); } template struct exec_op { using req_info_type = typename Conn::req_info; using adapter_type = typename Conn::adapter_type; Conn* conn_ = nullptr; std::shared_ptr info_ = nullptr; asio::coroutine coro{}; template void operator()(Self& self , system::error_code ec = {}, std::size_t = 0) { BOOST_ASIO_CORO_REENTER (coro) { // Check whether the user wants to wait for the connection to // be stablished. if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) { BOOST_ASIO_CORO_YIELD asio::post(std::move(self)); return self.complete(error::not_connected, 0); } conn_->add_request_info(info_); EXEC_OP_WAIT: BOOST_ASIO_CORO_YIELD info_->async_wait(std::move(self)); if (info_->ec_) { self.complete(info_->ec_, 0); return; } if (info_->stop_requested()) { // Don't have to call remove_request as it has already // been by cancel(exec). return self.complete(asio::error::operation_aborted, 0); } if (is_cancelled(self)) { if (!info_->is_waiting()) { using c_t = asio::cancellation_type; auto const c = self.get_cancellation_state().cancelled(); if ((c & c_t::terminal) != c_t::none) { // Cancellation requires closing the connection // otherwise it stays in inconsistent state. conn_->cancel(operation::run); return self.complete(asio::error::operation_aborted, 0); } else { // Can't implement other cancelation types, ignoring. self.get_cancellation_state().clear(); // TODO: Find out a better way to ignore // cancelation. goto EXEC_OP_WAIT; } } else { // Cancelation can be honored. conn_->remove_request(info_); self.complete(asio::error::operation_aborted, 0); return; } } self.complete(info_->ec_, info_->read_size_); } } }; template struct run_op { Conn* conn = nullptr; Logger logger_; asio::coroutine coro{}; template void operator()( Self& self , std::array order = {} , system::error_code ec0 = {} , system::error_code ec1 = {}) { BOOST_ASIO_CORO_REENTER (coro) { conn->reset(); BOOST_ASIO_CORO_YIELD asio::experimental::make_parallel_group( [this](auto token) { return conn->reader(logger_, token);}, [this](auto token) { return conn->writer(logger_, token);} ).async_wait( asio::experimental::wait_for_one(), std::move(self)); if (is_cancelled(self)) { logger_.trace("run-op: canceled. Exiting ..."); self.complete(asio::error::operation_aborted); return; } logger_.on_run(ec0, ec1); switch (order[0]) { case 0: self.complete(ec0); break; case 1: self.complete(ec1); break; default: BOOST_ASSERT(false); } } } }; template struct writer_op { Conn* conn_; Logger logger_; asio::coroutine coro{}; template void operator()( Self& self , system::error_code ec = {} , std::size_t n = 0) { ignore_unused(n); BOOST_ASIO_CORO_REENTER (coro) for (;;) { while (conn_->coalesce_requests()) { if (conn_->use_ssl()) BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self)); else BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self)); logger_.on_write(ec, conn_->write_buffer_); if (ec) { logger_.trace("writer-op: error. Exiting ..."); conn_->cancel(operation::run); self.complete(ec); return; } if (is_cancelled(self)) { logger_.trace("writer-op: canceled. Exiting ..."); self.complete(asio::error::operation_aborted); return; } conn_->on_write(); // A socket.close() may have been called while a // successful write might had already been queued, so we // have to check here before proceeding. if (!conn_->is_open()) { logger_.trace("writer-op: canceled (2). Exiting ..."); self.complete({}); return; } } BOOST_ASIO_CORO_YIELD conn_->writer_timer_.async_wait(std::move(self)); if (!conn_->is_open() || is_cancelled(self)) { logger_.trace("writer-op: canceled (3). Exiting ..."); // Notice this is not an error of the op, stoping was // requested from the outside, so we complete with // success. self.complete({}); return; } } } }; template struct reader_op { using parse_result = typename Conn::parse_result; using parse_ret_type = typename Conn::parse_ret_type; Conn* conn_; Logger logger_; parse_ret_type res_{parse_result::resp, 0}; asio::coroutine coro{}; template void operator()( Self& self , system::error_code ec = {} , std::size_t n = 0) { ignore_unused(n); BOOST_ASIO_CORO_REENTER (coro) for (;;) { // Appends some data to the buffer if necessary. if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) { if (conn_->use_ssl()) { BOOST_ASIO_CORO_YIELD async_append_some( conn_->next_layer(), conn_->dbuf_, conn_->get_suggested_buffer_growth(), std::move(self)); } else { BOOST_ASIO_CORO_YIELD async_append_some( conn_->next_layer().next_layer(), conn_->dbuf_, conn_->get_suggested_buffer_growth(), std::move(self)); } logger_.on_read(ec, n); // EOF is not treated as error. if (ec == asio::error::eof) { logger_.trace("reader-op: EOF received. Exiting ..."); conn_->cancel(operation::run); return self.complete({}); // EOFINAE: EOF is not an error. } // The connection is not viable after an error. if (ec) { logger_.trace("reader-op: error. Exiting ..."); conn_->cancel(operation::run); self.complete(ec); return; } // Somebody might have canceled implicitly or explicitly // while we were suspended and after queueing so we have to // check. if (!conn_->is_open() || is_cancelled(self)) { logger_.trace("reader-op: canceled. Exiting ..."); self.complete(ec); return; } } res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec); if (ec) { logger_.trace("reader-op: parse error. Exiting ..."); conn_->cancel(operation::run); self.complete(ec); return; } if (res_.first == parse_result::push) { if (!conn_->receive_channel_.try_send(ec, res_.second)) { BOOST_ASIO_CORO_YIELD conn_->receive_channel_.async_send(ec, res_.second, std::move(self)); } if (ec) { logger_.trace("reader-op: error. Exiting ..."); conn_->cancel(operation::run); self.complete(ec); return; } if (!conn_->is_open() || is_cancelled(self)) { logger_.trace("reader-op: canceled (2). Exiting ..."); self.complete(asio::error::operation_aborted); return; } } } } }; /** @brief Base class for high level Redis asynchronous connections. * @ingroup high-level-api * * @tparam Executor The executor type. * */ template class connection_base { public: /// Executor type using executor_type = Executor; /// Type of the next layer using next_layer_type = asio::ssl::stream>; using clock_type = std::chrono::steady_clock; using clock_traits_type = asio::wait_traits; using timer_type = asio::basic_waitable_timer; using this_type = connection_base; /// Constructs from an executor. connection_base( executor_type ex, asio::ssl::context ctx, std::size_t max_read_size) : ctx_{std::move(ctx)} , stream_{std::make_unique(ex, ctx_)} , writer_timer_{ex} , receive_channel_{ex, 256} , runner_{ex, {}} , dbuf_{read_buffer_, max_read_size} { set_receive_response(ignore); writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); } /// Returns the ssl context. auto const& get_ssl_context() const noexcept { return ctx_;} /// Resets the underlying stream. void reset_stream() { stream_ = std::make_unique(writer_timer_.get_executor(), ctx_); } /// Returns a reference to the next layer. auto& next_layer() noexcept { return *stream_; } /// Returns a const reference to the next layer. auto const& next_layer() const noexcept { return *stream_; } /// Returns the associated executor. auto get_executor() {return writer_timer_.get_executor();} /// Cancels specific operations. void cancel(operation op) { runner_.cancel(op); if (op == operation::all) { cancel_impl(operation::run); cancel_impl(operation::receive); cancel_impl(operation::exec); return; } cancel_impl(op); } template auto async_exec(request const& req, Response& resp, CompletionToken token) { using namespace boost::redis::adapter; auto f = boost_redis_adapt(resp); BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes."); auto info = std::make_shared(req, f, get_executor()); return asio::async_compose < CompletionToken , void(system::error_code, std::size_t) >(exec_op{this, info}, token, writer_timer_); } template [[deprecated("Set the response with set_receive_response and use the other overload.")]] auto async_receive(Response& response, CompletionToken token) { set_receive_response(response); return receive_channel_.async_receive(std::move(token)); } template auto async_receive(CompletionToken token) { return receive_channel_.async_receive(std::move(token)); } std::size_t receive(system::error_code& ec) { std::size_t size = 0; auto f = [&](system::error_code const& ec2, std::size_t n) { ec = ec2; size = n; }; auto const res = receive_channel_.try_receive(f); if (ec) return 0; if (!res) ec = error::sync_receive_push_failed; return size; } template auto async_run(config const& cfg, Logger l, CompletionToken token) { runner_.set_config(cfg); l.set_prefix(runner_.get_config().log_prefix); return runner_.async_run(*this, l, std::move(token)); } template void set_receive_response(Response& response) { using namespace boost::redis::adapter; auto g = boost_redis_adapt(response); receive_adapter_ = adapter::detail::make_adapter_wrapper(g); } usage get_usage() const noexcept { return usage_; } auto run_is_canceled() const noexcept { return cancel_run_called_; } private: using receive_channel_type = asio::experimental::channel; using runner_type = runner; using adapter_type = std::function const&, system::error_code&)>; using receiver_adapter_type = std::function const&, system::error_code&)>; using exec_notifier_type = receive_channel_type; auto use_ssl() const noexcept { return runner_.get_config().use_ssl;} auto cancel_on_conn_lost() -> std::size_t { // Must return false if the request should be removed. auto cond = [](auto const& ptr) { BOOST_ASSERT(ptr != nullptr); if (ptr->is_waiting()) { return !ptr->req_->get_config().cancel_on_connection_lost; } else { return !ptr->req_->get_config().cancel_if_unresponded; } }; auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond); auto const ret = std::distance(point, std::end(reqs_)); std::for_each(point, std::end(reqs_), [](auto const& ptr) { ptr->stop(); }); reqs_.erase(point, std::end(reqs_)); std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { return ptr->mark_waiting(); }); return ret; } auto cancel_unwritten_requests() -> std::size_t { auto f = [](auto const& ptr) { BOOST_ASSERT(ptr != nullptr); return !ptr->is_waiting(); }; auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f); auto const ret = std::distance(point, std::end(reqs_)); std::for_each(point, std::end(reqs_), [](auto const& ptr) { ptr->stop(); }); reqs_.erase(point, std::end(reqs_)); return ret; } void cancel_impl(operation op) { switch (op) { case operation::exec: { cancel_unwritten_requests(); } break; case operation::run: { // Protects the code below from being called more than // once, see https://github.com/boostorg/redis/issues/181 if (std::exchange(cancel_run_called_, true)) { return; } close(); writer_timer_.cancel(); receive_channel_.cancel(); cancel_on_conn_lost(); } break; case operation::receive: { receive_channel_.cancel(); } break; default: /* ignore */; } } void on_write() { // We have to clear the payload right after writing it to use it // as a flag that informs there is no ongoing write. write_buffer_.clear(); // Notice this must come before the for-each below. cancel_push_requests(); // There is small optimization possible here: traverse only the // partition of unwritten requests instead of them all. std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer."); if (ptr->is_staged()) { ptr->mark_written(); } }); } struct req_info { public: using node_type = resp3::basic_node; using wrapped_adapter_type = std::function; explicit req_info(request const& req, adapter_type adapter, executor_type ex) : notifier_{ex, 1} , req_{&req} , adapter_{} , expected_responses_{req.get_expected_responses()} , status_{status::waiting} , ec_{{}} , read_size_{0} { adapter_ = [this, adapter](node_type const& nd, system::error_code& ec) { auto const i = req_->get_expected_responses() - expected_responses_; adapter(i, nd, ec); }; } auto proceed() { notifier_.try_send(std::error_code{}, 0); } void stop() { notifier_.close(); } [[nodiscard]] auto is_waiting() const noexcept { return status_ == status::waiting; } [[nodiscard]] auto is_written() const noexcept { return status_ == status::written; } [[nodiscard]] auto is_staged() const noexcept { return status_ == status::staged; } void mark_written() noexcept { status_ = status::written; } void mark_staged() noexcept { status_ = status::staged; } void mark_waiting() noexcept { status_ = status::waiting; } [[nodiscard]] auto stop_requested() const noexcept { return !notifier_.is_open();} template auto async_wait(CompletionToken token) { return notifier_.async_receive(std::move(token)); } //private: enum class status { waiting , staged , written }; exec_notifier_type notifier_; request const* req_; wrapped_adapter_type adapter_; // Contains the number of commands that haven't been read yet. std::size_t expected_responses_; status status_; system::error_code ec_; std::size_t read_size_; }; void remove_request(std::shared_ptr const& info) { reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info)); } using reqs_type = std::deque>; template friend struct reader_op; template friend struct writer_op; template friend struct run_op; template friend struct exec_op; template friend struct run_all_op; void cancel_push_requests() { auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); }); std::for_each(point, std::end(reqs_), [](auto const& ptr) { ptr->proceed(); }); reqs_.erase(point, std::end(reqs_)); } [[nodiscard]] bool is_writing() const noexcept { return !write_buffer_.empty(); } void add_request_info(std::shared_ptr const& info) { reqs_.push_back(info); if (info->req_->has_hello_priority()) { auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) { return e->is_waiting(); }); std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend); } if (is_open() && !is_writing()) writer_timer_.cancel(); } template auto reader(Logger l, CompletionToken&& token) { return asio::async_compose < CompletionToken , void(system::error_code) >(reader_op{this, l}, token, writer_timer_); } template auto writer(Logger l, CompletionToken&& token) { return asio::async_compose < CompletionToken , void(system::error_code) >(writer_op{this, l}, token, writer_timer_); } template auto async_run_lean(config const& cfg, Logger l, CompletionToken token) { runner_.set_config(cfg); l.set_prefix(runner_.get_config().log_prefix); return asio::async_compose < CompletionToken , void(system::error_code) >(run_op{this, l}, token, writer_timer_); } [[nodiscard]] bool coalesce_requests() { // Coalesces the requests and marks them staged. After a // successful write staged requests will be marked as written. auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) { return !ri->is_waiting(); }); std::for_each(point, std::cend(reqs_), [this](auto const& ri) { // Stage the request. write_buffer_ += ri->req_->payload(); ri->mark_staged(); usage_.commands_sent += ri->expected_responses_; }); usage_.bytes_sent += std::size(write_buffer_); return point != std::cend(reqs_); } bool is_waiting_response() const noexcept { if (std::empty(reqs_)) return false; // Under load and on low-latency networks we might start // receiving responses before the write operation completed and // the request is still maked as staged and not written. See // https://github.com/boostorg/redis/issues/170 return !reqs_.front()->is_waiting(); } void close() { if (stream_->next_layer().is_open()) { system::error_code ec; stream_->next_layer().close(ec); } } auto is_open() const noexcept { return stream_->next_layer().is_open(); } auto& lowest_layer() noexcept { return stream_->lowest_layer(); } auto is_next_push() { BOOST_ASSERT(!read_buffer_.empty()); // Useful links to understand the heuristics below. // // - https://github.com/redis/redis/issues/11784 // - https://github.com/redis/redis/issues/6426 // - https://github.com/boostorg/redis/issues/170 // The message's resp3 type is a push. if (resp3::to_type(read_buffer_.front()) == resp3::type::push) return true; // This is non-push type and the requests queue is empty. I have // noticed this is possible, for example with -MISCONF. I don't // know why they are not sent with a push type so we can // distinguish them from responses to commands. If we are lucky // enough to receive them when the command queue is empty they // can be treated as server pushes, otherwise it is impossible // to handle them properly if (reqs_.empty()) return true; // The request does not expect any response but we got one. This // may happen if for example, subscribe with wrong syntax. if (reqs_.front()->expected_responses_ == 0) return true; // Added to deal with MONITOR and also to fix PR170 which // happens under load and on low-latency networks, where we // might start receiving responses before the write operation // completed and the request is still maked as staged and not // written. return reqs_.front()->is_waiting(); } auto get_suggested_buffer_growth() const noexcept { return parser_.get_suggested_buffer_growth(4096); } enum class parse_result { needs_more, push, resp }; using parse_ret_type = std::pair; parse_ret_type on_finish_parsing(parse_result t) { if (t == parse_result::push) { usage_.pushes_received += 1; usage_.push_bytes_received += parser_.get_consumed(); } else { usage_.responses_received += 1; usage_.response_bytes_received += parser_.get_consumed(); } on_push_ = false; dbuf_.consume(parser_.get_consumed()); auto const res = std::make_pair(t, parser_.get_consumed()); parser_.reset(); return res; } parse_ret_type on_read(std::string_view data, system::error_code& ec) { // We arrive here in two states: // // 1. While we are parsing a message. In this case we // don't want to determine the type of the message in the // buffer (i.e. response vs push) but leave it untouched // until the parsing of a complete message ends. // // 2. On a new message, in which case we have to determine // whether the next messag is a push or a response. // if (!on_push_) // Prepare for new message. on_push_ = is_next_push(); if (on_push_) { if (!resp3::parse(parser_, data, receive_adapter_, ec)) return std::make_pair(parse_result::needs_more, 0); if (ec) return std::make_pair(parse_result::push, 0); return on_finish_parsing(parse_result::push); } BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)"); BOOST_ASSERT(!reqs_.empty()); BOOST_ASSERT(reqs_.front() != nullptr); BOOST_ASSERT(reqs_.front()->expected_responses_ != 0); if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec)) return std::make_pair(parse_result::needs_more, 0); if (ec) { reqs_.front()->ec_ = ec; reqs_.front()->proceed(); return std::make_pair(parse_result::resp, 0); } reqs_.front()->read_size_ += parser_.get_consumed(); if (--reqs_.front()->expected_responses_ == 0) { // Done with this request. reqs_.front()->proceed(); reqs_.pop_front(); } return on_finish_parsing(parse_result::resp); } void reset() { write_buffer_.clear(); read_buffer_.clear(); parser_.reset(); on_push_ = false; cancel_run_called_ = false; } asio::ssl::context ctx_; std::unique_ptr stream_; // Notice we use a timer to simulate a condition-variable. It is // also more suitable than a channel and the notify operation does // not suspend. timer_type writer_timer_; receive_channel_type receive_channel_; runner_type runner_; receiver_adapter_type receive_adapter_; using dyn_buffer_type = asio::dynamic_string_buffer, std::allocator>; std::string read_buffer_; dyn_buffer_type dbuf_; std::string write_buffer_; reqs_type reqs_; resp3::parser parser_{}; bool on_push_ = false; bool cancel_run_called_ = false; usage usage_; }; } // boost::redis::detail #endif // BOOST_REDIS_CONNECTION_BASE_HPP