Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Implement MPI_Waitany and MPI_Waitall
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 29 Jun 2009 09:11:39 +0000 (09:11 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 29 Jun 2009 09:11:39 +0000 (09:11 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6386 48e7efb5-ca39-0410-a469-dd3cf9ba447f

ChangeLog
src/smpi/private.h
src/smpi/smpi_base.c
src/smpi/smpi_global.c
src/smpi/smpi_mpi.c

index cfd4821..131ec51 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,9 @@
 SimGrid (3.3.1-svn) unstable; urgency=low
 
+ SMPI:
+  * Implement MPI_Waitany and MPI_Waitall 
+    (not available from user side yet)
+
  -- Da SimGrid team <simgrid-devel@lists.gforge.inria.fr>
 
 SimGrid (3.3.1) stable; urgency=low
index 487d129..115b23f 100644 (file)
@@ -41,6 +41,7 @@ typedef struct smpi_mpi_request_t {
   smpi_mpi_datatype_t datatype;
 
   short int completed:1;
+  short int consumed:1; /* for waitany */
 
   smx_mutex_t mutex;
   smx_cond_t cond;
@@ -128,6 +129,8 @@ int smpi_mpi_barrier(smpi_mpi_communicator_t comm);
 int smpi_mpi_isend(smpi_mpi_request_t request);
 int smpi_mpi_irecv(smpi_mpi_request_t request);
 int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status);
+int smpi_mpi_wait_all(int count, smpi_mpi_request_t *requests, smpi_mpi_status_t **status);
+int smpi_mpi_wait_any(int count, smpi_mpi_request_t *requests, int *index, smpi_mpi_status_t *status);
 
 void smpi_execute(double duration);
 void smpi_start_timer(void);
index 6dbf4ff..9eee82f 100644 (file)
@@ -34,7 +34,7 @@ void smpi_mpi_land_func(void *a, void *b, int *length,
 /**
  * sum two vectors element-wise
  *
- * @param a the first vectors 
+ * @param a the first vectors
  * @param b the second vectors
  * @return the second vector is modified and contains the element-wise sums
  **/
@@ -49,25 +49,22 @@ void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype)
                                for (i = 0; i < *length; i++) {
                                          y[i] = x[i] + y[i];
                                }
-         } else {
-         if (*datatype == smpi_mpi_global->mpi_int) {
+         } else if (*datatype == smpi_mpi_global->mpi_int) {
                                int *x = a, *y = b;
                                for (i = 0; i < *length; i++) {
                                          y[i] = x[i] + y[i];
                                }
-         } else {
-         if (*datatype == smpi_mpi_global->mpi_float) {
+         } else if (*datatype == smpi_mpi_global->mpi_float) {
                                float *x = a, *y = b;
                                for (i = 0; i < *length; i++) {
                                          y[i] = x[i] + y[i];
                                }
-         } else {
-         if (*datatype == smpi_mpi_global->mpi_double) {
+         } else if (*datatype == smpi_mpi_global->mpi_double) {
                                double *x = a, *y = b;
                                for (i = 0; i < *length; i++) {
                                          y[i] = x[i] + y[i];
                                }
-         }}}}
+         }
 }
 /**
  * compute the min of two vectors element-wise
@@ -279,3 +276,60 @@ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
 
   return retval;
 }
+
+int smpi_mpi_wait_all(int count, smpi_mpi_request_t *requests, smpi_mpi_status_t **status) {
+       int cpt;
+       int index;
+       int retval;
+       smpi_mpi_status_t stat;
+
+       for (cpt=0; cpt<count;cpt++) {
+               retval = smpi_mpi_wait_any(count,requests, &index,&stat);
+               if (retval != MPI_SUCCESS)
+                       return retval;
+               memcpy(status[index],&stat,sizeof(stat));
+       }
+       return MPI_SUCCESS;
+}
+
+int smpi_mpi_wait_any(int count, smpi_mpi_request_t *requests, int *index, smpi_mpi_status_t *status) {
+         int cpt;
+
+         *index = MPI_UNDEFINED;
+         if (NULL == requests) {
+           return MPI_ERR_INTERN;
+         }
+         /* First check if one of them is already done */
+         for (cpt=0;cpt<count;cpt++) {
+                 if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */
+                         *index=cpt;
+                         goto found_request;
+                 }
+         }
+         /* If none found, block */
+         /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */
+         while (1) {
+                 for (cpt=0;cpt<count;cpt++) {
+                         if (!requests[cpt]->completed) { /* this one is not done, wait on it */
+                                 while (!requests[cpt]->completed)
+                                     SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex);
+
+                                 *index=cpt;
+                                 goto found_request;
+                         }
+                 }
+                 if (cpt == count) /* they are all done. Damn user */
+                         return MPI_ERR_REQUEST;
+         }
+
+         found_request:
+         requests[*index]->consumed = 1;
+
+          if (NULL != status) {
+             status->MPI_SOURCE = requests[*index]->src;
+             status->MPI_TAG = requests[*index]->tag;
+             status->MPI_ERROR = MPI_SUCCESS;
+           }
+         return MPI_SUCCESS;
+
+}
index 4971ece..b0c335d 100644 (file)
@@ -47,6 +47,7 @@ void smpi_request_reset(void *pointer)
 
   request->buf = NULL;
   request->completed = 0;
