Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'klement/simgrid-klement' into master
[simgrid.git] / src / smpi / colls / smpi_openmpi_selector.cpp
1 /* selector for collective algorithms based on openmpi's default coll_tuned_decision_fixed selector */
2
3 /* Copyright (c) 2009-2020. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "colls_private.hpp"
10
11 #include <memory>
12
13 namespace simgrid {
14 namespace smpi {
15
16 int allreduce__ompi(const void *sbuf, void *rbuf, int count,
17                     MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
18 {
19     size_t dsize, block_dsize;
20     int comm_size = comm->size();
21     const size_t intermediate_message = 10000;
22
23     /**
24      * Decision function based on MX results from the Grig cluster at UTK.
25      *
26      * Currently, linear, recursive doubling, and nonoverlapping algorithms
27      * can handle both commutative and non-commutative operations.
28      * Ring algorithm does not support non-commutative operations.
29      */
30     dsize = dtype->size();
31     block_dsize = dsize * count;
32
33     if (block_dsize < intermediate_message) {
34         return allreduce__rdb(sbuf, rbuf, count, dtype, op, comm);
35     }
36
37     if( ((op==MPI_OP_NULL) || op->is_commutative()) && (count > comm_size) ) {
38         const size_t segment_size = 1 << 20; /* 1 MB */
39         if ((comm_size * segment_size >= block_dsize)) {
40             //FIXME: ok, these are not the right algorithms, try to find closer ones
41             // lr is a good match for allreduce_ring (difference is mainly the use of sendrecv)
42             return allreduce__lr(sbuf, rbuf, count, dtype, op, comm);
43         } else {
44            return allreduce__ompi_ring_segmented(sbuf, rbuf, count, dtype, op, comm /*segment_size*/);
45         }
46     }
47
48     return allreduce__redbcast(sbuf, rbuf, count, dtype, op, comm);
49 }
50
51
52
53 int alltoall__ompi(const void *sbuf, int scount,
54                    MPI_Datatype sdtype,
55                    void* rbuf, int rcount,
56                    MPI_Datatype rdtype,
57                    MPI_Comm comm)
58 {
59     int communicator_size;
60     size_t dsize, block_dsize;
61     communicator_size = comm->size();
62
63     /* Decision function based on measurement on Grig cluster at
64        the University of Tennessee (2GB MX) up to 64 nodes.
65        Has better performance for messages of intermediate sizes than the old one */
66     /* determine block size */
67     dsize = sdtype->size();
68     block_dsize = dsize * scount;
69
70     if ((block_dsize < 200) && (communicator_size > 12)) {
71         return alltoall__bruck(sbuf, scount, sdtype,
72                                rbuf, rcount, rdtype, comm);
73
74     } else if (block_dsize < 3000) {
75         return alltoall__basic_linear(sbuf, scount, sdtype,
76                                       rbuf, rcount, rdtype, comm);
77     }
78
79     return alltoall__ring(sbuf, scount, sdtype,
80                           rbuf, rcount, rdtype, comm);
81 }
82
83 int alltoallv__ompi(const void *sbuf, const int *scounts, const int *sdisps,
84                     MPI_Datatype sdtype,
85                     void *rbuf, const int *rcounts, const int *rdisps,
86                     MPI_Datatype rdtype,
87                     MPI_Comm  comm
88                     )
89 {
90     /* For starters, just keep the original algorithm. */
91     return alltoallv__ring(sbuf, scounts, sdisps, sdtype,
92                            rbuf, rcounts, rdisps,rdtype,
93                            comm);
94 }
95
96 int barrier__ompi(MPI_Comm  comm)
97 {    int communicator_size = comm->size();
98
99     if( 2 == communicator_size )
100         return barrier__ompi_two_procs(comm);
101 /*     * Basic optimisation. If we have a power of 2 number of nodes*/
102 /*     * the use the recursive doubling algorithm, otherwise*/
103 /*     * bruck is the one we want.*/
104     {
105         bool has_one = false;
106         for( ; communicator_size > 0; communicator_size >>= 1 ) {
107             if( communicator_size & 0x1 ) {
108                 if( has_one )
109                     return barrier__ompi_bruck(comm);
110                 has_one = true;
111             }
112         }
113     }
114     return barrier__ompi_recursivedoubling(comm);
115 }
116
117 int bcast__ompi(void *buff, int count, MPI_Datatype datatype, int root, MPI_Comm  comm)
118 {
119     /* Decision function based on MX results for
120        messages up to 36MB and communicator sizes up to 64 nodes */
121     const size_t small_message_size = 2048;
122     const size_t intermediate_message_size = 370728;
123     const double a_p16  = 3.2118e-6; /* [1 / byte] */
124     const double b_p16  = 8.7936;
125     const double a_p64  = 2.3679e-6; /* [1 / byte] */
126     const double b_p64  = 1.1787;
127     const double a_p128 = 1.6134e-6; /* [1 / byte] */
128     const double b_p128 = 2.1102;
129
130     int communicator_size;
131     //int segsize = 0;
132     size_t message_size, dsize;
133
134     communicator_size = comm->size();
135
136     /* else we need data size for decision function */
137     dsize = datatype->size();
138     message_size = dsize * (unsigned long)count;   /* needed for decision */
139
140     /* Handle messages of small and intermediate size, and
141        single-element broadcasts */
142     if ((message_size < small_message_size) || (count <= 1)) {
143         /* Binomial without segmentation */
144         return bcast__binomial_tree(buff, count, datatype, root, comm);
145
146     } else if (message_size < intermediate_message_size) {
147         // SplittedBinary with 1KB segments
148         return bcast__ompi_split_bintree(buff, count, datatype, root, comm);
149
150     }
151      //Handle large message sizes
152     else if (communicator_size < (a_p128 * message_size + b_p128)) {
153         //Pipeline with 128KB segments
154         //segsize = 1024  << 7;
155         return bcast__ompi_pipeline(buff, count, datatype, root, comm);
156
157
158     } else if (communicator_size < 13) {
159         // Split Binary with 8KB segments
160         return bcast__ompi_split_bintree(buff, count, datatype, root, comm);
161
162     } else if (communicator_size < (a_p64 * message_size + b_p64)) {
163         // Pipeline with 64KB segments
164         //segsize = 1024 << 6;
165         return bcast__ompi_pipeline(buff, count, datatype, root, comm);
166
167
168     } else if (communicator_size < (a_p16 * message_size + b_p16)) {
169         //Pipeline with 16KB segments
170         //segsize = 1024 << 4;
171         return bcast__ompi_pipeline(buff, count, datatype, root, comm);
172
173
174     }
175     /* Pipeline with 8KB segments */
176     //segsize = 1024 << 3;
177     return bcast__flattree_pipeline(buff, count, datatype, root, comm /*segsize*/);
178 #if 0
179     /* this is based on gige measurements */
180
181     if (communicator_size  < 4) {
182         return bcast__intra_basic_linear(buff, count, datatype, root, comm, module);
183     }
184     if (communicator_size == 4) {
185         if (message_size < 524288) segsize = 0;
186         else segsize = 16384;
187         return bcast__intra_bintree(buff, count, datatype, root, comm, module, segsize);
188     }
189     if (communicator_size <= 8 && message_size < 4096) {
190         return bcast__intra_basic_linear(buff, count, datatype, root, comm, module);
191     }
192     if (communicator_size > 8 && message_size >= 32768 && message_size < 524288) {
193         segsize = 16384;
194         return  bcast__intra_bintree(buff, count, datatype, root, comm, module, segsize);
195     }
196     if (message_size >= 524288) {
197         segsize = 16384;
198         return bcast__intra_pipeline(buff, count, datatype, root, comm, module, segsize);
199     }
200     segsize = 0;
201     /* once tested can swap this back in */
202     /* return bcast__intra_bmtree(buff, count, datatype, root, comm, segsize); */
203     return bcast__intra_bintree(buff, count, datatype, root, comm, module, segsize);
204 #endif  /* 0 */
205 }
206
207 int reduce__ompi(const void *sendbuf, void *recvbuf,
208                  int count, MPI_Datatype  datatype,
209                  MPI_Op   op, int root,
210                  MPI_Comm   comm)
211 {
212     int communicator_size=0;
213     //int segsize = 0;
214     size_t message_size, dsize;
215     const double a1 =  0.6016 / 1024.0; /* [1/B] */
216     const double b1 =  1.3496;
217     const double a2 =  0.0410 / 1024.0; /* [1/B] */
218     const double b2 =  9.7128;
219     const double a3 =  0.0422 / 1024.0; /* [1/B] */
220     const double b3 =  1.1614;
221     //const double a4 =  0.0033 / 1024.0;  [1/B]
222     //const double b4 =  1.6761;
223
224     /* no limit on # of outstanding requests */
225     //const int max_requests = 0;
226
227     communicator_size = comm->size();
228
229     /* need data size for decision function */
230     dsize=datatype->size();
231     message_size = dsize * count;   /* needed for decision */
232
233     /**
234      * If the operation is non commutative we currently have choice of linear
235      * or in-order binary tree algorithm.
236      */
237     if ((op != MPI_OP_NULL) && not op->is_commutative()) {
238       if ((communicator_size < 12) && (message_size < 2048)) {
239         return reduce__ompi_basic_linear(sendbuf, recvbuf, count, datatype, op, root, comm /*, module*/);
240       }
241       return reduce__ompi_in_order_binary(sendbuf, recvbuf, count, datatype, op, root, comm /*, module,
242                                                              0, max_requests*/);
243     }
244
245     if ((communicator_size < 8) && (message_size < 512)){
246         /* Linear_0K */
247         return reduce__ompi_basic_linear(sendbuf, recvbuf, count, datatype, op, root, comm);
248     } else if (((communicator_size < 8) && (message_size < 20480)) ||
249                (message_size < 2048) || (count <= 1)) {
250         /* Binomial_0K */
251         //segsize = 0;
252         return reduce__ompi_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module, segsize, max_requests*/);
253     } else if (communicator_size > (a1 * message_size + b1)) {
254         // Binomial_1K
255         //segsize = 1024;
256         return reduce__ompi_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
257                                                      segsize, max_requests*/);
258     } else if (communicator_size > (a2 * message_size + b2)) {
259         // Pipeline_1K
260         //segsize = 1024;
261         return reduce__ompi_pipeline(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
262                                                       segsize, max_requests*/);
263     } else if (communicator_size > (a3 * message_size + b3)) {
264         // Binary_32K
265         //segsize = 32*1024;
266         return reduce__ompi_binary( sendbuf, recvbuf, count, datatype, op, root,
267                                                     comm/*, module, segsize, max_requests*/);
268     }
269 //    if (communicator_size > (a4 * message_size + b4)) {
270         // Pipeline_32K
271 //        segsize = 32*1024;
272 //    } else {
273         // Pipeline_64K
274 //        segsize = 64*1024;
275 //    }
276     return reduce__ompi_pipeline(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
277                                                   segsize, max_requests*/);
278
279 #if 0
280     /* for small messages use linear algorithm */
281     if (message_size <= 4096) {
282         segsize = 0;
283         fanout = communicator_size - 1;
284         /* when linear implemented or taken from basic put here, right now using chain as a linear system */
285         /* it is implemented and I shouldn't be calling a chain with a fanout bigger than MAXTREEFANOUT from topo.h! */
286         return reduce__intra_basic_linear(sendbuf, recvbuf, count, datatype, op, root, comm, module);
287         /*        return reduce__intra_chain(sendbuf, recvbuf, count, datatype, op, root, comm, segsize, fanout); */
288     }
289     if (message_size < 524288) {
290         if (message_size <= 65536 ) {
291             segsize = 32768;
292             fanout = 8;
293         } else {
294             segsize = 1024;
295             fanout = communicator_size/2;
296         }
297         /* later swap this for a binary tree */
298         /*         fanout = 2; */
299         return reduce__intra_chain(sendbuf, recvbuf, count, datatype, op, root, comm, module,
300                                    segsize, fanout, max_requests);
301     }
302     segsize = 1024;
303     return reduce__intra_pipeline(sendbuf, recvbuf, count, datatype, op, root, comm, module,
304                                   segsize, max_requests);
305 #endif  /* 0 */
306 }
307
308 int reduce_scatter__ompi(const void *sbuf, void *rbuf,
309                          const int *rcounts,
310                          MPI_Datatype dtype,
311                          MPI_Op  op,
312                          MPI_Comm  comm
313                          )
314 {
315     int comm_size, i, pow2;
316     size_t total_message_size, dsize;
317     const double a = 0.0012;
318     const double b = 8.0;
319     const size_t small_message_size = 12 * 1024;
320     const size_t large_message_size = 256 * 1024;
321     int zerocounts = 0;
322
323     XBT_DEBUG("reduce_scatter__ompi");
324
325     comm_size = comm->size();
326     // We need data size for decision function
327     dsize=dtype->size();
328     total_message_size = 0;
329     for (i = 0; i < comm_size; i++) {
330         total_message_size += rcounts[i];
331         if (0 == rcounts[i]) {
332             zerocounts = 1;
333         }
334     }
335
336     if (((op != MPI_OP_NULL) && not op->is_commutative()) || (zerocounts)) {
337       reduce_scatter__default(sbuf, rbuf, rcounts, dtype, op, comm);
338       return MPI_SUCCESS;
339     }
340
341     total_message_size *= dsize;
342
343     // compute the nearest power of 2
344     for (pow2 = 1; pow2 < comm_size; pow2 <<= 1);
345
346     if ((total_message_size <= small_message_size) ||
347         ((total_message_size <= large_message_size) && (pow2 == comm_size)) ||
348         (comm_size >= a * total_message_size + b)) {
349         return reduce_scatter__ompi_basic_recursivehalving(sbuf, rbuf, rcounts, dtype, op, comm);
350     }
351     return reduce_scatter__ompi_ring(sbuf, rbuf, rcounts, dtype, op, comm);
352 }
353
354 int allgather__ompi(const void *sbuf, int scount,
355                     MPI_Datatype sdtype,
356                     void* rbuf, int rcount,
357                     MPI_Datatype rdtype,
358                     MPI_Comm  comm
359                     )
360 {
361     int communicator_size, pow2_size;
362     size_t dsize, total_dsize;
363
364     communicator_size = comm->size();
365
366     /* Special case for 2 processes */
367     if (communicator_size == 2) {
368         return allgather__pair(sbuf, scount, sdtype,
369                                rbuf, rcount, rdtype,
370                                comm/*, module*/);
371     }
372
373     /* Determine complete data size */
374     dsize=sdtype->size();
375     total_dsize = dsize * scount * communicator_size;
376
377     for (pow2_size  = 1; pow2_size < communicator_size; pow2_size <<=1);
378
379     /* Decision based on MX 2Gb results from Grig cluster at
380        The University of Tennesse, Knoxville
381        - if total message size is less than 50KB use either bruck or
382        recursive doubling for non-power of two and power of two nodes,
383        respectively.
384        - else use ring and neighbor exchange algorithms for odd and even
385        number of nodes, respectively.
386     */
387     if (total_dsize < 50000) {
388         if (pow2_size == communicator_size) {
389             return allgather__rdb(sbuf, scount, sdtype,
390                                   rbuf, rcount, rdtype,
391                                   comm);
392         } else {
393             return allgather__bruck(sbuf, scount, sdtype,
394                                     rbuf, rcount, rdtype,
395                                     comm);
396         }
397     } else {
398         if (communicator_size % 2) {
399             return allgather__ring(sbuf, scount, sdtype,
400                                    rbuf, rcount, rdtype,
401                                    comm);
402         } else {
403             return allgather__ompi_neighborexchange(sbuf, scount, sdtype,
404                                                     rbuf, rcount, rdtype,
405                                                     comm);
406         }
407     }
408
409 #if defined(USE_MPICH2_DECISION)
410     /* Decision as in MPICH-2
411        presented in Thakur et.al. "Optimization of Collective Communication
412        Operations in MPICH", International Journal of High Performance Computing
413        Applications, Vol. 19, No. 1, 49-66 (2005)
414        - for power-of-two processes and small and medium size messages
415        (up to 512KB) use recursive doubling
416        - for non-power-of-two processes and small messages (80KB) use bruck,
417        - for everything else use ring.
418     */
419     if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
420         return allgather__rdb(sbuf, scount, sdtype,
421                               rbuf, rcount, rdtype,
422                               comm);
423     } else if (total_dsize <= 81920) {
424         return allgather__bruck(sbuf, scount, sdtype,
425                                 rbuf, rcount, rdtype,
426                                 comm);
427     }
428     return allgather__ring(sbuf, scount, sdtype,
429                            rbuf, rcount, rdtype,
430                            comm);
431 #endif  /* defined(USE_MPICH2_DECISION) */
432 }
433
434 int allgatherv__ompi(const void *sbuf, int scount,
435                      MPI_Datatype sdtype,
436                      void* rbuf, const int *rcounts,
437                      const int *rdispls,
438                      MPI_Datatype rdtype,
439                      MPI_Comm  comm
440                      )
441 {
442     int i;
443     int communicator_size;
444     size_t dsize, total_dsize;
445
446     communicator_size = comm->size();
447
448     /* Special case for 2 processes */
449     if (communicator_size == 2) {
450         return allgatherv__pair(sbuf, scount, sdtype,
451                                 rbuf, rcounts, rdispls, rdtype,
452                                 comm);
453     }
454
455     /* Determine complete data size */
456     dsize=sdtype->size();
457     total_dsize = 0;
458     for (i = 0; i < communicator_size; i++) {
459         total_dsize += dsize * rcounts[i];
460     }
461
462     /* Decision based on allgather decision.   */
463     if (total_dsize < 50000) {
464         return allgatherv__ompi_bruck(sbuf, scount, sdtype,
465                                       rbuf, rcounts, rdispls, rdtype,
466                                       comm);
467
468     } else {
469         if (communicator_size % 2) {
470             return allgatherv__ring(sbuf, scount, sdtype,
471                                     rbuf, rcounts, rdispls, rdtype,
472                                     comm);
473         } else {
474             return  allgatherv__ompi_neighborexchange(sbuf, scount, sdtype,
475                                                       rbuf, rcounts, rdispls, rdtype,
476                                                       comm);
477         }
478     }
479 }
480
481 int gather__ompi(const void *sbuf, int scount,
482                  MPI_Datatype sdtype,
483                  void* rbuf, int rcount,
484                  MPI_Datatype rdtype,
485                  int root,
486                  MPI_Comm  comm
487                  )
488 {
489     //const int large_segment_size = 32768;
490     //const int small_segment_size = 1024;
491
492     //const size_t large_block_size = 92160;
493     const size_t intermediate_block_size = 6000;
494     const size_t small_block_size = 1024;
495
496     const int large_communicator_size = 60;
497     const int small_communicator_size = 10;
498
499     int communicator_size, rank;
500     size_t dsize, block_size;
501
502     XBT_DEBUG("smpi_coll_tuned_gather_ompi");
503
504     communicator_size = comm->size();
505     rank = comm->rank();
506
507     // Determine block size
508     if (rank == root) {
509         dsize = rdtype->size();
510         block_size = dsize * rcount;
511     } else {
512         dsize = sdtype->size();
513         block_size = dsize * scount;
514     }
515
516 /*    if (block_size > large_block_size) {*/
517 /*        return smpi_coll_tuned_gather_ompi_linear_sync (sbuf, scount, sdtype, */
518 /*                                                         rbuf, rcount, rdtype, */
519 /*                                                         root, comm);*/
520
521 /*    } else*/ if (block_size > intermediate_block_size) {
522         return gather__ompi_linear_sync(sbuf, scount, sdtype,
523                                         rbuf, rcount, rdtype,
524                                         root, comm);
525
526     } else if ((communicator_size > large_communicator_size) ||
527                ((communicator_size > small_communicator_size) &&
528                 (block_size < small_block_size))) {
529         return gather__ompi_binomial(sbuf, scount, sdtype,
530                                      rbuf, rcount, rdtype,
531                                      root, comm);
532
533     }
534     // Otherwise, use basic linear
535     return gather__ompi_basic_linear(sbuf, scount, sdtype,
536                                      rbuf, rcount, rdtype,
537                                      root, comm);
538 }
539
540
541 int scatter__ompi(const void *sbuf, int scount,
542                   MPI_Datatype sdtype,
543                   void* rbuf, int rcount,
544                   MPI_Datatype rdtype,
545                   int root, MPI_Comm  comm
546                   )
547 {
548     const size_t small_block_size = 300;
549     const int small_comm_size = 10;
550     int communicator_size, rank;
551     size_t dsize, block_size;
552
553     XBT_DEBUG("Coll_scatter_ompi::scatter");
554
555     communicator_size = comm->size();
556     rank = comm->rank();
557     // Determine block size
558     if (root == rank) {
559         dsize=sdtype->size();
560         block_size = dsize * scount;
561     } else {
562         dsize=rdtype->size();
563         block_size = dsize * rcount;
564     }
565
566     if ((communicator_size > small_comm_size) &&
567         (block_size < small_block_size)) {
568       std::unique_ptr<unsigned char[]> tmp_buf;
569       if (rank != root) {
570         tmp_buf = std::make_unique<unsigned char[]>(rcount * rdtype->get_extent());
571         sbuf   = tmp_buf.get();
572         scount = rcount;
573         sdtype = rdtype;
574       }
575       return scatter__ompi_binomial(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, comm);
576     }
577     return scatter__ompi_basic_linear(sbuf, scount, sdtype,
578                                       rbuf, rcount, rdtype,
579                                       root, comm);
580 }
581
582 }
583 }