indii/ml/aux/MixturePdf.hpp

00001 #ifndef INDII_ML_AUX_MIXTUREPDF_HPP
00002 #define INDII_ML_AUX_MIXTUREPDF_HPP
00003 
00004 #include "Pdf.hpp"
00005 
00006 #include <vector>
00007 
00008 namespace indii {
00009   namespace ml {
00010     namespace aux {
00011     
00012 /**
00013  * Mixture probability density.
00014  *
00015  * @author Lawrence Murray <lawrence@indii.org>
00016  * @version $Rev: 571 $
00017  * @date $Date: 2008-09-24 15:06:22 +0100 (Wed, 24 Sep 2008) $
00018  *
00019  * @param T Component type, should be derivative of Pdf.
00020  *
00021  * @section MixturePdf_serialization Serialization
00022  *
00023  * This class supports serialization through the Boost.Serialization library.
00024  *
00025  * @section MixturePdf_parallel Parallelisation
00026  *
00027  * This class supports distributed storage. MixturePdf objects may be
00028  * instantiated on all nodes of a communicator, each with a different
00029  * set of mixture components, such that the union of all components
00030  * defines the full distribution.
00031  *
00032  * "Regular" methods such as getSize(), getTotalWeight() and
00033  * getExpectation() use only those components stored on the local
00034  * node. Special "distributed" methods such as
00035  * getDistributedSize(), getDistributedTotalWeight() and
00036  * getDistributedExpectation() use all components stored across the
00037  * nodes.
00038  */
00039 template <class T>
00040 class MixturePdf : public Pdf {
00041 public:
00042   /**
00043    * Default constructor.
00044    *
00045    * Initialises the mixture with zero dimensions. This should
00046    * generally only be used when the object is to be restored from a
00047    * serialization. Indeed, there is no other way to resize the
00048    * mixture to nonzero dimensionality except by subsequently
00049    * restoring from a serialization.
00050    */
00051   MixturePdf();
00052 
00053   /**
00054    * Constructor. One or more components should be added with
00055    * add() after construction.
00056    *
00057    * @param N Dimensionality of the distribution.
00058    */
00059   MixturePdf(const unsigned int N);
00060 
00061   /**
00062    * Destructor.
00063    */
00064   virtual ~MixturePdf();
00065 
00066   /**
00067    * Assignment operator. Both sides must have the same dimensionality,
00068    * but may have different number of components.
00069    */
00070   MixturePdf<T>& operator=(const MixturePdf<T>& o);
00071 
00072   virtual void setDimensions(const unsigned int N,
00073       const bool preserve = false);
00074 
00075   /**
00076    * @name Local storage methods
00077    *
00078    * Use these methods if:
00079    *
00080    * @li You are not working in a parallel environment,
00081    * @li You are working in a parallel environment but are not using
00082    * the distributed storage features of this class, or
00083    * @li You are working in a parallel environment and are using the
00084    * distributed storage features of this class, but only want to deal
00085    * with the mixture components stored on the local node.
00086    */
00087   //@{
00088 
00089   /**
00090    * Add a component to the distribution on the local node.
00091    *
00092    * @param x The component.
00093    * @param w Unnormalised weight of the component.
00094    *
00095    * The new component is added to the end of the list of components in
00096    * terms of indices used by get(), getWeight(), etc.
00097    */
00098   void add(const T& x, const double w = 1.0);
00099 
00100   /**
00101    * Get component on the local node.
00102    *
00103    * @param i Index of the component.
00104    *
00105    * @return The @p i th component.
00106    */
00107   const T& get(const unsigned int i) const;
00108 
00109   /**
00110    * Get component on the local node.
00111    *
00112    * @param i Index of the component.
00113    *
00114    * @return The @p i th component.
00115    *
00116    * @note Modifying any of the components in the mixture may have
00117    * unintended side effects, especially since Pdf classes rely heavily on
00118    * precalculations which may consequently become out of date. This method
00119    * is provided only for those situations where precalculations must be
00120    * made within the component objects themselves, such as within their
00121    * getExpectation() methods, such that a non-const context is required.
00122    * For all other situations, favour the const version of this method.
00123    */
00124   T& get(const unsigned int i);
00125 
00126   /**
00127    * Set component on the local node.
00128    *
00129    * @param i Index of the component.
00130    * @param x Value of the component.
00131    */
00132   void set(const unsigned int i, const T&);
00133 
00134   /**
00135    * Get components on the local node.
00136    *
00137    * @return \f$\{(\mathbf{x}^{(i)},w^{(i)})\}\f$; set of weighted
00138    * components defining the distribution, as a vector.
00139    */
00140   const std::vector<T>& getAll() const;
00141 
00142   /**
00143    * Get components on the local node.
00144    *
00145    * @return \f$\{(\mathbf{x}^{(i)},w^{(i)})\}\f$; set of weighted
00146    * components defining the distribution, as a vector.
00147    *
00148    * @note Modifying any of the components in the mixture may have
00149    * unintended side effects, especially since Pdf classes rely heavily on
00150    * precalculations which may consequently become out of date. This method
00151    * is provided only for those situations where precalculations must be
00152    * made within the component objects themselves, such as within their
00153    * getExpectation() methods, such that a non-const context is required.
00154    * For all other situations, favour the const version of this method.
00155    */
00156   std::vector<T>& getAll();
00157 
00158   /**
00159    * Get component weight on the local node.
00160    *
00161    * @param i Index of the component.
00162    *
00163    * @return Weight of the @p i th component.
00164    */
00165   double getWeight(const unsigned int i) const;
00166 
00167   /**
00168    * Set component weight on the local node.
00169    *
00170    * @param i Index of the component.
00171    * @param w Weight of the @p i th component.
00172    */
00173   void setWeight(const unsigned int i, const double w);
00174 
00175   /**
00176    * Get the weights of all components on the local node.
00177    *
00178    * @return Vector of the weights of all components.
00179    */
00180   const vector& getWeights() const;
00181   
00182   /**
00183    * Set the weights of all components on the local node.
00184    *
00185    * @param ws Vector of the weights of all components.
00186    */
00187   void setWeights(const vector& ws);
00188   
00189   /**
00190    * Clear all components from the distribution on the local node.
00191    */
00192   void clear();
00193 
00194   /**
00195    * Get the number of components in the distribution on the local
00196    * node.
00197    *
00198    * @return \f$K\f$; the number of components in the distribution.
00199    */
00200   unsigned int getSize() const;
00201 
00202   /**
00203    * Get the total weight of components on the local node.
00204    *
00205    * @return \f$W\f$; the total weight of components.
00206    */
00207   double getTotalWeight() const;
00208 
00209   /**
00210    * Normalise weights on the local node to sum to 1.
00211    *
00212    * @warning If in a distributed storage situation this is probably
00213    * not what you want. Consider distributedNormalise()
00214    * instead. Normalising the weights only on the local node will skew
00215    * the weighting of mixture components across all nodes.
00216    */
00217   void normalise();
00218 
00219   /**
00220    * Sample from the distribution on the local node.
00221    *
00222    * @return A sample from the distribution.
00223    */
00224   virtual vector sample();
00225 
00226   /**
00227    * Calculate the density on the local node at a given point.
00228    *
00229    * \f[p(\mathbf{x}) = \frac{1}{W}\sum_{i=1}^{K}w^{(i)}p^{(i)}(\mathbf{x})\f]
00230    *
00231    * @param x \f$\mathbf{x}\f$; the point at which to calculate the
00232    * density.
00233    *
00234    * @return \f$p(\mathbf{x})\f$; the density at \f$\mathbf{x}\f$.
00235    */
00236   virtual double densityAt(const vector& x);
00237 
00238   /**
00239    * Get the expected value of the distribution on the local node.
00240    *
00241    * @return \f$\mathbf{\mu}\f$; expected value of the distribution.
00242    */
00243   virtual const vector& getExpectation();
00244 
00245   //@}
00246 
00247   /**
00248    * @name Distributed storage methods
00249    *
00250    * Use these methods if:
00251    *
00252    * @li You are working in a parallel environment and are using the
00253    * distributed storage features of this class, and want to deal with
00254    * the mixture components stored across all nodes.
00255    *
00256    * These methods are used for distributed storage of mixtures. They
00257    * require synchronization and communication between all nodes in
00258    * the communicator, such that if any of these methods is called on
00259    * one node, the same method should eventually, and preferably as
00260    * soon as possible, be called on all other nodes in the same
00261    * communicator.
00262    */
00263   //@{
00264 
00265   /**
00266    * Get the number of components in the distribution across all
00267    * nodes.
00268    *
00269    * @return \f$\sum_i K_i\f$; the number of components in the distribution.
00270    */
00271   unsigned int getDistributedSize() const;
00272 
00273   /**
00274    * Get the total weight of components across all nodes.
00275    *
00276    * @return \f$\sum_i W_i\f$; the total weight of components across
00277    * all nodes.
00278    */
00279   double getDistributedTotalWeight() const;
00280 
00281   /**
00282    * Normalise weights across all nodes to sum to 1.
00283    */
00284   void distributedNormalise();
00285 
00286   /**
00287    * Sample from the full distribution.
00288    *
00289    * @return A sample from the full distribution.
00290    */
00291   virtual vector distributedSample();
00292 
00293   /**
00294    * Draw multiple samples from the full distribution. This is
00295    * significantly more efficient than multiple calls to
00296    * distributedSample(), as it involves less message passing.
00297    *
00298    * @param P \f$P\f$; number of samples to draw.
00299    *
00300    * @return Vector of samples drawn on the local node. The number of
00301    * samples drawn across all nodes will total \f$P\f$.
00302    */
00303   virtual std::vector<vector> distributedSample(const unsigned int P);
00304 
00305   /**
00306    * Calculate the density of the full distribution at a given point.
00307    *
00308    * @param x \f$\mathbf{x}\f$; the point at which to calculate the
00309    * density.
00310    *
00311    * @return \f$p(\mathbf{x})\f$; the density at \f$\mathbf{x}\f$.
00312    */
00313   virtual double distributedDensityAt(const vector& x);
00314 
00315   /**
00316    * Perform multiple density calculations from the full distribution. This
00317    * is significantly more efficient than multiple calls to
00318    * distributedDensityAt(const vector&), as it involves less message
00319    * passing.
00320    *
00321    * @param xs The points on this node at which to calculate densities.
00322    * 
00323    * @return The densities at the given points on this node.
00324    *
00325    * Note that while each node is passed only its set of points and returns
00326    * only the density calculations for its set of points, all nodes
00327    * participate in the calculation for all points.
00328    *
00329    * @see distributedDensityAt(const vector&);
00330    */
00331   virtual vector distributedDensityAt(std::vector<vector>& xs);
00332 
00333   /**
00334    * Get the expected value of the full distribution.
00335    *
00336    * @return \f$\mathbf{\mu}\f$; expected value of the full
00337    * distribution.
00338    */
00339   virtual vector getDistributedExpectation();
00340 
00341   /**
00342    * Redistribute components across nodes by number. This attempts to
00343    * redistribute the components of the full distribution across
00344    * nodes, so that each node stores as close to an equal number of
00345    * components as possible.
00346    */
00347   void redistributeBySize();
00348 
00349   /**
00350    * Redistribute components across nodes by weight. This attempts to
00351    * redistribute the components of the full distribution across
00352    * nodes, so that the total weight of components at each node is as
00353    * close to an equal number as possible.
00354    */
00355   void redistributeByWeight();
00356 
00357   //@}
00358 
00359 protected:
00360   /**
00361    * Called when changes are made to the distribution, such as when a
00362    * new component is added. This allows pre-calculations to be
00363    * refreshed.
00364    */
00365   virtual void dirty();
00366 
00367 private:
00368   /**
00369    * Node property.
00370    */
00371   template <class P>
00372   struct node_property {
00373     /**
00374      * Node rank.
00375      */
00376     unsigned int rank;
00377 
00378     /**
00379      * Property at node.
00380      */
00381     P prop;
00382 
00383     /**
00384      * Comparison operator for sorting.
00385      *
00386      * @return True if this node's property is greater than the
00387      * argument's property.
00388      */
00389     bool operator<(const node_property& o) const;
00390 
00391     /**
00392      * Serialize, or restore from serialization.
00393      */
00394     template<class Archive>
00395     void serialize(Archive& ar, const unsigned int version);
00396 
00397     /*
00398      * Boost.Serialization requirements.
00399      */
00400     friend class boost::serialization::access;
00401 
00402   };
00403 
00404   typedef struct node_property<int> node_count;
00405   typedef std::vector<node_count> node_count_vector;
00406   typedef typename node_count_vector::iterator node_count_iterator;
00407 
00408   typedef struct node_property<double> node_weight;
00409   typedef std::vector<node_weight> node_weight_vector;
00410   typedef typename node_weight_vector::iterator node_weight_iterator;
00411 
00412   /**
00413    * Components.
00414    */
00415   std::vector<T> xs;
00416   
00417   /**
00418    * Component weights.
00419    */
00420   indii::ml::aux::vector ws;
00421   
00422   /**
00423    * Cumulative component weights.
00424    */
00425   std::vector<double> Ws;
00426 
00427   /**
00428    * Last calculated expectation.
00429    */
00430   vector mu;
00431 
00432   /**
00433    * Last calculated unnormalised expectation.
00434    */
00435   vector Zmu;
00436 
00437   /**
00438    * Is the last calculated expectation up to date?
00439    */
00440   bool haveMu;
00441 
00442   /**
00443    * Calculate expectation from current components on the local node.
00444    */
00445   void calculateExpectation();
00446 
00447   /**
00448    * Serialize.
00449    */
00450   template<class Archive>
00451   void save(Archive& ar, const unsigned int version) const;
00452 
00453   /**
00454    * Restore from serialization.
00455    */
00456   template<class Archive>
00457   void load(Archive& ar, const unsigned int version);
00458 
00459   /*
00460    * Boost.Serialization requirements.
00461    */
00462   BOOST_SERIALIZATION_SPLIT_MEMBER()
00463   friend class boost::serialization::access;
00464 
00465 };
00466 
00467     }
00468   }
00469 }
00470 
00471 #include "Random.hpp"
00472 #include "parallel.hpp"
00473 
00474 #include "boost/serialization/base_object.hpp"
00475 #include "boost/serialization/vector.hpp"
00476 
00477 #include "boost/mpi.hpp"
00478 #include "boost/mpi/environment.hpp"
00479 #include "boost/mpi/communicator.hpp"
00480 
00481 #include <algorithm>
00482 
00483 template <class T>
00484 indii::ml::aux::MixturePdf<T>::MixturePdf() : xs(0), ws(0), Ws(0),
00485     mu(0), Zmu(0) {
00486   haveMu = false;
00487 }
00488 
00489 template <class T>
00490 indii::ml::aux::MixturePdf<T>::MixturePdf(const unsigned int N) : Pdf(N),
00491     xs(0), ws(0), Ws(0), mu(N), Zmu(N) {
00492   haveMu = false;
00493 }
00494 
00495 template <class T>
00496 indii::ml::aux::MixturePdf<T>::~MixturePdf() {
00497   //
00498 }
00499 
00500 template <class T>
00501 indii::ml::aux::MixturePdf<T>& indii::ml::aux::MixturePdf<T>::operator=(
00502     const MixturePdf<T>& o) {
00503   /* pre-condition */
00504   assert (N == o.N);
00505 
00506   xs = o.xs;
00507 
00508   ws.resize(o.ws.size(), false);
00509   ws = o.ws;
00510 
00511   Ws.resize(o.Ws.size(), false);
00512   Ws = o.Ws;
00513 
00514   haveMu = o.haveMu;
00515   if (haveMu) {
00516     mu = o.mu;
00517     Zmu = o.Zmu;
00518   }
00519   
00520   return *this;
00521 }
00522 
00523 template <class T>
00524 void indii::ml::aux::MixturePdf<T>::add(const T& x, const double w) {
00525   /* pre-condition */
00526   assert (x.getDimensions() == getDimensions());
00527 
00528   /* component */
00529   xs.push_back(x);
00530   
00531   /* weight */
00532   ws.resize(ws.size() + 1, true);
00533   ws(ws.size() - 1) = w;
00534 
00535   /* cumulative weight */
00536   Ws.push_back(getTotalWeight() + w);
00537     
00538   dirty();
00539   
00540   /* post-condition */
00541   assert (xs.size() == ws.size());
00542   assert (xs.size() == Ws.size());
00543 }
00544 
00545 template <class T>
00546 inline const T& indii::ml::aux::MixturePdf<T>::get(const unsigned int i)
00547     const {
00548   return xs[i];
00549 }
00550 
00551 template <class T>
00552 inline T& indii::ml::aux::MixturePdf<T>::get(const unsigned int i) {
00553   return xs[i];
00554 }
00555 
00556 template <class T>
00557 void indii::ml::aux::MixturePdf<T>::set(const unsigned int i, const T& x) {
00558   xs[i] = x;
00559   dirty();
00560 }
00561 
00562 template <class T>
00563 inline const std::vector<T>& indii::ml::aux::MixturePdf<T>::getAll() const {
00564   return xs;
00565 }
00566 
00567 template <class T>
00568 inline std::vector<T>& indii::ml::aux::MixturePdf<T>::getAll() {
00569   return xs;
00570 }
00571 
00572 template <class T>
00573 inline double indii::ml::aux::MixturePdf<T>::getWeight(
00574     const unsigned int i) const {
00575   return ws(i);
00576 }
00577 
00578 template <class T>
00579 void indii::ml::aux::MixturePdf<T>::setWeight(const unsigned int i,
00580     const double w) {
00581   /* pre-condition */
00582   assert (i < getSize());
00583     
00584   this->ws(i) = w;
00585 
00586   /* recalculate cumulative weights */
00587   unsigned int j = i;
00588   if (j == 0) {
00589     Ws[0] = ws(0);
00590     j++;
00591   }
00592   for (; j < getSize(); j++) {
00593     Ws[j] = Ws[j - 1] + ws(j);
00594   }
00595   
00596   dirty();
00597 }
00598 
00599 template <class T>
00600 inline const indii::ml::aux::vector&
00601     indii::ml::aux::MixturePdf<T>::getWeights() const {
00602   return ws;
00603 }
00604 
00605 template <class T>
00606 void indii::ml::aux::MixturePdf<T>::setWeights(const vector& ws) {
00607   /* pre-condition */
00608   assert (this->ws.size() == ws.size());
00609 
00610   this->ws = ws;
00611 
00612   /* recalculate cumulative weights */
00613   unsigned int i = 0;
00614   if (getSize() > 0) {
00615     Ws[0] = ws(0);
00616     i++;
00617   }
00618   for (; i < getSize(); i++) {
00619     Ws[i] = Ws[i - 1] + ws(i);
00620   }
00621   
00622   dirty();
00623 }
00624 
00625 template <class T>
00626 inline unsigned int indii::ml::aux::MixturePdf<T>::getSize() const {
00627   return xs.size();
00628 }
00629 
00630 template <class T>
00631 void indii::ml::aux::MixturePdf<T>::normalise() {
00632   if (!Ws.empty()) {
00633     const double W = getTotalWeight();
00634   
00635     if (W != 1.0) {
00636       double WI = 1.0 / W;
00637     
00638       /* update weights */
00639       ws *= WI;
00640     
00641       /* update cumulative weights */
00642       std::vector<double>::iterator iter, end;
00643       iter = Ws.begin();
00644       end = Ws.end();
00645       while (iter != end) {
00646         *iter *= WI;
00647         iter++;
00648       }
00649       
00650       dirty(); // unnormalised mean out of date
00651     }
00652   }
00653 }
00654 
00655 template <class T>
00656 void indii::ml::aux::MixturePdf<T>::clear() {
00657   xs.clear();
00658   ws.resize(0, false);
00659   Ws.clear();
00660 
00661   dirty();
00662 }
00663 
00664 template <class T>
00665 double indii::ml::aux::MixturePdf<T>::getTotalWeight() const {
00666   return (Ws.empty() ? 0.0 : Ws.back());
00667 }
00668 
00669 template <class T>
00670 void indii::ml::aux::MixturePdf<T>::setDimensions(const unsigned int N,
00671     const bool preserve) {
00672   this->N = N;
00673 
00674   mu.resize(N, preserve);
00675   Zmu.resize(N, preserve);
00676 
00677   if (preserve) {
00678     unsigned int i;
00679     for (i = 0; i < xs.size(); i++) {
00680       xs[i].setDimensions(N, preserve);
00681     }
00682   } else {
00683     clear();
00684   }
00685 
00686   dirty();
00687 }
00688 
00689 template <class T>
00690 const indii::ml::aux::vector&
00691     indii::ml::aux::MixturePdf<T>::getExpectation() {
00692   if (!haveMu) {
00693     calculateExpectation();
00694   }
00695   return mu;
00696 }
00697 
00698 template <class T>
00699 indii::ml::aux::vector indii::ml::aux::MixturePdf<T>::sample() {
00700   /* pre-condition */
00701   assert (!xs.empty());
00702 
00703   double u = Random::uniform(0.0, getTotalWeight());
00704   unsigned int i = std::distance(Ws.begin(), lower_bound(Ws.begin(),
00705       Ws.end(), u));
00706   
00707   return xs[i].sample();
00708 }
00709 
00710 template <class T>
00711 double indii::ml::aux::MixturePdf<T>::densityAt(const vector& x) {
00712   double p = 0.0;
00713   if (getTotalWeight() > 0.0) {
00714     unsigned int i;
00715     for (i = 0; i < xs.size(); i++) {
00716       p += ws(i) * xs[i].densityAt(x);
00717     }
00718     p /= getTotalWeight();
00719   }
00720 
00721   /* post-condition */
00722   assert (p >= 0.0);
00723 
00724   return p;
00725 }
00726 
00727 template <class T>
00728 unsigned int indii::ml::aux::MixturePdf<T>::getDistributedSize()
00729     const {
00730   boost::mpi::communicator world;
00731 
00732   return boost::mpi::all_reduce(world, getSize(), std::plus<unsigned int>());
00733 }
00734 
00735 template <class T>
00736 double indii::ml::aux::MixturePdf<T>::getDistributedTotalWeight() const {
00737   boost::mpi::communicator world;
00738   return boost::mpi::all_reduce(world, getTotalWeight(),
00739       std::plus<double>()); 
00740 }
00741 
00742 template <class T>
00743 void indii::ml::aux::MixturePdf<T>::distributedNormalise() {
00744   double W_d = getDistributedTotalWeight();
00745 
00746   if (W_d > 0.0 && W_d != 1.0) {
00747     double WI_d = 1.0 / W_d;
00748 
00749     /* update weights */
00750     ws *= WI_d;
00751   
00752     /* update cumulative weights */
00753     std::vector<double>::iterator iter, end;
00754     iter = Ws.begin();
00755     end = Ws.end();
00756     while (iter != end) {
00757       *iter *= WI_d;
00758       iter++;
00759     }
00760 
00761     dirty(); // unnormalised mean out of date
00762   }
00763 }
00764 
00765 template <class T>
00766 indii::ml::aux::vector indii::ml::aux::MixturePdf<T>::distributedSample() {
00767   boost::mpi::communicator world;
00768   unsigned int rank = world.rank();
00769   unsigned int size = world.size();
00770   unsigned int i;
00771 
00772   double W_s;
00773   double u;
00774   vector x(N);
00775 
00776   /* scan sum weights across nodes */
00777   W_s = boost::mpi::scan(world, getTotalWeight(), std::plus<double>());
00778 
00779   /* final node has total weight, so it generatea random number up
00780      to that weight and broadcasts */
00781   if (rank == size - 1) {
00782     u = Random::uniform(0.0, W_s);
00783   }
00784   boost::mpi::broadcast(world, u, size - 1);
00785 
00786   if (u >= W_s - getTotalWeight() && u < W_s) {
00787     /* this node draws sample and sends */
00788     std::vector<boost::mpi::request> reqs;
00789     reqs.reserve(size - 1);
00790     x = sample();
00791     for (i = 0; i < size; i++) {
00792       if (i != rank) {
00793         reqs.push_back(world.isend(i, 0, x));
00794       }
00795     }
00796 
00797     // boost::mpi::wait_all seems exceptionally slow here, workaround:
00798     std::vector<boost::mpi::request>::iterator iter, end;
00799     iter = reqs.begin();
00800     end = reqs.end();
00801     while (iter != end) {
00802       iter->wait();
00803       iter++;
00804     }
00805     // end workaround
00806 
00807   } else {
00808     /* this node receives sample */
00809     world.recv(boost::mpi::any_source, boost::mpi::any_tag, x);
00810   }
00811 
00812   return x;
00813 }
00814 
00815 template <class T>
00816 std::vector<indii::ml::aux::vector>
00817     indii::ml::aux::MixturePdf<T>::distributedSample(const unsigned int P) {
00818   boost::mpi::communicator world;
00819   int rank = world.rank();
00820   int size = world.size();
00821   unsigned int i;
00822 
00823   double W_s;
00824   double u;
00825   std::vector<double> us;
00826   std::vector<double>::iterator iter, end;
00827   std::vector<vector> xs;
00828 
00829   /* scan sum weights across nodes */
00830   W_s = boost::mpi::scan(world, getTotalWeight(), std::plus<double>());
00831 
00832   /* final node has total weight, so it can generate random numbers up
00833      to total weight and broadcast to other nodes */
00834   if (rank == size - 1) {
00835     for (i = 0; i < P; i++) {
00836       us.push_back(Random::uniform(0.0, W_s));
00837     }
00838   }
00839   boost::mpi::broadcast(world, us, size - 1);
00840 
00841   iter = us.begin();
00842   end = us.end();
00843   while (iter != end) {
00844     u = *iter;
00845     if (u >= W_s - getTotalWeight() && u < W_s) {
00846       /* this node draws sample */
00847       xs.push_back(sample());
00848     }
00849     iter++;
00850   }
00851 
00852   return xs;
00853 }
00854 
00855 template <class T>
00856 double indii::ml::aux::MixturePdf<T>::distributedDensityAt(const vector& x) {
00857   boost::mpi::communicator world;  
00858   double p = boost::mpi::all_reduce(world, densityAt(x), std::plus<double>());
00859   
00860   /* post-condition */
00861   assert (p >= 0.0);
00862 
00863   return p;
00864 }
00865 
00866 template <class T>
00867 indii::ml::aux::vector indii::ml::aux::MixturePdf<T>::distributedDensityAt(
00868     std::vector<vector>& xs) {
00869   boost::mpi::communicator world;  
00870   unsigned int size = world.size();
00871 
00872   unsigned int i, k;
00873   vector p(xs.size());
00874 
00875   p.clear();
00876   for (k = 0; k < size; k++) {
00877     for (i = 0; i < xs.size(); i++) {
00878       p(i) += getTotalWeight() * densityAt(xs[i]);
00879     }
00880     rotate(p);
00881     rotate(xs);
00882   }
00883   p /= getDistributedTotalWeight();
00884 
00885   return p;
00886 }
00887 
00888 template <class T>
00889 indii::ml::aux::vector
00890     indii::ml::aux::MixturePdf<T>::getDistributedExpectation() {
00891   boost::mpi::communicator world;
00892   const unsigned int size = world.size();
00893   
00894   if (size == 0) {
00895     return getExpectation();
00896   } else {
00897     vector mu_d(N);
00898 
00899     if (getTotalWeight() > 0.0) {
00900       if (!haveMu) {
00901         calculateExpectation();
00902       }
00903     } else {
00904       Zmu.clear();
00905     }
00906 
00907     noalias(mu_d) = boost::mpi::all_reduce(world, Zmu, std::plus<vector>());
00908     mu_d /= getDistributedTotalWeight();
00909 
00910     return mu_d;
00911   }
00912 }
00913 
00914 template <class T>
00915 void indii::ml::aux::MixturePdf<T>::redistributeBySize() {
00916   namespace aux = indii::ml::aux;
00917   namespace ublas = boost::numeric::ublas;
00918 
00919   boost::mpi::communicator world;
00920   unsigned int rank = world.rank();
00921   unsigned int size = world.size();
00922 
00923   std::vector<T> xsBuffer;
00924   aux::vector wsBuffer;
00925 
00926   boost::mpi::request reqSendXs, reqSendWs;
00927   boost::mpi::request reqRecvXs, reqRecvWs;
00928 
00929   unsigned int P_total;
00930   unsigned int P_target;  // target no. components per node
00931   unsigned int P_change;  // no. components to transfer
00932 
00933   node_count excess, from, to;
00934   node_count_vector excesses;  // no. excess components at each node
00935 
00936   unsigned int i;
00937 
00938   /* work out target number of components for this node */
00939   P_total = getDistributedSize();
00940   P_target = P_total / size;
00941   if (rank < P_total % size) {
00942     P_target++; // take a leftover
00943   }
00944 
00945   /* gather excess components at each node */
00946   excess.rank = rank;
00947   excess.prop = getSize() - P_target;
00948 
00949   boost::mpi::all_gather(world, excess, excesses);
00950   std::stable_sort(excesses.begin(), excesses.end());
00951 
00952   /* while distribution of components is not even */
00953   while (excesses.front().prop > 0) {
00954     from = excesses.front();  // node with largest excess
00955     to = excesses.back();  // node with smallest (largest -ve) excess
00956     excesses.erase(excesses.begin());
00957     excesses.pop_back();
00958 
00959     P_change = std::min(abs(to.prop), from.prop);
00960 
00961     if (rank == from.rank) {
00962       /* send components */
00963       xsBuffer.clear();
00964       xsBuffer.insert(xsBuffer.end(), xs.end() - P_change, xs.end());
00965       xs.resize(xs.size() - P_change);
00966       reqSendXs = world.isend(to.rank, 0, xsBuffer);
00967       
00968       /* send weights */
00969       wsBuffer.resize(P_change, false);
00970       noalias(wsBuffer) = project(ws, ublas::range(ws.size() - P_change,
00971           ws.size()));
00972       ws.resize(ws.size() - P_change, true);
00973       reqSendWs = world.isend(to.rank, 1, wsBuffer);
00974       
00975       /* update cumulative weights */
00976       Ws.resize(Ws.size() - P_change);
00977       
00978       /* wait */
00979       reqSendWs.wait();
00980       reqSendXs.wait();
00981     } else if (rank == to.rank) {
00982       /* receive components */
00983       reqRecvXs = world.irecv(from.rank, 0, xsBuffer);
00984       reqRecvWs = world.irecv(from.rank, 1, wsBuffer);
00985       reqRecvWs.wait();
00986       reqRecvXs.wait();
00987       
00988       assert (xsBuffer.size() == wsBuffer.size());
00989       
00990       for (i = 0; i < xsBuffer.size(); i++) {
00991         add(xsBuffer[i], wsBuffer(i));
00992       }
00993     }
00994 
00995     /* update excesses */
00996     from.prop -= P_change;
00997     to.prop += P_change;
00998   
00999     /* re-sort */
01000     excesses.push_back(from); // 'from' must have >= no. components of 'to'
01001     excesses.push_back(to);
01002     inplace_merge(excesses.begin(), excesses.end() - 2, excesses.end());
01003   }
01004 
01005   dirty();
01006   
01007   /* post-conditions */
01008   assert (xs.size() == ws.size());
01009   assert (xs.size() == Ws.size());
01010 }
01011 
01012 template <class T>
01013 void indii::ml::aux::MixturePdf<T>::redistributeByWeight() {
01014   boost::mpi::communicator world;
01015   unsigned int rank = world.rank();
01016   unsigned int size = world.size();
01017 
01018   std::vector<T> xsBuffer;
01019   aux::vector wsBuffer;
01020 
01021   boost::mpi::request reqSendXs, reqSendWs;
01022   boost::mpi::request reqRecvXs, reqRecvWs;
01023 
01024   double W_target = getDistributedTotalWeight() / size;
01025        // target weight per node
01026   double W_change;  // weight to transfer
01027   double W_actual;  // actual weight transferred
01028 
01029   node_weight excess, from, to;
01030   node_weight_vector excesses;  // excess weight at each node
01031 
01032   unsigned int i;
01033 
01034   /* gather excess weight at each node */
01035   excess.rank = rank;
01036   excess.prop = getTotalWeight() - W_target;
01037   boost::mpi::all_gather(world, excess, excesses);
01038   std::stable_sort(excesses.begin(), excesses.end());
01039 
01040   /* while distribution of components is not even */
01041   from = excesses.front();
01042   to = from;  // will never enter the loop if only one node
01043   while ((from.rank != excesses.front().rank ||
01044       to.rank != excesses.back().rank) &&
01045       excesses.front().prop != excesses.back().prop) {
01046     from = excesses.front();  // node with largest excess
01047     to = excesses.back();  // node with smallest (largest -ve) excess
01048     excesses.erase(excesses.begin());
01049     excesses.pop_back();
01050 
01051     W_change = std::min(fabs(to.prop), from.prop);
01052 
01053     if (rank == from.rank) {
01054       /* send components of up to W_change weight */
01055       W_actual = 0.0;
01056       xsBuffer.clear();
01057       wsBuffer.resize(0, false);
01058       
01059       while (W_actual + ws(ws.size() - 1) < W_change) {
01060         /* component */
01061         xsBuffer.push_back(xs.back());
01062         xs.pop_back();
01063         
01064         /* weight */
01065         wsBuffer.resize(wsBuffer.size() + 1, true);
01066         wsBuffer(wsBuffer.size() - 1) = ws(ws.size() - 1);
01067         ws.resize(ws.size() - 1, true);
01068         
01069         /* cumulative weights */
01070         Ws.pop_back();
01071         
01072         W_actual += wsBuffer(wsBuffer.size() - 1);
01073       }
01074       
01075       reqSendXs = world.isend(to.rank, 0, xsBuffer);
01076       reqSendWs = world.isend(to.rank, 1, wsBuffer);
01077       reqSendWs.wait();
01078       reqSendXs.wait();      
01079 
01080     } else if (rank == to.rank) {
01081       /* receive components */
01082       reqRecvXs = world.irecv(from.rank, 0, xsBuffer);
01083       reqRecvWs = world.irecv(from.rank, 1, wsBuffer);
01084       reqRecvWs.wait();
01085       reqRecvXs.wait();
01086       
01087       assert (xsBuffer.size() == wsBuffer.size());
01088       
01089       for (i = 0; i < xsBuffer.size(); i++) {
01090         add(xsBuffer[i], wsBuffer(i));
01091       }
01092     }
01093 
01094     /* broadcast actual weight transfer to all nodes */
01095     boost::mpi::broadcast(world, W_actual, from.rank);
01096 
01097     /* update excesses */
01098     from.prop -= W_actual;
01099     to.prop += W_actual;
01100 
01101     /* re-sort */
01102     excesses.push_back(from); // 'from' must have >= weight of 'to'
01103     excesses.push_back(to);
01104     inplace_merge(excesses.begin(), excesses.end() - 2, excesses.end());
01105   }
01106 
01107   dirty();
01108 
01109   /* post-conditions */
01110   assert (xs.size() == ws.size());
01111   assert (xs.size() == Ws.size());
01112 }
01113 
01114 template <class T>
01115 void indii::ml::aux::MixturePdf<T>::dirty() {
01116   haveMu = false;
01117 }
01118 
01119 template <class T>
01120 void indii::ml::aux::MixturePdf<T>::calculateExpectation() {
01121   /* pre-condition */
01122   assert (getTotalWeight() > 0.0);
01123 
01124   unsigned int i;
01125 
01126   Zmu.clear();
01127   for (i = 0; i < xs.size(); i++) {
01128     noalias(Zmu) += ws(i) * xs[i].getExpectation();
01129   }
01130   noalias(mu) = Zmu / getTotalWeight();
01131   haveMu = true;
01132 }
01133 
01134 template <class T>
01135 template <class Archive>
01136 void indii::ml::aux::MixturePdf<T>::save(Archive& ar,
01137     const unsigned int version) const {
01138   ar & boost::serialization::base_object<Pdf>(*this);
01139 
01140   ar & xs;
01141   ar & ws;
01142   ar & Ws;
01143 }
01144 
01145 template <class T>
01146 template <class Archive>
01147 void indii::ml::aux::MixturePdf<T>::load(Archive& ar,
01148     const unsigned int version) {
01149   ar & boost::serialization::base_object<Pdf>(*this);
01150 
01151   ar & xs;
01152   ar & ws;
01153   ar & Ws;
01154 
01155   mu.resize(N, false);
01156   Zmu.resize(N, false);
01157   haveMu = false;
01158 }
01159 
01160 template <class T>
01161 template <class P>
01162 bool indii::ml::aux::MixturePdf<T>::node_property<P>::operator<(
01163     const node_property& o) const {
01164   return prop > o.prop;
01165 }
01166 
01167 template <class T>
01168 template <class P>
01169 template <class Archive>
01170 void indii::ml::aux::MixturePdf<T>::node_property<P>::serialize(Archive& ar,
01171     const unsigned int version) {
01172   ar & rank;
01173   ar & prop;
01174 }
01175 
01176 #endif
01177 

Generated on Wed Dec 17 15:11:57 2008 for dysii Dynamical Systems Library by  doxygen 1.5.3