1 #include "colls_private.h"
2 #include "coll_tuned_topo.h"
4 #define MAXTREEFANOUT 32
7 int smpi_coll_tuned_bcast_ompi_pipeline( void* buffer,
13 int count_by_segment = original_count;
15 int segsize =1024 << 7;
16 //mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
17 //mca_coll_tuned_comm_t *data = tuned_module->tuned_data;
19 // return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, module,
20 // count_by_segment, data->cached_pipeline );
21 ompi_coll_tree_t * tree = ompi_coll_tuned_topo_build_chain( 1, comm, root );
25 int num_segments; /* Number of segments */
26 int sendcount; /* number of elements sent in this segment */
30 MPI_Request recv_reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
31 MPI_Request *send_reqs = NULL;
35 * Determine number of elements sent per operation.
37 type_size = smpi_datatype_size(datatype);
39 size = smpi_comm_size(comm);
40 rank = smpi_comm_rank(comm);
41 xbt_assert( size > 1 );
44 const double a_p16 = 3.2118e-6; /* [1 / byte] */
45 const double b_p16 = 8.7936;
46 const double a_p64 = 2.3679e-6; /* [1 / byte] */
47 const double b_p64 = 1.1787;
48 const double a_p128 = 1.6134e-6; /* [1 / byte] */
49 const double b_p128 = 2.1102;
52 /* else we need data size for decision function */
53 message_size = type_size * (unsigned long)original_count; /* needed for decision */
55 if (size < (a_p128 * message_size + b_p128)) {
56 //Pipeline with 128KB segments
58 }else if (size < (a_p64 * message_size + b_p64)) {
59 // Pipeline with 64KB segments
61 }else if (size < (a_p16 * message_size + b_p16)) {
62 //Pipeline with 16KB segments
66 COLL_TUNED_COMPUTED_SEGCOUNT( segsize, type_size, count_by_segment );
68 XBT_DEBUG("coll:tuned:bcast_intra_pipeline rank %d ss %5d type_size %lu count_by_segment %d",
69 smpi_comm_rank(comm), segsize, (unsigned long)type_size, count_by_segment);
73 extent = smpi_datatype_get_extent (datatype);
74 num_segments = (original_count + count_by_segment - 1) / count_by_segment;
75 realsegsize = count_by_segment * extent;
77 /* Set the buffer pointers */
78 tmpbuf = (char *) buffer;
80 if( tree->tree_nextsize != 0 ) {
81 send_reqs = xbt_new(MPI_Request, tree->tree_nextsize );
88 - send segment to all children.
89 The last segment may have fewer elements than other segments.
91 sendcount = count_by_segment;
92 for( segindex = 0; segindex < num_segments; segindex++ ) {
93 if( segindex == (num_segments - 1) ) {
94 sendcount = original_count - segindex * count_by_segment;
96 for( i = 0; i < tree->tree_nextsize; i++ ) {
97 send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
99 COLL_TAG_BCAST, comm);
102 /* complete the sends before starting the next sends */
103 smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
104 MPI_STATUSES_IGNORE );
106 /* update tmp buffer */
107 tmpbuf += realsegsize;
112 /* Intermediate nodes code */
113 else if( tree->tree_nextsize > 0 ) {
116 1) Post the first receive
117 2) For segments 1 .. num_segments - 1
119 - wait on the previous receive to complete
120 - send this data to children
121 3) Wait on the last segment
122 4) Compute number of elements in last segment.
123 5) Send the last segment to children
126 recv_reqs[req_index]=smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
127 tree->tree_prev, COLL_TAG_BCAST,
130 for( segindex = 1; segindex < num_segments; segindex++ ) {
132 req_index = req_index ^ 0x1;
135 recv_reqs[req_index]= smpi_mpi_irecv( tmpbuf + realsegsize, count_by_segment,
136 datatype, tree->tree_prev,
140 /* wait for and forward the previous segment to children */
141 smpi_mpi_wait( &recv_reqs[req_index ^ 0x1],
142 MPI_STATUSES_IGNORE );
144 for( i = 0; i < tree->tree_nextsize; i++ ) {
145 send_reqs[i]=smpi_mpi_isend(tmpbuf, count_by_segment, datatype,
147 COLL_TAG_BCAST, comm );
150 /* complete the sends before starting the next iteration */
151 smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
152 MPI_STATUSES_IGNORE );
154 /* Update the receive buffer */
155 tmpbuf += realsegsize;
158 /* Process the last segment */
159 smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUSES_IGNORE );
160 sendcount = original_count - (num_segments - 1) * count_by_segment;
161 for( i = 0; i < tree->tree_nextsize; i++ ) {
162 send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
164 COLL_TAG_BCAST, comm);
167 smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
168 MPI_STATUSES_IGNORE );
174 Receive all segments from parent in a loop:
175 1) post irecv for the first segment
176 2) for segments 1 .. num_segments - 1
177 - post irecv for the next segment
178 - wait on the previous segment to arrive
179 3) wait for the last segment
182 recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
183 tree->tree_prev, COLL_TAG_BCAST,
186 for( segindex = 1; segindex < num_segments; segindex++ ) {
187 req_index = req_index ^ 0x1;
188 tmpbuf += realsegsize;
189 /* post receive for the next segment */
190 recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
191 tree->tree_prev, COLL_TAG_BCAST,
193 /* wait on the previous segment */
194 smpi_mpi_wait( &recv_reqs[req_index ^ 0x1],
198 smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUS_IGNORE );
201 if( NULL != send_reqs ) free(send_reqs);
204 return (MPI_SUCCESS);