00001 #include "DistributedPartitioner.hpp"
00002
00003 #include "PartitionFunctor.hpp"
00004
00005 #include <limits>
00006 #include <vector>
00007 #include <algorithm>
00008
00009 using namespace indii::ml::aux;
00010
00011 DistributedPartitioner::DistributedPartitioner(const unsigned int nth) :
00012 nth(nth) {
00013
00014 }
00015
00016 DistributedPartitioner::~DistributedPartitioner() {
00017
00018 }
00019
00020 bool DistributedPartitioner::init(DiracMixturePdf* p,
00021 const std::vector<unsigned int>& is) {
00022 boost::mpi::communicator world;
00023 const unsigned int rank = world.rank();
00024 const unsigned int size = world.size();
00025
00026 const unsigned int N = p->getDimensions();
00027 unsigned int i, j;
00028 vector lower(N);
00029 vector upper(N);
00030 vector length(N);
00031
00032
00033 if (is.size() > 0) {
00034 noalias(lower) = p->get(is[0]);
00035 noalias(upper) = lower;
00036
00037 for (i = 1; i < is.size(); i++) {
00038 for (j = 0; j < N; j++) {
00039 vector& x = p->get(is[i]);
00040 if (x(j) < lower(j)) {
00041 lower(j) = x(j);
00042 } else if (x(j) > upper(j)) {
00043 upper(j) = x(j);
00044 }
00045 }
00046 }
00047 } else {
00048 noalias(lower) = scalar_vector(N,
00049 std::numeric_limits<double>::quiet_NaN());
00050 noalias(upper) = lower;
00051 }
00052
00053
00054 for (i = 0; i < N; i++) {
00055 lower(i) = all_reduce(world, lower(i), boost::mpi::minimum<double>());
00056 upper(i) = all_reduce(world, upper(i), boost::mpi::maximum<double>());
00057 }
00058
00059
00060 noalias(length) = upper - lower;
00061 this->index = index_norm_inf(length);
00062
00063
00064 std::vector<unsigned int> js(is);
00065 std::vector<unsigned int>::iterator split, guess, from, to;
00066 unsigned int guesser = 0;
00067 double value, lastValue;
00068 unsigned int leftP, nth = this->nth;
00069
00070 from = js.begin();
00071 to = js.end();
00072 guess = from;
00073
00074 do {
00075
00076 do {
00077 if (rank == guesser) {
00078 if (guess != to) {
00079
00080 value = p->get(*guess)(this->index);
00081 } else {
00082
00083 value = std::numeric_limits<double>::quiet_NaN();
00084 }
00085 }
00086 boost::mpi::broadcast(world, value, guesser);
00087
00088 if (isnan(value)) {
00089 guesser++;
00090 }
00091 } while (isnan(value) && guesser < size);
00092
00093
00094 if (guesser < size) {
00095 lastValue = value;
00096 PartitionFunctor functor(*p, index, value);
00097
00098
00099 split = std::stable_partition(from, to, functor);
00100
00101
00102 leftP = boost::mpi::all_reduce(world, std::distance(from, split),
00103 std::plus<unsigned int>());
00104
00105
00106 if (leftP == 0) {
00107
00108 if (rank == guesser) {
00109 guess++;
00110 }
00111 } else if (leftP > nth) {
00112
00113 to = split;
00114 guess = from;
00115 } else if (leftP < nth) {
00116
00117 from = split;
00118 guess = from;
00119 nth -= leftP;
00120 }
00121 }
00122 } while (guesser < size && leftP != nth);
00123
00124 this->value = lastValue;
00125
00126 return length(this->index) > 0.0;
00127 }
00128