123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- // Copyright Oliver Kowalke 2016.
- // Distributed under the Boost Software License, Version 1.0.
- // (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- //
- #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
- #define BOOST_FIBERS_BUFFERED_CHANNEL_H
- #include <atomic>
- #include <chrono>
- #include <cstddef>
- #include <cstdint>
- #include <memory>
- #include <type_traits>
- #include <boost/config.hpp>
- #include <boost/fiber/channel_op_status.hpp>
- #include <boost/fiber/context.hpp>
- #include <boost/fiber/waker.hpp>
- #include <boost/fiber/detail/config.hpp>
- #include <boost/fiber/detail/convert.hpp>
- #include <boost/fiber/detail/spinlock.hpp>
- #include <boost/fiber/exceptions.hpp>
- #ifdef BOOST_HAS_ABI_HEADERS
- # include BOOST_ABI_PREFIX
- #endif
- namespace boost {
- namespace fibers {
- template< typename T >
- class buffered_channel {
- public:
- using value_type = typename std::remove_reference<T>::type;
- private:
- using slot_type = value_type;
- mutable detail::spinlock splk_{};
- wait_queue waiting_producers_{};
- wait_queue waiting_consumers_{};
- slot_type * slots_;
- std::size_t pidx_{ 0 };
- std::size_t cidx_{ 0 };
- std::size_t capacity_;
- bool closed_{ false };
- bool is_full_() const noexcept {
- return cidx_ == ((pidx_ + 1) % capacity_);
- }
- bool is_empty_() const noexcept {
- return cidx_ == pidx_;
- }
- bool is_closed_() const noexcept {
- return closed_;
- }
- public:
- explicit buffered_channel( std::size_t capacity) :
- capacity_{ capacity } {
- if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
- throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
- "boost fiber: buffer capacity is invalid" };
- }
- slots_ = new slot_type[capacity_];
- }
- ~buffered_channel() {
- close();
- delete [] slots_;
- }
- buffered_channel( buffered_channel const&) = delete;
- buffered_channel & operator=( buffered_channel const&) = delete;
- bool is_closed() const noexcept {
- detail::spinlock_lock lk{splk_, std::defer_lock};
- for(;;) {
- if(lk.try_lock())
- break;
- context::active()->yield();
- }
- return is_closed_();
- }
- void close() noexcept {
- detail::spinlock_lock lk{splk_, std::defer_lock};
- for(;;) {
- if(lk.try_lock())
- break;
- context::active()->yield();
- }
- if ( ! closed_) {
- closed_ = true;
- waiting_producers_.notify_all();
- waiting_consumers_.notify_all();
- }
- }
- channel_op_status try_push( value_type const& value) {
- detail::spinlock_lock lk{splk_, std::defer_lock};
- for(;;) {
- if(lk.try_lock())
- break;
- context::active()->yield();
- }
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- return channel_op_status::closed;
- }
- if ( is_full_() ) {
- return channel_op_status::full;
- }
- slots_[pidx_] = value;
- pidx_ = (pidx_ + 1) % capacity_;
- waiting_consumers_.notify_one();
- return channel_op_status::success;
- }
- channel_op_status try_push( value_type && value) {
-
- detail::spinlock_lock lk{splk_, std::defer_lock};
- for(;;) {
- if(lk.try_lock())
- break;
- context::active()->yield();
- }
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- return channel_op_status::closed;
- }
- if ( is_full_() ) {
- return channel_op_status::full;
- }
- slots_[pidx_] = std::move( value);
- pidx_ = (pidx_ + 1) % capacity_;
- waiting_consumers_.notify_one();
- return channel_op_status::success;
- }
- channel_op_status push( value_type const& value) {
- context * active_ctx = context::active();
- for (;;) {
- detail::spinlock_lock lk{splk_, std::try_to_lock};
- if (!lk) {
- active_ctx->yield();
- continue;
- }
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- return channel_op_status::closed;
- }
- if ( is_full_() ) {
- waiting_producers_.suspend_and_wait( lk, active_ctx);
- } else {
- slots_[pidx_] = value;
- pidx_ = (pidx_ + 1) % capacity_;
- waiting_consumers_.notify_one();
- return channel_op_status::success;
- }
- }
- }
- channel_op_status push( value_type && value) {
- context * active_ctx = context::active();
- for (;;) {
- detail::spinlock_lock lk{splk_, std::try_to_lock};
- if (!lk) {
- active_ctx->yield();
- continue;
- }
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- return channel_op_status::closed;
- }
- if ( is_full_() ) {
- waiting_producers_.suspend_and_wait( lk, active_ctx);
- } else {
- slots_[pidx_] = std::move( value);
- pidx_ = (pidx_ + 1) % capacity_;
- waiting_consumers_.notify_one();
- return channel_op_status::success;
- }
- }
- }
- template< typename Rep, typename Period >
- channel_op_status push_wait_for( value_type const& value,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return push_wait_until( value,
- std::chrono::steady_clock::now() + timeout_duration);
- }
- template< typename Rep, typename Period >
- channel_op_status push_wait_for( value_type && value,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return push_wait_until( std::forward< value_type >( value),
- std::chrono::steady_clock::now() + timeout_duration);
- }
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until( value_type const& value,
- std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- context * active_ctx = context::active();
- std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
- for (;;) {
- detail::spinlock_lock lk{splk_, std::try_to_lock};
- if (!lk) {
- active_ctx->yield();
- continue;
- }
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- return channel_op_status::closed;
- }
- if ( is_full_() ) {
- if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
- return channel_op_status::timeout;
- }
- } else {
- slots_[pidx_] = value;
- pidx_ = (pidx_ + 1) % capacity_;
- waiting_consumers_.notify_one();
- return channel_op_status::success;
- }
- }
- }
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until( value_type && value,
- std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- context * active_ctx = context::active();
- std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
- for (;;) {
- detail::spinlock_lock lk{splk_, std::try_to_lock};
- if (!lk) {
- active_ctx->yield();
- continue;
- }
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- return channel_op_status::closed;
- }
- if ( is_full_() ) {
- if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
- return channel_op_status::timeout;
- }
- } else {
- slots_[pidx_] = std::move( value);
- pidx_ = (pidx_ + 1) % capacity_;
- // notify one waiting consumer
- waiting_consumers_.notify_one();
- return channel_op_status::success;
- }
- }
- }
- channel_op_status try_pop( value_type & value) {
- detail::spinlock_lock lk{splk_, std::defer_lock};
- for(;;) {
- if(lk.try_lock())
- break;
- context::active()->yield();
- }
- if ( is_empty_() ) {
- return is_closed_()
- ? channel_op_status::closed
- : channel_op_status::empty;
- }
- value = std::move( slots_[cidx_]);
- cidx_ = (cidx_ + 1) % capacity_;
- waiting_producers_.notify_one();
- return channel_op_status::success;
- }
- channel_op_status pop( value_type & value) {
- context * active_ctx = context::active();
- for (;;) {
- detail::spinlock_lock lk{splk_, std::try_to_lock};
- if (!lk) {
- active_ctx->yield();
- continue;
- }
- if ( is_empty_() ) {
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- return channel_op_status::closed;
- }
- waiting_consumers_.suspend_and_wait( lk, active_ctx);
- } else {
- value = std::move( slots_[cidx_]);
- cidx_ = (cidx_ + 1) % capacity_;
- waiting_producers_.notify_one();
- return channel_op_status::success;
- }
- }
- }
- value_type value_pop() {
- context * active_ctx = context::active();
- for (;;) {
- detail::spinlock_lock lk{splk_, std::try_to_lock};
- if (!lk) {
- active_ctx->yield();
- continue;
- }
- if ( is_empty_() ) {
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- throw fiber_error{
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: channel is closed" };
- }
- waiting_consumers_.suspend_and_wait( lk, active_ctx);
- } else {
- value_type value = std::move( slots_[cidx_]);
- cidx_ = (cidx_ + 1) % capacity_;
- waiting_producers_.notify_one();
- return value;
- }
- }
- }
- template< typename Rep, typename Period >
- channel_op_status pop_wait_for( value_type & value,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return pop_wait_until( value,
- std::chrono::steady_clock::now() + timeout_duration);
- }
- template< typename Clock, typename Duration >
- channel_op_status pop_wait_until( value_type & value,
- std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- context * active_ctx = context::active();
- std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
- for (;;) {
- detail::spinlock_lock lk{splk_, std::try_to_lock};
- if (!lk) {
- active_ctx->yield();
- continue;
- }
- if ( is_empty_() ) {
- if ( BOOST_UNLIKELY( is_closed_() ) ) {
- return channel_op_status::closed;
- }
- if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
- return channel_op_status::timeout;
- }
- } else {
- value = std::move( slots_[cidx_]);
- cidx_ = (cidx_ + 1) % capacity_;
- waiting_producers_.notify_one();
- return channel_op_status::success;
- }
- }
- }
- class iterator {
- private:
- typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
- buffered_channel * chan_{ nullptr };
- storage_type storage_;
- void increment_( bool initial = false) {
- BOOST_ASSERT( nullptr != chan_);
- try {
- if ( ! initial) {
- reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
- }
- ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
- } catch ( fiber_error const&) {
- chan_ = nullptr;
- }
- }
- public:
- using iterator_category = std::input_iterator_tag;
- using difference_type = std::ptrdiff_t;
- using pointer = value_type *;
- using reference = value_type &;
- using pointer_t = pointer;
- using reference_t = reference;
- iterator() = default;
- explicit iterator( buffered_channel< T > * chan) noexcept :
- chan_{ chan } {
- increment_( true);
- }
- iterator( iterator const& other) noexcept :
- chan_{ other.chan_ } {
- }
- iterator & operator=( iterator const& other) noexcept {
- if ( BOOST_LIKELY( this != & other) ) {
- chan_ = other.chan_;
- }
- return * this;
- }
- bool operator==( iterator const& other) const noexcept {
- return other.chan_ == chan_;
- }
- bool operator!=( iterator const& other) const noexcept {
- return other.chan_ != chan_;
- }
- iterator & operator++() {
- reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
- increment_();
- return * this;
- }
- const iterator operator++( int) = delete;
- reference_t operator*() noexcept {
- return * reinterpret_cast< value_type * >( std::addressof( storage_) );
- }
- pointer_t operator->() noexcept {
- return reinterpret_cast< value_type * >( std::addressof( storage_) );
- }
- };
- friend class iterator;
- };
- template< typename T >
- typename buffered_channel< T >::iterator
- begin( buffered_channel< T > & chan) {
- return typename buffered_channel< T >::iterator( & chan);
- }
- template< typename T >
- typename buffered_channel< T >::iterator
- end( buffered_channel< T > &) {
- return typename buffered_channel< T >::iterator();
- }
- }}
- #ifdef BOOST_HAS_ABI_HEADERS
- # include BOOST_ABI_SUFFIX
- #endif
- #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H
|