X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a9c3061cd030f1df440da03ce89f6498def03db1..3c072dea92bbf0e4df46b1d8d56cd62e68883b33:/src/smpi/smpi_coll.c

diff --git a/src/smpi/smpi_coll.c b/src/smpi/smpi_coll.c
index 57a02e3f6b..42217104de 100644
--- a/src/smpi/smpi_coll.c
+++ b/src/smpi/smpi_coll.c
@@ -12,10 +12,13 @@
 #include 
 #include 
 #include 
+#include 
 
 #include "private.h"
 #include "smpi_coll_private.h"
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_coll, smpi,
+                                "Logging specific to SMPI (coll)");
 
 /* proc_tree taken and translated from P2P-MPI */
 
@@ -149,9 +152,7 @@ int tree_bcast( void *buf, int count, MPI_Datatype datatype, int root,
 
   /* wait for data from my parent in the tree */
   if (!tree->isRoot) {
-#ifdef DEBUG_STEPH
-    printf("[%d] recv(%d from %d, tag=%d)\n",rank,rank, tree->parent, system_tag+rank);
-#endif
+    DEBUG3("<%d> tree_bcast(): i am not root: recv from %d, tag=%d)",rank,tree->parent,system_tag+rank);
     retval = smpi_create_request(buf, count, datatype,
                                  tree->parent, rank,
                                  system_tag + rank,
@@ -161,31 +162,22 @@ int tree_bcast( void *buf, int count, MPI_Datatype datatype, int root,
                                  rank,retval,__FILE__,__LINE__);
     }
     smpi_mpi_irecv(request);
-#ifdef DEBUG_STEPH
-    printf("[%d] waiting on irecv from %d\n",rank , tree->parent);
-#endif
+    DEBUG2("<%d> tree_bcast(): waiting on irecv from %d",rank, tree->parent);
     smpi_mpi_wait(request, MPI_STATUS_IGNORE);
     xbt_mallocator_release(smpi_global->request_mallocator, request);
   }
 
   requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t));
-#ifdef DEBUG_STEPH
-  printf("[%d] creates %d requests\n",rank,tree->numChildren);
-#endif
+  DEBUG2("<%d> creates %d requests (1 per child)\n",rank,tree->numChildren);
 
   /* iniates sends to ranks lower in the tree */
   for (i=0; i < tree->numChildren; i++) {
    if (tree->child[i] != -1) {
-#ifdef DEBUG_STEPH
-     printf("[%d] send(%d->%d, tag=%d)\n",rank,rank, tree->child[i], system_tag+tree->child[i]);
-#endif
+     DEBUG3("<%d> send to <%d>,tag=%d",rank,tree->child[i], system_tag+tree->child[i]);
      retval = smpi_create_request(buf, count, datatype,
                                   rank, tree->child[i],
                                   system_tag + tree->child[i],
                                   comm, &(requests[i]));
-#ifdef DEBUG_STEPH
-     printf("[%d] after create req[%d]=%p req->(src=%d,dst=%d)\n",rank , i, requests[i],requests[i]->src,requests[i]->dst );
-#endif
      if (MPI_SUCCESS != retval) {
        printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
              rank,retval,__FILE__,__LINE__);
@@ -237,7 +229,7 @@ int tree_antibcast( void *buf, int count, MPI_Datatype datatype, int root,
                                  system_tag + rank,
                                  comm, &request);
     if (MPI_SUCCESS != retval) {
-      printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
+      ERROR4("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
             rank,retval,__FILE__,__LINE__);
     }
     smpi_mpi_isend(request);
@@ -279,6 +271,7 @@ int rank;
 int retval;
 
   rank = smpi_mpi_comm_rank( comm );
+  DEBUG2("<%d> entered nary_tree_bcast(), arity=%d",rank,arity);
   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
   proc_tree_t tree = alloc_tree( arity );
   build_tree( rank, comm->size, &tree );
@@ -293,7 +286,6 @@ int retval;
 /**
  * Barrier
  **/
-
 int nary_tree_barrier( MPI_Comm comm , int arity)
 {
   int rank;
@@ -321,7 +313,480 @@ int nary_tree_barrier( MPI_Comm comm , int arity)
 }
 
+/**
+ * Alltoall pairwise
+ *
+ * this algorithm performs size steps (1<=s<=size) and
+ * at each step s, a process p sends to and receives from a unique distinct remote process
+ * size=5 : s=1:  4->0->1, 0->1->2, 1->2->3, ...
+ *          s=2:  3->0->2, 4->1->3, 0->2->4, 1->3->0 , 2->4->1
+ *          ....
+ * Openmpi calls this routine when the message size sent to each rank is greater than 3000 bytes
+ **/
+
+int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatype datatype,
+                    void* recvbuf, int recvcount, MPI_Datatype recvdatatype, MPI_Comm comm)
+{
+  int retval = MPI_SUCCESS;
+  int rank;
+  int size = comm->size;
+  int step;
+  int sendto, recvfrom;
+  int tag_alltoall=999;
+  void * tmpsend, *tmprecv;
+
+  rank = smpi_mpi_comm_rank(comm);
+  INFO1("<%d> algorithm alltoall_pairwise() called.\n",rank);
+
+  /* Perform pairwise exchange - starting from 1 so the local copy is last */
+  for (step = 1; step < size+1; step++) {
+
+    /* who do we talk to in this step? */
+    sendto   = (rank+step)%size;
+    recvfrom = (rank+size-step)%size;
+
+    /* where from are we sending and where from are we receiving actual data ? */
+    tmpsend = (char*)sendbuf+sendto*datatype->size*sendcount;
+    tmprecv = (char*)recvbuf+recvfrom*recvdatatype->size*recvcount;
+
+    /* send and receive */
+    /* in OpenMPI, they use :
+         err = ompi_coll_tuned_sendrecv( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL,
+                                         tmprecv, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_ALLTOALL,
+                                         comm, MPI_STATUS_IGNORE, rank);
+     */
+    retval = MPI_Sendrecv(tmpsend, sendcount, datatype, sendto, tag_alltoall,
+                          tmprecv, recvcount, recvdatatype, recvfrom, tag_alltoall,
+                          comm, MPI_STATUS_IGNORE);
+  }
+  return(retval);
+}
+
+/**
+ * helper: copy a datatype into another (in the simple case dt1=dt2)
+**/
+int copy_dt( void *sbuf, int scount, const MPI_Datatype sdtype, void *rbuf, int rcount, const MPI_Datatype rdtype);
+int copy_dt( void *sbuf, int scount, const MPI_Datatype sdtype,
+             void *rbuf, int rcount, const MPI_Datatype rdtype)
+{
+  /* First check if we really have something to do */
+  if (0 == rcount) {
+    return ((0 == scount) ? MPI_SUCCESS : MPI_ERR_TRUNCATE);
+  }
+  /* If same datatypes used, just copy. */
+  if (sdtype == rdtype) {
+    int count = ( scount < rcount ? scount : rcount );
+    memcpy( rbuf, sbuf, sdtype->size*count);
+    return ((scount > rcount) ? MPI_ERR_TRUNCATE : MPI_SUCCESS);
+  }
+  /* FIXME:  cases
+   * - If receive packed.
+   * - If send packed
+   * to be treated once we have the MPI_Pack things ...
+   **/
+  return( MPI_SUCCESS );
+}
+
+/**
+ * Alltoall basic_linear
+ **/
+int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype sdtype,
+                    void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm)
+{
+  int i;
+  int system_alltoall_tag = 888;
+  int rank;
+  int size = comm->size;
+  int err;
+  char *psnd;
+  char *prcv;
+  int nreq = 0;
+  MPI_Aint lb;
+  MPI_Aint sndinc;
+  MPI_Aint rcvinc;
+  MPI_Request *reqs;
+
+  /* Initialize. */
+  rank = smpi_mpi_comm_rank(comm);
+  DEBUG1("<%d> algorithm alltoall_basic_linear() called.",rank);
+
+  err = smpi_mpi_type_get_extent(sdtype, &lb, &sndinc);
+  err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvinc);
+  sndinc *= scount;
+  rcvinc *= rcount;
+
+  /* simple optimization */
+  psnd = ((char *) sbuf) + (rank * sndinc);
+  prcv = ((char *) rbuf) + (rank * rcvinc);
+
+  err = copy_dt( psnd, scount, sdtype, prcv, rcount, rdtype );
+  if (MPI_SUCCESS != err) {
+    return err;
+  }
+
+  /* If only one process, we're done. */
+  if (1 == size) {
+    return MPI_SUCCESS;
+  }
+
+  /* Initiate all send/recv to/from others. */
+  reqs = xbt_malloc(2*(size-1) * sizeof(smpi_mpi_request_t));
+
+  prcv = (char *) rbuf;
+  psnd = (char *) sbuf;
+
+  /* Post all receives first -- a simple optimization */
+  for (i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
+    err = smpi_create_request( prcv + (i * rcvinc), rcount, rdtype,
+                               i, rank,
+                               system_alltoall_tag,
+                               comm, &(reqs[nreq]));
+    if (MPI_SUCCESS != err) {
+      DEBUG2("<%d> failed to create request for rank %d",rank,i);
+      for (i=0;i< nreq;i++)
+        xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+      return err;
+    }
+    nreq++;
+  }
+  /* Now post all sends in reverse order
+   *   - We would like to minimize the search time through message queue
+   *     when messages actually arrive in the order in which they were posted.
+   * */
+  for (i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size ) {
+    err = smpi_create_request (psnd + (i * sndinc), scount, sdtype,
+                               rank, i,
+                               system_alltoall_tag,
+                               comm, &(reqs[nreq]));
+    if (MPI_SUCCESS != err) {
+      DEBUG2("<%d> failed to create request for rank %d\n",rank,i);
+      for (i=0;i< nreq;i++)
+        xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+      return err;
+    }
+    nreq++;
+  }
+
+  /* Start your engines.  This will never return an error. */
+  for ( i=0; i< nreq/2; i++ ) {
+    DEBUG3("<%d> issued irecv request reqs[%d]=%p",rank,i,reqs[i]);
+    smpi_mpi_irecv( reqs[i] );
+  }
+  for ( i= nreq/2; i<nreq; i++ ) {
+    DEBUG3("<%d> issued isend request reqs[%d]=%p",rank,i,reqs[i]);
+    smpi_mpi_isend( reqs[i] );
+  }
+
+  /* Wait for them all.  If there's an error, note that we don't
+   * care what the error was -- just that there *was* an error.  The
+   * PML will finish all requests, even if one or more of them fail.
+   * i.e., by the end of this call, all the requests are free-able.
+   * So free them anyway -- even if there was an error, and return
+   * the error after we free everything. */
+
+  DEBUG2("<%d> wait for %d requests",rank,nreq);
+  // waitall is buggy: use a loop instead for the moment
+  // err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE);
+  for (i=0;i<nreq;i++) {
+    err = smpi_mpi_wait( reqs[i], MPI_STATUS_IGNORE);
+  }
+
+  /* Free the reqs */
+  for (i=0;i<nreq;i++) {
+    xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+  }
+  xbt_free( reqs );
+  return err;
+}
+
+/**
+ * Alltoall Bruck
+ *
+ * Openmpi calls this routine when the message size sent to each rank < 2000 bytes and size < 12
+ **/
+
+int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount, MPI_Datatype sdtype,
+                    void* recvbuf, int recvcount, MPI_Datatype rdtype, MPI_Comm comm)
+{
+/*  int size = comm->size;
+  int i, k, line = -1;
+  int sendto, recvfrom, distance, *displs=NULL, *blen=NULL;
+  int maxpacksize, packsize, position;
+  char * tmpbuf=NULL, *packbuf=NULL;
+  ptrdiff_t lb, sext, rext;
+  int err = 0;
+  int weallocated = 0;
+  MPI_Datatype iddt;
+
+  rank = smpi_mpi_comm_rank(comm);
+*/
+  INFO0("coll:tuned:alltoall_intra_bruck ** NOT IMPLEMENTED YET**");
+/*
+  displs = xbt_malloc(size*sizeof(int));
+  blen =   xbt_malloc(size*sizeof(int));
+
+  weallocated = 1;
+*/
+  /* Prepare for packing data */
+/*
+  err = MPI_Pack_size( scount*size, sdtype, comm, &maxpacksize );
+  if (err != MPI_SUCCESS) {  }
+*/
+  /* pack buffer allocation */
+/*  packbuf = (char*) malloc((unsigned) maxpacksize);
+  if (packbuf == NULL) { }
+*/
+  /* tmp buffer allocation for message data */
+/*  tmpbuf = xbt_malloc(scount*size*sext);
+  if (tmpbuf == NULL) {  }
+*/
+
+  /* Step 1 - local rotation - shift up by rank */
+/*  err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) ((size-rank)*scount),
+                tmpbuf, ((char*)sbuf)+rank*scount*sext);
+  if (err<0) {
+    line = __LINE__; err = -1; goto err_hndl;
+  }
+
+  if (rank != 0) {
+    err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) (rank*scount),
+                  tmpbuf+(size-rank)*scount*sext, (char*)sbuf);
+    if (err<0) {
+      line = __LINE__; err = -1; goto err_hndl;
+    }
+  }
+*/
+  /* perform communication step */
+/*  for (distance = 1; distance < size; distance<<=1) {
+*/
+    /* send data to "sendto" */
+/*    sendto = (rank+distance)%size;
+    recvfrom = (rank-distance+size)%size;
+    packsize = 0;
+    k = 0;
+*/
+    /* create indexed datatype */
+//    for (i = 1; i < size; i++) {
+//      if ((i&distance) == distance) {
+//        displs[k] = i*scount; blen[k] = scount;
+//        k++;
+//      }
+//    }
+    /* Set indexes and displacements */
+//    err = MPI_Type_indexed(k, blen, displs, sdtype, &iddt);
+//    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+//    /* Commit the new datatype */
+///    err = MPI_Type_commit(&iddt);
+//    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+
+    /* have the new distribution ddt, pack and exchange data */
+//    err = MPI_Pack(tmpbuf, 1, iddt, packbuf, maxpacksize, &packsize, comm);
+//    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+    /* Sendreceive */
+//    err = ompi_coll_tuned_sendrecv ( packbuf, packsize, MPI_PACKED, sendto,
+//                  MCA_COLL_BASE_TAG_ALLTOALL,
+//                  rbuf, packsize, MPI_PACKED, recvfrom,
+//                  MCA_COLL_BASE_TAG_ALLTOALL,
+//                  comm, MPI_STATUS_IGNORE, rank);
+//    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+    /* Unpack data from rbuf to tmpbuf */
+//    position = 0;
+//    err = MPI_Unpack(rbuf, packsize, &position,
+//                  tmpbuf, 1, iddt, comm);
+//    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+    /* free ddt */
+//    err = MPI_Type_free(&iddt);
+//    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+//  } /* end of for (distance = 1... */
+
+  /* Step 3 - local rotation - */
+//  for (i = 0; i < size; i++) {
+
+//    err = ompi_ddt_copy_content_same_ddt (rdtype, (int32_t) rcount,
+//                  ((char*)rbuf)+(((rank-i+size)%size)*rcount*rext),
+//                  tmpbuf+i*rcount*rext);
+//
+//    if (err<0) {
+//      line = __LINE__; err = -1; goto err_hndl;
+//    }
+//  }
+
+  /* Step 4 - clean up */
+/*  if (tmpbuf != NULL) free(tmpbuf);
+  if (packbuf != NULL) free(packbuf);
+  if (weallocated) {
+    if (displs != NULL) free(displs);
+    if (blen != NULL) free(blen);
+  }
+  return OMPI_SUCCESS;
+
+err_hndl:
+  OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
+  if (tmpbuf != NULL) free(tmpbuf);
+  if (packbuf != NULL) free(packbuf);
+  if (weallocated) {
+    if (displs != NULL) free(displs);
+    if (blen != NULL) free(blen);
+  }
+  return err;
+  */
+  return -1; /* FIXME: to be changed*/
+}
+
+static void print_buffer_int(void *buf, int len, char *msg, int rank);
+static void print_buffer_int(void *buf, int len, char *msg, int rank)
+{
+  int tmp, *v;
+  fprintf(stderr,"**<%d> %s (#%d): ", rank, msg,len);
+  for (tmp = 0; tmp < len; tmp++) {
+    v = buf;
+    fprintf(stderr,"[%d (%p)]", v[tmp],v+tmp);
+  }
+  fprintf(stderr,"\n");
+  free(msg);
+}
+
+/**
+ * alltoallv basic
+ **/
+
+int smpi_coll_basic_alltoallv(void *sbuf, int *scounts, int *sdisps, MPI_Datatype sdtype,
+                              void *rbuf, int *rcounts, int *rdisps, MPI_Datatype rdtype,
+                              MPI_Comm comm) {
+
+  int i;
+  int system_alltoallv_tag = 889;
+  int rank;
+  int size = comm->size;
+  int err;
+  char *psnd;
+  char *prcv;
+  //int nreq = 0;
+  int rreq = 0;
+  int sreq = 0;
+  MPI_Aint lb;
+  MPI_Aint sndextent;
+  MPI_Aint rcvextent;
+  MPI_Request *reqs;
+
+  /* Initialize. */
+  rank = smpi_mpi_comm_rank(comm);
+  DEBUG1("<%d> algorithm basic_alltoallv() called.",rank);
+
+  err = smpi_mpi_type_get_extent(sdtype, &lb, &sndextent);
+  err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvextent);
+
+  psnd = (char *)sbuf;
+  //print_buffer_int(psnd,size*size,xbt_strdup("sbuff"),rank);
+
+  /* copy the local sbuf to rbuf when it's me */
+  psnd = ((char *) sbuf) + (sdisps[rank] * sndextent);
+  prcv = ((char *) rbuf) + (rdisps[rank] * rcvextent);
+
+  if (0 != scounts[rank]) {
+    err = copy_dt( psnd, scounts[rank], sdtype, prcv, rcounts[rank], rdtype );
+    if (MPI_SUCCESS != err) {
+      return err;
+    }
+  }
+
+  /* If only one process, we're done. */
+  if (1 == size) {
+    return MPI_SUCCESS;
+  }
+
+  /* Initiate all send/recv to/from others. */
+  reqs = xbt_malloc(2*(size-1) * sizeof(smpi_mpi_request_t));
+
+  /* Create all receives that will be posted first */
+  for (i = 0; i < size; ++i) {
+    if (i == rank || 0 == rcounts[i]) {
+      DEBUG3("<%d> skip req creation i=%d,rcounts[i]=%d",rank,i, rcounts[i]);
+      continue;
+    }
+    prcv = ((char *) rbuf) + (rdisps[i] * rcvextent);
+
+    err = smpi_create_request( prcv, rcounts[i], rdtype,
+                               i, rank,
+                               system_alltoallv_tag,
+                               comm, &(reqs[rreq]));
+    if (MPI_SUCCESS != err) {
+      DEBUG2("<%d> failed to create request for rank %d",rank,i);
+      for (i=0;i< rreq;i++)
+        xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+      return err;
+    }
+    rreq++;
+  }
+  DEBUG2("<%d> %d irecv reqs created",rank,rreq);
+
+  /* Now create all sends  */
+  for (i = 0; i < size; ++i) {
+    if (i == rank || 0 == scounts[i]) {
+      DEBUG3("<%d> skip req creation i=%d,scounts[i]=%d",rank,i, scounts[i]);
+      continue;
+    }
+    psnd = ((char *) sbuf) + (sdisps[i] * sndextent);
+
+    //fprintf(stderr,"<%d> send %d elems to <%d>\n",rank,scounts[i],i);
+    //print_buffer_int(psnd,scounts[i],xbt_strdup("sbuff part"),rank);
+    err = smpi_create_request (psnd, scounts[i], sdtype,
+                               rank, i,
+                               system_alltoallv_tag,
+                               comm, &(reqs[rreq+sreq]));
+    if (MPI_SUCCESS != err) {
+      DEBUG2("<%d> failed to create request for rank %d\n",rank,i);
+      for (i=0;i< rreq+sreq;i++)
+        xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+      return err;
+    }
+    sreq++;
+  }
+  DEBUG2("<%d> %d isend reqs created",rank,sreq);
+
+  /* Start your engines.  This will never return an error. */
+  for ( i=0; i< rreq; i++ ) {
+    DEBUG3("<%d> issued irecv request reqs[%d]=%p",rank,i,reqs[i]);
+    smpi_mpi_irecv( reqs[i] );
+  }
+  DEBUG3("<%d> for (i=%d;i<%d)",rank,rreq,sreq);
+  for ( i=rreq; i< rreq+sreq; i++ ) {
+    DEBUG3("<%d> issued isend request reqs[%d]=%p",rank,i,reqs[i]);
+    smpi_mpi_isend( reqs[i] );
+  }
+
+  /* Wait for them all.  If there's an error, note that we don't
+   * care what the error was -- just that there *was* an error.  The
+   * PML will finish all requests, even if one or more of them fail.
+   * i.e., by the end of this call, all the requests are free-able.
+   * So free them anyway -- even if there was an error, and return
+   * the error after we free everything. */
+
+  DEBUG2("<%d> wait for %d requests",rank,rreq+sreq);
+  // waitall is buggy: use a loop instead for the moment
+  // err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE);
+  for (i=0;i< rreq+sreq;i++) {
+    err = smpi_mpi_wait( reqs[i], MPI_STATUS_IGNORE);
+  }
+
+  /* Free the reqs */
+  /* nreq might be < 2*(size-1) since some request creations are skipped */
+  for (i=0;i< rreq+sreq;i++) {
+    xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+  }
+  xbt_free( reqs );
+  return err;
+}
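
For reference, the step/partner arithmetic documented in the comment of smpi_coll_tuned_alltoall_pairwise() above can be checked with a small standalone sketch. It is illustrative only and not part of the patch; the helper name partners_for_step and the RANKS constant are invented for this example. Compiled and run, it prints the same schedule as the comment (s=1: 4->0->1, 0->1->2, ...).

#include <stdio.h>

#define RANKS 5   /* matches the size=5 example in the pairwise comment above */

/* One pairwise-exchange step: at step s, rank r sends to (r+s)%size and
 * receives from (r-s+size)%size, exactly as in the loop body of the patch. */
static void partners_for_step(int rank, int step, int size,
                              int *sendto, int *recvfrom)
{
  *sendto   = (rank + step) % size;
  *recvfrom = (rank + size - step) % size;
}

int main(void)
{
  int step, rank, sendto, recvfrom;
  for (step = 1; step <= RANKS; step++) {
    printf("s=%d: ", step);
    for (rank = 0; rank < RANKS; rank++) {
      partners_for_step(rank, step, RANKS, &sendto, &recvfrom);
      printf("%d->%d->%d  ", recvfrom, rank, sendto);
    }
    printf("\n");
  }
  return 0;
}

Note that the receive peer is computed as (rank+size-step)%size rather than (rank-step)%size so that the left operand of % never goes negative in C.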