Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MPI_Comm -> C++
[simgrid.git] / src / smpi / colls / allreduce-smp-binomial-pipeline.cpp
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "colls_private.h"
8 /* IMPLEMENTED BY PITCH PATARASUK 
9    Non-topoloty-specific (however, number of cores/node need to be changed) 
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for both intra-communication 
12    inter-communication
13    The communication are done in a pipeline fashion */
14
15
16
17 /* this is a default segment size for pipelining, 
18    but it is typically passed as a command line argument */
19 int allreduce_smp_binomial_pipeline_segment_size = 4096;
20
21 /* ** NOTE **
22    This code is modified from allreduce-smp-binomial.c by wrapping the code with pipeline effect as follow
23    for (loop over pipelength) {
24      smp-binomial main code;
25    }
26 */
27
28 /* ** NOTE **
29    Use -DMPICH2 if this code does not compile.
30    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
31    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
32 */
33
34 /*
35 This fucntion performs all-reduce operation as follow. ** in a pipeline fashion **
36 1) binomial_tree reduce inside each SMP node
37 2) binomial_tree reduce intra-communication between root of each SMP node
38 3) binomial_tree bcast intra-communication between root of each SMP node
39 4) binomial_tree bcast inside each SMP node
40 */
41 int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
42                                                     void *recv_buf, int count,
43                                                     MPI_Datatype dtype,
44                                                     MPI_Op op, MPI_Comm comm)
45 {
46   int comm_size, rank;
47   void *tmp_buf;
48   int tag = COLL_TAG_ALLREDUCE;
49   int mask, src, dst;
50   MPI_Status status;
51   if(comm->get_leaders_comm()==MPI_COMM_NULL){
52     comm->init_smp();
53   }
54   int num_core=1;
55   if (comm->is_uniform()){
56     num_core = comm->get_intra_comm()->size();
57   }
58
59   comm_size = comm->size();
60   rank = comm->rank();
61   MPI_Aint extent;
62   extent = smpi_datatype_get_extent(dtype);
63   tmp_buf = (void *) smpi_get_tmp_sendbuffer(count * extent);
64
65   int intra_rank, inter_rank;
66   intra_rank = rank % num_core;
67   inter_rank = rank / num_core;
68
69   int phase;
70   int send_offset;
71   int recv_offset;
72   int pcount = allreduce_smp_binomial_pipeline_segment_size;
73   if (pcount > count) {
74     pcount = count;
75   }
76
77   /* size of processes participate in intra communications =>
78      should be equal to number of machines */
79   int inter_comm_size = (comm_size + num_core - 1) / num_core;
80
81   /* copy input buffer to output buffer */
82   smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
83                recv_buf, count, dtype, rank, tag, comm, &status);
84
85   /* compute pipe length */
86   int pipelength;
87   pipelength = count / pcount;
88
89   /* pipelining over pipelength (+3 is because we have 4 stages:
90      reduce-intra, reduce-inter, bcast-inter, bcast-intra */
91   for (phase = 0; phase < pipelength + 3; phase++) {
92
93     /* start binomial reduce intra communication inside each SMP node */
94     if (phase < pipelength) {
95       mask = 1;
96       while (mask < num_core) {
97         if ((mask & intra_rank) == 0) {
98           src = (inter_rank * num_core) + (intra_rank | mask);
99           if (src < comm_size) {
100             recv_offset = phase * pcount * extent;
101             smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
102             smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
103           }
104         } else {
105           send_offset = phase * pcount * extent;
106           dst = (inter_rank * num_core) + (intra_rank & (~mask));
107           smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
108           break;
109         }
110         mask <<= 1;
111       }
112     }
113
114     /* start binomial reduce inter-communication between each SMP nodes: 
115        each node only have one process that can communicate to other nodes */
116     if ((phase > 0) && (phase < (pipelength + 1))) {
117       if (intra_rank == 0) {
118
119         mask = 1;
120         while (mask < inter_comm_size) {
121           if ((mask & inter_rank) == 0) {
122             src = (inter_rank | mask) * num_core;
123             if (src < comm_size) {
124               recv_offset = (phase - 1) * pcount * extent;
125               smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
126               smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
127             }
128           } else {
129             dst = (inter_rank & (~mask)) * num_core;
130             send_offset = (phase - 1) * pcount * extent;
131             smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
132             break;
133           }
134           mask <<= 1;
135         }
136       }
137     }
138
139     /* start binomial broadcast inter-communication between each SMP nodes: 
140        each node only have one process that can communicate to other nodes */
141     if ((phase > 1) && (phase < (pipelength + 2))) {
142       if (intra_rank == 0) {
143         mask = 1;
144         while (mask < inter_comm_size) {
145           if (inter_rank & mask) {
146             src = (inter_rank - mask) * num_core;
147             recv_offset = (phase - 2) * pcount * extent;
148             smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
149                      &status);
150             break;
151           }
152           mask <<= 1;
153         }
154         mask >>= 1;
155
156         while (mask > 0) {
157           if (inter_rank < inter_comm_size) {
158             dst = (inter_rank + mask) * num_core;
159             if (dst < comm_size) {
160               //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
161               send_offset = (phase - 2) * pcount * extent;
162               smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
163             }
164           }
165           mask >>= 1;
166         }
167       }
168     }
169
170     /* start binomial broadcast intra-communication inside each SMP nodes */
171     if (phase > 2) {
172       int num_core_in_current_smp = num_core;
173       if (inter_rank == (inter_comm_size - 1)) {
174         num_core_in_current_smp = comm_size - (inter_rank * num_core);
175       }
176       mask = 1;
177       while (mask < num_core_in_current_smp) {
178         if (intra_rank & mask) {
179           src = (inter_rank * num_core) + (intra_rank - mask);
180           recv_offset = (phase - 3) * pcount * extent;
181           smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
182                    &status);
183           break;
184         }
185         mask <<= 1;
186       }
187       mask >>= 1;
188
189       while (mask > 0) {
190         dst = (inter_rank * num_core) + (intra_rank + mask);
191         if (dst < comm_size) {
192           send_offset = (phase - 3) * pcount * extent;
193           smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
194         }
195         mask >>= 1;
196       }
197     }
198   }                             // for phase
199
200   smpi_free_tmp_buffer(tmp_buf);
201   return MPI_SUCCESS;
202 }