Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add collectives for allgather, allreduce, bcast and reduce
[simgrid.git] / src / smpi / colls / allreduce-lr.c
1 #include "colls.h"
2
3 /* IMPLEMENTED BY PITCH PATARASUK 
4    Non-topoloty-specific all-reduce operation designed bandwidth optimally 
5    Bug fixing by Xin Yuan, 04/04/2008
6 */
7
8 /* ** NOTE **
9    Use -DMPICH2_REDUCTION if this code does not compile.
10    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
11    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
12 */
13
14 //#include <star-reduction.c>
15
16 int
17 smpi_coll_tuned_allreduce_lr(void *sbuf, void *rbuf, int rcount,
18                              MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
19 {
20   int tag = 5000;
21   MPI_Status status;
22   int rank, i, size, count;
23   int send_offset, recv_offset;
24   int remainder, remainder_flag, remainder_offset;
25
26   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
27   MPI_Comm_size(MPI_COMM_WORLD, &size);
28
29   /* make it compatible with all data type */
30   MPI_Aint extent;
31   MPI_Type_extent(dtype, &extent);
32
33   /* when communication size is smaller than number of process (not support) */
34   if (rcount < size) {
35     return MPI_Allreduce(sbuf, rbuf, rcount, dtype, op, comm);
36   }
37
38   /* when communication size is not divisible by number of process: 
39      call the native implementation for the remain chunk at the end of the operation */
40   else if (rcount % size != 0) {
41     remainder = rcount % size;
42     remainder_flag = 1;
43     remainder_offset = (rcount / size) * size * extent;
44   } else {
45     remainder_flag = remainder_offset = 0;
46   }
47
48   /* size of each point-to-point communication is equal to the size of the whole message
49      divided by number of processes
50    */
51   count = rcount / size;
52
53   /* our ALL-REDUCE implementation
54      1. copy (partial of)send_buf to recv_buf
55      2. use logical ring reduce-scatter
56      3. use logical ring all-gather 
57    */
58
59   // copy partial data
60   send_offset = ((rank - 1 + size) % size) * count * extent;
61   recv_offset = ((rank - 1 + size) % size) * count * extent;
62   MPI_Sendrecv((char *) sbuf + send_offset, count, dtype, rank, tag - 1,
63                (char *) rbuf + recv_offset, count, dtype, rank, tag - 1, comm,
64                &status);
65
66   // reduce-scatter
67   for (i = 0; i < (size - 1); i++) {
68     send_offset = ((rank - 1 - i + 2 * size) % size) * count * extent;
69     recv_offset = ((rank - 2 - i + 2 * size) % size) * count * extent;
70     //    recv_offset = ((rank-i+2*size)%size)*count*extent;
71     MPI_Sendrecv((char *) rbuf + send_offset, count, dtype, ((rank + 1) % size),
72                  tag + i, (char *) rbuf + recv_offset, count, dtype,
73                  ((rank + size - 1) % size), tag + i, comm, &status);
74
75     // compute result to rbuf+recv_offset
76     star_reduction(op, (char *) sbuf + recv_offset, (char *) rbuf + recv_offset,
77                    &count, &dtype);
78   }
79
80   // all-gather
81   for (i = 0; i < (size - 1); i++) {
82     send_offset = ((rank - i + 2 * size) % size) * count * extent;
83     recv_offset = ((rank - 1 - i + 2 * size) % size) * count * extent;
84     MPI_Sendrecv((char *) rbuf + send_offset, count, dtype, ((rank + 1) % size),
85                  tag + i, (char *) rbuf + recv_offset, count, dtype,
86                  ((rank + size - 1) % size), tag + i, comm, &status);
87   }
88
89   /* when communication size is not divisible by number of process: 
90      call the native implementation for the remain chunk at the end of the operation */
91   if (remainder_flag) {
92     return MPI_Allreduce((char *) sbuf + remainder_offset,
93                          (char *) rbuf + remainder_offset, remainder, dtype, op,
94                          comm);
95   }
96
97   return 0;
98 }