diff --git a/src/smpi/colls/reduce/reduce-ompi.cpp b/src/smpi/colls/reduce/reduce-ompi.cpp
index b476889ae2..25d1cb85f1 100644
--- a/src/smpi/colls/reduce/reduce-ompi.cpp
+++ b/src/smpi/colls/reduce/reduce-ompi.cpp
@@ -38,7 +38,7 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
  * the number of datatype to the original count (original_count)
  *
  * Note that for non-commutative operations we cannot save the memory copy
- * for the first block: thus we must copy sendbuf to accumbuf on intermediate nodes
+ * for the first block: thus we must copy sendbuf to accumbuf on intermediate nodes
  * to keep the optimized loop happy.
  */
 int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int original_count,
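The header comment closed by the hunk above describes how the generic reduce chops a message of original_count elements into num_segments pieces. A minimal standalone sketch of that segmentation arithmetic follows; the variable names mirror the source, while the sizes are invented for illustration:

    #include <cstdio>

    // Sketch of the segmentation arithmetic used by
    // smpi_coll_tuned_ompi_reduce_generic: ceiling division yields the number
    // of segments, and each segment advances the buffer pointers by
    // count_by_segment * extent bytes. All values are illustrative.
    int main()
    {
        int  original_count   = 1000; // total elements to reduce (example)
        int  count_by_segment = 256;  // elements per segment (example)
        long extent           = 8;    // datatype extent in bytes, e.g. a double

        int  num_segments      = (original_count + count_by_segment - 1) / count_by_segment;
        long segment_increment = count_by_segment * extent;

        // The last segment may be shorter: 1000 = 3*256 + 232.
        int last_segment = original_count - (num_segments - 1) * count_by_segment;

        std::printf("segments=%d, increment=%ld bytes, last segment=%d elements\n",
                    num_segments, segment_increment, last_segment);
        return 0;
    }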
@@ -63,33 +63,33 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
     num_segments = (original_count + count_by_segment - 1) / count_by_segment;
     segment_increment = count_by_segment * extent;
 
-    sendtmpbuf = (char*) sendbuf;
-    if( sendbuf == MPI_IN_PLACE ) {
-        sendtmpbuf = (char *)recvbuf;
+    sendtmpbuf = (char*) sendbuf;
+    if( sendbuf == MPI_IN_PLACE ) {
+        sendtmpbuf = (char *)recvbuf;
     }
 
     XBT_DEBUG( "coll:tuned:reduce_generic count %d, msg size %ld, segsize %ld, max_requests %d",
                original_count, (unsigned long)(num_segments * segment_increment),
                (unsigned long)segment_increment, max_outstanding_reqs);
 
     rank = comm->rank();
 
-    /* non-leaf nodes - wait for children to send me data & forward up
+    /* non-leaf nodes - wait for children to send me data & forward up
        (if needed) */
     if( tree->tree_nextsize > 0 ) {
         ptrdiff_t true_extent, real_segment_size;
         true_extent=datatype->get_extent();
 
-        /* handle a non-existent recv buffer (i.e. it is NULL) and
+        /* handle a non-existent recv buffer (i.e. it is NULL) and
            protect the recv buffer on non-root nodes */
         accumbuf = (char*)recvbuf;
         if( (NULL == accumbuf) || (root != rank) ) {
             /* Allocate temporary accumulator buffer. */
             accumbuf_free = (char*)smpi_get_tmp_sendbuffer(true_extent + (original_count - 1) * extent);
-            if (accumbuf_free == NULL) {
-                line = __LINE__; ret = -1; goto error_hndl;
+            if (accumbuf_free == NULL) {
+                line = __LINE__; ret = -1; goto error_hndl;
             }
             accumbuf = accumbuf_free - lower_bound;
-        }
+        }
 
         /* If this is a non-commutative operation we must copy sendbuf to the
            accumbuf, in order to simplify the loops */
@@ -99,19 +99,19 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
         /* Allocate two buffers for incoming segments */
         real_segment_size = true_extent + (count_by_segment - 1) * extent;
         inbuf_free[0] = (char*) smpi_get_tmp_recvbuffer(real_segment_size);
-        if( inbuf_free[0] == NULL ) {
-            line = __LINE__; ret = -1; goto error_hndl;
+        if( inbuf_free[0] == NULL ) {
+            line = __LINE__; ret = -1; goto error_hndl;
         }
         inbuf[0] = inbuf_free[0] - lower_bound;
         /* if there is a chance to overlap communication -
            allocate second buffer */
         if( (num_segments > 1) || (tree->tree_nextsize > 1) ) {
             inbuf_free[1] = (char*) smpi_get_tmp_recvbuffer(real_segment_size);
-            if( inbuf_free[1] == NULL ) {
+            if( inbuf_free[1] == NULL ) {
                 line = __LINE__; ret = -1; goto error_hndl;
             }
             inbuf[1] = inbuf_free[1] - lower_bound;
-        }
+        }
 
         /* reset input buffer index and receive count */
         inbi = 0;
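The second inbuf allocated above exists purely so that receiving the next segment can overlap with reducing the current one. A minimal sketch of that double-buffering pattern, with the irecv/waitall machinery replaced by invented stubs (post_receive and reduce_segment are not names from the source):

    #include <cstdio>

    static void post_receive(int buf)   { std::printf("irecv posted into inbuf[%d]\n", buf); }
    static void reduce_segment(int buf) { std::printf("waitall + reduce from inbuf[%d]\n", buf); }

    int main()
    {
        const int num_segments = 4; // illustrative value
        int inbi = 0;

        post_receive(inbi);                 // prime the pipeline: segment 0 into inbuf[0]
        for (int seg = 0; seg < num_segments; ++seg) {
            if (seg + 1 < num_segments)
                post_receive(inbi ^ 1);     // prefetch segment seg+1 into the idle buffer
            reduce_segment(inbi);           // wait for and combine segment seg
            inbi ^= 1;                      // swap the roles of the two buffers
        }
        return 0;
    }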
@@ -134,14 +134,14 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
             if( segindex < num_segments ) {
                 void* local_recvbuf = inbuf[inbi];
                 if( 0 == i ) {
-                    /* for the first step (1st child per segment) and
-                     * commutative operations we might be able to irecv
-                     * directly into the accumulate buffer so that we can
-                     * reduce(op) this with our sendbuf in one step, as
-                     * ompi_op_reduce only has two buffer pointers;
+                    /* for the first step (1st child per segment) and
+                     * commutative operations we might be able to irecv
+                     * directly into the accumulate buffer so that we can
+                     * reduce(op) this with our sendbuf in one step, as
+                     * ompi_op_reduce only has two buffer pointers;
                      * this avoids an extra memory copy.
                      *
-                     * BUT if the operation is non-commutative or
+                     * BUT if the operation is non-commutative or
                      * we are root and are USING MPI_IN_PLACE this is wrong!
                      */
                     if( (op==MPI_OP_NULL || op->is_commutative()) &&
@@ -151,32 +151,32 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
                     }
                     reqs[inbi]=Request::irecv(local_recvbuf, recvcount, datatype,
-                                              tree->tree_next[i],
+                                              tree->tree_next[i],
                                               COLL_TAG_REDUCE, comm );
                 }
                 /* wait for previous req to complete, if any.
-                   if there are no requests, reqs[inbi ^ 1] will be
+                   if there are no requests, reqs[inbi ^ 1] will be
                    MPI_REQUEST_NULL. */
                 /* wait on data from last child for previous segment */
-                Request::waitall( 1, &reqs[inbi ^ 1],
+                Request::waitall( 1, &reqs[inbi ^ 1],
                                   MPI_STATUSES_IGNORE );
                 local_op_buffer = inbuf[inbi ^ 1];
                 if( i > 0 ) {
-                    /* our first operation is to combine our own [sendbuf] data
-                     * with the data we received from downstream (but only if
-                     * the operation is commutative and we are not root and
+                    /* our first operation is to combine our own [sendbuf] data
+                     * with the data we received from downstream (but only if
+                     * the operation is commutative and we are not root and
                      * not using MPI_IN_PLACE) */
                     if( 1 == i ) {
-                        if( (op==MPI_OP_NULL || op->is_commutative()) &&
+                        if( (op==MPI_OP_NULL || op->is_commutative()) &&
                             !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
                             local_op_buffer = sendtmpbuf + segindex * segment_increment;
                         }
                     }
                     /* apply operation */
-                    if(op!=MPI_OP_NULL) op->apply( local_op_buffer,
-                                                   accumbuf + segindex * segment_increment,
+                    if(op!=MPI_OP_NULL) op->apply( local_op_buffer,
+                                                   accumbuf + segindex * segment_increment,
                                                    &recvcount, datatype );
                 } else if ( segindex > 0 ) {
                     void* accumulator = accumbuf + (segindex-1) * segment_increment;
@@ -186,21 +186,21 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
                         local_op_buffer = sendtmpbuf + (segindex-1) * segment_increment;
                     }
                 }
-                if(op!=MPI_OP_NULL) op->apply( local_op_buffer, accumulator, &prevcount,
+                if(op!=MPI_OP_NULL) op->apply( local_op_buffer, accumulator, &prevcount,
                                                datatype );
 
-                /* all reductions on the available data for this step (i) are complete;
+                /* all reductions on the available data for this step (i) are complete;
                  * pass to the next process unless you are the root.
                  */
                 if (rank != tree->tree_root) {
                     /* send combined/accumulated data to parent */
-                    Request::send( accumulator, prevcount,
-                                   datatype, tree->tree_prev,
+                    Request::send( accumulator, prevcount,
+                                   datatype, tree->tree_prev,
                                    COLL_TAG_REDUCE, comm);
                 }
 
-                /* we stop when segindex = number of segments
+                /* we stop when segindex = number of segments
                    (i.e. we do num_segments+1 steps for pipelining) */
                 if (segindex == num_segments) break;
             }
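The hunks above implement the pipelining at interior tree nodes: as soon as segment segindex-1 has been combined with every child's contribution, it is forwarded to tree_prev while work on segment segindex continues, which is why the loop takes num_segments+1 steps. A simplified sketch of that schedule; communication is mocked, and the real code interleaves these calls inside the per-child loop (recv_and_combine and send_to_parent are invented stubs):

    #include <cstdio>

    struct Node { int tree_nextsize; bool is_root; };

    static void recv_and_combine(int seg, int child)
    { std::printf("segment %d: combine contribution from child %d\n", seg, child); }
    static void send_to_parent(int seg)
    { std::printf("segment %d: forward accumulated result to parent\n", seg); }

    int main()
    {
        Node node{2, false};        // interior node with two children (example)
        const int num_segments = 3; // illustrative

        // Step s finishes segment s-1 and starts segment s, so the pipeline
        // takes num_segments + 1 steps, matching the source comment.
        for (int seg = 0; seg <= num_segments; ++seg) {
            if (seg < num_segments)
                for (int child = 0; child < node.tree_nextsize; ++child)
                    recv_and_combine(seg, child);
            if (seg > 0 && !node.is_root)
                send_to_parent(seg - 1);
        }
        return 0;
    }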
@@ -216,33 +216,33 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
             smpi_free_tmp_buffer(accumbuf_free);
         }
     }
 
-    /* leaf nodes
-       Depending on the value of max_outstanding_reqs and
+    /* leaf nodes
+       Depending on the value of max_outstanding_reqs and
        the number of segments we have two options:
        - send all segments using blocking send to the parent, or
-       - avoid flooding the parent nodes by limiting the number of
+       - avoid flooding the parent nodes by limiting the number of
          outstanding requests to max_outstanding_reqs.
-       TODO/POSSIBLE IMPROVEMENT: If there is a way to determine the eager size
-       for the current communication, synchronization should be used only
+       TODO/POSSIBLE IMPROVEMENT: If there is a way to determine the eager size
+       for the current communication, synchronization should be used only
        when the message/segment size is smaller than the eager size.
     */
     else {
 
         /* If the number of segments is less than the maximum number of outstanding
-           requests or there is no limit on the maximum number of outstanding
+           requests or there is no limit on the maximum number of outstanding
            requests, we send data to the parent using blocking send */
-        if ((0 == max_outstanding_reqs) ||
+        if ((0 == max_outstanding_reqs) ||
             (num_segments <= max_outstanding_reqs)) {
-
+
             segindex = 0;
             while ( original_count > 0) {
                 if (original_count < count_by_segment) {
                     count_by_segment = original_count;
                 }
-                Request::send((char*)sendbuf +
+                Request::send((char*)sendbuf +
                                segindex * segment_increment,
                                count_by_segment, datatype,
-                               tree->tree_prev,
+                               tree->tree_prev,
                                COLL_TAG_REDUCE, comm) ;
                 segindex++;
@@ -270,7 +270,7 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
                 sreq[segindex]=Request::isend((char*)sendbuf +
                                           segindex * segment_increment,
                                           count_by_segment, datatype,
-                                          tree->tree_prev,
+                                          tree->tree_prev,
                                           COLL_TAG_REDUCE, comm);
                 original_count -= count_by_segment;
@@ -285,10 +285,10 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
                 if( original_count < count_by_segment ) {
                     count_by_segment = original_count;
                 }
-                sreq[creq]=Request::isend((char*)sendbuf +
-                                          segindex * segment_increment,
-                                          count_by_segment, datatype,
-                                          tree->tree_prev,
+                sreq[creq]=Request::isend((char*)sendbuf +
+                                          segindex * segment_increment,
+                                          count_by_segment, datatype,
+                                          tree->tree_prev,
                                           COLL_TAG_REDUCE, comm );
                 creq = (creq + 1) % max_outstanding_reqs;
@@ -297,7 +297,7 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
         }
 
         /* Wait on the remaining requests to complete */
-        Request::waitall( max_outstanding_reqs, sreq,
+        Request::waitall( max_outstanding_reqs, sreq,
                           MPI_STATUSES_IGNORE );
 
         /* free requests */
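The windowed branch above keeps at most max_outstanding_reqs isends in flight, recycling request slots round-robin via creq = (creq + 1) % max_outstanding_reqs. A runnable sketch of that bounded window, with request handles mocked as integers (mock_isend and mock_wait are invented stubs):

    #include <cstdio>

    static int  mock_isend(int seg) { std::printf("isend segment %d\n", seg); return seg; }
    static void mock_wait(int req)  { std::printf("wait for segment %d\n", req); }

    int main()
    {
        const int window = 3;       // stands in for max_outstanding_reqs
        const int num_segments = 8; // illustrative
        int sreq[window];

        int segindex = 0;
        for (; segindex < window && segindex < num_segments; ++segindex)
            sreq[segindex] = mock_isend(segindex);  // fill the window

        int creq = 0;                               // oldest slot in the ring
        for (; segindex < num_segments; ++segindex) {
            mock_wait(sreq[creq]);                  // free the oldest slot
            sreq[creq] = mock_isend(segindex);      // reuse it for the next segment
            creq = (creq + 1) % window;             // advance the ring index
        }
        for (int i = 0; i < window; ++i)
            mock_wait(sreq[i]);                     // drain the remaining requests
        return 0;
    }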
@@ -308,7 +308,7 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
     return MPI_SUCCESS;
 
  error_hndl:  /* error handler */
-    XBT_DEBUG("ERROR_HNDL: node %d file %s line %d error %d\n",
+    XBT_DEBUG("ERROR_HNDL: node %d file %s line %d error %d\n",
               rank, __FILE__, line, ret );
     if( inbuf_free[0] != NULL ) free(inbuf_free[0]);
     if( inbuf_free[1] != NULL ) free(inbuf_free[1]);
@@ -325,8 +325,8 @@ int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int origi
 
 int Coll_reduce_ompi_chain::reduce( void *sendbuf, void *recvbuf, int count,
-                                    MPI_Datatype datatype,
-                                    MPI_Op op, int root,
+                                    MPI_Datatype datatype,
+                                    MPI_Op op, int root,
                                     MPI_Comm comm
                                     )
 {
@@ -342,12 +342,12 @@ int Coll_reduce_ompi_chain::reduce( void *sendbuf, void *recvbuf, int count,
      * sent per operation
      */
     typelng = datatype->size();
-
+
     COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
 
-    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
                                                 op, root, comm,
-                                                ompi_coll_tuned_topo_build_chain(fanout, comm, root),
+                                                ompi_coll_tuned_topo_build_chain(fanout, comm, root),
                                                 segcount, 0 );
 }
@@ -373,16 +373,16 @@ int Coll_reduce_ompi_pipeline::reduce( void *sendbuf, void *recvbuf,
     const double b4 = 1.6761;
     typelng= datatype->size();
     int communicator_size = comm->size();
-    size_t message_size = typelng * count;
+    size_t message_size = typelng * count;
 
     if (communicator_size > (a2 * message_size + b2)) {
-        // Pipeline_1K
+        // Pipeline_1K
         segsize = 1024;
     }else if (communicator_size > (a4 * message_size + b4)) {
-        // Pipeline_32K
+        // Pipeline_32K
         segsize = 32*1024;
     } else {
-        // Pipeline_64K
+        // Pipeline_64K
         segsize = 64*1024;
     }
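The decision logic above picks a segment size in bytes from fitted linear boundaries on communicator size versus message size; the COLL_TUNED_COMPUTED_SEGCOUNT call used by each reduce entry point then converts that byte count into an element count per segment. The macro itself is defined elsewhere in the tuned module, so the sketch below only approximates its intent (computed_segcount is an invented stand-in; a segsize of 0 is taken to mean "do not segment"):

    #include <cstdio>

    // Approximate conversion from a byte-oriented segment size to a count of
    // whole elements per segment; the real macro may differ in details.
    static int computed_segcount(size_t segsize, size_t typelng, int count)
    {
        if (segsize == 0 || typelng == 0 || segsize >= (size_t)count * typelng)
            return count;                        // one segment: the whole message
        int segcount = (int)(segsize / typelng); // whole elements per segment
        return segcount < 1 ? 1 : segcount;
    }

    int main()
    {
        // Example: 8-byte elements, 100000 of them, Pipeline_64K bucket.
        std::printf("segcount = %d\n", computed_segcount(64 * 1024, 8, 100000));
        return 0;
    }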
@@ -391,9 +391,9 @@ int Coll_reduce_ompi_pipeline::reduce( void *sendbuf, void *recvbuf,
 
     COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
 
-    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
                                                 op, root, comm,
-                                                ompi_coll_tuned_topo_build_chain( 1, comm, root),
+                                                ompi_coll_tuned_topo_build_chain( 1, comm, root),
                                                 segcount, 0);
 }
@@ -414,7 +414,7 @@ int Coll_reduce_ompi_binary::reduce( void *sendbuf, void *recvbuf,
      */
     typelng=datatype->size();
 
-    // Binary_32K
+    // Binary_32K
     segsize = 32*1024;
 
     XBT_DEBUG("coll:tuned:reduce_intra_binary rank %d ss %5d",
@@ -422,9 +422,9 @@ int Coll_reduce_ompi_binary::reduce( void *sendbuf, void *recvbuf,
 
     COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
 
-    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
-                                                op, root, comm,
-                                                ompi_coll_tuned_topo_build_tree(2, comm, root),
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
+                                                op, root, comm,
+                                                ompi_coll_tuned_topo_build_tree(2, comm, root),
                                                 segcount, 0);
 }
@@ -449,13 +449,13 @@ int Coll_reduce_ompi_binomial::reduce( void *sendbuf, void *recvbuf,
      */
     typelng= datatype->size();
     int communicator_size = comm->size();
-    size_t message_size = typelng * count;
+    size_t message_size = typelng * count;
     if (((communicator_size < 8) && (message_size < 20480)) ||
                (message_size < 2048) || (count <= 1)) {
         /* Binomial_0K */
         segsize = 0;
     } else if (communicator_size > (a1 * message_size + b1)) {
-        // Binomial_1K
+        // Binomial_1K
         segsize = 1024;
     }
@@ -463,21 +463,21 @@ int Coll_reduce_ompi_binomial::reduce( void *sendbuf, void *recvbuf,
               comm->rank(), segsize);
     COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
 
-    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
-                                                op, root, comm,
-                                                ompi_coll_tuned_topo_build_in_order_bmtree(comm, root),
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
+                                                op, root, comm,
+                                                ompi_coll_tuned_topo_build_in_order_bmtree(comm, root),
                                                 segcount, 0);
 }
 
 /*
- * reduce_intra_in_order_binary
- *
+ * reduce_intra_in_order_binary
+ *
  * Function:    Logarithmic reduce operation for non-commutative operations.
  *              Accepts:   same as MPI_Reduce()
  *              Returns:   MPI_SUCCESS or error code
  */
 int Coll_reduce_ompi_in_order_binary::reduce( void *sendbuf, void *recvbuf,
-                                              int count,
+                                              int count,
                                               MPI_Datatype datatype,
                                               MPI_Op op, int root,
                                               MPI_Comm comm)
@@ -503,9 +503,9 @@ int Coll_reduce_ompi_in_order_binary::reduce( void *sendbuf, void *recvbuf,
     /* An in-order binary tree must use root (size-1) to preserve the order of
        operations.  Thus, if root is not rank (size - 1), then we must handle
-       1. MPI_IN_PLACE option on real root, and
+       1. MPI_IN_PLACE option on real root, and
        2. we must allocate temporary recvbuf on rank (size - 1).
-       Note that the generic function must be careful not to switch the order of
+       Note that the generic function must be careful not to switch the order of
        operations for non-commutative ops.
     */
     io_root = size - 1;
@@ -514,7 +514,7 @@ int Coll_reduce_ompi_in_order_binary::reduce( void *sendbuf, void *recvbuf,
     if (io_root != root) {
         ptrdiff_t text, ext;
         char *tmpbuf = NULL;
-
+
         ext=datatype->get_extent();
         text=datatype->get_extent();
@@ -538,8 +538,8 @@ int Coll_reduce_ompi_in_order_binary::reduce( void *sendbuf, void *recvbuf,
     /* Use generic reduce with in-order binary tree topology and io_root */
     ret = smpi_coll_tuned_ompi_reduce_generic( use_this_sendbuf, use_this_recvbuf, count, datatype,
-                                              op, io_root, comm,
-                                              ompi_coll_tuned_topo_build_in_order_bintree(comm),
+                                              op, io_root, comm,
+                                              ompi_coll_tuned_topo_build_in_order_bintree(comm),
                                               segcount, 0 );
     if (MPI_SUCCESS != ret) { return ret; }
@@ -553,7 +553,7 @@ int Coll_reduce_ompi_in_order_binary::reduce( void *sendbuf, void *recvbuf,
         if (MPI_IN_PLACE == sendbuf) {
             smpi_free_tmp_buffer(use_this_sendbuf);
         }
-
+
     } else if (io_root == rank) {
         /* Send result from use_this_recvbuf to root */
         Request::send(use_this_recvbuf, count, datatype, root,
@@ -569,8 +569,8 @@ int Coll_reduce_ompi_in_order_binary::reduce( void *sendbuf, void *recvbuf,
 /*
  * Linear functions are copied from the BASIC coll module
  * they do not segment the message and are simple implementations
- * but for some small number of nodes and/or small data sizes they
- * are just as fast as tuned/tree-based segmenting operations
+ * but for some small number of nodes and/or small data sizes they
+ * are just as fast as tuned/tree-based segmenting operations
  * and as such may be selected by the decision functions
  * These are copied into this module due to the way we select modules
  * in V1. i.e. in V2 we will handle this differently and so will not
@@ -618,7 +618,7 @@ Coll_reduce_ompi_basic_linear::reduce(void *sbuf, void *rbuf, int count,
         return MPI_SUCCESS;
     }
 
-    /* see discussion in ompi_coll_basic_reduce_lin_intra about
+    /* see discussion in ompi_coll_basic_reduce_lin_intra about
        extent and true extent */
     /* for reducing buffer allocation lengths.... */
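The in-order binary variant above always roots its tree at rank size-1 so that operand order is preserved for non-commutative operations; when the caller's root differs, the result is rerouted with one extra point-to-point message. A mocked sketch of that rerouting (mock_send, mock_recv and in_order_reduce are invented stubs; MPI_IN_PLACE and temporary-buffer handling are omitted for brevity):

    #include <cstdio>

    static void mock_send(int to)   { std::printf("send final result to rank %d\n", to); }
    static void mock_recv(int from) { std::printf("recv final result from rank %d\n", from); }
    static void in_order_reduce(int io_root)
    { std::printf("run in-order binary-tree reduce rooted at rank %d\n", io_root); }

    static void reduce_in_order(int rank, int size, int root)
    {
        int io_root = size - 1;       // tree root that preserves operand order
        in_order_reduce(io_root);     // every rank takes part in the tree phase
        if (io_root != root) {        // reroute the result if the roots differ
            if (rank == io_root)      mock_send(root);
            else if (rank == root)    mock_recv(io_root);
        }
    }

    int main()
    {
        const int size = 4, root = 0; // illustrative communicator
        for (int rank = 0; rank < size; ++rank)
            reduce_in_order(rank, size, root);
        return 0;
    }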