Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
sanitize the OOP of kernel::profile
[simgrid.git] / src / smpi / colls / allreduce / allreduce-smp-binomial-pipeline.cpp
1 /* Copyright (c) 2013-2019. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 /* IMPLEMENTED BY PITCH PATARASUK
9    Non-topoloty-specific (however, number of cores/node need to be changed)
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for both intra-communication
12    inter-communication
13    The communication are done in a pipeline fashion */
14
15
16
17 /* this is a default segment size for pipelining,
18    but it is typically passed as a command line argument */
19 int allreduce_smp_binomial_pipeline_segment_size = 4096;
20
21 /* ** NOTE **
22    This code is modified from allreduce-smp-binomial.c by wrapping the code with pipeline effect as follow
23    for (loop over pipelength) {
24      smp-binomial main code;
25    }
26 */
27
28 /* ** NOTE **
29    Use -DMPICH2 if this code does not compile.
30    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
31    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
32 */
33
34 /*
35 This fucntion performs all-reduce operation as follow. ** in a pipeline fashion **
36 1) binomial_tree reduce inside each SMP node
37 2) binomial_tree reduce intra-communication between root of each SMP node
38 3) binomial_tree bcast intra-communication between root of each SMP node
39 4) binomial_tree bcast inside each SMP node
40 */
41 namespace simgrid{
42 namespace smpi{
43 int Coll_allreduce_smp_binomial_pipeline::allreduce(void *send_buf,
44                                                     void *recv_buf, int count,
45                                                     MPI_Datatype dtype,
46                                                     MPI_Op op, MPI_Comm comm)
47 {
48   int comm_size, rank;
49   void *tmp_buf;
50   int tag = COLL_TAG_ALLREDUCE;
51   int mask, src, dst;
52   MPI_Status status;
53   if(comm->get_leaders_comm()==MPI_COMM_NULL){
54     comm->init_smp();
55   }
56   int num_core=1;
57   if (comm->is_uniform()){
58     num_core = comm->get_intra_comm()->size();
59   }
60
61   comm_size = comm->size();
62   rank = comm->rank();
63   MPI_Aint extent;
64   extent = dtype->get_extent();
65   tmp_buf = (void *) smpi_get_tmp_sendbuffer(count * extent);
66
67   int intra_rank, inter_rank;
68   intra_rank = rank % num_core;
69   inter_rank = rank / num_core;
70
71   int phase;
72   int send_offset;
73   int recv_offset;
74   int pcount = allreduce_smp_binomial_pipeline_segment_size;
75   if (pcount > count) {
76     pcount = count;
77   }
78
79   /* size of processes participate in intra communications =>
80      should be equal to number of machines */
81   int inter_comm_size = (comm_size + num_core - 1) / num_core;
82
83   /* copy input buffer to output buffer */
84   Request::sendrecv(send_buf, count, dtype, rank, tag,
85                recv_buf, count, dtype, rank, tag, comm, &status);
86
87   /* compute pipe length */
88   int pipelength;
89   pipelength = count / pcount;
90
91   /* pipelining over pipelength (+3 is because we have 4 stages:
92      reduce-intra, reduce-inter, bcast-inter, bcast-intra */
93   for (phase = 0; phase < pipelength + 3; phase++) {
94
95     /* start binomial reduce intra communication inside each SMP node */
96     if (phase < pipelength) {
97       mask = 1;
98       while (mask < num_core) {
99         if ((mask & intra_rank) == 0) {
100           src = (inter_rank * num_core) + (intra_rank | mask);
101           if (src < comm_size) {
102             recv_offset = phase * pcount * extent;
103             Request::recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
104             if(op!=MPI_OP_NULL) op->apply( tmp_buf, (char *)recv_buf + recv_offset, &pcount, dtype);
105           }
106         } else {
107           send_offset = phase * pcount * extent;
108           dst = (inter_rank * num_core) + (intra_rank & (~mask));
109           Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
110           break;
111         }
112         mask <<= 1;
113       }
114     }
115
116     /* start binomial reduce inter-communication between each SMP nodes:
117        each node only have one process that can communicate to other nodes */
118     if ((phase > 0) && (phase < (pipelength + 1))) {
119       if (intra_rank == 0) {
120
121         mask = 1;
122         while (mask < inter_comm_size) {
123           if ((mask & inter_rank) == 0) {
124             src = (inter_rank | mask) * num_core;
125             if (src < comm_size) {
126               recv_offset = (phase - 1) * pcount * extent;
127               Request::recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
128               if(op!=MPI_OP_NULL) op->apply( tmp_buf, (char *)recv_buf + recv_offset, &pcount, dtype);
129             }
130           } else {
131             dst = (inter_rank & (~mask)) * num_core;
132             send_offset = (phase - 1) * pcount * extent;
133             Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
134             break;
135           }
136           mask <<= 1;
137         }
138       }
139     }
140
141     /* start binomial broadcast inter-communication between each SMP nodes:
142        each node only have one process that can communicate to other nodes */
143     if ((phase > 1) && (phase < (pipelength + 2))) {
144       if (intra_rank == 0) {
145         mask = 1;
146         while (mask < inter_comm_size) {
147           if (inter_rank & mask) {
148             src = (inter_rank - mask) * num_core;
149             recv_offset = (phase - 2) * pcount * extent;
150             Request::recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
151                      &status);
152             break;
153           }
154           mask <<= 1;
155         }
156         mask >>= 1;
157
158         while (mask > 0) {
159           if (inter_rank < inter_comm_size) {
160             dst = (inter_rank + mask) * num_core;
161             if (dst < comm_size) {
162               //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
163               send_offset = (phase - 2) * pcount * extent;
164               Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
165             }
166           }
167           mask >>= 1;
168         }
169       }
170     }
171
172     /* start binomial broadcast intra-communication inside each SMP nodes */
173     if (phase > 2) {
174       int num_core_in_current_smp = num_core;
175       if (inter_rank == (inter_comm_size - 1)) {
176         num_core_in_current_smp = comm_size - (inter_rank * num_core);
177       }
178       mask = 1;
179       while (mask < num_core_in_current_smp) {
180         if (intra_rank & mask) {
181           src = (inter_rank * num_core) + (intra_rank - mask);
182           recv_offset = (phase - 3) * pcount * extent;
183           Request::recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
184                    &status);
185           break;
186         }
187         mask <<= 1;
188       }
189       mask >>= 1;
190
191       while (mask > 0) {
192         dst = (inter_rank * num_core) + (intra_rank + mask);
193         if (dst < comm_size) {
194           send_offset = (phase - 3) * pcount * extent;
195           Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
196         }
197         mask >>= 1;
198       }
199     }
200   }                             // for phase
201
202   smpi_free_tmp_buffer(tmp_buf);
203   return MPI_SUCCESS;
204 }
205 }
206 }