Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'actor-yield' of github.com:Takishipp/simgrid into actor-yield
[simgrid.git] / src / smpi / colls / allreduce / allreduce-smp-rdb.cpp
1 /* Copyright (c) 2013-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 /* IMPLEMENTED BY PITCH PATARASUK
9    Non-topoloty-specific (however, number of cores/node need to be changed)
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for intra-communication
12    and rdb for inter-communication*/
13
14
15 /* ** NOTE **
16    Use -DMPICH2 if this code does not compile.
17    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
18    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
19 */
20
21 //#include <star-reduction.c>
22
23 /*
24 This fucntion performs all-reduce operation as follow.
25 1) binomial_tree reduce inside each SMP node
26 2) Recursive doubling intra-communication between root of each SMP node
27 3) binomial_tree bcast inside each SMP node
28 */
29 namespace simgrid{
30 namespace smpi{
31 int Coll_allreduce_smp_rdb::allreduce(void *send_buf, void *recv_buf, int count,
32                                       MPI_Datatype dtype, MPI_Op op,
33                                       MPI_Comm comm)
34 {
35   int comm_size, rank;
36   void *tmp_buf;
37   int tag = COLL_TAG_ALLREDUCE;
38   int mask, src, dst;
39   MPI_Status status;
40   if(comm->get_leaders_comm()==MPI_COMM_NULL){
41     comm->init_smp();
42   }
43   int num_core=1;
44   if (comm->is_uniform()){
45     num_core = comm->get_intra_comm()->size();
46   }
47   /*
48      #ifdef MPICH2_REDUCTION
49      MPI_User_function * uop = MPIR_Op_table[op % 16 - 1];
50      #else
51      MPI_User_function *uop;
52      MPIR_OP *op_ptr;
53      op_ptr = MPIR_ToPointer(op);
54      uop  = op_ptr->op;
55      #endif
56    */
57   comm_size = comm->size();
58   rank = comm->rank();
59   MPI_Aint extent;
60   extent = dtype->get_extent();
61   tmp_buf = (void *) smpi_get_tmp_sendbuffer(count * extent);
62
63   /* compute intra and inter ranking */
64   int intra_rank, inter_rank;
65   intra_rank = rank % num_core;
66   inter_rank = rank / num_core;
67
68   /* size of processes participate in intra communications =>
69      should be equal to number of machines */
70   int inter_comm_size = (comm_size + num_core - 1) / num_core;
71
72   /* copy input buffer to output buffer */
73   Request::sendrecv(send_buf, count, dtype, rank, tag,
74                recv_buf, count, dtype, rank, tag, comm, &status);
75
76   /* start binomial reduce intra communication inside each SMP node */
77   mask = 1;
78   while (mask < num_core) {
79     if ((mask & intra_rank) == 0) {
80       src = (inter_rank * num_core) + (intra_rank | mask);
81       if (src < comm_size) {
82         Request::recv(tmp_buf, count, dtype, src, tag, comm, &status);
83         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
84       }
85     } else {
86       dst = (inter_rank * num_core) + (intra_rank & (~mask));
87       Request::send(recv_buf, count, dtype, dst, tag, comm);
88       break;
89     }
90     mask <<= 1;
91   }                             /* end binomial reduce intra-communication */
92
93
94   /* start rdb (recursive doubling) all-reduce inter-communication
95      between each SMP nodes : each node only have one process that can communicate
96      to other nodes */
97   if (intra_rank == 0) {
98
99     /* find nearest power-of-two less than or equal to inter_comm_size */
100     int pof2, rem, newrank, newdst;
101     pof2 = 1;
102     while (pof2 <= inter_comm_size)
103       pof2 <<= 1;
104     pof2 >>= 1;
105     rem = inter_comm_size - pof2;
106
107     /* In the non-power-of-two case, all even-numbered
108        processes of rank < 2*rem send their data to
109        (rank+1). These even-numbered processes no longer
110        participate in the algorithm until the very end.
111      */
112     if (inter_rank < 2 * rem) {
113       if (inter_rank % 2 == 0) {
114         dst = rank + num_core;
115         Request::send(recv_buf, count, dtype, dst, tag, comm);
116         newrank = -1;
117       } else {
118         src = rank - num_core;
119         Request::recv(tmp_buf, count, dtype, src, tag, comm, &status);
120         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
121         newrank = inter_rank / 2;
122       }
123     } else {
124       newrank = inter_rank - rem;
125     }
126
127     /* example inter-communication RDB rank change algorithm
128        0,4,8,12..36 <= true rank (assume 4 core per SMP)
129        0123 4567 89 <= inter_rank
130        1 3 4567 89 (1,3 got data from 0,2 : 0,2 will be idle until the end)
131        0 1 4567 89
132        0 1 2345 67 => newrank
133      */
134
135     if (newrank != -1) {
136       mask = 1;
137       while (mask < pof2) {
138         newdst = newrank ^ mask;
139         /* find real rank of dest */
140         dst = (newdst < rem) ? newdst * 2 + 1 : newdst + rem;
141         dst *= num_core;
142
143         /* exchange data in rdb manner */
144         Request::sendrecv(recv_buf, count, dtype, dst, tag, tmp_buf, count, dtype,
145                      dst, tag, comm, &status);
146         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
147         mask <<= 1;
148       }
149     }
150
151     /* non pof2 case
152        left-over processes (all even ranks: < 2 * rem) get the result
153      */
154     if (inter_rank < 2 * rem) {
155       if (inter_rank % 2) {
156         Request::send(recv_buf, count, dtype, rank - num_core, tag, comm);
157       } else {
158         Request::recv(recv_buf, count, dtype, rank + num_core, tag, comm, &status);
159       }
160     }
161   }
162
163   /* start binomial broadcast intra-communication inside each SMP nodes */
164   int num_core_in_current_smp = num_core;
165   if (inter_rank == (inter_comm_size - 1)) {
166     num_core_in_current_smp = comm_size - (inter_rank * num_core);
167   }
168   mask = 1;
169   while (mask < num_core_in_current_smp) {
170     if (intra_rank & mask) {
171       src = (inter_rank * num_core) + (intra_rank - mask);
172       Request::recv(recv_buf, count, dtype, src, tag, comm, &status);
173       break;
174     }
175     mask <<= 1;
176   }
177   mask >>= 1;
178
179   while (mask > 0) {
180     dst = (inter_rank * num_core) + (intra_rank + mask);
181     if (dst < comm_size) {
182       Request::send(recv_buf, count, dtype, dst, tag, comm);
183     }
184     mask >>= 1;
185   }
186
187   smpi_free_tmp_buffer(tmp_buf);
188   return MPI_SUCCESS;
189 }
190 }
191 }