From a9c3061cd030f1df440da03ce89f6498def03db1 Mon Sep 17 00:00:00 2001 From: genaud Date: Fri, 3 Jul 2009 16:49:58 +0000 Subject: [PATCH] - MPI_Barrier() as a collective - check code (allreduce still crashing) git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6446 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/smpi/smpi_base.c | 12 +++- src/smpi/smpi_coll.c | 145 ++++++++++++++++++++++++++++++++++---- src/smpi/smpi_global.c | 3 + src/smpi/smpi_mpi.c | 153 +++++++++++++++++++++-------------------- 4 files changed, 222 insertions(+), 91 deletions(-) diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index e5af68d34c..e44173929e 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -211,7 +211,8 @@ void smpi_process_finalize() xbt_free(pdata); } -int smpi_mpi_barrier(smpi_mpi_communicator_t comm) + +/*int smpi_mpi_barrier(smpi_mpi_communicator_t comm) { SIMIX_mutex_lock(comm->barrier_mutex); @@ -228,6 +229,7 @@ int smpi_mpi_barrier(smpi_mpi_communicator_t comm) return MPI_SUCCESS; } +*/ int smpi_mpi_isend(smpi_mpi_request_t request) { @@ -332,9 +334,13 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, } /* First check if one of them is already done */ for (cpt = 0; cpt < count; cpt++) { +#ifdef DEBUG_STEPH printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src); +#endif if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */ +#ifdef DEBUG_STEPH printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src); +#endif *index = cpt; goto found_request; } @@ -348,7 +354,9 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, print_req( requests[cpt] ); #endif if (!requests[cpt]->completed) { /* this one is not done, wait on it */ +#ifdef DEBUG_STEPH printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag); +#endif while (!requests[cpt]->completed) SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex); @@ -367,8 +375,8 @@ found_request: requests[*index]->consumed = 1; #ifdef DEBUG_STEPH print_req( requests[cpt] ); -#endif printf("...accessing *req[%d]->consumed\n",cpt); +#endif if (NULL != status) { status->MPI_SOURCE = requests[*index]->src; status->MPI_TAG = requests[*index]->tag; diff --git a/src/smpi/smpi_coll.c b/src/smpi/smpi_coll.c index 23dee9bf86..57a02e3f6b 100644 --- a/src/smpi/smpi_coll.c +++ b/src/smpi/smpi_coll.c @@ -14,9 +14,10 @@ #include #include "private.h" +#include "smpi_coll_private.h" -/* proctree taken and translated from P2P-MPI */ +/* proc_tree taken and translated from P2P-MPI */ struct proc_tree { int PROCTREE_A; @@ -36,7 +37,7 @@ void build_tree( int index, int extent, proc_tree_t *tree); proc_tree_t alloc_tree( int arity ); void free_tree( proc_tree_t tree); void print_tree(proc_tree_t tree); -int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm); + @@ -110,7 +111,9 @@ void build_tree( int index, int extent, proc_tree_t *tree) { } } - +/** + * prints the tree as a graphical representation + **/ void print_tree(proc_tree_t tree) { int i; char *spacer; @@ -131,8 +134,9 @@ void print_tree(proc_tree_t tree) { /** * bcast **/ -int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, - MPI_Comm comm) +int tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, proc_tree_t tree); +int tree_bcast( void *buf, int count, MPI_Datatype datatype, int root, + MPI_Comm comm, proc_tree_t tree) { int system_tag = 999; // used negative int but smpi_create_request() declares this illegal (to be checked) int rank; @@ -140,12 +144,9 @@ int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, int i; smpi_mpi_request_t request; smpi_mpi_request_t * requests; - void **tmpbufs; rank = smpi_mpi_comm_rank(comm); - proc_tree_t tree = alloc_tree( 2 ); // arity=2: a binomial tree - build_tree( rank, comm->size, &tree ); /* wait for data from my parent in the tree */ if (!tree->isRoot) { #ifdef DEBUG_STEPH @@ -164,9 +165,9 @@ int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, printf("[%d] waiting on irecv from %d\n",rank , tree->parent); #endif smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); } - tmpbufs = xbt_malloc( tree->numChildren * sizeof(void *)); requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t)); #ifdef DEBUG_STEPH printf("[%d] creates %d requests\n",rank,tree->numChildren); @@ -178,9 +179,7 @@ int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, #ifdef DEBUG_STEPH printf("[%d] send(%d->%d, tag=%d)\n",rank,rank, tree->child[i], system_tag+tree->child[i]); #endif - tmpbufs[i] = xbt_malloc( count * datatype->size); - memcpy( tmpbufs[i], buf, count * datatype->size * sizeof(char)); - retval = smpi_create_request(tmpbufs[i], count, datatype, + retval = smpi_create_request(buf, count, datatype, rank, tree->child[i], system_tag + tree->child[i], comm, &(requests[i])); @@ -194,8 +193,7 @@ int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, smpi_mpi_isend(requests[i]); /* FIXME : we should not wait immediately here. See next FIXME. */ smpi_mpi_wait( requests[i], MPI_STATUS_IGNORE); - xbt_free(tmpbufs[i]); - xbt_free(requests[i]); + xbt_mallocator_release(smpi_global->request_mallocator, requests[i]); } } /* FIXME : normally, we sould wait only once all isend have been issued: @@ -208,13 +206,130 @@ int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, printf("[%d] reqs completed\n)",rank); */ - xbt_free(tmpbufs); xbt_free(requests); return(retval); + /* checked ok with valgrind --leak-check=full*/ } + +/** + * anti-bcast + **/ +int tree_antibcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, proc_tree_t tree); +int tree_antibcast( void *buf, int count, MPI_Datatype datatype, int root, + MPI_Comm comm, proc_tree_t tree) +{ + int system_tag = 999; // used negative int but smpi_create_request() declares this illegal (to be checked) + int rank; + int retval = MPI_SUCCESS; + int i; + smpi_mpi_request_t request; + smpi_mpi_request_t * requests; + + rank = smpi_mpi_comm_rank(comm); + + //------------------anti-bcast------------------- + + // everyone sends to its parent, except root. + if (!tree->isRoot) { + retval = smpi_create_request(buf, count, datatype, + rank,tree->parent, + system_tag + rank, + comm, &request); + if (MPI_SUCCESS != retval) { + printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", + rank,retval,__FILE__,__LINE__); + } + smpi_mpi_isend(request); + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); + } + + //every one receives as many messages as it has children + requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t)); + for (i=0; i < tree->numChildren; i++) { + if (tree->child[i] != -1) { + retval = smpi_create_request(buf, count, datatype, + tree->child[i], rank, + system_tag + tree->child[i], + comm, &(requests[i])); + if (MPI_SUCCESS != retval) { + printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", + rank,retval,__FILE__,__LINE__); + } + smpi_mpi_irecv(requests[i]); + /* FIXME : we should not wait immediately here. See next FIXME. */ + smpi_mpi_wait( requests[i], MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, requests[i]); + } + } + xbt_free(requests); + return(retval); + + /* checked ok with valgrind --leak-check=full*/ +} + +/** + * bcast with a binary, ternary, or whatever tree .. + **/ +int nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, + MPI_Comm comm, int arity) +{ +int rank; +int retval; + + rank = smpi_mpi_comm_rank( comm ); + // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI)) + proc_tree_t tree = alloc_tree( arity ); + build_tree( rank, comm->size, &tree ); + + retval = tree_bcast( buf, count, datatype, root, comm, tree ); + + free_tree( tree ); + return( retval ); +} + + +/** + * Barrier + **/ + +int nary_tree_barrier( MPI_Comm comm , int arity) +{ + int rank; + int retval = MPI_SUCCESS; + char dummy='$'; + + rank = smpi_mpi_comm_rank(comm); + // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI)) + proc_tree_t tree = alloc_tree( arity ); + + build_tree( rank, comm->size, &tree ); + + retval = tree_antibcast( &dummy, 1, MPI_CHAR, 0, comm, tree); + if (MPI_SUCCESS != retval) { + printf("[%s:%d] ** Error: tree_antibcast() returned retval=%d\n",__FILE__,__LINE__,retval); + } + else { + retval = tree_bcast(&dummy, 1, MPI_CHAR, 0, comm, tree); + } + + free_tree( tree ); + return(retval); + + /* checked ok with valgrind --leak-check=full*/ +} + + + + + + + /** + * ----------------------------------------------------------------------------------------------------- * example usage + * ----------------------------------------------------------------------------------------------------- **/ /* * int main() { diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index cbe17a25c6..62f7e7d0d7 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -194,6 +194,8 @@ void smpi_global_init() // mpi datatypes smpi_mpi_global->mpi_byte = xbt_new(s_smpi_mpi_datatype_t, 1); smpi_mpi_global->mpi_byte->size = (size_t) 1; + smpi_mpi_global->mpi_char = xbt_new(s_smpi_mpi_datatype_t, 1); + smpi_mpi_global->mpi_char->size = (size_t) 1; smpi_mpi_global->mpi_int = xbt_new(s_smpi_mpi_datatype_t, 1); smpi_mpi_global->mpi_int->size = sizeof(int); smpi_mpi_global->mpi_float = xbt_new(s_smpi_mpi_datatype_t, 1); @@ -246,6 +248,7 @@ void smpi_global_destroy() xbt_free(smpi_mpi_global->mpi_comm_world); xbt_free(smpi_mpi_global->mpi_byte); + xbt_free(smpi_mpi_global->mpi_char); xbt_free(smpi_mpi_global->mpi_int); xbt_free(smpi_mpi_global->mpi_double); xbt_free(smpi_mpi_global->mpi_float); diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c index 926728888a..0a075cc113 100644 --- a/src/smpi/smpi_mpi.c +++ b/src/smpi/smpi_mpi.c @@ -88,13 +88,20 @@ int SMPI_MPI_Type_size(MPI_Datatype datatype, size_t * size) int SMPI_MPI_Barrier(MPI_Comm comm) { int retval = MPI_SUCCESS; + int arity=4; smpi_bench_end(); if (NULL == comm) { retval = MPI_ERR_COMM; } else { - retval = smpi_mpi_barrier(comm); + + /* + * original implemantation: + * retval = smpi_mpi_barrier(comm); + * this one is unrealistic: it just cond_waits, means no time. + */ + retval = nary_tree_barrier( comm, arity ); } smpi_bench_begin(); @@ -243,13 +250,12 @@ int flat_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) { - int retval = MPI_SUCCESS; smpi_bench_end(); //retval = flat_tree_bcast(buf, count, datatype, root, comm); - retval = binomial_tree_bcast(buf, count, datatype, root, comm); + retval = nary_tree_bcast(buf, count, datatype, root, comm, 2 ); smpi_bench_begin(); @@ -279,88 +285,85 @@ static void print_buffer_int(void *buf, int len, const char *msg, int rank) * MPI_Reduce **/ int SMPI_MPI_Reduce(void *sendbuf, void *recvbuf, int count, - MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) + MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { - int retval = MPI_SUCCESS; - int rank; - int size; - int i; - int tag = 0; - smpi_mpi_request_t *tabrequest; - smpi_mpi_request_t request; + int retval = MPI_SUCCESS; + int rank; + int size; + int i; + int tag = 0; + smpi_mpi_request_t *requests; + smpi_mpi_request_t request; - smpi_bench_end(); + smpi_bench_end(); - rank = smpi_mpi_comm_rank(comm); - size = comm->size; + rank = smpi_mpi_comm_rank(comm); + size = comm->size; - if (rank != root) { // if i am not ROOT, simply send my buffer to root + if (rank != root) { // if i am not ROOT, simply send my buffer to root #ifdef DEBUG_REDUCE - print_buffer_int(sendbuf, count, xbt_strdup("sndbuf"), rank); + print_buffer_int(sendbuf, count, xbt_strdup("sndbuf"), rank); #endif - retval = - smpi_create_request(sendbuf, count, datatype, rank, root, tag, comm, - &request); - smpi_mpi_isend(request); - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); - - } else { - // i am the ROOT: wait for all buffers by creating one request by sender - int src; - tabrequest = xbt_malloc((size - 1) * sizeof(smpi_mpi_request_t)); - - void **tmpbufs = xbt_malloc((size - 1) * sizeof(void *)); - for (i = 0; i < size - 1; i++) { - // we need 1 buffer per request to store intermediate receptions - tmpbufs[i] = xbt_malloc(count * datatype->size); - } - memcpy(recvbuf, sendbuf, count * datatype->size * sizeof(char)); // initiliaze recv buf with my own snd buf - - // i can not use: 'request->forward = size-1;' (which would progagate size-1 receive reqs) - // since we should op values as soon as one receiving request matches. - for (i = 0; i < size - 1; i++) { - // reminder: for smpi_create_request() the src is always the process sending. - src = i < root ? i : i + 1; - retval = smpi_create_request(tmpbufs[i], count, datatype, - src, root, tag, comm, &(tabrequest[i])); - if (NULL != tabrequest[i] && MPI_SUCCESS == retval) { - if (MPI_SUCCESS == retval) { - smpi_mpi_irecv(tabrequest[i]); - } - } - } - // now, wait for completion of all irecv's. - for (i = 0; i < size - 1; i++) { - int index = MPI_UNDEFINED; - smpi_mpi_waitany(size - 1, tabrequest, &index, MPI_STATUS_IGNORE); + retval = + smpi_create_request(sendbuf, count, datatype, rank, root, tag, comm, + &request); + smpi_mpi_isend(request); + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); + } else { + // i am the ROOT: wait for all buffers by creating one request by sender + int src; + requests = xbt_malloc((size-1) * sizeof(smpi_mpi_request_t)); + + void **tmpbufs = xbt_malloc((size-1) * sizeof(void *)); + for (i = 0; i < size-1; i++) { + // we need 1 buffer per request to store intermediate receptions + tmpbufs[i] = xbt_malloc(count * datatype->size); + } + // root: initiliaze recv buf with my own snd buf + memcpy(recvbuf, sendbuf, count * datatype->size * sizeof(char)); + + // i can not use: 'request->forward = size-1;' (which would progagate size-1 receive reqs) + // since we should op values as soon as one receiving request matches. + for (i = 0; i < size-1; i++) { + // reminder: for smpi_create_request() the src is always the process sending. + src = i < root ? i : i + 1; + retval = smpi_create_request(tmpbufs[i], count, datatype, + src, root, tag, comm, &(requests[i])); + if (NULL != requests[i] && MPI_SUCCESS == retval) { + if (MPI_SUCCESS == retval) { + smpi_mpi_irecv(requests[i]); + } + } + } + // now, wait for completion of all irecv's. + for (i = 0; i < size-1; i++) { + int index = MPI_UNDEFINED; + smpi_mpi_waitany( size-1, requests, &index, MPI_STATUS_IGNORE); #ifdef DEBUG_REDUCE - printf - ("MPI_Waitany() unblocked: root received (completes req[index=%d])\n", - index); - print_buffer_int(tmpbufs[index], count, - bprintf("tmpbufs[index=%d] (value received)", index), - rank); + printf ("MPI_Waitany() unblocked: root received (completes req[index=%d])\n",index); + print_buffer_int(tmpbufs[index], count, bprintf("tmpbufs[index=%d] (value received)", index), + rank); #endif - // arg 2 is modified - op->func(tmpbufs[index], recvbuf, &count, &datatype); + // arg 2 is modified + op->func(tmpbufs[index], recvbuf, &count, &datatype); #ifdef DEBUG_REDUCE - print_buffer_int(recvbuf, count, xbt_strdup("rcvbuf"), rank); - + print_buffer_int(recvbuf, count, xbt_strdup("rcvbuf"), rank); #endif - //xbt_mallocator_release(smpi_global->request_mallocator, tabrequest[i]); - xbt_free(tmpbufs[index]); - } - xbt_free(tabrequest); - xbt_free(tmpbufs); - } - - smpi_bench_begin(); - - return retval; + xbt_free(tmpbufs[index]); + /* FIXME: with the following line, it generates an + * [xbt_ex/CRITICAL] Conditional list not empty 162518800. + */ + // xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); + } + xbt_free(requests); + xbt_free(tmpbufs); + } + smpi_bench_begin(); + return retval; } /** @@ -374,15 +377,15 @@ int SMPI_MPI_Allreduce( void *sendbuf, void *recvbuf, int count, MPI_Datatype da MPI_Op op, MPI_Comm comm ) { int retval = MPI_SUCCESS; - int root=0; // arbitrary choice + int root=1; // arbitrary choice smpi_bench_end(); retval = SMPI_MPI_Reduce( sendbuf, recvbuf, count, datatype, op, root, comm); if (MPI_SUCCESS != retval) return(retval); - retval = SMPI_MPI_Bcast( recvbuf, count, datatype, root, comm); + retval = SMPI_MPI_Bcast( sendbuf, count, datatype, root, comm); smpi_bench_begin(); return( retval ); } @@ -527,3 +530,5 @@ double SMPI_MPI_Wtime(void) { return (SIMIX_get_clock()); } + + -- 2.20.1