Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4f57f6d864bfd08f81e5934fbccc2aad21d07424
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_request.hpp"
7
8 #include "mc/mc.h"
9 #include "src/kernel/activity/CommImpl.hpp"
10 #include "src/mc/mc_replay.h"
11 #include "SmpiHost.hpp"
12 #include "private.h"
13 #include "private.hpp"
14 #include "smpi_comm.hpp"
15 #include "smpi_datatype.hpp"
16 #include "smpi_op.hpp"
17 #include "smpi_process.hpp"
18
19 #include <algorithm>
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (reques)");
22
23 static simgrid::config::Flag<double> smpi_iprobe_sleep(
24   "smpi/iprobe", "Minimum time to inject inside a call to MPI_Iprobe", 1e-4);
25 static simgrid::config::Flag<double> smpi_test_sleep(
26   "smpi/test", "Minimum time to inject inside a call to MPI_Test", 1e-4);
27
28 std::vector<s_smpi_factor_t> smpi_ois_values;
29
30 extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t);
31
32 namespace simgrid{
33 namespace smpi{
34
35 Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags)
36 {
37   void *old_buf = nullptr;
38 // FIXME Handle the case of a partial shared malloc.
39   if ((((flags & RECV) != 0) && ((flags & ACCUMULATE) != 0)) || (datatype->flags() & DT_FLAG_DERIVED)) {
40     // This part handles the problem of non-contiguous memory
41     old_buf = buf;
42     if (count==0){
43       buf_ = nullptr;
44     }else {
45       buf_ = xbt_malloc(count*datatype->size());
46       if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) {
47         datatype->serialize(old_buf, buf_, count);
48       }
49     }
50   }
51   // This part handles the problem of non-contiguous memory (for the unserialisation at the reception)
52   old_buf_  = old_buf;
53   size_ = datatype->size() * count;
54   datatype->ref();
55   comm_->ref();
56   action_          = nullptr;
57   detached_        = 0;
58   detached_sender_ = nullptr;
59   real_src_        = 0;
60   truncated_       = 0;
61   real_size_       = 0;
62   real_tag_        = 0;
63   if (flags & PERSISTENT)
64     refcount_ = 1;
65   else
66     refcount_ = 0;
67   op_   = MPI_REPLACE;
68 }
69
70 MPI_Comm Request::comm(){
71   return comm_;
72 }
73
74 int Request::src(){
75   return src_;
76 }
77
78 int Request::dst(){
79   return dst_;
80 }
81
82 int Request::tag(){
83   return tag_;
84 }
85
86 int Request::flags(){
87   return flags_;
88 }
89
90 int Request::detached(){
91   return detached_;
92 }
93
94 size_t Request::size(){
95   return size_;
96 }
97
98 size_t Request::real_size(){
99   return real_size_;
100 }
101
102 void Request::unref(MPI_Request* request)
103 {
104   if((*request) != MPI_REQUEST_NULL){
105     (*request)->refcount_--;
106     if((*request)->refcount_<0) xbt_die("wrong refcount");
107     if((*request)->refcount_==0){
108         Datatype::unref((*request)->old_type_);
109         Comm::unref((*request)->comm_);
110         (*request)->print_request("Destroying");
111         delete *request;
112         *request = MPI_REQUEST_NULL;
113     }else{
114       (*request)->print_request("Decrementing");
115     }
116   }else{
117     xbt_die("freeing an already free request");
118   }
119 }
120
121 int Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl* ignored)
122 {
123   MPI_Request ref = static_cast<MPI_Request>(a);
124   MPI_Request req = static_cast<MPI_Request>(b);
125   XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src_,req->src_, ref->tag_, req->tag_);
126
127   xbt_assert(ref, "Cannot match recv against null reference");
128   xbt_assert(req, "Cannot match recv against null request");
129   if((ref->src_ == MPI_ANY_SOURCE || req->src_ == ref->src_)
130     && ((ref->tag_ == MPI_ANY_TAG && req->tag_ >=0) || req->tag_ == ref->tag_)){
131     //we match, we can transfer some values
132     if(ref->src_ == MPI_ANY_SOURCE)
133       ref->real_src_ = req->src_;
134     if(ref->tag_ == MPI_ANY_TAG)
135       ref->real_tag_ = req->tag_;
136     if(ref->real_size_ < req->real_size_)
137       ref->truncated_ = 1;
138     if(req->detached_==1)
139       ref->detached_sender_=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
140     XBT_DEBUG("match succeeded");
141     return 1;
142   }else return 0;
143 }
144
145 int Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl* ignored)
146 {
147   MPI_Request ref = static_cast<MPI_Request>(a);
148   MPI_Request req = static_cast<MPI_Request>(b);
149   XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src_,req->src_, ref->tag_, req->tag_);
150   xbt_assert(ref, "Cannot match send against null reference");
151   xbt_assert(req, "Cannot match send against null request");
152
153   if((req->src_ == MPI_ANY_SOURCE || req->src_ == ref->src_)
154       && ((req->tag_ == MPI_ANY_TAG && ref->tag_ >=0)|| req->tag_ == ref->tag_)){
155     if(req->src_ == MPI_ANY_SOURCE)
156       req->real_src_ = ref->src_;
157     if(req->tag_ == MPI_ANY_TAG)
158       req->real_tag_ = ref->tag_;
159     if(req->real_size_ < ref->real_size_)
160       req->truncated_ = 1;
161     if(ref->detached_==1)
162       req->detached_sender_=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
163     XBT_DEBUG("match succeeded");
164     return 1;
165   } else
166     return 0;
167 }
168
169 void Request::print_request(const char *message)
170 {
171   XBT_VERB("%s  request %p  [buf = %p, size = %zu, src = %d, dst = %d, tag = %d, flags = %x]",
172        message, this, buf_, size_, src_, dst_, tag_, flags_);
173 }
174
175
176 /* factories, to hide the internal flags from the caller */
177 MPI_Request Request::send_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
178 {
179
180   return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
181                           comm->group()->index(dst), tag, comm, PERSISTENT | SEND | PREPARED);
182 }
183
184 MPI_Request Request::ssend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
185 {
186   return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
187                         comm->group()->index(dst), tag, comm, PERSISTENT | SSEND | SEND | PREPARED);
188 }
189
190 MPI_Request Request::isend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
191 {
192   return new Request(buf==MPI_BOTTOM ? nullptr : buf , count, datatype, smpi_process()->index(),
193                           comm->group()->index(dst), tag,comm, PERSISTENT | ISEND | SEND | PREPARED);
194 }
195
196
197 MPI_Request Request::rma_send_init(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
198                                MPI_Op op)
199 {
200   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
201   if(op==MPI_OP_NULL){
202     request = new Request(buf==MPI_BOTTOM ? nullptr : buf , count, datatype, src, dst, tag,
203                             comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED);
204   }else{
205     request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype,  src, dst, tag,
206                             comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED | ACCUMULATE);
207     request->op_ = op;
208   }
209   return request;
210 }
211
212 MPI_Request Request::recv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
213 {
214   return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype,
215                           src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(src),
216                           smpi_process()->index(), tag, comm, PERSISTENT | RECV | PREPARED);
217 }
218
219 MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
220                                MPI_Op op)
221 {
222   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
223   if(op==MPI_OP_NULL){
224     request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, src, dst, tag, comm,
225                           RMA | NON_PERSISTENT | RECV | PREPARED);
226   }else{
227     request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, src, dst, tag, comm,
228                           RMA | NON_PERSISTENT | RECV | PREPARED | ACCUMULATE);
229     request->op_ = op;
230   }
231   return request;
232 }
233
234 MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
235 {
236   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
237                      src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(src), smpi_process()->index(), tag,
238                      comm, PERSISTENT | RECV | PREPARED);
239 }
240
241 MPI_Request Request::isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
242 {
243   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
244   request             = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
245                         comm->group()->index(dst), tag, comm, NON_PERSISTENT | ISEND | SEND);
246   request->start();
247   return request;
248 }
249
250 MPI_Request Request::issend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
251 {
252   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
253   request             = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
254                         comm->group()->index(dst), tag, comm, NON_PERSISTENT | ISEND | SSEND | SEND);
255   request->start();
256   return request;
257 }
258
259
260 MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
261 {
262   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
263   request             = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
264                         src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(src), smpi_process()->index(),
265                         tag, comm, NON_PERSISTENT | RECV);
266   request->start();
267   return request;
268 }
269
270 void Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status * status)
271 {
272   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
273   request = irecv(buf, count, datatype, src, tag, comm);
274   wait(&request,status);
275   request = nullptr;
276 }
277
278 void Request::send(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
279 {
280   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
281   request             = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
282                         comm->group()->index(dst), tag, comm, NON_PERSISTENT | SEND);
283
284   request->start();
285   wait(&request, MPI_STATUS_IGNORE);
286   request = nullptr;
287 }
288
289 void Request::ssend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
290 {
291   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
292   request             = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
293                         comm->group()->index(dst), tag, comm, NON_PERSISTENT | SSEND | SEND);
294
295   request->start();
296   wait(&request,MPI_STATUS_IGNORE);
297   request = nullptr;
298 }
299
300 void Request::sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
301                        void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
302                        MPI_Comm comm, MPI_Status * status)
303 {
304   MPI_Request requests[2];
305   MPI_Status stats[2];
306   int myid=smpi_process()->index();
307   if ((comm->group()->index(dst) == myid) && (comm->group()->index(src) == myid)){
308       Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
309       return;
310   }
311   requests[0] = isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
312   requests[1] = irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
313   startall(2, requests);
314   waitall(2, requests, stats);
315   unref(&requests[0]);
316   unref(&requests[1]);
317   if(status != MPI_STATUS_IGNORE) {
318     // Copy receive status
319     *status = stats[1];
320   }
321 }
322
323 void Request::start()
324 {
325   smx_mailbox_t mailbox;
326
327   xbt_assert(action_ == nullptr, "Cannot (re-)start unfinished communication");
328   flags_ &= ~PREPARED;
329   flags_ &= ~FINISHED;
330   refcount_++;
331
332   if ((flags_ & RECV) != 0) {
333     this->print_request("New recv");
334
335     simgrid::smpi::Process* process = smpi_process_remote(dst_);
336
337     int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh");
338
339     xbt_mutex_t mut = process->mailboxes_mutex();
340     if (async_small_thresh != 0 || (flags_ & RMA) != 0)
341       xbt_mutex_acquire(mut);
342
343     if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) {
344       mailbox = process->mailbox();
345     }
346     else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) {
347       //We have to check both mailboxes (because SSEND messages are sent to the large mbox).
348       //begin with the more appropriate one : the small one.
349       mailbox = process->mailbox_small();
350       XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox);
351       smx_activity_t action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(this));
352
353       if (action == nullptr) {
354         mailbox = process->mailbox();
355         XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox);
356         action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(this));
357         if (action == nullptr) {
358           XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox);
359           mailbox = process->mailbox_small();
360         }
361       } else {
362         XBT_DEBUG("yes there was something for us in the large mailbox");
363       }
364     } else {
365       mailbox = process->mailbox_small();
366       XBT_DEBUG("Is there a corresponding send already posted the small mailbox?");
367       smx_activity_t action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(this));
368
369       if (action == nullptr) {
370         XBT_DEBUG("No, nothing in the permanent receive mailbox");
371         mailbox = process->mailbox();
372       } else {
373         XBT_DEBUG("yes there was something for us in the small mailbox");
374       }
375     }
376
377     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
378     real_size_=size_;
379     action_   = simcall_comm_irecv(
380         process->process(), mailbox, buf_, &real_size_, &match_recv,
381         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0);
382     XBT_DEBUG("recv simcall posted");
383
384     if (async_small_thresh != 0 || (flags_ & RMA) != 0 )
385       xbt_mutex_release(mut);
386   } else { /* the RECV flag was not set, so this is a send */
387     simgrid::smpi::Process* process = smpi_process_remote(dst_);
388     int rank = src_;
389     if (TRACE_smpi_view_internals()) {
390       TRACE_smpi_send(rank, rank, dst_, tag_, size_);
391     }
392     this->print_request("New send");
393
394     void* buf = buf_;
395     if ((flags_ & SSEND) == 0 && ( (flags_ & RMA) != 0
396         || static_cast<int>(size_) < xbt_cfg_get_int("smpi/send-is-detached-thresh") ) ) {
397       void *oldbuf = nullptr;
398       detached_ = 1;
399       XBT_DEBUG("Send request %p is detached", this);
400       refcount_++;
401       if (not(old_type_->flags() & DT_FLAG_DERIVED)) {
402         oldbuf = buf_;
403         if (not process->replaying() && oldbuf != nullptr && size_ != 0) {
404           if((smpi_privatize_global_variables != 0)
405             && (static_cast<char*>(buf_) >= smpi_start_data_exe)
406             && (static_cast<char*>(buf_) < smpi_start_data_exe + smpi_size_data_exe )){
407             XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
408             smpi_switch_data_segment(src_);
409           }
410           buf = xbt_malloc(size_);
411           memcpy(buf,oldbuf,size_);
412           XBT_DEBUG("buf %p copied into %p",oldbuf,buf);
413         }
414       }
415     }
416
417     //if we are giving back the control to the user without waiting for completion, we have to inject timings
418     double sleeptime = 0.0;
419     if (detached_ != 0 || ((flags_ & (ISEND | SSEND)) != 0)) { // issend should be treated as isend
420       // isend and send timings may be different
421       sleeptime = ((flags_ & ISEND) != 0)
422                       ? simgrid::s4u::Actor::self()->getHost()->extension<simgrid::smpi::SmpiHost>()->oisend(size_)
423                       : simgrid::s4u::Actor::self()->getHost()->extension<simgrid::smpi::SmpiHost>()->osend(size_);
424     }
425
426     if(sleeptime > 0.0){
427       simcall_process_sleep(sleeptime);
428       XBT_DEBUG("sending size of %zu : sleep %f ", size_, sleeptime);
429     }
430
431     int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh");
432
433     xbt_mutex_t mut=process->mailboxes_mutex();
434
435     if (async_small_thresh != 0 || (flags_ & RMA) != 0)
436       xbt_mutex_acquire(mut);
437
438     if (not(async_small_thresh != 0 || (flags_ & RMA) != 0)) {
439       mailbox = process->mailbox();
440     } else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) { // eager mode
441       mailbox = process->mailbox();
442       XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
443       smx_activity_t action = simcall_comm_iprobe(mailbox, 1, &match_send, static_cast<void*>(this));
444       if (action == nullptr) {
445         if ((flags_ & SSEND) == 0){
446           mailbox = process->mailbox_small();
447           XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox);
448         } else {
449           mailbox = process->mailbox_small();
450           XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox);
451           action = simcall_comm_iprobe(mailbox, 1, &match_send, static_cast<void*>(this));
452           if (action == nullptr) {
453             XBT_DEBUG("No, we are first, send to large mailbox");
454             mailbox = process->mailbox();
455           }
456         }
457       } else {
458         XBT_DEBUG("Yes there was something for us in the large mailbox");
459       }
460     } else {
461       mailbox = process->mailbox();
462       XBT_DEBUG("Send request %p is in the large mailbox %p (buf: %p)",mailbox, this,buf_);
463     }
464
465     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
466     real_size_=size_;
467     action_   = simcall_comm_isend(
468         SIMIX_process_from_PID(src_ + 1), mailbox, size_, -1.0, buf, real_size_, &match_send,
469         &xbt_free_f, // how to free the userdata if a detached send fails
470         not process->replaying() ? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, this,
471         // detach if msg size < eager/rdv switch limit
472         detached_);
473     XBT_DEBUG("send simcall posted");
474
475     /* FIXME: detached sends are not traceable (action_ == nullptr) */
476     if (action_ != nullptr)
477       simcall_set_category(action_, TRACE_internal_smpi_get_category());
478     if (async_small_thresh != 0 || ((flags_ & RMA)!=0))
479       xbt_mutex_release(mut);
480   }
481 }
482
483 void Request::startall(int count, MPI_Request * requests)
484 {
485   if(requests== nullptr)
486     return;
487
488   for(int i = 0; i < count; i++) {
489     requests[i]->start();
490   }
491 }
492
493 int Request::test(MPI_Request * request, MPI_Status * status) {
494   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
495   // to avoid deadlocks if used as a break condition, such as
496   //     while (MPI_Test(request, flag, status) && flag) dostuff...
497   // because the time will not normally advance when only calls to MPI_Test are made -> deadlock
498   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
499   static int nsleeps = 1;
500   if(smpi_test_sleep > 0)
501     simcall_process_sleep(nsleeps*smpi_test_sleep);
502
503   Status::empty(status);
504   int flag = 1;
505   if (((*request)->flags_ & PREPARED) == 0) {
506     if ((*request)->action_ != nullptr)
507       flag = simcall_comm_test((*request)->action_);
508     if (flag) {
509       finish_wait(request,status);
510       nsleeps=1;//reset the number of sleeps we will do next time
511       if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & PERSISTENT)==0)
512       *request = MPI_REQUEST_NULL;
513     } else if (xbt_cfg_get_boolean("smpi/grow-injected-times")){
514       nsleeps++;
515     }
516   }
517   return flag;
518 }
519
520 int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
521 {
522   int count = 0;
523   int count_dead = 0;
524   MPI_Status stat;
525   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
526
527   for (int i = 0; i < incount; i++) {
528     if (requests[i] != MPI_REQUEST_NULL) {
529       if (test(&requests[i], pstat)) {
530         indices[i] = 1;
531         count++;
532         if (status != MPI_STATUSES_IGNORE)
533           status[i] = *pstat;
534         if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & NON_PERSISTENT))
535           requests[i] = MPI_REQUEST_NULL;
536       }
537     } else {
538       count_dead++;
539     }
540   }
541   if(count_dead==incount)
542     return MPI_UNDEFINED;
543   else return count;
544 }
545
546 int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
547 {
548   std::vector<simgrid::kernel::activity::ActivityImplPtr> comms;
549   comms.reserve(count);
550
551   int i;
552   int flag = 0;
553
554   *index = MPI_UNDEFINED;
555
556   std::vector<int> map; /** Maps all matching comms back to their location in requests **/
557   for(i = 0; i < count; i++) {
558     if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & PREPARED)) {
559       comms.push_back(requests[i]->action_);
560       map.push_back(i);
561     }
562   }
563   if (not map.empty()) {
564     //multiplier to the sleeptime, to increase speed of execution, each failed testany will increase it
565     static int nsleeps = 1;
566     if(smpi_test_sleep > 0)
567       simcall_process_sleep(nsleeps*smpi_test_sleep);
568
569     i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches!
570     if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
571       *index = map[i];
572       finish_wait(&requests[*index],status);
573       flag             = 1;
574       nsleeps          = 1;
575       if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & NON_PERSISTENT)) {
576         requests[*index] = MPI_REQUEST_NULL;
577       }
578     } else {
579       nsleeps++;
580     }
581   } else {
582       //all requests are null or inactive, return true
583       flag = 1;
584       Status::empty(status);
585   }
586
587   return flag;
588 }
589
590 int Request::testall(int count, MPI_Request requests[], MPI_Status status[])
591 {
592   MPI_Status stat;
593   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
594   int flag=1;
595   for(int i=0; i<count; i++){
596     if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED)) {
597       if (test(&requests[i], pstat)!=1){
598         flag=0;
599       }else{
600           requests[i]=MPI_REQUEST_NULL;
601       }
602     }else{
603       Status::empty(pstat);
604     }
605     if(status != MPI_STATUSES_IGNORE) {
606       status[i] = *pstat;
607     }
608   }
609   return flag;
610 }
611
612 void Request::probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
613   int flag=0;
614   //FIXME find another way to avoid busy waiting ?
615   // the issue here is that we have to wait on a nonexistent comm
616   while(flag==0){
617     iprobe(source, tag, comm, &flag, status);
618     XBT_DEBUG("Busy Waiting on probing : %d", flag);
619   }
620 }
621
622 void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
623   // to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
624   // especially when used as a break condition, such as while (MPI_Iprobe(...)) dostuff...
625   // nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
626   // This can speed up the execution of certain applications by an order of magnitude, such as HPL
627   static int nsleeps = 1;
628   double speed        = simgrid::s4u::Actor::self()->getHost()->getSpeed();
629   double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage");
630   MPI_Request request = new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE :
631                  comm->group()->index(source), comm->rank(), tag, comm, PERSISTENT | RECV);
632   if (smpi_iprobe_sleep > 0) {
633     smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to executek*/nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed);
634     simcall_execution_wait(iprobe_sleep);
635   }
636   // behave like a receive, but don't do it
637   smx_mailbox_t mailbox;
638
639   request->print_request("New iprobe");
640   // We have to test both mailboxes as we don't know if we will receive one one or another
641   if (xbt_cfg_get_int("smpi/async-small-thresh") > 0){
642       mailbox = smpi_process()->mailbox_small();
643       XBT_DEBUG("Trying to probe the perm recv mailbox");
644       request->action_ = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(request));
645   }
646
647   if (request->action_ == nullptr){
648     mailbox = smpi_process()->mailbox();
649     XBT_DEBUG("trying to probe the other mailbox");
650     request->action_ = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast<void*>(request));
651   }
652
653   if (request->action_ != nullptr){
654     simgrid::kernel::activity::CommImplPtr sync_comm =
655         boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(request->action_);
656     MPI_Request req                            = static_cast<MPI_Request>(sync_comm->src_data);
657     *flag = 1;
658     if(status != MPI_STATUS_IGNORE && (req->flags_ & PREPARED) == 0) {
659       status->MPI_SOURCE = comm->group()->rank(req->src_);
660       status->MPI_TAG    = req->tag_;
661       status->MPI_ERROR  = MPI_SUCCESS;
662       status->count      = req->real_size_;
663     }
664     nsleeps = 1;//reset the number of sleeps we will do next time
665   }
666   else {
667     *flag = 0;
668     if (xbt_cfg_get_boolean("smpi/grow-injected-times"))
669       nsleeps++;
670   }
671   unref(&request);
672 }
673
674 void Request::finish_wait(MPI_Request* request, MPI_Status * status)
675 {
676   MPI_Request req = *request;
677   Status::empty(status);
678
679   if (not((req->detached_ != 0) && ((req->flags_ & SEND) != 0)) && ((req->flags_ & PREPARED) == 0)) {
680     if(status != MPI_STATUS_IGNORE) {
681       int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
682       status->MPI_SOURCE = req->comm_->group()->rank(src);
683       status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;
684       status->MPI_ERROR = req->truncated_ != 0 ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
685       // this handles the case were size in receive differs from size in send
686       status->count = req->real_size_;
687     }
688
689     req->print_request("Finishing");
690     MPI_Datatype datatype = req->old_type_;
691
692 // FIXME Handle the case of a partial shared malloc.
693     if (((req->flags_ & ACCUMULATE) != 0) ||
694         (datatype->flags() & DT_FLAG_DERIVED)) { // && (not smpi_is_shared(req->old_buf_))){
695
696       if (not smpi_process()->replaying()) {
697         if( smpi_privatize_global_variables != 0 && (static_cast<char*>(req->old_buf_) >= smpi_start_data_exe)
698             && ((char*)req->old_buf_ < smpi_start_data_exe + smpi_size_data_exe )){
699             XBT_VERB("Privatization : We are unserializing to a zone in global memory  Switch data segment ");
700             smpi_switch_data_segment(smpi_process()->index());
701         }
702       }
703
704       if(datatype->flags() & DT_FLAG_DERIVED){
705         // This part handles the problem of non-contignous memory the unserialization at the reception
706         if((req->flags_ & RECV) && datatype->size()!=0)
707           datatype->unserialize(req->buf_, req->old_buf_, req->real_size_/datatype->size() , req->op_);
708         xbt_free(req->buf_);
709       }else if(req->flags_ & RECV){//apply op on contiguous buffer for accumulate
710           if(datatype->size()!=0){
711             int n =req->real_size_/datatype->size();
712             req->op_->apply(req->buf_, req->old_buf_, &n, datatype);
713           }
714           xbt_free(req->buf_);
715       }
716     }
717   }
718
719   if (TRACE_smpi_view_internals() && ((req->flags_ & RECV) != 0)){
720     int rank = smpi_process()->index();
721     int src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_);
722     TRACE_smpi_recv(src_traced, rank,req->tag_);
723   }
724   if(req->detached_sender_ != nullptr){
725     //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
726     double sleeptime =
727         simgrid::s4u::Actor::self()->getHost()->extension<simgrid::smpi::SmpiHost>()->orecv(req->real_size());
728     if(sleeptime > 0.0){
729       simcall_process_sleep(sleeptime);
730       XBT_DEBUG("receiving size of %zu : sleep %f ", req->real_size_, sleeptime);
731     }
732     unref(&(req->detached_sender_));
733   }
734   if(req->flags_ & PERSISTENT)
735     req->action_ = nullptr;
736   req->flags_ |= FINISHED;
737   unref(request);
738 }
739
740 void Request::wait(MPI_Request * request, MPI_Status * status)
741 {
742   (*request)->print_request("Waiting");
743   if ((*request)->flags_ & PREPARED) {
744     Status::empty(status);
745     return;
746   }
747
748   if ((*request)->action_ != nullptr)
749     // this is not a detached send
750     simcall_comm_wait((*request)->action_, -1.0);
751
752   finish_wait(request,status);
753   if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & NON_PERSISTENT)!=0))
754     *request = MPI_REQUEST_NULL;
755 }
756
757 int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
758 {
759   s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
760   int index = MPI_UNDEFINED;
761
762   if(count > 0) {
763     int size = 0;
764     // Wait for a request to complete
765     xbt_dynar_init(&comms, sizeof(smx_activity_t), [](void*ptr){
766       intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
767     });
768     int *map = xbt_new(int, count);
769     XBT_DEBUG("Wait for one of %d", count);
770     for(int i = 0; i < count; i++) {
771       if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED) &&
772           not(requests[i]->flags_ & FINISHED)) {
773         if (requests[i]->action_ != nullptr) {
774           XBT_DEBUG("Waiting any %p ", requests[i]);
775           intrusive_ptr_add_ref(requests[i]->action_.get());
776           xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get());
777           map[size] = i;
778           size++;
779         } else {
780           // This is a finished detached request, let's return this one
781           size  = 0; // so we free the dynar but don't do the waitany call
782           index = i;
783           finish_wait(&requests[i], status); // cleanup if refcount = 0
784           if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & NON_PERSISTENT))
785             requests[i] = MPI_REQUEST_NULL; // set to null
786           break;
787         }
788       }
789     }
790     if (size > 0) {
791       XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms));
792       int i = simcall_comm_waitany(&comms, -1);
793
794       // not MPI_UNDEFINED, as this is a simix return code
795       if (i != -1) {
796         index = map[i];
797         //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
798         if ((requests[index] == MPI_REQUEST_NULL) ||
799             (not((requests[index]->flags_ & ACCUMULATE) && (requests[index]->flags_ & RECV)))) {
800           finish_wait(&requests[index],status);
801           if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & NON_PERSISTENT))
802             requests[index] = MPI_REQUEST_NULL;
803         }
804       }
805     }
806
807     xbt_dynar_free_data(&comms);
808     xbt_free(map);
809   }
810
811   if (index==MPI_UNDEFINED)
812     Status::empty(status);
813
814   return index;
815 }
816
817 static int sort_accumulates(MPI_Request a, MPI_Request b)
818 {
819   return (a->tag() > b->tag());
820 }
821
822 int Request::waitall(int count, MPI_Request requests[], MPI_Status status[])
823 {
824   std::vector<MPI_Request> accumulates;
825   int index;
826   MPI_Status stat;
827   MPI_Status *pstat = (status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat);
828   int retvalue = MPI_SUCCESS;
829   //tag invalid requests in the set
830   if (status != MPI_STATUSES_IGNORE) {
831     for (int c = 0; c < count; c++) {
832       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst_ == MPI_PROC_NULL || (requests[c]->flags_ & PREPARED)) {
833         Status::empty(&status[c]);
834       } else if (requests[c]->src_ == MPI_PROC_NULL) {
835         Status::empty(&status[c]);
836         status[c].MPI_SOURCE = MPI_PROC_NULL;
837       }
838     }
839   }
840   for (int c = 0; c < count; c++) {
841     if (MC_is_active() || MC_record_replay_is_active()) {
842       wait(&requests[c],pstat);
843       index = c;
844     } else {
845       index = waitany(count, (MPI_Request*)requests, pstat);
846       if (index == MPI_UNDEFINED)
847         break;
848
849       if (requests[index] != MPI_REQUEST_NULL
850            && (requests[index]->flags_ & RECV)
851            && (requests[index]->flags_ & ACCUMULATE))
852         accumulates.push_back(requests[index]);
853       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & NON_PERSISTENT))
854         requests[index] = MPI_REQUEST_NULL;
855     }
856     if (status != MPI_STATUSES_IGNORE) {
857       status[index] = *pstat;
858       if (status[index].MPI_ERROR == MPI_ERR_TRUNCATE)
859         retvalue = MPI_ERR_IN_STATUS;
860     }
861   }
862
863   if (not accumulates.empty()) {
864     std::sort(accumulates.begin(), accumulates.end(), sort_accumulates);
865     for (auto& req : accumulates) {
866       finish_wait(&req, status);
867     }
868   }
869
870   return retvalue;
871 }
872
873 int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[])
874 {
875   int count = 0;
876   MPI_Status stat;
877   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
878
879   for (int i = 0; i < incount; i++) {
880     int index = waitany(incount, requests, pstat);
881     if(index!=MPI_UNDEFINED){
882       indices[count] = index;
883       count++;
884       if(status != MPI_STATUSES_IGNORE) {
885         status[index] = *pstat;
886       }
887      if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & NON_PERSISTENT))
888      requests[index]=MPI_REQUEST_NULL;
889     }else{
890       return MPI_UNDEFINED;
891     }
892   }
893   return count;
894 }
895
896 MPI_Request Request::f2c(int id) {
897   char key[KEY_SIZE];
898   if(id==MPI_FORTRAN_REQUEST_NULL)
899     return static_cast<MPI_Request>(MPI_REQUEST_NULL);
900   return static_cast<MPI_Request>(F2C::f2c_lookup()->at(get_key_id(key, id)));
901 }
902
903 int Request::add_f()
904 {
905   if (F2C::f2c_lookup() == nullptr) {
906     F2C::set_f2c_lookup(new std::unordered_map<std::string, F2C*>);
907   }
908   char key[KEY_SIZE];
909   (*(F2C::f2c_lookup()))[get_key_id(key, F2C::f2c_id())] = this;
910   F2C::f2c_id_increment();
911   return F2C::f2c_id()-1;
912 }
913
914 void Request::free_f(int id)
915 {
916   if (id != MPI_FORTRAN_REQUEST_NULL) {
917     char key[KEY_SIZE];
918     F2C::f2c_lookup()->erase(get_key_id(key, id));
919   }
920 }
921
922 }
923 }