threadsafe_queue.hpp 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. /*
  2. * Copyright Andrey Semashev 2007 - 2021.
  3. * Distributed under the Boost Software License, Version 1.0.
  4. * (See accompanying file LICENSE_1_0.txt or copy at
  5. * http://www.boost.org/LICENSE_1_0.txt)
  6. */
  7. /*!
  8. * \file threadsafe_queue.hpp
  9. * \author Andrey Semashev
  10. * \date 05.11.2010
  11. *
  12. * \brief This header is the Boost.Log library implementation, see the library documentation
  13. * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
  14. */
  15. #ifndef BOOST_LOG_DETAIL_THREADSAFE_QUEUE_HPP_INCLUDED_
  16. #define BOOST_LOG_DETAIL_THREADSAFE_QUEUE_HPP_INCLUDED_
  17. #include <boost/log/detail/config.hpp>
  18. #ifdef BOOST_HAS_PRAGMA_ONCE
  19. #pragma once
  20. #endif
  21. #ifndef BOOST_LOG_NO_THREADS
  22. #include <new>
  23. #include <memory>
  24. #include <cstddef>
  25. #include <boost/atomic/atomic.hpp>
  26. #include <boost/move/core.hpp>
  27. #include <boost/move/utility_core.hpp>
  28. #include <boost/type_traits/alignment_of.hpp>
  29. #include <boost/type_traits/aligned_storage.hpp>
  30. #include <boost/log/utility/use_std_allocator.hpp>
  31. #include <boost/log/detail/allocator_traits.hpp>
  32. #include <boost/log/detail/header.hpp>
  33. namespace boost {
  34. BOOST_LOG_OPEN_NAMESPACE
  35. namespace aux {
  36. //! Base class for the thread-safe queue implementation
  37. class threadsafe_queue_impl
  38. {
  39. public:
  40. struct node_base
  41. {
  42. boost::atomic< node_base* > next;
  43. };
  44. protected:
  45. threadsafe_queue_impl();
  46. ~threadsafe_queue_impl();
  47. public:
  48. static BOOST_LOG_API threadsafe_queue_impl* create(node_base* first_node);
  49. static BOOST_LOG_API void destroy(threadsafe_queue_impl* impl) BOOST_NOEXCEPT;
  50. static BOOST_LOG_API node_base* reset_last_node(threadsafe_queue_impl* impl) BOOST_NOEXCEPT;
  51. static BOOST_LOG_API bool unsafe_empty(const threadsafe_queue_impl* impl) BOOST_NOEXCEPT;
  52. static BOOST_LOG_API void push(threadsafe_queue_impl* impl, node_base* p);
  53. static BOOST_LOG_API bool try_pop(threadsafe_queue_impl* impl, node_base*& node_to_free, node_base*& node_with_value);
  54. // Copying and assignment is prohibited
  55. BOOST_DELETED_FUNCTION(threadsafe_queue_impl(threadsafe_queue_impl const&))
  56. BOOST_DELETED_FUNCTION(threadsafe_queue_impl& operator= (threadsafe_queue_impl const&))
  57. };
  58. //! Thread-safe queue node type
  59. template< typename T >
  60. struct threadsafe_queue_node :
  61. public threadsafe_queue_impl::node_base
  62. {
  63. typedef typename aligned_storage< sizeof(T), alignment_of< T >::value >::type storage_type;
  64. storage_type storage;
  65. BOOST_DEFAULTED_FUNCTION(threadsafe_queue_node(), {})
  66. explicit threadsafe_queue_node(T const& val) { new (storage.address()) T(val); }
  67. T& value() BOOST_NOEXCEPT { return *static_cast< T* >(storage.address()); }
  68. void destroy() BOOST_NOEXCEPT { static_cast< T* >(storage.address())->~T(); }
  69. // Copying and assignment is prohibited
  70. BOOST_DELETED_FUNCTION(threadsafe_queue_node(threadsafe_queue_node const&))
  71. BOOST_DELETED_FUNCTION(threadsafe_queue_node& operator= (threadsafe_queue_node const&))
  72. };
  73. /*!
  74. * \brief An unbounded thread-safe queue
  75. *
  76. * The implementation is based on algorithms published in the "Simple, Fast,
  77. * and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" article
  78. * in PODC96 by Maged M. Michael and Michael L. Scott. Pseudocode is available here:
  79. * http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
  80. *
  81. * The implementation provides thread-safe \c push and \c try_pop operations, as well as
  82. * a thread-unsafe \c empty operation. The queue imposes the following requirements
  83. * on the element type:
  84. *
  85. * \li Default constructible, the default constructor must not throw.
  86. * \li Copy constructible.
  87. * \li Movable (i.e. there should be an efficient move assignment for this type).
  88. *
  89. * The last requirement is not mandatory but is crucial for decent performance.
  90. */
  91. template< typename T, typename AllocatorT = use_std_allocator >
  92. class threadsafe_queue :
  93. private boost::log::aux::rebind_alloc< AllocatorT, threadsafe_queue_node< T > >::type
  94. {
  95. private:
  96. typedef threadsafe_queue_node< T > node;
  97. public:
  98. typedef typename boost::log::aux::rebind_alloc< AllocatorT, node >::type allocator_type;
  99. typedef T value_type;
  100. typedef T& reference;
  101. typedef T const& const_reference;
  102. typedef T* pointer;
  103. typedef T const* const_pointer;
  104. typedef std::ptrdiff_t difference_type;
  105. typedef std::size_t size_type;
  106. private:
  107. typedef boost::log::aux::allocator_traits< allocator_type > alloc_traits;
  108. //! A simple scope guard to automate memory reclaiming
  109. struct auto_deallocate;
  110. friend struct auto_deallocate;
  111. struct auto_deallocate
  112. {
  113. auto_deallocate(allocator_type* alloc, node* dealloc, node* destr) BOOST_NOEXCEPT :
  114. m_pAllocator(alloc),
  115. m_pDeallocate(dealloc),
  116. m_pDestroy(destr)
  117. {
  118. }
  119. ~auto_deallocate() BOOST_NOEXCEPT
  120. {
  121. alloc_traits::destroy(*m_pAllocator, m_pDeallocate);
  122. alloc_traits::deallocate(*m_pAllocator, m_pDeallocate, 1);
  123. m_pDestroy->destroy();
  124. }
  125. private:
  126. allocator_type* m_pAllocator;
  127. node* m_pDeallocate;
  128. node* m_pDestroy;
  129. };
  130. public:
  131. /*!
  132. * Default constructor, creates an empty queue. Unlike most containers,
  133. * the constructor requires memory allocation.
  134. *
  135. * \throw std::bad_alloc if there is not sufficient memory
  136. */
  137. threadsafe_queue(allocator_type const& alloc = allocator_type()) :
  138. allocator_type(alloc)
  139. {
  140. node* p = alloc_traits::allocate(get_allocator(), 1);
  141. if (BOOST_LIKELY(!!p))
  142. {
  143. try
  144. {
  145. alloc_traits::construct(get_allocator(), p);
  146. try
  147. {
  148. m_pImpl = threadsafe_queue_impl::create(p);
  149. }
  150. catch (...)
  151. {
  152. alloc_traits::destroy(get_allocator(), p);
  153. throw;
  154. }
  155. }
  156. catch (...)
  157. {
  158. alloc_traits::deallocate(get_allocator(), p, 1);
  159. throw;
  160. }
  161. }
  162. else
  163. throw std::bad_alloc();
  164. }
  165. /*!
  166. * Destructor
  167. */
  168. ~threadsafe_queue() BOOST_NOEXCEPT
  169. {
  170. // Clear the queue
  171. if (!unsafe_empty())
  172. {
  173. value_type value;
  174. while (try_pop(value)) {}
  175. }
  176. // Remove the last dummy node
  177. node* p = static_cast< node* >(threadsafe_queue_impl::reset_last_node(m_pImpl));
  178. alloc_traits::destroy(get_allocator(), p);
  179. alloc_traits::deallocate(get_allocator(), p, 1);
  180. threadsafe_queue_impl::destroy(m_pImpl);
  181. }
  182. /*!
  183. * Checks if the queue is empty. Not thread-safe, the returned result may not be actual.
  184. */
  185. bool unsafe_empty() const BOOST_NOEXCEPT { return threadsafe_queue_impl::unsafe_empty(m_pImpl); }
  186. /*!
  187. * Puts a new element to the end of the queue. Thread-safe, can be called
  188. * concurrently by several threads, and concurrently with the \c pop operation.
  189. */
  190. void push(const_reference value)
  191. {
  192. node* p = alloc_traits::allocate(get_allocator(), 1);
  193. if (BOOST_LIKELY(!!p))
  194. {
  195. try
  196. {
  197. alloc_traits::construct(get_allocator(), p, value);
  198. }
  199. catch (...)
  200. {
  201. alloc_traits::deallocate(get_allocator(), p, 1);
  202. throw;
  203. }
  204. threadsafe_queue_impl::push(m_pImpl, p);
  205. }
  206. else
  207. throw std::bad_alloc();
  208. }
  209. /*!
  210. * Attempts to pop an element from the beginning of the queue. Thread-safe, can
  211. * be called concurrently with the \c push operation. Should not be called by
  212. * several threads concurrently.
  213. */
  214. bool try_pop(reference value)
  215. {
  216. threadsafe_queue_impl::node_base *dealloc, *destr;
  217. if (threadsafe_queue_impl::try_pop(m_pImpl, dealloc, destr))
  218. {
  219. node* p = static_cast< node* >(destr);
  220. auto_deallocate guard(static_cast< allocator_type* >(this), static_cast< node* >(dealloc), p);
  221. value = boost::move(p->value());
  222. return true;
  223. }
  224. else
  225. return false;
  226. }
  227. // Copying and assignment is prohibited
  228. BOOST_DELETED_FUNCTION(threadsafe_queue(threadsafe_queue const&))
  229. BOOST_DELETED_FUNCTION(threadsafe_queue& operator= (threadsafe_queue const&))
  230. private:
  231. //! Returns the allocator instance
  232. allocator_type& get_allocator() BOOST_NOEXCEPT { return *static_cast< allocator_type* >(this); }
  233. private:
  234. //! Pointer to the implementation
  235. threadsafe_queue_impl* m_pImpl;
  236. };
  237. } // namespace aux
  238. BOOST_LOG_CLOSE_NAMESPACE // namespace log
  239. } // namespace boost
  240. #include <boost/log/detail/footer.hpp>
  241. #endif // BOOST_LOG_NO_THREADS
  242. #endif // BOOST_LOG_DETAIL_THREADSAFE_QUEUE_HPP_INCLUDED_