Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
kill all trailling whitespaces
[simgrid.git] / src / smpi / colls / smpi_default_selector.cpp
1 /* selector with default/naive Simgrid algorithms. These should not be trusted for performance evaluations */
2
3 /* Copyright (c) 2009-2010, 2013-2017. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "colls_private.h"
10 #include "src/smpi/smpi_process.hpp"
11
12 namespace simgrid{
13 namespace smpi{
14
15 int Coll_bcast_default::bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
16 {
17   return Coll_bcast_binomial_tree::bcast(buf, count, datatype, root, comm);
18 }
19
20 int Coll_barrier_default::barrier(MPI_Comm comm)
21 {
22   return Coll_barrier_ompi_basic_linear::barrier(comm);
23 }
24
25
26 int Coll_gather_default::gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
27                      void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
28 {
29   int system_tag = COLL_TAG_GATHER;
30   MPI_Aint lb = 0;
31   MPI_Aint recvext = 0;
32
33   int rank = comm->rank();
34   int size = comm->size();
35   if(rank != root) {
36     // Send buffer to root
37     Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
38   } else {
39     recvtype->extent(&lb, &recvext);
40     // Local copy from root
41     Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
42                        recvcount, recvtype);
43     // Receive buffers from senders
44     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
45     int index = 0;
46     for (int src = 0; src < size; src++) {
47       if(src != root) {
48         requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
49                                           src, system_tag, comm);
50         index++;
51       }
52     }
53     // Wait for completion of irecv's.
54     Request::startall(size - 1, requests);
55     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
56     for (int src = 0; src < size-1; src++) {
57       Request::unref(&requests[src]);
58     }
59     xbt_free(requests);
60   }
61   return MPI_SUCCESS;
62 }
63
64 int Coll_reduce_scatter_default::reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
65                              MPI_Comm comm)
66 {
67   int rank = comm->rank();
68
69   /* arbitrarily choose root as rank 0 */
70   int size = comm->size();
71   int count = 0;
72   int *displs = xbt_new(int, size);
73   for (int i = 0; i < size; i++) {
74     displs[i] = count;
75     count += recvcounts[i];
76   }
77   void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
78   int ret = MPI_SUCCESS;
79
80   ret = Coll_reduce_default::reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
81   if(ret==MPI_SUCCESS)
82     ret = Colls::scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
83   xbt_free(displs);
84   smpi_free_tmp_buffer(tmpbuf);
85   return ret;
86 }
87
88
89 int Coll_allgather_default::allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
90                         void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
91 {
92   int system_tag = COLL_TAG_ALLGATHER;
93   MPI_Aint lb = 0;
94   MPI_Aint recvext = 0;
95   MPI_Request *requests;
96
97   int rank = comm->rank();
98   int size = comm->size();
99   // FIXME: check for errors
100   recvtype->extent(&lb, &recvext);
101   // Local copy from self
102   Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
103                      recvtype);
104   // Send/Recv buffers to/from others;
105   requests = xbt_new(MPI_Request, 2 * (size - 1));
106   int index = 0;
107   for (int other = 0; other < size; other++) {
108     if(other != rank) {
109       requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
110       index++;
111       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
112                                         other, system_tag, comm);
113       index++;
114     }
115   }
116   // Wait for completion of all comms.
117   Request::startall(2 * (size - 1), requests);
118   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
119   for (int other = 0; other < 2*(size-1); other++) {
120     Request::unref(&requests[other]);
121   }
122   xbt_free(requests);
123   return MPI_SUCCESS;
124 }
125
126 int Coll_allgatherv_default::allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
127                          int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
128 {
129   int system_tag = COLL_TAG_ALLGATHERV;
130   MPI_Aint lb = 0;
131   MPI_Aint recvext = 0;
132
133   int rank = comm->rank();
134   int size = comm->size();
135   recvtype->extent(&lb, &recvext);
136   // Local copy from self
137   Datatype::copy(sendbuf, sendcount, sendtype,
138                      static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
139   // Send buffers to others;
140   MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
141   int index = 0;
142   for (int other = 0; other < size; other++) {
143     if(other != rank) {
144       requests[index] =
145         Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
146       index++;
147       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
148                           recvtype, other, system_tag, comm);
149       index++;
150     }
151   }
152   // Wait for completion of all comms.
153   Request::startall(2 * (size - 1), requests);
154   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
155   for (int other = 0; other < 2*(size-1); other++) {
156     Request::unref(&requests[other]);
157   }
158   xbt_free(requests);
159   return MPI_SUCCESS;
160 }
161
162 int Coll_scatter_default::scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
163                       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
164 {
165   int system_tag = COLL_TAG_SCATTER;
166   MPI_Aint lb = 0;
167   MPI_Aint sendext = 0;
168   MPI_Request *requests;
169
170   int rank = comm->rank();
171   int size = comm->size();
172   if(rank != root) {
173     // Recv buffer from root
174     Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
175   } else {
176     sendtype->extent(&lb, &sendext);
177     // Local copy from root
178     if(recvbuf!=MPI_IN_PLACE){
179         Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
180                            sendcount, sendtype, recvbuf, recvcount, recvtype);
181     }
182     // Send buffers to receivers
183     requests = xbt_new(MPI_Request, size - 1);
184     int index = 0;
185     for(int dst = 0; dst < size; dst++) {
186       if(dst != root) {
187         requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
188                                           dst, system_tag, comm);
189         index++;
190       }
191     }
192     // Wait for completion of isend's.
193     Request::startall(size - 1, requests);
194     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
195     for (int dst = 0; dst < size-1; dst++) {
196       Request::unref(&requests[dst]);
197     }
198     xbt_free(requests);
199   }
200   return MPI_SUCCESS;
201 }
202
203
204
205 int Coll_reduce_default::reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
206                      MPI_Comm comm)
207 {
208   int system_tag = COLL_TAG_REDUCE;
209   MPI_Aint lb = 0;
210   MPI_Aint dataext = 0;
211
212   char* sendtmpbuf = static_cast<char *>(sendbuf);
213
214   int rank = comm->rank();
215   int size = comm->size();
216   //non commutative case, use a working algo from openmpi
217   if (op != MPI_OP_NULL && not op->is_commutative()) {
218     return Coll_reduce_ompi_basic_linear::reduce(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
219   }
220
221   if( sendbuf == MPI_IN_PLACE ) {
222     sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
223     Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
224   }
225
226   if(rank != root) {
227     // Send buffer to root
228     Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
229   } else {
230     datatype->extent(&lb, &dataext);
231     // Local copy from root
232     if (sendtmpbuf != nullptr && recvbuf != nullptr)
233       Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
234     // Receive buffers from senders
235     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
236     void **tmpbufs = xbt_new(void *, size - 1);
237     int index = 0;
238     for (int src = 0; src < size; src++) {
239       if (src != root) {
240         if (not smpi_process()->replaying())
241           tmpbufs[index] = xbt_malloc(count * dataext);
242          else
243            tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
244         requests[index] =
245           Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
246         index++;
247       }
248     }
249     // Wait for completion of irecv's.
250     Request::startall(size - 1, requests);
251     for (int src = 0; src < size - 1; src++) {
252       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
253       XBT_DEBUG("finished waiting any request with index %d", index);
254       if(index == MPI_UNDEFINED) {
255         break;
256       }else{
257         Request::unref(&requests[index]);
258       }
259       if(op) /* op can be MPI_OP_NULL that does nothing */
260         if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype);
261     }
262       for(index = 0; index < size - 1; index++) {
263         smpi_free_tmp_buffer(tmpbufs[index]);
264       }
265     xbt_free(tmpbufs);
266     xbt_free(requests);
267
268   }
269   if( sendbuf == MPI_IN_PLACE ) {
270     smpi_free_tmp_buffer(sendtmpbuf);
271   }
272   return MPI_SUCCESS;
273 }
274
275 int Coll_allreduce_default::allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
276 {
277   int ret;
278   ret = Colls::reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
279   if(ret==MPI_SUCCESS)
280     ret = Colls::bcast(recvbuf, count, datatype, 0, comm);
281   return ret;
282 }
283
284 int Coll_alltoall_default::alltoall( void *sbuf, int scount, MPI_Datatype sdtype, void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm)
285 {
286   return Coll_alltoall_ompi::alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
287 }
288
289
290
291 int Coll_alltoallv_default::alltoallv(void *sendbuf, int *sendcounts, int *senddisps, MPI_Datatype sendtype,
292                               void *recvbuf, int *recvcounts, int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm)
293 {
294   int system_tag = 889;
295   int i;
296   int count;
297   MPI_Aint lb = 0;
298   MPI_Aint sendext = 0;
299   MPI_Aint recvext = 0;
300   MPI_Request *requests;
301
302   /* Initialize. */
303   int rank = comm->rank();
304   int size = comm->size();
305   XBT_DEBUG("<%d> algorithm basic_alltoallv() called.", rank);
306   sendtype->extent(&lb, &sendext);
307   recvtype->extent(&lb, &recvext);
308   /* Local copy from self */
309   int err = Datatype::copy(static_cast<char *>(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype,
310                                static_cast<char *>(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype);
311   if (err == MPI_SUCCESS && size > 1) {
312     /* Initiate all send/recv to/from others. */
313     requests = xbt_new(MPI_Request, 2 * (size - 1));
314     count = 0;
315     /* Create all receives that will be posted first */
316     for (i = 0; i < size; ++i) {
317       if (i != rank && recvcounts[i] != 0) {
318         requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i] * recvext,
319                                           recvcounts[i], recvtype, i, system_tag, comm);
320         count++;
321       }else{
322         XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
323       }
324     }
325     /* Now create all sends  */
326     for (i = 0; i < size; ++i) {
327       if (i != rank && sendcounts[i] != 0) {
328       requests[count] = Request::isend_init(static_cast<char *>(sendbuf) + senddisps[i] * sendext,
329                                         sendcounts[i], sendtype, i, system_tag, comm);
330       count++;
331       }else{
332         XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
333       }
334     }
335     /* Wait for them all. */
336     Request::startall(count, requests);
337     XBT_DEBUG("<%d> wait for %d requests", rank, count);
338     Request::waitall(count, requests, MPI_STATUS_IGNORE);
339     for(i = 0; i < count; i++) {
340       if(requests[i]!=MPI_REQUEST_NULL)
341         Request::unref(&requests[i]);
342     }
343     xbt_free(requests);
344   }
345   return err;
346 }
347
348 }
349 }
350