1 #include "colls_private.h"
4 #define MAXTREEFANOUT 32
/* Convert a requested segment size in bytes (SEGSIZE) into a per-segment
 * element count (SEGCOUNT, updated in place), given an element width of
 * TYPELNG bytes.  Only adjusts SEGCOUNT when SEGSIZE is at least one
 * element and strictly smaller than the whole message (TYPELNG * SEGCOUNT);
 * otherwise SEGCOUNT keeps its caller-provided value.  The residual test
 * rounds to the nearest whole element count.
 * NOTE(review): the macro body continues past the lines visible in this
 * chunk (the rounding branch and closing brace are truncated here), and
 * `residual` must be a variable declared at the expansion site -- confirm
 * both against the full file. */
6 #define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT) \
7 if( ((SEGSIZE) >= (TYPELNG)) && \
8 ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) { \
10 (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG)); \
11 residual = (SEGSIZE) - (SEGCOUNT) * (TYPELNG); \
12 if( residual > ((TYPELNG) >> 1) ) \
/* Topology descriptor shared by the tuned tree/chain collectives.
 * NOTE(review): only a slice of the struct is visible in this chunk; the
 * code below also reads/writes tree_root and tree_prev, and the closing
 * "} ompi_coll_tree_t;" lies outside the visible lines. */
