// Copyright (C) 2005-2006 Matthias Troyer

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// An example of a parallel Monte Carlo simulation using some nodes to produce
// data and others to aggregate the data

#include <iostream>
#include <cstdlib>

#include <boost/mpi.hpp>
#include <boost/random/parallel.hpp>
#include <boost/random.hpp>
#include <boost/foreach.hpp>
#include <boost/assert.hpp>

namespace mpi = boost::mpi;
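
// tags for the messages exchanged between the calculation and the
// accumulation ranks: sample data, the sample skeleton, the start of a
// broadcast, and the request to quit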
enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};

void calculate_samples(int sample_length)
{
  int num_samples = 100;
  std::vector<double> sample(sample_length);

  // set up the communicator by splitting
  mpi::communicator world;
  mpi::communicator calculate_communicator = world.split(0);
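  // (all calculation ranks pass color 0 to world.split and therefore end up
  // in one communicator; the accumulation ranks pass color 1 in
  // accumulate_samples and form a second, disjoint communicator)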

  unsigned int num_calculate_ranks = calculate_communicator.size();

  // the master of the accumulation ranks is the first of them, i.e. its
  // rank is just one past the last calculation rank
  int master_accumulate_rank = num_calculate_ranks;

  // the master of the calculation ranks sends the skeleton of the sample
  // to the master of the accumulation ranks
  if (world.rank()==0)
    world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));

  // next we extract the content of the sample vector, to be used in sending
  // the content later on
  mpi::content sample_content = mpi::get_content(sample);
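  // (mpi::skeleton transferred just the structure of the vector, i.e. its
  // size; the mpi::content handle lets the sends below transmit only the raw
  // data, without re-serializing that structure on every iteration)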

  // now initialize the parallel random number generator
  boost::lcg64 engine(
    boost::random::stream_number = calculate_communicator.rank(),
    boost::random::total_streams = calculate_communicator.size()
  );

  boost::variate_generator<boost::lcg64&,boost::uniform_real<> >
    rng(engine,boost::uniform_real<>());
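  // (the stream_number/total_streams named parameters come from the parallel
  // extension of Boost.Random pulled in by <boost/random/parallel.hpp>; they
  // give every calculation rank its own independent stream of lcg64)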

  for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) {

    // calculate the sample by filling the vector with random numbers.
    // note that std::generate will not work since it takes the generator
    // by value, and boost::ref cannot be used as a generator
    // (boost::ref should be fixed so that it can be used as a generator)
    BOOST_FOREACH(double& x, sample)
      x = rng();

    // send the sample to the accumulation ranks.
    // Ideally we would do this as a broadcast over an inter-communicator
    // between the calculation and accumulation ranks. MPI2 supports this,
    // but here we present an MPI1-compatible solution.

    // send the content of the sample to the first (master) accumulation process
    world.send(master_accumulate_rank,sample_tag,sample_content);

    // gather some results from all calculation ranks
    double local_result = sample[0];
    std::vector<double> gathered_results(calculate_communicator.size());
    mpi::all_gather(calculate_communicator,local_result,gathered_results);
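    // (after the all_gather every calculation rank holds the first entry of
    // each rank's sample in gathered_results; the values are not used
    // further in this example)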
  }

  // we are done: the master tells the accumulation ranks to quit
  if (world.rank()==0)
    world.send(master_accumulate_rank,quit_tag);
}


void accumulate_samples()
{
  std::vector<double> sample;

  // set up the communicator for all accumulation ranks by splitting
  mpi::communicator world;
  mpi::communicator accumulate_communicator = world.split(1);

  bool is_master_accumulate_rank = accumulate_communicator.rank()==0;

  // the master receives the sample skeleton
  if (is_master_accumulate_rank)
    world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));

  // and broadcasts it to all accumulation ranks
  mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);

  // next we extract the content of the sample vector, to be used in receiving
  // the content later on
  mpi::content sample_content = mpi::get_content(sample);
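  // (receiving the skeleton resized the local sample vector, so
  // sample_content now describes a buffer of the correct length on every
  // accumulation rank)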

  // accumulate until we are told to quit
  double sum=0.;
  while (true) {

    // the accumulation master checks whether we should quit, and if so
    // forwards the quit message to the other accumulation ranks
    if (world.iprobe(0,quit_tag)) {
      world.recv(0,quit_tag);
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // the other accumulation ranks check whether we should quit
    if (accumulate_communicator.iprobe(0,quit_tag)) {
      accumulate_communicator.recv(0,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // check whether the master accumulation rank has received a sample
    if (world.iprobe(mpi::any_source,sample_tag)) {
      BOOST_ASSERT(is_master_accumulate_rank);

      // receive the content
      world.recv(mpi::any_source,sample_tag,sample_content);

      // now we need to broadcast.
      // the problem is that we do not have a non-blocking broadcast that we
      // could abort if we receive a quit message from the master. We thus
      // need to first tell all accumulation ranks to start a broadcast. If
      // the sample is small, we could just send the sample in this message,
      // but here we optimize the code for large samples, so that the overhead
      // of these sends can be ignored, and we count on an optimized broadcast
      // implementation with O(log N) complexity.
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,sample_broadcast_tag);

      // now broadcast the contents of the sample to all accumulation ranks
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[0];
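      // (the master is rank 0 of the accumulation communicator, so summing
      // sample[0] here matches the sample[accumulate_communicator.rank()]
      // rule used by the other accumulation ranks below)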
    }

    // the other accumulation ranks wait for a message to start the broadcast
    if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {
      BOOST_ASSERT(!is_master_accumulate_rank);

      accumulate_communicator.recv(0,sample_broadcast_tag);

      // receive the broadcast of the sample contents
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[accumulate_communicator.rank()];
    }
  }
}

int main(int argc, char** argv)
{
  mpi::environment env(argc, argv);
  mpi::communicator world;

  // half of the processes generate samples, the others accumulate them;
  // the sample length is just the number of accumulation ranks
  if (world.rank() < world.size()/2)
    calculate_samples(world.size()-world.size()/2);
  else
    accumulate_samples();

  return 0;
}
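
// To try this out, launch at least two MPI processes, e.g. (assuming the
// executable built from this file is named parallel_example, a hypothetical
// name for the build target):
//
//   mpirun -np 4 ./parallel_example
//
// With 4 ranks, ranks 0 and 1 generate samples of length 2 while ranks 2
// and 3 accumulate them and print their sums.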