Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
+ MPI_Sendrecv()
authorgenaud <genaud@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 13 Jul 2009 21:20:08 +0000 (21:20 +0000)
committergenaud <genaud@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 13 Jul 2009 21:20:08 +0000 (21:20 +0000)
+ part of optimized MPI_Alltoall()  (pairwise algo)

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6501 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/smpi/smpi_base.c
src/smpi/smpi_coll.c
src/smpi/smpi_coll_private.h
src/smpi/smpi_global.c
src/smpi/smpi_mpi.c

index e441739..6c7952e 100644 (file)
@@ -44,7 +44,7 @@ void smpi_mpi_sum_func(void *a, void *b, int *length,
 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
   int i;
 void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
   int i;
-  if (*datatype == smpi_mpi_global->mpi_byte) {
+  if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
     char *x = a, *y = b;
     for (i = 0; i < *length; i++) {
       y[i] = x[i] + y[i];
     char *x = a, *y = b;
     for (i = 0; i < *length; i++) {
       y[i] = x[i] + y[i];
@@ -66,7 +66,41 @@ void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
     }
   }
 }
     }
   }
 }
+/**
+ *i multiply two vectors element-wise
+ *
+ * @param a the first vectors
+ * @param b the second vectors
+ * @return the second vector is modified and contains the element-wise products
+ **/
+void smpi_mpi_prod_func(void *a, void *b, int *length,
+                       MPI_Datatype * datatype);
 
 
+void smpi_mpi_prod_func(void *a, void *b, int *length, MPI_Datatype * datatype)
+{
+  int i;
+  if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
+    char *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] * y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_int) {
+    int *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] * y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_float) {
+    float *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] * y[i];
+    }
+  } else if (*datatype == smpi_mpi_global->mpi_double) {
+    double *x = a, *y = b;
+    for (i = 0; i < *length; i++) {
+      y[i] = x[i] * y[i];
+    }
+  }
+}
 /**
  * compute the min of two vectors element-wise
  **/
 /**
  * compute the min of two vectors element-wise
  **/
