1 /* Copyright (c) 2013-2022. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "../colls_private.hpp"
8 /* IMPLEMENTED BY PITCH PATARASUK
9 Non-topology-specific (however, the number of cores per node needs to be changed)
10 all-reduce operation designed for SMP clusters.
11 It uses 2-layer communication: binomial for both intra-communication
and inter-communication.
13 The communications are done in a pipelined fashion */
15 /* this is a default segment size for pipelining,
16 but it is typically passed as a command line argument */
17 int allreduce_smp_binomial_pipeline_segment_size = 4096; // segment size in *elements*, not bytes (used directly as the count passed to Request::recv/send below)
20 This code is modified from allreduce-smp-binomial.c by wrapping the code with a pipeline effect, as follows:
21 for (loop over pipelength) {
22 smp-binomial main code;
27 Use -DMPICH2 if this code does not compile.
28 MPICH1 code also works on MPICH2 on our cluster and the performance is similar.
29 This code assumes a commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc.).
33 This function performs the all-reduce operation as follows, ** in a pipeline fashion **:
34 1) binomial_tree reduce inside each SMP node
35 2) binomial_tree reduce inter-communication between the roots of the SMP nodes
36 3) binomial_tree bcast inter-communication between the roots of the SMP nodes
37 4) binomial_tree bcast inside each SMP node
39 namespace simgrid::smpi {
/* Pipelined SMP-aware allreduce.
 *
 * The buffer is split into segments of `pcount` elements, and four stages
 * (reduce-intra, reduce-inter, bcast-inter, bcast-intra) are overlapped
 * across consecutive segments: during phase p, stage 1 works on segment p,
 * stage 2 on segment p-1, stage 3 on segment p-2 and stage 4 on segment p-3.
 *
 * Parameters follow the usual MPI allreduce contract: `send_buf` is read,
 * `recv_buf` receives the reduced result on every rank; `op` must be
 * commutative and associative (see the header note above).
 *
 * NOTE(review): this excerpt is missing several lines of the original file
 * (declarations of rank/comm_size/mask/status/phase/send_offset/recv_offset/
 * pipelength, the MPI_Datatype parameter, and a number of closing braces and
 * the final return) -- it will not compile as shown; compare against the
 * upstream file before editing. */
40 int allreduce__smp_binomial_pipeline(const void *send_buf,
41 void *recv_buf, int count,
43 MPI_Op op, MPI_Comm comm)
46 int tag = COLL_TAG_ALLREDUCE;
/* Lazily build the SMP hierarchy (leader communicator) on first use. */
49 if(comm->get_leaders_comm()==MPI_COMM_NULL){
/* Processes per SMP node; only taken from the intra-communicator when the
   process-to-node mapping is uniform. */
53 if (comm->is_uniform()){
54 num_core = comm->get_intra_comm()->size();
57 comm_size = comm->size();
60 extent = dtype->get_extent();
/* Scratch buffer: each incoming segment is received here, then folded into
   recv_buf with op->apply(). */
61 unsigned char* tmp_buf = smpi_get_tmp_sendbuffer(count * extent);
/* Position inside the node (intra_rank) and index of the node (inter_rank). */
63 int intra_rank, inter_rank;
64 intra_rank = rank % num_core;
65 inter_rank = rank / num_core;
/* Elements per pipeline segment (per the header comment, typically passed as
   a command-line argument rather than left at the default). */
70 int pcount = allreduce_smp_binomial_pipeline_segment_size;
75 /* size of processes participate in intra communications =>
76 should be equal to number of machines */
77 int inter_comm_size = (comm_size + num_core - 1) / num_core;
79 /* copy input buffer to output buffer */
80 Request::sendrecv(send_buf, count, dtype, rank, tag,
81 recv_buf, count, dtype, rank, tag, comm, &status);
83 /* compute pipe length */
/* NOTE(review): a trailing partial segment (count % pcount) does not appear
   to be handled in this excerpt -- presumably count is a multiple of pcount;
   confirm against the full file. */
85 pipelength = count / pcount;
87 /* pipelining over pipelength (+3 is because we have 4 stages:
88 reduce-intra, reduce-inter, bcast-inter, bcast-intra */
89 for (phase = 0; phase < pipelength + 3; phase++) {
91 /* start binomial reduce intra communication inside each SMP node */
/* Stage 1, segment `phase`: ranks fold their data towards the node leader
   (intra_rank 0) along a binomial tree keyed on the bits of intra_rank. */
92 if (phase < pipelength) {
94 while (mask < num_core) {
95 if ((mask & intra_rank) == 0) {
/* Partner inside the same node whose `mask` bit is set. */
96 src = (inter_rank * num_core) + (intra_rank | mask);
97 if (src < comm_size) {
98 recv_offset = phase * pcount * extent;
99 Request::recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
/* Accumulate the received segment into our partial result in recv_buf. */
100 if(op!=MPI_OP_NULL) op->apply( tmp_buf, (char *)recv_buf + recv_offset, &pcount, dtype);
/* Else branch: send our partial segment up the tree; this rank is then done
   reducing this segment. */
103 send_offset = phase * pcount * extent;
104 dst = (inter_rank * num_core) + (intra_rank & (~mask));
105 Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
112 /* start binomial reduce inter-communication between each SMP nodes:
113 each node only have one process that can communicate to other nodes */
/* Stage 2, segment `phase - 1`: node leaders (intra_rank == 0) reduce across
   nodes via a binomial tree keyed on inter_rank. */
114 if ((phase > 0) && (phase < (pipelength + 1))) {
115 if (intra_rank == 0) {
118 while (mask < inter_comm_size) {
119 if ((mask & inter_rank) == 0) {
/* Peer node's leader (leaders live at rank = node * num_core). */
120 src = (inter_rank | mask) * num_core;
121 if (src < comm_size) {
122 recv_offset = (phase - 1) * pcount * extent;
123 Request::recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
124 if(op!=MPI_OP_NULL) op->apply( tmp_buf, (char *)recv_buf + recv_offset, &pcount, dtype);
127 dst = (inter_rank & (~mask)) * num_core;
128 send_offset = (phase - 1) * pcount * extent;
129 Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
137 /* start binomial broadcast inter-communication between each SMP nodes:
138 each node only have one process that can communicate to other nodes */
/* Stage 3, segment `phase - 2`: leaders broadcast the fully reduced segment
   back down the inter-node binomial tree (receive once, then relay). */
139 if ((phase > 1) && (phase < (pipelength + 2))) {
140 if (intra_rank == 0) {
142 while (mask < inter_comm_size) {
143 if (inter_rank & mask) {
144 src = (inter_rank - mask) * num_core;
145 recv_offset = (phase - 2) * pcount * extent;
146 Request::recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
/* Relay loop: forward the segment to our subtrees (mask shrinks each step;
   the loop control lines are among those missing from this excerpt). */
155 if (inter_rank < inter_comm_size) {
156 dst = (inter_rank + mask) * num_core;
157 if (dst < comm_size) {
158 //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
159 send_offset = (phase - 2) * pcount * extent;
160 Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
168 /* start binomial broadcast intra-communication inside each SMP nodes */
/* Stage 4, segment `phase - 3`: each leader broadcasts the segment to the
   other ranks of its own node. */
170 int num_core_in_current_smp = num_core;
/* The last node may be only partially populated. */
171 if (inter_rank == (inter_comm_size - 1)) {
172 num_core_in_current_smp = comm_size - (inter_rank * num_core);
175 while (mask < num_core_in_current_smp) {
176 if (intra_rank & mask) {
177 src = (inter_rank * num_core) + (intra_rank - mask);
178 recv_offset = (phase - 3) * pcount * extent;
179 Request::recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
/* Forward to intra-node children once our copy of the segment is complete. */
188 dst = (inter_rank * num_core) + (intra_rank + mask);
189 if (dst < comm_size) {
190 send_offset = (phase - 3) * pcount * extent;
191 Request::send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
/* Release the scratch buffer (the original's `return MPI_SUCCESS;` is among
   the lines missing from this excerpt). */
198 smpi_free_tmp_buffer(tmp_buf);
201 } // namespace simgrid::smpi