Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
have smp-aware algorithms use number of cores on the node as basis for their computat...
[simgrid.git] / src / smpi / colls / allreduce-smp-rdb.c
1 #include "colls_private.h"
2 /* IMPLEMENTED BY PITCH PATARASUK 
3    Non-topoloty-specific (however, number of cores/node need to be changed) 
4    all-reduce operation designed for smp clusters
5    It uses 2-layer communication: binomial for intra-communication 
6    and rdb for inter-communication*/
7
8 /* change number of core per smp-node
9    we assume that number of core per process will be the same for all implementations */
10 #ifndef NUM_CORE
11 #define NUM_CORE 8
12 #endif
13
14 /* ** NOTE **
15    Use -DMPICH2 if this code does not compile.
16    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
17    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
18 */
19
20 //#include <star-reduction.c>
21
22 /*
23 This fucntion performs all-reduce operation as follow.
24 1) binomial_tree reduce inside each SMP node
25 2) Recursive doubling intra-communication between root of each SMP node
26 3) binomial_tree bcast inside each SMP node
27 */
28 int smpi_coll_tuned_allreduce_smp_rdb(void *send_buf, void *recv_buf, int count,
29                                       MPI_Datatype dtype, MPI_Op op,
30                                       MPI_Comm comm)
31 {
32   int comm_size, rank;
33   void *tmp_buf;
34   int tag = COLL_TAG_ALLREDUCE;
35   int mask, src, dst;
36   MPI_Status status;
37   int num_core = simcall_host_get_core(SIMIX_host_self());
38   // do we use the default one or the number of cores in the platform ?
39   // if the number of cores is one, the platform may be simulated with 1 node = 1 core
40   if (num_core == 1) num_core = NUM_CORE;
41   /*
42      #ifdef MPICH2_REDUCTION
43      MPI_User_function * uop = MPIR_Op_table[op % 16 - 1];
44      #else
45      MPI_User_function *uop;
46      struct MPIR_OP *op_ptr;
47      op_ptr = MPIR_ToPointer(op);
48      uop  = op_ptr->op;
49      #endif
50    */
51   comm_size = smpi_comm_size(comm);
52   rank = smpi_comm_rank(comm);
53   MPI_Aint extent;
54   extent = smpi_datatype_get_extent(dtype);
55   tmp_buf = (void *) xbt_malloc(count * extent);
56
57   /* compute intra and inter ranking */
58   int intra_rank, inter_rank;
59   intra_rank = rank % num_core;
60   inter_rank = rank / num_core;
61
62   /* size of processes participate in intra communications =>
63      should be equal to number of machines */
64   int inter_comm_size = (comm_size + num_core - 1) / num_core;
65
66   /* copy input buffer to output buffer */
67   smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
68                recv_buf, count, dtype, rank, tag, comm, &status);
69
70   /* start binomial reduce intra communication inside each SMP node */
71   mask = 1;
72   while (mask < num_core) {
73     if ((mask & intra_rank) == 0) {
74       src = (inter_rank * num_core) + (intra_rank | mask);
75       if (src < comm_size) {
76         smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
77         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
78       }
79     } else {
80       dst = (inter_rank * num_core) + (intra_rank & (~mask));
81       smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
82       break;
83     }
84     mask <<= 1;
85   }                             /* end binomial reduce intra-communication */
86
87
88   /* start rdb (recursive doubling) all-reduce inter-communication 
89      between each SMP nodes : each node only have one process that can communicate
90      to other nodes */
91   if (intra_rank == 0) {
92
93     /* find nearest power-of-two less than or equal to inter_comm_size */
94     int pof2, rem, newrank, newdst;
95     pof2 = 1;
96     while (pof2 <= inter_comm_size)
97       pof2 <<= 1;
98     pof2 >>= 1;
99     rem = inter_comm_size - pof2;
100
101     /* In the non-power-of-two case, all even-numbered
102        processes of rank < 2*rem send their data to
103        (rank+1). These even-numbered processes no longer
104        participate in the algorithm until the very end.
105      */
106     if (inter_rank < 2 * rem) {
107       if (inter_rank % 2 == 0) {
108         dst = rank + num_core;
109         smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
110         newrank = -1;
111       } else {
112         src = rank - num_core;
113         smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
114         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
115         newrank = inter_rank / 2;
116       }
117     } else {
118       newrank = inter_rank - rem;
119     }
120
121     /* example inter-communication RDB rank change algorithm 
122        0,4,8,12..36 <= true rank (assume 4 core per SMP)
123        0123 4567 89 <= inter_rank
124        1 3 4567 89 (1,3 got data from 0,2 : 0,2 will be idle until the end)
125        0 1 4567 89 
126        0 1 2345 67 => newrank
127      */
128
129     if (newrank != -1) {
130       mask = 1;
131       while (mask < pof2) {
132         newdst = newrank ^ mask;
133         /* find real rank of dest */
134         dst = (newdst < rem) ? newdst * 2 + 1 : newdst + rem;
135         dst *= num_core;
136
137         /* exchange data in rdb manner */
138         smpi_mpi_sendrecv(recv_buf, count, dtype, dst, tag, tmp_buf, count, dtype,
139                      dst, tag, comm, &status);
140         smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
141         mask <<= 1;
142       }
143     }
144
145     /* non pof2 case 
146        left-over processes (all even ranks: < 2 * rem) get the result    
147      */
148     if (inter_rank < 2 * rem) {
149       if (inter_rank % 2) {
150         smpi_mpi_send(recv_buf, count, dtype, rank - num_core, tag, comm);
151       } else {
152         smpi_mpi_recv(recv_buf, count, dtype, rank + num_core, tag, comm, &status);
153       }
154     }
155   }
156
157   /* start binomial broadcast intra-communication inside each SMP nodes */
158   int num_core_in_current_smp = num_core;
159   if (inter_rank == (inter_comm_size - 1)) {
160     num_core_in_current_smp = comm_size - (inter_rank * num_core);
161   }
162   mask = 1;
163   while (mask < num_core_in_current_smp) {
164     if (intra_rank & mask) {
165       src = (inter_rank * num_core) + (intra_rank - mask);
166       smpi_mpi_recv(recv_buf, count, dtype, src, tag, comm, &status);
167       break;
168     }
169     mask <<= 1;
170   }
171   mask >>= 1;
172
173   while (mask > 0) {
174     dst = (inter_rank * num_core) + (intra_rank + mask);
175     if (dst < comm_size) {
176       smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
177     }
178     mask >>= 1;
179   }
180
181   free(tmp_buf);
182   return MPI_SUCCESS;
183 }