Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines with new year.
[simgrid.git] / src / smpi / colls / allreduce / allreduce-smp-binomial-pipeline.cpp
1 /* Copyright (c) 2013-2020. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 /* IMPLEMENTED BY PITCH PATARASUK
9    Non-topology-specific (however, number of cores/node need to be changed)
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for both intra-communication
12    inter-communication
13    The communication are done in a pipeline fashion */
14
15 /* this is a default segment size for pipelining,
16    but it is typically passed as a command line argument */
17 int allreduce_smp_binomial_pipeline_segment_size = 4096;
18
19 /* ** NOTE **
20    This code is modified from allreduce-smp-binomial.c by wrapping the code with pipeline effect as follow
21    for (loop over pipelength) {
22      smp-binomial main code;
23    }
24 */
25
26 /* ** NOTE **
27    Use -DMPICH2 if this code does not compile.
28    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
29    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
30 */
31
32 /*
33 This fucntion performs all-reduce operation as follow. ** in a pipeline fashion **
34 1) binomial_tree reduce inside each SMP node
35 2) binomial_tree reduce intra-communication between root of each SMP node
36 3) binomial_tree bcast intra-communication between root of each SMP node
37 4) binomial_tree bcast inside each SMP node
38 */
39 namespace simgrid{
40 namespace smpi{
41 int allreduce__smp_binomial_pipeline(const void *send_buf,
42                                      void *recv_buf, int count,
43                                      MPI_Datatype dtype,
44                                      MPI_Op op, MPI_Comm comm)
45 {
46   int comm_size, rank;
47   int tag = COLL_TAG_ALLREDUCE;
48   int mask, src, dst;
49   MPI_Status status;
50   if(comm->get_leaders_comm()==MPI_COMM_NULL){
51     comm->init_smp();
52   }
53   int num_core=1;
54   if (comm->is_uniform()){
55     num_core = comm->get_intra_comm()->size();
56   }
57
58   comm_size = comm->size();
59   rank = comm->rank();
60   MPI_Aint extent;
61   extent = dtype->get_extent();
62   unsigned char* tmp_buf = smpi_get_tmp_sendbuffer(count * extent);
63
64   int intra_rank, inter_rank;
65   intra_rank = rank % num_core;
66   inter_rank = rank / num_core;
67
68   int phase;
69   int send_offset;
70   int recv_offset;
71   int pcount = allreduce_smp_binomial_pipeline_segment_size;
72   if (pcount > count) {
73     pcount = count;
74   }
75
76   /* size of processes participate in intra communications =>
77      should be equal to number of machines */
78   int inter_comm_size = (comm_size + num_core - 1) / num_core;
79
80   /* copy input buffer to output buffer */
81   Request::sendrecv(send_buf, count, dtype, rank, tag,
82                recv_buf, count, dtype, rank, tag, comm, &status);
83
84   /* compute pipe length */
85   int pipelength;
86   pipelength = count / pcount;
87
88   /* pipelining over pipelength (+3 is because we have 4 stages:
89      reduce-intra, reduce-inter, bcast-inter, bcast-intra */
90   for (phase = 0; phase < pipelength + 3; phase++) {
91
92     /* start binomial reduce intra communication inside each SMP node */
93     if (phase < pipelength) {
94       mask = 1;
95       while (mask < num_core) {
96         if ((mask & intra_rank) == 0) {
97           src = (inter_rank * num_core) + (intra_rank | mask);
98           if (src < comm_size) {
99             recv_offset = phase * pcount * extent;
100             Request::recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
101             if(op!=MPI_OP_NULL) op->apply( tmp_buf, (char *)recv_buf + recv_offset, &pcount, dtype);
102           }
103         } else {
104           send_offset = phase * pcount * extent;
105           dst = (inter_rank * num_core) + (intra_rank & (~mask));
106           Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
107           break;
108         }
109         mask <<= 1;
110       }
111     }
112
113     /* start binomial reduce inter-communication between each SMP nodes:
114        each node only have one process that can communicate to other nodes */
115     if ((phase > 0) && (phase < (pipelength + 1))) {
116       if (intra_rank == 0) {
117
118         mask = 1;
119         while (mask < inter_comm_size) {
120           if ((mask & inter_rank) == 0) {
121             src = (inter_rank | mask) * num_core;
122             if (src < comm_size) {
123               recv_offset = (phase - 1) * pcount * extent;
124               Request::recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
125               if(op!=MPI_OP_NULL) op->apply( tmp_buf, (char *)recv_buf + recv_offset, &pcount, dtype);
126             }
127           } else {
128             dst = (inter_rank & (~mask)) * num_core;
129             send_offset = (phase - 1) * pcount * extent;
130             Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
131             break;
132           }
133           mask <<= 1;
134         }
135       }
136     }
137
138     /* start binomial broadcast inter-communication between each SMP nodes:
139        each node only have one process that can communicate to other nodes */
140     if ((phase > 1) && (phase < (pipelength + 2))) {
141       if (intra_rank == 0) {
142         mask = 1;
143         while (mask < inter_comm_size) {
144           if (inter_rank & mask) {
145             src = (inter_rank - mask) * num_core;
146             recv_offset = (phase - 2) * pcount * extent;
147             Request::recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
148                      &status);
149             break;
150           }
151           mask <<= 1;
152         }
153         mask >>= 1;
154
155         while (mask > 0) {
156           if (inter_rank < inter_comm_size) {
157             dst = (inter_rank + mask) * num_core;
158             if (dst < comm_size) {
159               //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
160               send_offset = (phase - 2) * pcount * extent;
161               Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
162             }
163           }
164           mask >>= 1;
165         }
166       }
167     }
168
169     /* start binomial broadcast intra-communication inside each SMP nodes */
170     if (phase > 2) {
171       int num_core_in_current_smp = num_core;
172       if (inter_rank == (inter_comm_size - 1)) {
173         num_core_in_current_smp = comm_size - (inter_rank * num_core);
174       }
175       mask = 1;
176       while (mask < num_core_in_current_smp) {
177         if (intra_rank & mask) {
178           src = (inter_rank * num_core) + (intra_rank - mask);
179           recv_offset = (phase - 3) * pcount * extent;
180           Request::recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
181                    &status);
182           break;
183         }
184         mask <<= 1;
185       }
186       mask >>= 1;
187
188       while (mask > 0) {
189         dst = (inter_rank * num_core) + (intra_rank + mask);
190         if (dst < comm_size) {
191           send_offset = (phase - 3) * pcount * extent;
192           Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
193         }
194         mask >>= 1;
195       }
196     }
197   }                             // for phase
198
199   smpi_free_tmp_buffer(tmp_buf);
200   return MPI_SUCCESS;
201 }
202 }
203 }