+  request->consumed = 0;
   request->data = NULL;
   request->forward = 0;
 
index d19913a..6713ba5 100644 (file)
@@ -224,28 +224,24 @@ int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
  * MPI_Reduce
  **/
 
-int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, 
+int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
                         MPI_Op op, int root, MPI_Comm comm )
 {
   int retval = MPI_SUCCESS;
   int rank;
   int size;
   int i;
-  smpi_mpi_request_t *tabrequest; 
+  smpi_mpi_request_t *tabrequest;
 
   smpi_bench_end();
 
   rank = smpi_mpi_comm_rank(comm);
-  size = comm->size; 
+  size = comm->size;
 
-  tabrequest = malloc((size)*sizeof(smpi_mpi_request_t));
-  if (NULL==tabrequest) {
-       fprintf(stderr,"[smpi] %s:%d : cannot alloc memory for %d requests. Exiting.\n",__FILE__,__LINE__,size);
-       exit(1);
-  }
+  tabrequest = xbt_malloc((size)*sizeof(smpi_mpi_request_t));
 
   if (rank != root) { // if i am not root, simply send my buffer to root
-           retval = smpi_create_request(sendbuf, count, datatype, 
+           retval = smpi_create_request(sendbuf, count, datatype,
                                  rank, root, 0, comm, &(tabrequest[rank]));
            smpi_mpi_isend(tabrequest[rank]);
            smpi_mpi_wait(tabrequest[rank], MPI_STATUS_IGNORE);
@@ -257,7 +253,7 @@ int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
            for (i=0; i<comm->size; i++) {
                        if ( rank != i ) { // except for me
                                  // reminder: for smpi_create_request() the src is always the process sending.
-                                 retval = smpi_create_request(recvbuf, count, datatype, MPI_ANY_SOURCE, root, 
+                                 retval = smpi_create_request(recvbuf, count, datatype, MPI_ANY_SOURCE, root,
                                                        0, comm, &(tabrequest[i]));
                                  if (NULL != tabrequest[i] && MPI_SUCCESS == retval) {
                                            if (MPI_SUCCESS == retval) {
@@ -267,19 +263,19 @@ int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
                        }
            }
            // now, wait for completion of all irecv's.
-           // FIXME: we should implement smpi_waill_all for a more asynchronous behavior
+           // FIXME: we should implement smpi_wait_all for a more asynchronous behavior
            for (i=0; i<comm->size; i++) {
                        if ( rank != i ) { // except for me
                                  smpi_mpi_wait(tabrequest[i], MPI_STATUS_IGNORE);
 
                                  // FIXME: the core part is here. To be written ...
-                                 fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__);
 
+                                 fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__);
                        }
-
            }
+
   }
-  for (i=0; i<comm->size; i++) 
+  for (i=0; i<comm->size; i++)
            xbt_mallocator_release(smpi_global->request_mallocator, tabrequest[i]);
 
   smpi_bench_begin();
@@ -419,7 +415,7 @@ int SMPI_MPI_Comm_split(MPI_Comm comm, int color, int key,
   return retval;
 }
 
-double SMPI_MPI_Wtime( void ) 
-{ 
+double SMPI_MPI_Wtime( void )
+{
          return ( SIMIX_get_clock() );
 }