/* Copyright (c) 2013-2014. The SimGrid Team.
 * All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "colls_private.h"
/* IMPLEMENTED BY PITCH PATARASUK
   Non-topology-specific (however, the number of cores per node needs to be changed)
   all-reduce operation designed for SMP clusters.
   It uses 2-layer communication: binomial for both intra-communication
   and inter-communication.
   The communications are done in a pipelined fashion. */

/* this is a default segment size for pipelining,
   but it is typically passed as a command line argument */
int allreduce_smp_binomial_pipeline_segment_size = 4096;
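
/* For example (illustrative numbers, not from the original code): with
   count = 65536 elements and the default segment size of 4096, the main
   loop below runs 65536 / 4096 = 16 pipeline phases, plus 3 extra phases
   to drain the 4-stage pipeline. */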

/* ** NOTE **
   This code is modified from allreduce-smp-binomial.c by wrapping the code
   with a pipeline effect, as follows:
   for (loop over pipelength) {
     smp-binomial main code;
   }
*/

/* ** NOTE **
   Use -DMPICH2 if this code does not compile.
   MPICH1 code also works on MPICH2 on our cluster and the performance is similar.
   This code assumes a commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc.).
*/

/*
This function performs the all-reduce operation, ** in a pipeline fashion **, as follows:
1) binomial_tree reduce inside each SMP node
2) binomial_tree reduce inter-communication between the roots of the SMP nodes
3) binomial_tree bcast inter-communication between the roots of the SMP nodes
4) binomial_tree bcast inside each SMP node
*/
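
/* A minimal usage sketch (illustrative values; assumes the caller already
   owns a valid communicator such as MPI_COMM_WORLD):

     int sbuf[65536], rbuf[65536];
     // ... fill sbuf on every rank ...
     smpi_coll_tuned_allreduce_smp_binomial_pipeline(sbuf, rbuf, 65536,
                                                     MPI_INT, MPI_SUM,
                                                     MPI_COMM_WORLD);

   Afterwards every rank holds the element-wise MPI_SUM of all send buffers
   in rbuf. */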
int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
                                                    void *recv_buf, int count,
                                                    MPI_Datatype dtype,
                                                    MPI_Op op, MPI_Comm comm)
{
  int comm_size, rank;
  void *tmp_buf;
  int tag = COLL_TAG_ALLREDUCE;
  int mask, src, dst;
  MPI_Status status;
  if (smpi_comm_get_leaders_comm(comm) == MPI_COMM_NULL) {
    smpi_comm_init_smp(comm);
  }
  int num_core = 1;
  if (smpi_comm_is_uniform(comm)) {
    num_core = smpi_comm_size(smpi_comm_get_intra_comm(comm));
  }

  comm_size = smpi_comm_size(comm);
  rank = smpi_comm_rank(comm);
  MPI_Aint extent;
  extent = smpi_datatype_get_extent(dtype);
  tmp_buf = (void *) smpi_get_tmp_sendbuffer(count * extent);

  int intra_rank, inter_rank;
  intra_rank = rank % num_core;
  inter_rank = rank / num_core;
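
  /* For example (illustrative, with num_core = 8): rank 13 has
     intra_rank = 13 % 8 = 5 (its position inside the node) and
     inter_rank = 13 / 8 = 1 (the index of its node). */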

  int phase;
  int send_offset;
  int recv_offset;
  int pcount = allreduce_smp_binomial_pipeline_segment_size;
  if (pcount > count) {
    pcount = count;
  }

  /* number of processes participating in the inter communications =>
     should be equal to the number of machines */
  int inter_comm_size = (comm_size + num_core - 1) / num_core;
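
  /* Ceiling division: e.g. comm_size = 10 and num_core = 4 (illustrative)
     give (10 + 4 - 1) / 4 = 3 nodes, the last one only partially filled. */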

  /* copy the input buffer to the output buffer */
  smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
                    recv_buf, count, dtype, rank, tag, comm, &status);

  /* compute the pipe length */
  int pipelength;
  pipelength = count / pcount;
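
  /* Note (assumption about intended use): count is expected to be a multiple
     of pcount; with integer division, any trailing count % pcount elements
     are not carried through the pipeline stages below. */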

  /* pipelining over pipelength (+3 because there are 4 stages:
     reduce-intra, reduce-inter, bcast-inter, bcast-intra) */
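  /* With pipelength = 4 (illustrative), the four stages overlap like this:
       phase:         0  1  2  3  4  5  6
       reduce-intra   x  x  x  x
       reduce-inter      x  x  x  x
       bcast-inter          x  x  x  x
       bcast-intra             x  x  x  x
     so the segment reduced in phase p leaves the intra broadcast in
     phase p + 3. */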
  for (phase = 0; phase < pipelength + 3; phase++) {

    /* start the binomial reduce intra communication inside each SMP node */
    if (phase < pipelength) {
      mask = 1;
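      /* Binomial tree on the intra_rank bits: at each mask value, a rank with
         the mask bit clear receives from the partner that has it set and
         reduces; a rank with the bit set sends its partial result to the
         partner with the bit cleared and leaves the loop. */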
      while (mask < num_core) {
        if ((mask & intra_rank) == 0) {
          src = (inter_rank * num_core) + (intra_rank | mask);
          if (src < comm_size) {
            recv_offset = phase * pcount * extent;
            smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
            smpi_op_apply(op, tmp_buf, (char *) recv_buf + recv_offset, &pcount, &dtype);
          }
        } else {
          send_offset = phase * pcount * extent;
          dst = (inter_rank * num_core) + (intra_rank & (~mask));
          smpi_mpi_send((char *) recv_buf + send_offset, pcount, dtype, dst, tag, comm);
          break;
        }
        mask <<= 1;
      }
    }

    /* start the binomial reduce inter-communication between the SMP nodes:
       each node has only one process that can communicate with other nodes */
    if ((phase > 0) && (phase < (pipelength + 1))) {
      if (intra_rank == 0) {
        mask = 1;
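        /* Same binomial pattern, now on the inter_rank bits: the peers are
           the node leaders, whose global ranks are multiples of num_core. */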
        while (mask < inter_comm_size) {
          if ((mask & inter_rank) == 0) {
            src = (inter_rank | mask) * num_core;
            if (src < comm_size) {
              recv_offset = (phase - 1) * pcount * extent;
              smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
              smpi_op_apply(op, tmp_buf, (char *) recv_buf + recv_offset, &pcount, &dtype);
            }
          } else {
            dst = (inter_rank & (~mask)) * num_core;
            send_offset = (phase - 1) * pcount * extent;
            smpi_mpi_send((char *) recv_buf + send_offset, pcount, dtype, dst, tag, comm);
            break;
          }
          mask <<= 1;
        }
      }
    }

    /* start the binomial broadcast inter-communication between the SMP nodes:
       each node has only one process that can communicate with other nodes */
    if ((phase > 1) && (phase < (pipelength + 2))) {
      if (intra_rank == 0) {
        mask = 1;
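        /* Two sweeps: the first loop raises mask until this leader finds the
           ancestor to receive the segment from; the second lowers mask back
           down, relaying the segment to each descendant in turn. */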
        while (mask < inter_comm_size) {
          if (inter_rank & mask) {
            src = (inter_rank - mask) * num_core;
            recv_offset = (phase - 2) * pcount * extent;
            smpi_mpi_recv((char *) recv_buf + recv_offset, pcount, dtype, src, tag, comm,
                          &status);
            break;
          }
          mask <<= 1;
        }
        mask >>= 1;

        while (mask > 0) {
          if (inter_rank < inter_comm_size) {
            dst = (inter_rank + mask) * num_core;
            if (dst < comm_size) {
              //printf("Node %d sends to node %d when mask is %d\n", rank, dst, mask);
              send_offset = (phase - 2) * pcount * extent;
              smpi_mpi_send((char *) recv_buf + send_offset, pcount, dtype, dst, tag, comm);
            }
          }
          mask >>= 1;
        }
      }
    }

    /* start the binomial broadcast intra-communication inside each SMP node */
    if (phase > 2) {
      int num_core_in_current_smp = num_core;
      if (inter_rank == (inter_comm_size - 1)) {
        num_core_in_current_smp = comm_size - (inter_rank * num_core);
      }
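      /* The last node may be only partially filled: e.g. comm_size = 10 and
         num_core = 4 (illustrative) leave just 10 - 8 = 2 processes on the
         last node, so the broadcast must not address the missing intra ranks. */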
      mask = 1;
      while (mask < num_core_in_current_smp) {
        if (intra_rank & mask) {
          src = (inter_rank * num_core) + (intra_rank - mask);
          recv_offset = (phase - 3) * pcount * extent;
          smpi_mpi_recv((char *) recv_buf + recv_offset, pcount, dtype, src, tag, comm,
                        &status);
          break;
        }
        mask <<= 1;
      }
      mask >>= 1;

      while (mask > 0) {
        dst = (inter_rank * num_core) + (intra_rank + mask);
        if (dst < comm_size) {
          send_offset = (phase - 3) * pcount * extent;
          smpi_mpi_send((char *) recv_buf + send_offset, pcount, dtype, dst, tag, comm);
        }
        mask >>= 1;
      }
    }
  }                             /* for phase */

  smpi_free_tmp_buffer(tmp_buf);
  return MPI_SUCCESS;
}