fix queues

This commit is contained in:
Oliver Kowalke 2013-11-02 14:15:05 +01:00
parent 99cb841259
commit 6bb976b4d3
7 changed files with 1080 additions and 341 deletions

View File

@ -15,15 +15,14 @@ synchonize fibers via message passing.
void send( queue_t & queue)
{
for ( int i = 0; i < 5; ++i)
queue.put( i);
queue.deactivate();
queue.push( i);
queue.close();
}
void recv( queue_t & queue)
{
boost::optional< int > value;
while ( queue.take( value) )
{ std::cout << "received " << * value << std::endl; }
while ( ! queue.is_closed() )
{ std::cout << "received " << queue.value_pop() << std::endl; }
}
queue_t queue;
@ -33,6 +32,44 @@ synchonize fibers via message passing.
f1.join();
f2.join();
[heading Enumeration `queue_op_status`]
Queue-operations return the state of the queue.
enum class queue_op_status
{
success,
empty,
full,
closed,
timeout
};
[heading `success`]
[variablelist
[[Effects:] [Operation was successfull.]]
]
[heading `empty`]
[variablelist
[[Effects:] [Queue is empty, operation failed.]]
]
[heading `full`]
[variablelist
[[Effects:] [Queue is full, operation failed.]]
]
[heading `closed`]
[variablelist
[[Effects:] [Queue is closed, operation failed.]]
]
[heading `timeout`]
[variablelist
[[Effects:] [The operation did not become ready before timout has passed.]]
]
[heading Template `unbounded_queue<>`]
#include <boost/fiber/unbounded_queue.hpp>
@ -41,58 +78,65 @@ synchonize fibers via message passing.
class unbounded_queue
{
public:
unbounded_queue( unbounded_queue const& other) = delete;
unbounded_queue & operator=( unbounded_queue const& other) = delete;
typedef T value_type;
bool active() const;
unbounded_queue( unbounded_queue const& other) = delete;
unbounded_queue & operator=( unbounded_queue const& other) = delete;
void deactivate();
bool is_closed() const;
void close();
bool empty();
bool is_empty();
void put( T const& t);
void push( value_type const& va);
void push( value_types &&) va);
bool take( boost::optional< T > & va);
bool try_take( boost::optional< T > & va);
value_type value_pop();
template< typename Rep, typename Period >
queue_op_status wait_pop( value_type & va,
chrono::duration< Rep, Period > const& timeout_duration)
queue_op_status wait_pop( value_type & va,
clock_type::time_point const& timeout_time)
queue_op_status try_pop( value_type & va);
};
[heading Member function `active()`]
[heading Member function `is_closed()`]
bool active() const;
bool is_closed() const;
[variablelist
[[Effects:] [Return `true` if queue is still usable.]]
[[Effects:] [Return `true` if queue was closed.]]
[[Throws:] [Nothing.]]
[[Note:] [The queue is usable by default.]]
[[Note:] [The queue is not closed by default.]]
]
[heading Member function `deactivate()`]
[heading Member function `closed()`]
void deactivate();
void closed();
[variablelist
[[Effects:] [Deactivates the queue. No values can be put after calling
`this->deactivate`. Fibers blocked in `this->take()` will return.]]
`this->close`. Fibers blocked in `this->value_pop()` will return.]]
[[Throws:] [Nothing.]]
[[Note:] [`deactivate()` is like closing a pipe. It informs waiting receivers
[[Note:] [`close()` is like closing a pipe. It informs waiting receivers
that no more values will arrive.]]
]
[heading Member function `empty()`]
[heading Member function `is_empty()`]
bool empty();
bool is_empty();
[variablelist
[[Effects:] [Returns `true` if the queue currently contains no data.]]
[[Throws:] [Nothing.]]
[[Note:] [This condition is transient. An `empty()` queue can become
[[Note:] [This condition is transient. An `is_empty()` queue can become
non-empty.]]
]
[heading Member function `put()`]
[heading Member function `push()`]
void put( T const& t);
void push( value_type const& va);
void push( value_type && va);
[variablelist
[[Effects:] [Enqueues the value in the queue and wakes up a fiber waiting
@ -135,6 +179,8 @@ the queue at this moment, the function returns `false`. Otherwise it returns
class bounded_queue
{
public:
typedef T value_type;
bounded_queue( bounded_queue const& other) = delete;
bounded_queue & operator=( bounded_queue const& other) = delete;
@ -142,17 +188,34 @@ the queue at this moment, the function returns `false`. Otherwise it returns
bounded_queue( std::size_t hwm, std::size_t lwm);
bool active() const;
bool is_closed() const;
void close();
void deactivate();
bool is_empty();
bool is_full();
bool empty();
void push( T const& t);
void push( T && t);
template< typename Rep, typename Period >
queue_op_status wait_push( value_type const& va,
chrono::duration< Rep, Period > const& timeout_duration);
template< typename Rep, typename Period >
queue_op_status wait_push( value_type && va,
chrono::duration< Rep, Period > const& timeout_duration);
queue_op_status wait_push( value_type const& va,
clock_type::time_point const& timeout_time);
queue_op_status wait_push( value_type && va,
clock_type::time_point const& timeout_time);
queue_op_status try_push( value_type const& va);
queue_op_status try_push( value_type && va);
void put( T const& t);
bool take( boost::optional< T > & va);
bool try_take( boost::optional< T > & va);
value_type value_pop();
template< typename Rep, typename Period >
queue_op_status wait_pop( value_type & va,
chrono::duration< Rep, Period > const& timeout_duration);
queue_op_status wait_pop( value_type & va,
clock_type::time_point const& timeout_time);
queue_op_status try_pop( value_type & va);
};
[heading Constructor]

View File

@ -17,24 +17,24 @@ void ping( fifo_t & recv_buf, fifo_t & send_buf)
{
boost::fibers::fiber::id id( boost::this_fiber::get_id() );
boost::optional< std::string > value;
send_buf.push("ping");
send_buf.put("ping");
BOOST_ASSERT( recv_buf.take( value) );
std::cout << "fiber " << id << ": ping received: " << * value << std::endl;
value.reset();
std::string value = recv_buf.value_pop();
std::cout << "fiber " << id << ": ping received: " << value << std::endl;
value.clear();
send_buf.put("ping");
BOOST_ASSERT( recv_buf.take( value) );
std::cout << "fiber " << id << ": ping received: " << * value << std::endl;
value.reset();
send_buf.push("ping");
send_buf.put("ping");
BOOST_ASSERT( recv_buf.take( value) );
std::cout << "fiber " << id << ": ping received: " << * value << std::endl;
value.reset();
value = recv_buf.value_pop();
std::cout << "fiber " << id << ": ping received: " << value << std::endl;
value.clear();
send_buf.deactivate();
send_buf.push("ping");
value = recv_buf.value_pop();
std::cout << "fiber " << id << ": ping received: " << value << std::endl;
send_buf.close();
}
inline
@ -42,24 +42,24 @@ void pong( fifo_t & recv_buf, fifo_t & send_buf)
{
boost::fibers::fiber::id id( boost::this_fiber::get_id() );
boost::optional< std::string > value;
std::string value = recv_buf.value_pop();
std::cout << "fiber " << id << ": pong received: " << value << std::endl;
value.clear();
BOOST_ASSERT( recv_buf.take( value) );
std::cout << "fiber " << id << ": pong received: " << * value << std::endl;
value.reset();
send_buf.put("pong");
send_buf.push("pong");
BOOST_ASSERT( recv_buf.take( value) );
std::cout << "fiber " << id << ": pong received: " << * value << std::endl;
value.reset();
send_buf.put("pong");
value = recv_buf.value_pop();
std::cout << "fiber " << id << ": pong received: " << value << std::endl;
value.clear();
BOOST_ASSERT( recv_buf.take( value) );
std::cout << "fiber " << id << ": pong received: " << * value << std::endl;
value.reset();
send_buf.put("pong");
send_buf.push("pong");
send_buf.deactivate();
value = recv_buf.value_pop();
std::cout << "fiber " << id << ": pong received: " << value << std::endl;
send_buf.push("pong");
send_buf.close();
}
int main()

View File

@ -6,23 +6,25 @@
//
// idea of node-base locking from 'C++ Concurrency in Action', Anthony Williams
#ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H
#define BOOST_FIBERS_BOUNDED_CHANNEL_H
#ifndef BOOST_FIBERS_BOUNDED_QUEUE_H
#define BOOST_FIBERS_BOUNDED_QUEUE_H
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <boost/chrono/system_clocks.hpp>
#include <boost/config.hpp>
#include <boost/exception/all.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/optional.hpp>
#include <boost/move/move.hpp>
#include <boost/system/error_code.hpp>
#include <boost/throw_exception.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/condition.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/queue_op_status.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@ -48,6 +50,22 @@ struct bounded_queue_node
{}
};
template< typename T >
struct bounded_queue_node< T * >
{
typedef intrusive_ptr< bounded_queue_node > ptr;
std::size_t use_count;
T va;
ptr next;
bounded_queue_node() :
use_count( 0),
va( 0),
next()
{}
};
template< typename T >
void intrusive_ptr_add_ref( bounded_queue_node< T > * p)
{ ++p->use_count; }
@ -62,20 +80,15 @@ template< typename T >
class bounded_queue : private noncopyable
{
public:
typedef optional< T > value_type;
typedef T value_type;
private:
typedef detail::bounded_queue_node< value_type > node_type;
template< typename X >
friend void intrusive_ptr_add_ref( bounded_queue< X > * p);
template< typename X >
friend void intrusive_ptr_release( bounded_queue< X > * p);
enum state_t
{
ACTIVE = 0,
DEACTIVE
OPEN = 0,
CLOSED
};
state_t state_;
@ -86,27 +99,31 @@ private:
mutable mutex tail_mtx_;
condition not_empty_cond_;
condition not_full_cond_;
unsigned int hwm_;
unsigned int lwm_;
std::size_t hwm_;
std::size_t lwm_;
bool active_() const
{ return ACTIVE == state_; }
bool is_closed_() const
{ return CLOSED == state_; }
void deactivate_()
{ state_ = DEACTIVE; }
void close_()
{
state_ = CLOSED;
not_empty_cond_.notify_all();
not_full_cond_.notify_all();
}
std::size_t size_() const
{ return count_; }
bool empty_() const
bool is_empty_() const
{ return head_ == get_tail_(); }
bool full_() const
bool is_full_() const
{ return size_() >= hwm_; }
typename node_type::ptr get_tail_() const
{
mutex::scoped_lock lk( tail_mtx_);
mutex::scoped_lock lk( tail_mtx_);
typename node_type::ptr tmp = tail_;
return tmp;
}
@ -123,7 +140,7 @@ public:
bounded_queue(
std::size_t hwm,
std::size_t lwm) :
state_( ACTIVE),
state_( OPEN),
count_( 0),
head_( new node_type() ),
head_mtx_(),
@ -135,14 +152,14 @@ public:
lwm_( lwm)
{
if ( hwm_ < lwm_)
boost::throw_exception(
BOOST_THROW_EXCEPTION(
invalid_argument(
system::errc::invalid_argument,
"boost fiber: high-watermark is less than low-watermark for bounded_queue") );
}
bounded_queue( std::size_t wm) :
state_( ACTIVE),
state_( OPEN),
count_( 0),
head_( new node_type() ),
head_mtx_(),
@ -160,155 +177,270 @@ public:
std::size_t lower_bound() const
{ return lwm_; }
bool active() const
{ return active_(); }
bool is_closed() const
{ return is_closed_(); }
void deactivate()
void close()
{
mutex::scoped_lock head_lk( head_mtx_);
mutex::scoped_lock tail_lk( tail_mtx_);
deactivate_();
not_empty_cond_.notify_all();
not_full_cond_.notify_all();
close_();
}
bool empty() const
{ return empty_(); }
bool is_empty() const
{ return is_empty_(); }
void put( T const& t)
bool is_full() const
{ return is_full_(); }
void push( value_type const& va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
while ( is_full_() ) not_full_cond_.wait( lk);
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
}
void push( BOOST_RV_REF( value_type) va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
while ( is_full_() ) not_full_cond_.wait( lk);
tail_->va = boost::move( va);
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
}
template< typename Rep, typename Period >
queue_op_status wait_push( value_type const& va,
chrono::duration< Rep, Period > const& timeout_duration)
{ return wait_push( va, clock_type::now() + timeout_duration); }
template< typename Rep, typename Period >
queue_op_status wait_push( BOOST_RV_REF( value_type) va,
chrono::duration< Rep, Period > const& timeout_duration)
{ return wait_push( boost::move( va), clock_type::now() + timeout_duration); }
queue_op_status wait_push( value_type const& va,
clock_type::time_point const& timeout_time)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
while ( is_full_() )
{
mutex::scoped_lock lk( tail_mtx_);
if ( cv_status::timeout == not_full_cond_.wait_until(
lk, timeout_time) )
return queue_op_status::timeout;
}
if ( full_() )
{
while ( active_() && full_() )
not_full_cond_.wait( lk);
}
if ( ! active_() )
boost::throw_exception( fiber_resource_error() );
tail_->va = t;
try
{
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
not_empty_cond_.notify_one();
}
#if 0
template< typename TimeDuration >
bool put( T const& t, TimeDuration const& dt)
{ return put( t, chrono::system_clock::now() + dt); }
bool put( T const& t, chrono::system_clock::time_point const& abs_time)
queue_op_status wait_push( BOOST_RV_REF( value_type) va,
clock_type::time_point const& timeout_time)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
while ( is_full_() )
{
mutex::scoped_lock lk( tail_mtx_);
if ( cv_status::timeout == not_full_cond_.wait_until(
lk, timeout_time) )
return queue_op_status::timeout;
}
if ( full_() )
{
while ( active_() && full_() )
{
if ( ! not_full_cond_.timed_wait( lk, abs_time) )
return false;
}
}
if ( ! active_() )
boost::throw_exception( fiber_resource_error() );
tail_->va = t;
try
{
tail_->va = boost::move( va);
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
return queue_op_status::success;
}
not_empty_cond_.notify_one();
return true;
}
#endif
bool take( value_type & va)
{
mutex::scoped_lock lk( head_mtx_);
bool empty = empty_();
if ( ! active_() && empty)
return false;
if ( empty)
catch (...)
{
try
{
while ( active_() && empty_() )
not_empty_cond_.wait( lk);
}
catch ( fiber_interrupted const&)
{ return false; }
close_();
throw;
}
if ( ! active_() && empty_() )
return false;
swap( va, head_->va);
pop_head_();
if ( size_() <= lwm_)
{
if ( lwm_ == hwm_)
not_full_cond_.notify_one();
else
// more than one producer could be waiting
// for submiting an action object
not_full_cond_.notify_all();
}
return va;
}
#if 0
template< typename TimeDuration >
bool take( value_type & va, TimeDuration const& dt)
{ return take( va, chrono::system_clock::now() + dt); }
bool take( value_type & va, chrono::system_clock::time_point const& abs_time)
queue_op_status try_push( value_type const& va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( head_mtx_);
if ( is_closed_() ) return queue_op_status::closed;
if ( is_full_() ) return queue_op_status::full;
try
{
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
queue_op_status try_push( BOOST_RV_REF( value_type) va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( head_mtx_);
if ( is_closed_() ) return queue_op_status::closed;
if ( is_full_() ) return queue_op_status::full;
try
{
tail_->va = boost::move( va);
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
value_type value_pop()
{
mutex::scoped_lock lk( head_mtx_);
bool empty = empty_();
if ( ! active_() && empty)
return false;
if ( empty)
while ( is_closed_() && is_empty_() ) not_empty_cond_.wait( lk);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
BOOST_ASSERT( ! is_empty_() );
try
{
try
value_type va = boost::move( head_->va);
pop_head_();
if ( size_() <= lwm_)
{
while ( active_() && empty_() )
{
if ( ! not_empty_cond_.timed_wait( lk, abs_time) )
return false;
}
if ( lwm_ == hwm_)
not_full_cond_.notify_one();
else
// more than one producer could be waiting
// for submiting an action object
not_full_cond_.notify_all();
}
catch ( fiber_interrupted const&)
{ return false; }
return va;
}
if ( ! active_() && empty_() )
return false;
swap( va, head_->va);
catch (...)
{
close_();
throw;
}
}
template< typename Rep, typename Period >
queue_op_status wait_pop( value_type & va,
chrono::duration< Rep, Period > const& timeout_duration)
{ return wait_pop( va, clock_type::now() + timeout_duration); }
queue_op_status wait_pop( value_type & va,
clock_type::time_point const& timeout_time)
{
mutex::scoped_lock lk( head_mtx_);
while ( ! is_closed_() && is_empty_() )
{
if ( cv_status::timeout == not_empty_cond_.wait_until(
lk, timeout_time) )
return queue_op_status::timeout;
}
if ( is_closed_() ) return queue_op_status::closed;
BOOST_ASSERT( ! is_empty_() );
try
{
va = boost::move( head_->va);
pop_head_();
if ( size_() <= lwm_)
{
if ( lwm_ == hwm_)
not_full_cond_.notify_one();
else
// more than one producer could be waiting
// for submiting an action object
not_full_cond_.notify_all();
}
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
queue_op_status try_pop( value_type & va)
{
mutex::scoped_lock lk( head_mtx_);
if ( is_closed_() ) return queue_op_status::closed;
if ( is_empty_() ) return queue_op_status::empty;
va = boost::move( head_->va);
pop_head_();
if ( size_() <= lwm_)
{
if ( lwm_ == hwm_)
not_full_cond_.notify_one();
else
// more than one producer could be waiting
// for submiting an action object
not_full_cond_.notify_all();
}
return va;
}
#endif
bool try_take( value_type & va)
{
mutex::scoped_lock lk( head_mtx_);
if ( empty_() )
return false;
swap( va, head_->va);
pop_head_();
bool valid = va;
if ( valid && size_() <= lwm_)
{
if ( lwm_ == hwm_)
not_full_cond_.notify_one();
@ -317,7 +449,302 @@ public:
// in order to submit an task
not_full_cond_.notify_all();
}
return valid;
return queue_op_status::success;
}
};
template< typename T >
class bounded_queue< T * > : private noncopyable
{
public:
typedef T * value_type;
private:
typedef detail::bounded_queue_node< value_type > node_type;
enum state_t
{
OPEN = 0,
CLOSED
};
state_t state_;
std::size_t count_;
typename node_type::ptr head_;
mutable mutex head_mtx_;
typename node_type::ptr tail_;
mutable mutex tail_mtx_;
condition not_empty_cond_;
condition not_full_cond_;
std::size_t hwm_;
std::size_t lwm_;
bool is_closed_() const
{ return CLOSED == state_; }
void close_()
{
state_ = CLOSED;
not_empty_cond_.notify_all();
not_full_cond_.notify_all();
}
std::size_t size_() const
{ return count_; }
bool is_empty_() const
{ return head_ == get_tail_(); }
bool is_full_() const
{ return size_() >= hwm_; }
typename node_type::ptr get_tail_() const
{
mutex::scoped_lock lk( tail_mtx_);
typename node_type::ptr tmp = tail_;
return tmp;
}
typename node_type::ptr pop_head_()
{
typename node_type::ptr old_head = head_;
head_ = old_head->next;
--count_;
return old_head;
}
public:
bounded_queue(
std::size_t hwm,
std::size_t lwm) :
state_( OPEN),
count_( 0),
head_( new node_type() ),
head_mtx_(),
tail_( head_),
tail_mtx_(),
not_empty_cond_(),
not_full_cond_(),
hwm_( hwm),
lwm_( lwm)
{
if ( hwm_ < lwm_)
BOOST_THROW_EXCEPTION(
invalid_argument(
system::errc::invalid_argument,
"boost fiber: high-watermark is less than low-watermark for bounded_queue") );
}
bounded_queue( std::size_t wm) :
state_( OPEN),
count_( 0),
head_( new node_type() ),
head_mtx_(),
tail_( head_),
tail_mtx_(),
not_empty_cond_(),
not_full_cond_(),
hwm_( wm),
lwm_( wm)
{}
std::size_t upper_bound() const
{ return hwm_; }
std::size_t lower_bound() const
{ return lwm_; }
bool is_closed() const
{ return is_closed_(); }
void close()
{
mutex::scoped_lock head_lk( head_mtx_);
mutex::scoped_lock tail_lk( tail_mtx_);
close_();
}
bool is_empty() const
{ return is_empty_(); }
bool is_full() const
{ return is_full_(); }
void push( value_type va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
while ( is_full_() ) not_full_cond_.wait( lk);
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
}
template< typename Rep, typename Period >
queue_op_status wait_push( value_type va,
chrono::duration< Rep, Period > const& timeout_duration)
{ return wait_push( va, clock_type::now() + timeout_duration); }
queue_op_status wait_push( value_type va,
clock_type::time_point const& timeout_time)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
while ( is_full_() )
{
if ( cv_status::timeout == not_full_cond_.wait_until(
lk, timeout_time) )
return queue_op_status::timeout;
}
try
{
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
queue_op_status try_push( value_type va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( head_mtx_);
if ( is_closed_() ) return queue_op_status::closed;
if ( is_full_() ) return queue_op_status::full;
try
{
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
++count_;
not_empty_cond_.notify_one();
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
value_type value_pop()
{
mutex::scoped_lock lk( head_mtx_);
while ( is_closed_() && is_empty_() ) not_empty_cond_.wait( lk);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
BOOST_ASSERT( ! is_empty_() );
try
{
value_type va = head_->va;
pop_head_();
if ( size_() <= lwm_)
{
if ( lwm_ == hwm_)
not_full_cond_.notify_one();
else
// more than one producer could be waiting
// for submiting an action object
not_full_cond_.notify_all();
}
return va;
}
catch (...)
{
close_();
throw;
}
}
template< typename Rep, typename Period >
queue_op_status wait_pop( value_type va,
chrono::duration< Rep, Period > const& timeout_duration)
{ return wait_pop( va, clock_type::now() + timeout_duration); }
queue_op_status wait_pop( value_type va,
clock_type::time_point const& timeout_time)
{
mutex::scoped_lock lk( head_mtx_);
while ( ! is_closed_() && is_empty_() )
{
if ( cv_status::timeout == not_empty_cond_.wait_until(
lk, timeout_time) )
return queue_op_status::timeout;
}
if ( is_closed_() ) return queue_op_status::closed;
BOOST_ASSERT( ! is_empty_() );
try
{
std::swap( va, head_->va);
pop_head_();
if ( size_() <= lwm_)
{
if ( lwm_ == hwm_)
not_full_cond_.notify_one();
else
// more than one producer could be waiting
// for submiting an action object
not_full_cond_.notify_all();
}
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
queue_op_status try_pop( value_type va)
{
mutex::scoped_lock lk( head_mtx_);
if ( is_closed_() ) return queue_op_status::closed;
if ( is_empty_() ) return queue_op_status::empty;
std::swap( va, head_->va);
pop_head_();
if ( size_() <= lwm_)
{
if ( lwm_ == hwm_)
not_full_cond_.notify_one();
else
// more than one producer could be waiting
// in order to submit an task
not_full_cond_.notify_all();
}
return queue_op_status::success;
}
};
@ -327,4 +754,4 @@ public:
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_BOUNDED_CHANNEL_H
#endif // BOOST_FIBERS_BOUNDED_QUEUE_H

View File

@ -144,7 +144,7 @@ public:
// remove fiber from waiting-list
waiting_.erase(
std::find( waiting_.begin(), waiting_.end(), n) );
status = cv_status::timeout;
}

View File

@ -10,6 +10,8 @@
#include <boost/config.hpp>
#include <boost/detail/scoped_enum_emulation.hpp>
#include <boost/fiber/detail/config.hpp>
namespace boost {
namespace fibers {

View File

@ -0,0 +1,37 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_QUEUE_OP_STATUS_H
#define BOOST_FIBERS_QUEUE_OP_STATUS_H
#include <boost/config.hpp>
#include <boost/detail/scoped_enum_emulation.hpp>
#include <boost/fiber/detail/config.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
BOOST_SCOPED_ENUM_DECLARE_BEGIN(queue_op_status)
{
success = 0,
empty,
full,
closed,
timeout
}
BOOST_SCOPED_ENUM_DECLARE_END(queue_op_status)
}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_QUEUE_OP_STATUS_H

View File

@ -6,21 +6,24 @@
//
// idea of node-base locking from 'C++ Concurrency in Action', Anthony Williams
#ifndef BOOST_FIBERS_UNBOUNDED_CHANNEL_H
#define BOOST_FIBERS_UNBOUNDED_CHANNEL_H
#ifndef BOOST_FIBERS_UNBOUNDED_QUEUE_H
#define BOOST_FIBERS_UNBOUNDED_QUEUE_H
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <boost/chrono/system_clocks.hpp>
#include <boost/config.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/optional.hpp>
#include <boost/move/move.hpp>
#include <boost/throw_exception.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/condition.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/queue_op_status.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@ -33,17 +36,33 @@ namespace detail {
template< typename T >
struct unbounded_queue_node
{
typedef intrusive_ptr< unbounded_queue_node > ptr;
typedef intrusive_ptr< unbounded_queue_node > ptr;
std::size_t use_count;
T va;
ptr next;
std::size_t use_count;
T va;
ptr next;
unbounded_queue_node() :
use_count( 0),
va(),
next()
{}
unbounded_queue_node() :
use_count( 0),
va(),
next()
{}
};
template< typename T >
struct unbounded_queue_node< T * >
{
typedef intrusive_ptr< unbounded_queue_node > ptr;
std::size_t use_count;
T * va;
ptr next;
unbounded_queue_node() :
use_count( 0),
va( 0),
next()
{}
};
template< typename T >
@ -60,151 +79,342 @@ template< typename T >
class unbounded_queue : private noncopyable
{
public:
typedef optional< T > value_type;
typedef T value_type;
private:
typedef detail::unbounded_queue_node< value_type > node_type;
typedef detail::unbounded_queue_node< value_type > node_type;
enum state
{
ACTIVE = 0,
DEACTIVE
};
enum state
{
OPEN = 0,
CLOSED
};
state state_;
typename node_type::ptr head_;
mutable mutex head_mtx_;
typename node_type::ptr tail_;
mutable mutex tail_mtx_;
condition not_empty_cond_;
state state_;
typename node_type::ptr head_;
mutable mutex head_mtx_;
typename node_type::ptr tail_;
mutable mutex tail_mtx_;
condition not_empty_cond_;
bool active_() const
{ return ACTIVE == state_; }
bool is_closed_() const
{ return OPEN != state_; }
void deactivate_()
{ state_ = DEACTIVE; }
void close_()
{
state_ = CLOSED;
not_empty_cond_.notify_all();
}
bool empty_() const
{ return head_ == get_tail_(); }
bool is_empty_() const
{ return head_ == get_tail_(); }
typename node_type::ptr get_tail_() const
{
mutex::scoped_lock lk( tail_mtx_);
typename node_type::ptr tmp = tail_;
return tmp;
}
typename node_type::ptr get_tail_() const
{
mutex::scoped_lock lk( tail_mtx_);
typename node_type::ptr tmp = tail_;
return tmp;
}
typename node_type::ptr pop_head_()
{
typename node_type::ptr old_head = head_;
head_ = old_head->next;
return old_head;
}
typename node_type::ptr pop_head_()
{
typename node_type::ptr old_head = head_;
head_ = old_head->next;
return old_head;
}
public:
unbounded_queue() :
state_( ACTIVE),
head_( new node_type() ),
head_mtx_(),
tail_( head_),
tail_mtx_(),
not_empty_cond_()
{}
unbounded_queue() :
state_( OPEN),
head_( new node_type() ),
head_mtx_(),
tail_( head_),
tail_mtx_(),
not_empty_cond_()
{}
bool active() const
{ return active_(); }
void close()
{
mutex::scoped_lock lk( head_mtx_);
close_();
}
void deactivate()
{
mutex::scoped_lock lk( head_mtx_);
deactivate_();
not_empty_cond_.notify_all();
}
bool is_closed() const
{ return is_closed_(); }
bool empty() const
{
mutex::scoped_lock lk( head_mtx_);
return empty_();
}
bool is_empty() const
{
mutex::scoped_lock lk( head_mtx_);
return is_empty_();
}
void put( T const& t)
{
typename node_type::ptr new_node( new node_type() );
{
mutex::scoped_lock lk( tail_mtx_);
void push( value_type const& va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( ! active_() )
throw std::runtime_error("queue is not active");
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
tail_->va = t;
tail_->next = new_node;
tail_ = new_node;
}
not_empty_cond_.notify_one();
}
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
bool take( value_type & va)
{
mutex::scoped_lock lk( head_mtx_);
bool empty = empty_();
if ( ! active_() && empty)
return false;
if ( empty)
{
try
{
while ( active_() && empty_() )
not_empty_cond_.wait( lk);
}
catch ( fiber_interrupted const&)
{ return false; }
}
if ( ! active_() && empty_() )
return false;
swap( va, head_->va);
pop_head_();
return va;
}
#if 0
template< typename TimeDuration >
bool take( value_type & va, TimeDuration const& dt)
{ return take( va, chrono::system_clock::now() + dt); }
lk.unlock();
bool take( value_type & va, chrono::system_clock::time_point const& abs_time)
{
mutex::scoped_lock lk( head_mtx_);
bool empty = empty_();
if ( ! active_() && empty)
return false;
if ( empty)
{
try
{
while ( active_() && empty_() )
{
if ( ! not_empty_cond_.timed_wait( lk, abs_time) )
return false;
}
}
catch ( fiber_interrupted const&)
{ return false; }
}
if ( ! active_() && empty_() )
return false;
swap( va, head_->va);
pop_head_();
return va;
}
#endif
bool try_take( value_type & va)
{
mutex::scoped_lock lk( head_mtx_);
if ( empty_() )
return false;
swap( va, head_->va);
pop_head_();
return va;
}
not_empty_cond_.notify_one();
}
void push( BOOST_RV_REF( value_type) va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
tail_->va = boost::move( va);
tail_->next = new_node;
tail_ = new_node;
lk.unlock();
not_empty_cond_.notify_one();
}
value_type value_pop()
{
mutex::scoped_lock lk( head_mtx_);
while ( ! is_closed_() && is_empty_() ) not_empty_cond_.wait( lk);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
BOOST_ASSERT( ! is_empty_() );
try
{
value_type va = boost::move( head_->va);
pop_head_();
return va;
}
catch (...)
{
close_();
throw;
}
}
template< typename Rep, typename Period >
queue_op_status wait_pop( value_type & va,
chrono::duration< Rep, Period > const& timeout_duration)
{ return wait_pop( va, clock_type::now() + timeout_duration); }
queue_op_status wait_pop( value_type & va,
clock_type::time_point const& timeout_time)
{
mutex::scoped_lock lk( head_mtx_);
while ( ! is_closed_() && is_empty_() )
{
if ( cv_status::timeout == not_empty_cond_.wait_until(
lk, timeout_time) )
return queue_op_status::timeout;
}
if ( is_closed_() ) return queue_op_status::closed;
BOOST_ASSERT( ! is_empty_() );
try
{
va = boost::move( head_->va);
pop_head_();
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
queue_op_status try_pop( value_type & va)
{
mutex::scoped_lock lk( head_mtx_);
if ( is_closed_() ) return queue_op_status::closed;
if ( is_empty_() ) return queue_op_status::empty;
va = boost::move( head_->va);
pop_head_();
return queue_op_status::success;
}
};
template< typename T >
class unbounded_queue< T * > : private noncopyable
{
public:
typedef T * value_type;
private:
typedef detail::unbounded_queue_node< value_type > node_type;
enum state
{
OPEN = 0,
CLOSED
};
state state_;
typename node_type::ptr head_;
mutable mutex head_mtx_;
typename node_type::ptr tail_;
mutable mutex tail_mtx_;
condition not_empty_cond_;
bool is_closed_() const
{ return OPEN != state_; }
void close_()
{
state_ = CLOSED;
not_empty_cond_.notify_all();
}
bool is_empty_() const
{ return head_ == get_tail_(); }
typename node_type::ptr get_tail_() const
{
mutex::scoped_lock lk( tail_mtx_);
typename node_type::ptr tmp = tail_;
return tmp;
}
typename node_type::ptr pop_head_()
{
typename node_type::ptr old_head = head_;
head_ = old_head->next;
return old_head;
}
public:
unbounded_queue() :
state_( OPEN),
head_( new node_type() ),
head_mtx_(),
tail_( head_),
tail_mtx_(),
not_empty_cond_()
{}
void close()
{
mutex::scoped_lock lk( head_mtx_);
close_();
}
bool is_closed() const
{ return is_closed_(); }
bool is_empty() const
{
mutex::scoped_lock lk( head_mtx_);
return is_empty_();
}
void push( value_type va)
{
typename node_type::ptr new_node( new node_type() );
mutex::scoped_lock lk( tail_mtx_);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
tail_->va = va;
tail_->next = new_node;
tail_ = new_node;
lk.unlock();
not_empty_cond_.notify_one();
}
value_type value_pop()
{
mutex::scoped_lock lk( head_mtx_);
while ( ! is_closed_() && is_empty_() ) not_empty_cond_.wait( lk);
if ( is_closed_() )
BOOST_THROW_EXCEPTION( std::runtime_error("queue is closed") );
BOOST_ASSERT( ! is_empty_() );
try
{
value_type va = head_->va;
pop_head_();
return va;
}
catch (...)
{
close_();
throw;
}
}
template< typename Rep, typename Period >
queue_op_status wait_pop( value_type va,
chrono::duration< Rep, Period > const& timeout_duration)
{ return wait_pop( va, clock_type::now() + timeout_duration); }
queue_op_status wait_pop( value_type va, clock_type::time_point const& timeout_time)
{
mutex::scoped_lock lk( head_mtx_);
while ( ! is_closed_() && is_empty_() )
{
if ( cv_status::timeout == not_empty_cond_.wait_until(
lk, timeout_time) )
return queue_op_status::timeout;
}
if ( is_closed_() ) return queue_op_status::closed;
BOOST_ASSERT( ! is_empty_() );
try
{
std::swap( va, head_->va);
pop_head_();
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
queue_op_status try_pop( value_type va)
{
mutex::scoped_lock lk( head_mtx_);
if ( is_closed_() ) return queue_op_status::closed;
if ( is_empty_() ) return queue_op_status::empty;
try
{
std::swap( va, head_->va);
pop_head_();
return queue_op_status::success;
}
catch (...)
{
close_();
throw;
}
}
};
}}
@ -213,4 +423,4 @@ public:
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_UNBOUNDED_CHANNEL_H
#endif // BOOST_FIBERS_UNBOUNDED_QUEUE_H