mpi_gs.cpp

#undef USE_ROS

#ifdef USE_ROS
#include "ros/ros.h"
#endif

#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>   // for time(), used to seed rand()
#include <mpi.h>
#include <Eigen3/Array>
#include <Eigen3/SVD>
#include <Eigen3/LU>
#include <iostream>
#include <sstream>

// compute the maximum and average of the m entries in delta;
// the caller is expected to initialize max (e.g. to 0) before calling, since it is only raised here
void get_max_avg(unsigned int m, double* delta, double &max, double &avg)
{
    avg = 0;
    for (unsigned int j=0; j<m ; j++)
    {
      if (delta[j] > max) max = delta[j];
      avg += delta[j];
    }
    avg = avg/(double)m;
}
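
// The iterations below monitor convergence through the per-row update sizes in
// delta (via get_max_avg).  As a minimal illustrative sketch -- not called
// anywhere in this program -- the helper below shows a direct residual check
// ||b - A*x||_inf instead; it assumes the same storage convention
// a[col + row*m] that gs() uses.
double residual_inf_norm(unsigned int m, const double* a, const double* x, const double* b)
{
  double norm = 0;
  for (unsigned int row = 0; row < m; row++)
  {
    // residual of row 'row': b_row - sum_col a(row,col) * x_col
    double r = b[row];
    for (unsigned int col = 0; col < m; col++) r -= a[col + row*m] * x[col];
    if (fabs(r) > norm) norm = fabs(r);
  }
  return norm;
}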

void mat_print(FILE* file,unsigned int m,unsigned int n, double *a)
{
  if (!file) return;
  // print matrix stored as a[i+j*m]: line j lists the entries of row j
  for (unsigned int j=0;j<n;j++)
  {
    for (unsigned int i=0;i<m;i++) fprintf(file,"%12.5f\t",a[i+j*m]);
    fprintf(file,"\n");
  }
}

void mat_print(FILE* file,Eigen3::MatrixXf a)
{
  if (!file) return;
  unsigned int m = a.cols();
  unsigned int n = a.rows();
  // print matrix, one row per line: a(j,i) is row j, column i
  for (unsigned int j=0;j<n;j++)
  {
    for (unsigned int i=0;i<m;i++) fprintf(file,"%12.5f\t",a(j,i));
    fprintf(file,"\n");
  }
}

void mat_print(FILE* file,Eigen3::VectorXf v)
{
  if (!file) return;
  unsigned int m = v.size();
  // print vector
  for (unsigned int i=0;i<m;i++) fprintf(file,"%12.5f\t",v(i));
  fprintf(file,"\n");
}

void gs(unsigned int m, double* a, double* x, double* b,unsigned int* order,double* rq ,double* delta,FILE* file)
{
  // performs one iteration (sweep) of Gauss-Seidel, visiting the rows in the given order
  for (unsigned int i = 0; i < m; i++)
  {
    unsigned int index = order[i];
    // delta form of the update: delta = (b_index - sum_j a(index,j)*x_j) / a(index,index)
    delta[index] = b[index]/a[index+index*m];
    for (unsigned int j = 0; j < m; j++)
    {
      delta[index] -= a[j+index*m]*x[j]/a[index+index*m];
    }
    x[index] += delta[index];
    if (delta[index] == 0)
      rq[index] = 0; //-1.0*(double)(i+1); //0.5/(double)(i+1);
    else
      rq[index] = fabs(delta[index])/(double)(i+1); // -1.0*(double)i; //

    // note: this assignment overrides the two cases above, so rq ends up
    // holding the position of this row within the sweep
    rq[index] = (double)i;
  }
/*
  if (file)
  {
    fprintf(file,"x: ");
    mat_print(file,m,1,x);
    fprintf(file,"d: ");
    mat_print(file,m,1,delta);
    fprintf(file,"r: ");
    mat_print(file,m,1,rq);
  }
*/
}
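
// The sweep above applies the delta form of Gauss-Seidel,
//   x_i <- x_i + (b_i - sum_j a_ij x_j) / a_ii,
// which is algebraically the same as the textbook update
//   x_i <- (b_i - sum_{j != i} a_ij x_j) / a_ii.
// The sketch below spells out that textbook form with the same a[col + row*m]
// storage convention; it is illustrative only and is not called by this program.
void gs_textbook_sweep(unsigned int m, const double* a, double* x, const double* b)
{
  for (unsigned int row = 0; row < m; row++)
  {
    double sum = 0;
    for (unsigned int col = 0; col < m; col++)
      if (col != row) sum += a[col + row*m] * x[col];
    x[row] = (b[row] - sum) / a[row + row*m];
  }
}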

int main(int argc, char *argv[]) {
  int numprocs, rank, namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME];

  MPI_Init(&argc, &argv);
  //MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  //MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
  rank = MPI::COMM_WORLD.Get_rank();
  numprocs = MPI::COMM_WORLD.Get_size();

  MPI_Get_processor_name(processor_name, &namelen);

  std::ostringstream log_file_stream;
  log_file_stream << "/tmp/mpi_gs_" << rank << ".x";
  FILE* log_file;
  log_file = fopen(log_file_stream.str().c_str(),"w");
  if (log_file) fprintf(log_file,"Process %d on %s out of %d\n", rank, processor_name, numprocs);
  printf("Process %d on %s out of %d\n", rank, processor_name, numprocs);

  // init random seed
  // note: a time-based seed is not guaranteed to be identical across ranks,
  // so the randomly generated system below may differ from node to node
  srand(time(NULL));

  // construct an mxm square matrix; note, this should be consistent across all mpi nodes
  const double matrix_row_sum_coeff = 2.0;
  const unsigned int m = 1000;
  const int num_iterations = 50;
  double* mat_a = new double[m*m];
  double* mat_x = new double[m];
  double* mat_b = new double[m];
  double* mat_rq = new double[m];
  double* mat_delta = new double[m];
  unsigned int* mat_order = new unsigned int[m]; // rearranged access order of the rows

  for (unsigned int j=0;j<m;j++)
  {
    mat_rq[j] = 0; // recursive quality
    mat_order[j] = j; // access order
    mat_b[j] = (double)rand()/(double)RAND_MAX; // rhs
    for (unsigned int i=0;i<m;i++)
    {
      mat_x[i] = 0; // solution, preset to 0
      mat_a[i+j*m] = matrix_row_sum_coeff*(double)rand()/(double)RAND_MAX/(double)m; // element (row j, col i); i strides fastest
    }
    mat_a[j+j*m] = 1.0; // ensure the diagonal of A is non-zero, set to 1
  }

#if 1
  // rotate the access order rank times, so each rank starts its sweep from a different row
  for (int s=0;s<rank;s++) // shift rank-times
  {
    unsigned int tmp = mat_order[0];
    for (unsigned int j=1;j<m;j++) mat_order[j-1] = mat_order[j];
    mat_order[m-1] = tmp;
  }

  // shuffle
  for (unsigned int j=1;j<m;j++)
  {
    unsigned int tmpj = rand() % j; // between 0 and j-1
    //if (rank==20) printf("swap %d and %d\n",j,tmpj);
    unsigned int tmp = mat_order[j];
    mat_order[j] = mat_order[tmpj];
    mat_order[tmpj] = tmp;
  }
#endif

  // write the A matrix from rank 0
  if (rank == 0)
  {
    std::ostringstream mat_a_file_stream;
    mat_a_file_stream << "/tmp/mpi_gs_A.x";
    FILE* mat_a_file;
    mat_a_file = fopen(mat_a_file_stream.str().c_str(),"w");
    if (mat_a_file) fprintf(mat_a_file,"Matrix:\n");
    if (mat_a_file) mat_print(mat_a_file,m,m,mat_a);
    if (mat_a_file) fclose(mat_a_file); // close so the matrix is flushed to disk
  }

  double init_time;

  // send/receive buffers and request handles, one slot per peer
  MPI::Request x_send_request[numprocs];
  MPI::Request x_recv_request[numprocs];
  MPI::Request rq_send_request[numprocs];
  MPI::Request rq_recv_request[numprocs];
  MPI::Status x_send_status[numprocs];
  MPI::Status x_recv_status[numprocs];
  MPI::Status rq_send_status[numprocs];
  MPI::Status rq_recv_status[numprocs];
  double* buf_x = new double[numprocs*m];  // this fails on a single machine with 21+ nodes
  double* buf_rq = new double[numprocs*m];  // this fails on a single machine with 21+ nodes

  if (rank == 0) init_time = MPI::Wtime();
  for (int iteration=0;iteration<num_iterations;iteration++)
  {
    double gs_time;
    if (rank == 0) gs_time = MPI::Wtime();
    gs(m,mat_a,mat_x,mat_b,mat_order,mat_rq,mat_delta,log_file);
    if (rank == 0) if (log_file) fprintf(log_file,"rq(gs) %d time: %f",iteration,MPI::Wtime()-gs_time);

    // round of exchange based on rq:
    // synchronize solutions across nodes with the latest updates
    double comm_time;
    if (rank == 0) comm_time = MPI::Wtime();
    for (int i=0; i < numprocs; i++)
    {
      if (i != rank)
      {
        // send this rank's solution and rq to every other node
        x_send_request[i] = MPI::COMM_WORLD.Isend(mat_x, m, MPI_DOUBLE , i, 1000+1000*(iteration+1)+i);
        rq_send_request[i] = MPI::COMM_WORLD.Isend(mat_rq, m, MPI_DOUBLE , i, 2000+1000*(iteration+1)+i);
        // receive the solution and rq from every other node
        x_recv_request[i] = MPI::COMM_WORLD.Irecv(&buf_x[i*m], m, MPI_DOUBLE , i, 1000+1000*(iteration+1)+rank);
        rq_recv_request[i] = MPI::COMM_WORLD.Irecv(&buf_rq[i*m], m, MPI_DOUBLE , i, 2000+1000*(iteration+1)+rank);
      }
    }
    for (int i=0; i < numprocs; i++)
    {
      if (i != rank)
      {
        // wait for the sends to complete so mat_x and mat_rq can be reused safely
        x_send_request[i].Wait(x_send_status[i]);
        rq_send_request[i].Wait(rq_send_status[i]);
        // wait for the receives so buf_x and buf_rq are valid below
        x_recv_request[i].Wait(x_recv_status[i]);
        rq_recv_request[i].Wait(rq_recv_status[i]);
      }
    }
    if (rank == 0) if (log_file) fprintf(log_file," rq(comm) %d time: %f",iteration,MPI::Wtime()-comm_time);

    for (int i=0; i < numprocs; i++)
    {
      if (i != rank)
      {
        // update the solution based on rq: adopt a remote row when the remote
        // rank updated it last in its sweep (rq == m-1)
        for (unsigned int j=0; j<m ; j++)
        {
          //if (buf_rq[i*m+j] < mat_rq[j])
          if (buf_rq[i*m+j] == m-1)
          {
            mat_x[j] = buf_x[i*m+j];
            mat_rq[j] = buf_rq[i*m+j];
            //mat_x[j] = 0.5*(mat_x[j] + buf_x[i*m+j]);
            //mat_rq[j] = 0.5*(mat_rq[j] + buf_rq[i*m+j]);
          }
        }
      }
    }
    double tmp_avg = 0;
    double tmp_max = 0;
    get_max_avg(m, mat_delta,tmp_max,tmp_avg);
    if (log_file) fprintf(log_file," rq convergence: %d max: %f avg:%f \n",iteration,tmp_max,tmp_avg);
  }
  if (rank == 0) if (log_file) fprintf(log_file,"rq time: %f\n",MPI::Wtime()-init_time);
  if (log_file) mat_print(log_file,m,1,mat_x);

  delete [] buf_x;
  delete [] buf_rq;

  // recompute without the rq exchange
  for (unsigned int j=0; j<m ; j++) mat_x[j] = 0.0; // reset solution and try again
  if (rank == 0) init_time = MPI::Wtime();
  for (int iteration=0;iteration<num_iterations;iteration++)
  {
    gs(m,mat_a,mat_x,mat_b,mat_order,mat_rq,mat_delta,log_file);

    double tmp_avg = 0;
    double tmp_max = 0;
    get_max_avg(m, mat_delta,tmp_max,tmp_avg);
    if (log_file) fprintf(log_file,"gs convergence: %d max: %f avg:%f \n",iteration,tmp_max,tmp_avg);
  }
  if (rank == 0) if (log_file) fprintf(log_file,"gs time: %f\n",MPI::Wtime()-init_time);
  if (log_file) mat_print(log_file,m,1,mat_x);

  // recompute with eigen3
  Eigen3::MatrixXf e_a; e_a = Eigen3::MatrixXf::Zero(m,m);
  Eigen3::VectorXf e_x; e_x = Eigen3::VectorXf::Zero(m);
  Eigen3::VectorXf e_b; e_b = Eigen3::VectorXf::Zero(m);
  for (unsigned int j=0;j<m;j++)
  {
    for (unsigned int i=0;i<m;i++)
    {
      //e_x(i) = mat_x[i];
      e_a(j,i) = mat_a[i+j*m];
    }
    e_b(j) = mat_b[j];
  }

  if (rank == 0)
  {
    init_time = MPI::Wtime();
    e_x = e_a.fullPivLu().solve(e_b);
    if (log_file) fprintf(log_file,"lu time: %d %f\n",rank,MPI::Wtime()-init_time);
    if (log_file) mat_print(log_file,e_x);
  }

  if (rank == 0)
  {
    init_time = MPI::Wtime();
    Eigen3::JacobiSVD<Eigen3::MatrixXf> svd(e_a, Eigen3::ComputeThinU | Eigen3::ComputeThinV);
    std::cout << "Its singular values are:" << std::endl << svd.singularValues() << std::endl;
    e_x = svd.solve(e_b);
    if (log_file) fprintf(log_file,"svd time: %d %f\n",rank,MPI::Wtime()-init_time);
    if (log_file) mat_print(log_file,e_x);
  }

  if (log_file) fclose(log_file);

  delete [] mat_a;
  delete [] mat_x;
  delete [] mat_b;
  delete [] mat_rq;
  delete [] mat_delta;
  delete [] mat_order;

#ifdef USE_ROS
  setenv("ROS_ROOT","/wg/stor5/wgsim/hsu/projects/cturtle/ros",1);
  setenv("ROS_MASTER_URI","http://wgsg0:11311",1);
  // test string buffer
  char uri_buf[256];

  std::string ros_master_uri;
  if (rank == 0)
  {
    ros::init(argc,argv,"mpi_node",ros::init_options::NoSigintHandler);
    ros_master_uri = ros::master::getURI();
    ros_master_uri = "http://akb:11311";
    //ROS_INFO("rank %d master uri %s",rank,ros_master_uri.c_str());
    // send the master URI, including the terminating '\0', to every other rank
    for (int i=1; i < numprocs; i++)
      MPI::COMM_WORLD.Send(ros_master_uri.c_str(), ros_master_uri.size()+1, MPI_CHAR , i, 100);
    // ros::master::V_TopicInfo topics;
    // bool succ = ros::master::getTopics(topics);
    // printf("rank %d has %d topics %d",rank,(int)topics.size(),(int)succ);
    // printf("final: rank %d master uri %s",rank,ros::master::getURI().c_str());
  }
  else
  {
    MPI::COMM_WORLD.Recv(uri_buf, 256, MPI_CHAR , 0, 100);
    ros_master_uri = std::string(uri_buf);
  }
#endif

  MPI_Finalize();
  return 0;
}
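
// The exchange inside the iteration loop above posts one Isend/Irecv pair per
// peer and then waits on every request.  As a minimal sketch of an alternative
// -- not used by this program -- the same all-to-all sharing of the solution
// vector can be expressed with a single collective.  gather_solutions() is a
// hypothetical helper name; it only gathers x, so rq would need a second call.
void gather_solutions(unsigned int m, double* mat_x, double* buf_x)
{
  // every rank contributes its m entries of mat_x; after the call, rank i's
  // copy sits at buf_x[i*m .. i*m + m - 1] on every process (buf_x must hold
  // numprocs*m doubles, as allocated for buf_x in main above)
  MPI_Allgather(mat_x, (int)m, MPI_DOUBLE, buf_x, (int)m, MPI_DOUBLE, MPI_COMM_WORLD);
}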


mpi_test
Author(s): John Hsu
autogenerated on Mon Jan 6 2014 11:27:22