@@ -76,7 +110,7 @@ void smpi_mpi_min_func(void *a, void *b, int *length,
 void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
   int i;
 void smpi_mpi_min_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
   int i;
-  if (*datatype == smpi_mpi_global->mpi_byte) {
+  if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
     char *x = a, *y = b;
     for (i = 0; i < *length; i++) {
       y[i] = x[i] < y[i] ? x[i] : y[i];
     char *x = a, *y = b;
     for (i = 0; i < *length; i++) {
       y[i] = x[i] < y[i] ? x[i] : y[i];
@@ -115,7 +149,7 @@ void smpi_mpi_max_func(void *a, void *b, int *length,
 void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
   int i;
 void smpi_mpi_max_func(void *a, void *b, int *length, MPI_Datatype * datatype)
 {
   int i;
-  if (*datatype == smpi_mpi_global->mpi_byte) {
+  if ((*datatype == smpi_mpi_global->mpi_byte) || (*datatype == smpi_mpi_global->mpi_char)) {
     char *x = a, *y = b;
     for (i = 0; i < *length; i++) {
       y[i] = x[i] > y[i] ? x[i] : y[i];
     char *x = a, *y = b;
     for (i = 0; i < *length; i++) {
       y[i] = x[i] > y[i] ? x[i] : y[i];
index 57a02e3..9c8dd5c 100644 (file)
@@ -322,8 +322,183 @@ int nary_tree_barrier( MPI_Comm comm , int arity)
 
 
 
 
 
 
+int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatype datatype,
+                   void* recvbuf, int recvcount, MPI_Datatype recvdatatype, MPI_Comm comm)
+{
+        int retval = MPI_SUCCESS;
+         int rank;
+         int size = comm->size;
+         int step;
+         int sendto, recvfrom;
+         int tag_alltoall=999;
+         void * tmpsend, *tmprecv;
+
+         rank = smpi_mpi_comm_rank(comm);
+         /* Perform pairwise exchange - starting from 1 so the local copy is last */
+         for (step = 1; step < size+1; step++) {
+
+                   /* who do we talk to in this step? */
+                   sendto  = (rank+step)%size;
+                   recvfrom = (rank+size-step)%size;
+
+                   /* where from are we sending and where from are we receiving actual data ? */
+                   tmpsend = (char*)sendbuf+sendto*datatype->size*sendcount;
+                   tmprecv = (char*)recvbuf+recvfrom*recvdatatype->size*recvcount;
+
+                   /* send and receive */
+                   /* in OpenMPI, they use :
+                        err = ompi_coll_tuned_sendrecv( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL,
+                        tmprecv, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_ALLTOALL,
+                        comm, MPI_STATUS_IGNORE, rank);
+                    */
+                   retval = MPI_Sendrecv(tmpsend, sendcount, datatype, sendto, tag_alltoall,
+                                               tmprecv, recvcount, recvdatatype, recvfrom, tag_alltoall,
+                                                 comm, MPI_STATUS_IGNORE);
+         }
+         return(retval);
 
 
+}
 
 
+int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount, MPI_Datatype sdtype,
+                   void* recvbuf, int recvcount, MPI_Datatype rdtype, MPI_Comm comm)
+{
+         /*
+         int i, k, line = -1;
+         int rank, size;
+         int sendto, recvfrom, distance, *displs=NULL, *blen=NULL;
+         int maxpacksize, packsize, position;
+         char * tmpbuf=NULL, *packbuf=NULL;
+         ptrdiff_t lb, sext, rext;
+         int err = 0;
+         int weallocated = 0;
+         MPI_Datatype iddt;
+
+         size = ompi_comm_size(comm);
+         rank = ompi_comm_rank(comm);
+
+         OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_bruck rank %d", rank));
+
+         err = ompi_ddt_get_extent (sdtype, &lb, &sext);
+         if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+         err = ompi_ddt_get_extent (rdtype, &lb, &rext);
+         if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+
+         displs = (int *) malloc(size*sizeof(int));
+         if (displs == NULL) { line = __LINE__; err = -1; goto err_hndl; }
+         blen = (int *) malloc(size*sizeof(int));
+         if (blen == NULL) { line = __LINE__; err = -1; goto err_hndl; }
+         weallocated = 1;
+*/
+         /* Prepare for packing data */
+         /*err = MPI_Pack_size( scount*size, sdtype, comm, &maxpacksize );
+         if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+*/
+         /* pack buffer allocation */
+/*       packbuf = (char*) malloc((unsigned) maxpacksize);
+         if (packbuf == NULL) { line = __LINE__; err = -1; goto err_hndl; }
+*/
+         /* tmp buffer allocation for message data */
+/*       tmpbuf = (char *) malloc(scount*size*sext);
+         if (tmpbuf == NULL) { line = __LINE__; err = -1; goto err_hndl; }
+*/
+
+         /* Step 1 - local rotation - shift up by rank */
+/*       err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) ((size-rank)*scount),
+                               tmpbuf, ((char*)sbuf)+rank*scount*sext);
+         if (err<0) {
+                   line = __LINE__; err = -1; goto err_hndl;
+         }
+
+         if (rank != 0) {
+                   err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) (rank*scount),
+                                         tmpbuf+(size-rank)*scount*sext, (char*)sbuf);
+                   if (err<0) {
+                               line = __LINE__; err = -1; goto err_hndl;
+                   }
+         }
+*/
+         /* perform communication step */
+/*       for (distance = 1; distance < size; distance<<=1) {
+*/
+                   /* send data to "sendto" */
+/*                 sendto = (rank+distance)%size;
+                   recvfrom = (rank-distance+size)%size;
+                   packsize = 0;
+                   k = 0;
+*/
+                   /* create indexed datatype */
+//                 for (i = 1; i < size; i++) {
+//                             if ((i&distance) == distance) {
+//                                       displs[k] = i*scount; blen[k] = scount;
+//                                       k++;
+//                             }
+//                 }
+                   /* Set indexes and displacements */
+//                 err = MPI_Type_indexed(k, blen, displs, sdtype, &iddt);
+//                 if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+//                 /* Commit the new datatype */
+///                err = MPI_Type_commit(&iddt);
+//                 if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+
+                   /* have the new distribution ddt, pack and exchange data */
+//                 err = MPI_Pack(tmpbuf, 1, iddt, packbuf, maxpacksize, &packsize, comm);
+//                 if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+
+                   /* Sendreceive */
+//                 err = ompi_coll_tuned_sendrecv ( packbuf, packsize, MPI_PACKED, sendto,
+//                                       MCA_COLL_BASE_TAG_ALLTOALL,
+//                                       rbuf, packsize, MPI_PACKED, recvfrom,
+//                                       MCA_COLL_BASE_TAG_ALLTOALL,
+//                                       comm, MPI_STATUS_IGNORE, rank);
+//                 if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+                   /* Unpack data from rbuf to tmpbuf */
+//                 position = 0;
+//         err = MPI_Unpack(rbuf, packsize, &position,
+//                                       tmpbuf, 1, iddt, comm);
+//                 if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+                   /* free ddt */
+//                 err = MPI_Type_free(&iddt);
+//                 if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+//       } /* end of for (distance = 1... */
+
+         /* Step 3 - local rotation - */
+//       for (i = 0; i < size; i++) {
+
+//                 err = ompi_ddt_copy_content_same_ddt (rdtype, (int32_t) rcount,
+//                                       ((char*)rbuf)+(((rank-i+size)%size)*rcount*rext),
+//                                       tmpbuf+i*rcount*rext);
+//
+//       if (err<0) {
+//                             line = __LINE__; err = -1; goto err_hndl;
+//                 }
+//       }
+
+         /* Step 4 - clean up */
+/*       if (tmpbuf != NULL) free(tmpbuf);
+         if (packbuf != NULL) free(packbuf);
+         if (weallocated) {
+                   if (displs != NULL) free(displs);
+                   if (blen != NULL) free(blen);
+         }
+         return OMPI_SUCCESS;
+
+err_hndl:
+         OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
+         if (tmpbuf != NULL) free(tmpbuf);
+         if (packbuf != NULL) free(packbuf);
+         if (weallocated) {
+                   if (displs != NULL) free(displs);
+                   if (blen != NULL) free(blen);
+         }
+         return err;
+         */
+         int NOTYET=1;
+         return NOTYET;
+}
 
 
 /**
 
 
 /**
index 1df83cc..7647b0e 100644 (file)
 int nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm,int arity);
 int nary_tree_barrier( MPI_Comm comm, int arity );
 
 int nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm,int arity);
 int nary_tree_barrier( MPI_Comm comm, int arity );
 
+int smpi_coll_tuned_alltoall_bruck(void *sbuf, int scount, MPI_Datatype sdtype, 
+                                      void* rbuf, int rcount, MPI_Datatype rdtype,
+                                          MPI_Comm comm);
+
+int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatype datatype,
+                                          void* recvbuf, int recvcount, MPI_Datatype recvdatatype,
+                                                  MPI_Comm comm);
+
index c8550fe..c5c0e43 100644 (file)
@@ -127,6 +127,8 @@ void smpi_mpi_land_func(void *a, void *b, int *length,
                         MPI_Datatype * datatype);
 void smpi_mpi_sum_func(void *a, void *b, int *length,
                        MPI_Datatype * datatype);
                         MPI_Datatype * datatype);
 void smpi_mpi_sum_func(void *a, void *b, int *length,
                        MPI_Datatype * datatype);
+void smpi_mpi_prod_func(void *a, void *b, int *length,
+                       MPI_Datatype * datatype);
 void smpi_mpi_min_func(void *a, void *b, int *length,
                        MPI_Datatype * datatype);
 void smpi_mpi_max_func(void *a, void *b, int *length,
 void smpi_mpi_min_func(void *a, void *b, int *length,
                        MPI_Datatype * datatype);
 void smpi_mpi_max_func(void *a, void *b, int *length,
@@ -208,6 +210,8 @@ void smpi_global_init()
   smpi_mpi_global->mpi_land->func = smpi_mpi_land_func;
   smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1);
   smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func;
   smpi_mpi_global->mpi_land->func = smpi_mpi_land_func;
   smpi_mpi_global->mpi_sum = xbt_new(s_smpi_mpi_op_t, 1);
   smpi_mpi_global->mpi_sum->func = smpi_mpi_sum_func;
+  smpi_mpi_global->mpi_prod = xbt_new(s_smpi_mpi_op_t, 1);
+  smpi_mpi_global->mpi_prod->func = smpi_mpi_prod_func;
   smpi_mpi_global->mpi_min = xbt_new(s_smpi_mpi_op_t, 1);
   smpi_mpi_global->mpi_min->func = smpi_mpi_min_func;
   smpi_mpi_global->mpi_max = xbt_new(s_smpi_mpi_op_t, 1);
   smpi_mpi_global->mpi_min = xbt_new(s_smpi_mpi_op_t, 1);
   smpi_mpi_global->mpi_min->func = smpi_mpi_min_func;
   smpi_mpi_global->mpi_max = xbt_new(s_smpi_mpi_op_t, 1);
@@ -255,6 +259,7 @@ void smpi_global_destroy()
 
   xbt_free(smpi_mpi_global->mpi_land);
   xbt_free(smpi_mpi_global->mpi_sum);
 
   xbt_free(smpi_mpi_global->mpi_land);
   xbt_free(smpi_mpi_global->mpi_sum);
+  xbt_free(smpi_mpi_global->mpi_prod);
   xbt_free(smpi_mpi_global->mpi_max);
   xbt_free(smpi_mpi_global->mpi_min);
 
   xbt_free(smpi_mpi_global->mpi_max);
   xbt_free(smpi_mpi_global->mpi_min);
 
index 6b767a9..3d60c4d 100644 (file)
@@ -191,6 +191,40 @@ int SMPI_MPI_Send(void *buf, int count, MPI_Datatype datatype, int dst,
   return retval;
 }
 
   return retval;
 }
 
+/**
+ * MPI_Sendrecv
+ **/
+int SMPI_MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, 
+                   void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag,
+                   MPI_Comm comm, MPI_Status *status)
+{
+int rank;
+int retval = MPI_SUCCESS;
+smpi_mpi_request_t srequest;
+smpi_mpi_request_t rrequest;
+
+         rank = smpi_mpi_comm_rank(comm);
+
+         /* send */
+         retval = smpi_create_request(sendbuf, sendcount, sendtype, 
+                               rank,dest,sendtag, 
+                               comm, &srequest);
+         smpi_mpi_isend(srequest);
+
+
+         /* recv */
+         retval = smpi_create_request(recvbuf, recvcount, recvtype, 
+                               source,rank,recvtag, 
+                               comm, &rrequest);
+         smpi_mpi_irecv(rrequest);
+
+         smpi_mpi_wait(srequest, MPI_STATUS_IGNORE);
+         smpi_mpi_wait(rrequest, MPI_STATUS_IGNORE);
+
+         return(retval);
+}
+
+
 /**
  * MPI_Wait and friends
  **/
 /**
  * MPI_Wait and friends
  **/
@@ -477,6 +511,45 @@ int SMPI_MPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype datatype,
 }
 
 
 }
 
 
+/**
+ * MPI_Alltoall user entry point
+ * 
+ * Uses the logic of OpenMPI (upto 1.2.7 or greater) for the optimizations
+ * ompi/mca/coll/tuned/coll_tuned_module.c
+ **/
+int SMPI_MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype datatype, 
+                        void *recvbuf, int recvcount, MPI_Datatype recvtype,
+                          MPI_Comm comm)
+{
+  int retval = MPI_SUCCESS;
+  int block_dsize;
+  int rank;
+
+  smpi_bench_end();
+
+  rank = smpi_mpi_comm_rank(comm);
+  block_dsize = datatype->size * sendcount;
+
+  if ((block_dsize < 200) && (comm->size > 12)) {
+           retval = smpi_coll_tuned_alltoall_bruck(sendbuf, sendcount, datatype,
+                                 recvbuf, recvcount, recvtype, comm);
+
+  } else if (block_dsize < 3000) {
+/* use this one !!         retval = smpi_coll_tuned_alltoall_basic_linear(sendbuf, sendcount, datatype,
+                                 recvbuf, recvcount, recvtype, comm);
+                                 */
+  retval = smpi_coll_tuned_alltoall_pairwise(sendbuf, sendcount, datatype,
+                                 recvbuf, recvcount, recvtype, comm);
+  } else {
+
+  retval = smpi_coll_tuned_alltoall_pairwise(sendbuf, sendcount, datatype,
+                                 recvbuf, recvcount, recvtype, comm);
+  }
+
+  smpi_bench_begin();
+
+  return retval;
+}