mpi/example/generate_collect_optional.cpp
2007-11-25 18:38:02 +00:00

114 lines
3.4 KiB
C++

// Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// An example using Boost.MPI's split() operation on communicators to
// create separate data-generating processes and data-collecting
// processes using boost::optional for broadcasting.
#include <boost/mpi.hpp>
#include <boost/serialization/optional.hpp>
#include <boost/serialization/vector.hpp>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <vector>
namespace mpi = boost::mpi;
// Tags attached to the point-to-point messages that generators send
// to the master collector over the world communicator.
enum message_tags {
  // Accompanies a std::vector<int> payload of generated samples.
  msg_data_packet = 0,
  // Payload-free notification that the generators have finished.
  msg_finished = 1
};
// Body of a data-generating process.
//
// Sends between one and three blocks of random integers (each block
// holding 0..999 samples) to the master collector, waits until every
// generator has done the same, and then has the first generator tell
// the master collector that generation is finished.
//
// local: communicator shared by the generator processes only.
// world: communicator spanning all processes.
void generate_data(mpi::communicator local, mpi::communicator world)
{
  using std::srand;
  using std::rand;

  // The rank of the collector within the world communicator. main()
  // gives the generators the lowest world ranks, so the first collector
  // sits at the world rank equal to the number of generators.
  int master_collector = local.size();

  // Offset the seed by the world rank so that generators started within
  // the same second do not all seed identically.
  srand(static_cast<unsigned int>(std::time(0)) + world.rank());

  // Send out several blocks of random data to the collectors.
  int num_data_blocks = rand() % 3 + 1;
  for (int block = 0; block < num_data_blocks; ++block) {
    // Generate some random data
    int num_samples = rand() % 1000;
    std::vector<int> data;
    data.reserve(num_samples);  // final size is known; avoid regrowth
    for (int i = 0; i < num_samples; ++i) {
      data.push_back(rand());
    }

    // Send our data to the master collector process.
    std::cout << "Generator #" << local.rank() << " sends some data..."
              << std::endl;
    world.send(master_collector, msg_data_packet, data);
  }

  // Wait for all of the generators to complete.
  // NOTE(review): the parentheses around the member name presumably
  // guard against a function-like macro called `barrier` -- confirm
  // before simplifying.
  (local.barrier)();

  // The first generator will send the message to the master collector
  // indicating that we're done.
  if (local.rank() == 0)
    world.send(master_collector, msg_finished);
}
// Body of a data-collecting process.
//
// The master collector (the first collector in world-rank order)
// receives messages from the generators over `world` and relays them to
// the other collectors by broadcasting a boost::optional over `local`:
// an engaged optional carries one block of data, and an empty optional
// signals that collection is finished. The remaining collectors loop on
// that broadcast until they see the empty optional.
//
// local: communicator shared by the collector processes only.
// world: communicator spanning all processes.
void collect_data(mpi::communicator local, mpi::communicator world)
{
  // The rank of the collector within the world communicator
  int master_collector = world.size() - local.size();

  if (world.rank() == master_collector) {
    while (true) {
      // Wait for a message
      mpi::status msg = world.probe();
      if (msg.tag() == msg_data_packet) {
        // Receive the packet of data into a boost::optional. The
        // optional is engaged with an empty vector first so that *data
        // is a valid object to receive into.
        boost::optional<std::vector<int> > data;
        data = std::vector<int>();

        // Receive exactly the message that was probed: same source and
        // same tag. (Passing the source rank as the tag would only
        // match when the sender's rank happened to equal the tag.)
        world.recv(msg.source(), msg.tag(), *data);

        // Broadcast the actual data.
        broadcast(local, data, 0);
      } else if (msg.tag() == msg_finished) {
        // Receive the message
        world.recv(msg.source(), msg.tag());

        // Broadcast to each collector to tell them we've finished.
        // The optional is left empty; that is the termination signal.
        boost::optional<std::vector<int> > data;
        broadcast(local, data, 0);
        break;
      }
    }
  } else {
    boost::optional<std::vector<int> > data;
    do {
      // Wait for a broadcast from the master collector
      broadcast(local, data, 0);
      if (data) {
        std::cout << "Collector #" << local.rank()
                  << " is processing data." << std::endl;
      }
    } while (data);  // an empty optional ends the loop
  }
}
// Program entry point.
//
// Initializes MPI, refuses to run with fewer than four processes, and
// then splits the world communicator in two: processes whose world rank
// is below 2/3 of the world size (integer arithmetic) become generators,
// and the rest become collectors. Each process then runs the routine for
// its role using its group's communicator.
int main(int argc, char* argv[])
{
  mpi::environment env(argc, argv);
  mpi::communicator world;

  const int process_count = world.size();
  const int my_rank = world.rank();

  if (process_count < 4) {
    // Only one process reports the problem; every process aborts.
    if (my_rank == 0) {
      std::cerr << "Error: this example requires at least 4 processes."
                << std::endl;
    }
    env.abort(-1);
  }

  // Generators take the low world ranks, collectors the remainder.
  const int generator_count = 2 * process_count / 3;
  const bool is_generator = my_rank < generator_count;

  // Color 0 groups the generators, color 1 groups the collectors.
  const int color = is_generator ? 0 : 1;
  mpi::communicator local = world.split(color);

  if (is_generator) {
    generate_data(local, world);
  } else {
    collect_data(local, world);
  }

  return 0;
}