AND — Algorithmique Numérique Distribuée

Public GIT Repository
SMPI colls is not really C++. But cleaner than before.
[simgrid.git] / src / smpi / colls / smpi_openmpi_selector.cpp
/* selector for collective algorithms based on openmpi's default coll_tuned_decision_fixed selector */

/* Copyright (c) 2009-2010, 2013-2014. The SimGrid Team.
 * All rights reserved.                                                     */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "colls_private.h"

namespace simgrid{
namespace smpi{

int Coll_allreduce_ompi::allreduce(void *sbuf, void *rbuf, int count,
                                   MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
{
    size_t dsize, block_dsize;
    int comm_size = comm->size();
    const size_t intermediate_message = 10000;

    /**
     * Decision function based on MX results from the Grig cluster at UTK.
     *
     * Currently, linear, recursive doubling, and nonoverlapping algorithms
     * can handle both commutative and non-commutative operations.
     * Ring algorithm does not support non-commutative operations.
     */
    dsize = dtype->size();
    block_dsize = dsize * count;

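    /* Small messages (below intermediate_message, ~10 kB): recursive doubling
     * completes in about log2(comm_size) rounds, which keeps latency low. */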
    if (block_dsize < intermediate_message) {
        return (Coll_allreduce_rdb::allreduce(sbuf, rbuf, count, dtype, op, comm));
    }

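    /* Larger messages with a commutative (or null) operation and more elements than
     * processes: prefer ring-based algorithms. When every per-process fragment fits
     * into a single 1 MB segment, the plain ring (lr) variant suffices; otherwise
     * use the segmented ring. */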
    if (((op == MPI_OP_NULL) || op->is_commutative()) && (count > comm_size)) {
        const size_t segment_size = 1 << 20; /* 1 MB */
        if ((comm_size * segment_size >= block_dsize)) {
            //FIXME: ok, these are not the right algorithms, try to find closer ones
            // lr is a good match for allreduce_ring (difference is mainly the use of sendrecv)
            return Coll_allreduce_lr::allreduce(sbuf, rbuf, count, dtype, op, comm);
        } else {
            return (Coll_allreduce_ompi_ring_segmented::allreduce(sbuf, rbuf, count, dtype,
                                                                  op, comm /*segment_size*/));
        }
    }

    return (Coll_allreduce_redbcast::allreduce(sbuf, rbuf, count, dtype, op, comm));
}

int Coll_alltoall_ompi::alltoall(void *sbuf, int scount, MPI_Datatype sdtype,
                                 void* rbuf, int rcount, MPI_Datatype rdtype,
                                 MPI_Comm comm)
{
    int communicator_size;
    size_t dsize, block_dsize;
    communicator_size = comm->size();

    /* Decision function based on measurements on the Grig cluster at
       the University of Tennessee (2 Gb MX) up to 64 nodes.
       Has better performance for messages of intermediate sizes than the old one. */
    /* determine block size */
    dsize = sdtype->size();
    block_dsize = dsize * scount;

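    /* Three regimes: very small blocks on larger communicators go through Bruck
     * (about log2(p) communication steps at the cost of extra copies), small-to-medium
     * blocks use the basic linear algorithm, and everything else uses the ring. */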
    if ((block_dsize < 200) && (communicator_size > 12)) {
        return Coll_alltoall_bruck::alltoall(sbuf, scount, sdtype,
                                             rbuf, rcount, rdtype, comm);
    } else if (block_dsize < 3000) {
        return Coll_alltoall_basic_linear::alltoall(sbuf, scount, sdtype,
                                                    rbuf, rcount, rdtype, comm);
    }

    return Coll_alltoall_ring::alltoall(sbuf, scount, sdtype,
                                        rbuf, rcount, rdtype, comm);
}

int Coll_alltoallv_ompi::alltoallv(void *sbuf, int *scounts, int *sdisps,
                                   MPI_Datatype sdtype,
                                   void *rbuf, int *rcounts, int *rdisps,
                                   MPI_Datatype rdtype, MPI_Comm comm)
{
    /* For starters, just keep the original algorithm. */
    return Coll_alltoallv_ompi_basic_linear::alltoallv(sbuf, scounts, sdisps, sdtype,
                                                       rbuf, rcounts, rdisps, rdtype, comm);
}

int Coll_barrier_ompi::barrier(MPI_Comm comm)
{
    int communicator_size = comm->size();

    if (2 == communicator_size)
        return Coll_barrier_ompi_two_procs::barrier(comm);

    /* Basic optimisation: if the number of nodes is a power of 2,
     * use the recursive doubling algorithm; otherwise Bruck is the one we want. */
    {
        int has_one = 0;
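        /* Count the set bits of communicator_size: a second 1-bit means the size
         * is not a power of two, so fall back to Bruck. */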
        for (; communicator_size > 0; communicator_size >>= 1) {
            if (communicator_size & 0x1) {
                if (has_one)
                    return Coll_barrier_ompi_bruck::barrier(comm);
                has_one = 1;
            }
        }
    }
    return Coll_barrier_ompi_recursivedoubling::barrier(comm);
}

int Coll_bcast_ompi::bcast(void *buff, int count,
                           MPI_Datatype datatype, int root,
                           MPI_Comm comm)
{
    /* Decision function based on MX results for
       messages up to 36MB and communicator sizes up to 64 nodes */
    const size_t small_message_size = 2048;
    const size_t intermediate_message_size = 370728;
    const double a_p16  = 3.2118e-6; /* [1 / byte] */
    const double b_p16  = 8.7936;
    const double a_p64  = 2.3679e-6; /* [1 / byte] */
    const double b_p64  = 1.1787;
    const double a_p128 = 1.6134e-6; /* [1 / byte] */
    const double b_p128 = 2.1102;
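    /* Each (a_pN, b_pN) pair describes a fitted straight line
     * comm_size = a * message_size + b separating the regions where one choice
     * overtakes another; the p16/p64/p128 suffixes correspond to the 16, 64 and
     * 128 KB pipeline segment sizes used in the branches below. */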

    int communicator_size;
    //int segsize = 0;
    size_t message_size, dsize;

    communicator_size = comm->size();

    /* we need the data size for the decision function */
    dsize = datatype->size();
    message_size = dsize * (unsigned long)count;   /* needed for decision */

    /* Handle messages of small and intermediate size, and
       single-element broadcasts */
    if ((message_size < small_message_size) || (count <= 1)) {
        /* Binomial without segmentation */
        return Coll_bcast_binomial_tree::bcast(buff, count, datatype, root, comm);

    } else if (message_size < intermediate_message_size) {
        // Split binary with 1KB segments
        return Coll_bcast_ompi_split_bintree::bcast(buff, count, datatype, root, comm);

    }
    // Handle large message sizes
    else if (communicator_size < (a_p128 * message_size + b_p128)) {
        // Pipeline with 128KB segments
        //segsize = 1024 << 7;
        return Coll_bcast_ompi_pipeline::bcast(buff, count, datatype, root, comm);

    } else if (communicator_size < 13) {
        // Split binary with 8KB segments
        return Coll_bcast_ompi_split_bintree::bcast(buff, count, datatype, root, comm);

    } else if (communicator_size < (a_p64 * message_size + b_p64)) {
        // Pipeline with 64KB segments
        //segsize = 1024 << 6;
        return Coll_bcast_ompi_pipeline::bcast(buff, count, datatype, root, comm);

    } else if (communicator_size < (a_p16 * message_size + b_p16)) {
        // Pipeline with 16KB segments
        //segsize = 1024 << 4;
        return Coll_bcast_ompi_pipeline::bcast(buff, count, datatype, root, comm);

    }
    /* Pipeline with 8KB segments */
    //segsize = 1024 << 3;
    return Coll_bcast_flattree_pipeline::bcast(buff, count, datatype, root, comm
                                               /*segsize*/);
#if 0
    /* this is based on gige measurements */

    if (communicator_size < 4) {
        return Coll_bcast_intra_basic_linear::bcast (buff, count, datatype, root, comm, module);
    }
    if (communicator_size == 4) {
        if (message_size < 524288) segsize = 0;
        else segsize = 16384;
        return Coll_bcast_intra_bintree::bcast (buff, count, datatype, root, comm, module, segsize);
    }
    if (communicator_size <= 8 && message_size < 4096) {
        return Coll_bcast_intra_basic_linear::bcast (buff, count, datatype, root, comm, module);
    }
    if (communicator_size > 8 && message_size >= 32768 && message_size < 524288) {
        segsize = 16384;
        return Coll_bcast_intra_bintree::bcast (buff, count, datatype, root, comm, module, segsize);
    }
    if (message_size >= 524288) {
        segsize = 16384;
        return Coll_bcast_intra_pipeline::bcast (buff, count, datatype, root, comm, module, segsize);
    }
    segsize = 0;
    /* once tested can swap this back in */
    /* return Coll_bcast_intra_bmtree::bcast (buff, count, datatype, root, comm, segsize); */
    return Coll_bcast_intra_bintree::bcast (buff, count, datatype, root, comm, module, segsize);
#endif  /* 0 */
}

int Coll_reduce_ompi::reduce(void *sendbuf, void *recvbuf,
                             int count, MPI_Datatype datatype,
                             MPI_Op op, int root, MPI_Comm comm)
{
    int communicator_size = 0;
    //int segsize = 0;
    size_t message_size, dsize;
    const double a1 =  0.6016 / 1024.0; /* [1/B] */
    const double b1 =  1.3496;
    const double a2 =  0.0410 / 1024.0; /* [1/B] */
    const double b2 =  9.7128;
    const double a3 =  0.0422 / 1024.0; /* [1/B] */
    const double b3 =  1.1614;
    //const double a4 =  0.0033 / 1024.0; /* [1/B] */
    //const double b4 =  1.6761;
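    /* As in bcast, each (a_i, b_i) pair is a fitted boundary line
     * comm_size = a * message_size + b between two algorithm/segment-size choices. */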

    /* no limit on # of outstanding requests */
    //const int max_requests = 0;

    communicator_size = comm->size();

    /* need the data size for the decision function */
    dsize = datatype->size();
    message_size = dsize * count;   /* needed for decision */

    /**
     * If the operation is non-commutative, we currently have the choice of the
     * linear or the in-order binary tree algorithm.
     */
    if ((op != MPI_OP_NULL) && !op->is_commutative()) {
        if ((communicator_size < 12) && (message_size < 2048)) {
            return Coll_reduce_ompi_basic_linear::reduce(sendbuf, recvbuf, count, datatype, op, root, comm/*, module*/);
        }
        return Coll_reduce_ompi_in_order_binary::reduce(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
                                                            0, max_requests*/);
    }

    if ((communicator_size < 8) && (message_size < 512)) {
        /* Linear_0K */
        return Coll_reduce_ompi_basic_linear::reduce(sendbuf, recvbuf, count, datatype, op, root, comm);
    } else if (((communicator_size < 8) && (message_size < 20480)) ||
               (message_size < 2048) || (count <= 1)) {
        /* Binomial_0K */
        //segsize = 0;
        return Coll_reduce_ompi_binomial::reduce(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
                                                     segsize, max_requests*/);
    } else if (communicator_size > (a1 * message_size + b1)) {
        // Binomial_1K
        //segsize = 1024;
        return Coll_reduce_ompi_binomial::reduce(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
                                                     segsize, max_requests*/);
    } else if (communicator_size > (a2 * message_size + b2)) {
        // Pipeline_1K
        //segsize = 1024;
        return Coll_reduce_ompi_pipeline::reduce(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
                                                     segsize, max_requests*/);
    } else if (communicator_size > (a3 * message_size + b3)) {
        // Binary_32K
        //segsize = 32*1024;
        return Coll_reduce_ompi_binary::reduce(sendbuf, recvbuf, count, datatype, op, root,
                                               comm/*, module, segsize, max_requests*/);
    }
//    if (communicator_size > (a4 * message_size + b4)) {
        // Pipeline_32K
//        segsize = 32*1024;
//    } else {
        // Pipeline_64K
//        segsize = 64*1024;
//    }
    return Coll_reduce_ompi_pipeline::reduce(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
                                                 segsize, max_requests*/);

#if 0
    /* for small messages use linear algorithm */
    if (message_size <= 4096) {
        segsize = 0;
        fanout = communicator_size - 1;
        /* when linear implemented or taken from basic put here, right now using chain as a linear system */
        /* it is implemented and I shouldn't be calling a chain with a fanout bigger than MAXTREEFANOUT from topo.h! */
        return Coll_reduce_intra_basic_linear::reduce (sendbuf, recvbuf, count, datatype, op, root, comm, module);
        /*        return Coll_reduce_intra_chain::reduce (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, fanout); */
    }
    if (message_size < 524288) {
        if (message_size <= 65536) {
            segsize = 32768;
            fanout = 8;
        } else {
            segsize = 1024;
            fanout = communicator_size/2;
        }
        /* later swap this for a binary tree */
        /*         fanout = 2; */
        return Coll_reduce_intra_chain::reduce (sendbuf, recvbuf, count, datatype, op, root, comm, module,
                                                   segsize, fanout, max_requests);
    }
    segsize = 1024;
    return Coll_reduce_intra_pipeline::reduce (sendbuf, recvbuf, count, datatype, op, root, comm, module,
                                                  segsize, max_requests);
#endif  /* 0 */
}

int Coll_reduce_scatter_ompi::reduce_scatter(void *sbuf, void *rbuf,
                                             int *rcounts,
                                             MPI_Datatype dtype,
                                             MPI_Op op, MPI_Comm comm)
{
    int comm_size, i, pow2;
    size_t total_message_size, dsize;
    const double a = 0.0012;
    const double b = 8.0;
    const size_t small_message_size = 12 * 1024;
    const size_t large_message_size = 256 * 1024;
    int zerocounts = 0;

    XBT_DEBUG("Coll_reduce_scatter_ompi::reduce_scatter");

    comm_size = comm->size();
    // We need the data size for the decision function
    dsize = dtype->size();
    total_message_size = 0;
    for (i = 0; i < comm_size; i++) {
        total_message_size += rcounts[i];
        if (0 == rcounts[i]) {
            zerocounts = 1;
        }
    }

    /* Non-commutative operations and zero receive counts are handed to the
       default implementation. */
    if (((op != MPI_OP_NULL) && !op->is_commutative()) || (zerocounts)) {
        Coll_reduce_scatter_default::reduce_scatter(sbuf, rbuf, rcounts, dtype, op, comm);
        return MPI_SUCCESS;
    }

    total_message_size *= dsize;

    // compute the smallest power of 2 that is >= comm_size
    for (pow2 = 1; pow2 < comm_size; pow2 <<= 1);

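    /* Recursive halving handles small messages, medium messages on power-of-two
     * communicators, and cases where the communicator is large relative to the
     * data (comm_size >= a * total_message_size + b); the ring algorithm covers
     * the remaining large-message cases. */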
    if ((total_message_size <= small_message_size) ||
        ((total_message_size <= large_message_size) && (pow2 == comm_size)) ||
        (comm_size >= a * total_message_size + b)) {
        return Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(sbuf, rbuf, rcounts,
                                                                               dtype, op, comm);
    }
    return Coll_reduce_scatter_ompi_ring::reduce_scatter(sbuf, rbuf, rcounts,
                                                         dtype, op, comm);
}

int Coll_allgather_ompi::allgather(void *sbuf, int scount, MPI_Datatype sdtype,
                                   void* rbuf, int rcount, MPI_Datatype rdtype,
                                   MPI_Comm comm)
{
    int communicator_size, pow2_size;
    size_t dsize, total_dsize;

    communicator_size = comm->size();

    /* Special case for 2 processes */
    if (communicator_size == 2) {
        return Coll_allgather_pair::allgather(sbuf, scount, sdtype,
                                              rbuf, rcount, rdtype, comm/*, module*/);
    }

    /* Determine complete data size */
    dsize = sdtype->size();
    total_dsize = dsize * scount * communicator_size;

    for (pow2_size = 1; pow2_size < communicator_size; pow2_size <<= 1);
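    // pow2_size is now the smallest power of two >= communicator_size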

    /* Decision based on MX 2Gb results from the Grig cluster at
       The University of Tennessee, Knoxville:
       - if the total message size is less than 50KB, use either bruck or
         recursive doubling for non-power-of-two and power-of-two numbers of
         nodes, respectively;
       - else use the ring and neighbor exchange algorithms for odd and even
         numbers of nodes, respectively.
    */
    if (total_dsize < 50000) {
        if (pow2_size == communicator_size) {
            return Coll_allgather_rdb::allgather(sbuf, scount, sdtype,
                                                 rbuf, rcount, rdtype, comm);
        } else {
            return Coll_allgather_bruck::allgather(sbuf, scount, sdtype,
                                                   rbuf, rcount, rdtype, comm);
        }
    } else {
        if (communicator_size % 2) {
            return Coll_allgather_ring::allgather(sbuf, scount, sdtype,
                                                  rbuf, rcount, rdtype, comm);
        } else {
            return Coll_allgather_ompi_neighborexchange::allgather(sbuf, scount, sdtype,
                                                                   rbuf, rcount, rdtype, comm);
        }
    }

#if defined(USE_MPICH2_DECISION)
    /* Decision as in MPICH-2,
       presented in Thakur et al., "Optimization of Collective Communication
       Operations in MPICH", International Journal of High Performance Computing
       Applications, Vol. 19, No. 1, 49-66 (2005):
       - for power-of-two processes and small and medium size messages
         (up to 512KB) use recursive doubling,
       - for non-power-of-two processes and small messages (up to 80KB) use bruck,
       - for everything else use ring.
    */
    if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
        return Coll_allgather_rdb::allgather(sbuf, scount, sdtype,
                                             rbuf, rcount, rdtype, comm);
    } else if (total_dsize <= 81920) {
        return Coll_allgather_bruck::allgather(sbuf, scount, sdtype,
                                               rbuf, rcount, rdtype, comm);
    }
    return Coll_allgather_ring::allgather(sbuf, scount, sdtype,
                                          rbuf, rcount, rdtype, comm);
#endif  /* defined(USE_MPICH2_DECISION) */
}

int Coll_allgatherv_ompi::allgatherv(void *sbuf, int scount, MPI_Datatype sdtype,
                                     void* rbuf, int *rcounts, int *rdispls,
                                     MPI_Datatype rdtype, MPI_Comm comm)
{
    int i;
    int communicator_size;
    size_t dsize, total_dsize;

    communicator_size = comm->size();

    /* Special case for 2 processes */
    if (communicator_size == 2) {
        return Coll_allgatherv_pair::allgatherv(sbuf, scount, sdtype,
                                                rbuf, rcounts, rdispls, rdtype, comm);
    }

    /* Determine complete data size */
    dsize = sdtype->size();
    total_dsize = 0;
    for (i = 0; i < communicator_size; i++) {
        total_dsize += dsize * rcounts[i];
    }

    /* Decision mirrors the allgather decision, except that ring is used for small
       sizes (the bruck allgatherv variant is commented out). */
    if (total_dsize < 50000) {
/*        return Coll_allgatherv_intra_bruck::allgatherv(sbuf, scount, sdtype,
                                                         rbuf, rcounts, rdispls, rdtype,
                                                         comm, module);*/
        return Coll_allgatherv_ring::allgatherv(sbuf, scount, sdtype,
                                                rbuf, rcounts, rdispls, rdtype, comm);

    } else {
        if (communicator_size % 2) {
            return Coll_allgatherv_ring::allgatherv(sbuf, scount, sdtype,
                                                    rbuf, rcounts, rdispls, rdtype, comm);
        } else {
            return Coll_allgatherv_ompi_neighborexchange::allgatherv(sbuf, scount, sdtype,
                                                                     rbuf, rcounts, rdispls, rdtype, comm);
        }
    }
}

int Coll_gather_ompi::gather(void *sbuf, int scount, MPI_Datatype sdtype,
                             void* rbuf, int rcount, MPI_Datatype rdtype,
                             int root, MPI_Comm comm)
{
    //const int large_segment_size = 32768;
    //const int small_segment_size = 1024;

    //const size_t large_block_size = 92160;
    const size_t intermediate_block_size = 6000;
    const size_t small_block_size = 1024;

    const int large_communicator_size = 60;
    const int small_communicator_size = 10;

    int communicator_size, rank;
    size_t dsize, block_size;

    XBT_DEBUG("smpi_coll_tuned_gather_ompi");

    communicator_size = comm->size();
    rank = comm->rank();

    // Determine block size
    if (rank == root) {
        dsize = rdtype->size();
        block_size = dsize * rcount;
    } else {
        dsize = sdtype->size();
        block_size = dsize * scount;
    }

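    /* Blocks above intermediate_block_size go through the synchronized linear gather;
     * large communicators, or medium ones with small blocks, use the binomial tree;
     * everything else falls back to basic linear. */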
/*    if (block_size > large_block_size) {*/
/*        return smpi_coll_tuned_gather_ompi_linear_sync (sbuf, scount, sdtype, */
/*                                                         rbuf, rcount, rdtype, */
/*                                                         root, comm);*/

/*    } else*/ if (block_size > intermediate_block_size) {
        return Coll_gather_ompi_linear_sync::gather(sbuf, scount, sdtype,
                                                    rbuf, rcount, rdtype, root, comm);

    } else if ((communicator_size > large_communicator_size) ||
               ((communicator_size > small_communicator_size) &&
                (block_size < small_block_size))) {
        return Coll_gather_ompi_binomial::gather(sbuf, scount, sdtype,
                                                 rbuf, rcount, rdtype, root, comm);

    }
    // Otherwise, use basic linear
    return Coll_gather_ompi_basic_linear::gather(sbuf, scount, sdtype,
                                                 rbuf, rcount, rdtype, root, comm);
}

int Coll_scatter_ompi::scatter(void *sbuf, int scount, MPI_Datatype sdtype,
                               void* rbuf, int rcount, MPI_Datatype rdtype,
                               int root, MPI_Comm comm)
{
    const size_t small_block_size = 300;
    const int small_comm_size = 10;
    int communicator_size, rank;
    size_t dsize, block_size;

    XBT_DEBUG("Coll_scatter_ompi::scatter");

    communicator_size = comm->size();
    rank = comm->rank();
    // Determine block size
    if (root == rank) {
        dsize = sdtype->size();
        block_size = dsize * scount;
    } else {
        dsize = rdtype->size();
        block_size = dsize * rcount;
    }

    if ((communicator_size > small_comm_size) &&
        (block_size < small_block_size)) {
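        /* Non-root ranks normally ignore the send arguments, but the binomial
         * scatter is called with a valid (sbuf, scount, sdtype) triple on every
         * rank, so a temporary buffer matching the receive signature is allocated
         * here and freed after the call. */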
        if (rank != root) {
            sbuf = xbt_malloc(rcount * rdtype->get_extent());
            scount = rcount;
            sdtype = rdtype;
        }
        int ret = Coll_scatter_ompi_binomial::scatter(sbuf, scount, sdtype,
                                                      rbuf, rcount, rdtype, root, comm);
        if (rank != root) {
            xbt_free(sbuf);
        }
        return ret;
    }
    return Coll_scatter_ompi_basic_linear::scatter(sbuf, scount, sdtype,
                                                   rbuf, rcount, rdtype, root, comm);
}

} // namespace smpi
} // namespace simgrid