asio/example/cpp11/executors/actor.cpp
2017-12-02 10:27:15 +11:00

287 lines
7.5 KiB
C++

#include <boost/asio/defer.hpp>
#include <boost/asio/executor.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/system_executor.hpp>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <typeinfo>
#include <vector>
using boost::asio::defer;
using boost::asio::executor;
using boost::asio::post;
using boost::asio::strand;
using boost::asio::system_executor;
//------------------------------------------------------------------------------
// A tiny actor framework
// ~~~~~~~~~~~~~~~~~~~~~~
class actor;
// Used to identify the sender and recipient of messages.
typedef actor* actor_address;
// Type-erased root of every registered message handler. It lets handlers for
// different message types be stored together and identified at run time.
class message_handler_base
{
public:
  // Reports the type of message this handler accepts. Incoming messages are
  // matched against this value to decide which handlers receive them.
  virtual const std::type_info& message_id() const = 0;

  // Virtual so that handlers can be destroyed through a base pointer.
  virtual ~message_handler_base() = default;
};
// Typed handler interface: everything that can receive a message of type
// Message derives from this, whatever actor it ultimately forwards to.
template <typename Message>
class message_handler : public message_handler_base
{
public:
  // Deliver one message.
  //
  // msg:  the message, passed by value.
  // from: address of the actor named as the sender.
  virtual void handle_message(Message msg, actor_address from) = 0;
};
// Message handler that forwards each message of type Message to a member
// function of an Actor object.
template <class Actor, class Message>
class mf_message_handler : public message_handler<Message>
{
  // Shape of the member functions this handler is able to invoke.
  typedef void (Actor::* member_function)(Message, actor_address);

public:
  // Bind the handler to member function mf of the object a. Only the pointer
  // is stored; the object itself is not copied or owned.
  mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
    : callback_(mf),
      target_(a)
  {
  }

  // The handler accepts exactly the message type it was instantiated for.
  const std::type_info& message_id() const override
  {
    return typeid(Message);
  }

  // Forward the message and its sender to the bound member function.
  void handle_message(Message msg, actor_address from) override
  {
    (target_->*callback_)(std::move(msg), from);
  }

  // True when this handler is bound to the member function mf.
  bool is_function(void (Actor::* mf)(Message, actor_address)) const
  {
    return callback_ == mf;
  }

private:
  member_function callback_;
  Actor* target_;
};
// Base class for all actors.
//
// An actor owns a list of message handlers and a strand. Every message
// delivered to the actor is dispatched through that strand, so the handlers of
// a single actor are never run concurrently with one another.
class actor
{
public:
  virtual ~actor()
  {
  }

  // Obtain the actor's address for use as a message sender or recipient.
  actor_address address()
  {
    return this;
  }

  // Send a message from one actor to another.
  //
  // msg:  the message; it is copied into the posted function object.
  // from: the address passed to the receiving handlers as the sender.
  // to:   the recipient. Only the raw pointer is captured, so the recipient
  //       must still exist when the posted function runs.
  template <class Message>
  friend void send(Message msg, actor_address from, actor_address to)
  {
    // Execute the message handler in the context of the target's executor.
    post(to->executor_,
        [=]
        {
          to->call_handler(std::move(msg), from);
        });
  }

protected:
  // Construct the actor to use the specified executor for all message handlers.
  actor(executor e)
    : executor_(std::move(e))
  {
  }

  // Register a handler for a specific message type. Duplicates are permitted.
  //
  // mf: member function of the derived class Actor to invoke for each message
  //     of type Message. `this` is downcast to Actor*, so Actor must be the
  //     type (or a base) of the object on which this is called.
  template <class Actor, class Message>
  void register_handler(void (Actor::* mf)(Message, actor_address))
  {
    handlers_.push_back(
        std::make_shared<mf_message_handler<Actor, Message>>(
          mf, static_cast<Actor*>(this)));
  }

  // Deregister a handler. Removes only the first matching handler; does
  // nothing when no handler is bound to mf.
  template <class Actor, class Message>
  void deregister_handler(void (Actor::* mf)(Message, actor_address))
  {
    // Handlers report typeid(Message) from message_id(), so that is the id
    // that has to be compared against here.
    const std::type_info& id = typeid(Message);
    for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
    {
      if ((*iter)->message_id() != id)
        continue;

      // A matching message type does not prove the handler was registered for
      // this Actor type, so use a checked cast rather than assuming it.
      auto mh = dynamic_cast<mf_message_handler<Actor, Message>*>(iter->get());
      if (mh && mh->is_function(mf))
      {
        handlers_.erase(iter);
        return;
      }
    }
  }

