[/
Copyright Oliver Kowalke, Nat Goodspeed 2015.
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE_1_0.txt or copy at
http://www.boost.org/LICENSE_1_0.txt)
]
[/ import path is relative to this .qbk file]
[import ../examples/wait_stuff.cpp]
[#when_any]
[section:when_any when_any / when_all functionality]
[heading Overview]
A bit of wisdom from the early days of computing still holds true today:
prefer to model program state using the instruction pointer rather than with
Boolean flags. In other words, if the program must ["do something] and then
do something almost the same, but with minor changes... perhaps parts of that
something should be broken out as smaller separate functions, rather than
introducing flags to alter the internal behavior of a monolithic function.
To that we would add: prefer to describe control flow using C++ native
constructs such as function calls, `if`, `while`, `for`, `do` et al.
rather than as chains of callbacks.
One of the great strengths of __boost_fiber__ is the flexibility it confers on
the coder to restructure an application from chains of callbacks to a
straightforward sequence of C++ statements, even when code in that fiber is
in fact interleaved with code running in other fibers.
There has been much recent discussion about the benefits of when_any and
when_all functionality. When dealing with asynchronous and possibly unreliable
services, these are valuable idioms. But of course when_any and when_all are
closely tied to the use of chains of callbacks.
This section presents recipes for achieving the same ends, in the context of a
fiber that wants to ["do something] when one or more other independent
activities have completed. Accordingly, these are `wait_something()`
functions rather than `when_something()` functions. The expectation is that
the calling fiber asks to launch those independent activities, then waits for
them, then sequentially proceeds with whatever processing depends on those
results.
The function names shown (e.g. [link wait_first_simple `wait_first_simple()`])
are for illustrative purposes only, because all these functions have been
bundled into a single source file. Presumably, if (say) [link
wait_first_success `wait_first_success()`] best suits your application needs,
you could introduce that variant with the name `wait_any()`.
[note The functions presented in this section accept variadic argument lists
of task functions. Corresponding `wait_something()` functions accepting a
container of task functions are left as an exercise for the interested reader.
Those should actually be simpler. Most of the complexity would arise from
overloading the same name for both purposes.]
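The container variant mentioned in the note might look roughly like the
following. This is only a sketch under stated assumptions: it uses
`std::thread`, `std::mutex` and `std::condition_variable` as stand-ins for
their __boost_fiber__ counterparts so that it compiles with the standard
library alone, and both the `Done` class as written here and the name
`wait_first_simple_container()` are hypothetical rather than taken from
wait_stuff.cpp.

```cpp
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the Done class described in this section:
// a bool guarded by a mutex and signalled through a condition variable.
class Done {
public:
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return ready_; });
    }
    void notify() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ready_ = true;
        }
        cond_.notify_one();
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool ready_ = false;
};

// Hypothetical container overload: returns as soon as the first task
// completes. Tasks are assumed not to throw.
void wait_first_simple_container(
        std::vector<std::function<void()>> const& tasks) {
    if (tasks.empty()) return;  // nothing to wait for
    auto done = std::make_shared<Done>();
    for (auto const& task : tasks) {
        // Copy the task and the shared_ptr into the worker, so Done stays
        // alive for workers that finish after this function has returned.
        std::thread([done, task] {
            task();
            done->notify();
        }).detach();
    }
    done->wait();
}
```

A plain loop replaces the recursion over the argument pack, which is why the
container form tends to be the simpler of the two.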
[/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
All the source code for this section is found in
[@../../examples/wait_stuff.cpp wait_stuff.cpp].
[heading Example Task Function]
[#wait_sleeper]
We found it convenient to model an asynchronous task using this function:
[wait_sleeper]
with type-specific `sleeper()` ["front ends] for `std::string`, `double` and
`int`.
`Verbose` simply prints a message to `std::cout` on construction and
destruction.
Basically:
# `sleeper()` prints a start message;
# sleeps for the specified number of milliseconds;
# if `thrw` is passed as `true`, throws a string description of the passed
`item`;
# else returns the passed `item`.
# On the way out, `sleeper()` produces a stop message.
This function will feature in the example calls to the various functions
presented below.
[section when_any]
[#wait_first_simple_section]
[section when_any, simple completion]
The simplest case is when you only need to know that the first of a set of
asynchronous tasks has completed [mdash] but you don't need to obtain a return
value, and you're confident that they will not throw exceptions.
[#wait_done]
For this we introduce a `Done` class to wrap a `bool` variable with a
[class_link condition_variable] and a [class_link mutex]:
[wait_done]
The pattern we follow throughout this section is to give each task's fiber
function a
[@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`]
to the relevant synchronization object.
This eliminates nagging questions about the lifespan of the synchronization
object relative to the last of the fibers.
[#wait_first_simple]
`wait_first_simple()` uses that tactic for [link wait_done `Done`]:
[wait_first_simple]
[#wait_first_simple_impl]
`wait_first_simple_impl()` is an ordinary recursion over the argument pack,
capturing `Done::ptr` for each new fiber:
[wait_first_simple_impl]
The body of the fiber's lambda is extremely simple, as promised: call the
function, notify [link wait_done `Done`] when it returns. The first fiber to
do so allows `wait_first_simple()` to return [mdash] which is why it's useful
to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object
rather than declaring it as a stack variable in `wait_first_simple()`.
This is how you might call it:
[wait_first_simple_ex]
In this example, control resumes after `wait_first_simple()` when [link
wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the
other two `sleeper()` fibers are still running.
[endsect]
[section when_any, return value]
It seems more useful to add the ability to capture the return value from the
first of the task functions to complete. Again, we assume that none will throw
an exception.
One tactic would be to adapt our [link wait_done `Done`] class to store the
first of the return values, rather than a simple `bool`. However, we choose
instead to use a [template_link buffered_channel]. We'll only need to enqueue
the first value, so we'll [member_link buffered_channel..close] it once we've
retrieved that value. Subsequent `push()` calls will return `channel_op_status::closed`.
[#wait_first_value]
[wait_first_value]
[#wait_first_value_impl]
The meat of the `wait_first_value_impl()` function is as you might expect:
[wait_first_value_impl]
It calls the passed function, pushes its return value and ignores the `push()`
result. You might call it like this:
[wait_first_value_ex]
[endsect]
[section when_any, produce first outcome, whether result or exception]
We may not be running in an environment in which we can guarantee no exception
will be thrown by any of our task functions. In that case, the above
implementations of `wait_first_something()` would be naïve: as mentioned in
[link exceptions the section on Fiber Management], an uncaught exception in one
of our task fibers would cause `std::terminate()` to be called.
Let's at least ensure that such an exception would propagate to the fiber
awaiting the first result. We can use [template_link future] to transport
either a return value or an exception. Therefore, we will change [link
wait_first_value `wait_first_value()`]'s [template_link buffered_channel] to
hold `future< T >` items instead of simply `T`.
Once we have a `future<>` in hand, all we need do is call [member_link
future..get], which will either return the value or rethrow the exception.
[#wait_first_outcome]
[wait_first_outcome]
So far so good [mdash] but there's a timing issue. How should we obtain the
`future<>` to [member_link buffered_channel..push] on the queue?
We could call [ns_function_link fibers..async]. That would certainly produce a
`future<>` for the task function. The trouble is that it would return too
quickly! We only want `future<>` items for ['completed] tasks on our
`buffered_channel<>`. In fact, we only want the `future<>` for the one that
completes first. If each fiber launched by `wait_first_outcome()` were to
`push()` the result of calling `async()`, the queue would only ever report
the result of the leftmost task item [mdash] ['not] the one that completes most
quickly.
Calling [member_link future..get] on the future returned by `async()` wouldn't
be right. You can only call `get()` once per `future<>` instance! And if there
were an exception, it would be rethrown inside the helper fiber at the
producer end of the queue, rather than propagated to the consumer end.
We could call [member_link future..wait]. That would block the helper fiber
until the `future<>` became ready, at which point we could `push()` it to be
retrieved by `wait_first_outcome()`.
That would work [mdash] but there's a simpler tactic that avoids creating an extra
fiber. We can wrap the task function in a [template_link packaged_task]. While
one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is,
in fact, what `async()` does [mdash] in this case, we're already running in the
helper fiber at the producer end of the queue! We can simply ['call] the
`packaged_task<>`. On return from that call, the task function has completed,
meaning that the `future<>` obtained from the `packaged_task<>` is certain to
be ready. At that point we can simply `push()` it to the queue.
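The effect of calling a `packaged_task<>` directly can be sketched as follows.
We use `std::packaged_task` and `std::future` here as stand-ins for the
__boost_fiber__ class templates of the same names, on the assumption that they
behave alike for this purpose; `run_to_ready_future()` is a hypothetical
helper name, not part of wait_stuff.cpp.

```cpp
#include <future>
#include <stdexcept>
#include <string>
#include <utility>

// Run `fn` synchronously inside a packaged_task and hand back its future.
// The task has already returned (or thrown) by the time this function
// returns, so the future is ready and holds either a value or an exception.
template <typename Fn>
auto run_to_ready_future(Fn&& fn) -> std::future<decltype(fn())> {
    using T = decltype(fn());
    std::packaged_task<T()> task(std::forward<Fn>(fn));
    std::future<T> result = task.get_future();
    task();  // plain call: no additional thread or fiber is launched
    return result;
}
```

An exception thrown by `fn` does not escape from `task()`. It is stored in the
shared state and rethrown only when the consumer calls `get()` on the future,
which is exactly the propagation we want.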
[#wait_first_outcome_impl]
[wait_first_outcome_impl]
Calling it might look like this:
[wait_first_outcome_ex]
[endsect]
[section when_any, produce first success]
One scenario for ["when_any] functionality is when we're redundantly contacting
some number of possibly-unreliable web services. Not only might they be slow
[mdash] any one of them might produce a failure rather than the desired
result.
In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the
right approach. If one of the services produces an error quickly, while
another follows up with a real answer, we don't want to prefer the error just
because it arrived first!
Given the `buffered_channel< future< T > >` we already constructed for
`wait_first_outcome()`, though, we can readily recast the interface function
to deliver the first ['successful] result.
That does raise the question: what if ['all] the task functions throw an
exception? In that case we'd probably better know about it.
[#exception_list]
The
[@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis
C++ Parallelism Draft Technical Specification] proposes a
`std::exception_list` exception capable of delivering a collection of
`std::exception_ptr`s. Until that becomes universally available, let's fake up
an `exception_list` of our own:
[exception_list]
Now we can build `wait_first_success()`, using [link wait_first_outcome_impl
`wait_first_outcome_impl()`].
Instead of retrieving only the first `future<>` from the queue, we must now
loop over `future<>` items. Of course we must limit that iteration! If we
launch only `count` producer fibers, the `(count+1)`[superscript st]
[member_link buffered_channel..pop] call would block forever.
Given a ready `future<>`, we can distinguish failure by calling [member_link
future..get_exception_ptr]. If the `future<>` in fact contains a result rather
than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we
can confidently call [member_link future..get] to return that result to our
caller.
If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into
our pending `exception_list` and loop back for the next `future<>` from the
queue.
If we fall out of the loop [mdash] if every single task fiber threw an
exception [mdash] we throw the `exception_list` exception into which we've
been collecting those `std::exception_ptr`s.
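The loop just described can be sketched with the standard library alone. Two
substitutions are assumptions on our part: `std::future` has no
`get_exception_ptr()`, so this sketch distinguishes failure with `try` and
`catch` around `get()` instead, and a `std::vector` of ready futures in
completion order stands in for popping from the channel. The `exception_list`
shown is a simplified stand-in, and `first_success()` is a hypothetical name.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <utility>
#include <vector>

// Simplified stand-in for the exception_list class described above.
class exception_list : public std::runtime_error {
public:
    explicit exception_list(std::vector<std::exception_ptr> errors)
        : std::runtime_error("all tasks failed"),
          errors_(std::move(errors)) {}
    std::vector<std::exception_ptr> const& errors() const { return errors_; }
private:
    std::vector<std::exception_ptr> errors_;
};

// `outcomes` holds one ready future per task, in completion order.
template <typename T>
T first_success(std::vector<std::future<T>>& outcomes) {
    std::vector<std::exception_ptr> errors;
    // Iterate exactly once per launched task, never more.
    for (auto& outcome : outcomes) {
        try {
            return outcome.get();  // first future holding a value wins
        } catch (...) {
            // Remember this failure and keep looking for a success.
            errors.push_back(std::current_exception());
        }
    }
    // Every task failed: report all of the collected exceptions.
    throw exception_list(std::move(errors));
}
```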
[#wait_first_success]
[wait_first_success]
A call might look like this:
[wait_first_success_ex]
[endsect]
[section when_any, heterogeneous types]
We would be remiss to ignore the case in which the various task functions have
distinct return types. That means that the value returned by the first of them
might have any one of those types. We can express that with
[@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].
To keep the example simple, we'll revert to pretending that none of them can
throw an exception. That makes `wait_first_value_het()` strongly resemble
[link wait_first_value `wait_first_value()`]. We can actually reuse [link
wait_first_value_impl `wait_first_value_impl()`], merely passing
`boost::variant<T0, T1, ...>` as the queue's value type rather than the
common `T`!
Naturally this could be extended to use [link wait_first_success
`wait_first_success()`] semantics instead.
[wait_first_value_het]
It might be called like this:
[wait_first_value_het_ex]
[endsect]
[section when_any, a dubious alternative]
Certain topics in C++ can arouse strong passions, and exceptions are no
exception. We cannot resist mentioning [mdash] for purely informational
purposes [mdash] that when you need only the ['first] result from some number
of concurrently-running fibers, it would be possible to pass a
[^shared_ptr<[template_link promise]>] to the participating fibers, then cause
the initiating fiber to call [member_link future..get] on its [template_link
future]. The first fiber to call [member_link promise..set_value] on that
shared `promise` will succeed; subsequent `set_value()` calls on the same
`promise` instance will throw `future_error`.
Use this information at your own discretion. Beware the dark side.
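For the curious, the shape of that alternative is roughly as follows. This is
a sketch only, using `std::promise`, `std::future` and `std::thread` as
stand-ins for the __boost_fiber__ equivalents; `first_via_shared_promise()` is
a hypothetical name. It assumes at least one task is passed and that the tasks
themselves do not throw.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <vector>

// Every worker races to fulfil one shared promise. Precondition: `tasks`
// is non-empty, otherwise get() below would never return.
template <typename T>
T first_via_shared_promise(std::vector<std::function<T()>> const& tasks) {
    auto promise = std::make_shared<std::promise<T>>();
    std::future<T> first = promise->get_future();
    for (auto const& task : tasks) {
        std::thread([promise, task] {
            try {
                promise->set_value(task());
            } catch (std::future_error const&) {
                // Another worker got there first (promise_already_satisfied).
                // Using an exception for this routine outcome is the
                // dubious part.
            }
        }).detach();
    }
    return first.get();  // resumes as soon as the first value is set
}
```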
[endsect]
[endsect][/ when_any]
[section when_all functionality]
[section when_all, simple completion]
For the case in which we must wait for ['all] task functions to complete
[mdash] but we don't need results (or expect exceptions) from any of them
[mdash] we can write `wait_all_simple()` that looks remarkably like [link
wait_first_simple `wait_first_simple()`]. The difference is that instead of
our [link wait_done `Done`] class, we instantiate a [class_link barrier] and
call its [member_link barrier..wait].
We initialize the `barrier` with `(count+1)` because we are launching `count`
fibers, plus the `wait()` call within `wait_all_simple()` itself.
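The `(count+1)` arithmetic can be illustrated with a small standard-library
sketch. Here `SimpleBarrier` is a hypothetical single-use stand-in for
[class_link barrier], `std::thread` stands in for a fiber, and
`wait_all_simple_sketch()` is an illustrative name; none of these are taken
from wait_stuff.cpp.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Minimal single-use barrier: wait() returns once `expected` participants
// have called it.
class SimpleBarrier {
public:
    explicit SimpleBarrier(std::size_t expected) : remaining_(expected) {}
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (--remaining_ == 0) {
            cond_.notify_all();  // last arrival releases everyone
        } else {
            cond_.wait(lock, [this] { return remaining_ == 0; });
        }
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::size_t remaining_;
};

void wait_all_simple_sketch(
        std::vector<std::function<void()>> const& tasks) {
    // One participant per task, plus one for the caller itself.
    auto barrier = std::make_shared<SimpleBarrier>(tasks.size() + 1);
    for (auto const& task : tasks) {
        std::thread([barrier, task] {
            task();
            barrier->wait();
        }).detach();
    }
    barrier->wait();  // returns only after every task has arrived
}
```

Had the barrier been initialized with `count` alone, the last worker could
release the others before the caller ever reached its own `wait()`, leaving
the caller blocked.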
[wait_all_simple]
As stated above, the only difference between `wait_all_simple_impl()` and
[link wait_first_simple_impl `wait_first_simple_impl()`] is that the former
calls `barrier::wait()` rather than `Done::notify()`:
[wait_all_simple_impl]
You might call it like this:
[wait_all_simple_ex]
Control will not return from the `wait_all_simple()` call until the last of
its task functions has completed.
[endsect]
[section when_all, return values]
As soon as we want to collect return values from all the task functions, we
can see right away how to reuse [link wait_first_value `wait_first_value()`]'s
`buffered_channel<T>` for the purpose. All we have to do is avoid closing it after the
first value!
But in fact, collecting multiple values raises an interesting question: do we
['really] want to wait until the slowest of them has arrived? Wouldn't we
rather process each result as soon as it becomes available?
Fortunately we can present both APIs. Let's define `wait_all_values_source()`
to return `shared_ptr<buffered_channel<T>>`.
[#wait_all_values]
Given `wait_all_values_source()`, it's straightforward to implement
`wait_all_values()`:
[wait_all_values]
It might be called like this:
[wait_all_values_ex]
As you can see from the loop in `wait_all_values()`, instead of requiring its
caller to count values, we define `wait_all_values_source()` to [member_link
buffered_channel..close] the queue when done. But how do we do that? Each
producer fiber is independent. It has no idea whether it is the last one to
[member_link buffered_channel..push] a value.
[#wait_nqueue]
We can address that problem with a counting façade for the
`buffered_channel<>`. In fact, our façade need only support the producer end of
the queue.
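The counting idea can be sketched independently of __boost_fiber__. In the
following, `SimpleChannel` is a hypothetical, much simplified stand-in for
[template_link buffered_channel] and `CountingProducer` is an illustrative
name for the façade; both are assumptions made so that the example compiles
with the standard library alone.

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Minimal closable queue, standing in for the real channel.
template <typename T>
class SimpleChannel {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (closed_) return;  // pushes after close are dropped
            items_.push(std::move(value));
        }
        cond_.notify_one();
    }
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cond_.notify_all();
    }
    // Returns false once the channel is both closed and drained.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop();
        return true;
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<T> items_;
    bool closed_ = false;
};

// Producer-side facade: closes the channel after `expected` pushes, so no
// individual producer needs to know whether it is the last one.
template <typename T>
class CountingProducer {
public:
    CountingProducer(std::shared_ptr<SimpleChannel<T>> channel,
                     std::size_t expected)
        : channel_(std::move(channel)), remaining_(expected) {}
    void push(T value) {
        channel_->push(std::move(value));
        std::lock_guard<std::mutex> lock(mutex_);
        if (remaining_ > 0 && --remaining_ == 0) channel_->close();
    }
private:
    std::shared_ptr<SimpleChannel<T>> channel_;
    std::mutex mutex_;
    std::size_t remaining_;
};
```

The consumer can then simply loop on `pop()` until it reports that the channel
is closed, without counting values itself.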
[wait_nqueue]
[#wait_all_values_source]
Armed with `nqueue<>`, we can implement `wait_all_values_source()`. It
starts just like [link wait_first_value `wait_first_value()`]. The difference
is that we wrap the `buffered_channel<T>` with an `nqueue<T>` to pass to
the producer fibers.
Then, of course, instead of popping the first value, closing the queue and
returning it, we simply return the `shared_ptr<buffered_channel<T>>`.
[wait_all_values_source]
For example:
[wait_all_values_source_ex]
[#wait_all_values_impl]
`wait_all_values_impl()` really is just like [link wait_first_value_impl
`wait_first_value_impl()`] except for the use of `nqueue<T>` rather than
`buffered_channel<T>`:
[wait_all_values_impl]
[endsect]
[section when_all until first exception]
Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we
can elaborate [link wait_all_values `wait_all_values()`] and [link
wait_all_values_source `wait_all_values_source()`] by passing `future< T >`
instead of plain `T`.
[#wait_all_until_error]
`wait_all_until_error()` pops that `future< T >` and calls its [member_link
future..get]:
[wait_all_until_error]
For example:
[wait_all_until_error_ex]
[#wait_all_until_error_source]
Naturally this complicates the API for `wait_all_until_error_source()`. The
caller must both retrieve a `future< T >` and call its `get()` method. It would,
of course, be possible to return a façade over the consumer end of the
queue that would implicitly perform the `get()` and return a simple `T` (or
throw).
The implementation is just as you would expect. Notice, however, that we can
reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the
`nqueue< future< T > >` rather than `buffered_channel< future< T > >`.
[wait_all_until_error_source]
For example:
[wait_all_until_error_source_ex]
[endsect]
[section wait_all, collecting all exceptions]
[#wait_all_collect_errors]
Given [link wait_all_until_error_source `wait_all_until_error_source()`], it
might be more reasonable to make a `wait_all_...()` that collects ['all]
errors instead of presenting only the first:
[wait_all_collect_errors]
The implementation is a simple variation on [link wait_first_success
`wait_first_success()`], using the same [link exception_list `exception_list`]
exception class.
[endsect]
[section when_all, heterogeneous types]
But what about the case when we must wait for all results of different types?
We can present an API that is frankly quite cool. Consider a sample struct:
[wait_Data]
Let's fill its members from task functions all running concurrently:
[wait_all_members_data_ex]
Note that for this case, we abandon the notion of capturing the earliest
result first, and so on: we must fill exactly the passed struct in
left-to-right order.
That permits a beautifully simple implementation:
[wait_all_members]
[wait_all_members_get]
It is tempting to try to implement `wait_all_members()` as a one-liner like
this:
    return Result{ boost::fibers::async(functions).get()... };
The trouble with this tactic is that it would serialize all the task
functions. The runtime makes a single pass through `functions`, calling
[ns_function_link fibers..async] for each and then immediately calling
[member_link future..get] on its returned `future<>`. That blocks the implicit
loop. The above is almost equivalent to writing:
    return Result{ functions()... };
in which, of course, there is no concurrency at all.
Passing the argument pack through a function-call boundary
(`wait_all_members_get()`) forces the runtime to make ['two] passes: one in
`wait_all_members()` to collect the `future<>`s from all the `async()` calls,
the second in `wait_all_members_get()` to fetch each of the results.
As noted in comments, within the `wait_all_members_get()` parameter pack
expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the
way, we will hit the `get()` for the slowest task function; after that every
subsequent `get()` will complete in trivial time.
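The two-pass structure can be demonstrated with the standard library, using
`std::async` with `std::launch::async` as a stand-in for
[ns_function_link fibers..async]. The names `Sample`, `collect_members()` and
`fill_members_concurrently()` are hypothetical and chosen only for this
sketch.

```cpp
#include <future>
#include <string>
#include <utility>

// Hypothetical aggregate to be filled, one member per task function.
struct Sample {
    std::string text;
    double ratio;
    int count;
};

// Second pass: every future already exists, so each get() merely waits
// for a task that is already running.
template <typename Result, typename... Futures>
Result collect_members(Futures&&... futures) {
    return Result{ futures.get()... };
}

// First pass: launch every task before waiting on any of them. All the
// std::async() calls are evaluated before collect_members() is entered.
template <typename Result, typename... Fns>
Result fill_members_concurrently(Fns&&... functions) {
    return collect_members<Result>(
        std::async(std::launch::async, std::forward<Fns>(functions))...);
}
```

A call such as `fill_members_concurrently<Sample>(f, g, h)` then fills the
members from `f`, `g` and `h` in left-to-right order while all three run
concurrently.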
By the way, we could also use this same API to fill a vector or other
collection:
[wait_all_members_vector_ex]
[endsect]
[endsect][/ when_all]
[endsect][/ outermost]