/* smpi_coll.c -- various optimized routines for collectives                 */

/* Copyright (c) 2009 Stephane Genaud. */
/* All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll_private.h"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_coll, smpi,
                                "Logging specific to SMPI (coll)");
/* proc_tree taken and translated from P2P-MPI */
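/* The struct body below is a reconstruction: the fields are inferred from how
 * they are used in the functions that follow (PROCTREE_A holds the arity,
 * child[] holds up to arity child ranks, and -1 marks "no parent"/"no child"),
 * so field order and any additional members are assumptions. */
struct proc_tree {
        int PROCTREE_A;   /* arity of the tree */
        int numChildren;  /* how many child slots are actually filled */
        int *child;       /* ranks of the children (array of size arity) */
        int parent;       /* rank of the parent, -1 for the root */
        int me;           /* rank of the calling process */
        int root;         /* rank of the tree root */
};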
typedef struct proc_tree * proc_tree_t;

void build_tree(int index, int extent, proc_tree_t *tree);
proc_tree_t alloc_tree(int arity);
void free_tree(proc_tree_t tree);
void print_tree(proc_tree_t tree);
proc_tree_t alloc_tree(int arity) {
        proc_tree_t tree = malloc(sizeof(struct proc_tree));
        int i;

        tree->PROCTREE_A = arity;
        tree->numChildren = 0;
        tree->child = malloc(arity * sizeof(int));
        for (i = 0; i < arity; i++)
                tree->child[i] = -1;  /* -1 marks an unused child slot */
        tree->root = tree->parent = -1;
        return tree;
}

void free_tree(proc_tree_t tree) {
        free(tree->child);
        free(tree);
}
/**
 * Build the tree for a given process rank (index) and group size (extent)
 * @param index  the rank of the calling process
 * @param extent the total number of processes
 **/
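/* Worked example (assuming root = 0): with arity = 2 and extent = 7, rank 0
 * gets children 1 and 2, rank 1 gets 3 and 4, rank 2 gets 5 and 6, ranks 3-6
 * are leaves, and the parent of any rank r > 0 is (r - 1) / arity. */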
void build_tree(int index, int extent, proc_tree_t *tree) {
        int places = (*tree)->PROCTREE_A * index;
        int i, ch, pr;

        (*tree)->me = index;
        for (i = 1; i <= (*tree)->PROCTREE_A; i++) {
                ++places;   /* places counts candidate children; it equals ch when root == 0 */
                ch = (*tree)->PROCTREE_A * index + i + (*tree)->root;
                if (places < extent) {
                        (*tree)->child[i - 1] = ch;
                        (*tree)->numChildren++;
                }
        }
        if (index == (*tree)->root) {
                (*tree)->parent = -1;   /* the root has no parent */
        }
        else {
                pr = (index - 1) / (*tree)->PROCTREE_A;
                (*tree)->parent = pr;
        }
}
/**
 * prints the tree as a graphical representation
 **/
void print_tree(proc_tree_t tree) {
        int i;
        char *spacer = "";

        if (-1 != tree->parent) {
                printf("[%d]\n   +---", tree->parent);
                spacer = "     ";
        }
        printf("<%d>\n", tree->me);
        for (i = 0; i < tree->numChildren; i++)
                printf("%s   +--- %d\n", spacer, tree->child[i]);
}
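/* For instance, with arity = 2 and 7 processes, rank 1 (parent 0, children 3
 * and 4) prints something like:
 *   [0]
 *      +---<1>
 *          +--- 3
 *          +--- 4
 */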
/**
 * tree-based broadcast: receive from the parent, then forward to the children
 **/
int tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, proc_tree_t tree);
int tree_bcast(void *buf, int count, MPI_Datatype datatype, int root,
                MPI_Comm comm, proc_tree_t tree)
{
        int system_tag = 999; // a negative int was used before, but smpi_create_request() declares this illegal (to be checked)
        int rank;
        int retval = MPI_SUCCESS;
        int i;
        smpi_mpi_request_t request;
        smpi_mpi_request_t *requests;

        rank = smpi_mpi_comm_rank(comm);

        /* wait for data from my parent in the tree */
        if (-1 != tree->parent) {
                retval = smpi_create_request(buf, count, datatype,
                                tree->parent, rank,
                                system_tag + rank,
                                comm, &request);
                if (MPI_SUCCESS != retval) {
                        printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
                                        rank, retval, __FILE__, __LINE__);
                }
                smpi_mpi_irecv(request);
                smpi_mpi_wait(request, MPI_STATUS_IGNORE);
                xbt_mallocator_release(smpi_global->request_mallocator, request);
        }

        requests = xbt_malloc(tree->numChildren * sizeof(smpi_mpi_request_t));

        /* initiate sends to ranks lower in the tree */
        for (i = 0; i < tree->numChildren; i++) {
                if (tree->child[i] != -1) {
                        retval = smpi_create_request(buf, count, datatype,
                                        rank, tree->child[i],
                                        system_tag + tree->child[i],
                                        comm, &(requests[i]));
                        if (MPI_SUCCESS != retval) {
                                printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
                                                rank, retval, __FILE__, __LINE__);
                        }
                        smpi_mpi_isend(requests[i]);
                        /* FIXME: we should not wait immediately here. See the next FIXME. */
                        smpi_mpi_wait(requests[i], MPI_STATUS_IGNORE);
                        xbt_mallocator_release(smpi_global->request_mallocator, requests[i]);
                }
        }

        /* FIXME: normally, we should wait only once all isends have been issued,
         * which is the commented code below. It deadlocks, probably because of
         * a bug in the sender process. */

        /* wait for completion of sends */
        /* printf("[%d] wait for %d send completions\n", rank, tree->numChildren);
           smpi_mpi_waitall(tree->numChildren, requests, MPI_STATUS_IGNORE);
           printf("[%d] reqs completed\n)", rank);
         */
        xbt_free(requests);
        return retval;
}
/* checked ok with valgrind --leak-check=full */
int tree_antibcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, proc_tree_t tree);
int tree_antibcast(void *buf, int count, MPI_Datatype datatype, int root,
                MPI_Comm comm, proc_tree_t tree)
{
        int system_tag = 999; // a negative int was used before, but smpi_create_request() declares this illegal (to be checked)
        int rank;
        int retval = MPI_SUCCESS;
        int i;
        smpi_mpi_request_t request;
        smpi_mpi_request_t *requests;

        rank = smpi_mpi_comm_rank(comm);

        //------------------ anti-bcast: from the leaves up to the root ------------------

        // everyone sends to its parent, except the root.
        if (-1 != tree->parent) {
                retval = smpi_create_request(buf, count, datatype,
                                rank, tree->parent,
                                system_tag + rank,
                                comm, &request);
                if (MPI_SUCCESS != retval) {
                        printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
                                        rank, retval, __FILE__, __LINE__);
                }
                smpi_mpi_isend(request);
                smpi_mpi_wait(request, MPI_STATUS_IGNORE);
                xbt_mallocator_release(smpi_global->request_mallocator, request);
        }

        // everyone receives as many messages as it has children
        requests = xbt_malloc(tree->numChildren * sizeof(smpi_mpi_request_t));
        for (i = 0; i < tree->numChildren; i++) {
                if (tree->child[i] != -1) {
                        retval = smpi_create_request(buf, count, datatype,
                                        tree->child[i], rank,
                                        system_tag + tree->child[i],
                                        comm, &(requests[i]));
                        if (MPI_SUCCESS != retval) {
                                printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
                                                rank, retval, __FILE__, __LINE__);
                        }
                        smpi_mpi_irecv(requests[i]);
                        /* FIXME: we should not wait immediately here. See the FIXME in tree_bcast(). */
                        smpi_mpi_wait(requests[i], MPI_STATUS_IGNORE);
                        xbt_mallocator_release(smpi_global->request_mallocator, requests[i]);
                }
        }
        xbt_free(requests);
        return retval;
}
/* checked ok with valgrind --leak-check=full */
/**
 * bcast over a binary, ternary, or whatever n-ary tree
 **/
int nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root,
                MPI_Comm comm, int arity)
{
        int rank;
        int retval;

        rank = smpi_mpi_comm_rank(comm);
        // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
        proc_tree_t tree = alloc_tree(arity);
        build_tree(rank, comm->size, &tree);
        retval = tree_bcast(buf, count, datatype, root, comm, tree);
        free_tree(tree);
        return retval;
}
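/* Usage sketch (hypothetical caller): broadcast 64 ints from rank 0 over a
 * 4-ary tree.
 *   int data[64];
 *   nary_tree_bcast(data, 64, MPI_INT, 0, comm, 4);
 */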
int nary_tree_barrier(MPI_Comm comm, int arity)
{
        int rank;
        int retval = MPI_SUCCESS;
        char dummy = 0; /* the payload value is irrelevant: only the synchronization matters */

        rank = smpi_mpi_comm_rank(comm);
        // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
        proc_tree_t tree = alloc_tree(arity);
        build_tree(rank, comm->size, &tree);

        /* anti-bcast (leaves to root) then bcast (root to leaves) makes a barrier */
        retval = tree_antibcast(&dummy, 1, MPI_CHAR, 0, comm, tree);
        if (MPI_SUCCESS != retval) {
                printf("[%s:%d] ** Error: tree_antibcast() returned retval=%d\n", __FILE__, __LINE__, retval);
        }
        else {
                retval = tree_bcast(&dummy, 1, MPI_CHAR, 0, comm, tree);
        }
        free_tree(tree);
        return retval;
}
/* checked ok with valgrind --leak-check=full */
/**
 * Alltoall pairwise
 *
 * this algorithm performs size steps (1 <= s <= size) and
 * at each step s, each process p sends to and receives from exactly one
 * distinct remote process
 * size=5 : s=1:  4->0->1, 0->1->2, 1->2->3, ...
 *          s=2:  3->0->2, 4->1->3, 0->2->4, 1->3->0, 2->4->1
 * OpenMPI calls this routine when the message size sent to each rank is greater than 3000 bytes
 **/
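/* Worked example (size = 4, rank = 0): step 1 sends to 1 and receives from 3,
 * step 2 sends to 2 and receives from 2, step 3 sends to 3 and receives from
 * 1, and step 4 == size is the local copy (sendto == recvfrom == rank). */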
int smpi_coll_tuned_alltoall_pairwise(void *sendbuf, int sendcount, MPI_Datatype datatype,
                void *recvbuf, int recvcount, MPI_Datatype recvdatatype, MPI_Comm comm)
{
        int retval = MPI_SUCCESS;
        int rank;
        int size = comm->size;
        int step;
        int sendto, recvfrom;
        int tag_alltoall = 999;
        void *tmpsend, *tmprecv;

        rank = smpi_mpi_comm_rank(comm);

        /* Perform pairwise exchange - starting from 1 so the local copy is last */
        for (step = 1; step < size + 1; step++) {

                /* who do we talk to in this step? */
                sendto = (rank + step) % size;
                recvfrom = (rank + size - step) % size;

                /* which blocks do we send from and receive into? */
                tmpsend = (char *) sendbuf + sendto * datatype->size * sendcount;
                tmprecv = (char *) recvbuf + recvfrom * recvdatatype->size * recvcount;

                /* send and receive */
                /* in OpenMPI, this is:
                   err = ompi_coll_tuned_sendrecv( tmpsend, scount, sdtype, sendto, MCA_COLL_BASE_TAG_ALLTOALL,
                                                   tmprecv, rcount, rdtype, recvfrom, MCA_COLL_BASE_TAG_ALLTOALL,
                                                   comm, MPI_STATUS_IGNORE, rank);
                 */
                retval = MPI_Sendrecv(tmpsend, sendcount, datatype, sendto, tag_alltoall,
                                tmprecv, recvcount, recvdatatype, recvfrom, tag_alltoall,
                                comm, MPI_STATUS_IGNORE);
        }
        return retval;
}
/**
 * helper: copy a datatype into another (in the simple case dt1=dt2)
 **/
int copy_dt(void *sbuf, int scount, const MPI_Datatype sdtype, void *rbuf, int rcount, const MPI_Datatype rdtype);
int copy_dt(void *sbuf, int scount, const MPI_Datatype sdtype,
                void *rbuf, int rcount, const MPI_Datatype rdtype)
{
        /* First check if we really have something to do */
        if (0 == rcount) {
                return ((0 == scount) ? MPI_SUCCESS : MPI_ERR_TRUNCATE);
        }
        /* If the same datatype is used, just copy. */
        if (sdtype == rdtype) {
                int count = (scount < rcount ? scount : rcount);
                memcpy(rbuf, sbuf, sdtype->size * count);
                return ((scount > rcount) ? MPI_ERR_TRUNCATE : MPI_SUCCESS);
        }
        /* FIXME: remaining cases
         *        - if the receive side is packed
         *        - if the send side is packed
         *        to be treated once we have the MPI_Pack things ... */
        return MPI_SUCCESS;
}
/**
 * Alltoall basic_linear
 **/
int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype sdtype,
                void *rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm)
{
        int i;
        int system_tag = 999;
        int rank;
        int size = comm->size;
        int err;
        int nreq = 0;
        char *psnd;
        char *prcv;
        MPI_Aint lb;
        MPI_Aint sndinc;
        MPI_Aint rcvinc;
        smpi_mpi_request_t *reqs;

        /* Initialize. */
        rank = smpi_mpi_comm_rank(comm);

        err = smpi_mpi_type_get_extent(sdtype, &lb, &sndinc);
        err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvinc);
        sndinc *= scount;   /* distance between two consecutive blocks in sbuf */
        rcvinc *= rcount;   /* distance between two consecutive blocks in rbuf */

        /* simple optimization: copy our own block locally */
        psnd = ((char *) sbuf) + (rank * sndinc);
        prcv = ((char *) rbuf) + (rank * rcvinc);

        err = copy_dt(psnd, scount, sdtype, prcv, rcount, rdtype);
        if (MPI_SUCCESS != err) {
                return err;
        }

        /* If only one process, we're done. */
        if (1 == size) {
                return MPI_SUCCESS;
        }

        /* Initiate all send/recv to/from others. */
        reqs = xbt_malloc(2 * (size - 1) * sizeof(smpi_mpi_request_t));
        prcv = (char *) rbuf;
        psnd = (char *) sbuf;

        /* Post all receives first -- a simple optimization */
        for (i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
                err = smpi_create_request(prcv + (i * rcvinc), rcount, rdtype,
                                i, rank,
                                system_tag,
                                comm, &(reqs[nreq]));
                if (MPI_SUCCESS != err) {
                        DEBUG2("[%d] failed to create request for rank %d\n", rank, i);
                        for (i = 0; i < nreq; i++)
                                xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
                        return err;
                }
                nreq++;
        }

        /* Now post all sends in reverse order
         * - We would like to minimize the search time through the message queue
         *   when messages actually arrive in the order in which they were posted.
         */
        for (i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size) {
                err = smpi_create_request(psnd + (i * sndinc), scount, sdtype,
                                rank, i,
                                system_tag,
                                comm, &(reqs[nreq]));
                if (MPI_SUCCESS != err) {
                        DEBUG2("[%d] failed to create request for rank %d\n", rank, i);
                        for (i = 0; i < nreq; i++)
                                xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
                        return err;
                }
                nreq++;
        }

        /* Start your engines. This will never return an error. */
        for (i = 0; i < nreq / 2; i++) {
                smpi_mpi_irecv(reqs[i]);
        }
        for (i = nreq / 2; i < nreq; i++) {
                smpi_mpi_isend(reqs[i]);
        }

        /* Wait for them all. If there's an error, note that we don't care
         * what the error was -- just that there *was* an error. The PML will
         * finish all requests, even if one or more of them fail. That is, by
         * the end of this call, all the requests are free-able. So free them
         * anyway -- even if there was an error -- and return the error after
         * we free everything. */
        err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE);

        for (i = 0; i < 2 * (size - 1); i++) {
                xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]);
        }
        xbt_free(reqs);
        return err;
}
/**
 * Alltoall Bruck
 *
 * OpenMPI calls this routine when the message size sent to each rank is less than 2000 bytes and size < 12
 **/
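/* Sketch of the algorithm in the commented-out code below: after a local
 * rotation of the send blocks (shift up by rank), the exchange runs in
 * ceil(log2(size)) steps; at step 'distance' (1, 2, 4, ...) every block whose
 * index has the 'distance' bit set is packed and traded with the ranks at
 * +distance / -distance, and a final local rotation puts each block in place. */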
int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount, MPI_Datatype sdtype,
                void *recvbuf, int recvcount, MPI_Datatype rdtype, MPI_Comm comm)
{
//        int size = comm->size;
//        int i, k, line = -1;
//        int sendto, recvfrom, distance, *displs = NULL, *blen = NULL;
//        int maxpacksize, packsize, position;
//        char *tmpbuf = NULL, *packbuf = NULL;
//        ptrdiff_t lb, sext, rext;
//        int err = 0;
//        MPI_Datatype iddt;
        int rank;

        rank = smpi_mpi_comm_rank(comm);
        INFO0("coll:tuned:alltoall_intra_bruck ** NOT IMPLEMENTED YET **");

//        displs = xbt_malloc(size * sizeof(int));
//        blen = xbt_malloc(size * sizeof(int));

        /* Prepare for packing data */
//        err = MPI_Pack_size(scount * size, sdtype, comm, &maxpacksize);
//        if (err != MPI_SUCCESS) { }

        /* pack buffer allocation */
//        packbuf = (char *) malloc((unsigned) maxpacksize);
//        if (packbuf == NULL) { }

        /* tmp buffer allocation for message data */
//        tmpbuf = xbt_malloc(scount * size * sext);
//        if (tmpbuf == NULL) { }

        /* Step 1 - local rotation - shift up by rank */
//        err = ompi_ddt_copy_content_same_ddt(sdtype, (int32_t) ((size - rank) * scount),
//                        tmpbuf, ((char *) sbuf) + rank * scount * sext);
//        if (err < 0) {
//                line = __LINE__; err = -1; goto err_hndl;
//        }
//        err = ompi_ddt_copy_content_same_ddt(sdtype, (int32_t) (rank * scount),
//                        tmpbuf + (size - rank) * scount * sext, (char *) sbuf);
//        if (err < 0) {
//                line = __LINE__; err = -1; goto err_hndl;
//        }

        /* Step 2 - perform communication steps */
//        for (distance = 1; distance < size; distance <<= 1) {
//                /* send data to "sendto", receive data from "recvfrom" */
//                sendto = (rank + distance) % size;
//                recvfrom = (rank - distance + size) % size;
//
//                /* create an indexed datatype selecting the blocks to forward */
//                k = 0;
//                for (i = 1; i < size; i++) {
//                        if ((i & distance) == distance) {
//                                displs[k] = i * scount;
//                                blen[k] = scount;
//                                k++;
//                        }
//                }
//                /* Set indexes and displacements */
//                err = MPI_Type_indexed(k, blen, displs, sdtype, &iddt);
//                if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
//                /* Commit the new datatype */
//                err = MPI_Type_commit(&iddt);
//                if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
//
//                /* we have the new distribution datatype: pack and exchange data */
//                err = MPI_Pack(tmpbuf, 1, iddt, packbuf, maxpacksize, &packsize, comm);
//                if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
//
//                err = ompi_coll_tuned_sendrecv(packbuf, packsize, MPI_PACKED, sendto,
//                                MCA_COLL_BASE_TAG_ALLTOALL,
//                                rbuf, packsize, MPI_PACKED, recvfrom,
//                                MCA_COLL_BASE_TAG_ALLTOALL,
//                                comm, MPI_STATUS_IGNORE, rank);
//                if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
//
//                /* Unpack data from rbuf to tmpbuf */
//                position = 0;
//                err = MPI_Unpack(rbuf, packsize, &position,
//                                tmpbuf, 1, iddt, comm);
//                if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
//
//                /* free the datatype */
//                err = MPI_Type_free(&iddt);
//                if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
//        } /* end of for (distance = 1...) */

        /* Step 3 - local rotation */
//        for (i = 0; i < size; i++) {
//                err = ompi_ddt_copy_content_same_ddt(rdtype, (int32_t) rcount,
//                                ((char *) rbuf) + ((rank - i + size) % size) * rcount * rext,
//                                tmpbuf + i * rcount * rext);
//                if (err < 0) {
//                        line = __LINE__; err = -1; goto err_hndl;
//                }
//        }

        /* Step 4 - clean up */
//        if (tmpbuf != NULL) free(tmpbuf);
//        if (packbuf != NULL) free(packbuf);
//        if (displs != NULL) free(displs);
//        if (blen != NULL) free(blen);
//        return MPI_SUCCESS;
//
// err_hndl:
//        OPAL_OUTPUT((ompi_coll_tuned_stream, "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err, rank));
//        if (tmpbuf != NULL) free(tmpbuf);
//        if (packbuf != NULL) free(packbuf);
//        if (displs != NULL) free(displs);
//        if (blen != NULL) free(blen);
        return -1; /* FIXME: to be changed */
}
/**
 * -----------------------------------------------------------------------------------------------------
 * -----------------------------------------------------------------------------------------------------
 **/

/* commented-out test driver: build and print a binary tree for each rank,
 * then exercise the bcast
 *
 *   for (rank = 0; rank < size; rank++) {
 *           printf("--------------tree for rank %d ----------\n", rank);
 *           tree = alloc_tree(2);
 *           build_tree(rank, size, &tree);
 *           print_tree(tree);
 *           free_tree(tree);
 *   }
 *   printf("-------------- bcast ----------\n");
 *   for (rank = 0; rank < size; rank++) {
 *           ...
 *   }
 */