8db97c837c
- in context of #170
285 lines
9.7 KiB
Plaintext
285 lines
9.7 KiB
Plaintext
[/
|
|
Copyright Oliver Kowalke 2013.
|
|
Distributed under the Boost Software License, Version 1.0.
|
|
(See accompanying file LICENSE_1_0.txt or copy at
|
|
http://www.boost.org/LICENSE_1_0.txt
|
|
]
|
|
|
|
[section:buffered_channel Buffered Channel]
|
|
|
|
__boost_fiber__ provides a bounded, buffered channel (MPMC queue) suitable to
|
|
synchonize fibers (running on same or different threads) via asynchronouss
|
|
message passing.
|
|
|
|
typedef boost::fibers::buffered_channel< int > channel_t;
|
|
|
|
void send( channel_t & chan) {
|
|
for ( int i = 0; i < 5; ++i) {
|
|
chan.push( i);
|
|
}
|
|
chan.close();
|
|
}
|
|
|
|
void recv( channel_t & chan) {
|
|
int i;
|
|
while ( boost::fibers::channel_op_status::success == chan.pop(i) ) {
|
|
std::cout << "received " << i << std::endl;
|
|
}
|
|
}
|
|
|
|
channel_t chan{ 2 };
|
|
boost::fibers::fiber f1( std::bind( send, std::ref( chan) ) );
|
|
boost::fibers::fiber f2( std::bind( recv, std::ref( chan) ) );
|
|
|
|
f1.join();
|
|
f2.join();
|
|
|
|
Class `buffered_channel` supports range-for syntax:
|
|
|
|
typedef boost::fibers::buffered_channel< int > channel_t;
|
|
|
|
void foo( channel_t & chan) {
|
|
chan.push( 1);
|
|
chan.push( 1);
|
|
chan.push( 2);
|
|
chan.push( 3);
|
|
chan.push( 5);
|
|
chan.push( 8);
|
|
chan.push( 12);
|
|
chan.close();
|
|
}
|
|
|
|
void bar( channel_t & chan) {
|
|
for ( unsigned int value : chan) {
|
|
std::cout << value << " ";
|
|
}
|
|
std::cout << std::endl;
|
|
}
|
|
|
|
|
|
[template_heading buffered_channel]
|
|
|
|
#include <boost/fiber/buffered_channel.hpp>
|
|
|
|
namespace boost {
|
|
namespace fibers {
|
|
|
|
template< typename T >
|
|
class buffered_channel {
|
|
public:
|
|
typedef T value_type;
|
|
|
|
class iterator;
|
|
|
|
explicit buffered_channel( std::size_t capacity);
|
|
|
|
buffered_channel( buffered_channel const& other) = delete;
|
|
buffered_channel & operator=( buffered_channel const& other) = delete;
|
|
|
|
void close() noexcept;
|
|
|
|
channel_op_status push( value_type const& va);
|
|
channel_op_status push( value_type && va);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for(
|
|
value_type const& va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration);
|
|
channel_op_status push_wait_for( value_type && va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until(
|
|
value_type const& va,
|
|
std::chrono::time_point< Clock, Duration > const& timeout_time);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until(
|
|
value_type && va,
|
|
std::chrono::time_point< Clock, Duration > const& timeout_time);
|
|
channel_op_status try_push( value_type const& va);
|
|
channel_op_status try_push( value_type && va);
|
|
|
|
channel_op_status pop( value_type & va);
|
|
value_type value_pop();
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for(
|
|
value_type & va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until(
|
|
value_type & va,
|
|
std::chrono::time_point< Clock, Duration > const& timeout_time);
|
|
channel_op_status try_pop( value_type & va);
|
|
};
|
|
|
|
template< typename T >
|
|
buffered_channel< T >::iterator begin( buffered_channel< T > & chan);
|
|
|
|
template< typename T >
|
|
buffered_channel< T >::iterator end( buffered_channel< T > & chan);
|
|
|
|
}}
|
|
|
|
[heading Constructor]
|
|
|
|
explicit buffered_channel( std::size_t capacity);
|
|
|
|
[variablelist
|
|
[[Preconditions:] [`2<=capacity && 0==(capacity & (capacity-1))`]]
|
|
[[Effects:] [The constructor constructs an object of class `buffered_channel`
|
|
with an internal buffer of size `capacity`.]]
|
|
[[Throws:] [`fiber_error`]]
|
|
[[Error Conditions:] [
|
|
[*invalid_argument]: if `0==capacity || 0!=(capacity & (capacity-1))`.]]
|
|
[[Notes:] [A `push()`, `push_wait_for()` or `push_wait_until()` will not block
|
|
until the number of values in the channel becomes equal to `capacity`.
|
|
The channel can hold only `capacity - 1` elements, otherwise it is
|
|
considered to be full.]]
|
|
]
|
|
|
|
[member_heading buffered_channel..close]
|
|
|
|
void close() noexcept;
|
|
|
|
[variablelist
|
|
[[Effects:] [Deactivates the channel. No values can be put after calling
|
|
`this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()`
|
|
or `this->pop_wait_until()` will return `closed`. Fibers blocked in
|
|
`this->value_pop()` will receive an exception.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [`close()` is like closing a pipe. It informs waiting consumers
|
|
that no more values will arrive.]]
|
|
]
|
|
|
|
[template buffered_channel_push_effects[enqueues] If channel is closed, returns
|
|
`closed`. [enqueues] the value in the channel, wakes up a fiber
|
|
blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
|
|
`this->pop_wait_until()` and returns `success`. If the channel is full,
|
|
the fiber is blocked.]
|
|
|
|
[member_heading buffered_channel..push]
|
|
|
|
channel_op_status push( value_type const& va);
|
|
channel_op_status push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [[buffered_channel_push_effects Otherwise enqueues]]]
|
|
[[Throws:] [Exceptions thrown by copy- or move-operations.]]
|
|
]
|
|
|
|
[template buffered_channel_try_push_effects[enqueues] If channel is closed, returns
|
|
`closed`. [enqueues] the value in the channel, wakes up a fiber
|
|
blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
|
|
`this->pop_wait_until()` and returns `success`. If the channel is full,
|
|
it doesn't block and returns `full`.]
|
|
|
|
[member_heading buffered_channel..try_push]
|
|
|
|
channel_op_status try_push( value_type const& va);
|
|
channel_op_status try_push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [[buffered_channel_try_push_effects Otherwise enqueues]]]
|
|
[[Throws:] [Exceptions thrown by copy- or move-operations.]]
|
|
]
|
|
|
|
[template buffered_channel_pop[cls unblocking]
|
|
[member_heading [cls]..pop]
|
|
|
|
channel_op_status pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the channel. If the channel is empty, the
|
|
fiber gets suspended until at least one new item is `push()`ed (return value
|
|
`success` and `va` contains dequeued value) or the channel gets `close()`d
|
|
(return value `closed`)[unblocking]]]
|
|
[[Throws:] [Exceptions thrown by copy- or move-operations.]]
|
|
]
|
|
]
|
|
[buffered_channel_pop buffered_channel .]
|
|
|
|
[template buffered_channel_value_pop[cls unblocking]
|
|
[member_heading [cls]..value_pop]
|
|
|
|
value_type value_pop();
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the channel. If the channel is empty, the
|
|
fiber gets suspended until at least one new item is `push()`ed or the channel
|
|
gets `close()`d (which throws an exception)[unblocking]]]
|
|
[[Throws:] [`fiber_error` if `*this` is closed or by copy- or move-operations.]]
|
|
[[Error conditions:] [`std::errc::operation_not_permitted`]]
|
|
]
|
|
]
|
|
[buffered_channel_value_pop buffered_channel .]
|
|
|
|
[template buffered_channel_try_pop[cls unblocking]
|
|
[member_heading [cls]..try_pop]
|
|
|
|
channel_op_status try_pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If channel is empty, returns `empty`. If channel is closed,
|
|
returns `closed`. Otherwise it returns `success` and `va` contains the
|
|
dequeued value[unblocking]]]
|
|
[[Throws:] [Exceptions thrown by copy- or move-operations.]]
|
|
]
|
|
]
|
|
[buffered_channel_try_pop buffered_channel .]
|
|
|
|
[template buffered_channel_pop_wait_until_effects[endtime unblocking] If channel
|
|
is not empty, immediately dequeues a value from the channel. Otherwise
|
|
the fiber gets suspended until at least one new item is `push()`ed (return
|
|
value `success` and `va` contains dequeued value), or the channel gets
|
|
`close()`d (return value `closed`), or the system time reaches [endtime]
|
|
(return value `timeout`)[unblocking]]
|
|
|
|
[template buffered_channel_pop_wait_for[cls unblocking]
|
|
[member_heading [cls]..pop_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for(
|
|
value_type & va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration)
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout
|
|
time as (system time + `timeout_duration`).
|
|
[buffered_channel_pop_wait_until_effects the computed timeout time..[unblocking]]]]
|
|
[[Throws:] [timeout-related exceptions or by copy- or move-operations.]]
|
|
]
|
|
]
|
|
[buffered_channel_pop_wait_for buffered_channel .]
|
|
|
|
[template buffered_channel_pop_wait_until[cls unblocking]
|
|
[member_heading [cls]..pop_wait_until]
|
|
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until(
|
|
value_type & va,
|
|
std::chrono::time_point< Clock, Duration > const& timeout_time)
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`.
|
|
[buffered_channel_pop_wait_until_effects the passed `time_point`..[unblocking]]]]
|
|
[[Throws:] [timeout-related exceptions or by copy- or move-operations.]]
|
|
]
|
|
]
|
|
[buffered_channel_pop_wait_until buffered_channel .]
|
|
|
|
[heading Non-member function `begin( buffered_channel< T > &)`]
|
|
template< typename T >
|
|
buffered_channel< T >::iterator begin( buffered_channel< T > &);
|
|
|
|
[variablelist
|
|
[[Returns:] [Returns a range-iterator (input-iterator).]]
|
|
]
|
|
|
|
[heading Non-member function `end( buffered_channel< T > &)`]
|
|
template< typename T >
|
|
buffered_channel< R >::iterator end( buffered_channel< T > &);
|
|
|
|
[variablelist
|
|
[[Returns:] [Returns an end range-iterator (input-iterator).]]
|
|
]
|
|
|
|
[endsect]
|