Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4d74e826f9494cc3edfac8bda9d8ab1abfdf747d
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_         = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_                  = nullptr;
32   opened_                = 0;
33   group_                 = MPI_GROUP_NULL;
34   requests_              = new std::vector<MPI_Request>();
35   mut_                   = s4u::Mutex::create();
36   lock_mut_              = s4u::Mutex::create();
37   atomic_mut_            = s4u::Mutex::create();
38   connected_wins_        = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_                 = 0;
41   if(rank_==0){
42     bar_ = new s4u::Barrier(comm_size);
43   }
44   mode_=0;
45   errhandler_=MPI_ERRORS_ARE_FATAL;
46   errhandler_->ref();
47   comm->add_rma_win(this);
48   comm->ref();
49
50   colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
51                    comm);
52
53   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
54
55   colls::barrier(comm);
56 }
57
58 Win::~Win(){
59   //As per the standard, perform a barrier to ensure every async comm is finished
60   bar_->wait();
61
62   int finished = finish_comms();
63   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64
65   delete requests_;
66   delete[] connected_wins_;
67   if (name_ != nullptr){
68     xbt_free(name_);
69   }
70   if (info_ != MPI_INFO_NULL)
71     simgrid::smpi::Info::unref(info_);
72   if (errhandler_ != MPI_ERRHANDLER_NULL)
73     simgrid::smpi::Errhandler::unref(errhandler_);
74
75   comm_->remove_rma_win(this);
76
77   colls::barrier(comm_);
78   Comm::unref(comm_);
79   
80   if (rank_ == 0)
81     delete bar_;
82
83   if(allocated_ !=0)
84     xbt_free(base_);
85
86   cleanup_attr<Win>();
87 }
88
89 int Win::attach(void* /*base*/, MPI_Aint size)
90 {
91   if (not(base_ == MPI_BOTTOM || base_ == 0))
92     return MPI_ERR_ARG;
93   base_=0;//actually the address will be given in the RMA calls, as being the disp.
94   size_+=size;
95   return MPI_SUCCESS;
96 }
97
98 int Win::detach(const void* /*base*/)
99 {
100   base_=MPI_BOTTOM;
101   size_=-1;
102   return MPI_SUCCESS;
103 }
104
105 void Win::get_name(char* name, int* length){
106   if(name_==nullptr){
107     *length=0;
108     name=nullptr;
109     return;
110   }
111   *length = strlen(name_);
112   strncpy(name, name_, *length+1);
113 }
114
115 void Win::get_group(MPI_Group* group){
116   if(comm_ != MPI_COMM_NULL){
117     *group = comm_->group();
118   } else {
119     *group = MPI_GROUP_NULL;
120   }
121 }
122
123 MPI_Info Win::info()
124 {
125   if (info_ == MPI_INFO_NULL)
126     info_ = new Info();
127   info_->ref();
128   return info_;
129 }
130
131 int Win::rank(){
132   return rank_;
133 }
134
135 MPI_Aint Win::size(){
136   return size_;
137 }
138
139 void* Win::base(){
140   return base_;
141 }
142
143 int Win::disp_unit(){
144   return disp_unit_;
145 }
146
147 int Win::dynamic(){
148   return dynamic_;
149 }
150
151 void Win::set_info(MPI_Info info)
152 {
153   if (info_ != MPI_INFO_NULL)
154     simgrid::smpi::Info::unref(info_);
155   info_ = info;
156   if (info_ != MPI_INFO_NULL)
157     info_->ref();
158 }
159
160 void Win::set_name(const char* name){
161   name_ = xbt_strdup(name);
162 }
163
164 int Win::fence(int assert)
165 {
166   XBT_DEBUG("Entering fence");
167   if (opened_ == 0)
168     opened_=1;
169   if (assert != MPI_MODE_NOPRECEDE) {
170     // This is not the first fence => finalize what came before
171     bar_->wait();
172     mut_->lock();
173     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
174     // Without this, the vector could get redimensioned when another process pushes.
175     // This would result in the array used by Request::waitall() to be invalidated.
176     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
177     std::vector<MPI_Request> *reqs = requests_;
178     int size = static_cast<int>(reqs->size());
179     // start all requests that have been prepared by another process
180     if (size > 0) {
181       MPI_Request* treqs = &(*reqs)[0];
182       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
183     }
184     count_=0;
185     mut_->unlock();
186   }
187
188   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
189     opened_=0;
190   assert_ = assert;
191
192   bar_->wait();
193   XBT_DEBUG("Leaving fence");
194
195   return MPI_SUCCESS;
196 }
197
198 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
199               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
200 {
201   //get receiver pointer
202   MPI_Win recv_win = connected_wins_[target_rank];
203
204   if(opened_==0){//check that post/start has been done
205     // no fence or start .. lock ok ?
206     int locked=0;
207     for (auto const& it : recv_win->lockers_)
208       if (it == comm_->rank())
209         locked = 1;
210     if(locked != 1)
211       return MPI_ERR_WIN;
212   }
213
214   if(target_count*target_datatype->get_extent()>recv_win->size_)
215     return MPI_ERR_ARG;
216
217   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
218
219   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
220     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
221     // prepare send_request
222     MPI_Request sreq =
223         // TODO cheinrich Check for rank / pid conversion
224         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
225                                comm_, MPI_OP_NULL);
226
227     //prepare receiver request
228     // TODO cheinrich Check for rank / pid conversion
229     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
230                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
231
232     //start send
233     sreq->start();
234
235     if(request!=nullptr){
236       *request=sreq;
237     }else{
238       mut_->lock();
239       requests_->push_back(sreq);
240       mut_->unlock();
241     }
242
243     //push request to receiver's win
244     recv_win->mut_->lock();
245     recv_win->requests_->push_back(rreq);
246     rreq->start();
247     recv_win->mut_->unlock();
248   } else {
249     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
250     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
251     if(request!=nullptr)
252       *request = MPI_REQUEST_NULL;
253   }
254
255   return MPI_SUCCESS;
256 }
257
258 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
259               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
260 {
261   //get sender pointer
262   MPI_Win send_win = connected_wins_[target_rank];
263
264   if(opened_==0){//check that post/start has been done
265     // no fence or start .. lock ok ?
266     int locked=0;
267     for (auto const& it : send_win->lockers_)
268       if (it == comm_->rank())
269         locked = 1;
270     if(locked != 1)
271       return MPI_ERR_WIN;
272   }
273
274   if(target_count*target_datatype->get_extent()>send_win->size_)
275     return MPI_ERR_ARG;
276
277   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
278   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
279
280   if(target_rank != comm_->rank()){
281     //prepare send_request
282     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
283                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
284
285     //prepare receiver request
286     MPI_Request rreq = Request::rma_recv_init(
287         origin_addr, origin_count, origin_datatype, target_rank,
288         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
289         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
290
291     //start the send, with another process than us as sender.
292     sreq->start();
293     //push request to receiver's win
294     send_win->mut_->lock();
295     send_win->requests_->push_back(sreq);
296     send_win->mut_->unlock();
297
298     //start recv
299     rreq->start();
300
301     if(request!=nullptr){
302       *request=rreq;
303     }else{
304       mut_->lock();
305       requests_->push_back(rreq);
306       mut_->unlock();
307     }
308   } else {
309     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
310     if(request!=nullptr)
311       *request=MPI_REQUEST_NULL;
312   }
313   return MPI_SUCCESS;
314 }
315
316 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
317               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
318 {
319   XBT_DEBUG("Entering MPI_Win_Accumulate");
320   //get receiver pointer
321   MPI_Win recv_win = connected_wins_[target_rank];
322
323   if(opened_==0){//check that post/start has been done
324     // no fence or start .. lock ok ?
325     int locked=0;
326     for (auto const& it : recv_win->lockers_)
327       if (it == comm_->rank())
328         locked = 1;
329     if(locked != 1)
330       return MPI_ERR_WIN;
331   }
332   //FIXME: local version
333
334   if(target_count*target_datatype->get_extent()>recv_win->size_)
335     return MPI_ERR_ARG;
336
337   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
338   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
339   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
340   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
341   // prepare send_request
342
343   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
344                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
345
346   // prepare receiver request
347   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
348                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
349
350   count_++;
351
352   // start send
353   sreq->start();
354   // push request to receiver's win
355   recv_win->mut_->lock();
356   recv_win->requests_->push_back(rreq);
357   rreq->start();
358   recv_win->mut_->unlock();
359
360   if (request != nullptr) {
361     *request = sreq;
362   } else {
363     mut_->lock();
364     requests_->push_back(sreq);
365     mut_->unlock();
366   }
367
368   XBT_DEBUG("Leaving MPI_Win_Accumulate");
369   return MPI_SUCCESS;
370 }
371
372 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
373                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
374                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
375 {
376   //get sender pointer
377   MPI_Win send_win = connected_wins_[target_rank];
378
379   if(opened_==0){//check that post/start has been done
380     // no fence or start .. lock ok ?
381     int locked=0;
382     for (auto const& it : send_win->lockers_)
383       if (it == comm_->rank())
384         locked = 1;
385     if(locked != 1)
386       return MPI_ERR_WIN;
387   }
388
389   if(target_count*target_datatype->get_extent()>send_win->size_)
390     return MPI_ERR_ARG;
391
392   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
393   //need to be sure ops are correctly ordered, so finish request here ? slow.
394   MPI_Request req;
395   send_win->atomic_mut_->lock();
396   get(result_addr, result_count, result_datatype, target_rank,
397               target_disp, target_count, target_datatype, &req);
398   if (req != MPI_REQUEST_NULL)
399     Request::wait(&req, MPI_STATUS_IGNORE);
400   if(op!=MPI_NO_OP)
401     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
402               target_disp, target_count, target_datatype, op, &req);
403   if (req != MPI_REQUEST_NULL)
404     Request::wait(&req, MPI_STATUS_IGNORE);
405   send_win->atomic_mut_->unlock();
406   return MPI_SUCCESS;
407 }
408
409 int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
410         void *result_addr, MPI_Datatype datatype, int target_rank,
411         MPI_Aint target_disp){
412   //get sender pointer
413   MPI_Win send_win = connected_wins_[target_rank];
414
415   if(opened_==0){//check that post/start has been done
416     // no fence or start .. lock ok ?
417     int locked=0;
418     for (auto const& it : send_win->lockers_)
419       if (it == comm_->rank())
420         locked = 1;
421     if(locked != 1)
422       return MPI_ERR_WIN;
423   }
424
425   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
426   MPI_Request req = MPI_REQUEST_NULL;
427   send_win->atomic_mut_->lock();
428   get(result_addr, 1, datatype, target_rank,
429               target_disp, 1, datatype, &req);
430   if (req != MPI_REQUEST_NULL)
431     Request::wait(&req, MPI_STATUS_IGNORE);
432   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
433     put(origin_addr, 1, datatype, target_rank,
434               target_disp, 1, datatype);
435   }
436   send_win->atomic_mut_->unlock();
437   return MPI_SUCCESS;
438 }
439
440 int Win::start(MPI_Group group, int /*assert*/)
441 {
442   /* From MPI forum advices
443   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
444   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
445   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
446   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
447   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
448   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
449   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
450   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
451   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
452   must complete, without further dependencies.  */
453
454   //naive, blocking implementation.
455   int i             = 0;
456   int j             = 0;
457   int size          = group->size();
458   MPI_Request* reqs = xbt_new0(MPI_Request, size);
459
460   XBT_DEBUG("Entering MPI_Win_Start");
461   while (j != size) {
462     int src = comm_->group()->rank(group->actor(j));
463     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
464       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
465       i++;
466     }
467     j++;
468   }
469   size = i;
470   Request::startall(size, reqs);
471   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
472   for (i = 0; i < size; i++) {
473     Request::unref(&reqs[i]);
474   }
475   xbt_free(reqs);
476   opened_++; //we're open for business !
477   group_=group;
478   group->ref();
479   XBT_DEBUG("Leaving MPI_Win_Start");
480   return MPI_SUCCESS;
481 }
482
483 int Win::post(MPI_Group group, int /*assert*/)
484 {
485   //let's make a synchronous send here
486   int i             = 0;
487   int j             = 0;
488   int size = group->size();
489   MPI_Request* reqs = xbt_new0(MPI_Request, size);
490
491   XBT_DEBUG("Entering MPI_Win_Post");
492   while(j!=size){
493     int dst = comm_->group()->rank(group->actor(j));
494     if (dst != rank_ && dst != MPI_UNDEFINED) {
495       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
496       i++;
497     }
498     j++;
499   }
500   size=i;
501
502   Request::startall(size, reqs);
503   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
504   for(i=0;i<size;i++){
505     Request::unref(&reqs[i]);
506   }
507   xbt_free(reqs);
508   opened_++; //we're open for business !
509   group_=group;
510   group->ref();
511   XBT_DEBUG("Leaving MPI_Win_Post");
512   return MPI_SUCCESS;
513 }
514
515 int Win::complete(){
516   if(opened_==0)
517     xbt_die("Complete called on already opened MPI_Win");
518
519   XBT_DEBUG("Entering MPI_Win_Complete");
520   int i             = 0;
521   int j             = 0;
522   int size          = group_->size();
523   MPI_Request* reqs = xbt_new0(MPI_Request, size);
524
525   while(j!=size){
526     int dst = comm_->group()->rank(group_->actor(j));
527     if (dst != rank_ && dst != MPI_UNDEFINED) {
528       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
529       i++;
530     }
531     j++;
532   }
533   size=i;
534   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
535   Request::startall(size, reqs);
536   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
537
538   for(i=0;i<size;i++){
539     Request::unref(&reqs[i]);
540   }
541   xbt_free(reqs);
542
543   int finished = finish_comms();
544   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
545
546   Group::unref(group_);
547   opened_--; //we're closed for business !
548   return MPI_SUCCESS;
549 }
550
551 int Win::wait(){
552   //naive, blocking implementation.
553   XBT_DEBUG("Entering MPI_Win_Wait");
554   int i             = 0;
555   int j             = 0;
556   int size          = group_->size();
557   MPI_Request* reqs = xbt_new0(MPI_Request, size);
558
559   while(j!=size){
560     int src = comm_->group()->rank(group_->actor(j));
561     if (src != rank_ && src != MPI_UNDEFINED) {
562       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
563       i++;
564     }
565     j++;
566   }
567   size=i;
568   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
569   Request::startall(size, reqs);
570   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
571   for(i=0;i<size;i++){
572     Request::unref(&reqs[i]);
573   }
574   xbt_free(reqs);
575   int finished = finish_comms();
576   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
577
578   Group::unref(group_);
579   opened_--; //we're opened for business !
580   return MPI_SUCCESS;
581 }
582
583 int Win::lock(int lock_type, int rank, int /*assert*/)
584 {
585   MPI_Win target_win = connected_wins_[rank];
586
587   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
588     target_win->lock_mut_->lock();
589     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
590     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
591       target_win->lock_mut_->unlock();
592    }
593   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
594     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
595
596   target_win->lockers_.push_back(comm_->rank());
597
598   int finished = finish_comms(rank);
599   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
600   finished = target_win->finish_comms(rank_);
601   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
602   return MPI_SUCCESS;
603 }
604
605 int Win::lock_all(int assert){
606   int i=0;
607   int retval = MPI_SUCCESS;
608   for (i=0; i<comm_->size();i++){
609       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
610       if(ret != MPI_SUCCESS)
611         retval = ret;
612   }
613   return retval;
614 }
615
616 int Win::unlock(int rank){
617   MPI_Win target_win = connected_wins_[rank];
618   int target_mode = target_win->mode_;
619   target_win->mode_= 0;
620   target_win->lockers_.remove(comm_->rank());
621   if (target_mode==MPI_LOCK_EXCLUSIVE){
622     target_win->lock_mut_->unlock();
623   }
624
625   int finished = finish_comms(rank);
626   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
627   finished = target_win->finish_comms(rank_);
628   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
629   return MPI_SUCCESS;
630 }
631
632 int Win::unlock_all(){
633   int i=0;
634   int retval = MPI_SUCCESS;
635   for (i=0; i<comm_->size();i++){
636     int ret = this->unlock(i);
637     if (ret != MPI_SUCCESS)
638       retval = ret;
639   }
640   return retval;
641 }
642
643 int Win::flush(int rank){
644   MPI_Win target_win = connected_wins_[rank];
645   int finished       = finish_comms(rank_);
646   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
647   finished = target_win->finish_comms(rank);
648   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
649   return MPI_SUCCESS;
650 }
651
652 int Win::flush_local(int rank){
653   int finished = finish_comms(rank);
654   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
655   return MPI_SUCCESS;
656 }
657
658 int Win::flush_all(){
659   int finished = finish_comms();
660   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
661   for (int i = 0; i < comm_->size(); i++) {
662     finished = connected_wins_[i]->finish_comms(rank_);
663     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
664   }
665   return MPI_SUCCESS;
666 }
667
668 int Win::flush_local_all(){
669   int finished = finish_comms();
670   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
671   return MPI_SUCCESS;
672 }
673
674 Win* Win::f2c(int id){
675   return static_cast<Win*>(F2C::f2c(id));
676 }
677
678 int Win::finish_comms(){
679   mut_->lock();
680   //Finish own requests
681   std::vector<MPI_Request> *reqqs = requests_;
682   int size = static_cast<int>(reqqs->size());
683   if (size > 0) {
684     MPI_Request* treqs = &(*reqqs)[0];
685     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
686     reqqs->clear();
687   }
688   mut_->unlock();
689   return size;
690 }
691
692 int Win::finish_comms(int rank){
693   mut_->lock();
694   //Finish own requests
695   std::vector<MPI_Request> *reqqs = requests_;
696   int size = static_cast<int>(reqqs->size());
697   if (size > 0) {
698     size = 0;
699     std::vector<MPI_Request> myreqqs;
700     std::vector<MPI_Request>::iterator iter = reqqs->begin();
701     int proc_id                             = comm_->group()->actor(rank)->get_pid();
702     while (iter != reqqs->end()){
703       // Let's see if we're either the destination or the sender of this request
704       // because we only wait for requests that we are responsible for.
705       // Also use the process id here since the request itself returns from src()
706       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
707       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
708         myreqqs.push_back(*iter);
709         iter = reqqs->erase(iter);
710         size++;
711       } else {
712         ++iter;
713       }
714     }
715     if(size >0){
716       MPI_Request* treqs = &myreqqs[0];
717       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
718       myreqqs.clear();
719     }
720   }
721   mut_->unlock();
722   return size;
723 }
724
725 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
726 {
727   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
728   for (int i = 0; not target_win && i < comm_->size(); i++) {
729     if (connected_wins_[i]->size_ > 0)
730       target_win = connected_wins_[i];
731   }
732   if (target_win) {
733     *size                         = target_win->size_;
734     *disp_unit                    = target_win->disp_unit_;
735     *static_cast<void**>(baseptr) = target_win->base_;
736   } else {
737     *size                         = 0;
738     *static_cast<void**>(baseptr) = xbt_malloc(0);
739   }
740   return MPI_SUCCESS;
741 }
742
743 MPI_Errhandler Win::errhandler()
744 {
745   if (errhandler_ != MPI_ERRHANDLER_NULL)
746     errhandler_->ref();
747   return errhandler_;
748 }
749
750 void Win::set_errhandler(MPI_Errhandler errhandler)
751 {
752   if (errhandler_ != MPI_ERRHANDLER_NULL)
753     simgrid::smpi::Errhandler::unref(errhandler_);
754   errhandler_ = errhandler;
755   if (errhandler_ != MPI_ERRHANDLER_NULL)
756     errhandler_->ref();
757 }
758 } // namespace smpi
759 } // namespace simgrid