simgrid.git: src/smpi/colls/reduce/reduce-mvapich-two-level.cpp
/* Copyright (c) 2013-2019. The SimGrid Team.
 * All rights reserved.                                                     */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2009 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 *
 * Additional copyrights may follow
 */
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/* Copyright (c) 2001-2014, The Ohio State University. All rights
 * reserved.
 *
 * This file is part of the MVAPICH2 software package developed by the
 * team members of The Ohio State University's Network-Based Computing
 * Laboratory (NBCL), headed by Professor Dhabaleswar K. (DK) Panda.
 *
 * For detailed copyright and licensing information, please refer to the
 * copyright file COPYRIGHT in the top level MVAPICH2 directory.
 */
/*
 *
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "../colls_private.hpp"
#include <algorithm>

#define MV2_INTRA_SHMEM_REDUCE_MSG 2048

#define mv2_g_shmem_coll_max_msg_size (1 << 17)
#define SHMEM_COLL_BLOCK_SIZE (local_size * mv2_g_shmem_coll_max_msg_size)
#define mv2_use_knomial_reduce 1

#define MPIR_Reduce_inter_knomial_wrapper_MV2 Coll_reduce_mvapich2_knomial::reduce
#define MPIR_Reduce_intra_knomial_wrapper_MV2 Coll_reduce_mvapich2_knomial::reduce
#define MPIR_Reduce_binomial_MV2 Coll_reduce_binomial::reduce
#define MPIR_Reduce_redscat_gather_MV2 Coll_reduce_scatter_gather::reduce
#define MPIR_Reduce_shmem_MV2 Coll_reduce_ompi_basic_linear::reduce

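/* The MPIR_* names above are kept from the original MVAPICH2 sources and are
 * simply remapped onto reduce algorithms already implemented in this colls/
 * directory: knomial, binomial, reduce-scatter+gather, and a basic linear
 * reduce standing in for the shared-memory variant. Note that
 * SHMEM_COLL_BLOCK_SIZE expands textually inside the function below, so it
 * uses the local_size variable computed there. */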
extern int (*MV2_Reduce_function)(const void *sendbuf,
    void *recvbuf,
    int count,
    MPI_Datatype datatype,
    MPI_Op op,
    int root,
    MPI_Comm comm_ptr);

extern int (*MV2_Reduce_intra_function)(const void *sendbuf,
    void *recvbuf,
    int count,
    MPI_Datatype datatype,
    MPI_Op op,
    int root,
    MPI_Comm comm_ptr);

/* Fn pointers for collectives */
static int (*reduce_fn)(const void *sendbuf,
                        void *recvbuf,
                        int count,
                        MPI_Datatype datatype,
                        MPI_Op op, int root, MPI_Comm comm);
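/* Two-level (hierarchical) reduce, ported from MVAPICH2:
 *  1. each node reduces its local contributions onto its node leader
 *     (local rank 0) with MV2_Reduce_intra_function; the result lands in a
 *     per-leader tmp_buf;
 *  2. the node leaders reduce across nodes with MV2_Reduce_function, rooted
 *     at the leader of the node hosting the global root;
 *  3. if the global root is not that leader, the leader ships the final
 *     result to the root with a point-to-point message.
 * MV2_Reduce_function and MV2_Reduce_intra_function are normally set by the
 * MVAPICH2 selector; they fall back to the MPICH reduce below when unset. */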
namespace simgrid{
namespace smpi{
int Coll_reduce_mvapich2_two_level::reduce(const void *sendbuf,
                                     void *recvbuf,
                                     int count,
                                     MPI_Datatype datatype,
                                     MPI_Op op,
                                     int root,
                                     MPI_Comm comm)
{
    int mpi_errno = MPI_SUCCESS;
    int my_rank, total_size, local_rank, local_size;
    int leader_comm_rank = -1, leader_comm_size = 0;
    MPI_Comm shmem_comm, leader_comm;
    int leader_root, leader_of_root;
    const unsigned char* in_buf = nullptr;
    unsigned char *out_buf = nullptr, *tmp_buf = nullptr;
    MPI_Aint true_lb, true_extent, extent;
    int is_commutative = 0, stride = 0;
    int intra_node_root = 0;

    //if not set (use of the algo directly, without mvapich2 selector)
    if(MV2_Reduce_function==NULL)
      MV2_Reduce_function=Coll_reduce_mpich::reduce;
    if(MV2_Reduce_intra_function==NULL)
      MV2_Reduce_intra_function=Coll_reduce_mpich::reduce;

    if(comm->get_leaders_comm()==MPI_COMM_NULL){
      comm->init_smp();
    }

    my_rank = comm->rank();
    total_size = comm->size();
    shmem_comm = comm->get_intra_comm();
    local_rank = shmem_comm->rank();
    local_size = shmem_comm->size();

    leader_comm = comm->get_leaders_comm();
    int* leaders_map = comm->get_leaders_map();
    leader_of_root = comm->group()->rank(leaders_map[root]);
    leader_root = leader_comm->group()->rank(leaders_map[root]);

    is_commutative = (op==MPI_OP_NULL || op->is_commutative());

    datatype->extent(&true_lb, &true_extent);
    extent = datatype->get_extent();
    stride = count * std::max(extent, true_extent);

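    /* stride is the memory footprint of one message; it drives the choice
     * between the shared-memory path (small, commutative operations) and the
     * knomial/binomial fallbacks below. */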
    if (local_size == total_size) {
        /* First handle the case where there is only one node */
        if (stride <= MV2_INTRA_SHMEM_REDUCE_MSG &&
            is_commutative == 1) {
            if (local_rank == 0) {
              tmp_buf = smpi_get_tmp_sendbuffer(count * std::max(extent, true_extent));
              tmp_buf = tmp_buf - true_lb;
            }

            if (sendbuf != MPI_IN_PLACE) {
              in_buf = static_cast<const unsigned char*>(sendbuf);
            } else {
              in_buf = static_cast<const unsigned char*>(recvbuf);
            }

            if (local_rank == 0) {
                 if (my_rank != root) {
                     out_buf = tmp_buf;
                 } else {
                   out_buf = static_cast<unsigned char*>(recvbuf);
                   if (in_buf == out_buf) {
                     in_buf  = static_cast<const unsigned char*>(MPI_IN_PLACE);
                     out_buf = static_cast<unsigned char*>(recvbuf);
                   }
                 }
            } else {
              in_buf  = static_cast<const unsigned char*>(sendbuf);
              out_buf = nullptr;
            }

            if (count * (std::max(extent, true_extent)) < SHMEM_COLL_BLOCK_SIZE) {
              mpi_errno = MPIR_Reduce_shmem_MV2(in_buf, out_buf, count, datatype, op, 0, shmem_comm);
            } else {
              mpi_errno = MPIR_Reduce_intra_knomial_wrapper_MV2(in_buf, out_buf, count, datatype, op, 0, shmem_comm);
            }

            if (local_rank == 0 && root != my_rank) {
                Request::send(out_buf, count, datatype, root,
                                         COLL_TAG_REDUCE+1, comm);
            }
            if ((local_rank != 0) && (root == my_rank)) {
                Request::recv(recvbuf, count, datatype,
                                         leader_of_root, COLL_TAG_REDUCE+1, comm,
                                         MPI_STATUS_IGNORE);
            }
        } else {
            if(mv2_use_knomial_reduce == 1) {
                reduce_fn = &MPIR_Reduce_intra_knomial_wrapper_MV2;
            } else {
                reduce_fn = &MPIR_Reduce_binomial_MV2;
            }
            mpi_errno = reduce_fn(sendbuf, recvbuf, count,
                                  datatype, op,
                                  root, comm);
        }
        /* We are done */
        if (tmp_buf != nullptr)
          smpi_free_tmp_buffer(tmp_buf + true_lb);
        goto fn_exit;
    }

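    /* Multi-node case: each node leader allocates tmp_buf to hold the result
     * of the intra-node phase, and the input/output buffers are set up as in
     * the single-node path above. */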
    if (local_rank == 0) {
        leader_comm = comm->get_leaders_comm();
        if(leader_comm==MPI_COMM_NULL){
          leader_comm = MPI_COMM_WORLD;
        }
        leader_comm_size = leader_comm->size();
        leader_comm_rank = leader_comm->rank();
        tmp_buf          = smpi_get_tmp_sendbuffer(count * std::max(extent, true_extent));
        tmp_buf          = tmp_buf - true_lb;
    }
    if (sendbuf != MPI_IN_PLACE) {
      in_buf = static_cast<const unsigned char*>(sendbuf);
    } else {
      in_buf = static_cast<const unsigned char*>(recvbuf);
    }
    if (local_rank == 0) {
      out_buf = static_cast<unsigned char*>(tmp_buf);
    } else {
      out_buf = nullptr;
    }

    if(local_size > 1) {
        /* Let's do the intra-node reduce operations, if we have more than one
         * process in the node */

        /* Fix the input and output buffers for the intra-node reduce.
         * Node leaders will have the reduced data in tmp_buf after
         * this step */
        if (MV2_Reduce_intra_function == &MPIR_Reduce_shmem_MV2) {
          if (is_commutative == 1 && (count * (std::max(extent, true_extent)) < SHMEM_COLL_BLOCK_SIZE)) {
            mpi_errno = MV2_Reduce_intra_function(in_buf, out_buf, count, datatype, op, intra_node_root, shmem_comm);
          } else {
            mpi_errno = MPIR_Reduce_intra_knomial_wrapper_MV2(in_buf, out_buf, count,
                                  datatype, op,
                                  intra_node_root, shmem_comm);
          }
        } else {
            mpi_errno = MV2_Reduce_intra_function(in_buf, out_buf, count,
                                  datatype, op,
                                  intra_node_root, shmem_comm);
        }
    } else {
      /* Only one process on this node: skip the intra-node phase and let
       * tmp_buf simply alias the local input buffer. */
      smpi_free_tmp_buffer(tmp_buf + true_lb);
      tmp_buf = (unsigned char*)in_buf;
    }

    /* Now work on the inter-leader phase. Data is in tmp_buf */
    if (local_rank == 0 && leader_comm_size > 1) {
        /* The leader of root will have the globally reduced data in tmp_buf
         * or recvbuf at the end of the reduce */
        if (leader_comm_rank == leader_root) {
            if (my_rank == root) {
                /* I am the root of the leader-comm, and the
                 * root of the reduce op. So, I will write the
                 * final result directly into my recvbuf */
                if(tmp_buf != recvbuf) {
                  in_buf  = tmp_buf;
                  out_buf = static_cast<unsigned char*>(recvbuf);
                } else {
                  /* tmp_buf aliases recvbuf: reduce from a temporary copy so
                   * that the input and output buffers do not overlap */
                  unsigned char* buf = smpi_get_tmp_sendbuffer(count * datatype->get_extent());
                  Datatype::copy(tmp_buf, count, datatype, buf, count, datatype);
                  // in_buf = MPI_IN_PLACE;
                  in_buf  = buf;
                  out_buf = static_cast<unsigned char*>(recvbuf);
                }
            } else {
              unsigned char* buf = smpi_get_tmp_sendbuffer(count * datatype->get_extent());
              Datatype::copy(tmp_buf, count, datatype, buf, count, datatype);
              // in_buf = MPI_IN_PLACE;
              in_buf  = buf;
              out_buf = tmp_buf;
            }
        } else {
            in_buf = tmp_buf;
            out_buf = nullptr;
        }

        /* inter-leader communication */
        mpi_errno = MV2_Reduce_function(in_buf, out_buf, count,
                              datatype, op,
                              leader_root, leader_comm);

    }

    if (local_size > 1) {
      /* Send the message to the root if the leader is not the
       * root of the reduce operation. The reduced data is in tmp_buf */
      if ((local_rank == 0) && (root != my_rank) && (leader_root == leader_comm_rank)) {
        Request::send(tmp_buf, count, datatype, root, COLL_TAG_REDUCE + 1, comm);
      }
      if ((local_rank != 0) && (root == my_rank)) {
        Request::recv(recvbuf, count, datatype, leader_of_root, COLL_TAG_REDUCE + 1, comm, MPI_STATUS_IGNORE);
      }
      smpi_free_tmp_buffer(tmp_buf + true_lb);

      if (leader_comm_rank == leader_root) {
        if (my_rank != root || (my_rank == root && tmp_buf == recvbuf)) {
          smpi_free_tmp_buffer(in_buf);
        }
      }
    }

  fn_exit:
    return mpi_errno;
}
}
}
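
/* Usage note: within SMPI this algorithm is normally chosen by the collective
 * selector; assuming SimGrid's usual naming convention for reduce algorithms,
 * it can also be forced explicitly, e.g. with
 *   --cfg=smpi/reduce:mvapich2_two_level
 * (the exact option value is an assumption based on the class name above). */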