Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines with new year.
[simgrid.git] / src / smpi / colls / smpi_openmpi_selector.cpp
1 /* selector for collective algorithms based on openmpi's default coll_tuned_decision_fixed selector */
2
3 /* Copyright (c) 2009-2020. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "colls_private.hpp"
10
11 namespace simgrid {
12 namespace smpi {
13
14 int allreduce__ompi(const void *sbuf, void *rbuf, int count,
15                     MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
16 {
17     size_t dsize, block_dsize;
18     int comm_size = comm->size();
19     const size_t intermediate_message = 10000;
20
21     /**
22      * Decision function based on MX results from the Grig cluster at UTK.
23      *
24      * Currently, linear, recursive doubling, and nonoverlapping algorithms
25      * can handle both commutative and non-commutative operations.
26      * Ring algorithm does not support non-commutative operations.
27      */
28     dsize = dtype->size();
29     block_dsize = dsize * count;
30
31     if (block_dsize < intermediate_message) {
32         return allreduce__rdb(sbuf, rbuf, count, dtype, op, comm);
33     }
34
35     if( ((op==MPI_OP_NULL) || op->is_commutative()) && (count > comm_size) ) {
36         const size_t segment_size = 1 << 20; /* 1 MB */
37         if ((comm_size * segment_size >= block_dsize)) {
38             //FIXME: ok, these are not the right algorithms, try to find closer ones
39             // lr is a good match for allreduce_ring (difference is mainly the use of sendrecv)
40             return allreduce__lr(sbuf, rbuf, count, dtype, op, comm);
41         } else {
42            return allreduce__ompi_ring_segmented(sbuf, rbuf, count, dtype, op, comm /*segment_size*/);
43         }
44     }
45
46     return allreduce__redbcast(sbuf, rbuf, count, dtype, op, comm);
47 }
48
49
50
51 int alltoall__ompi(const void *sbuf, int scount,
52                    MPI_Datatype sdtype,
53                    void* rbuf, int rcount,
54                    MPI_Datatype rdtype,
55                    MPI_Comm comm)
56 {
57     int communicator_size;
58     size_t dsize, block_dsize;
59     communicator_size = comm->size();
60
61     /* Decision function based on measurement on Grig cluster at
62        the University of Tennessee (2GB MX) up to 64 nodes.
63        Has better performance for messages of intermediate sizes than the old one */
64     /* determine block size */
65     dsize = sdtype->size();
66     block_dsize = dsize * scount;
67
68     if ((block_dsize < 200) && (communicator_size > 12)) {
69         return alltoall__bruck(sbuf, scount, sdtype,
70                                rbuf, rcount, rdtype, comm);
71
72     } else if (block_dsize < 3000) {
73         return alltoall__basic_linear(sbuf, scount, sdtype,
74                                       rbuf, rcount, rdtype, comm);
75     }
76
77     return alltoall__ring(sbuf, scount, sdtype,
78                           rbuf, rcount, rdtype, comm);
79 }
80
81 int alltoallv__ompi(const void *sbuf, const int *scounts, const int *sdisps,
82                     MPI_Datatype sdtype,
83                     void *rbuf, const int *rcounts, const int *rdisps,
84                     MPI_Datatype rdtype,
85                     MPI_Comm  comm
86                     )
87 {
88     /* For starters, just keep the original algorithm. */
89     return alltoallv__ring(sbuf, scounts, sdisps, sdtype,
90                            rbuf, rcounts, rdisps,rdtype,
91                            comm);
92 }
93
94 int barrier__ompi(MPI_Comm  comm)
95 {    int communicator_size = comm->size();
96
97     if( 2 == communicator_size )
98         return barrier__ompi_two_procs(comm);
99 /*     * Basic optimisation. If we have a power of 2 number of nodes*/
100 /*     * the use the recursive doubling algorithm, otherwise*/
101 /*     * bruck is the one we want.*/
102     {
103         bool has_one = false;
104         for( ; communicator_size > 0; communicator_size >>= 1 ) {
105             if( communicator_size & 0x1 ) {
106                 if( has_one )
107                     return barrier__ompi_bruck(comm);
108                 has_one = true;
109             }
110         }
111     }
112     return barrier__ompi_recursivedoubling(comm);
113 }
114
115 int bcast__ompi(void *buff, int count, MPI_Datatype datatype, int root, MPI_Comm  comm)
116 {
117     /* Decision function based on MX results for
118        messages up to 36MB and communicator sizes up to 64 nodes */
119     const size_t small_message_size = 2048;
120     const size_t intermediate_message_size = 370728;
121     const double a_p16  = 3.2118e-6; /* [1 / byte] */
122     const double b_p16  = 8.7936;
123     const double a_p64  = 2.3679e-6; /* [1 / byte] */
124     const double b_p64  = 1.1787;
125     const double a_p128 = 1.6134e-6; /* [1 / byte] */
126     const double b_p128 = 2.1102;
127
128     int communicator_size;
129     //int segsize = 0;
130     size_t message_size, dsize;
131
132     communicator_size = comm->size();
133
134     /* else we need data size for decision function */
135     dsize = datatype->size();
136     message_size = dsize * (unsigned long)count;   /* needed for decision */
137
138     /* Handle messages of small and intermediate size, and
139        single-element broadcasts */
140     if ((message_size < small_message_size) || (count <= 1)) {
141         /* Binomial without segmentation */
142         return bcast__binomial_tree(buff, count, datatype, root, comm);
143
144     } else if (message_size < intermediate_message_size) {
145         // SplittedBinary with 1KB segments
146         return bcast__ompi_split_bintree(buff, count, datatype, root, comm);
147
148     }
149      //Handle large message sizes
150     else if (communicator_size < (a_p128 * message_size + b_p128)) {
151         //Pipeline with 128KB segments
152         //segsize = 1024  << 7;
153         return bcast__ompi_pipeline(buff, count, datatype, root, comm);
154
155
156     } else if (communicator_size < 13) {
157         // Split Binary with 8KB segments
158         return bcast__ompi_split_bintree(buff, count, datatype, root, comm);
159
160     } else if (communicator_size < (a_p64 * message_size + b_p64)) {
161         // Pipeline with 64KB segments
162         //segsize = 1024 << 6;
163         return bcast__ompi_pipeline(buff, count, datatype, root, comm);
164
165
166     } else if (communicator_size < (a_p16 * message_size + b_p16)) {
167         //Pipeline with 16KB segments
168         //segsize = 1024 << 4;
169         return bcast__ompi_pipeline(buff, count, datatype, root, comm);
170
171
172     }
173     /* Pipeline with 8KB segments */
174     //segsize = 1024 << 3;
175     return bcast__flattree_pipeline(buff, count, datatype, root, comm /*segsize*/);
176 #if 0
177     /* this is based on gige measurements */
178
179     if (communicator_size  < 4) {
180         return bcast__intra_basic_linear(buff, count, datatype, root, comm, module);
181     }
182     if (communicator_size == 4) {
183         if (message_size < 524288) segsize = 0;
184         else segsize = 16384;
185         return bcast__intra_bintree(buff, count, datatype, root, comm, module, segsize);
186     }
187     if (communicator_size <= 8 && message_size < 4096) {
188         return bcast__intra_basic_linear(buff, count, datatype, root, comm, module);
189     }
190     if (communicator_size > 8 && message_size >= 32768 && message_size < 524288) {
191         segsize = 16384;
192         return  bcast__intra_bintree(buff, count, datatype, root, comm, module, segsize);
193     }
194     if (message_size >= 524288) {
195         segsize = 16384;
196         return bcast__intra_pipeline(buff, count, datatype, root, comm, module, segsize);
197     }
198     segsize = 0;
199     /* once tested can swap this back in */
200     /* return bcast__intra_bmtree(buff, count, datatype, root, comm, segsize); */
201     return bcast__intra_bintree(buff, count, datatype, root, comm, module, segsize);
202 #endif  /* 0 */
203 }
204
205 int reduce__ompi(const void *sendbuf, void *recvbuf,
206                  int count, MPI_Datatype  datatype,
207                  MPI_Op   op, int root,
208                  MPI_Comm   comm)
209 {
210     int communicator_size=0;
211     //int segsize = 0;
212     size_t message_size, dsize;
213     const double a1 =  0.6016 / 1024.0; /* [1/B] */
214     const double b1 =  1.3496;
215     const double a2 =  0.0410 / 1024.0; /* [1/B] */
216     const double b2 =  9.7128;
217     const double a3 =  0.0422 / 1024.0; /* [1/B] */
218     const double b3 =  1.1614;
219     //const double a4 =  0.0033 / 1024.0;  [1/B]
220     //const double b4 =  1.6761;
221
222     /* no limit on # of outstanding requests */
223     //const int max_requests = 0;
224
225     communicator_size = comm->size();
226
227     /* need data size for decision function */
228     dsize=datatype->size();
229     message_size = dsize * count;   /* needed for decision */
230
231     /**
232      * If the operation is non commutative we currently have choice of linear
233      * or in-order binary tree algorithm.
234      */
235     if ((op != MPI_OP_NULL) && not op->is_commutative()) {
236       if ((communicator_size < 12) && (message_size < 2048)) {
237         return reduce__ompi_basic_linear(sendbuf, recvbuf, count, datatype, op, root, comm /*, module*/);
238       }
239       return reduce__ompi_in_order_binary(sendbuf, recvbuf, count, datatype, op, root, comm /*, module,
240                                                              0, max_requests*/);
241     }
242
243     if ((communicator_size < 8) && (message_size < 512)){
244         /* Linear_0K */
245         return reduce__ompi_basic_linear(sendbuf, recvbuf, count, datatype, op, root, comm);
246     } else if (((communicator_size < 8) && (message_size < 20480)) ||
247                (message_size < 2048) || (count <= 1)) {
248         /* Binomial_0K */
249         //segsize = 0;
250         return reduce__ompi_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module, segsize, max_requests*/);
251     } else if (communicator_size > (a1 * message_size + b1)) {
252         // Binomial_1K
253         //segsize = 1024;
254         return reduce__ompi_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
255                                                      segsize, max_requests*/);
256     } else if (communicator_size > (a2 * message_size + b2)) {
257         // Pipeline_1K
258         //segsize = 1024;
259         return reduce__ompi_pipeline(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
260                                                       segsize, max_requests*/);
261     } else if (communicator_size > (a3 * message_size + b3)) {
262         // Binary_32K
263         //segsize = 32*1024;
264         return reduce__ompi_binary( sendbuf, recvbuf, count, datatype, op, root,
265                                                     comm/*, module, segsize, max_requests*/);
266     }
267 //    if (communicator_size > (a4 * message_size + b4)) {
268         // Pipeline_32K
269 //        segsize = 32*1024;
270 //    } else {
271         // Pipeline_64K
272 //        segsize = 64*1024;
273 //    }
274     return reduce__ompi_pipeline(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
275                                                   segsize, max_requests*/);
276
277 #if 0
278     /* for small messages use linear algorithm */
279     if (message_size <= 4096) {
280         segsize = 0;
281         fanout = communicator_size - 1;
282         /* when linear implemented or taken from basic put here, right now using chain as a linear system */
283         /* it is implemented and I shouldn't be calling a chain with a fanout bigger than MAXTREEFANOUT from topo.h! */
284         return reduce__intra_basic_linear(sendbuf, recvbuf, count, datatype, op, root, comm, module);
285         /*        return reduce__intra_chain(sendbuf, recvbuf, count, datatype, op, root, comm, segsize, fanout); */
286     }
287     if (message_size < 524288) {
288         if (message_size <= 65536 ) {
289             segsize = 32768;
290             fanout = 8;
291         } else {
292             segsize = 1024;
293             fanout = communicator_size/2;
294         }
295         /* later swap this for a binary tree */
296         /*         fanout = 2; */
297         return reduce__intra_chain(sendbuf, recvbuf, count, datatype, op, root, comm, module,
298                                    segsize, fanout, max_requests);
299     }
300     segsize = 1024;
301     return reduce__intra_pipeline(sendbuf, recvbuf, count, datatype, op, root, comm, module,
302                                   segsize, max_requests);
303 #endif  /* 0 */
304 }
305
306 int reduce_scatter__ompi(const void *sbuf, void *rbuf,
307                          const int *rcounts,
308                          MPI_Datatype dtype,
309                          MPI_Op  op,
310                          MPI_Comm  comm
311                          )
312 {
313     int comm_size, i, pow2;
314     size_t total_message_size, dsize;
315     const double a = 0.0012;
316     const double b = 8.0;
317     const size_t small_message_size = 12 * 1024;
318     const size_t large_message_size = 256 * 1024;
319     int zerocounts = 0;
320
321     XBT_DEBUG("reduce_scatter__ompi");
322
323     comm_size = comm->size();
324     // We need data size for decision function
325     dsize=dtype->size();
326     total_message_size = 0;
327     for (i = 0; i < comm_size; i++) {
328         total_message_size += rcounts[i];
329         if (0 == rcounts[i]) {
330             zerocounts = 1;
331         }
332     }
333
334     if (((op != MPI_OP_NULL) && not op->is_commutative()) || (zerocounts)) {
335       reduce_scatter__default(sbuf, rbuf, rcounts, dtype, op, comm);
336       return MPI_SUCCESS;
337     }
338
339     total_message_size *= dsize;
340
341     // compute the nearest power of 2
342     for (pow2 = 1; pow2 < comm_size; pow2 <<= 1);
343
344     if ((total_message_size <= small_message_size) ||
345         ((total_message_size <= large_message_size) && (pow2 == comm_size)) ||
346         (comm_size >= a * total_message_size + b)) {
347         return reduce_scatter__ompi_basic_recursivehalving(sbuf, rbuf, rcounts, dtype, op, comm);
348     }
349     return reduce_scatter__ompi_ring(sbuf, rbuf, rcounts, dtype, op, comm);
350 }
351
352 int allgather__ompi(const void *sbuf, int scount,
353                     MPI_Datatype sdtype,
354                     void* rbuf, int rcount,
355                     MPI_Datatype rdtype,
356                     MPI_Comm  comm
357                     )
358 {
359     int communicator_size, pow2_size;
360     size_t dsize, total_dsize;
361
362     communicator_size = comm->size();
363
364     /* Special case for 2 processes */
365     if (communicator_size == 2) {
366         return allgather__pair(sbuf, scount, sdtype,
367                                rbuf, rcount, rdtype,
368                                comm/*, module*/);
369     }
370
371     /* Determine complete data size */
372     dsize=sdtype->size();
373     total_dsize = dsize * scount * communicator_size;
374
375     for (pow2_size  = 1; pow2_size < communicator_size; pow2_size <<=1);
376
377     /* Decision based on MX 2Gb results from Grig cluster at
378        The University of Tennesse, Knoxville
379        - if total message size is less than 50KB use either bruck or
380        recursive doubling for non-power of two and power of two nodes,
381        respectively.
382        - else use ring and neighbor exchange algorithms for odd and even
383        number of nodes, respectively.
384     */
385     if (total_dsize < 50000) {
386         if (pow2_size == communicator_size) {
387             return allgather__rdb(sbuf, scount, sdtype,
388                                   rbuf, rcount, rdtype,
389                                   comm);
390         } else {
391             return allgather__bruck(sbuf, scount, sdtype,
392                                     rbuf, rcount, rdtype,
393                                     comm);
394         }
395     } else {
396         if (communicator_size % 2) {
397             return allgather__ring(sbuf, scount, sdtype,
398                                    rbuf, rcount, rdtype,
399                                    comm);
400         } else {
401             return allgather__ompi_neighborexchange(sbuf, scount, sdtype,
402                                                     rbuf, rcount, rdtype,
403                                                     comm);
404         }
405     }
406
407 #if defined(USE_MPICH2_DECISION)
408     /* Decision as in MPICH-2
409        presented in Thakur et.al. "Optimization of Collective Communication
410        Operations in MPICH", International Journal of High Performance Computing
411        Applications, Vol. 19, No. 1, 49-66 (2005)
412        - for power-of-two processes and small and medium size messages
413        (up to 512KB) use recursive doubling
414        - for non-power-of-two processes and small messages (80KB) use bruck,
415        - for everything else use ring.
416     */
417     if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
418         return allgather__rdb(sbuf, scount, sdtype,
419                               rbuf, rcount, rdtype,
420                               comm);
421     } else if (total_dsize <= 81920) {
422         return allgather__bruck(sbuf, scount, sdtype,
423                                 rbuf, rcount, rdtype,
424                                 comm);
425     }
426     return allgather__ring(sbuf, scount, sdtype,
427                            rbuf, rcount, rdtype,
428                            comm);
429 #endif  /* defined(USE_MPICH2_DECISION) */
430 }
431
432 int allgatherv__ompi(const void *sbuf, int scount,
433                      MPI_Datatype sdtype,
434                      void* rbuf, const int *rcounts,
435                      const int *rdispls,
436                      MPI_Datatype rdtype,
437                      MPI_Comm  comm
438                      )
439 {
440     int i;
441     int communicator_size;
442     size_t dsize, total_dsize;
443
444     communicator_size = comm->size();
445
446     /* Special case for 2 processes */
447     if (communicator_size == 2) {
448         return allgatherv__pair(sbuf, scount, sdtype,
449                                 rbuf, rcounts, rdispls, rdtype,
450                                 comm);
451     }
452
453     /* Determine complete data size */
454     dsize=sdtype->size();
455     total_dsize = 0;
456     for (i = 0; i < communicator_size; i++) {
457         total_dsize += dsize * rcounts[i];
458     }
459
460     /* Decision based on allgather decision.   */
461     if (total_dsize < 50000) {
462         return allgatherv__ompi_bruck(sbuf, scount, sdtype,
463                                       rbuf, rcounts, rdispls, rdtype,
464                                       comm);
465
466     } else {
467         if (communicator_size % 2) {
468             return allgatherv__ring(sbuf, scount, sdtype,
469                                     rbuf, rcounts, rdispls, rdtype,
470                                     comm);
471         } else {
472             return  allgatherv__ompi_neighborexchange(sbuf, scount, sdtype,
473                                                       rbuf, rcounts, rdispls, rdtype,
474                                                       comm);
475         }
476     }
477 }
478
479 int gather__ompi(const void *sbuf, int scount,
480                  MPI_Datatype sdtype,
481                  void* rbuf, int rcount,
482                  MPI_Datatype rdtype,
483                  int root,
484                  MPI_Comm  comm
485                  )
486 {
487     //const int large_segment_size = 32768;
488     //const int small_segment_size = 1024;
489
490     //const size_t large_block_size = 92160;
491     const size_t intermediate_block_size = 6000;
492     const size_t small_block_size = 1024;
493
494     const int large_communicator_size = 60;
495     const int small_communicator_size = 10;
496
497     int communicator_size, rank;
498     size_t dsize, block_size;
499
500     XBT_DEBUG("smpi_coll_tuned_gather_ompi");
501
502     communicator_size = comm->size();
503     rank = comm->rank();
504
505     // Determine block size
506     if (rank == root) {
507         dsize = rdtype->size();
508         block_size = dsize * rcount;
509     } else {
510         dsize = sdtype->size();
511         block_size = dsize * scount;
512     }
513
514 /*    if (block_size > large_block_size) {*/
515 /*        return smpi_coll_tuned_gather_ompi_linear_sync (sbuf, scount, sdtype, */
516 /*                                                         rbuf, rcount, rdtype, */
517 /*                                                         root, comm);*/
518
519 /*    } else*/ if (block_size > intermediate_block_size) {
520         return gather__ompi_linear_sync(sbuf, scount, sdtype,
521                                         rbuf, rcount, rdtype,
522                                         root, comm);
523
524     } else if ((communicator_size > large_communicator_size) ||
525                ((communicator_size > small_communicator_size) &&
526                 (block_size < small_block_size))) {
527         return gather__ompi_binomial(sbuf, scount, sdtype,
528                                      rbuf, rcount, rdtype,
529                                      root, comm);
530
531     }
532     // Otherwise, use basic linear
533     return gather__ompi_basic_linear(sbuf, scount, sdtype,
534                                      rbuf, rcount, rdtype,
535                                      root, comm);
536 }
537
538
539 int scatter__ompi(const void *sbuf, int scount,
540                   MPI_Datatype sdtype,
541                   void* rbuf, int rcount,
542                   MPI_Datatype rdtype,
543                   int root, MPI_Comm  comm
544                   )
545 {
546     const size_t small_block_size = 300;
547     const int small_comm_size = 10;
548     int communicator_size, rank;
549     size_t dsize, block_size;
550
551     XBT_DEBUG("Coll_scatter_ompi::scatter");
552
553     communicator_size = comm->size();
554     rank = comm->rank();
555     // Determine block size
556     if (root == rank) {
557         dsize=sdtype->size();
558         block_size = dsize * scount;
559     } else {
560         dsize=rdtype->size();
561         block_size = dsize * rcount;
562     }
563
564     if ((communicator_size > small_comm_size) &&
565         (block_size < small_block_size)) {
566       std::unique_ptr<unsigned char[]> tmp_buf;
567       if (rank != root) {
568         tmp_buf.reset(new unsigned char[rcount * rdtype->get_extent()]);
569         sbuf   = tmp_buf.get();
570         scount = rcount;
571         sdtype = rdtype;
572       }
573       return scatter__ompi_binomial(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, comm);
574     }
575     return scatter__ompi_basic_linear(sbuf, scount, sdtype,
576                                       rbuf, rcount, rdtype,
577                                       root, comm);
578 }
579
580 }
581 }