123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- // Copyright (C) 2014 Ian Forbed
- // Copyright (C) 2014-2017 Vicente J. Botet Escriba
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- //
- #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
- #define BOOST_THREAD_SYNC_PRIORITY_QUEUE
#include <boost/thread/detail/config.hpp>
#include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
#include <boost/thread/concurrent_queues/queue_op_status.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/csbl/vector.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/atomic.hpp>
#include <boost/chrono/duration.hpp>
#include <boost/chrono/time_point.hpp>
#include <algorithm>   // std::push_heap, std::pop_heap
#include <exception>
#include <functional>  // std::less
#include <queue>
#include <utility>
#include <boost/config/abi_prefix.hpp>
- namespace boost
- {
- namespace detail {
- template <
- class Type,
- class Container = csbl::vector<Type>,
- class Compare = std::less<Type>
- >
- class priority_queue
- {
- private:
- Container _elements;
- Compare _compare;
- public:
- typedef Type value_type;
- typedef typename Container::size_type size_type;
- explicit priority_queue(const Compare& compare = Compare())
- : _elements(), _compare(compare)
- { }
- size_type size() const
- {
- return _elements.size();
- }
- bool empty() const
- {
- return _elements.empty();
- }
- void push(Type const& element)
- {
- _elements.push_back(element);
- std::push_heap(_elements.begin(), _elements.end(), _compare);
- }
- void push(BOOST_RV_REF(Type) element)
- {
- _elements.push_back(boost::move(element));
- std::push_heap(_elements.begin(), _elements.end(), _compare);
- }
- void pop()
- {
- std::pop_heap(_elements.begin(), _elements.end(), _compare);
- _elements.pop_back();
- }
- Type pull()
- {
- Type result = boost::move(_elements.front());
- pop();
- return boost::move(result);
- }
- Type const& top() const
- {
- return _elements.front();
- }
- };
- }
- namespace concurrent
- {
- template <class ValueType,
- class Container = csbl::vector<ValueType>,
- class Compare = std::less<typename Container::value_type> >
- class sync_priority_queue
- : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
- {
- typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super;
- public:
- typedef ValueType value_type;
- //typedef typename super::value_type value_type; // fixme
- typedef typename super::underlying_queue_type underlying_queue_type;
- typedef typename super::size_type size_type;
- typedef typename super::op_status op_status;
- typedef chrono::steady_clock clock;
- protected:
- public:
- sync_priority_queue() {}
- ~sync_priority_queue()
- {
- if(!super::closed())
- {
- super::close();
- }
- }
- void push(const ValueType& elem);
- void push(BOOST_THREAD_RV_REF(ValueType) elem);
- queue_op_status try_push(const ValueType& elem);
- queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
- ValueType pull();
- void pull(ValueType&);
- template <class WClock, class Duration>
- queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&);
- template <class Rep, class Period>
- queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&);
- queue_op_status try_pull(ValueType& elem);
- queue_op_status wait_pull(ValueType& elem);
- queue_op_status nonblocking_pull(ValueType&);
- private:
- void push(unique_lock<mutex>&, const ValueType& elem);
- void push(lock_guard<mutex>&, const ValueType& elem);
- void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
- void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
- queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
- queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
- ValueType pull(unique_lock<mutex>&);
- ValueType pull(lock_guard<mutex>&);
- void pull(unique_lock<mutex>&, ValueType&);
- void pull(lock_guard<mutex>&, ValueType&);
- queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
- queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
- queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
- queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
- sync_priority_queue(const sync_priority_queue&);
- sync_priority_queue& operator= (const sync_priority_queue&);
- sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
- sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
- }; //end class
- //////////////////////
- template <class T, class Container,class Cmp>
- void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
- {
- super::throw_if_closed(lk);
- super::data_.push(elem);
- super::notify_elem_added(lk);
- }
- template <class T, class Container,class Cmp>
- void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
- {
- super::throw_if_closed(lk);
- super::data_.push(elem);
- super::notify_elem_added(lk);
- }
- template <class T, class Container,class Cmp>
- void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
- {
- lock_guard<mutex> lk(super::mtx_);
- push(lk, elem);
- }
- //////////////////////
- template <class T, class Container,class Cmp>
- void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
- {
- super::throw_if_closed(lk);
- super::data_.push(boost::move(elem));
- super::notify_elem_added(lk);
- }
- template <class T, class Container,class Cmp>
- void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
- {
- super::throw_if_closed(lk);
- super::data_.push(boost::move(elem));
- super::notify_elem_added(lk);
- }
- template <class T, class Container,class Cmp>
- void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
- {
- lock_guard<mutex> lk(super::mtx_);
- push(lk, boost::move(elem));
- }
- //////////////////////
- template <class T, class Container,class Cmp>
- queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
- {
- lock_guard<mutex> lk(super::mtx_);
- if (super::closed(lk)) return queue_op_status::closed;
- push(lk, elem);
- return queue_op_status::success;
- }
- //////////////////////
- template <class T, class Container,class Cmp>
- queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
- {
- lock_guard<mutex> lk(super::mtx_);
- if (super::closed(lk)) return queue_op_status::closed;
- push(lk, boost::move(elem));
- return queue_op_status::success;
- }
- //////////////////////
- template <class T,class Container, class Cmp>
- T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
- {
- return super::data_.pull();
- }
- template <class T,class Container, class Cmp>
- T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
- {
- return super::data_.pull();
- }
- template <class T,class Container, class Cmp>
- T sync_priority_queue<T,Container,Cmp>::pull()
- {
- unique_lock<mutex> lk(super::mtx_);
- const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
- if (has_been_closed) super::throw_if_closed(lk);
- return pull(lk);
- }
- //////////////////////
- template <class T,class Container, class Cmp>
- void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
- {
- elem = super::data_.pull();
- }
- template <class T,class Container, class Cmp>
- void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
- {
- elem = super::data_.pull();
- }
- template <class T,class Container, class Cmp>
- void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
- {
- unique_lock<mutex> lk(super::mtx_);
- const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
- if (has_been_closed) super::throw_if_closed(lk);
- pull(lk, elem);
- }
- //////////////////////
- template <class T, class Cont,class Cmp>
- template <class WClock, class Duration>
- queue_op_status
- sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
- {
- unique_lock<mutex> lk(super::mtx_);
- const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp);
- if (rc == queue_op_status::success) pull(lk, elem);
- return rc;
- }
- //////////////////////
- template <class T, class Cont,class Cmp>
- template <class Rep, class Period>
- queue_op_status
- sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
- {
- return pull_until(chrono::steady_clock::now() + dura, elem);
- }
- //////////////////////
- template <class T, class Container,class Cmp>
- queue_op_status
- sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
- {
- if (super::empty(lk))
- {
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::empty;
- }
- pull(lk, elem);
- return queue_op_status::success;
- }
- template <class T, class Container,class Cmp>
- queue_op_status
- sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
- {
- if (super::empty(lk))
- {
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::empty;
- }
- pull(lk, elem);
- return queue_op_status::success;
- }
- template <class T, class Container,class Cmp>
- queue_op_status
- sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
- {
- lock_guard<mutex> lk(super::mtx_);
- return try_pull(lk, elem);
- }
- //////////////////////
- template <class T,class Container, class Cmp>
- queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
- {
- const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
- if (has_been_closed) return queue_op_status::closed;
- pull(lk, elem);
- return queue_op_status::success;
- }
- template <class T,class Container, class Cmp>
- queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
- {
- unique_lock<mutex> lk(super::mtx_);
- return wait_pull(lk, elem);
- }
- //////////////////////
- template <class T,class Container, class Cmp>
- queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
- {
- unique_lock<mutex> lk(super::mtx_, try_to_lock);
- if (!lk.owns_lock()) return queue_op_status::busy;
- return try_pull(lk, elem);
- }
- } //end concurrent namespace
- using concurrent::sync_priority_queue;
- } //end boost namespace
- #include <boost/config/abi_suffix.hpp>
- #endif
|