#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <assert.h>
#include "private.h"
#include "smpi_coll_private.h"
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_coll, smpi,
+ "Logging specific to SMPI (coll)");
/* proc_tree taken and translated from P2P-MPI */
/* wait for data from my parent in the tree */
if (!tree->isRoot) {
-#ifdef DEBUG_STEPH
- printf("[%d] recv(%d from %d, tag=%d)\n",rank,rank, tree->parent, system_tag+rank);
-#endif
+ DEBUG3("<%d> tree_bcast(): i am not root: recv from %d, tag=%d)",rank,tree->parent,system_tag+rank);
retval = smpi_create_request(buf, count, datatype,
tree->parent, rank,
system_tag + rank,
rank,retval,__FILE__,__LINE__);
}
smpi_mpi_irecv(request);
-#ifdef DEBUG_STEPH
- printf("[%d] waiting on irecv from %d\n",rank , tree->parent);
-#endif
+ DEBUG2("<%d> tree_bcast(): waiting on irecv from %d",rank, tree->parent);
smpi_mpi_wait(request, MPI_STATUS_IGNORE);
xbt_mallocator_release(smpi_global->request_mallocator, request);
}
requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t));
-#ifdef DEBUG_STEPH
- printf("[%d] creates %d requests\n",rank,tree->numChildren);
-#endif
+ DEBUG2("<%d> creates %d requests (1 per child)\n",rank,tree->numChildren);
/* iniates sends to ranks lower in the tree */
for (i=0; i < tree->numChildren; i++) {
if (tree->child[i] != -1) {
-#ifdef DEBUG_STEPH
- printf("[%d] send(%d->%d, tag=%d)\n",rank,rank, tree->child[i], system_tag+tree->child[i]);
-#endif
+ DEBUG3("<%d> send to <%d>,tag=%d",rank,tree->child[i], system_tag+tree->child[i]);
retval = smpi_create_request(buf, count, datatype,
rank, tree->child[i],
system_tag + tree->child[i],
comm, &(requests[i]));
-#ifdef DEBUG_STEPH
- printf("[%d] after create req[%d]=%p req->(src=%d,dst=%d)\n",rank , i, requests[i],requests[i]->src,requests[i]->dst );
-#endif
if (MPI_SUCCESS != retval) {
printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
rank,retval,__FILE__,__LINE__);
system_tag + rank,
comm, &request);
if (MPI_SUCCESS != retval) {
- printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
+ ERROR4("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
rank,retval,__FILE__,__LINE__);
}
smpi_mpi_isend(request);
int retval;
rank = smpi_mpi_comm_rank( comm );
+ DEBUG2("<%d> entered nary_tree_bcast(), arity=%d",rank,arity);
// arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
proc_tree_t tree = alloc_tree( arity );
build_tree( rank, comm->size, &tree );
/**
* Barrier
**/
-
int nary_tree_barrier( MPI_Comm comm , int arity)
{
int rank;
}
+/**
+ * Alltoall pairwise
+ *
+ * This algorithm performs size steps (1<=s<=size) and at each step s,
+ * a process p sends to and receives from a unique distinct remote process.
+ * size=5 : s=1:  4->0->1, 0->1->2, 1->2->3, ...
+ *          s=2:  3->0->2, 4->1->3, 0->2->4, 1->3->0 , 2->4->1
+ *          ....
+ * OpenMPI calls this routine when the message size sent to each rank is greater than 3000 bytes.
+ *
+ * Returns the status of the last MPI_Sendrecv() performed.
+ **/
+
+int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatype datatype,
+                    void* recvbuf, int recvcount, MPI_Datatype recvdatatype, MPI_Comm comm)
+{
+  int retval = MPI_SUCCESS;
+  int rank;
+  int size = comm->size;
+  int step;
+  int sendto, recvfrom;
+  int tag_alltoall = 999;
+  void *tmpsend, *tmprecv;
+
+  rank = smpi_mpi_comm_rank(comm);
+  /* no trailing '\n' here: the XBT logging macros append the newline themselves */
+  INFO1("<%d> algorithm alltoall_pairwise() called.",rank);
+
+  /* Perform pairwise exchange - starting from 1 so the local copy is last:
+   * at step == size, sendto == recvfrom == rank. */
+  for (step = 1; step <= size; step++) {
+
+    /* who do we talk to in this step? */
+    sendto = (rank+step)%size;
+    recvfrom = (rank+size-step)%size;
+
+    /* where from are we sending and where from are we receiving actual data ? */
+    tmpsend = (char*)sendbuf+sendto*datatype->size*sendcount;
+    tmprecv = (char*)recvbuf+recvfrom*recvdatatype->size*recvcount;
+
+    /* send and receive */
+    /* in OpenMPI, they use :
+         err = ompi_coll_tuned_sendrecv( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL,
+                                         tmprecv, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_ALLTOALL,
+                                         comm, MPI_STATUS_IGNORE, rank);
+     */
+    /* NOTE(review): an error in an early step is overwritten by later steps;
+     * only the last step's status is returned (kept as-is for compatibility). */
+    retval = MPI_Sendrecv(tmpsend, sendcount, datatype, sendto, tag_alltoall,
+                          tmprecv, recvcount, recvdatatype, recvfrom, tag_alltoall,
+                          comm, MPI_STATUS_IGNORE);
+  }
+  return(retval);
+}
+
+/**
+ * helper: copy a datatype into another (in the simple case dt1=dt2)
+ *
+ * Returns MPI_SUCCESS, or MPI_ERR_TRUNCATE when the sender provides more
+ * elements than the receiver can hold.
+**/
+int copy_dt( void *sbuf, int scount, const MPI_Datatype sdtype, void *rbuf, int rcount, const MPI_Datatype rdtype);
+int copy_dt( void *sbuf, int scount, const MPI_Datatype sdtype,
+             void *rbuf, int rcount, const MPI_Datatype rdtype)
+{
+  /* Nothing to receive: that is only fine if there is nothing to send. */
+  if (0 == rcount)
+    return (0 == scount) ? MPI_SUCCESS : MPI_ERR_TRUNCATE;
+
+  /* Identical datatypes: a raw copy of the smaller element count will do;
+   * report truncation when the sender had more than fits. */
+  if (sdtype == rdtype) {
+    int n = (scount < rcount) ? scount : rcount;
+    memcpy(rbuf, sbuf, sdtype->size * n);
+    return (scount > rcount) ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
+  }
+
+  /* FIXME: cases
+   * - If receive packed.
+   * - If send packed
+   * to be treated once we have the MPI_Pack things ...
+   **/
+  return MPI_SUCCESS;
+}
+
+/**
+ * Alltoall basic_linear
+ *
+ * Handles the local rank by a direct memory copy, then posts one irecv and
+ * one isend per remote rank and waits for all of them to complete.
+ * Returns MPI_SUCCESS or the first error met while creating requests,
+ * otherwise the status of the last wait.
+ **/
+int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype sdtype,
+                    void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm)
+{
+  int i;
+  int system_alltoall_tag = 888;
+  int rank;
+  int size = comm->size;
+  int err;
+  char *psnd;
+  char *prcv;
+  int nreq = 0;
+  MPI_Aint lb;
+  MPI_Aint sndinc;
+  MPI_Aint rcvinc;
+  MPI_Request *reqs;
+
+  /* Initialize. */
+  rank = smpi_mpi_comm_rank(comm);
+  DEBUG1("<%d> algorithm alltoall_basic_linear() called.",rank);
+
+  /* FIXME(review): err from the first call is overwritten unchecked */
+  err = smpi_mpi_type_get_extent(sdtype, &lb, &sndinc);
+  err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvinc);
+  sndinc *= scount;   /* byte increment between two consecutive ranks' slots */
+  rcvinc *= rcount;
+  /* simple optimization: copy the local chunk directly */
+  psnd = ((char *) sbuf) + (rank * sndinc);
+  prcv = ((char *) rbuf) + (rank * rcvinc);
+
+  err = copy_dt( psnd, scount, sdtype, prcv, rcount, rdtype );
+  if (MPI_SUCCESS != err) {
+    return err;
+  }
+
+  /* If only one process, we're done. */
+  if (1 == size) {
+    return MPI_SUCCESS;
+  }
+
+  /* Initiate all send/recv to/from others. */
+  reqs = xbt_malloc(2*(size-1) * sizeof(smpi_mpi_request_t));
+
+  prcv = (char *) rbuf;
+  psnd = (char *) sbuf;
+
+  /* Post all receives first -- a simple optimization */
+  for (i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
+    err = smpi_create_request( prcv + (i * rcvinc), rcount, rdtype,
+                               i, rank,
+                               system_alltoall_tag,
+                               comm, &(reqs[nreq]));
+    if (MPI_SUCCESS != err) {
+      DEBUG2("<%d> failed to create request for rank %d",rank,i);
+      for (i=0;i< nreq;i++)
+        xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+      xbt_free( reqs );  /* was leaked on this error path */
+      return err;
+    }
+    nreq++;
+  }
+  /* Now post all sends in reverse order
+   *  - We would like to minimize the search time through message queue
+   *    when messages actually arrive in the order in which they were posted.
+   * */
+  for (i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size ) {
+    err = smpi_create_request (psnd + (i * sndinc), scount, sdtype,
+                               rank, i,
+                               system_alltoall_tag,
+                               comm, &(reqs[nreq]));
+    if (MPI_SUCCESS != err) {
+      DEBUG2("<%d> failed to create request for rank %d",rank,i);
+      for (i=0;i< nreq;i++)
+        xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+      xbt_free( reqs );  /* was leaked on this error path */
+      return err;
+    }
+    nreq++;
+  }
+
+  /* Start your engines.  This will never return an error. */
+  for ( i=0; i< nreq/2; i++ ) {   /* first half of reqs[] are the receives */
+    DEBUG3("<%d> issued irecv request reqs[%d]=%p",rank,i,reqs[i]);
+    smpi_mpi_irecv( reqs[i] );
+  }
+  for ( i= nreq/2; i<nreq; i++ ) {   /* second half are the sends */
+    DEBUG3("<%d> issued isend request reqs[%d]=%p",rank,i,reqs[i]);
+    smpi_mpi_isend( reqs[i] );
+  }
+
+  /* Wait for them all.  If there's an error, note that we don't
+   * care what the error was -- just that there *was* an error.  The
+   * PML will finish all requests, even if one or more of them fail.
+   * i.e., by the end of this call, all the requests are free-able.
+   * So free them anyway -- even if there was an error, and return
+   * the error after we free everything. */
+
+  DEBUG2("<%d> wait for %d requests",rank,nreq);
+  // waitall is buggy: use a loop instead for the moment
+  // err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE);
+  for (i=0;i<nreq;i++) {
+    err = smpi_mpi_wait( reqs[i], MPI_STATUS_IGNORE);
+  }
+
+  /* Free the reqs */
+  assert( nreq == 2*(size-1) );   /* no creation was skipped above */
+  for (i=0;i< nreq;i++) {
+    xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+  }
+  xbt_free( reqs );
+  return err;
+}
+
+
+/**
+ * Alltoall Bruck
+ *
+ * Openmpi calls this routine when the message size sent to each rank < 2000 bytes and size < 12
+ **/
+
+
+int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount, MPI_Datatype sdtype,
+                    void* recvbuf, int recvcount, MPI_Datatype rdtype, MPI_Comm comm)
+{
+  /* NOTE(review): stub -- only logs a warning and returns -1 (see the FIXME at
+   * the bottom).  Everything commented out below is the OpenMPI Bruck reference
+   * implementation kept as a guide for the eventual port; it is not compiled. */
+/* int size = comm->size;
+ int i, k, line = -1;
+ int sendto, recvfrom, distance, *displs=NULL, *blen=NULL;
+ int maxpacksize, packsize, position;
+ char * tmpbuf=NULL, *packbuf=NULL;
+ ptrdiff_t lb, sext, rext;
+ int err = 0;
+ int weallocated = 0;
+ MPI_Datatype iddt;
+
+ rank = smpi_mpi_comm_rank(comm);
+*/
+ INFO0("coll:tuned:alltoall_intra_bruck ** NOT IMPLEMENTED YET**");
+/*
+ displs = xbt_malloc(size*sizeof(int));
+ blen = xbt_malloc(size*sizeof(int));
+
+ weallocated = 1;
+*/
+ /* Prepare for packing data */
+/*
+ err = MPI_Pack_size( scount*size, sdtype, comm, &maxpacksize );
+ if (err != MPI_SUCCESS) {  }
+*/
+ /* pack buffer allocation */
+/* packbuf = (char*) malloc((unsigned) maxpacksize);
+ if (packbuf == NULL) { }
+*/
+ /* tmp buffer allocation for message data */
+/* tmpbuf = xbt_malloc(scount*size*sext);
+ if (tmpbuf == NULL) {  }
+*/
+
+ /* Step 1 - local rotation - shift up by rank */
+/* err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) ((size-rank)*scount),
+ tmpbuf, ((char*)sbuf)+rank*scount*sext);
+ if (err<0) {
+ line = __LINE__; err = -1; goto err_hndl;
+ }
+
+ if (rank != 0) {
+ err = ompi_ddt_copy_content_same_ddt (sdtype, (int32_t) (rank*scount),
+ tmpbuf+(size-rank)*scount*sext, (char*)sbuf);
+ if (err<0) {
+ line = __LINE__; err = -1; goto err_hndl;
+ }
+ }
+*/
+ /* perform communication step */
+/* for (distance = 1; distance < size; distance<<=1) {
+*/
+ /* send data to "sendto" */
+/* sendto = (rank+distance)%size;
+ recvfrom = (rank-distance+size)%size;
+ packsize = 0;
+ k = 0;
+*/
+ /* create indexed datatype */
+// for (i = 1; i < size; i++) {
+// if ((i&distance) == distance) {
+// displs[k] = i*scount; blen[k] = scount;
+// k++;
+// }
+// }
+ /* Set indexes and displacements */
+// err = MPI_Type_indexed(k, blen, displs, sdtype, &iddt);
+// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+// /* Commit the new datatype */
+/// err = MPI_Type_commit(&iddt);
+// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+
+ /* have the new distribution ddt, pack and exchange data */
+// err = MPI_Pack(tmpbuf, 1, iddt, packbuf, maxpacksize, &packsize, comm);
+// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+
+ /* Sendreceive */
+// err = ompi_coll_tuned_sendrecv ( packbuf, packsize, MPI_PACKED, sendto,
+// MCA_COLL_BASE_TAG_ALLTOALL,
+// rbuf, packsize, MPI_PACKED, recvfrom,
+// MCA_COLL_BASE_TAG_ALLTOALL,
+// comm, MPI_STATUS_IGNORE, rank);
+// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+ /* Unpack data from rbuf to tmpbuf */
+// position = 0;
+// err = MPI_Unpack(rbuf, packsize, &position,
+// tmpbuf, 1, iddt, comm);
+// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
+
+ /* free ddt */
+// err = MPI_Type_free(&iddt);
+// if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl;  }
+// } /* end of for (distance = 1... */
+
+ /* Step 3 - local rotation - */
+// for (i = 0; i < size; i++) {
+
+// err = ompi_ddt_copy_content_same_ddt (rdtype, (int32_t) rcount,
+// ((char*)rbuf)+(((rank-i+size)%size)*rcount*rext),
+// tmpbuf+i*rcount*rext);
+//
+// if (err<0) {
+// line = __LINE__; err = -1; goto err_hndl;
+// }
+// }
+
+ /* Step 4 - clean up */
+/* if (tmpbuf != NULL) free(tmpbuf);
+ if (packbuf != NULL) free(packbuf);
+ if (weallocated) {
+ if (displs != NULL) free(displs);
+ if (blen != NULL) free(blen);
+ }
+ return OMPI_SUCCESS;
+
+err_hndl:
+ OPAL_OUTPUT((ompi_coll_tuned_stream,"%s:%4d\tError occurred %d, rank %2d", __FILE__,line,err,rank));
+ if (tmpbuf != NULL) free(tmpbuf);
+ if (packbuf != NULL) free(packbuf);
+ if (weallocated) {
+ if (displs != NULL) free(displs);
+ if (blen != NULL) free(blen);
+ }
+ return err;
+ */
+ return -1; /* FIXME: to be changed*/
+}
+
+/* debug helper: dump an int buffer (value and address of each element) to
+ * stderr.  Takes ownership of msg and frees it -- callers pass xbt_strdup'ed
+ * strings, see the commented-out call sites in smpi_coll_basic_alltoallv(). */
+static void print_buffer_int(void *buf, int len, char *msg, int rank);
+static void print_buffer_int(void *buf, int len, char *msg, int rank)
+{
+  int tmp;
+  int *v = buf;   /* hoisted out of the loop: it is loop-invariant */
+  fprintf(stderr,"**<%d> %s (#%d): ", rank, msg,len);
+  for (tmp = 0; tmp < len; tmp++) {
+    fprintf(stderr,"[%d (%p)]", v[tmp],v+tmp);
+  }
+  fprintf(stderr,"\n");
+  free(msg);
+}
+
+
+
+/**
+ * alltoallv basic
+ *
+ * Vector variant of basic_linear: per-rank counts/displacements, local chunk
+ * handled by direct copy, one irecv + one isend per remote rank with a
+ * non-zero count.  Returns MPI_SUCCESS or the first request-creation error,
+ * otherwise the status of the last wait.
+ **/
+
+int smpi_coll_basic_alltoallv(void *sbuf, int *scounts, int *sdisps, MPI_Datatype sdtype,
+                              void *rbuf, int *rcounts, int *rdisps, MPI_Datatype rdtype,
+                              MPI_Comm comm) {
+
+  int i;
+  int system_alltoallv_tag = 889;
+  int rank;
+  int size = comm->size;
+  int err;
+  char *psnd;
+  char *prcv;
+  int rreq = 0;   /* number of receive requests actually created */
+  int sreq = 0;   /* number of send requests actually created */
+  MPI_Aint lb;
+  MPI_Aint sndextent;
+  MPI_Aint rcvextent;
+  MPI_Request *reqs;
+
+  /* Initialize. */
+  rank = smpi_mpi_comm_rank(comm);
+  DEBUG1("<%d> algorithm basic_alltoallv() called.",rank);
+
+  /* FIXME(review): err from the first call is overwritten unchecked */
+  err = smpi_mpi_type_get_extent(sdtype, &lb, &sndextent);
+  err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvextent);
+
+  psnd = (char *)sbuf;
+  //print_buffer_int(psnd,size*size,xbt_strdup("sbuff"),rank);
+
+  /* copy the local sbuf to rbuf when it's me */
+  psnd = ((char *) sbuf) + (sdisps[rank] * sndextent);
+  prcv = ((char *) rbuf) + (rdisps[rank] * rcvextent);
+
+  if (0 != scounts[rank]) {
+    err = copy_dt( psnd, scounts[rank], sdtype, prcv, rcounts[rank], rdtype );
+    if (MPI_SUCCESS != err) {
+      return err;
+    }
+  }
+
+  /* If only one process, we're done. */
+  if (1 == size) {
+    return MPI_SUCCESS;
+  }
+
+  /* Initiate all send/recv to/from others.
+   * 2*(size-1) is an upper bound: zero-count peers are skipped below. */
+  reqs = xbt_malloc(2*(size-1) * sizeof(smpi_mpi_request_t));
+
+  /* Create all receives that will be posted first */
+  for (i = 0; i < size; ++i) {
+    if (i == rank || 0 == rcounts[i]) {
+      DEBUG3("<%d> skip req creation i=%d,rcounts[i]=%d",rank,i, rcounts[i]);
+      continue;
+    }
+    prcv = ((char *) rbuf) + (rdisps[i] * rcvextent);
+
+    err = smpi_create_request( prcv, rcounts[i], rdtype,
+                               i, rank,
+                               system_alltoallv_tag,
+                               comm, &(reqs[rreq]));
+    if (MPI_SUCCESS != err) {
+      DEBUG2("<%d> failed to create request for rank %d",rank,i);
+      for (i=0;i< rreq;i++)
+        xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+      xbt_free( reqs );  /* was leaked on this error path */
+      return err;
+    }
+    rreq++;
+  }
+  DEBUG2("<%d> %d irecv reqs created",rank,rreq);
+  /* Now create all sends */
+  for (i = 0; i < size; ++i) {
+    if (i == rank || 0 == scounts[i]) {
+      DEBUG3("<%d> skip req creation i=%d,scounts[i]=%d",rank,i, scounts[i]);
+      continue;
+    }
+    psnd = ((char *) sbuf) + (sdisps[i] * sndextent);
+
+    //fprintf(stderr,"<%d> send %d elems to <%d>\n",rank,scounts[i],i);
+    //print_buffer_int(psnd,scounts[i],xbt_strdup("sbuff part"),rank);
+    err = smpi_create_request (psnd, scounts[i], sdtype,
+                               rank, i,
+                               system_alltoallv_tag,
+                               comm, &(reqs[rreq+sreq]));
+    if (MPI_SUCCESS != err) {
+      DEBUG2("<%d> failed to create request for rank %d",rank,i);
+      for (i=0;i< rreq+sreq;i++)
+        xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+      xbt_free( reqs );  /* was leaked on this error path */
+      return err;
+    }
+    sreq++;
+  }
+  DEBUG2("<%d> %d isend reqs created",rank,sreq);
+
+  /* Start your engines.  This will never return an error. */
+  for ( i=0; i< rreq; i++ ) {
+    DEBUG3("<%d> issued irecv request reqs[%d]=%p",rank,i,reqs[i]);
+    smpi_mpi_irecv( reqs[i] );
+  }
+  DEBUG3("<%d> for (i=%d;i<%d)",rank,rreq,rreq+sreq);  /* was logging sreq as the bound */
+  for ( i=rreq; i< rreq+sreq; i++ ) {
+    DEBUG3("<%d> issued isend request reqs[%d]=%p",rank,i,reqs[i]);
+    smpi_mpi_isend( reqs[i] );
+  }
+
+  /* Wait for them all.  If there's an error, note that we don't
+   * care what the error was -- just that there *was* an error.  The
+   * PML will finish all requests, even if one or more of them fail.
+   * i.e., by the end of this call, all the requests are free-able.
+   * So free them anyway -- even if there was an error, and return
+   * the error after we free everything. */
+
+  DEBUG2("<%d> wait for %d requests",rank,rreq+sreq);
+  // waitall is buggy: use a loop instead for the moment
+  // err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE);
+  for (i=0;i< rreq+sreq;i++) {
+    err = smpi_mpi_wait( reqs[i], MPI_STATUS_IGNORE);
+  }
+
+  /* Free the reqs */
+  /* rreq+sreq might be < 2*(size-1) since some request creations are skipped */
+  for (i=0;i< rreq+sreq;i++) {
+    xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
+  }
+  xbt_free( reqs );
+  return err;
+}