Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add/update copyright notices.
[simgrid.git] / src / smpi / colls / allreduce-smp-binomial-pipeline.c
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "colls_private.h"
8 /* IMPLEMENTED BY PITCH PATARASUK 
9    Non-topoloty-specific (however, number of cores/node need to be changed) 
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for both intra-communication 
12    inter-communication
13    The communication are done in a pipeline fashion */
14
15 /* change number of core per smp-node
16    we assume that number of core per process will be the same for all implementations */
17 #ifndef NUM_CORE
18 #define NUM_CORE 8
19 #endif
20
21 /* this is a default segment size for pipelining, 
22    but it is typically passed as a command line argument */
23 int allreduce_smp_binomial_pipeline_segment_size = 4096;
24
25 /* ** NOTE **
26    This code is modified from allreduce-smp-binomial.c by wrapping the code with pipeline effect as follow
27    for (loop over pipelength) {
28      smp-binomial main code;
29    }
30 */
31
32 /* ** NOTE **
33    Use -DMPICH2 if this code does not compile.
34    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
35    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
36 */
37
38 /*
39 This fucntion performs all-reduce operation as follow. ** in a pipeline fashion **
40 1) binomial_tree reduce inside each SMP node
41 2) binomial_tree reduce intra-communication between root of each SMP node
42 3) binomial_tree bcast intra-communication between root of each SMP node
43 4) binomial_tree bcast inside each SMP node
44 */
45 int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
46                                                     void *recv_buf, int count,
47                                                     MPI_Datatype dtype,
48                                                     MPI_Op op, MPI_Comm comm)
49 {
50   int comm_size, rank;
51   void *tmp_buf;
52   int tag = COLL_TAG_ALLREDUCE;
53   int mask, src, dst;
54   MPI_Status status;
55   int num_core = simcall_host_get_core(SIMIX_host_self());
56   // do we use the default one or the number of cores in the platform ?
57   // if the number of cores is one, the platform may be simulated with 1 node = 1 core
58   if (num_core == 1) num_core = NUM_CORE;
59
60   comm_size = smpi_comm_size(comm);
61   rank = smpi_comm_rank(comm);
62   MPI_Aint extent;
63   extent = smpi_datatype_get_extent(dtype);
64   tmp_buf = (void *) xbt_malloc(count * extent);
65
66   int intra_rank, inter_rank;
67   intra_rank = rank % num_core;
68   inter_rank = rank / num_core;
69
70   int phase;
71   int send_offset;
72   int recv_offset;
73   int pcount = allreduce_smp_binomial_pipeline_segment_size;
74   if (pcount > count) {
75     pcount = count;
76   }
77
78   /* size of processes participate in intra communications =>
79      should be equal to number of machines */
80   int inter_comm_size = (comm_size + num_core - 1) / num_core;
81
82   /* copy input buffer to output buffer */
83   smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
84                recv_buf, count, dtype, rank, tag, comm, &status);
85
86   /* compute pipe length */
87   int pipelength;
88   pipelength = count / pcount;
89
90   /* pipelining over pipelength (+3 is because we have 4 stages:
91      reduce-intra, reduce-inter, bcast-inter, bcast-intra */
92   for (phase = 0; phase < pipelength + 3; phase++) {
93
94     /* start binomial reduce intra communication inside each SMP node */
95     if (phase < pipelength) {
96       mask = 1;
97       while (mask < num_core) {
98         if ((mask & intra_rank) == 0) {
99           src = (inter_rank * num_core) + (intra_rank | mask);
100           if (src < comm_size) {
101             recv_offset = phase * pcount * extent;
102             smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
103             smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
104           }
105         } else {
106           send_offset = phase * pcount * extent;
107           dst = (inter_rank * num_core) + (intra_rank & (~mask));
108           smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
109           break;
110         }
111         mask <<= 1;
112       }
113     }
114
115     /* start binomial reduce inter-communication between each SMP nodes: 
116        each node only have one process that can communicate to other nodes */
117     if ((phase > 0) && (phase < (pipelength + 1))) {
118       if (intra_rank == 0) {
119
120         mask = 1;
121         while (mask < inter_comm_size) {
122           if ((mask & inter_rank) == 0) {
123             src = (inter_rank | mask) * num_core;
124             if (src < comm_size) {
125               recv_offset = (phase - 1) * pcount * extent;
126               smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
127               smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
128             }
129           } else {
130             dst = (inter_rank & (~mask)) * num_core;
131             send_offset = (phase - 1) * pcount * extent;
132             smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
133             break;
134           }
135           mask <<= 1;
136         }
137       }
138     }
139
140     /* start binomial broadcast inter-communication between each SMP nodes: 
141        each node only have one process that can communicate to other nodes */
142     if ((phase > 1) && (phase < (pipelength + 2))) {
143       if (intra_rank == 0) {
144         mask = 1;
145         while (mask < inter_comm_size) {
146           if (inter_rank & mask) {
147             src = (inter_rank - mask) * num_core;
148             recv_offset = (phase - 2) * pcount * extent;
149             smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
150                      &status);
151             break;
152           }
153           mask <<= 1;
154         }
155         mask >>= 1;
156
157         while (mask > 0) {
158           if (inter_rank < inter_comm_size) {
159             dst = (inter_rank + mask) * num_core;
160             if (dst < comm_size) {
161               //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
162               send_offset = (phase - 2) * pcount * extent;
163               smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
164             }
165           }
166           mask >>= 1;
167         }
168       }
169     }
170
171     /* start binomial broadcast intra-communication inside each SMP nodes */
172     if (phase > 2) {
173       int num_core_in_current_smp = num_core;
174       if (inter_rank == (inter_comm_size - 1)) {
175         num_core_in_current_smp = comm_size - (inter_rank * num_core);
176       }
177       mask = 1;
178       while (mask < num_core_in_current_smp) {
179         if (intra_rank & mask) {
180           src = (inter_rank * num_core) + (intra_rank - mask);
181           recv_offset = (phase - 3) * pcount * extent;
182           smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
183                    &status);
184           break;
185         }
186         mask <<= 1;
187       }
188       mask >>= 1;
189
190       while (mask > 0) {
191         dst = (inter_rank * num_core) + (intra_rank + mask);
192         if (dst < comm_size) {
193           send_offset = (phase - 3) * pcount * extent;
194           smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
195         }
196         mask >>= 1;
197       }
198     }
199   }                             // for phase
200
201   free(tmp_buf);
202   return MPI_SUCCESS;
203 }