16 typedef struct ompi_coll_tree_t {
21 int32_t tree_next[MAXTREEFANOUT]; /* ranks of this process' children (at most MAXTREEFANOUT) */
22 int32_t tree_nextsize; /* number of valid entries in tree_next */
/*
 * Build a "chain" topology rooted at `root`: the non-root ranks are split
 * into `fanout` chains hanging off the root.  Each rank forwards to at most
 * one successor (tree_next[0]) and receives from exactly one predecessor
 * (tree_prev); only the root has up to `fanout` children.  Returns a freshly
 * malloc'ed ompi_coll_tree_t that the caller owns.
 * NOTE(review): this chunk shows only a sampling of the function's lines;
 * several else-branches, closing braces and the return statement are not
 * visible here -- cross-check against the full file before editing logic.
 */
26 ompi_coll_tuned_topo_build_chain( int fanout,
31 ompi_coll_tuned_topo_build_chain( int fanout,
36 int srank; /* shifted rank */
39 ompi_coll_tree_t *chain;
41 XBT_DEBUG("coll:tuned:topo:build_chain fo %d rt %d", fanout, root);
44 * Get size and rank of the process in this communicator
46 size = smpi_comm_size(comm);
47 rank = smpi_comm_rank(comm);
/* Sanitize fanout: 0 is forced to 1 (pure pipeline), values above
 * MAXTREEFANOUT are clamped so tree_next[] cannot overflow. */
50 XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout of ZERO, forcing to 1 (pipeline)!");
53 if (fanout>MAXTREEFANOUT) {
54 XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout %d bigger than max %d, forcing to max!", fanout, MAXTREEFANOUT);
55 fanout = MAXTREEFANOUT;
59 * Allocate space for topology arrays if needed
61 chain = (ompi_coll_tree_t*)malloc( sizeof(ompi_coll_tree_t) );
63 XBT_DEBUG("coll:tuned:topo:build_chain PANIC out of memory");
/* Initialize to "no topology yet" sentinels before filling in. */
67 chain->tree_root = MPI_UNDEFINED;
68 chain->tree_nextsize = -1;
69 for(i=0;i<fanout;i++) chain->tree_next[i] = -1;
74 chain->tree_root = root;
/* With fewer than `fanout` non-root ranks, every rank is a direct child. */
75 if( (size - 1) < fanout ) {
76 chain->tree_nextsize = size-1;
79 chain->tree_nextsize = fanout;
/* Work in "shifted" rank space where the root maps to 0; true ranks are
 * restored at the end (orig lines 172-174 below). */
86 if (srank < 0) srank += size;
89 * Special case - fanout == 1
/* fanout == 1: the whole communicator is one pipeline. */
92 if( srank == 0 ) chain->tree_prev = -1;
93 else chain->tree_prev = (srank-1+root)%size;
95 if( (srank + 1) >= size) {
96 chain->tree_next[0] = -1;
97 chain->tree_nextsize = 0;
99 chain->tree_next[0] = (srank+1+root)%size;
100 chain->tree_nextsize = 1;
105 /* Let's handle the case where there is just one node in the communicator */
107 chain->tree_next[0] = -1;
108 chain->tree_nextsize = 0;
109 chain->tree_prev = -1;
113 * Calculate maximum chain length
/* (size-1) ranks spread over `fanout` chains; the first `mark` chains get
 * one extra rank when the division is not exact. */
115 maxchainlen = (size-1) / fanout;
116 if( (size-1) % fanout != 0 ) {
118 mark = (size-1)%fanout;
124 * Find your own place in the list of shifted ranks
/* `column` = which chain this rank belongs to, `head` = shifted rank of
 * that chain's first element (the root's direct child). */
128 if( srank-1 < (mark * maxchainlen) ) {
129 column = (srank-1)/maxchainlen;
130 head = 1+column*maxchainlen;
133 column = mark + (srank-1-mark*maxchainlen)/(maxchainlen-1);
134 head = mark*maxchainlen+1+(column-mark)*(maxchainlen-1);
/* Link within the chain: heads hang off the root, everyone else off the
 * previous shifted rank; the chain tail gets no successor. */
138 if( srank == head ) {
139 chain->tree_prev = 0; /*root*/
141 chain->tree_prev = srank-1; /* rank -1 */
143 if( srank == (head + len - 1) ) {
144 chain->tree_next[0] = -1;
145 chain->tree_nextsize = 0;
147 if( (srank + 1) < size ) {
148 chain->tree_next[0] = srank+1;
149 chain->tree_nextsize = 1;
151 chain->tree_next[0] = -1;
152 chain->tree_nextsize = 0;
/* Root: its children are the heads of the `fanout` chains, each offset by
 * the chain length (shortened once the `mark` longer chains are used up). */
161 chain->tree_prev = -1;
162 chain->tree_next[0] = (root+1)%size;
163 for( i = 1; i < fanout; i++ ) {
164 chain->tree_next[i] = chain->tree_next[i-1] + maxchainlen;
166 chain->tree_next[i]--;
168 chain->tree_next[i] %= size;
170 chain->tree_nextsize = fanout;
/* Translate the shifted ranks computed above back into true ranks. */
172 chain->tree_prev = (chain->tree_prev+root)%size;
173 if( chain->tree_next[0] != -1 ) {
174 chain->tree_next[0] = (chain->tree_next[0]+root)%size;
/*
 * Pipelined broadcast: the root streams the message in fixed-size segments
 * down a single chain (fanout 1) built by ompi_coll_tuned_topo_build_chain.
 * Intermediate ranks overlap receiving segment k+1 with forwarding segment
 * k (double-buffered via recv_reqs[2]); leaves only receive.
 * NOTE(review): this chunk shows only a sampling of the function's lines --
 * some declarations and the branches assigning `segsize` are not visible.
 */
181 smpi_coll_tuned_bcast_ompi_pipeline( void* buffer,
183 MPI_Datatype datatype,
187 int count_by_segment = original_count;
/* Dead remnants of the Open MPI code this was ported from. */
190 //mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
191 //mca_coll_tuned_comm_t *data = tuned_module->tuned_data;
193 // return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, module,
194 // count_by_segment, data->cached_pipeline );
/* fanout 1 => a pure chain (pipeline) rooted at `root`.
 * NOTE(review): `tree` is heap-allocated and is not freed anywhere in the
 * lines visible here -- check the full file for a memory leak. */
195 ompi_coll_tree_t * tree = ompi_coll_tuned_topo_build_chain( 1, comm, root );
196 int err = 0, line, i;
199 int num_segments; /* Number of segments */
200 int sendcount; /* number of elements sent in this segment */
/* Two outstanding receives enable double buffering on non-root ranks. */
204 MPI_Request recv_reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
205 MPI_Request *send_reqs = NULL;
209 * Determine number of elements sent per operation.
211 type_size = smpi_datatype_size(datatype);
213 size = smpi_comm_size(comm);
214 rank = smpi_comm_rank(comm);
215 xbt_assert( size > 1 );
/* Empirically fitted decision constants: for each candidate segment size,
 * pipelining is predicted to win while size < a*message_size + b
 * (presumably measured crossover points -- inherited from Open MPI). */
218 const double a_p16 = 3.2118e-6; /* [1 / byte] */
219 const double b_p16 = 8.7936;
220 const double a_p64 = 2.3679e-6; /* [1 / byte] */
221 const double b_p64 = 1.1787;
222 const double a_p128 = 1.6134e-6; /* [1 / byte] */
223 const double b_p128 = 2.1102;
226 /* else we need data size for decision function */
227 message_size = type_size * (unsigned long)original_count; /* needed for decision */
/* Pick the largest segment size whose model predicts a win.
 * NOTE(review): the assignments to `segsize` sit on lines not visible in
 * this chunk. */
229 if (size < (a_p128 * message_size + b_p128)) {
230 //Pipeline with 128KB segments
232 }else if (size < (a_p64 * message_size + b_p64)) {
233 // Pipeline with 64KB segments
235 }else if (size < (a_p16 * message_size + b_p16)) {
236 //Pipeline with 16KB segments
/* Turn the chosen byte segment size into an element count. */
240 COLL_TUNED_COMPUTED_SEGCOUNT( segsize, type_size, count_by_segment );
242 XBT_DEBUG("coll:tuned:bcast_intra_pipeline rank %d ss %5d type_size %lu count_by_segment %d",
243 smpi_comm_rank(comm), segsize, (unsigned long)type_size, count_by_segment);
247 extent = smpi_datatype_get_extent (datatype);
/* Ceiling division: the final segment may carry fewer elements. */
248 num_segments = (original_count + count_by_segment - 1) / count_by_segment;
249 realsegsize = count_by_segment * extent;
251 /* Set the buffer pointers */
252 tmpbuf = (char *) buffer;
/* One send request per child; freed at the end (orig line 375). */
254 if( tree->tree_nextsize != 0 ) {
255 send_reqs = xbt_new(MPI_Request, tree->tree_nextsize );
/* Root: stream each segment to all children, completing one wave of sends
 * before starting the next. */
262 - send segment to all children.
263 The last segment may have less elements than other segments.
265 sendcount = count_by_segment;
266 for( segindex = 0; segindex < num_segments; segindex++ ) {
267 if( segindex == (num_segments - 1) ) {
268 sendcount = original_count - segindex * count_by_segment;
270 for( i = 0; i < tree->tree_nextsize; i++ ) {
271 send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
276 /* complete the sends before starting the next sends */
277 smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
278 MPI_STATUSES_IGNORE );
280 /* update tmp buffer */
281 tmpbuf += realsegsize;
286 /* Intermediate nodes code */
287 else if( tree->tree_nextsize > 0 ) {
290 1) Post the first receive
291 2) For segments 1 .. num_segments
293 - wait on the previous receive to complete
294 - send this data to children
295 3) Wait on the last segment
296 4) Compute number of elements in last segment.
297 5) Send the last segment to children
300 recv_reqs[req_index]=smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
301 tree->tree_prev, 777,
304 for( segindex = 1; segindex < num_segments; segindex++ ) {
/* Flip between the two receive slots (double buffering). */
306 req_index = req_index ^ 0x1;
/* Prefetch the next segment into the slot just past the current one. */
309 recv_reqs[req_index]= smpi_mpi_irecv( tmpbuf + realsegsize, count_by_segment,
310 datatype, tree->tree_prev,
314 /* wait for and forward the previous segment to children */
/* NOTE(review): this single-request wait is passed MPI_STATUSES_IGNORE;
 * standard MPI_Wait expects MPI_STATUS_IGNORE (the leaf path at orig line
 * 372 uses it).  Harmless only if SMPI defines both identically --
 * confirm and make consistent. */
315 smpi_mpi_wait( &recv_reqs[req_index ^ 0x1],
316 MPI_STATUSES_IGNORE );
318 for( i = 0; i < tree->tree_nextsize; i++ ) {
319 send_reqs[i]=smpi_mpi_isend(tmpbuf, count_by_segment, datatype,
324 /* complete the sends before starting the next iteration */
325 smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
326 MPI_STATUSES_IGNORE );
328 /* Update the receive buffer */
329 tmpbuf += realsegsize;
332 /* Process the last segment */
/* NOTE(review): same MPI_STATUSES_IGNORE-on-single-wait concern as above. */
333 smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUSES_IGNORE );
334 sendcount = original_count - (num_segments - 1) * count_by_segment;
335 for( i = 0; i < tree->tree_nextsize; i++ ) {
336 send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
341 smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
342 MPI_STATUSES_IGNORE );
/* Leaf: only receives, still double-buffered so segment k+1 is in flight
 * while waiting for segment k. */
348 Receive all segments from parent in a loop:
349 1) post irecv for the first segment
350 2) for segments 1 .. num_segments
351 - post irecv for the next segment
352 - wait on the previous segment to arrive
353 3) wait for the last segment
356 recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
357 tree->tree_prev, 777,
360 for( segindex = 1; segindex < num_segments; segindex++ ) {
361 req_index = req_index ^ 0x1;
362 tmpbuf += realsegsize;
363 /* post receive for the next segment */
364 recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
365 tree->tree_prev, 777,
367 /* wait on the previous segment */
368 smpi_mpi_wait( &recv_reqs[req_index ^ 0x1],
372 smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUS_IGNORE );
/* NOTE(review): free(NULL) is a no-op, so the guard is redundant; also see
 * the leak note on `tree` above. */
375 if( NULL != send_reqs ) free(send_reqs);
377 return (MPI_SUCCESS);