Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
use tuned barrier here if provided
[simgrid.git] / src / smpi / colls / allreduce-smp-rdb.c
1 #include "colls_private.h"
2 /* IMPLEMENTED BY PITCH PATARASUK 
3    Non-topoloty-specific (however, number of cores/node need to be changed) 
4    all-reduce operation designed for smp clusters
5    It uses 2-layer communication: binomial for intra-communication 
6    and rdb for inter-communication*/
7
8 /* change number of core per smp-node
9    we assume that number of core per process will be the same for all implementations */
10 #ifndef NUM_CORE
11 #define NUM_CORE 8
12 #endif
13
14 /* ** NOTE **
15    Use -DMPICH2 if this code does not compile.
16    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
17    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
18 */
19
20 //#include <star-reduction.c>
21
22 /*
23 This fucntion performs all-reduce operation as follow.
24 1) binomial_tree reduce inside each SMP node
25 2) Recursive doubling intra-communication between root of each SMP node
26 3) binomial_tree bcast inside each SMP node
27 */
28 int smpi_coll_tuned_allreduce_smp_rdb(void *send_buf, void *recv_buf, int count,
29                                       MPI_Datatype dtype, MPI_Op op,
30                                       MPI_Comm comm)
31 {
32   int comm_size, rank;
33   void *tmp_buf;
34   int tag = COLL_TAG_ALLREDUCE;
35   int mask, src, dst;
36   MPI_Status status;
37   int num_core = NUM_CORE;
38   /*
39      #ifdef MPICH2_REDUCTION
40      MPI_User_function * uop = MPIR_Op_table[op % 16 - 1];
41      #else
42      MPI_User_function *uop;
43      struct MPIR_OP *op_ptr;
44      op_ptr = MPIR_ToPointer(op);
45      uop  = op_ptr->op;
46      #endif
47    */
48   comm_size = smpi_comm_size(comm);
49   rank = smpi_comm_rank(comm);
50   MPI_Aint extent;
51   extent = smpi_datatype_get_extent(dtype);
52   tmp_buf = (void *) xbt_malloc(count * extent);
53
54   /* compute intra and inter ranking */
55   int intra_rank, inter_rank;
56   intra_rank = rank % num_core;
57   inter_rank = rank / num_core;
58
59   /* size of processes participate in intra communications =>
60      should be equal to number of machines */
61   int inter_comm_size = (comm_size + num_core - 1) / num_core;
62
63   /* copy input buffer to output buffer */
64   smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
65                recv_buf, count, dtype, rank, tag, comm, &status);
66
67   /* start binomial reduce intra communication inside each SMP node */
68   mask = 1;
69   while (mask < num_core) {
70     if ((mask & intra_rank) == 0) {
71       src = (inter_rank * num_core) + (intra_rank | mask);
72       if (src < comm_size) {
73         smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
74         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
75       }
76     } else {
77       dst = (inter_rank * num_core) + (intra_rank & (~mask));
78       smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
79       break;
80     }
81     mask <<= 1;
82   }                             /* end binomial reduce intra-communication */
83
84
85   /* start rdb (recursive doubling) all-reduce inter-communication 
86      between each SMP nodes : each node only have one process that can communicate
87      to other nodes */
88   if (intra_rank == 0) {
89
90     /* find nearest power-of-two less than or equal to inter_comm_size */
91     int pof2, rem, newrank, newdst;
92     pof2 = 1;
93     while (pof2 <= inter_comm_size)
94       pof2 <<= 1;
95     pof2 >>= 1;
96     rem = inter_comm_size - pof2;
97
98     /* In the non-power-of-two case, all even-numbered
99        processes of rank < 2*rem send their data to
100        (rank+1). These even-numbered processes no longer
101        participate in the algorithm until the very end.
102      */
103     if (inter_rank < 2 * rem) {
104       if (inter_rank % 2 == 0) {
105         dst = rank + num_core;
106         smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
107         newrank = -1;
108       } else {
109         src = rank - num_core;
110         smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
111         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
112         newrank = inter_rank / 2;
113       }
114     } else {
115       newrank = inter_rank - rem;
116     }
117
118     /* example inter-communication RDB rank change algorithm 
119        0,4,8,12..36 <= true rank (assume 4 core per SMP)
120        0123 4567 89 <= inter_rank
121        1 3 4567 89 (1,3 got data from 0,2 : 0,2 will be idle until the end)
122        0 1 4567 89 
123        0 1 2345 67 => newrank
124      */
125
126     if (newrank != -1) {
127       mask = 1;
128       while (mask < pof2) {
129         newdst = newrank ^ mask;
130         /* find real rank of dest */
131         dst = (newdst < rem) ? newdst * 2 + 1 : newdst + rem;
132         dst *= num_core;
133
134         /* exchange data in rdb manner */
135         smpi_mpi_sendrecv(recv_buf, count, dtype, dst, tag, tmp_buf, count, dtype,
136                      dst, tag, comm, &status);
137         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
138         mask <<= 1;
139       }
140     }
141
142     /* non pof2 case 
143        left-over processes (all even ranks: < 2 * rem) get the result    
144      */
145     if (inter_rank < 2 * rem) {
146       if (inter_rank % 2) {
147         smpi_mpi_send(recv_buf, count, dtype, rank - num_core, tag, comm);
148       } else {
149         smpi_mpi_recv(recv_buf, count, dtype, rank + num_core, tag, comm, &status);
150       }
151     }
152   }
153
154   /* start binomial broadcast intra-communication inside each SMP nodes */
155   int num_core_in_current_smp = num_core;
156   if (inter_rank == (inter_comm_size - 1)) {
157     num_core_in_current_smp = comm_size - (inter_rank * num_core);
158   }
159   mask = 1;
160   while (mask < num_core_in_current_smp) {
161     if (intra_rank & mask) {
162       src = (inter_rank * num_core) + (intra_rank - mask);
163       smpi_mpi_recv(recv_buf, count, dtype, src, tag, comm, &status);
164       break;
165     }
166     mask <<= 1;
167   }
168   mask >>= 1;
169
170   while (mask > 0) {
171     dst = (inter_rank * num_core) + (intra_rank + mask);
172     if (dst < comm_size) {
173       smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
174     }
175     mask >>= 1;
176   }
177
178   free(tmp_buf);
179   return MPI_SUCCESS;
180 }