[/
|
|
(C) Copyright 2008-11 Anthony Williams.
|
|
(C) Copyright 2012-2015 Vicente J. Botet Escriba.
|
|
Distributed under the Boost Software License, Version 1.0.
|
|
(See accompanying file LICENSE_1_0.txt or copy at
|
|
http://www.boost.org/LICENSE_1_0.txt).
|
|
]
|
|
|
|
[section:futures Futures]
|
|
|
|
[template future_state_link[link_text] [link thread.synchronization.futures.reference.future_state [link_text]]]
|
|
[def __uninitialized__ [future_state_link `boost::future_state::uninitialized`]]
|
|
[def __ready__ [future_state_link `boost::future_state::ready`]]
|
|
[def __waiting__ [future_state_link `boost::future_state::waiting`]]
|
|
|
|
[def __future_uninitialized__ `boost::future_uninitialized`]
|
|
[def __broken_promise__ `boost::broken_promise`]
|
|
[def __future_already_retrieved__ `boost::future_already_retrieved`]
|
|
[def __task_moved__ `boost::task_moved`]
|
|
[def __task_already_started__ `boost::task_already_started`]
|
|
[def __promise_already_satisfied__ `boost::promise_already_satisfied`]
|
|
|
|
[def __thread_interrupted__ `boost::thread_interrupted`]
|
|
|
|
|
|
[template unique_future_link[link_text] [link thread.synchronization.futures.reference.unique_future [link_text]]]
|
|
[def __unique_future__ [unique_future_link `future`]]
|
|
[def __unique_future `future`]
|
|
|
|
[template unique_future_get_link[link_text] [link thread.synchronization.futures.reference.unique_future.get [link_text]]]
|
|
[def __unique_future_get__ [unique_future_get_link `boost::future<R>::get()`]]
|
|
|
|
[template unique_future_wait_link[link_text] [link thread.synchronization.futures.reference.unique_future.wait [link_text]]]
|
|
[def __unique_future_wait__ [unique_future_wait_link `boost::future<R>::wait()`]]
|
|
|
|
[template unique_future_is_ready_link[link_text] [link thread.synchronization.futures.reference.unique_future.is_ready [link_text]]]
|
|
[def __unique_future_is_ready__ [unique_future_is_ready_link `boost::future<R>::is_ready()`]]
|
|
|
|
[template unique_future_has_value_link[link_text] [link thread.synchronization.futures.reference.unique_future.has_value [link_text]]]
|
|
[def __unique_future_has_value__ [unique_future_has_value_link `boost::future<R>::has_value()`]]
|
|
|
|
[template unique_future_has_exception_link[link_text] [link thread.synchronization.futures.reference.unique_future.has_exception [link_text]]]
|
|
[def __unique_future_has_exception__ [unique_future_has_exception_link `boost::future<R>::has_exception()`]]
|
|
|
|
[template unique_future_get_state_link[link_text] [link thread.synchronization.futures.reference.unique_future.get_state [link_text]]]
|
|
[def __unique_future_get_state__ [unique_future_get_state_link `boost::future<R>::get_state()`]]
|
|
|
|
[template shared_future_link[link_text] [link thread.synchronization.futures.reference.shared_future [link_text]]]
|
|
[def __shared_future__ [shared_future_link `boost::shared_future`]]
|
|
|
|
[template shared_future_get_link[link_text] [link thread.synchronization.futures.reference.shared_future.get [link_text]]]
|
|
[def __shared_future_get__ [shared_future_get_link `boost::shared_future<R>::get()`]]
|
|
|
|
[template shared_future_wait_link[link_text] [link thread.synchronization.futures.reference.shared_future.wait [link_text]]]
|
|
[def __shared_future_wait__ [shared_future_wait_link `boost::shared_future<R>::wait()`]]
|
|
|
|
[template shared_future_is_ready_link[link_text] [link thread.synchronization.futures.reference.shared_future.is_ready [link_text]]]
|
|
[def __shared_future_is_ready__ [shared_future_is_ready_link `boost::shared_future<R>::is_ready()`]]
|
|
|
|
[template shared_future_has_value_link[link_text] [link thread.synchronization.futures.reference.shared_future.has_value [link_text]]]
|
|
[def __shared_future_has_value__ [shared_future_has_value_link `boost::shared_future<R>::has_value()`]]
|
|
|
|
[template shared_future_has_exception_link[link_text] [link thread.synchronization.futures.reference.shared_future.has_exception [link_text]]]
|
|
[def __shared_future_has_exception__ [shared_future_has_exception_link `boost::shared_future<R>::has_exception()`]]
|
|
|
|
[template shared_future_get_state_link[link_text] [link thread.synchronization.futures.reference.shared_future.get_state [link_text]]]
|
|
[def __shared_future_get_state__ [shared_future_get_state_link `boost::shared_future<R>::get_state()`]]
|
|
|
|
[template promise_link[link_text] [link thread.synchronization.futures.reference.promise [link_text]]]
|
|
[def __promise__ [promise_link `boost::promise`]]
|
|
|
|
[template packaged_task_link[link_text] [link thread.synchronization.futures.reference.packaged_task [link_text]]]
|
|
[def __packaged_task__ [packaged_task_link `boost::packaged_task`]]
|
|
[def __packaged_task [packaged_task_link `boost::packaged_task`]]
|
|
|
|
[template wait_for_any_link[link_text] [link thread.synchronization.futures.reference.wait_for_any [link_text]]]
|
|
[def __wait_for_any__ [wait_for_any_link `boost::wait_for_any()`]]
|
|
|
|
[template wait_for_all_link[link_text] [link thread.synchronization.futures.reference.wait_for_all [link_text]]]
|
|
[def __wait_for_all__ [wait_for_all_link `boost::wait_for_all()`]]
|
|
|
|
|
|
[section:overview Overview]
|
|
|
|
The futures library provides a means of handling asynchronous future values, whether those values are generated by another thread, or
on a single thread in response to external stimuli, or on-demand.
|
|
|
|
This is done through the provision of four class templates: __unique_future__ and __shared_future__ which are used to retrieve the
|
|
asynchronous results, and __promise__ and __packaged_task__ which are used to generate the asynchronous results.
|
|
|
|
An instance of __unique_future__ holds the one and only reference to a result. Ownership can be transferred between instances using
|
|
the move constructor or move-assignment operator, but at most one instance holds a reference to a given asynchronous result. When
|
|
the result is ready, it is returned from __unique_future_get__ by rvalue-reference to allow the result to be moved or copied as
|
|
appropriate for the type.
|
|
|
|
On the other hand, many instances of __shared_future__ may reference the same result. Instances can be freely copied and assigned,
|
|
and __shared_future_get__ returns a `const` reference so that multiple calls to __shared_future_get__ are safe. You can move an
|
|
instance of __unique_future__ into an instance of __shared_future__, thus transferring ownership of the associated asynchronous
|
|
result, but not vice-versa.
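The ownership rules above can be sketched as follows. This is only an illustration under stated assumptions: it uses the `std::future` and `std::shared_future` counterparts from `<future>` so that it compiles without Boost.Thread, and `make_answer` and `ownership_moves_to_shared` are hypothetical names.

```cpp
#include <future>
#include <utility>

// Hypothetical helper: produces a future whose result is already available.
std::future<int> make_answer()
{
    std::promise<int> p;
    p.set_value(42);
    return p.get_future();
}

// Returns true if moving a future into a shared_future leaves the source
// without a result while every copy of the shared_future can still read it.
bool ownership_moves_to_shared()
{
    std::future<int> f = make_answer();
    std::shared_future<int> sf1 = std::move(f); // f gives up the result
    std::shared_future<int> sf2 = sf1;          // copies refer to the same result

    // get() may be called repeatedly on a shared_future.
    return !f.valid() && sf1.get() == 42 && sf2.get() == 42 && sf1.get() == 42;
}
```

After the move, the original future no longer refers to any result, which is why the conversion only works in one direction.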
|
|
|
|
`boost::async` is a simple way of running asynchronous tasks. A call to `boost::async` returns a __unique_future__ that will contain the result of the task.
|
|
|
|
|
|
You can wait for futures either individually or with one of the __wait_for_any__ and __wait_for_all__ functions.
|
|
|
|
[endsect]
|
|
|
|
[section:creating Creating asynchronous values]
|
|
|
|
You can set the value in a future with either a __promise__ or a __packaged_task__. A __packaged_task__ is a callable object that
|
|
wraps a function or callable object. When the packaged task is invoked, it invokes the contained function in turn, and populates a
|
|
future with the return value. This is an answer to the perennial question: "how do I return a value from a thread?": package the
|
|
function you wish to run as a __packaged_task__ and pass the packaged task to the thread constructor. The future retrieved from the
|
|
packaged task can then be used to obtain the return value. If the function throws an exception, that is stored in the future in
|
|
place of the return value.
|
|
|
|
    int calculate_the_answer_to_life_the_universe_and_everything()
    {
        return 42;
    }

    boost::packaged_task<int> pt(calculate_the_answer_to_life_the_universe_and_everything);
    boost::__unique_future__<int> fi=pt.get_future();

    boost::thread task(boost::move(pt)); // launch task on a thread

    fi.wait(); // wait for it to finish

    assert(fi.is_ready());
    assert(fi.has_value());
    assert(!fi.has_exception());
    assert(fi.get_state()==boost::future_state::ready);
    assert(fi.get()==42);
|
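The exception path can be sketched in the same way. The block below is a minimal illustration, assuming the `std::packaged_task` counterpart from `<future>` rather than the Boost class; `failing_calculation` and `run_and_report` are hypothetical names.

```cpp
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical task that always fails.
int failing_calculation()
{
    throw std::runtime_error("no answer");
}

// Runs the task on another thread and returns the message of the exception
// rethrown by get(), or an empty string if no exception was stored.
std::string run_and_report()
{
    std::packaged_task<int()> pt(failing_calculation);
    std::future<int> fi = pt.get_future();

    std::thread worker(std::move(pt)); // the task stores the exception in the future
    worker.join();

    try
    {
        fi.get(); // rethrows the stored exception
    }
    catch (const std::runtime_error& e)
    {
        return e.what();
    }
    return std::string();
}
```

The worker thread itself terminates normally; the failure only surfaces in the thread that asks the future for its result.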
|
|
|
|
|
A __promise__ is a bit more low level: it just provides explicit functions to store a value or an exception in the associated
|
|
future. A promise can therefore be used where the value may come from more than one possible source, or where a single operation may
|
|
produce multiple values.
|
|
|
|
    boost::promise<int> pi;
    boost::__unique_future__<int> fi;
    fi=pi.get_future();

    pi.set_value(42);

    assert(fi.is_ready());
    assert(fi.has_value());
    assert(!fi.has_exception());
    assert(fi.get_state()==boost::future_state::ready);
    assert(fi.get()==42);
|
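When several sources may supply the value, only the first one to call `set_value()` succeeds. The sketch below illustrates this with the `std::promise` counterpart, which reports a second attempt through `std::future_error`; Boost reports the same condition with __promise_already_satisfied__. The names `try_fulfil`, `outcome` and `two_sources` are hypothetical.

```cpp
#include <future>

// Tries to fulfil the promise; returns false if another source got there first.
bool try_fulfil(std::promise<int>& p, int value)
{
    try
    {
        p.set_value(value);
        return true;
    }
    catch (const std::future_error& e)
    {
        if (e.code() == std::future_errc::promise_already_satisfied)
            return false; // only one value can ever be stored
        throw;
    }
}

struct outcome
{
    bool first_accepted;
    bool second_accepted;
    int value;
};

// Two sources offer a value for the same promise; the future sees the first.
outcome two_sources()
{
    std::promise<int> p;
    std::future<int> f = p.get_future();

    outcome r;
    r.first_accepted = try_fulfil(p, 42);
    r.second_accepted = try_fulfil(p, 7);
    r.value = f.get();
    return r;
}
```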
|
|
|
[endsect]
|
|
|
|
[section:lazy_futures Wait Callbacks and Lazy Futures]
|
|
|
|
Both __promise__ and __packaged_task__ support ['wait callbacks] that are invoked when a thread blocks in a call to `wait()` or
|
|
`timed_wait()` on a future that is waiting for the result from the __promise__ or __packaged_task__, in the thread that is doing the
|
|
waiting. These can be set using the `set_wait_callback()` member function on the __promise__ or __packaged_task__ in question.
|
|
|
|
This allows ['lazy futures] where the result is not actually computed until it is needed by some thread. In the example below, the
|
|
call to `f.get()` invokes the callback `invoke_lazy_task`, which runs the task to set the value. If you remove the call to
|
|
`f.get()`, the task is not ever run.
|
|
|
|
    int calculate_the_answer_to_life_the_universe_and_everything()
    {
        return 42;
    }

    void invoke_lazy_task(boost::packaged_task<int>& task)
    {
        try
        {
            task();
        }
        catch(boost::task_already_started&)
        {}
    }

    int main()
    {
        boost::packaged_task<int> task(calculate_the_answer_to_life_the_universe_and_everything);
        task.set_wait_callback(invoke_lazy_task);
        boost::__unique_future__<int> f(task.get_future());

        assert(f.get()==42);
    }
|
|
|
|
|
|
[endsect]
|
|
|
|
[section:at_thread_exit Handling Detached Threads and Thread Specific Variables]
|
|
|
|
Detached threads pose a problem for objects with thread storage duration.
|
|
If we use a mechanism other than `thread::__join` to wait for a __thread to complete its work - such as waiting for a future to be ready -
|
|
then the destructors of thread specific variables will still be running after the waiting thread has resumed.
|
|
This section explains how the standard mechanism can be used to make such synchronization safe by ensuring that the
objects with thread storage duration are destroyed prior to the future being made ready. For example:
|
|
|
|
    int find_the_answer(); // uses thread specific objects
    void thread_func(boost::promise<int>&& p)
    {
        p.set_value_at_thread_exit(find_the_answer());
    }

    int main()
    {
        boost::promise<int> p;
        boost::__unique_future__<int> f=p.get_future(); // retrieve the future before the promise is moved
        boost::thread t(thread_func,boost::move(p));
        t.detach(); // we're going to wait on the future
        std::cout<<f.get()<<std::endl;
    }
|
|
|
|
When the call to `get()` returns, we know that not only is the future value ready, but the thread specific variables
|
|
on the other thread have also been destroyed.
|
|
|
|
Such mechanisms are provided for `boost::condition_variable`, `boost::promise` and `boost::packaged_task`. e.g.
|
|
|
|
    void task_executor(boost::packaged_task<void(int)> task,int param)
    {
        task.make_ready_at_thread_exit(param); // execute stored task
    } // destroy thread specific and wake threads waiting on futures from task
|
|
|
|
Other threads can wait on a future obtained from the task without having to worry about races due to the execution of
|
|
destructors of the thread specific objects from the task's thread.
|
|
|
|
    boost::condition_variable cv;
    boost::mutex m;
    complex_type the_data;
    bool data_ready=false;

    void thread_func()
    {
        boost::unique_lock<boost::mutex> lk(m);
        the_data=find_the_answer();
        data_ready=true;
        boost::notify_all_at_thread_exit(cv,boost::move(lk));
    } // destroy thread specific objects, notify cv, unlock mutex

    void waiting_thread()
    {
        boost::unique_lock<boost::mutex> lk(m);
        while(!data_ready)
        {
            cv.wait(lk);
        }
        process(the_data);
    }
|
|
|
|
The waiting thread is guaranteed that the thread specific objects used by `thread_func()` have been destroyed by the time
|
|
`process(the_data)` is called. If the lock on `m` is released and re-acquired after setting `data_ready` and before calling
|
|
`boost::notify_all_at_thread_exit()` then this does NOT hold, since the thread may return from the wait due to a
|
|
spurious wake-up.
|
|
|
|
[endsect]
|
|
|
|
[section:async Executing asynchronously]
|
|
|
|
`boost::async` is a simple way of running asynchronous tasks to make use of the available hardware concurrency.
|
|
A call to `boost::async` returns a `boost::future` that will contain the result of the task. Depending on
|
|
the launch policy, the task is either run asynchronously on its own thread or synchronously on whichever thread
|
|
calls the `wait()` or `get()` member functions on that `future`.
|
|
|
|
The launch policy is either `boost::launch::async`, which asks the runtime to run the task asynchronously on a new thread,
or `boost::launch::deferred`, which defers the function call until the result is requested (lazy evaluation).
This argument is optional; if you omit it, the default policy is used.
|
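The difference between the two policies can be observed by checking which thread runs the task. This is a minimal sketch, assuming the `std::async` and `std::launch` counterparts from `<future>`; the function names are hypothetical.

```cpp
#include <future>
#include <thread>

// Returns the id of the thread that actually runs the task.
std::thread::id running_thread()
{
    return std::this_thread::get_id();
}

// With the deferred policy the task runs inside get(), on the calling thread.
bool deferred_runs_on_caller()
{
    std::future<std::thread::id> f = std::async(std::launch::deferred, running_thread);
    return f.get() == std::this_thread::get_id();
}

// With the async policy the task runs on a thread other than the caller.
bool async_runs_elsewhere()
{
    std::future<std::thread::id> f = std::async(std::launch::async, running_thread);
    return f.get() != std::this_thread::get_id();
}
```

A deferred task that is never waited on is never run, so this policy suits results that may turn out not to be needed.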
|
|
|
For example, consider computing the sum of a very large array. First, avoid computing asynchronously when
the overhead would be significant compared to the work. Second, split the remaining work into two pieces, one executed by the host
thread and one executed asynchronously.
|
|
|
|
|
|
    int parallel_sum(int* data, int size)
    {
        int sum = 0;
        if ( size < 1000 )
            for ( int i = 0; i < size; ++i )
                sum += data[i];
        else {
            auto handle = boost::async(parallel_sum, data+size/2, size-size/2);
            sum += parallel_sum(data, size/2);
            sum += handle.get();
        }
        return sum;
    }
|
|
|
|
|
|
|
|
[endsect]
|
|
|
|
[section:shared Shared Futures]
|
|
|
|
`shared_future` is designed to be shared between threads,
|
|
that is to allow multiple concurrent get operations.
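As a rough illustration of concurrent `get()` calls, the sketch below gives each reader thread its own copy of a shared future. It assumes the `std::shared_future` counterpart from `<future>`, and `sum_seen_by_readers` is a hypothetical name.

```cpp
#include <future>
#include <thread>
#include <vector>

// Each reader thread owns its own copy of the shared_future and calls get() on it.
int sum_seen_by_readers(int reader_count)
{
    std::promise<int> p;
    std::shared_future<int> sf = p.get_future().share();

    std::vector<int> seen(reader_count, 0);
    std::vector<std::thread> readers;
    for (int i = 0; i < reader_count; ++i)
    {
        // sf is captured by value; each thread writes only to its own slot
        readers.emplace_back([sf, &seen, i] { seen[i] = sf.get(); });
    }

    p.set_value(42); // makes the result ready for every reader
    for (std::thread& t : readers) t.join();

    int sum = 0;
    for (int v : seen) sum += v;
    return sum;
}
```

Giving every thread its own copy avoids sharing a single future object between threads; only the underlying result is shared.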
|
|
|
|
[heading Multiple get]
|
|
|
|
The second `get()` call in the following example is undefined.
|
|
|
|
    void bad_second_use( type arg ) {

        auto ftr = async( [=]{ return work( arg ); } );
        if ( cond1 )
        {
            use1( ftr.get() );
        } else
        {
            use2( ftr.get() );
        }
        use3( ftr.get() ); // second use is undefined
    }
|
|
|
|
Using a `shared_future` solves the issue:

    void good_second_use( type arg ) {

        shared_future<type> ftr = async( [=]{ return work( arg ); } );
        if ( cond1 )
        {
            use1( ftr.get() );
        } else
        {
            use2( ftr.get() );
        }
        use3( ftr.get() ); // second use is defined
    }
|
|
|
|
[heading share()]
|
|
|
|
Naming the return type when declaring the `shared_future` is needed; auto is not available within template argument lists.
|
|
Here `share()` could be used to simplify the code
|
|
|
|
    void better_second_use( type arg ) {

        auto ftr = async( [=]{ return work( arg ); } ).share();
        if ( cond1 )
        {
            use1( ftr.get() );
        } else
        {
            use2( ftr.get() );
        }
        use3( ftr.get() ); // second use is defined
    }
|
|
|
|
[heading Writing on get()]
|
|
|
|
The user can either read or write the future variable.
|
|
|
|
    void write_to_get( type arg ) {

        auto ftr = async( [=]{ return work( arg ); } ).share();
        if ( cond1 )
        {
            use1( ftr.get() );
        } else
        {
            if ( cond2 )
                use2( ftr.get() );
            else
                ftr.get() = something(); // assign to non-const reference.
        }
        use3( ftr.get() ); // second use is defined
    }
|
|
|
|
This works because the `shared_future<>::get()` function returns a non-const reference to the appropriate storage.
|
|
Of course, synchronizing access to this storage is the user's responsibility. The library doesn't ensure that access to the internal storage is thread safe.
|
|
|
|
There has been some work by the C++ standard committee on an `atomic_future` that behaves as an `atomic` variable, that is, thread-safe,
and a `shared_future` that can be shared between several threads, but there was not enough consensus or time to get it ready for C++11.
|
|
|
|
[endsect]
|
|
|
|
[section:make_ready_future Making immediate futures easier]
|
|
|
|
Some functions may know the value at the point of construction. In these cases the value is immediately available,
|
|
but needs to be returned as a future or shared_future. By using make_ready_future a future
|
|
can be created which holds a pre-computed result in its shared state.
|
|
|
|
Without these features it is non-trivial to create a future directly from a value.
|
|
First a promise must be created, then the promise is set, and lastly the future is retrieved from the promise.
|
|
This can now be done with one operation.
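For comparison, the three manual steps look roughly like this. The sketch assumes the `std::promise` counterpart from `<future>`; `ready_future_from` and `is_ready` are hypothetical helper names, not part of the library.

```cpp
#include <chrono>
#include <future>
#include <utility>

// Hypothetical helper that spells out the three manual steps.
template <typename T>
std::future<T> ready_future_from(T value)
{
    std::promise<T> p;             // 1. create the promise
    p.set_value(std::move(value)); // 2. set the promise
    return p.get_future();         // 3. retrieve the future
}

// Hypothetical helper: true if the result is available without blocking.
template <typename T>
bool is_ready(const std::future<T>& f)
{
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

The returned future is ready immediately, so a later call to `get()` does not block.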
|
|
|
|
[heading make_ready_future]
|
|
|
|
This function creates a future for a given value. If no value is given then a future<void> is returned.
|
|
This function is primarily useful in cases where the return value is sometimes immediately available and sometimes
not. The example below illustrates that in an error path the value is known immediately, whereas in other paths
the function must return an eventual value represented as a future.
|
|
|
|
|
|
    boost::future<int> compute(int x)
    {
        if (x == 0) return boost::make_ready_future(0);
        if (x < 0) return boost::make_ready_future<int>(std::logic_error("Error"));
        boost::future<int> f1 = boost::async([x]() { return x+1; }); // capture x by value
        return f1;
    }
|
|
|
|
There are two variations of this function. The first takes a value of any type, and returns a future of that type.
|
|
The input value is passed to the shared state of the returned future. The second version takes no input and returns a future<void>.
|
|
|
|
[endsect]
|
|
|
|
[section:then Associating future continuations]
|
|
|
|
In asynchronous programming, it is very common for one asynchronous operation, on completion, to invoke a second
|
|
operation and pass data to it. The current C++ standard does not allow one to register a continuation to a future.
|
|
With `.then`, instead of waiting for the result, a continuation is "attached" to the asynchronous operation, which is
|
|
invoked when the result is ready. Continuations registered using the `.then` function will help to avoid blocking waits
|
|
or wasting threads on polling, greatly improving the responsiveness and scalability of an application.
|
|
|
|
`future.then()` provides the ability to sequentially compose two futures by declaring one to be the continuation of another.
|
|
With `.then()` the antecedent future is ready (has a value or exception stored in the shared state) before the continuation
|
|
starts as instructed by the lambda function.
|
|
|
|
In the example below the `future<string>` `f2` is registered to be a continuation of `future<int>` `f1` using the `.then()` member
|
|
function. This operation takes a lambda function which describes how `f2` should proceed after `f1` is ready.
|
|
|
|
|
|
    #include <boost/thread/future.hpp>
    #include <string>
    using namespace boost;
    int main()
    {
        future<int> f1 = async([]() { return 123; });
        future<std::string> f2 = f1.then([](future<int> f) {
            return std::to_string(f.get()); // here .get() won't block
        });
    }
|
|
|
|
One key feature of this function is the ability to chain multiple asynchronous operations. In asynchronous programming,
|
|
it's common to define a sequence of operations, in which each continuation executes only when the previous one completes.
|
|
In some cases, the antecedent future produces a value that the continuation accepts as input. By using `future.then()`,
|
|
creating a chain of continuations becomes straightforward and intuitive:
|
|
|
|
myFuture.then(...).then(...).then(...).
|
|
|
|
Some points to note are:
|
|
|
|
* Each continuation will not begin until the preceding one has completed.
* If an exception is thrown, the following continuation can handle it in a try-catch block.
|
|
|
|
|
|
Input Parameters:
|
|
|
|
* Lambda function: One option which can be considered is to take two functions, one for
|
|
success and one for error handling. However this option has not been retained for the moment.
|
|
The lambda function takes a future as its input which carries the exception
|
|
through. This makes propagating exceptions straightforward. This approach also simplifies the chaining of continuations.
|
|
* Executor: Providing an overload to `.then`, to take an executor reference places great flexibility over the execution
|
|
of the future in the programmer's hand. As described above, often taking a launch policy is not sufficient for powerful
|
|
asynchronous operations. The lifetime of the executor must outlive the continuation.
|
|
* Launch policy: an alternative to the executor overload, for use when the additional flexibility that the executor provides is not required.
|
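To illustrate why the continuation receives the antecedent future rather than its value, here is a rough emulation of the idea. It is not how `.then` is implemented: `then_sketch` is a hypothetical helper built on `std::async`, and unlike a real continuation it occupies a thread while waiting for the antecedent. It only shows how an exception travels through the future into the continuation.

```cpp
#include <future>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for .then(): runs fn with the antecedent future once
// it is ready. It blocks a thread while waiting, which a real continuation
// would not do.
template <typename T, typename F>
auto then_sketch(std::future<T> antecedent, F fn)
{
    return std::async(std::launch::async,
        [a = std::move(antecedent), fn = std::move(fn)]() mutable {
            a.wait(); // the continuation starts only when the antecedent is ready
            return fn(std::move(a));
        });
}

std::string describe(int x)
{
    std::future<int> f1 = std::async(std::launch::async, [x]() -> int {
        if (x < 0) throw std::invalid_argument("negative input");
        return x + 1;
    });

    // The continuation receives the future, so it can handle the error itself.
    std::future<std::string> f2 = then_sketch(std::move(f1), [](std::future<int> f) {
        try
        {
            return std::to_string(f.get()); // does not block: f is ready
        }
        catch (const std::invalid_argument& e)
        {
            return std::string("error: ") + e.what();
        }
    });
    return f2.get();
}
```

Because the error arrives inside the future, a single callable covers both the success and the failure path, which is the trade-off described in the list above.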
|
|
|
Return values: The decision to return a future was based primarily on the ability to chain multiple continuations using
|
|
`.then()`. This benefit of composability gives the programmer incredible control and flexibility over their code. Returning
|
|
a `future` object rather than a `shared_future` is also a much cheaper operation thereby improving performance. A
|
|
`shared_future` object is not necessary to take advantage of the chaining feature. It is also easy to go from a `future`
|
|
to a `shared_future` when needed using `future::share()`.
|
|
|
|
|
|
[endsect]
|
|
|
|
|
|
[include future_ref.qbk]
|
|
|
|
[endsect]