basic_stream.hpp 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
  10. #define BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
  11. #include <boost/beast/core/async_base.hpp>
  12. #include <boost/beast/core/buffer_traits.hpp>
  13. #include <boost/beast/core/buffers_prefix.hpp>
  14. #include <boost/beast/websocket/teardown.hpp>
  15. #include <boost/asio/coroutine.hpp>
  16. #include <boost/assert.hpp>
  17. #include <boost/make_shared.hpp>
  18. #include <boost/core/exchange.hpp>
  19. #include <cstdlib>
  20. #include <type_traits>
  21. #include <utility>
  22. namespace boost {
  23. namespace beast {
  24. //------------------------------------------------------------------------------
  25. template<class Protocol, class Executor, class RatePolicy>
  26. template<class... Args>
  27. basic_stream<Protocol, Executor, RatePolicy>::
  28. impl_type::
  29. impl_type(std::false_type, Args&&... args)
  30. : socket(std::forward<Args>(args)...)
  31. , read(ex())
  32. , write(ex())
  33. , timer(ex())
  34. {
  35. reset();
  36. }
  37. template<class Protocol, class Executor, class RatePolicy>
  38. template<class RatePolicy_, class... Args>
  39. basic_stream<Protocol, Executor, RatePolicy>::
  40. impl_type::
  41. impl_type(std::true_type,
  42. RatePolicy_&& policy, Args&&... args)
  43. : boost::empty_value<RatePolicy>(
  44. boost::empty_init_t{},
  45. std::forward<RatePolicy_>(policy))
  46. , socket(std::forward<Args>(args)...)
  47. , read(ex())
  48. , write(ex())
  49. , timer(ex())
  50. {
  51. reset();
  52. }
  53. template<class Protocol, class Executor, class RatePolicy>
  54. template<class Executor2>
  55. void
  56. basic_stream<Protocol, Executor, RatePolicy>::
  57. impl_type::
  58. on_timer(Executor2 const& ex2)
  59. {
  60. BOOST_ASSERT(waiting > 0);
  61. // the last waiter starts the new slice
  62. if(--waiting > 0)
  63. return;
  64. // update the expiration time
  65. BOOST_VERIFY(timer.expires_after(
  66. std::chrono::seconds(1)) == 0);
  67. rate_policy_access::on_timer(policy());
  68. struct handler : boost::empty_value<Executor2>
  69. {
  70. boost::weak_ptr<impl_type> wp;
  71. using executor_type = Executor2;
  72. executor_type
  73. get_executor() const noexcept
  74. {
  75. return this->get();
  76. }
  77. handler(
  78. Executor2 const& ex2,
  79. boost::shared_ptr<impl_type> const& sp)
  80. : boost::empty_value<Executor2>(
  81. boost::empty_init_t{}, ex2)
  82. , wp(sp)
  83. {
  84. }
  85. void
  86. operator()(error_code ec)
  87. {
  88. auto sp = wp.lock();
  89. if(! sp)
  90. return;
  91. if(ec == net::error::operation_aborted)
  92. return;
  93. BOOST_ASSERT(! ec);
  94. if(ec)
  95. return;
  96. sp->on_timer(this->get());
  97. }
  98. };
  99. // wait on the timer again
  100. ++waiting;
  101. timer.async_wait(handler(ex2, this->shared_from_this()));
  102. }
  103. template<class Protocol, class Executor, class RatePolicy>
  104. void
  105. basic_stream<Protocol, Executor, RatePolicy>::
  106. impl_type::
  107. reset()
  108. {
  109. // If assert goes off, it means that there are
  110. // already read or write (or connect) operations
  111. // outstanding, so there is nothing to apply
  112. // the expiration time to!
  113. //
  114. BOOST_ASSERT(! read.pending || ! write.pending);
  115. if(! read.pending)
  116. BOOST_VERIFY(
  117. read.timer.expires_at(never()) == 0);
  118. if(! write.pending)
  119. BOOST_VERIFY(
  120. write.timer.expires_at(never()) == 0);
  121. }
  122. template<class Protocol, class Executor, class RatePolicy>
  123. void
  124. basic_stream<Protocol, Executor, RatePolicy>::
  125. impl_type::
  126. close() noexcept
  127. {
  128. {
  129. error_code ec;
  130. socket.close(ec);
  131. }
  132. #if !defined(BOOST_NO_EXCEPTIONS)
  133. try
  134. {
  135. timer.cancel();
  136. }
  137. catch(...)
  138. {
  139. }
  140. #else
  141. timer.cancel();
  142. #endif
  143. }
  144. //------------------------------------------------------------------------------
  145. template<class Protocol, class Executor, class RatePolicy>
  146. template<class Executor2>
  147. struct basic_stream<Protocol, Executor, RatePolicy>::
  148. timeout_handler
  149. {
  150. using executor_type = Executor2;
  151. op_state& state;
  152. boost::weak_ptr<impl_type> wp;
  153. tick_type tick;
  154. executor_type ex;
  155. executor_type get_executor() const noexcept
  156. {
  157. return ex;
  158. }
  159. void
  160. operator()(error_code ec)
  161. {
  162. // timer canceled
  163. if(ec == net::error::operation_aborted)
  164. return;
  165. BOOST_ASSERT(! ec);
  166. auto sp = wp.lock();
  167. // stream destroyed
  168. if(! sp)
  169. return;
  170. // stale timer
  171. if(tick < state.tick)
  172. return;
  173. BOOST_ASSERT(tick == state.tick);
  174. // timeout
  175. BOOST_ASSERT(! state.timeout);
  176. sp->close();
  177. state.timeout = true;
  178. }
  179. };
  180. //------------------------------------------------------------------------------
  181. template<class Protocol, class Executor, class RatePolicy>
  182. struct basic_stream<Protocol, Executor, RatePolicy>::ops
  183. {
  184. template<bool isRead, class Buffers, class Handler>
  185. class transfer_op
  186. : public async_base<Handler, Executor>
  187. , public boost::asio::coroutine
  188. {
  189. boost::shared_ptr<impl_type> impl_;
  190. pending_guard pg_;
  191. Buffers b_;
  192. using is_read = std::integral_constant<bool, isRead>;
  193. op_state&
  194. state()
  195. {
  196. if (isRead)
  197. return impl_->read;
  198. else
  199. return impl_->write;
  200. }
  201. std::size_t
  202. available_bytes()
  203. {
  204. if (isRead)
  205. return rate_policy_access::
  206. available_read_bytes(impl_->policy());
  207. else
  208. return rate_policy_access::
  209. available_write_bytes(impl_->policy());
  210. }
  211. void
  212. transfer_bytes(std::size_t n)
  213. {
  214. if (isRead)
  215. rate_policy_access::
  216. transfer_read_bytes(impl_->policy(), n);
  217. else
  218. rate_policy_access::
  219. transfer_write_bytes(impl_->policy(), n);
  220. }
  221. void
  222. async_perform(
  223. std::size_t amount, std::true_type)
  224. {
  225. impl_->socket.async_read_some(
  226. beast::buffers_prefix(amount, b_),
  227. std::move(*this));
  228. }
  229. void
  230. async_perform(
  231. std::size_t amount, std::false_type)
  232. {
  233. impl_->socket.async_write_some(
  234. beast::buffers_prefix(amount, b_),
  235. std::move(*this));
  236. }
  237. static bool never_pending_;
  238. public:
  239. template<class Handler_>
  240. transfer_op(
  241. Handler_&& h,
  242. basic_stream& s,
  243. Buffers const& b)
  244. : async_base<Handler, Executor>(
  245. std::forward<Handler_>(h), s.get_executor())
  246. , impl_(s.impl_)
  247. , pg_()
  248. , b_(b)
  249. {
  250. this->set_allowed_cancellation(net::cancellation_type::all);
  251. if (buffer_bytes(b_) == 0 && state().pending)
  252. {
  253. // Workaround:
  254. // Corner case discovered in https://github.com/boostorg/beast/issues/2065
  255. // Enclosing SSL stream wishes to complete a 0-length write early by
  256. // executing a 0-length read against the underlying stream.
  257. // This can occur even if an existing async_read is in progress.
  258. // In this specific case, we will complete the async op with no error
  259. // in order to prevent assertions and/or internal corruption of the basic_stream
  260. this->complete(false, error_code(), std::size_t{0});
  261. }
  262. else
  263. {
  264. pg_.assign(state().pending);
  265. (*this)({});
  266. }
  267. }
  268. void
  269. operator()(
  270. error_code ec,
  271. std::size_t bytes_transferred = 0)
  272. {
  273. BOOST_ASIO_CORO_REENTER(*this)
  274. {
  275. // handle empty buffers
  276. if(detail::buffers_empty(b_))
  277. {
  278. // make sure we perform the no-op
  279. BOOST_ASIO_CORO_YIELD
  280. {
  281. BOOST_ASIO_HANDLER_LOCATION((
  282. __FILE__, __LINE__,
  283. (isRead ? "basic_stream::async_read_some"
  284. : "basic_stream::async_write_some")));
  285. async_perform(0, is_read{});
  286. }
  287. // apply the timeout manually, otherwise
  288. // behavior varies across platforms.
  289. if(state().timer.expiry() <= clock_type::now())
  290. {
  291. impl_->close();
  292. BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
  293. }
  294. goto upcall;
  295. }
  296. // if a timeout is active, wait on the timer
  297. if(state().timer.expiry() != never())
  298. {
  299. BOOST_ASIO_HANDLER_LOCATION((
  300. __FILE__, __LINE__,
  301. (isRead ? "basic_stream::async_read_some"
  302. : "basic_stream::async_write_some")));
  303. state().timer.async_wait(
  304. timeout_handler<decltype(this->get_executor())>{
  305. state(),
  306. impl_,
  307. state().tick,
  308. this->get_executor()});
  309. }
  310. // check rate limit, maybe wait
  311. std::size_t amount;
  312. amount = available_bytes();
  313. if(amount == 0)
  314. {
  315. ++impl_->waiting;
  316. BOOST_ASIO_CORO_YIELD
  317. {
  318. BOOST_ASIO_HANDLER_LOCATION((
  319. __FILE__, __LINE__,
  320. (isRead ? "basic_stream::async_read_some"
  321. : "basic_stream::async_write_some")));
  322. impl_->timer.async_wait(std::move(*this));
  323. }
  324. if(ec)
  325. {
  326. // socket was closed, or a timeout
  327. BOOST_ASSERT(ec ==
  328. net::error::operation_aborted);
  329. // timeout handler invoked?
  330. if(state().timeout)
  331. {
  332. // yes, socket already closed
  333. BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
  334. state().timeout = false;
  335. }
  336. goto upcall;
  337. }
  338. impl_->on_timer(this->get_executor());
  339. // Allow at least one byte, otherwise
  340. // bytes_transferred could be 0.
  341. amount = std::max<std::size_t>(
  342. available_bytes(), 1);
  343. }
  344. BOOST_ASIO_CORO_YIELD
  345. {
  346. BOOST_ASIO_HANDLER_LOCATION((
  347. __FILE__, __LINE__,
  348. (isRead ? "basic_stream::async_read_some"
  349. : "basic_stream::async_write_some")));
  350. async_perform(amount, is_read{});
  351. }
  352. if(state().timer.expiry() != never())
  353. {
  354. ++state().tick;
  355. // try cancelling timer
  356. auto const n =
  357. state().timer.cancel();
  358. if(n == 0)
  359. {
  360. // timeout handler invoked?
  361. if(state().timeout)
  362. {
  363. // yes, socket already closed
  364. BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
  365. state().timeout = false;
  366. }
  367. }
  368. else
  369. {
  370. BOOST_ASSERT(n == 1);
  371. BOOST_ASSERT(! state().timeout);
  372. }
  373. }
  374. upcall:
  375. pg_.reset();
  376. transfer_bytes(bytes_transferred);
  377. this->complete_now(ec, bytes_transferred);
  378. }
  379. }
  380. };
  381. template<class Handler>
  382. class connect_op
  383. : public async_base<Handler, Executor>
  384. {
  385. boost::shared_ptr<impl_type> impl_;
  386. pending_guard pg0_;
  387. pending_guard pg1_;
  388. op_state&
  389. state() noexcept
  390. {
  391. return impl_->write;
  392. }
  393. public:
  394. template<class Handler_>
  395. connect_op(
  396. Handler_&& h,
  397. basic_stream& s,
  398. endpoint_type ep)
  399. : async_base<Handler, Executor>(
  400. std::forward<Handler_>(h), s.get_executor())
  401. , impl_(s.impl_)
  402. , pg0_(impl_->read.pending)
  403. , pg1_(impl_->write.pending)
  404. {
  405. this->set_allowed_cancellation(net::cancellation_type::all);
  406. if(state().timer.expiry() != stream_base::never())
  407. {
  408. BOOST_ASIO_HANDLER_LOCATION((
  409. __FILE__, __LINE__,
  410. "basic_stream::async_connect"));
  411. impl_->write.timer.async_wait(
  412. timeout_handler<decltype(this->get_executor())>{
  413. state(),
  414. impl_,
  415. state().tick,
  416. this->get_executor()});
  417. }
  418. BOOST_ASIO_HANDLER_LOCATION((
  419. __FILE__, __LINE__,
  420. "basic_stream::async_connect"));
  421. impl_->socket.async_connect(
  422. ep, std::move(*this));
  423. // *this is now moved-from
  424. }
  425. template<
  426. class Endpoints, class Condition,
  427. class Handler_>
  428. connect_op(
  429. Handler_&& h,
  430. basic_stream& s,
  431. Endpoints const& eps,
  432. Condition const& cond)
  433. : async_base<Handler, Executor>(
  434. std::forward<Handler_>(h), s.get_executor())
  435. , impl_(s.impl_)
  436. , pg0_(impl_->read.pending)
  437. , pg1_(impl_->write.pending)
  438. {
  439. this->set_allowed_cancellation(net::cancellation_type::all);
  440. if(state().timer.expiry() != stream_base::never())
  441. {
  442. BOOST_ASIO_HANDLER_LOCATION((
  443. __FILE__, __LINE__,
  444. "basic_stream::async_connect"));
  445. impl_->write.timer.async_wait(
  446. timeout_handler<decltype(this->get_executor())>{
  447. state(),
  448. impl_,
  449. state().tick,
  450. this->get_executor()});
  451. }
  452. BOOST_ASIO_HANDLER_LOCATION((
  453. __FILE__, __LINE__,
  454. "basic_stream::async_connect"));
  455. net::async_connect(impl_->socket,
  456. eps, cond, std::move(*this));
  457. // *this is now moved-from
  458. }
  459. template<
  460. class Iterator, class Condition,
  461. class Handler_>
  462. connect_op(
  463. Handler_&& h,
  464. basic_stream& s,
  465. Iterator begin, Iterator end,
  466. Condition const& cond)
  467. : async_base<Handler, Executor>(
  468. std::forward<Handler_>(h), s.get_executor())
  469. , impl_(s.impl_)
  470. , pg0_(impl_->read.pending)
  471. , pg1_(impl_->write.pending)
  472. {
  473. this->set_allowed_cancellation(net::cancellation_type::all);
  474. if(state().timer.expiry() != stream_base::never())
  475. {
  476. BOOST_ASIO_HANDLER_LOCATION((
  477. __FILE__, __LINE__,
  478. "basic_stream::async_connect"));
  479. impl_->write.timer.async_wait(
  480. timeout_handler<decltype(this->get_executor())>{
  481. state(),
  482. impl_,
  483. state().tick,
  484. this->get_executor()});
  485. }
  486. BOOST_ASIO_HANDLER_LOCATION((
  487. __FILE__, __LINE__,
  488. "basic_stream::async_connect"));
  489. net::async_connect(impl_->socket,
  490. begin, end, cond, std::move(*this));
  491. // *this is now moved-from
  492. }
  493. template<class... Args>
  494. void
  495. operator()(error_code ec, Args&&... args)
  496. {
  497. if(state().timer.expiry() != stream_base::never())
  498. {
  499. ++state().tick;
  500. // try cancelling timer
  501. auto const n =
  502. impl_->write.timer.cancel();
  503. if(n == 0)
  504. {
  505. // timeout handler invoked?
  506. if(state().timeout)
  507. {
  508. // yes, socket already closed
  509. BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
  510. state().timeout = false;
  511. }
  512. }
  513. else
  514. {
  515. BOOST_ASSERT(n == 1);
  516. BOOST_ASSERT(! state().timeout);
  517. }
  518. }
  519. pg0_.reset();
  520. pg1_.reset();
  521. this->complete_now(ec, std::forward<Args>(args)...);
  522. }
  523. };
  524. struct run_read_op
  525. {
  526. template<class ReadHandler, class Buffers>
  527. void
  528. operator()(
  529. ReadHandler&& h,
  530. basic_stream* s,
  531. Buffers const& b)
  532. {
  533. // If you get an error on the following line it means
  534. // that your handler does not meet the documented type
  535. // requirements for the handler.
  536. static_assert(
  537. detail::is_invocable<ReadHandler,
  538. void(error_code, std::size_t)>::value,
  539. "ReadHandler type requirements not met");
  540. transfer_op<
  541. true,
  542. Buffers,
  543. typename std::decay<ReadHandler>::type>(
  544. std::forward<ReadHandler>(h), *s, b);
  545. }
  546. };
  547. struct run_write_op
  548. {
  549. template<class WriteHandler, class Buffers>
  550. void
  551. operator()(
  552. WriteHandler&& h,
  553. basic_stream* s,
  554. Buffers const& b)
  555. {
  556. // If you get an error on the following line it means
  557. // that your handler does not meet the documented type
  558. // requirements for the handler.
  559. static_assert(
  560. detail::is_invocable<WriteHandler,
  561. void(error_code, std::size_t)>::value,
  562. "WriteHandler type requirements not met");
  563. transfer_op<
  564. false,
  565. Buffers,
  566. typename std::decay<WriteHandler>::type>(
  567. std::forward<WriteHandler>(h), *s, b);
  568. }
  569. };
  570. struct run_connect_op
  571. {
  572. template<class ConnectHandler>
  573. void
  574. operator()(
  575. ConnectHandler&& h,
  576. basic_stream* s,
  577. endpoint_type const& ep)
  578. {
  579. // If you get an error on the following line it means
  580. // that your handler does not meet the documented type
  581. // requirements for the handler.
  582. static_assert(
  583. detail::is_invocable<ConnectHandler,
  584. void(error_code)>::value,
  585. "ConnectHandler type requirements not met");
  586. connect_op<typename std::decay<ConnectHandler>::type>(
  587. std::forward<ConnectHandler>(h), *s, ep);
  588. }
  589. };
  590. struct run_connect_range_op
  591. {
  592. template<
  593. class RangeConnectHandler,
  594. class EndpointSequence,
  595. class Condition>
  596. void
  597. operator()(
  598. RangeConnectHandler&& h,
  599. basic_stream* s,
  600. EndpointSequence const& eps,
  601. Condition const& cond)
  602. {
  603. // If you get an error on the following line it means
  604. // that your handler does not meet the documented type
  605. // requirements for the handler.
  606. static_assert(
  607. detail::is_invocable<RangeConnectHandler,
  608. void(error_code, typename Protocol::endpoint)>::value,
  609. "RangeConnectHandler type requirements not met");
  610. connect_op<typename std::decay<RangeConnectHandler>::type>(
  611. std::forward<RangeConnectHandler>(h), *s, eps, cond);
  612. }
  613. };
  614. struct run_connect_iter_op
  615. {
  616. template<
  617. class IteratorConnectHandler,
  618. class Iterator,
  619. class Condition>
  620. void
  621. operator()(
  622. IteratorConnectHandler&& h,
  623. basic_stream* s,
  624. Iterator begin, Iterator end,
  625. Condition const& cond)
  626. {
  627. // If you get an error on the following line it means
  628. // that your handler does not meet the documented type
  629. // requirements for the handler.
  630. static_assert(
  631. detail::is_invocable<IteratorConnectHandler,
  632. void(error_code, Iterator)>::value,
  633. "IteratorConnectHandler type requirements not met");
  634. connect_op<typename std::decay<IteratorConnectHandler>::type>(
  635. std::forward<IteratorConnectHandler>(h), *s, begin, end, cond);
  636. }
  637. };
  638. };
  639. //------------------------------------------------------------------------------
  640. template<class Protocol, class Executor, class RatePolicy>
  641. basic_stream<Protocol, Executor, RatePolicy>::
  642. ~basic_stream()
  643. {
  644. // the shared object can outlive *this,
  645. // cancel any operations so the shared
  646. // object is destroyed as soon as possible.
  647. impl_->close();
  648. }
  649. template<class Protocol, class Executor, class RatePolicy>
  650. template<class Arg0, class... Args, class>
  651. basic_stream<Protocol, Executor, RatePolicy>::
  652. basic_stream(Arg0&& arg0, Args&&... args)
  653. : impl_(boost::make_shared<impl_type>(
  654. std::false_type{},
  655. std::forward<Arg0>(arg0),
  656. std::forward<Args>(args)...))
  657. {
  658. }
  659. template<class Protocol, class Executor, class RatePolicy>
  660. template<class RatePolicy_, class Arg0, class... Args, class>
  661. basic_stream<Protocol, Executor, RatePolicy>::
  662. basic_stream(
  663. RatePolicy_&& policy, Arg0&& arg0, Args&&... args)
  664. : impl_(boost::make_shared<impl_type>(
  665. std::true_type{},
  666. std::forward<RatePolicy_>(policy),
  667. std::forward<Arg0>(arg0),
  668. std::forward<Args>(args)...))
  669. {
  670. }
  671. template<class Protocol, class Executor, class RatePolicy>
  672. basic_stream<Protocol, Executor, RatePolicy>::
  673. basic_stream(basic_stream&& other)
  674. : impl_(boost::make_shared<impl_type>(
  675. std::move(*other.impl_)))
  676. {
  677. // Explainer: Asio's sockets provide the guarantee that a moved-from socket
  678. // will be in a state as-if newly created. i.e.:
  679. // * having the same (valid) executor
  680. // * the socket shall not be open
  681. // We provide the same guarantee by moving the impl rather than the pointer
  682. // controlling its lifetime.
  683. }
  684. template<class Protocol, class Executor, class RatePolicy>
  685. template<class Executor_>
  686. basic_stream<Protocol, Executor, RatePolicy>::
  687. basic_stream(basic_stream<Protocol, Executor_, RatePolicy> && other)
  688. : impl_(boost::make_shared<impl_type>(std::false_type{}, std::move(other.impl_->socket)))
  689. {
  690. }
  691. //------------------------------------------------------------------------------
  692. template<class Protocol, class Executor, class RatePolicy>
  693. auto
  694. basic_stream<Protocol, Executor, RatePolicy>::
  695. release_socket() ->
  696. socket_type
  697. {
  698. this->cancel();
  699. return std::move(impl_->socket);
  700. }
  701. template<class Protocol, class Executor, class RatePolicy>
  702. void
  703. basic_stream<Protocol, Executor, RatePolicy>::
  704. expires_after(net::steady_timer::duration expiry_time)
  705. {
  706. // If assert goes off, it means that there are
  707. // already read or write (or connect) operations
  708. // outstanding, so there is nothing to apply
  709. // the expiration time to!
  710. //
  711. BOOST_ASSERT(
  712. ! impl_->read.pending ||
  713. ! impl_->write.pending);
  714. if(! impl_->read.pending)
  715. BOOST_VERIFY(
  716. impl_->read.timer.expires_after(
  717. expiry_time) == 0);
  718. if(! impl_->write.pending)
  719. BOOST_VERIFY(
  720. impl_->write.timer.expires_after(
  721. expiry_time) == 0);
  722. }
  723. template<class Protocol, class Executor, class RatePolicy>
  724. void
  725. basic_stream<Protocol, Executor, RatePolicy>::
  726. expires_at(
  727. net::steady_timer::time_point expiry_time)
  728. {
  729. // If assert goes off, it means that there are
  730. // already read or write (or connect) operations
  731. // outstanding, so there is nothing to apply
  732. // the expiration time to!
  733. //
  734. BOOST_ASSERT(
  735. ! impl_->read.pending ||
  736. ! impl_->write.pending);
  737. if(! impl_->read.pending)
  738. BOOST_VERIFY(
  739. impl_->read.timer.expires_at(
  740. expiry_time) == 0);
  741. if(! impl_->write.pending)
  742. BOOST_VERIFY(
  743. impl_->write.timer.expires_at(
  744. expiry_time) == 0);
  745. }
  746. template<class Protocol, class Executor, class RatePolicy>
  747. void
  748. basic_stream<Protocol, Executor, RatePolicy>::
  749. expires_never()
  750. {
  751. impl_->reset();
  752. }
  753. template<class Protocol, class Executor, class RatePolicy>
  754. void
  755. basic_stream<Protocol, Executor, RatePolicy>::
  756. cancel()
  757. {
  758. error_code ec;
  759. impl_->socket.cancel(ec);
  760. impl_->timer.cancel();
  761. }
  762. template<class Protocol, class Executor, class RatePolicy>
  763. void
  764. basic_stream<Protocol, Executor, RatePolicy>::
  765. close()
  766. {
  767. impl_->close();
  768. }
  769. //------------------------------------------------------------------------------
  770. template<class Protocol, class Executor, class RatePolicy>
  771. template<BOOST_BEAST_ASYNC_TPARAM1 ConnectHandler>
  772. BOOST_BEAST_ASYNC_RESULT1(ConnectHandler)
  773. basic_stream<Protocol, Executor, RatePolicy>::
  774. async_connect(
  775. endpoint_type const& ep,
  776. ConnectHandler&& handler)
  777. {
  778. return net::async_initiate<
  779. ConnectHandler,
  780. void(error_code)>(
  781. typename ops::run_connect_op{},
  782. handler,
  783. this,
  784. ep);
  785. }
  786. template<class Protocol, class Executor, class RatePolicy>
  787. template<
  788. class EndpointSequence,
  789. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
  790. class>
  791. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(RangeConnectHandler,void(error_code, typename Protocol::endpoint))
  792. basic_stream<Protocol, Executor, RatePolicy>::
  793. async_connect(
  794. EndpointSequence const& endpoints,
  795. RangeConnectHandler&& handler)
  796. {
  797. return net::async_initiate<
  798. RangeConnectHandler,
  799. void(error_code, typename Protocol::endpoint)>(
  800. typename ops::run_connect_range_op{},
  801. handler,
  802. this,
  803. endpoints,
  804. detail::any_endpoint{});
  805. }
  806. template<class Protocol, class Executor, class RatePolicy>
  807. template<
  808. class EndpointSequence,
  809. class ConnectCondition,
  810. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
  811. class>
  812. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(RangeConnectHandler,void (error_code, typename Protocol::endpoint))
  813. basic_stream<Protocol, Executor, RatePolicy>::
  814. async_connect(
  815. EndpointSequence const& endpoints,
  816. ConnectCondition connect_condition,
  817. RangeConnectHandler&& handler)
  818. {
  819. return net::async_initiate<
  820. RangeConnectHandler,
  821. void(error_code, typename Protocol::endpoint)>(
  822. typename ops::run_connect_range_op{},
  823. handler,
  824. this,
  825. endpoints,
  826. connect_condition);
  827. }
  828. template<class Protocol, class Executor, class RatePolicy>
  829. template<
  830. class Iterator,
  831. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler>
  832. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
  833. basic_stream<Protocol, Executor, RatePolicy>::
  834. async_connect(
  835. Iterator begin, Iterator end,
  836. IteratorConnectHandler&& handler)
  837. {
  838. return net::async_initiate<
  839. IteratorConnectHandler,
  840. void(error_code, Iterator)>(
  841. typename ops::run_connect_iter_op{},
  842. handler,
  843. this,
  844. begin, end,
  845. detail::any_endpoint{});
  846. }
  847. template<class Protocol, class Executor, class RatePolicy>
  848. template<
  849. class Iterator,
  850. class ConnectCondition,
  851. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler>
  852. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
  853. basic_stream<Protocol, Executor, RatePolicy>::
  854. async_connect(
  855. Iterator begin, Iterator end,
  856. ConnectCondition connect_condition,
  857. IteratorConnectHandler&& handler)
  858. {
  859. return net::async_initiate<
  860. IteratorConnectHandler,
  861. void(error_code, Iterator)>(
  862. typename ops::run_connect_iter_op{},
  863. handler,
  864. this,
  865. begin, end,
  866. connect_condition);
  867. }
  868. //------------------------------------------------------------------------------
  869. template<class Protocol, class Executor, class RatePolicy>
  870. template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  871. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  872. basic_stream<Protocol, Executor, RatePolicy>::
  873. async_read_some(
  874. MutableBufferSequence const& buffers,
  875. ReadHandler&& handler)
  876. {
  877. static_assert(net::is_mutable_buffer_sequence<
  878. MutableBufferSequence>::value,
  879. "MutableBufferSequence type requirements not met");
  880. return net::async_initiate<
  881. ReadHandler,
  882. void(error_code, std::size_t)>(
  883. typename ops::run_read_op{},
  884. handler,
  885. this,
  886. buffers);
  887. }
  888. template<class Protocol, class Executor, class RatePolicy>
  889. template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
  890. BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
  891. basic_stream<Protocol, Executor, RatePolicy>::
  892. async_write_some(
  893. ConstBufferSequence const& buffers,
  894. WriteHandler&& handler)
  895. {
  896. static_assert(net::is_const_buffer_sequence<
  897. ConstBufferSequence>::value,
  898. "ConstBufferSequence type requirements not met");
  899. return net::async_initiate<
  900. WriteHandler,
  901. void(error_code, std::size_t)>(
  902. typename ops::run_write_op{},
  903. handler,
  904. this,
  905. buffers);
  906. }
  907. //------------------------------------------------------------------------------
  908. //
  909. // Customization points
  910. //
  911. #if ! BOOST_BEAST_DOXYGEN
  912. template<
  913. class Protocol, class Executor, class RatePolicy>
  914. void
  915. beast_close_socket(
  916. basic_stream<Protocol, Executor, RatePolicy>& stream)
  917. {
  918. error_code ec;
  919. stream.socket().close(ec);
  920. }
  921. template<
  922. class Protocol, class Executor, class RatePolicy>
  923. void
  924. teardown(
  925. role_type role,
  926. basic_stream<Protocol, Executor, RatePolicy>& stream,
  927. error_code& ec)
  928. {
  929. using beast::websocket::teardown;
  930. teardown(role, stream.socket(), ec);
  931. }
  932. template<
  933. class Protocol, class Executor, class RatePolicy,
  934. class TeardownHandler>
  935. void
  936. async_teardown(
  937. role_type role,
  938. basic_stream<Protocol, Executor, RatePolicy>& stream,
  939. TeardownHandler&& handler)
  940. {
  941. using beast::websocket::async_teardown;
  942. async_teardown(role, stream.socket(),
  943. std::forward<TeardownHandler>(handler));
  944. }
  945. #endif
  946. } // beast
  947. } // boost
  948. #endif