Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines for 2023.
[simgrid.git] / src / smpi / colls / allreduce / allreduce-smp-binomial-pipeline.cpp
1 /* Copyright (c) 2013-2023. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 /* IMPLEMENTED BY PITCH PATARASUK
9    Non-topology-specific (however, number of cores/node need to be changed)
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for both intra-communication
12    inter-communication
13    The communication are done in a pipeline fashion */
14
15 /* this is a default segment size for pipelining,
16    but it is typically passed as a command line argument */
17 int allreduce_smp_binomial_pipeline_segment_size = 4096;
18
19 /* ** NOTE **
20    This code is modified from allreduce-smp-binomial.c by wrapping the code with pipeline effect as follow
21    for (loop over pipelength) {
22      smp-binomial main code;
23    }
24 */
25
26 /* ** NOTE **
27    Use -DMPICH2 if this code does not compile.
28    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
29    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
30 */
31
32 /*
33 This function performs all-reduce operation as follow. ** in a pipeline fashion **
34 1) binomial_tree reduce inside each SMP node
35 2) binomial_tree reduce intra-communication between root of each SMP node
36 3) binomial_tree bcast intra-communication between root of each SMP node
37 4) binomial_tree bcast inside each SMP node
38 */
39 namespace simgrid::smpi {
40 int allreduce__smp_binomial_pipeline(const void *send_buf,
41                                      void *recv_buf, int count,
42                                      MPI_Datatype dtype,
43                                      MPI_Op op, MPI_Comm comm)
44 {
45   int comm_size, rank;
46   int tag = COLL_TAG_ALLREDUCE;
47   int mask, src, dst;
48   MPI_Status status;
49   if(comm->get_leaders_comm()==MPI_COMM_NULL){
50     comm->init_smp();
51   }
52   int num_core=1;
53   if (comm->is_uniform()){
54     num_core = comm->get_intra_comm()->size();
55   }
56
57   comm_size = comm->size();
58   rank = comm->rank();
59   MPI_Aint extent;
60   extent = dtype->get_extent();
61   unsigned char* tmp_buf = smpi_get_tmp_sendbuffer(count * extent);
62
63   int intra_rank, inter_rank;
64   intra_rank = rank % num_core;
65   inter_rank = rank / num_core;
66
67   int phase;
68   int send_offset;
69   int recv_offset;
70   int pcount = allreduce_smp_binomial_pipeline_segment_size;
71   if (pcount > count) {
72     pcount = count;
73   }
74
75   /* size of processes participate in intra communications =>
76      should be equal to number of machines */
77   int inter_comm_size = (comm_size + num_core - 1) / num_core;
78
79   /* copy input buffer to output buffer */
80   Request::sendrecv(send_buf, count, dtype, rank, tag,
81                recv_buf, count, dtype, rank, tag, comm, &status);
82
83   /* compute pipe length */
84   int pipelength;
85   pipelength = count / pcount;
86
87   /* pipelining over pipelength (+3 is because we have 4 stages:
88      reduce-intra, reduce-inter, bcast-inter, bcast-intra */
89   for (phase = 0; phase < pipelength + 3; phase++) {
90
91     /* start binomial reduce intra communication inside each SMP node */
92     if (phase < pipelength) {
93       mask = 1;
94       while (mask < num_core) {
95         if ((mask & intra_rank) == 0) {
96           src = (inter_rank * num_core) + (intra_rank | mask);
97           if (src < comm_size) {
98             recv_offset = phase * pcount * extent;
99             Request::recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
100             if(op!=MPI_OP_NULL) op->apply( tmp_buf, (char *)recv_buf + recv_offset, &pcount, dtype);
101           }
102         } else {
103           send_offset = phase * pcount * extent;
104           dst = (inter_rank * num_core) + (intra_rank & (~mask));
105           Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
106           break;
107         }
108         mask <<= 1;
109       }
110     }
111
112     /* start binomial reduce inter-communication between each SMP nodes:
113        each node only have one process that can communicate to other nodes */
114     if ((phase > 0) && (phase < (pipelength + 1))) {
115       if (intra_rank == 0) {
116
117         mask = 1;
118         while (mask < inter_comm_size) {
119           if ((mask & inter_rank) == 0) {
120             src = (inter_rank | mask) * num_core;
121             if (src < comm_size) {
122               recv_offset = (phase - 1) * pcount * extent;
123               Request::recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
124               if(op!=MPI_OP_NULL) op->apply( tmp_buf, (char *)recv_buf + recv_offset, &pcount, dtype);
125             }
126           } else {
127             dst = (inter_rank & (~mask)) * num_core;
128             send_offset = (phase - 1) * pcount * extent;
129             Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
130             break;
131           }
132           mask <<= 1;
133         }
134       }
135     }
136
137     /* start binomial broadcast inter-communication between each SMP nodes:
138        each node only have one process that can communicate to other nodes */
139     if ((phase > 1) && (phase < (pipelength + 2))) {
140       if (intra_rank == 0) {
141         mask = 1;
142         while (mask < inter_comm_size) {
143           if (inter_rank & mask) {
144             src = (inter_rank - mask) * num_core;
145             recv_offset = (phase - 2) * pcount * extent;
146             Request::recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
147                      &status);
148             break;
149           }
150           mask <<= 1;
151         }
152         mask >>= 1;
153
154         while (mask > 0) {
155           if (inter_rank < inter_comm_size) {
156             dst = (inter_rank + mask) * num_core;
157             if (dst < comm_size) {
158               //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
159               send_offset = (phase - 2) * pcount * extent;
160               Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
161             }
162           }
163           mask >>= 1;
164         }
165       }
166     }
167
168     /* start binomial broadcast intra-communication inside each SMP nodes */
169     if (phase > 2) {
170       int num_core_in_current_smp = num_core;
171       if (inter_rank == (inter_comm_size - 1)) {
172         num_core_in_current_smp = comm_size - (inter_rank * num_core);
173       }
174       mask = 1;
175       while (mask < num_core_in_current_smp) {
176         if (intra_rank & mask) {
177           src = (inter_rank * num_core) + (intra_rank - mask);
178           recv_offset = (phase - 3) * pcount * extent;
179           Request::recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
180                    &status);
181           break;
182         }
183         mask <<= 1;
184       }
185       mask >>= 1;
186
187       while (mask > 0) {
188         dst = (inter_rank * num_core) + (intra_rank + mask);
189         if (dst < comm_size) {
190           send_offset = (phase - 3) * pcount * extent;
191           Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
192         }
193         mask >>= 1;
194       }
195     }
196   }                             // for phase
197
198   smpi_free_tmp_buffer(tmp_buf);
199   return MPI_SUCCESS;
200 }
201 } // namespace simgrid::smpi