async_frontend.hpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. /*
  2. * Copyright Andrey Semashev 2007 - 2015.
  3. * Distributed under the Boost Software License, Version 1.0.
  4. * (See accompanying file LICENSE_1_0.txt or copy at
  5. * http://www.boost.org/LICENSE_1_0.txt)
  6. */
  7. /*!
  8. * \file async_frontend.hpp
  9. * \author Andrey Semashev
  10. * \date 14.07.2009
  11. *
  12. * The header contains implementation of asynchronous sink frontend.
  13. */
  14. #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
  15. #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
  16. #include <exception> // std::terminate
  17. #include <boost/log/detail/config.hpp>
  18. #ifdef BOOST_HAS_PRAGMA_ONCE
  19. #pragma once
  20. #endif
  21. #if defined(BOOST_LOG_NO_THREADS)
  22. #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
  23. #endif
  24. #include <boost/memory_order.hpp>
  25. #include <boost/atomic/atomic.hpp>
  26. #include <boost/smart_ptr/shared_ptr.hpp>
  27. #include <boost/smart_ptr/make_shared_object.hpp>
  28. #include <boost/preprocessor/control/if.hpp>
  29. #include <boost/preprocessor/comparison/equal.hpp>
  30. #include <boost/thread/locks.hpp>
  31. #include <boost/thread/recursive_mutex.hpp>
  32. #include <boost/thread/thread.hpp>
  33. #include <boost/thread/condition_variable.hpp>
  34. #include <boost/log/exceptions.hpp>
  35. #include <boost/log/detail/locking_ptr.hpp>
  36. #include <boost/log/detail/parameter_tools.hpp>
  37. #include <boost/log/core/record_view.hpp>
  38. #include <boost/log/sinks/basic_sink_frontend.hpp>
  39. #include <boost/log/sinks/frontend_requirements.hpp>
  40. #include <boost/log/sinks/unbounded_fifo_queue.hpp>
  41. #include <boost/log/keywords/start_thread.hpp>
  42. #include <boost/log/detail/header.hpp>
  43. namespace boost {
  44. BOOST_LOG_OPEN_NAMESPACE
  45. namespace sinks {
  46. #ifndef BOOST_LOG_DOXYGEN_PASS
  47. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
  48. template< typename T0 >\
  49. explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
  50. base_type(true),\
  51. queue_base_type(arg0),\
  52. m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
  53. m_ActiveOperation(idle),\
  54. m_StopRequested(false),\
  55. m_FlushRequested(false)\
  56. {\
  57. if (arg0[keywords::start_thread | true])\
  58. start_feeding_thread();\
  59. }\
  60. template< typename T0 >\
  61. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\
  62. base_type(true),\
  63. queue_base_type(arg0),\
  64. m_pBackend(backend),\
  65. m_ActiveOperation(idle),\
  66. m_StopRequested(false),\
  67. m_FlushRequested(false)\
  68. {\
  69. if (arg0[keywords::start_thread | true])\
  70. start_feeding_thread();\
  71. }
  72. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
  73. template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
  74. explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
  75. base_type(true),\
  76. queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  77. m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  78. m_ActiveOperation(idle),\
  79. m_StopRequested(false),\
  80. m_FlushRequested(false)\
  81. {\
  82. if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
  83. start_feeding_thread();\
  84. }\
  85. template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
  86. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
  87. base_type(true),\
  88. queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  89. m_pBackend(backend),\
  90. m_ActiveOperation(idle),\
  91. m_StopRequested(false),\
  92. m_FlushRequested(false)\
  93. {\
  94. if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
  95. start_feeding_thread();\
  96. }
  97. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
  98. BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
  99. #endif // BOOST_LOG_DOXYGEN_PASS
  100. /*!
  101. * \brief Asynchronous logging sink frontend
  102. *
  103. * The frontend starts a separate thread on construction. All logging records are passed
  104. * to the backend in this dedicated thread.
  105. *
  106. * The user can prevent spawning the internal thread by specifying \c start_thread parameter
  107. * with the value of \c false on construction. In this case log records will be buffered
  108. * in the internal queue until the user calls \c run, \c feed_records or \c flush in his own
  109. * thread. Log record queueing strategy is specified in the \c QueueingStrategyT template
  110. * parameter.
  111. */
  112. template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
  113. class asynchronous_sink :
  114. public aux::make_sink_frontend_base< SinkBackendT >::type,
  115. public QueueingStrategyT
  116. {
  117. typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
  118. typedef QueueingStrategyT queue_base_type;
  119. private:
  120. //! Backend synchronization mutex type
  121. typedef boost::recursive_mutex backend_mutex_type;
  122. //! Frontend synchronization mutex type
  123. typedef typename base_type::mutex_type frontend_mutex_type;
  124. //! Operation bit mask
  125. enum operation
  126. {
  127. idle = 0u,
  128. feeding_records = 1u,
  129. flushing = 3u
  130. };
  131. //! Function object to run the log record feeding thread
  132. class run_func
  133. {
  134. public:
  135. typedef void result_type;
  136. private:
  137. asynchronous_sink* m_self;
  138. public:
  139. explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
  140. {
  141. }
  142. result_type operator()() const
  143. {
  144. m_self->run();
  145. }
  146. };
  147. //! A scope guard that implements active operation management
  148. class scoped_feeding_operation
  149. {
  150. private:
  151. asynchronous_sink& m_self;
  152. public:
  153. //! Initializing constructor
  154. explicit scoped_feeding_operation(asynchronous_sink& self) : m_self(self)
  155. {
  156. }
  157. //! Destructor
  158. ~scoped_feeding_operation()
  159. {
  160. m_self.complete_feeding_operation();
  161. }
  162. BOOST_DELETED_FUNCTION(scoped_feeding_operation(scoped_feeding_operation const&))
  163. BOOST_DELETED_FUNCTION(scoped_feeding_operation& operator= (scoped_feeding_operation const&))
  164. };
  165. //! A scope guard that resets a flag on destructor
  166. class scoped_flag
  167. {
  168. private:
  169. frontend_mutex_type& m_Mutex;
  170. condition_variable_any& m_Cond;
  171. boost::atomic< bool >& m_Flag;
  172. public:
  173. explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) :
  174. m_Mutex(mut), m_Cond(cond), m_Flag(f)
  175. {
  176. }
  177. ~scoped_flag()
  178. {
  179. try
  180. {
  181. lock_guard< frontend_mutex_type > lock(m_Mutex);
  182. m_Flag.store(false, boost::memory_order_relaxed);
  183. m_Cond.notify_all();
  184. }
  185. catch (...)
  186. {
  187. }
  188. }
  189. BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
  190. BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
  191. };
  192. public:
  193. //! Sink implementation type
  194. typedef SinkBackendT sink_backend_type;
  195. //! \cond
  196. static_assert(has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value, "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
  197. //! \endcond
  198. #ifndef BOOST_LOG_DOXYGEN_PASS
  199. //! A pointer type that locks the backend until it's destroyed
  200. typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
  201. #else // BOOST_LOG_DOXYGEN_PASS
  202. //! A pointer type that locks the backend until it's destroyed
  203. typedef implementation_defined locked_backend_ptr;
  204. #endif // BOOST_LOG_DOXYGEN_PASS
  205. private:
  206. //! Synchronization mutex
  207. backend_mutex_type m_BackendMutex;
  208. //! Pointer to the backend
  209. const shared_ptr< sink_backend_type > m_pBackend;
  210. //! Dedicated record feeding thread
  211. thread m_DedicatedFeedingThread;
  212. //! Condition variable to implement blocking operations
  213. condition_variable_any m_BlockCond;
  214. //! Currently active operation
  215. operation m_ActiveOperation;
  216. //! The flag indicates that the feeding loop has to be stopped
  217. boost::atomic< bool > m_StopRequested;
  218. //! The flag indicates that queue flush has been requested
  219. boost::atomic< bool > m_FlushRequested;
  220. public:
  221. /*!
  222. * Default constructor. Constructs the sink backend instance.
  223. * Requires the backend to be default-constructible.
  224. *
  225. * \param start_thread If \c true, the frontend creates a thread to feed
  226. * log records to the backend. Otherwise no thread is
  227. * started and it is assumed that the user will call
  228. * \c run, \c feed_records or \c flush himself.
  229. */
  230. explicit asynchronous_sink(bool start_thread = true) :
  231. base_type(true),
  232. m_pBackend(boost::make_shared< sink_backend_type >()),
  233. m_ActiveOperation(idle),
  234. m_StopRequested(false),
  235. m_FlushRequested(false)
  236. {
  237. if (start_thread)
  238. start_feeding_thread();
  239. }
  240. /*!
  241. * Constructor attaches user-constructed backend instance
  242. *
  243. * \param backend Pointer to the backend instance.
  244. * \param start_thread If \c true, the frontend creates a thread to feed
  245. * log records to the backend. Otherwise no thread is
  246. * started and it is assumed that the user will call
  247. * \c run, \c feed_records or \c flush himself.
  248. *
  249. * \pre \a backend is not \c NULL.
  250. */
  251. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
  252. base_type(true),
  253. m_pBackend(backend),
  254. m_ActiveOperation(idle),
  255. m_StopRequested(false),
  256. m_FlushRequested(false)
  257. {
  258. if (start_thread)
  259. start_feeding_thread();
  260. }
  261. /*!
  262. * Constructor that passes arbitrary named parameters to the interprocess sink backend constructor.
  263. * Refer to the backend documentation for the list of supported parameters.
  264. *
  265. * The frontend uses the following named parameters:
  266. *
  267. * \li start_thread - If \c true, the frontend creates a thread to feed
  268. * log records to the backend. Otherwise no thread is
  269. * started and it is assumed that the user will call
  270. * \c run, \c feed_records or \c flush himself.
  271. */
  272. #ifndef BOOST_LOG_DOXYGEN_PASS
  273. BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
  274. #else
  275. template< typename... Args >
  276. explicit asynchronous_sink(Args&&... args);
  277. #endif
  278. /*!
  279. * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
  280. */
  281. ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
  282. {
  283. try
  284. {
  285. boost::this_thread::disable_interruption no_interrupts;
  286. stop();
  287. }
  288. catch (...)
  289. {
  290. std::terminate();
  291. }
  292. }
  293. /*!
  294. * Locking accessor to the attached backend
  295. */
  296. locked_backend_ptr locked_backend()
  297. {
  298. return locked_backend_ptr(m_pBackend, m_BackendMutex);
  299. }
  300. /*!
  301. * Enqueues the log record to the backend
  302. */
  303. void consume(record_view const& rec) BOOST_OVERRIDE
  304. {
  305. if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
  306. {
  307. unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  308. // Wait until flush is done
  309. while (m_FlushRequested.load(boost::memory_order_acquire))
  310. m_BlockCond.wait(lock);
  311. }
  312. queue_base_type::enqueue(rec);
  313. }
  314. /*!
  315. * The method attempts to pass logging record to the backend
  316. */
  317. bool try_consume(record_view const& rec) BOOST_OVERRIDE
  318. {
  319. if (!m_FlushRequested.load(boost::memory_order_acquire))
  320. {
  321. return queue_base_type::try_enqueue(rec);
  322. }
  323. else
  324. return false;
  325. }
  326. /*!
  327. * The method starts record feeding loop and effectively blocks until either of this happens:
  328. *
  329. * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
  330. * \li an exception is thrown while processing a log record in the backend, and the exception is
  331. * not terminated by the exception handler, if one is installed
  332. *
  333. * \pre The sink frontend must be constructed without spawning a dedicated thread
  334. */
  335. void run()
  336. {
  337. // First check that no other thread is running
  338. {
  339. unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  340. if (start_feeding_operation(lock, feeding_records))
  341. return;
  342. }
  343. scoped_feeding_operation guard(*this);
  344. // Now start the feeding loop
  345. while (true)
  346. {
  347. do_feed_records();
  348. if (!m_StopRequested.load(boost::memory_order_acquire))
  349. {
  350. // Block until new record is available
  351. record_view rec;
  352. if (queue_base_type::dequeue_ready(rec))
  353. base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
  354. }
  355. else
  356. break;
  357. }
  358. }
  359. /*!
  360. * The method softly interrupts record feeding loop. This method must be called when \c run,
  361. * \c feed_records or \c flush method execution has to be interrupted. Unlike regular thread
  362. * interruption, calling \c stop will not interrupt the record processing in the middle.
  363. * Instead, the sink frontend will attempt to finish its business with the record in progress
  364. * and return afterwards. This method can be called either if the sink was created with
  365. * an internal dedicated thread, or if the feeding loop was initiated by user.
  366. *
  367. * If no record feeding operation is in progress, calling \c stop marks the sink frontend
  368. * so that the next feeding operation stops immediately.
  369. *
  370. * \note Returning from this method does not guarantee that there are no records left buffered
  371. * in the sink frontend. It is possible that log records keep coming during and after this
  372. * method is called. At some point of execution of this method log records stop being processed,
  373. * and all records that come after this point are put into the queue. These records will be
  374. * processed upon further calls to \c run or \c feed_records.
  375. *
  376. * \note If the record feeding loop is being run in a user's thread (i.e. \c start_thread was specified
  377. * as \c false on frontend construction), this method does not guarantee that upon return the thread
  378. * has returned from the record feeding loop or that it won't enter it in the future. The method
  379. * only ensures that the record feeding thread will eventually return from the feeding loop. It is
  380. * user's responsibility to synchronize with the user's record feeding thread.
  381. */
  382. void stop()
  383. {
  384. boost::thread feeding_thread;
  385. {
  386. lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
  387. m_StopRequested.store(true, boost::memory_order_release);
  388. queue_base_type::interrupt_dequeue();
  389. m_DedicatedFeedingThread.swap(feeding_thread);
  390. }
  391. if (feeding_thread.joinable())
  392. feeding_thread.join();
  393. }
  394. /*!
  395. * The method feeds log records that may have been buffered to the backend and returns
  396. *
  397. * \pre The sink frontend must be constructed without spawning a dedicated thread
  398. */
  399. void feed_records()
  400. {
  401. // First check that no other thread is running
  402. {
  403. unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  404. if (start_feeding_operation(lock, feeding_records))
  405. return;
  406. }
  407. scoped_feeding_operation guard(*this);
  408. // Now start the feeding loop
  409. do_feed_records();
  410. }
  411. /*!
  412. * The method feeds all log records that may have been buffered to the backend and returns.
  413. * Unlike \c feed_records, in case of ordering queueing the method also feeds records
  414. * that were enqueued during the ordering window, attempting to drain the queue completely.
  415. */
  416. void flush() BOOST_OVERRIDE
  417. {
  418. {
  419. unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  420. if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
  421. {
  422. // There is already a thread feeding records, let it do the job
  423. m_FlushRequested.store(true, boost::memory_order_release);
  424. queue_base_type::interrupt_dequeue();
  425. while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
  426. m_BlockCond.wait(lock);
  427. // The condition may have been signalled when the feeding operation was finishing.
  428. // In that case records may not have been flushed, and we do the flush ourselves.
  429. if (m_ActiveOperation != idle)
  430. return;
  431. }
  432. m_ActiveOperation = flushing;
  433. m_FlushRequested.store(true, boost::memory_order_relaxed);
  434. }
  435. scoped_feeding_operation guard(*this);
  436. do_feed_records();
  437. }
  438. private:
  439. #ifndef BOOST_LOG_DOXYGEN_PASS
  440. //! The method spawns record feeding thread
  441. void start_feeding_thread()
  442. {
  443. boost::thread(run_func(this)).swap(m_DedicatedFeedingThread);
  444. }
  445. //! Starts record feeding operation. The method blocks or throws if another feeding operation is in progress.
  446. bool start_feeding_operation(unique_lock< frontend_mutex_type >& lock, operation op)
  447. {
  448. while (m_ActiveOperation != idle)
  449. {
  450. if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
  451. BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
  452. if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
  453. {
  454. m_StopRequested.store(false, boost::memory_order_relaxed);
  455. return true;
  456. }
  457. m_BlockCond.wait(lock);
  458. }
  459. m_ActiveOperation = op;
  460. return false;
  461. }
  462. //! Completes record feeding operation
  463. void complete_feeding_operation() BOOST_NOEXCEPT
  464. {
  465. try
  466. {
  467. lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
  468. m_ActiveOperation = idle;
  469. m_StopRequested.store(false, boost::memory_order_relaxed);
  470. m_BlockCond.notify_all();
  471. }
  472. catch (...)
  473. {
  474. }
  475. }
  476. //! The record feeding loop
  477. void do_feed_records()
  478. {
  479. while (!m_StopRequested.load(boost::memory_order_acquire))
  480. {
  481. record_view rec;
  482. bool dequeued = false;
  483. if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
  484. dequeued = queue_base_type::try_dequeue_ready(rec);
  485. else
  486. dequeued = queue_base_type::try_dequeue(rec);
  487. if (dequeued)
  488. base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
  489. else
  490. break;
  491. }
  492. if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
  493. {
  494. scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
  495. base_type::flush_backend(m_BackendMutex, *m_pBackend);
  496. }
  497. }
  498. #endif // BOOST_LOG_DOXYGEN_PASS
  499. };
  500. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
  501. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
  502. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
  503. } // namespace sinks
  504. BOOST_LOG_CLOSE_NAMESPACE // namespace log
  505. } // namespace boost
  506. #include <boost/log/detail/footer.hpp>
  507. #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_