[/
Copyright Oliver Kowalke, Nat Goodspeed 2015.
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE_1_0.txt or copy at
http://www.boost.org/LICENSE_1_0.txt
]

[/ import path is relative to this .qbk file]
[import ../examples/wait_stuff.cpp]

[#when_any]
[section:when_any when_any / when_all functionality]

[heading Overview]

A bit of wisdom from the early days of computing still holds true today:
prefer to model program state using the instruction pointer rather than with
Boolean flags. In other words, if the program must ["do something] and then
do something almost the same, but with minor changes... perhaps parts of that
something should be broken out as smaller separate functions, rather than
introducing flags to alter the internal behavior of a monolithic function.

To that we would add: prefer to describe control flow using C++ native
constructs such as function calls, `if`, `while`, `for`, `do` et al.
rather than as chains of callbacks.

One of the great strengths of __boost_fiber__ is the flexibility it confers on
the coder to restructure an application from chains of callbacks to
straightforward C++ statement sequence, even when code in that fiber is
in fact interleaved with code running in other fibers.

There has been much recent discussion about the benefits of when_any and
when_all functionality. When dealing with asynchronous and possibly unreliable
services, these are valuable idioms. But of course when_any and when_all are
closely tied to the use of chains of callbacks.

This section presents recipes for achieving the same ends, in the context of a
fiber that wants to ["do something] when one or more other independent
activities have completed. Accordingly, these are `wait_something()`
functions rather than `when_something()` functions. The expectation is that
the calling fiber asks to launch those independent activities, then waits for
them, then sequentially proceeds with whatever processing depends on those
results.

The function names shown (e.g. [link wait_first_simple `wait_first_simple()`])
are for illustrative purposes only, because all these functions have been
bundled into a single source file. Presumably, if (say) [link
wait_first_success `wait_first_success()`] best suits your application needs,
you could introduce that variant with the name `wait_any()`.

[note The functions presented in this section accept variadic argument lists
of task functions. Corresponding `wait_something()` functions accepting a
container of task functions are left as an exercise for the interested reader.
Those should actually be simpler. Most of the complexity would arise from
overloading the same name for both purposes.]

[/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
All the source code for this section is found in
[@../../examples/wait_stuff.cpp wait_stuff.cpp].

[heading Example Task Function]

[#wait_sleeper]
We found it convenient to model an asynchronous task using this function:

[wait_sleeper]

with type-specific `sleeper()` ["front ends] for `std::string`, `double` and
`int`.

`Verbose` simply prints a message to `std::cout` on construction and
destruction.

Basically:

# `sleeper()` prints a start message;
# sleeps for the specified number of milliseconds;
# if `thrw` is passed as `true`, throws a string description of the passed
`item`;
# else returns the passed `item`.
# On the way out, `sleeper()` produces a stop message.
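
Each type-specific front end can be nothing more than a forwarding overload.
A sketch of one of them might look like this [mdash] the name `sleeper_impl`
for the underlying template is only our assumption here; see
[@../../examples/wait_stuff.cpp wait_stuff.cpp] for the real front ends:

    // hypothetical sketch of one front end: forward to the shared implementation template
    inline std::string sleeper( std::string const& item, int ms, bool thrw = false) {
        return sleeper_impl( item, ms, thrw);
    }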

This function will feature in the example calls to the various functions
presented below.

[section when_any]
[#wait_first_simple_section]
[section when_any, simple completion]

The simplest case is when you only need to know that the first of a set of
asynchronous tasks has completed [mdash] but you don't need to obtain a return
value, and you're confident that they will not throw exceptions.

[#wait_done]
For this we introduce a `Done` class to wrap a `bool` variable with a
[class_link condition_variable] and a [class_link mutex]:

[wait_done]

The pattern we follow throughout this section is to pass a
[@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`]
to the relevant synchronization object to the various tasks' fiber functions.
This eliminates nagging questions about the lifespan of the synchronization
object relative to the last of the fibers.

[#wait_first_simple]
`wait_first_simple()` uses that tactic for [link wait_done `Done`]:

[wait_first_simple]

[#wait_first_simple_impl]
`wait_first_simple_impl()` is an ordinary recursion over the argument pack,
capturing `Done::ptr` for each new fiber:

[wait_first_simple_impl]

The body of the fiber's lambda is extremely simple, as promised: call the
function, notify [link wait_done `Done`] when it returns. The first fiber to
do so allows `wait_first_simple()` to return [mdash] which is why it's useful
to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object
rather than declaring it as a stack variable in `wait_first_simple()`.

This is how you might call it:

[wait_first_simple_ex]

In this example, control resumes after `wait_first_simple()` when [link
wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the
other two `sleeper()` fibers are still running.

[endsect]
[section when_any, return value]

It seems more useful to add the ability to capture the return value from the
first of the task functions to complete. Again, we assume that none will throw
an exception.

One tactic would be to adapt our [link wait_done `Done`] class to store the
first of the return values, rather than a simple `bool`. However, we choose
instead to use a [template_link buffered_channel]. We'll only need to enqueue
the first value, so we'll [member_link buffered_channel..close] it once we've
retrieved that value. Subsequent `push()` calls will return `closed`.
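
The close/push interaction is worth seeing in isolation. A minimal sketch,
independent of the example code (the capacity of 2 is arbitrary):

    // sketch of the close()/push() interaction (not from wait_stuff.cpp)
    boost::fibers::buffered_channel< int > chan{ 2 };   // capacity: a power of two >= 2
    chan.push( 1);                                      // channel_op_status::success
    chan.close();
    // any further push() is refused:
    boost::fibers::channel_op_status status = chan.push( 2);
    // status == boost::fibers::channel_op_status::closed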

[#wait_first_value]
[wait_first_value]

[#wait_first_value_impl]
The meat of the `wait_first_value_impl()` function is as you might expect:

[wait_first_value_impl]

It calls the passed function, pushes its return value and ignores the `push()`
result. You might call it like this:

[wait_first_value_ex]

[endsect]
[section when_any, produce first outcome, whether result or exception]

We may not be running in an environment in which we can guarantee no exception
will be thrown by any of our task functions. In that case, the above
implementations of `wait_first_something()` would be naïve: as mentioned in
[link exceptions the section on Fiber Management], an uncaught exception in one
of our task fibers would cause `std::terminate()` to be called.

Let's at least ensure that such an exception would propagate to the fiber
awaiting the first result. We can use [template_link future] to transport
either a return value or an exception. Therefore, we will change [link
wait_first_value `wait_first_value()`]'s [template_link buffered_channel] to
hold `future< T >` items instead of simply `T`.

Once we have a `future<>` in hand, all we need do is call [member_link
future..get], which will either return the value or rethrow the exception.

[#wait_first_outcome]
[wait_first_outcome]

So far so good [mdash] but there's a timing issue. How should we obtain the
`future<>` to [member_link buffered_channel..push] on the queue?

We could call [ns_function_link fibers..async]. That would certainly produce a
`future<>` for the task function. The trouble is that it would return too
quickly! We only want `future<>` items for ['completed] tasks on our
`queue<>`. In fact, we only want the `future<>` for the one that
completes first. If each fiber launched by `wait_first_outcome()` were to
`push()` the result of calling `async()`, the queue would only ever report
the result of the leftmost task item [mdash] ['not] the one that completes most
quickly.

Calling [member_link future..get] on the future returned by `async()` wouldn't
be right. You can only call `get()` once per `future<>` instance! And if there
were an exception, it would be rethrown inside the helper fiber at the
producer end of the queue, rather than propagated to the consumer end.

We could call [member_link future..wait]. That would block the helper fiber
until the `future<>` became ready, at which point we could `push()` it to be
retrieved by `wait_first_outcome()`.

That would work [mdash] but there's a simpler tactic that avoids creating an extra
fiber. We can wrap the task function in a [template_link packaged_task]. While
one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is,
in fact, what `async()` does [mdash] in this case, we're already running in the
helper fiber at the producer end of the queue! We can simply ['call] the
`packaged_task<>`. On return from that call, the task function has completed,
meaning that the `future<>` obtained from the `packaged_task<>` is certain to
be ready. At that point we can simply `push()` it to the queue.
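
In isolation, that ["just call it] tactic looks like the following bare sketch
(not the code from wait_stuff.cpp):

    // sketch: invoke a packaged_task in the current fiber; its future is then ready
    boost::fibers::packaged_task< int() > task( [](){ return 42; });
    boost::fibers::future< int > f( task.get_future() );
    task();                 // runs the task function right here, in this fiber
    int value = f.get();    // already ready: returns (or rethrows) immediately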

[#wait_first_outcome_impl]
[wait_first_outcome_impl]

Calling it might look like this:

[wait_first_outcome_ex]

[endsect]
[section when_any, produce first success]

One scenario for ["when_any] functionality is when we're redundantly contacting
some number of possibly-unreliable web services. Not only might they be slow
[mdash] any one of them might produce a failure rather than the desired
result.

In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the
right approach. If one of the services produces an error quickly, while
another follows up with a real answer, we don't want to prefer the error just
because it arrived first!

Given the `queue< future< T > >` we already constructed for
`wait_first_outcome()`, though, we can readily recast the interface function
to deliver the first ['successful] result.

That does beg the question: what if ['all] the task functions throw an
exception? In that case we'd probably better know about it.

[#exception_list]
The
[@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis
C++ Parallelism Draft Technical Specification] proposes a
`std::exception_list` exception capable of delivering a collection of
`std::exception_ptr`s. Until that becomes universally available, let's fake up
an `exception_list` of our own:

[exception_list]

Now we can build `wait_first_success()`, using [link wait_first_outcome_impl
`wait_first_outcome_impl()`].

Instead of retrieving only the first `future<>` from the queue, we must now
loop over `future<>` items. Of course we must limit that iteration! If we
launch only `count` producer fibers, the `(count+1)`[superscript st]
[member_link buffered_channel..pop] call would block forever.

Given a ready `future<>`, we can distinguish failure by calling [member_link
future..get_exception_ptr]. If the `future<>` in fact contains a result rather
than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we
can confidently call [member_link future..get] to return that result to our
caller.

If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into
our pending `exception_list` and loop back for the next `future<>` from the
queue.

If we fall out of the loop [mdash] if every single task fiber threw an
exception [mdash] we throw the `exception_list` exception into which we've
been collecting those `std::exception_ptr`s.
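
The success/failure test on each ready `future<>` boils down to this sketch
(a standalone illustration, not the loop from wait_stuff.cpp; it assumes a
`future< T > f` just popped from the queue and a
`std::vector< std::exception_ptr > errors` collecting failures so far):

    // sketch: a ready future<> holds either a value or an exception
    std::exception_ptr error = f.get_exception_ptr();
    if ( ! error) {
        return f.get();            // success: deliver the value to our caller
    }
    errors.push_back( error);      // failure: remember it, loop for the next future<>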

[#wait_first_success]
[wait_first_success]

A call might look like this:

[wait_first_success_ex]

[endsect]
[section when_any, heterogeneous types]

We would be remiss to ignore the case in which the various task functions have
distinct return types. That means that the value returned by the first of them
might have any one of those types. We can express that with
[@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].

To keep the example simple, we'll revert to pretending that none of them can
throw an exception. That makes `wait_first_value_het()` strongly resemble
[link wait_first_value `wait_first_value()`]. We can actually reuse [link
wait_first_value_impl `wait_first_value_impl()`], merely passing
`boost::variant<T0, T1, ...>` as the queue's value type rather than the
common `T`!

Naturally this could be extended to use [link wait_first_success
`wait_first_success()`] semantics instead.

[wait_first_value_het]

It might be called like this:

[wait_first_value_het_ex]

[endsect]
[section when_any, a dubious alternative]

Certain topics in C++ can arouse strong passions, and exceptions are no
exception. We cannot resist mentioning [mdash] for purely informational
purposes [mdash] that when you need only the ['first] result from some number
of concurrently-running fibers, it would be possible to pass a
[^shared_ptr<[template_link promise]>] to the participating fibers, then cause
the initiating fiber to call [member_link future..get] on its [template_link
future]. The first fiber to call [member_link promise..set_value] on that
shared `promise` will succeed; subsequent `set_value()` calls on the same
`promise` instance will throw `future_error`.
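
Purely for illustration, such a scheme might be sketched as follows [mdash]
this is our own sketch, not code from [@../../examples/wait_stuff.cpp
wait_stuff.cpp]:

    // sketch: the first set_value() wins, later calls throw future_error
    auto first = std::make_shared< boost::fibers::promise< std::string > >();
    boost::fibers::future< std::string > result( first->get_future() );
    for ( std::string label : { "alpha", "beta", "gamma" }) {
        boost::fibers::fiber( [first, label](){
            try {
                first->set_value( sleeper( label, 100));    // only one call succeeds
            } catch ( boost::fibers::future_error const&) { // the others land here
            }
        }).detach();
    }
    std::cout << "first result: " << result.get() << std::endl;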

Use this information at your own discretion. Beware the dark side.

[endsect]
[endsect][/ when_any]

[section when_all functionality]
[section when_all, simple completion]

For the case in which we must wait for ['all] task functions to complete
[mdash] but we don't need results (or expect exceptions) from any of them
[mdash] we can write `wait_all_simple()` that looks remarkably like [link
wait_first_simple `wait_first_simple()`]. The difference is that instead of
our [link wait_done `Done`] class, we instantiate a [class_link barrier] and
call its [member_link barrier..wait].

We initialize the `barrier` with `(count+1)` because we are launching `count`
fibers, plus the `wait()` call within `wait_all_simple()` itself.
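
Stripped of everything else, that rendezvous looks like the following sketch
(not the code from wait_stuff.cpp; `count` stands in for the number of task
functions):

    // sketch: count task fibers, plus the waiting function itself => count+1
    std::size_t count = 3;
    auto finished( std::make_shared< boost::fibers::barrier >( count + 1) );
    for ( std::size_t i = 0; i < count; ++i) {
        boost::fibers::fiber( [finished](){
            // ... run the task function here ...
            finished->wait();       // this task fiber arrives at the barrier
        }).detach();
    }
    finished->wait();               // returns once all count fibers have arrived too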

[wait_all_simple]

As stated above, the only difference between `wait_all_simple_impl()` and
[link wait_first_simple_impl `wait_first_simple_impl()`] is that the former
calls `barrier::wait()` rather than `Done::notify()`:

[wait_all_simple_impl]

You might call it like this:

[wait_all_simple_ex]

Control will not return from the `wait_all_simple()` call until the last of
its task functions has completed.

[endsect]
[section when_all, return values]

As soon as we want to collect return values from all the task functions, we
can see right away how to reuse [link wait_first_value `wait_first_value()`]'s
`queue<T>` for the purpose. All we have to do is avoid closing it after the
first value!

But in fact, collecting multiple values raises an interesting question: do we
['really] want to wait until the slowest of them has arrived? Wouldn't we
rather process each result as soon as it becomes available?

Fortunately we can present both APIs. Let's define `wait_all_values_source()`
to return `shared_ptr<buffered_channel<T>>`.

[#wait_all_values]
Given `wait_all_values_source()`, it's straightforward to implement
`wait_all_values()`:

[wait_all_values]

It might be called like this:

[wait_all_values_ex]

As you can see from the loop in `wait_all_values()`, instead of requiring its
caller to count values, we define `wait_all_values_source()` to [member_link
buffered_channel..close] the queue when done. But how do we do that? Each
producer fiber is independent. It has no idea whether it is the last one to
[member_link buffered_channel..push] a value.

[#wait_nqueue]
We can address that problem with a counting façade for the
`queue<>`. In fact, our façade need only support the producer end of
the queue.

[wait_nqueue]

[#wait_all_values_source]
Armed with `nqueue<>`, we can implement `wait_all_values_source()`. It
starts just like [link wait_first_value `wait_first_value()`]. The difference
is that we wrap the `queue<T>` with an `nqueue<T>` to pass to
the producer fibers.

Then, of course, instead of popping the first value, closing the queue and
returning it, we simply return the `shared_ptr<queue<T>>`.

[wait_all_values_source]

For example:

[wait_all_values_source_ex]

[#wait_all_values_impl]
`wait_all_values_impl()` really is just like [link wait_first_value_impl
`wait_first_value_impl()`] except for the use of `nqueue<T>` rather than
`queue<T>`:

[wait_all_values_impl]

[endsect]
[section when_all until first exception]

Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we
can elaborate [link wait_all_values `wait_all_values()`] and [link
wait_all_values_source `wait_all_values_source()`] by passing `future< T >`
instead of plain `T`.

[#wait_all_until_error]
`wait_all_until_error()` pops that `future< T >` and calls its [member_link
future..get]:

[wait_all_until_error]

For example:

[wait_all_until_error_ex]

[#wait_all_until_error_source]
Naturally this complicates the API for `wait_all_until_error_source()`. The
caller must both retrieve a `future< T >` and call its `get()` method. It would,
of course, be possible to return a façade over the consumer end of the
queue that would implicitly perform the `get()` and return a simple `T` (or
throw).
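
Such a consumer-side façade is not implemented in
[@../../examples/wait_stuff.cpp wait_stuff.cpp], but it might be sketched along
these lines (the name `unwrapping_source` is ours):

    // hypothetical sketch: wrap the consumer end so value_pop() unwraps the future<>
    template< typename T >
    class unwrapping_source {
    public:
        typedef boost::fibers::buffered_channel< boost::fibers::future< T > > queue_t;

        explicit unwrapping_source( std::shared_ptr< queue_t > queue) :
            queue_( queue) {
        }

        // pop the next future< T > and immediately get(): return T or rethrow
        T value_pop() {
            return queue_->value_pop().get();
        }

    private:
        std::shared_ptr< queue_t > queue_;
    };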

The implementation is just as you would expect. Notice, however, that we can
reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the
`nqueue<T>` rather than `queue<T>`.

[wait_all_until_error_source]

For example:

[wait_all_until_error_source_ex]

[endsect]
[section wait_all, collecting all exceptions]

[#wait_all_collect_errors]
Given [link wait_all_until_error_source `wait_all_until_error_source()`], it
might be more reasonable to make a `wait_all_...()` that collects ['all]
errors instead of presenting only the first:

[wait_all_collect_errors]

The implementation is a simple variation on [link wait_first_success
`wait_first_success()`], using the same [link exception_list `exception_list`]
exception class.

[endsect]
[section when_all, heterogeneous types]

But what about the case when we must wait for all results of different types?

We can present an API that is frankly quite cool. Consider a sample struct:

[wait_Data]

Let's fill its members from task functions all running concurrently:

[wait_all_members_data_ex]

Note that for this case, we abandon the notion of capturing the earliest
result first, and so on: we must fill exactly the passed struct in
left-to-right order.

That permits a beautifully simple implementation:

[wait_all_members]

[wait_all_members_get]

It is tempting to try to implement `wait_all_members()` as a one-liner like
this:

    return Result{ boost::fibers::async(functions).get()... };

The trouble with this tactic is that it would serialize all the task
functions. The runtime makes a single pass through `functions`, calling
[ns_function_link fibers..async] for each and then immediately calling
[member_link future..get] on its returned `future<>`. That blocks the implicit
loop. The above is almost equivalent to writing:

    return Result{ functions()... };

in which, of course, there is no concurrency at all.

Passing the argument pack through a function-call boundary
(`wait_all_members_get()`) forces the runtime to make ['two] passes: one in
`wait_all_members()` to collect the `future<>`s from all the `async()` calls,
the second in `wait_all_members_get()` to fetch each of the results.

As noted in comments, within the `wait_all_members_get()` parameter pack
expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the
way, we will hit the `get()` for the slowest task function; after that every
subsequent `get()` will complete in trivial time.

By the way, we could also use this same API to fill a vector or other
collection:

[wait_all_members_vector_ex]

[endsect]
[endsect][/ when_all]

[endsect][/ outermost]