From: mquinson Date: Mon, 29 Jun 2009 09:11:39 +0000 (+0000) Subject: Implement MPI_Waitany and MPI_Waitall X-Git-Tag: SVN~1255 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/bd9bee9dcbceb25fdb5a293b91abbd5476db7ad2?hp=46b15303c8dfa35886444e5df7cf2af7d4ea7201 Implement MPI_Waitany and MPI_Waitall git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6386 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/ChangeLog b/ChangeLog index cfd482171e..131ec516a5 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,9 @@ SimGrid (3.3.1-svn) unstable; urgency=low + SMPI: + * Implement MPI_Waitany and MPI_Waitall + (not available from user side yet) + -- Da SimGrid team SimGrid (3.3.1) stable; urgency=low diff --git a/src/smpi/private.h b/src/smpi/private.h index 487d12913f..115b23f17a 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -41,6 +41,7 @@ typedef struct smpi_mpi_request_t { smpi_mpi_datatype_t datatype; short int completed:1; + short int consumed:1; /* for waitany */ smx_mutex_t mutex; smx_cond_t cond; @@ -128,6 +129,8 @@ int smpi_mpi_barrier(smpi_mpi_communicator_t comm); int smpi_mpi_isend(smpi_mpi_request_t request); int smpi_mpi_irecv(smpi_mpi_request_t request); int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status); +int smpi_mpi_wait_all(int count, smpi_mpi_request_t *requests, smpi_mpi_status_t **status); +int smpi_mpi_wait_any(int count, smpi_mpi_request_t *requests, int *index, smpi_mpi_status_t *status); void smpi_execute(double duration); void smpi_start_timer(void); diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index 6dbf4ff887..9eee82fdd2 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -34,7 +34,7 @@ void smpi_mpi_land_func(void *a, void *b, int *length, /** * sum two vectors element-wise * - * @param a the first vectors + * @param a the first vectors * @param b the second vectors * @return the second vector is modified and contains the element-wise sums **/ @@ -49,25 +49,22 @@ void smpi_mpi_sum_func(void *a, void *b, int *length, MPI_Datatype * datatype) for (i = 0; i < *length; i++) { y[i] = x[i] + y[i]; } - } else { - if (*datatype == smpi_mpi_global->mpi_int) { + } else if (*datatype == smpi_mpi_global->mpi_int) { int *x = a, *y = b; for (i = 0; i < *length; i++) { y[i] = x[i] + y[i]; } - } else { - if (*datatype == smpi_mpi_global->mpi_float) { + } else if (*datatype == smpi_mpi_global->mpi_float) { float *x = a, *y = b; for (i = 0; i < *length; i++) { y[i] = x[i] + y[i]; } - } else { - if (*datatype == smpi_mpi_global->mpi_double) { + } else if (*datatype == smpi_mpi_global->mpi_double) { double *x = a, *y = b; for (i = 0; i < *length; i++) { y[i] = x[i] + y[i]; } - }}}} + } } /** * compute the min of two vectors element-wise @@ -279,3 +276,60 @@ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status) return retval; } + +int smpi_mpi_wait_all(int count, smpi_mpi_request_t *requests, smpi_mpi_status_t **status) { + int cpt; + int index; + int retval; + smpi_mpi_status_t stat; + + for (cpt=0; cptcompleted && !requests[cpt]->consumed) { /* got ya */ + *index=cpt; + goto found_request; + } + } + /* If none found, block */ + /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */ + while (1) { + for (cpt=0;cptcompleted) { /* this one is not done, wait on it */ + while (!requests[cpt]->completed) + SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex); + + *index=cpt; + goto found_request; + } + } + if (cpt == count) /* they are all done. Damn user */ + return MPI_ERR_REQUEST; + } + + found_request: + requests[*index]->consumed = 1; + + if (NULL != status) { + status->MPI_SOURCE = requests[*index]->src; + status->MPI_TAG = requests[*index]->tag; + status->MPI_ERROR = MPI_SUCCESS; + } + return MPI_SUCCESS; + +} diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index 4971ece6d8..b0c335d830 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -47,6 +47,7 @@ void smpi_request_reset(void *pointer) request->buf = NULL; request->completed = 0; + request->consumed = 0; request->data = NULL; request->forward = 0; diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c index d19913a062..6713ba5e7f 100644 --- a/src/smpi/smpi_mpi.c +++ b/src/smpi/smpi_mpi.c @@ -224,28 +224,24 @@ int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, * MPI_Reduce **/ -int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, +int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm ) { int retval = MPI_SUCCESS; int rank; int size; int i; - smpi_mpi_request_t *tabrequest; + smpi_mpi_request_t *tabrequest; smpi_bench_end(); rank = smpi_mpi_comm_rank(comm); - size = comm->size; + size = comm->size; - tabrequest = malloc((size)*sizeof(smpi_mpi_request_t)); - if (NULL==tabrequest) { - fprintf(stderr,"[smpi] %s:%d : cannot alloc memory for %d requests. Exiting.\n",__FILE__,__LINE__,size); - exit(1); - } + tabrequest = xbt_malloc((size)*sizeof(smpi_mpi_request_t)); if (rank != root) { // if i am not root, simply send my buffer to root - retval = smpi_create_request(sendbuf, count, datatype, + retval = smpi_create_request(sendbuf, count, datatype, rank, root, 0, comm, &(tabrequest[rank])); smpi_mpi_isend(tabrequest[rank]); smpi_mpi_wait(tabrequest[rank], MPI_STATUS_IGNORE); @@ -257,7 +253,7 @@ int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datat for (i=0; isize; i++) { if ( rank != i ) { // except for me // reminder: for smpi_create_request() the src is always the process sending. - retval = smpi_create_request(recvbuf, count, datatype, MPI_ANY_SOURCE, root, + retval = smpi_create_request(recvbuf, count, datatype, MPI_ANY_SOURCE, root, 0, comm, &(tabrequest[i])); if (NULL != tabrequest[i] && MPI_SUCCESS == retval) { if (MPI_SUCCESS == retval) { @@ -267,19 +263,19 @@ int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datat } } // now, wait for completion of all irecv's. - // FIXME: we should implement smpi_waill_all for a more asynchronous behavior + // FIXME: we should implement smpi_wait_all for a more asynchronous behavior for (i=0; isize; i++) { if ( rank != i ) { // except for me smpi_mpi_wait(tabrequest[i], MPI_STATUS_IGNORE); // FIXME: the core part is here. To be written ... - fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__); + fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__); } - } + } - for (i=0; isize; i++) + for (i=0; isize; i++) xbt_mallocator_release(smpi_global->request_mallocator, tabrequest[i]); smpi_bench_begin(); @@ -419,7 +415,7 @@ int SMPI_MPI_Comm_split(MPI_Comm comm, int color, int key, return retval; } -double SMPI_MPI_Wtime( void ) -{ +double SMPI_MPI_Wtime( void ) +{ return ( SIMIX_get_clock() ); }