a4cec69069
[SVN r41370]
114 lines
3.4 KiB
C++
114 lines
3.4 KiB
C++
// Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
|
|
|
|
// Use, modification and distribution is subject to the Boost Software
|
|
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
|
|
// http://www.boost.org/LICENSE_1_0.txt)
|
|
|
|
// An example using Boost.MPI's split() operation on communicators to
|
|
// create separate data-generating processes and data-collecting
|
|
// processes using boost::optional for broadcasting.
|
|
#include <boost/mpi.hpp>
#include <boost/serialization/optional.hpp>
#include <boost/serialization/vector.hpp>

#include <cstdlib>
#include <ctime>
#include <iostream>
#include <vector>
|
|
namespace mpi = boost::mpi;
|
|
|
|
// Tags attached to the point-to-point messages that generators send to the
// master collector over the world communicator.
enum message_tags {
  // Sent by generate_data() together with a std::vector<int> payload.
  msg_data_packet = 0,
  // Sent by generate_data() with no payload once the generators are done.
  msg_finished = 1
};
|
|
|
|
// Runs on every generator process.
//
// Each generator sends between one and three blocks of random integers to the
// master collector, waits for all other generators to do the same, and then
// the first generator (rank 0 in `local`) tells the master collector that
// no more data will be sent.
//
// local: communicator containing only the generator processes.
// world: communicator containing generators and collectors.
void generate_data(mpi::communicator local, mpi::communicator world)
{
  using std::srand;
  using std::rand;

  // The rank of the master collector within the world communicator.
  // This assumes the generators occupy world ranks [0, local.size()), which
  // is how main() partitions the processes.
  int master_collector = local.size();

  // Mix the world rank into the seed so that processes started within the
  // same second still produce different sequences.
  srand(static_cast<unsigned int>(std::time(0) + world.rank()));

  // Send out several blocks of random data to the collectors.
  int num_data_blocks = rand() % 3 + 1;
  for (int block = 0; block < num_data_blocks; ++block) {
    // Generate some random data (between 0 and 999 samples).
    int num_samples = rand() % 1000;
    std::vector<int> data;
    data.reserve(static_cast<std::vector<int>::size_type>(num_samples));
    for (int i = 0; i < num_samples; ++i) {
      data.push_back(rand());
    }

    // Send our data to the master collector process.
    std::cout << "Generator #" << local.rank() << " sends some data..."
              << std::endl;
    world.send(master_collector, msg_data_packet, data);
  }

  // Wait for all of the generators to complete.
  // The call is parenthesized, presumably to keep a function-like macro
  // named `barrier` from being expanded here.
  (local.barrier)();

  // The first generator will send the message to the master collector
  // indicating that we're done.
  if (local.rank() == 0)
    world.send(master_collector, msg_finished);
}
|
|
|
|
// Runs on every collector process.
//
// The master collector receives messages from the generators over `world`
// and re-broadcasts each one to the collectors over `local` as a
// boost::optional<std::vector<int> >: an engaged optional carries a data
// packet, an empty optional means "finished". Every other collector loops on
// the broadcast until it receives the empty optional.
//
// local: communicator containing only the collector processes.
// world: communicator containing generators and collectors.
void collect_data(mpi::communicator local, mpi::communicator world)
{
  // The rank of the master collector within the world communicator.
  // This assumes the collectors are the last local.size() ranks of world,
  // which is how main() partitions the processes.
  int master_collector = world.size() - local.size();

  if (world.rank() == master_collector) {
    while (true) {
      // Wait for a message; probe() reports its source and tag without
      // receiving it, so we can decide how to receive it.
      mpi::status msg = world.probe();
      if (msg.tag() == msg_data_packet) {
        // Receive the packet of data into a boost::optional
        boost::optional<std::vector<int> > data;
        data = std::vector<int>();

        // Receive exactly the message that was probed: same source, same
        // tag. Passing the source rank as the tag would only match for a
        // sender whose rank happens to equal msg_data_packet.
        world.recv(msg.source(), msg.tag(), *data);

        // Broadcast the actual data.
        // NOTE(review): root 0 of `local` is assumed to be this process.
        broadcast(local, data, 0);
      } else if (msg.tag() == msg_finished) {
        // Receive the (payload-free) message
        world.recv(msg.source(), msg.tag());

        // Broadcast an empty optional to each collector to tell them
        // we've finished.
        boost::optional<std::vector<int> > data;
        broadcast(local, data, 0);
        break;
      }
    }
  } else {
    boost::optional<std::vector<int> > data;
    do {
      // Wait for a broadcast from the master collector
      broadcast(local, data, 0);
      if (data) {
        std::cout << "Collector #" << local.rank()
                  << " is processing data." << std::endl;
      }
    } while (data);  // an empty optional signals completion
  }
}
|
|
|
|
int main(int argc, char* argv[])
|
|
{
|
|
mpi::environment env(argc, argv);
|
|
mpi::communicator world;
|
|
|
|
if (world.size() < 4) {
|
|
if (world.rank() == 0) {
|
|
std::cerr << "Error: this example requires at least 4 processes."
|
|
<< std::endl;
|
|
}
|
|
env.abort(-1);
|
|
}
|
|
|
|
bool is_generator = world.rank() < 2 * world.size() / 3;
|
|
mpi::communicator local = world.split(is_generator? 0 : 1);
|
|
if (is_generator) generate_data(local, world);
|
|
else collect_data(local, world);
|
|
|
|
return 0;
|
|
}
|