Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add scatter algos from ompi
[simgrid.git] / src / smpi / colls / allreduce-NTS.c
1 #include "colls_private.h"
2 /* IMPLEMENTED BY PITCH PATARASUK 
3    Non-topoloty-specific all-reduce operation designed bandwidth optimally */
4
5 /* ** NOTE **
6    Use -DMPICH2_REDUCTION if this code does not compile.
7    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
8    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
9 */
10
11 //#include <star-reduction.c>
12
13 int
14 smpi_coll_tuned_allreduce_NTS(void *sbuf, void *rbuf, int rcount,
15                               MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
16 {
17   int tag = 5000;
18   MPI_Status status;
19   int rank, i, size, count;
20   int send_offset, recv_offset;
21   int remainder, remainder_flag, remainder_offset;
22
23   rank = smpi_comm_rank(MPI_COMM_WORLD);
24   size = smpi_comm_size(MPI_COMM_WORLD);
25
26   /* make it compatible with all data type */
27   MPI_Aint extent;
28   extent = smpi_datatype_get_extent(dtype);
29
30   /* when communication size is smaller than number of process (not support) */
31   if (rcount < size) {
32     return mpi_coll_allreduce_fun(sbuf, rbuf, rcount, dtype, op, comm);
33   }
34
35   /* when communication size is not divisible by number of process: 
36      call the native implementation for the remain chunk at the end of the operation */
37   if (rcount % size != 0) {
38     remainder = rcount % size;
39     remainder_flag = 1;
40     remainder_offset = (rcount / size) * size * extent;
41   } else {
42     remainder = remainder_flag = remainder_offset = 0;
43   }
44
45   /* size of each point-to-point communication is equal to the size of the whole message
46      divided by number of processes
47    */
48   count = rcount / size;
49
50   /* our ALL-REDUCE implementation
51      1. copy (partial of)send_buf to recv_buf
52      2. use logical ring reduce-scatter
53      3. use logical ring all-gather 
54    */
55
56   // copy partial data
57   send_offset = ((rank - 1 + size) % size) * count * extent;
58   recv_offset = ((rank - 1 + size) % size) * count * extent;
59   smpi_mpi_sendrecv((char *) sbuf + send_offset, count, dtype, rank, tag - 1,
60                (char *) rbuf + recv_offset, count, dtype, rank, tag - 1, comm,
61                &status);
62
63   // reduce-scatter
64   for (i = 0; i < (size - 1); i++) {
65     send_offset = ((rank - 1 - i + size) % size) * count * extent;
66     recv_offset = ((rank - 2 - i + size) % size) * count * extent;
67     smpi_mpi_sendrecv((char *) rbuf + send_offset, count, dtype, ((rank + 1) % size),
68                  tag + i, (char *) rbuf + recv_offset, count, dtype,
69                  ((rank + size - 1) % size), tag + i, comm, &status);
70
71     // compute result to rbuf+recv_offset
72     smpi_op_apply(op, (char *)sbuf + recv_offset, (char *)rbuf + recv_offset, &count, &dtype);
73   }
74
75   // all-gather
76   for (i = 0; i < (size - 1); i++) {
77     send_offset = ((rank - i + size) % size) * count * extent;
78     recv_offset = ((rank - 1 - i + size) % size) * count * extent;
79     smpi_mpi_sendrecv((char *) rbuf + send_offset, count, dtype, ((rank + 1) % size),
80                  tag + i, (char *) rbuf + recv_offset, count, dtype,
81                  ((rank + size - 1) % size), tag + i, comm, &status);
82   }
83
84   /* when communication size is not divisible by number of process: 
85      call the native implementation for the remain chunk at the end of the operation */
86   if (remainder_flag) {
87     XBT_WARN("MPI_allreduce_NTS use default MPI_allreduce.");
88     smpi_mpi_allreduce((char *) sbuf + remainder_offset,
89                          (char *) rbuf + remainder_offset, remainder, dtype, op,
90                          comm);
91     return MPI_SUCCESS;    
92   }
93
94   return MPI_SUCCESS;
95 }