  // Send a message from within a message handler.
  //
  // Identical to send() with this actor as the sender, except that the
  // function object is submitted with defer() instead of post().
  template <class Message>
  void tail_send(Message msg, actor_address to)
  {
    // Execute the message handler in the context of the target's executor.
    actor* from = this;
    defer(to->executor_,
        [=]
        {
          to->call_handler(std::move(msg), from);
        });
  }

private:
  // Find the matching message handlers, if any, and call them.
  //
  // Each matching handler receives its own copy of msg. The set of handlers
  // is fixed when dispatch starts: handlers registered while the message is
  // being delivered do not see it, and handlers deregistered meanwhile may
  // still be called for it.
  template <class Message>
  void call_handler(Message msg, actor_address from)
  {
    const std::type_info& message_id = typeid(Message);

    // Handlers are allowed to call register_handler/deregister_handler, which
    // modify handlers_ and would invalidate iterators into it. Iterate over a
    // snapshot instead; the copied shared_ptrs also keep a handler alive if it
    // deregisters itself while it is running.
    const std::vector<std::shared_ptr<message_handler_base>> snapshot(handlers_);
    for (const auto& h : snapshot)
    {
      if (h->message_id() == message_id)
      {
        auto mh = static_cast<message_handler<Message>*>(h.get());
        mh->handle_message(msg, from);
      }
    }
  }

  // All messages associated with a single actor object should be processed
  // non-concurrently. We use a strand to ensure non-concurrent execution even
  // if the underlying executor may use multiple threads.
  strand<executor> executor_;

  // Registered handlers, in registration order.
  std::vector<std::shared_ptr<message_handler_base>> handlers_;
};
// An actor that queues every message of type Message it is sent, so that
// ordinary (non-actor) code can collect them with a blocking call.
template <class Message>
class receiver : public actor
{
public:
  // Runs on the system executor and starts listening for Message at once.
  receiver()
    : actor(system_executor())
  {
    register_handler(&receiver::message_handler);
  }

  // Block the calling thread until at least one message is queued, then
  // remove and return the oldest one.
  Message wait()
  {
    std::unique_lock<std::mutex> guard(mutex_);
    while (message_queue_.empty())
      condition_.wait(guard);

    Message oldest(std::move(message_queue_.front()));
    message_queue_.pop_front();
    return oldest;
  }

private:
  // Handler for incoming messages: append to the queue and wake one thread
  // blocked in wait(). The sender's address is ignored.
  void message_handler(Message msg, actor_address /* from */)
  {
    std::lock_guard<std::mutex> guard(mutex_);
    message_queue_.emplace_back(std::move(msg));
    condition_.notify_one();
  }

  // Guards message_queue_.
  std::mutex mutex_;
  // Signalled each time a message is appended to message_queue_.
  std::condition_variable condition_;
  // Messages received but not yet returned by wait(), oldest first.
  std::deque<Message> message_queue_;
};
//------------------------------------------------------------------------------
#include <boost/asio/thread_pool.hpp>
#include <iostream>
using boost::asio::thread_pool;
// An actor that passes integer tokens along to a neighbouring actor.
//
// It is configured by a single actor_address message, which names the
// neighbour to forward to; the sender of that message becomes the actor that
// is told when a token has run out.
class member : public actor
{
public:
  // Create a member whose handlers run on executor e. Only the initialisation
  // handler is registered at this point.
  explicit member(executor e)
    : actor(std::move(e)),
      next_(nullptr),
      caller_(nullptr)
  {
    register_handler(&member::init_handler);
  }

private:
  // Handle the initialisation message.
  //
  // next: the actor to which positive tokens are forwarded.
  // from: the actor to notify when a token reaches zero.
  void init_handler(actor_address next, actor_address from)
  {
    next_ = next;
    caller_ = from;

    // Start accepting tokens and request removal of this handler.
    register_handler(&member::token_handler);
    deregister_handler(&member::init_handler);
  }

  // Handle a token. A positive token is decremented and sent on to next_; any
  // other token is sent unchanged to caller_.
  void token_handler(int token, actor_address /*from*/)
  {
    int msg(token);
    actor_address to(caller_);

    if (token > 0)
    {
      msg = token - 1;
      to = next_;
    }

    tail_send(msg, to);
  }

  // Null until init_handler has run, so that the pointers never hold an
  // indeterminate value.
  actor_address next_;
  actor_address caller_;
};
int main()
{
const std::size_t num_threads = 16;
const int num_hops = 50000000;
const std::size_t num_actors = 503;
const int token_value = (num_hops + num_actors - 1) / num_actors;
const std::size_t actors_per_thread = num_actors / num_threads;
struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
single_thread_pool pools[num_threads];
std::vector<std::shared_ptr<member>> members(num_actors);
receiver<int> rcvr;
// Create the member actors.
for (std::size_t i = 0; i < num_actors; ++i)
members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
// Initialise the actors by passing each one the address of the next actor in the ring.
for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
// Send exactly one token to each actor, all with the same initial value, rounding up if required.
for (std::size_t i = 0; i < num_actors; ++i)
send(token_value, rcvr.address(), members[i]->address());
// Wait for all signal messages, indicating the tokens have all reached zero.
for (std::size_t i = 0; i < num_actors; ++i)
rcvr.wait();
}