From: genaud Date: Mon, 29 Jun 2009 13:02:24 +0000 (+0000) Subject: MPI_Reduce using waitany(). Buggy. X-Git-Tag: SVN~1253 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/2ef76cc455c57d235189fdc567516e40f9c726d1?ds=sidebyside MPI_Reduce using waitany(). Buggy. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6388 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index b0c335d830..70cecc8f10 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -90,7 +90,6 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype, smpi_mpi_request_t request = NULL; - printf("in create-req(): MPI_ANY_SOURCE=%d,src=%d,comm->size=%d\n",MPI_ANY_SOURCE,src,comm->size); // parameter checking prob belongs in smpi_mpi, but this is less repeat code if (NULL == buf) { retval = MPI_ERR_INTERN; @@ -101,7 +100,6 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype, } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) { retval = MPI_ERR_RANK; } else if (0 > dst || comm->size <= dst) { - printf("err MPI_ERR_RANK => MPI_ANY_SOURCE=%d,src=%d,dst=%d,comm->size=%d\n",MPI_ANY_SOURCE,src,dst,comm->size); retval = MPI_ERR_RANK; } else if (MPI_ANY_TAG != tag && 0 > tag) { retval = MPI_ERR_TAG; @@ -192,6 +190,8 @@ void smpi_global_init() smpi_mpi_global->mpi_byte->size = (size_t) 1; smpi_mpi_global->mpi_int = xbt_new(s_smpi_mpi_datatype_t, 1); smpi_mpi_global->mpi_int->size = sizeof(int); + smpi_mpi_global->mpi_float = xbt_new(s_smpi_mpi_datatype_t, 1); + smpi_mpi_global->mpi_float->size = sizeof(float); smpi_mpi_global->mpi_double = xbt_new(s_smpi_mpi_datatype_t, 1); smpi_mpi_global->mpi_double->size = sizeof(double); diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c index 301a642113..97b50bb8bd 100644 --- a/src/smpi/smpi_mpi.c +++ b/src/smpi/smpi_mpi.c @@ -181,6 +181,9 @@ int SMPI_MPI_Send(void *buf, int count, MPI_Datatype datatype, int dst, return retval; } +/** + * MPI_Wait and friends + **/ int SMPI_MPI_Wait(MPI_Request * request, MPI_Status * status) { return smpi_mpi_wait(*request, status); @@ -226,6 +229,18 @@ int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, return retval; } +void print_buffer_int( void *buf, int len, const char *msg) ; +void print_buffer_int( void *buf, int len, const char *msg) { + int tmp, *v; + printf("**%s: ",msg); + for (tmp=0;tmpsize; - tabrequest = xbt_malloc((size)*sizeof(smpi_mpi_request_t)); + printf("-->rank %d. Entering ....\n",rank); + print_buffer_int( sendbuf, count, "sendbuf"); + + if (rank != root) { // if i am not ROOT, simply send my buffer to root + retval = smpi_create_request(sendbuf, count, datatype, rank, root, 0, comm, &request); + smpi_mpi_isend(request); + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); - if (rank != root) { // if i am not root, simply send my buffer to root - retval = smpi_create_request(sendbuf, count, datatype, - rank, root, 0, comm, &(tabrequest[rank])); - smpi_mpi_isend(tabrequest[rank]); - smpi_mpi_wait(tabrequest[rank], MPI_STATUS_IGNORE); - //printf("DEBUG: rank %d sent my sendbuf to root (rank %d)\n",rank,root); } else { - // i am the root: wait for all buffers by creating requests + // i am the ROOT: wait for all buffers by creating one request by sender + tabrequest = xbt_malloc((size-1)*sizeof(smpi_mpi_request_t)); + + void *tmprecvbuf = xbt_malloc(count*datatype->size); // to store intermediate receptions + memcpy(recvbuf,sendbuf,count*datatype->size*sizeof(char)); // initiliaze recv buf with my own snd buf + // i can not use: 'request->forward = size-1;' (which would progagate size-1 receive reqs) // since we should op values as soon as one receiving request matches. - for (i=0; isize; i++) { - if ( rank != i ) { // except for me - // reminder: for smpi_create_request() the src is always the process sending. - retval = smpi_create_request(recvbuf, count, datatype, MPI_ANY_SOURCE, root, - 0, comm, &(tabrequest[i])); - if (NULL != tabrequest[i] && MPI_SUCCESS == retval) { - if (MPI_SUCCESS == retval) { - smpi_mpi_irecv(tabrequest[i]); - } + for (i=0; isize-1; i++) { + // reminder: for smpi_create_request() the src is always the process sending. + retval = smpi_create_request(tmprecvbuf, count, datatype, MPI_ANY_SOURCE, root, + 0, comm, &(tabrequest[i])); + if (NULL != tabrequest[i] && MPI_SUCCESS == retval) { + if (MPI_SUCCESS == retval) { + smpi_mpi_irecv(tabrequest[i]); } } } // now, wait for completion of all irecv's. - // FIXME: we should implement smpi_wait_all for a more asynchronous behavior - for (i=0; isize; i++) { - if ( rank != i ) { // except for me - smpi_mpi_wait(tabrequest[i], MPI_STATUS_IGNORE); - - // FIXME: the core part is here. To be written ... - - fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__); - } + for (i=0; isize-1; i++) { + int index = MPI_UNDEFINED; + smpi_mpi_waitany(comm->size-1, tabrequest, &index, MPI_STATUS_IGNORE); + + print_buffer_int( recvbuf, count, "rcvbuf"); + printf("MPI_Waitany() unblocked: root received (completes req[%d]): ",index); + print_buffer_int( tmprecvbuf, count, "tmprecvbuf"); + + // arg 2 is modified + op->func (tmprecvbuf,recvbuf,&count,&datatype); + print_buffer_int( recvbuf, count, "recvbuf after func"); + //fprintf(stderr,"[smpi] %s:%d : MPI_Reduce *Not yet implemented*.\n",__FILE__,__LINE__); + xbt_mallocator_release(smpi_global->request_mallocator, tabrequest[i]); } - + xbt_free(tabrequest); } - for (i=0; isize; i++) - xbt_mallocator_release(smpi_global->request_mallocator, tabrequest[i]); smpi_bench_begin(); @@ -293,8 +316,8 @@ int SMPI_MPI_Reduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype datat int smpi_compare_rankkeys(const void *a, const void *b); int smpi_compare_rankkeys(const void *a, const void *b) { - int *x = (int *) a; - int *y = (int *) b; + int *x = (int *) a; + int *y = (int *) b; if (x[1] < y[1]) return -1;