Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge pull request #228 from Takishipp/actor-execute
[simgrid.git] / src / smpi / colls / smpi_default_selector.cpp
1 /* selector with default/naive Simgrid algorithms. These should not be trusted for performance evaluations */
2
3 /* Copyright (c) 2009-2010, 2013-2017. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "colls_private.hpp"
10 #include "smpi_process.hpp"
11
12 namespace simgrid{
13 namespace smpi{
14
15 int Coll_bcast_default::bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
16 {
17   return Coll_bcast_binomial_tree::bcast(buf, count, datatype, root, comm);
18 }
19
20 int Coll_barrier_default::barrier(MPI_Comm comm)
21 {
22   return Coll_barrier_ompi_basic_linear::barrier(comm);
23 }
24
25
26 int Coll_gather_default::gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
27                      void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
28 {
29   int system_tag = COLL_TAG_GATHER;
30   MPI_Aint lb = 0;
31   MPI_Aint recvext = 0;
32
33   int rank = comm->rank();
34   int size = comm->size();
35   if(rank != root) {
36     // Send buffer to root
37     Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
38   } else {
39     recvtype->extent(&lb, &recvext);
40     // Local copy from root
41     Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
42                        recvcount, recvtype);
43     // Receive buffers from senders
44     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
45     int index = 0;
46     for (int src = 0; src < size; src++) {
47       if(src != root) {
48         requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
49                                           src, system_tag, comm);
50         index++;
51       }
52     }
53     // Wait for completion of irecv's.
54     Request::startall(size - 1, requests);
55     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
56     for (int src = 0; src < size-1; src++) {
57       Request::unref(&requests[src]);
58     }
59     xbt_free(requests);
60   }
61   return MPI_SUCCESS;
62 }
63
64 int Coll_reduce_scatter_default::reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
65                              MPI_Comm comm)
66 {
67   int rank = comm->rank();
68
69   /* arbitrarily choose root as rank 0 */
70   int size = comm->size();
71   int count = 0;
72   int *displs = xbt_new(int, size);
73   for (int i = 0; i < size; i++) {
74     displs[i] = count;
75     count += recvcounts[i];
76   }
77   void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
78   int ret = MPI_SUCCESS;
79
80   ret = Coll_reduce_default::reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm);
81   if(ret==MPI_SUCCESS)
82     ret = Colls::scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
83   xbt_free(displs);
84   smpi_free_tmp_buffer(tmpbuf);
85   return ret;
86 }
87
88
89 int Coll_allgather_default::allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
90                         void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
91 {
92   int system_tag = COLL_TAG_ALLGATHER;
93   MPI_Aint lb = 0;
94   MPI_Aint recvext = 0;
95   MPI_Request *requests;
96
97   int rank = comm->rank();
98   int size = comm->size();
99   // FIXME: check for errors
100   recvtype->extent(&lb, &recvext);
101   // Local copy from self
102   Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
103                      recvtype);
104   // Send/Recv buffers to/from others;
105   requests = xbt_new(MPI_Request, 2 * (size - 1));
106   int index = 0;
107   for (int other = 0; other < size; other++) {
108     if(other != rank) {
109       requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
110       index++;
111       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
112                                         other, system_tag, comm);
113       index++;
114     }
115   }
116   // Wait for completion of all comms.
117   Request::startall(2 * (size - 1), requests);
118   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
119   for (int other = 0; other < 2*(size-1); other++) {
120     Request::unref(&requests[other]);
121   }
122   xbt_free(requests);
123   return MPI_SUCCESS;
124 }
125
126 int Coll_allgatherv_default::allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
127                          int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
128 {
129   int system_tag = COLL_TAG_ALLGATHERV;
130   MPI_Aint lb = 0;
131   MPI_Aint recvext = 0;
132
133   int rank = comm->rank();
134   int size = comm->size();
135   recvtype->extent(&lb, &recvext);
136   // Local copy from self
137   Datatype::copy(sendbuf, sendcount, sendtype,
138                      static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
139   // Send buffers to others;
140   MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
141   int index = 0;
142   for (int other = 0; other < size; other++) {
143     if(other != rank) {
144       requests[index] =
145         Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
146       index++;
147       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
148                           recvtype, other, system_tag, comm);
149       index++;
150     }
151   }
152   // Wait for completion of all comms.
153   Request::startall(2 * (size - 1), requests);
154   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
155   for (int other = 0; other < 2*(size-1); other++) {
156     Request::unref(&requests[other]);
157   }
158   xbt_free(requests);
159   return MPI_SUCCESS;
160 }
161
162 int Coll_scatter_default::scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
163                       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
164 {
165   int system_tag = COLL_TAG_SCATTER;
166   MPI_Aint lb = 0;
167   MPI_Aint sendext = 0;
168   MPI_Request *requests;
169
170   int rank = comm->rank();
171   int size = comm->size();
172   if(rank != root) {
173     // Recv buffer from root
174     Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
175   } else {
176     sendtype->extent(&lb, &sendext);
177     // Local copy from root
178     if(recvbuf!=MPI_IN_PLACE){
179         Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
180                            sendcount, sendtype, recvbuf, recvcount, recvtype);
181     }
182     // Send buffers to receivers
183     requests = xbt_new(MPI_Request, size - 1);
184     int index = 0;
185     for(int dst = 0; dst < size; dst++) {
186       if(dst != root) {
187         requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
188                                           dst, system_tag, comm);
189         index++;
190       }
191     }
192     // Wait for completion of isend's.
193     Request::startall(size - 1, requests);
194     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
195     for (int dst = 0; dst < size-1; dst++) {
196       Request::unref(&requests[dst]);
197     }
198     xbt_free(requests);
199   }
200   return MPI_SUCCESS;
201 }
202
203
204
205 int Coll_reduce_default::reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
206                      MPI_Comm comm)
207 {
208   int system_tag = COLL_TAG_REDUCE;
209   MPI_Aint lb = 0;
210   MPI_Aint dataext = 0;
211
212   char* sendtmpbuf = static_cast<char *>(sendbuf);
213
214   int rank = comm->rank();
215   int size = comm->size();
216   if(size==0)
217     return MPI_ERR_COMM;
218   //non commutative case, use a working algo from openmpi
219   if (op != MPI_OP_NULL && not op->is_commutative()) {
220     return Coll_reduce_ompi_basic_linear::reduce(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
221   }
222
223   if( sendbuf == MPI_IN_PLACE ) {
224     sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
225     Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
226   }
227
228   if(rank != root) {
229     // Send buffer to root
230     Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
231   } else {
232     datatype->extent(&lb, &dataext);
233     // Local copy from root
234     if (sendtmpbuf != nullptr && recvbuf != nullptr)
235       Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
236     // Receive buffers from senders
237     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
238     void **tmpbufs = xbt_new(void *, size - 1);
239     int index = 0;
240     for (int src = 0; src < size; src++) {
241       if (src != root) {
242         if (not smpi_process()->replaying())
243           tmpbufs[index] = xbt_malloc(count * dataext);
244          else
245            tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
246         requests[index] =
247           Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
248         index++;
249       }
250     }
251     // Wait for completion of irecv's.
252     Request::startall(size - 1, requests);
253     for (int src = 0; src < size - 1; src++) {
254       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
255       XBT_DEBUG("finished waiting any request with index %d", index);
256       if(index == MPI_UNDEFINED) {
257         break;
258       }else{
259         Request::unref(&requests[index]);
260       }
261       if(op) /* op can be MPI_OP_NULL that does nothing */
262         if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype);
263     }
264       for(index = 0; index < size - 1; index++) {
265         smpi_free_tmp_buffer(tmpbufs[index]);
266       }
267     xbt_free(tmpbufs);
268     xbt_free(requests);
269
270   }
271   if( sendbuf == MPI_IN_PLACE ) {
272     smpi_free_tmp_buffer(sendtmpbuf);
273   }
274   return MPI_SUCCESS;
275 }
276
277 int Coll_allreduce_default::allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
278 {
279   int ret;
280   ret = Colls::reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
281   if(ret==MPI_SUCCESS)
282     ret = Colls::bcast(recvbuf, count, datatype, 0, comm);
283   return ret;
284 }
285
286 int Coll_alltoall_default::alltoall( void *sbuf, int scount, MPI_Datatype sdtype, void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm)
287 {
288   return Coll_alltoall_ompi::alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
289 }
290
291
292
293 int Coll_alltoallv_default::alltoallv(void *sendbuf, int *sendcounts, int *senddisps, MPI_Datatype sendtype,
294                               void *recvbuf, int *recvcounts, int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm)
295 {
296   int system_tag = 889;
297   int i;
298   int count;
299   MPI_Aint lb = 0;
300   MPI_Aint sendext = 0;
301   MPI_Aint recvext = 0;
302   MPI_Request *requests;
303
304   /* Initialize. */
305   int rank = comm->rank();
306   int size = comm->size();
307   XBT_DEBUG("<%d> algorithm basic_alltoallv() called.", rank);
308   sendtype->extent(&lb, &sendext);
309   recvtype->extent(&lb, &recvext);
310   /* Local copy from self */
311   int err = Datatype::copy(static_cast<char *>(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype,
312                                static_cast<char *>(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype);
313   if (err == MPI_SUCCESS && size > 1) {
314     /* Initiate all send/recv to/from others. */
315     requests = xbt_new(MPI_Request, 2 * (size - 1));
316     count = 0;
317     /* Create all receives that will be posted first */
318     for (i = 0; i < size; ++i) {
319       if (i != rank && recvcounts[i] != 0) {
320         requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i] * recvext,
321                                           recvcounts[i], recvtype, i, system_tag, comm);
322         count++;
323       }else{
324         XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
325       }
326     }
327     /* Now create all sends  */
328     for (i = 0; i < size; ++i) {
329       if (i != rank && sendcounts[i] != 0) {
330       requests[count] = Request::isend_init(static_cast<char *>(sendbuf) + senddisps[i] * sendext,
331                                         sendcounts[i], sendtype, i, system_tag, comm);
332       count++;
333       }else{
334         XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
335       }
336     }
337     /* Wait for them all. */
338     Request::startall(count, requests);
339     XBT_DEBUG("<%d> wait for %d requests", rank, count);
340     Request::waitall(count, requests, MPI_STATUS_IGNORE);
341     for(i = 0; i < count; i++) {
342       if(requests[i]!=MPI_REQUEST_NULL)
343         Request::unref(&requests[i]);
344     }
345     xbt_free(requests);
346   }
347   return err;
348 }
349
350 }
351 }
352