Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill smpi::Group::actor(int).
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_request.hpp"
7
8 #include "mc/mc.h"
9 #include "private.hpp"
10 #include "simgrid/Exception.hpp"
11 #include "simgrid/s4u/Exec.hpp"
12 #include "simgrid/s4u/Mutex.hpp"
13 #include "simgrid/s4u/ConditionVariable.hpp"
14 #include "smpi_comm.hpp"
15 #include "smpi_datatype.hpp"
16 #include "smpi_host.hpp"
17 #include "smpi_op.hpp"
18 #include "src/kernel/activity/CommImpl.hpp"
19 #include "src/mc/mc_replay.hpp"
20 #include "src/smpi/include/smpi_actor.hpp"
21
22 #include <algorithm>
23 #include <array>
24
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
26
27 static simgrid::config::Flag<double> smpi_iprobe_sleep(
28   "smpi/iprobe", "Minimum time to inject inside a call to MPI_Iprobe", 1e-4);
29 static simgrid::config::Flag<double> smpi_test_sleep(
30   "smpi/test", "Minimum time to inject inside a call to MPI_Test", 1e-4);
31
32 std::vector<s_smpi_factor_t> smpi_ois_values;
33
34 extern void (*smpi_comm_copy_data_callback)(simgrid::kernel::activity::CommImpl*, void*, size_t);
35
36 namespace simgrid{
37 namespace smpi{
38
39 Request::Request(const void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
40                  unsigned flags, MPI_Op op)
41     : buf_(const_cast<void*>(buf))
42     , old_type_(datatype)
43     , size_(datatype->size() * count)
44     , src_(src)
45     , dst_(dst)
46     , tag_(tag)
47     , comm_(comm)
48     , flags_(flags)
49     , op_(op)
50 {
51   datatype->ref();
52   comm_->ref();
53   if(op != MPI_REPLACE && op != MPI_OP_NULL)
54     op_->ref();
55   action_          = nullptr;
56   detached_        = false;
57   detached_sender_ = nullptr;
58   real_src_        = 0;
59   truncated_       = false;
60   real_size_       = 0;
61   real_tag_        = 0;
62   if (flags & MPI_REQ_PERSISTENT)
63     refcount_ = 1;
64   else
65     refcount_ = 0;
66   nbc_requests_=nullptr;
67   nbc_requests_size_=0;
68   init_buffer(count);
69   this->add_f();
70 }
71
72 void Request::ref(){
73   refcount_++;
74 }
75
76 void Request::unref(MPI_Request* request)
77 {
78   xbt_assert(*request != MPI_REQUEST_NULL, "freeing an already free request");
79
80   (*request)->refcount_--;
81   if ((*request)->refcount_ < 0) {
82     (*request)->print_request("wrong refcount");
83     xbt_die("Whoops, wrong refcount");
84   }
85   if ((*request)->refcount_ == 0) {
86     if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
87       ((*request)->generalized_funcs)->free_fn(((*request)->generalized_funcs)->extra_state);
88     } else {
89       Comm::unref((*request)->comm_);
90       Datatype::unref((*request)->old_type_);
91     }
92     if ((*request)->op_ != MPI_REPLACE && (*request)->op_ != MPI_OP_NULL)
93       Op::unref(&(*request)->op_);
94
95     (*request)->print_request("Destroying");
96     F2C::free_f((*request)->f2c_id());
97     delete *request;
98     *request = MPI_REQUEST_NULL;
99   } else {
100     (*request)->print_request("Decrementing");
101   }
102 }
103
104 bool Request::match_common(MPI_Request req, MPI_Request sender, MPI_Request receiver)
105 {
106   xbt_assert(sender, "Cannot match against null sender");
107   xbt_assert(receiver, "Cannot match against null receiver");
108   XBT_DEBUG("Trying to match %s of sender src %d against %d, tag %d against %d, id %d against %d",
109             (req == receiver ? "send" : "recv"), sender->src_, receiver->src_, sender->tag_, receiver->tag_,
110             sender->comm_->id(), receiver->comm_->id());
111
112   if ((receiver->comm_->id() == MPI_UNDEFINED || sender->comm_->id() == MPI_UNDEFINED ||
113        receiver->comm_->id() == sender->comm_->id()) &&
114       ((receiver->src_ == MPI_ANY_SOURCE && (receiver->comm_->group()->rank(sender->src_) != MPI_UNDEFINED)) ||
115        receiver->src_ == sender->src_) &&
116       ((receiver->tag_ == MPI_ANY_TAG && sender->tag_ >= 0) || receiver->tag_ == sender->tag_)) {
117     // we match, we can transfer some values
118     if (receiver->src_ == MPI_ANY_SOURCE)
119       receiver->real_src_ = sender->src_;
120     if (receiver->tag_ == MPI_ANY_TAG)
121       receiver->real_tag_ = sender->tag_;
122     if ((receiver->flags_ & MPI_REQ_PROBE) == 0 ){
123       if (receiver->real_size_ < sender->real_size_){
124         XBT_DEBUG("Truncating message - should not happen: receiver size : %zu < sender size : %zu", receiver->real_size_, sender->real_size_);
125         receiver->truncated_ = true;
126       } else if (receiver->real_size_ > sender->real_size_){
127         receiver->real_size_=sender->real_size_;
128       }
129     }
130     if (sender->detached_)
131       receiver->detached_sender_ = sender; // tie the sender to the receiver, as it is detached and has to be freed in
132                                            // the receiver
133     req->flags_ |= MPI_REQ_MATCHED; // mark as impossible to cancel anymore
134     XBT_DEBUG("match succeeded");
135     return true;
136   }
137   return false;
138 }
139
140 void Request::init_buffer(int count){
141   void *old_buf = nullptr;
142 // FIXME Handle the case of a partial shared malloc.
143   // This part handles the problem of non-contiguous memory (for the unserialization at the reception)
144   if ((((flags_ & MPI_REQ_RECV) != 0) && ((flags_ & MPI_REQ_ACCUMULATE) != 0)) || (old_type_->flags() & DT_FLAG_DERIVED)) {
145     // This part handles the problem of non-contiguous memory
146     old_buf = buf_;
147     if (count==0){
148       buf_ = nullptr;
149     }else {
150       buf_ = xbt_malloc(count*old_type_->size());
151       if ((old_type_->flags() & DT_FLAG_DERIVED) && ((flags_ & MPI_REQ_SEND) != 0)) {
152         old_type_->serialize(old_buf, buf_, count);
153       }
154     }
155   }
156   old_buf_  = old_buf;
157 }
158
159 bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
160 {
161   auto ref = static_cast<MPI_Request>(a);
162   auto req = static_cast<MPI_Request>(b);
163   return match_common(req, req, ref);
164 }
165
166 bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*)
167 {
168   auto ref = static_cast<MPI_Request>(a);
169   auto req = static_cast<MPI_Request>(b);
170   return match_common(req, ref, req);
171 }
172
173 void Request::print_request(const char* message) const
174 {
175   XBT_VERB("%s  request %p  [buf = %p, size = %zu, src = %d, dst = %d, tag = %d, flags = %x]",
176        message, this, buf_, size_, src_, dst_, tag_, flags_);
177 }
178
179 /* factories, to hide the internal flags from the caller */
180 MPI_Request Request::bsend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
181 {
182   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
183                      dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
184                      MPI_REQ_PERSISTENT | MPI_REQ_SEND | MPI_REQ_PREPARED | MPI_REQ_BSEND);
185 }
186
187 MPI_Request Request::send_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
188 {
189   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
190                      dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
191                      MPI_REQ_PERSISTENT | MPI_REQ_SEND | MPI_REQ_PREPARED);
192 }
193
194 MPI_Request Request::ssend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
195 {
196   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
197                      dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
198                      MPI_REQ_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
199 }
200
201 MPI_Request Request::isend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
202 {
203   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
204                      dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
205                      MPI_REQ_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
206 }
207
208
209 MPI_Request Request::rma_send_init(const void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
210                                MPI_Op op)
211 {
212   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
213   if(op==MPI_OP_NULL){
214     request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor_pid(src),
215                           dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
216                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
217   }else{
218     request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor_pid(src),
219                           dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
220                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED |
221                               MPI_REQ_ACCUMULATE,
222                           op);
223   }
224   return request;
225 }
226
227 MPI_Request Request::recv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
228 {
229   int source = MPI_PROC_NULL;
230   if (src == MPI_ANY_SOURCE)
231     source = MPI_ANY_SOURCE;
232   else if (src != MPI_PROC_NULL)
233     source = comm->group()->actor_pid(src);
234   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
235                      source,
236                      simgrid::s4u::this_actor::get_pid(), tag, comm,
237                      MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
238 }
239
240 MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
241                                MPI_Op op)
242 {
243   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
244   int source = MPI_PROC_NULL;
245   if (src == MPI_ANY_SOURCE)
246     source = MPI_ANY_SOURCE;
247   else if (src != MPI_PROC_NULL)
248     source = comm->group()->actor_pid(src);
249   if(op==MPI_OP_NULL){
250     request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
251                           dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
252                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
253   }else{
254     request =
255         new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
256                     dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
257                     MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE, op);
258   }
259   return request;
260 }
261
262 MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
263 {
264   int source = MPI_PROC_NULL;
265   if (src == MPI_ANY_SOURCE)
266     source = MPI_ANY_SOURCE;
267   else if (src != MPI_PROC_NULL)
268     source = comm->group()->actor_pid(src);
269   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
270                      source, simgrid::s4u::this_actor::get_pid(), tag, comm,
271                      MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
272 }
273
274 MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
275 {
276   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
277   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
278                         dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
279                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
280   if(dst != MPI_PROC_NULL)
281     request->start();
282   return request;
283 }
284
285 MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
286 {
287   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
288   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
289                         dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
290                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
291   if(dst != MPI_PROC_NULL)
292     request->start();
293   return request;
294 }
295
296 MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
297 {
298   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
299   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
300                         dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
301                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
302   if(dst != MPI_PROC_NULL)
303     request->start();
304   return request;
305 }
306
307
308 MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
309 {
310   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
311   int source = MPI_PROC_NULL;
312   if (src == MPI_ANY_SOURCE)
313     source = MPI_ANY_SOURCE;
314   else if (src != MPI_PROC_NULL)
315     source = comm->group()->actor_pid(src);
316   request             = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
317                         source, simgrid::s4u::this_actor::get_pid(), tag, comm, 
318                         MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
319   if(src != MPI_PROC_NULL)
320     request->start();
321   return request;
322 }
323
324 int Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status * status)
325 {
326   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
327   request = irecv(buf, count, datatype, src, tag, comm);
328   int retval = wait(&request,status);
329   request = nullptr;
330   return retval;
331 }
332
333 void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
334 {
335   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
336   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
337                         dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
338                         MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
339
340   if(dst != MPI_PROC_NULL)
341    request->start();
342   wait(&request, MPI_STATUS_IGNORE);
343   request = nullptr;
344 }
345
346 void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
347 {
348   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
349   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
350                         dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
351                         MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
352   if(dst != MPI_PROC_NULL)
353    request->start();
354   wait(&request, MPI_STATUS_IGNORE);
355   request = nullptr;
356 }
357
358 void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
359 {
360   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
361   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
362                         dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL, tag, comm,
363                         MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
364
365   if(dst != MPI_PROC_NULL)
366    request->start();
367   wait(&request,MPI_STATUS_IGNORE);
368   request = nullptr;
369 }
370
371 void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
372                        void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
373                        MPI_Comm comm, MPI_Status * status)
374 {
375   int source = MPI_PROC_NULL;
376   if (src == MPI_ANY_SOURCE)
377     source = MPI_ANY_SOURCE;
378   else if (src != MPI_PROC_NULL)
379     source = comm->group()->actor_pid(src);
380   int destination = dst != MPI_PROC_NULL ? comm->group()->actor_pid(dst) : MPI_PROC_NULL;
381
382   std::array<MPI_Request, 2> requests;
383   std::array<MPI_Status, 2> stats;
384   int myid = simgrid::s4u::this_actor::get_pid();
385   if ((destination == myid) && (source == myid)) {
386     Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
387     if (status != MPI_STATUS_IGNORE) {
388       status->MPI_SOURCE = source;
389       status->MPI_TAG    = recvtag;
390       status->MPI_ERROR  = MPI_SUCCESS;
391       status->count      = sendcount * sendtype->size();
392     }
393     return;
394   }
395   requests[0] = isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
396   requests[1] = irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
397   startall(2, requests.data());
398   waitall(2, requests.data(), stats.data());
399   unref(&requests[0]);
400   unref(&requests[1]);
401   if(status != MPI_STATUS_IGNORE) {
402     // Copy receive status
403     *status = stats[1];
404   }
405 }
406
407 void Request::start()
408 {
409   s4u::Mailbox* mailbox;
410
411   xbt_assert(action_ == nullptr, "Cannot (re-)start unfinished communication");
412   //reinitialize temporary buffer for persistent requests
413   if(real_size_ > 0 && flags_ & MPI_REQ_FINISHED){
414     buf_ = old_buf_;
415     init_buffer(real_size_/old_type_->size());
416   }
417   flags_ &= ~MPI_REQ_PREPARED;
418   flags_ &= ~MPI_REQ_FINISHED;
419   this->ref();
420
421   // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
422   real_size_=size_;
423   if ((flags_ & MPI_REQ_RECV) != 0) {
424     this->print_request("New recv");
425
426     simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
427
428     simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
429     if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
430       mut->lock();
431
432     if (smpi_cfg_async_small_thresh() == 0 && (flags_ & MPI_REQ_RMA) == 0) {
433       mailbox = process->mailbox();
434     } else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < smpi_cfg_async_small_thresh()) {
435       //We have to check both mailboxes (because SSEND messages are sent to the large mbox).
436       //begin with the more appropriate one : the small one.
437       mailbox = process->mailbox_small();
438       XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %s (in case of SSEND)?",
439                 mailbox->get_cname());
440       simgrid::kernel::activity::ActivityImplPtr action = mailbox->iprobe(0, &match_recv, static_cast<void*>(this));
441
442       if (action == nullptr) {
443         mailbox = process->mailbox();
444         XBT_DEBUG("No, nothing in the small mailbox test the other one : %s", mailbox->get_cname());
445         action = mailbox->iprobe(0, &match_recv, static_cast<void*>(this));
446         if (action == nullptr) {
447           XBT_DEBUG("Still nothing, switch back to the small mailbox : %s", mailbox->get_cname());
448           mailbox = process->mailbox_small();
449         }
450       } else {
451         XBT_DEBUG("yes there was something for us in the large mailbox");
452       }
453     } else {
454       mailbox = process->mailbox_small();
455       XBT_DEBUG("Is there a corresponding send already posted the small mailbox?");
456       simgrid::kernel::activity::ActivityImplPtr action = mailbox->iprobe(0, &match_recv, static_cast<void*>(this));
457
458       if (action == nullptr) {
459         XBT_DEBUG("No, nothing in the permanent receive mailbox");
460         mailbox = process->mailbox();
461       } else {
462         XBT_DEBUG("yes there was something for us in the small mailbox");
463       }
464     }
465
466     action_   = simcall_comm_irecv(
467         process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv,
468         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0);
469     XBT_DEBUG("recv simcall posted");
470
471     if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
472       mut->unlock();
473   } else { /* the RECV flag was not set, so this is a send */
474     const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
475     xbt_assert(process, "Actor pid=%d is gone??", dst_);
476     if (TRACE_smpi_view_internals())
477       TRACE_smpi_send(src_, src_, dst_, tag_, size_);
478     this->print_request("New send");
479
480     void* buf = buf_;
481     if ((flags_ & MPI_REQ_SSEND) == 0 &&
482         ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
483          static_cast<int>(size_) < smpi_cfg_detached_send_thresh())) {
484       void *oldbuf = nullptr;
485       detached_    = true;
486       XBT_DEBUG("Send request %p is detached", this);
487       this->ref();
488       if (not(old_type_->flags() & DT_FLAG_DERIVED)) {
489         oldbuf = buf_;
490         if (not process->replaying() && oldbuf != nullptr && size_ != 0) {
491           if ((smpi_cfg_privatization() != SmpiPrivStrategies::NONE) &&
492               (static_cast<char*>(buf_) >= smpi_data_exe_start) &&
493               (static_cast<char*>(buf_) < smpi_data_exe_start + smpi_data_exe_size)) {
494             XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
495             smpi_switch_data_segment(simgrid::s4u::Actor::by_pid(src_));
496           }
497           //we need this temporary buffer even for bsend, as it will be released in the copy callback and we don't have a way to differentiate it
498           //so actually ... don't use manually attached buffer space.
499           buf = xbt_malloc(size_);
500           memcpy(buf,oldbuf,size_);
501           XBT_DEBUG("buf %p copied into %p",oldbuf,buf);
502         }
503       }
504     }
505
506     //if we are giving back the control to the user without waiting for completion, we have to inject timings
507     double sleeptime = 0.0;
508     if (detached_ || ((flags_ & (MPI_REQ_ISEND | MPI_REQ_SSEND)) != 0)) { // issend should be treated as isend
509       // isend and send timings may be different
510       sleeptime = ((flags_ & MPI_REQ_ISEND) != 0)
511                       ? simgrid::s4u::Actor::self()->get_host()->extension<simgrid::smpi::Host>()->oisend(size_)
512                       : simgrid::s4u::Actor::self()->get_host()->extension<simgrid::smpi::Host>()->osend(size_);
513     }
514
515     if(sleeptime > 0.0){
516       simgrid::s4u::this_actor::sleep_for(sleeptime);
517       XBT_DEBUG("sending size of %zu : sleep %f ", size_, sleeptime);
518     }
519
520     simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
521
522     if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
523       mut->lock();
524
525     if (not(smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
526       mailbox = process->mailbox();
527     } else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < smpi_cfg_async_small_thresh()) { // eager mode
528       mailbox = process->mailbox();
529       XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %s?", mailbox->get_cname());
530       simgrid::kernel::activity::ActivityImplPtr action = mailbox->iprobe(1, &match_send, static_cast<void*>(this));
531       if (action == nullptr) {
532         if ((flags_ & MPI_REQ_SSEND) == 0) {
533           mailbox = process->mailbox_small();
534           XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %s",
535                     mailbox->get_cname());
536         } else {
537           mailbox = process->mailbox_small();
538           XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %s?",
539                     mailbox->get_cname());
540           action = mailbox->iprobe(1, &match_send, static_cast<void*>(this));
541           if (action == nullptr) {
542             XBT_DEBUG("No, we are first, send to large mailbox");
543             mailbox = process->mailbox();
544           }
545         }
546       } else {
547         XBT_DEBUG("Yes there was something for us in the large mailbox");
548       }
549     } else {
550       mailbox = process->mailbox();
551       XBT_DEBUG("Send request %p is in the large mailbox %s (buf: %p)", this, mailbox->get_cname(), buf_);
552     }
553
554     size_t payload_size_ = size_ + 16;//MPI enveloppe size (tag+dest+communicator)
555     action_              = simcall_comm_isend(
556         simgrid::kernel::actor::ActorImpl::by_pid(src_), mailbox->get_impl(), payload_size_, -1.0, buf, real_size_,
557         &match_send,
558         &xbt_free_f, // how to free the userdata if a detached send fails
559         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this,
560         // detach if msg size < eager/rdv switch limit
561         detached_);
562     XBT_DEBUG("send simcall posted");
563
564     /* FIXME: detached sends are not traceable (action_ == nullptr) */
565     if (action_ != nullptr) {
566       boost::static_pointer_cast<kernel::activity::CommImpl>(action_)->set_tracing_category(
567           smpi_process()->get_tracing_category());
568     }
569
570     if (smpi_cfg_async_small_thresh() != 0 || ((flags_ & MPI_REQ_RMA) != 0))
571       mut->unlock();
572   }
573 }
574
575 void Request::startall(int count, MPI_Request * requests)
576 {
577   if(requests== nullptr)
578     return;
579
580   for(int i = 0; i < count; i++) {
581     if(requests[i]->src_ != MPI_PROC_NULL && requests[i]->dst_ != MPI_PROC_NULL)
582       requests[i]->start();
583   }
584 }
585
586 void Request::cancel()
587 {
588   this->flags_ |= MPI_REQ_CANCELLED;
589   if (this->action_ != nullptr)
590     (boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(this->action_))->cancel();
591 }
592
593 int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
594   // assume that *request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
595   // to avoid deadlocks if used as a break condition, such as
596   //     while (MPI_Test(request, flag, status) && flag) dostuff...
597   // because the time will not normally advance when only calls to MPI_Test are made -> deadlock
598   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
599   xbt_assert(*request != MPI_REQUEST_NULL);
600
601   static int nsleeps = 1;
602   int ret = MPI_SUCCESS;
603   
604   // Are we testing a request meant for non blocking collectives ?
605   // If so, test all the subrequests.
606   if ((*request)->nbc_requests_size_>0){
607     ret = testall((*request)->nbc_requests_size_, (*request)->nbc_requests_, flag, MPI_STATUSES_IGNORE);
608     if(*flag){
609       delete[] (*request)->nbc_requests_;
610       (*request)->nbc_requests_size_=0;
611       unref(request);
612     }
613     return ret;
614   }
615   
616   if(smpi_test_sleep > 0)
617     simgrid::s4u::this_actor::sleep_for(nsleeps * smpi_test_sleep);
618
619   Status::empty(status);
620   *flag = 1;
621   if (((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) == 0) {
622     if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
623       try{
624         *flag = simcall_comm_test((*request)->action_.get());
625       } catch (const Exception&) {
626         *flag = 0;
627         return ret;
628       }
629     }
630     if (((*request)->flags_ & MPI_REQ_GENERALIZED) && !((*request)->flags_ & MPI_REQ_COMPLETE))
631       *flag=0;
632     if (*flag) {
633       finish_wait(request, status); // may invalidate *request
634       if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
635         MPI_Status tmp_status;
636         MPI_Status* mystatus;
637         if (status == MPI_STATUS_IGNORE) {
638           mystatus = &tmp_status;
639           Status::empty(mystatus);
640         } else {
641           mystatus = status;
642         }
643         ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
644       }
645       nsleeps=1;//reset the number of sleeps we will do next time
646       if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_PERSISTENT) == 0)
647         *request = MPI_REQUEST_NULL;
648     } else if (smpi_cfg_grow_injected_times()) {
649       nsleeps++;
650     }
651   }
652   return ret;
653 }
654
655 int Request::testsome(int incount, MPI_Request requests[], int *count, int *indices, MPI_Status status[])
656 {
657   int error=0;
658   int count_dead = 0;
659   int flag = 0;
660   MPI_Status stat;
661   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
662
663   *count = 0;
664   for (int i = 0; i < incount; i++) {
665     if (requests[i] != MPI_REQUEST_NULL && not (requests[i]->flags_ & MPI_REQ_FINISHED)) {
666       int ret = test(&requests[i], pstat, &flag);
667       if(ret!=MPI_SUCCESS)
668         error = 1;
669       if(flag) {
670         indices[*count] = i;
671         if (status != MPI_STATUSES_IGNORE)
672           status[*count] = *pstat;
673         (*count)++;
674         if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
675           requests[i] = MPI_REQUEST_NULL;
676       }
677     } else {
678       count_dead++;
679     }
680   }
681   if(count_dead==incount)*count=MPI_UNDEFINED;
682   if(error!=0)
683     return MPI_ERR_IN_STATUS;
684   else
685     return MPI_SUCCESS;
686 }
687
688 int Request::testany(int count, MPI_Request requests[], int *index, int* flag, MPI_Status * status)
689 {
690   std::vector<simgrid::kernel::activity::CommImpl*> comms;
691   comms.reserve(count);
692
693   int i;
694   *flag = 0;
695   int ret = MPI_SUCCESS;
696   *index = MPI_UNDEFINED;
697
698   std::vector<int> map; /** Maps all matching comms back to their location in requests **/
699   for(i = 0; i < count; i++) {
700     if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
701       comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
702       map.push_back(i);
703     }
704   }
705   if (not map.empty()) {
706     //multiplier to the sleeptime, to increase speed of execution, each failed testany will increase it
707     static int nsleeps = 1;
708     if(smpi_test_sleep > 0)
709       simgrid::s4u::this_actor::sleep_for(nsleeps * smpi_test_sleep);
710     try{
711       i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches!
712     } catch (const Exception&) {
713       XBT_DEBUG("Exception in testany");
714       return 0;
715     }
716     
717     if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
718       *index = map[i];
719       if (requests[*index] != MPI_REQUEST_NULL && 
720           (requests[*index]->flags_ & MPI_REQ_GENERALIZED)
721           && !(requests[*index]->flags_ & MPI_REQ_COMPLETE)) {
722         *flag=0;
723       } else {
724         finish_wait(&requests[*index],status);
725       if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_GENERALIZED)){
726         MPI_Status tmp_status;
727         MPI_Status* mystatus;
728         if (status == MPI_STATUS_IGNORE) {
729           mystatus = &tmp_status;
730           Status::empty(mystatus);
731         } else {
732           mystatus = status;
733         }
734         ret=(requests[*index]->generalized_funcs)->query_fn((requests[*index]->generalized_funcs)->extra_state, mystatus);
735       }
736
737         if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT)) 
738           requests[*index] = MPI_REQUEST_NULL;
739         XBT_DEBUG("Testany - returning with index %d", *index);
740         *flag=1;
741       }
742       nsleeps = 1;
743     } else {
744       nsleeps++;
745     }
746   } else {
747       XBT_DEBUG("Testany on inactive handles, returning flag=1 but empty status");
748       //all requests are null or inactive, return true
749       *flag = 1;
750       *index = MPI_UNDEFINED;
751       Status::empty(status);
752   }
753
754   return ret;
755 }
756
757 int Request::testall(int count, MPI_Request requests[], int* outflag, MPI_Status status[])
758 {
759   MPI_Status stat;
760   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
761   int flag;
762   int error = 0;
763   *outflag = 1;
764   for(int i=0; i<count; i++){
765     if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
766       int ret = test(&requests[i], pstat, &flag);
767       if (flag){
768         flag=0;
769         requests[i]=MPI_REQUEST_NULL;
770       }else{
771         *outflag=0;
772       }
773       if (ret != MPI_SUCCESS) 
774         error = 1;
775     }else{
776       Status::empty(pstat);
777     }
778     if(status != MPI_STATUSES_IGNORE) {
779       status[i] = *pstat;
780     }
781   }
782   if(error==1) 
783     return MPI_ERR_IN_STATUS;
784   else 
785     return MPI_SUCCESS;
786 }
787
788 void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
789   int flag=0;
790   //FIXME find another way to avoid busy waiting ?
791   // the issue here is that we have to wait on a nonexistent comm
792   while(flag==0){
793     iprobe(source, tag, comm, &flag, status);
794     XBT_DEBUG("Busy Waiting on probing : %d", flag);
795   }
796 }
797
798 void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
799   // to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
800   // especially when used as a break condition, such as while (MPI_Iprobe(...)) dostuff...
801   // nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
802   // This can speed up the execution of certain applications by an order of magnitude, such as HPL
803   static int nsleeps = 1;
804   double speed        = s4u::this_actor::get_host()->get_speed();
805   double maxrate      = smpi_cfg_iprobe_cpu_usage();
806   auto request =
807       new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor_pid(source),
808                   simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE);
809   if (smpi_iprobe_sleep > 0) {
810     /** Compute the number of flops we will sleep **/
811     s4u::this_actor::exec_init(/*nsleeps: See comment above */ nsleeps *
812                                /*(seconds * flop/s -> total flops)*/ smpi_iprobe_sleep * speed * maxrate)
813         ->set_name("iprobe")
814         /* Not the entire CPU can be used when iprobing: This is important for
815          * the energy consumption caused by polling with iprobes. 
816          * Note also that the number of flops that was
817          * computed above contains a maxrate factor and is hence reduced (maxrate < 1)
818          */
819         ->set_bound(maxrate*speed)
820         ->start()
821         ->wait();
822   }
823   // behave like a receive, but don't do it
824   s4u::Mailbox* mailbox;
825
826   request->print_request("New iprobe");
827   // We have to test both mailboxes as we don't know if we will receive one or another
828   if (smpi_cfg_async_small_thresh() > 0) {
829     mailbox = smpi_process()->mailbox_small();
830     XBT_DEBUG("Trying to probe the perm recv mailbox");
831     request->action_ = mailbox->iprobe(0, &match_recv, static_cast<void*>(request));
832   }
833
834   if (request->action_ == nullptr){
835     mailbox = smpi_process()->mailbox();
836     XBT_DEBUG("trying to probe the other mailbox");
837     request->action_ = mailbox->iprobe(0, &match_recv, static_cast<void*>(request));
838   }
839
840   if (request->action_ != nullptr){
841     kernel::activity::CommImplPtr sync_comm = boost::static_pointer_cast<kernel::activity::CommImpl>(request->action_);
842     const Request* req                      = static_cast<MPI_Request>(sync_comm->src_data_);
843     *flag = 1;
844     if (status != MPI_STATUS_IGNORE && (req->flags_ & MPI_REQ_PREPARED) == 0) {
845       status->MPI_SOURCE = comm->group()->rank(req->src_);
846       status->MPI_TAG    = req->tag_;
847       status->MPI_ERROR  = MPI_SUCCESS;
848       status->count      = req->real_size_;
849     }
850     nsleeps = 1;//reset the number of sleeps we will do next time
851   }
852   else {
853     *flag = 0;
854     if (smpi_cfg_grow_injected_times())
855       nsleeps++;
856   }
857   unref(&request);
858   xbt_assert(request == MPI_REQUEST_NULL);
859 }
860
861 void Request::finish_wait(MPI_Request* request, MPI_Status * status)
862 {
863   MPI_Request req = *request;
864   Status::empty(status);
865   if((req->flags_ & MPI_REQ_CANCELLED) != 0 && (req->flags_ & MPI_REQ_MATCHED) == 0) {
866     if (status!=MPI_STATUS_IGNORE)
867       status->cancelled=1;
868     if(req->detached_sender_ != nullptr)
869       unref(&(req->detached_sender_));
870     unref(request);
871     return;
872   }
873
874   if ((req->flags_ & (MPI_REQ_PREPARED | MPI_REQ_GENERALIZED | MPI_REQ_FINISHED)) == 0) {
875     if (status != MPI_STATUS_IGNORE) {
876       if (req->src_== MPI_PROC_NULL || req->dst_== MPI_PROC_NULL){
877         Status::empty(status);
878         status->MPI_SOURCE = MPI_PROC_NULL;
879       } else {
880         int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
881         status->MPI_SOURCE = req->comm_->group()->rank(src);
882         status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;
883         status->MPI_ERROR  = req->truncated_ ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
884       }
885       // this handles the case were size in receive differs from size in send
886       status->count = req->real_size_;
887     }
888     //detached send will be finished at the other end
889     if (not(req->detached_ && ((req->flags_ & MPI_REQ_SEND) != 0))) {
890       req->print_request("Finishing");
891       MPI_Datatype datatype = req->old_type_;
892
893       // FIXME Handle the case of a partial shared malloc.
894       if (((req->flags_ & MPI_REQ_ACCUMULATE) != 0) ||
895           (datatype->flags() & DT_FLAG_DERIVED)) { // && (not smpi_is_shared(req->old_buf_))){
896         if (not smpi_process()->replaying() && smpi_cfg_privatization() != SmpiPrivStrategies::NONE &&
897             static_cast<char*>(req->old_buf_) >= smpi_data_exe_start &&
898             static_cast<char*>(req->old_buf_) < smpi_data_exe_start + smpi_data_exe_size) {
899           XBT_VERB("Privatization : We are unserializing to a zone in global memory  Switch data segment ");
900           smpi_switch_data_segment(simgrid::s4u::Actor::self());
901         }
902
903         if(datatype->flags() & DT_FLAG_DERIVED){
904           // This part handles the problem of non-contiguous memory the unserialization at the reception
905           if ((req->flags_ & MPI_REQ_RECV) && datatype->size() != 0)
906             datatype->unserialize(req->buf_, req->old_buf_, req->real_size_/datatype->size() , req->op_);
907           xbt_free(req->buf_);
908           req->buf_=nullptr;
909         } else if (req->flags_ & MPI_REQ_RECV) { // apply op on contiguous buffer for accumulate
910           if (datatype->size() != 0) {
911             int n = req->real_size_ / datatype->size();
912             req->op_->apply(req->buf_, req->old_buf_, &n, datatype);
913           }
914           xbt_free(req->buf_);
915           req->buf_=nullptr;
916         }
917       }
918     }
919   }
920
921   if (TRACE_smpi_view_internals() && ((req->flags_ & MPI_REQ_RECV) != 0)) {
922     int rank       = simgrid::s4u::this_actor::get_pid();
923     int src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_);
924     TRACE_smpi_recv(src_traced, rank,req->tag_);
925   }
926   if(req->detached_sender_ != nullptr){
927     //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
928     double sleeptime =
929         simgrid::s4u::Actor::self()->get_host()->extension<simgrid::smpi::Host>()->orecv(req->real_size());
930     if (sleeptime > 0.0) {
931       simgrid::s4u::this_actor::sleep_for(sleeptime);
932       XBT_DEBUG("receiving size of %zu : sleep %f ", req->real_size_, sleeptime);
933     }
934     unref(&(req->detached_sender_));
935   }
936   if (req->flags_ & MPI_REQ_PERSISTENT)
937     req->action_ = nullptr;
938   req->flags_ |= MPI_REQ_FINISHED;
939
940   if (req->truncated_) {
941     char error_string[MPI_MAX_ERROR_STRING];
942     int error_size;
943     PMPI_Error_string(MPI_ERR_TRUNCATE, error_string, &error_size);
944     MPI_Errhandler err = (req->comm_) ? (req->comm_)->errhandler() : MPI_ERRHANDLER_NULL;
945     if (err == MPI_ERRHANDLER_NULL || err == MPI_ERRORS_RETURN)
946       XBT_WARN("recv - returned %.*s instead of MPI_SUCCESS", error_size, error_string);
947     else if (err == MPI_ERRORS_ARE_FATAL)
948       xbt_die("recv - returned %.*s instead of MPI_SUCCESS", error_size, error_string);
949     else
950       err->call((req->comm_), MPI_ERR_TRUNCATE);
951     if (err != MPI_ERRHANDLER_NULL)
952       simgrid::smpi::Errhandler::unref(err);
953     MC_assert(not MC_is_active()); /* Only fail in MC mode */
954   }
955   if(req->src_ != MPI_PROC_NULL && req->dst_ != MPI_PROC_NULL)
956     unref(request);
957 }
958
959 int Request::wait(MPI_Request * request, MPI_Status * status)
960 {
961   // assume that *request is not MPI_REQUEST_NULL (filtered in PMPI_Wait before)
962   xbt_assert(*request != MPI_REQUEST_NULL);
963
964   int ret=MPI_SUCCESS;
965
966   if((*request)->src_ == MPI_PROC_NULL || (*request)->dst_ == MPI_PROC_NULL){
967     if (status != MPI_STATUS_IGNORE) {
968       Status::empty(status);
969       status->MPI_SOURCE = MPI_PROC_NULL;
970     }
971     (*request)=MPI_REQUEST_NULL;
972     return ret;
973   }
974   // Are we waiting on a request meant for non blocking collectives ?
975   // If so, wait for all the subrequests.
976   if ((*request)->nbc_requests_size_>0){
977     ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE);
978     for (int i = 0; i < (*request)->nbc_requests_size_; i++) {
979       if((*request)->buf_!=nullptr && (*request)->nbc_requests_[i]!=MPI_REQUEST_NULL){//reduce case
980         void * buf=(*request)->nbc_requests_[i]->buf_;
981         if((*request)->old_type_->flags() & DT_FLAG_DERIVED)
982           buf=(*request)->nbc_requests_[i]->old_buf_;
983         if((*request)->nbc_requests_[i]->flags_ & MPI_REQ_RECV ){
984           if((*request)->op_!=MPI_OP_NULL){
985             int count=(*request)->size_/ (*request)->old_type_->size();
986             (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->old_type_);
987           }
988           smpi_free_tmp_buffer(static_cast<unsigned char*>(buf));
989         }
990       }
991       if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL)
992         Request::unref(&((*request)->nbc_requests_[i]));
993     }
994     delete[] (*request)->nbc_requests_;
995     (*request)->nbc_requests_size_=0;
996     unref(request);
997     (*request)=MPI_REQUEST_NULL;
998     return ret;
999   }
1000
1001   (*request)->print_request("Waiting");
1002   if ((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) {
1003     Status::empty(status);
1004     return ret;
1005   }
1006
1007   if ((*request)->action_ != nullptr){
1008       try{
1009         // this is not a detached send
1010         simcall_comm_wait((*request)->action_.get(), -1.0);
1011       } catch (const Exception&) {
1012         XBT_VERB("Request cancelled");
1013       }
1014   }
1015
1016   if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
1017     if(!((*request)->flags_ & MPI_REQ_COMPLETE)){
1018       ((*request)->generalized_funcs)->mutex->lock();
1019       ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
1020       ((*request)->generalized_funcs)->mutex->unlock();
1021     }
1022     MPI_Status tmp_status;
1023     MPI_Status* mystatus;
1024     if (status == MPI_STATUS_IGNORE) {
1025       mystatus = &tmp_status;
1026       Status::empty(mystatus);
1027     } else {
1028       mystatus = status;
1029     }
1030     ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
1031   }
1032
1033   if ((*request)->truncated_)
1034     ret = MPI_ERR_TRUNCATE;
1035
1036   finish_wait(request, status); // may invalidate *request
1037   if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
1038     *request = MPI_REQUEST_NULL;
1039   return ret;
1040 }
1041
1042 int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
1043 {
1044   int index = MPI_UNDEFINED;
1045
1046   if(count > 0) {
1047     // Wait for a request to complete
1048     std::vector<simgrid::kernel::activity::CommImpl*> comms;
1049     std::vector<int> map;
1050     XBT_DEBUG("Wait for one of %d", count);
1051     for(int i = 0; i < count; i++) {
1052       if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED) &&
1053           not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
1054         if (requests[i]->action_ != nullptr) {
1055           XBT_DEBUG("Waiting any %p ", requests[i]);
1056           comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
1057           map.push_back(i);
1058         } else {
1059           // This is a finished detached request, let's return this one
1060           comms.clear(); // don't do the waitany call afterwards
1061           index = i;
1062           finish_wait(&requests[i], status); // cleanup if refcount = 0
1063           if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
1064             requests[i] = MPI_REQUEST_NULL; // set to null
1065           break;
1066         }
1067       }
1068     }
1069     if (not comms.empty()) {
1070       XBT_DEBUG("Enter waitany for %zu comms", comms.size());
1071       int i;
1072       try{
1073         i = simcall_comm_waitany(comms.data(), comms.size(), -1);
1074       } catch (const Exception&) {
1075         XBT_INFO("request cancelled");
1076         i = -1;
1077       }
1078
1079       // not MPI_UNDEFINED, as this is a simix return code
1080       if (i != -1) {
1081         index = map[i];
1082         //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
1083         if ((requests[index] == MPI_REQUEST_NULL) ||
1084             (not((requests[index]->flags_ & MPI_REQ_ACCUMULATE) && (requests[index]->flags_ & MPI_REQ_RECV)))) {
1085           finish_wait(&requests[index],status);
1086           if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_NON_PERSISTENT))
1087             requests[index] = MPI_REQUEST_NULL;
1088         }
1089       }
1090     }
1091   }
1092
1093   if (index==MPI_UNDEFINED)
1094     Status::empty(status);
1095
1096   return index;
1097 }
1098
1099 static int sort_accumulates(const Request* a, const Request* b)
1100 {
1101   return (a->tag() > b->tag());
1102 }
1103
1104 int Request::waitall(int count, MPI_Request requests[], MPI_Status status[])
1105 {
1106   std::vector<MPI_Request> accumulates;
1107   int index;
1108   MPI_Status stat;
1109   MPI_Status *pstat = (status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat);
1110   int retvalue = MPI_SUCCESS;
1111   //tag invalid requests in the set
1112   if (status != MPI_STATUSES_IGNORE) {
1113     for (int c = 0; c < count; c++) {
1114       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst_ == MPI_PROC_NULL ||
1115           (requests[c]->flags_ & MPI_REQ_PREPARED)) {
1116         Status::empty(&status[c]);
1117       } else if (requests[c]->src_ == MPI_PROC_NULL) {
1118         Status::empty(&status[c]);
1119         status[c].MPI_SOURCE = MPI_PROC_NULL;
1120       }
1121     }
1122   }
1123   for (int c = 0; c < count; c++) {
1124     if (MC_is_active() || MC_record_replay_is_active()) {
1125       wait(&requests[c],pstat);
1126       index = c;
1127     } else {
1128       index = waitany(count, requests, pstat);
1129
1130       if (index == MPI_UNDEFINED)
1131         break;
1132
1133       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_RECV) &&
1134           (requests[index]->flags_ & MPI_REQ_ACCUMULATE))
1135         accumulates.push_back(requests[index]);
1136       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_NON_PERSISTENT))
1137         requests[index] = MPI_REQUEST_NULL;
1138     }
1139     if (status != MPI_STATUSES_IGNORE) {
1140       status[index] = *pstat;
1141       if (status[index].MPI_ERROR == MPI_ERR_TRUNCATE)
1142         retvalue = MPI_ERR_IN_STATUS;
1143     }
1144   }
1145
1146   if (not accumulates.empty()) {
1147     std::sort(accumulates.begin(), accumulates.end(), sort_accumulates);
1148     for (auto& req : accumulates) {
1149       finish_wait(&req, status);
1150     }
1151   }
1152
1153   return retvalue;
1154 }
1155
1156 int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
1157 {
1158   int count = 0;
1159   int flag = 0;
1160   int index = 0;
1161   MPI_Status stat;
1162   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
1163   index             = waitany(incount, requests, pstat);
1164   if(index==MPI_UNDEFINED) return MPI_UNDEFINED;
1165   if(status != MPI_STATUSES_IGNORE) {
1166     status[count] = *pstat;
1167   }
1168   indices[count] = index;
1169   count++;
1170   for (int i = 0; i < incount; i++) {
1171     if (i!=index && requests[i] != MPI_REQUEST_NULL 
1172         && not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
1173       test(&requests[i], pstat,&flag);
1174       if (flag==1){
1175         indices[count] = i;
1176         if(status != MPI_STATUSES_IGNORE) {
1177           status[count] = *pstat;
1178         }
1179         if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
1180           requests[i]=MPI_REQUEST_NULL;
1181         count++;
1182       }
1183     }
1184   }
1185   return count;
1186 }
1187
1188 MPI_Request Request::f2c(int id)
1189 {
1190   if(id==MPI_FORTRAN_REQUEST_NULL)
1191     return MPI_REQUEST_NULL;
1192   return static_cast<MPI_Request>(F2C::lookup()->at(id));
1193 }
1194
1195 void Request::free_f(int id)
1196 {
1197   if (id != MPI_FORTRAN_REQUEST_NULL) {
1198     F2C::lookup()->erase(id);
1199   }
1200 }
1201
1202 int Request::get_status(const Request* req, int* flag, MPI_Status* status)
1203 {
1204   *flag=0;
1205
1206   if(req != MPI_REQUEST_NULL && req->action_ != nullptr) {
1207     req->iprobe(req->comm_->group()->rank(req->src_), req->tag_, req->comm_, flag, status);
1208     if(*flag)
1209       return MPI_SUCCESS;
1210   }
1211   if (req != MPI_REQUEST_NULL && 
1212      (req->flags_ & MPI_REQ_GENERALIZED)
1213      && !(req->flags_ & MPI_REQ_COMPLETE)) {
1214      *flag=0;
1215     return MPI_SUCCESS;
1216   }
1217
1218   *flag=1;
1219   if(req != MPI_REQUEST_NULL &&
1220      status != MPI_STATUS_IGNORE) {
1221     int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
1222     status->MPI_SOURCE = req->comm_->group()->rank(src);
1223     status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;
1224     status->MPI_ERROR = req->truncated_ ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
1225     status->count = req->real_size_;
1226   }
1227   return MPI_SUCCESS;
1228 }
1229
1230 int Request::grequest_start(MPI_Grequest_query_function* query_fn, MPI_Grequest_free_function* free_fn,
1231                             MPI_Grequest_cancel_function* cancel_fn, void* extra_state, MPI_Request* request)
1232 {
1233   *request = new Request();
1234   (*request)->flags_ |= MPI_REQ_GENERALIZED;
1235   (*request)->flags_ |= MPI_REQ_PERSISTENT;
1236   (*request)->refcount_ = 1;
1237   ((*request)->generalized_funcs)             = std::make_unique<smpi_mpi_generalized_request_funcs_t>();
1238   ((*request)->generalized_funcs)->query_fn=query_fn;
1239   ((*request)->generalized_funcs)->free_fn=free_fn;
1240   ((*request)->generalized_funcs)->cancel_fn=cancel_fn;
1241   ((*request)->generalized_funcs)->extra_state=extra_state;
1242   ((*request)->generalized_funcs)->cond = simgrid::s4u::ConditionVariable::create();
1243   ((*request)->generalized_funcs)->mutex = simgrid::s4u::Mutex::create();
1244   return MPI_SUCCESS;
1245 }
1246
1247 int Request::grequest_complete(MPI_Request request)
1248 {
1249   if ((!(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex == nullptr)
1250     return MPI_ERR_REQUEST;
1251   request->generalized_funcs->mutex->lock();
1252   request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
1253   request->generalized_funcs->cond->notify_one();
1254   request->generalized_funcs->mutex->unlock();
1255   return MPI_SUCCESS;
1256 }
1257
1258 void Request::set_nbc_requests(MPI_Request* reqs, int size){
1259   nbc_requests_size_ = size;
1260   if (size > 0) {
1261     nbc_requests_ = reqs;
1262   } else {
1263     delete[] reqs;
1264     nbc_requests_ = nullptr;
1265   }
1266 }
1267
1268 int Request::get_nbc_requests_size() const
1269 {
1270   return nbc_requests_size_;
1271 }
1272
1273 MPI_Request* Request::get_nbc_requests() const
1274 {
1275   return nbc_requests_;
1276 }
1277 }
1278 }