Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[sonar] Useless parentheses.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21
22 namespace simgrid{
23 namespace smpi{
24 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
25 int Win::keyval_id_=0;
26
27 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
28     : base_(base)
29     , size_(size)
30     , disp_unit_(disp_unit)
31     , info_(info)
32     , comm_(comm)
33     , connected_wins_(comm->size())
34     , rank_(comm->rank())
35     , allocated_(allocated)
36     , dynamic_(dynamic)
37 {
38   XBT_DEBUG("Creating window");
39   if(info!=MPI_INFO_NULL)
40     info->ref();
41   connected_wins_[rank_] = this;
42   if(rank_==0){
43     bar_ = new s4u::Barrier(comm->size());
44   }
45   errhandler_->ref();
46   comm->add_rma_win(this);
47   comm->ref();
48
49   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
50                    MPI_BYTE, comm);
51
52   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   colls::barrier(comm);
55   this->add_f();
56 }
57
58 Win::~Win(){
59   //As per the standard, perform a barrier to ensure every async comm is finished
60   bar_->wait();
61
62   int finished = finish_comms();
63   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64
65   if (info_ != MPI_INFO_NULL)
66     simgrid::smpi::Info::unref(info_);
67   if (errhandler_ != MPI_ERRHANDLER_NULL)
68     simgrid::smpi::Errhandler::unref(errhandler_);
69
70   comm_->remove_rma_win(this);
71
72   colls::barrier(comm_);
73   Comm::unref(comm_);
74   
75   if (rank_ == 0)
76     delete bar_;
77
78   if(allocated_ !=0)
79     xbt_free(base_);
80
81   F2C::free_f(this->c2f());
82   cleanup_attr<Win>();
83 }
84
85 int Win::attach(void* /*base*/, MPI_Aint size)
86 {
87   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
88     return MPI_ERR_ARG;
89   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
90   size_+=size;
91   return MPI_SUCCESS;
92 }
93
94 int Win::detach(const void* /*base*/)
95 {
96   base_=MPI_BOTTOM;
97   size_=-1;
98   return MPI_SUCCESS;
99 }
100
101 void Win::get_name(char* name, int* length) const
102 {
103   *length = static_cast<int>(name_.length());
104   if (not name_.empty()) {
105     name_.copy(name, *length);
106     name[*length] = '\0';
107   }
108 }
109
110 void Win::get_group(MPI_Group* group){
111   if(comm_ != MPI_COMM_NULL){
112     *group = comm_->group();
113   } else {
114     *group = MPI_GROUP_NULL;
115   }
116 }
117
118 MPI_Info Win::info()
119 {
120   if (info_ == MPI_INFO_NULL)
121     info_ = new Info();
122   info_->ref();
123   return info_;
124 }
125
126 int Win::rank() const
127 {
128   return rank_;
129 }
130
131 MPI_Aint Win::size() const
132 {
133   return size_;
134 }
135
136 void* Win::base() const
137 {
138   return base_;
139 }
140
141 int Win::disp_unit() const
142 {
143   return disp_unit_;
144 }
145
146 int Win::dynamic() const
147 {
148   return dynamic_;
149 }
150
151 void Win::set_info(MPI_Info info)
152 {
153   if (info_ != MPI_INFO_NULL)
154     simgrid::smpi::Info::unref(info_);
155   info_ = info;
156   if (info_ != MPI_INFO_NULL)
157     info_->ref();
158 }
159
160 void Win::set_name(const char* name){
161   name_ = name;
162 }
163
164 int Win::fence(int assert)
165 {
166   XBT_DEBUG("Entering fence");
167   if (opened_ == 0)
168     opened_=1;
169   if (assert != MPI_MODE_NOPRECEDE) {
170     // This is not the first fence => finalize what came before
171     bar_->wait();
172     mut_->lock();
173     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
174     // Without this, the vector could get redimensioned when another process pushes.
175     // This would result in the array used by Request::waitall() to be invalidated.
176     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
177
178     // start all requests that have been prepared by another process
179     if (not requests_.empty()) {
180       int size           = static_cast<int>(requests_.size());
181       MPI_Request* treqs = requests_.data();
182       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
183     }
184     count_=0;
185     mut_->unlock();
186   }
187
188   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
189     opened_=0;
190   assert_ = assert;
191
192   bar_->wait();
193   XBT_DEBUG("Leaving fence");
194
195   return MPI_SUCCESS;
196 }
197
198 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
199               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
200 {
201   //get receiver pointer
202   Win* recv_win = connected_wins_[target_rank];
203
204   if(opened_==0){//check that post/start has been done
205     // no fence or start .. lock ok ?
206     int locked=0;
207     for (auto const& it : recv_win->lockers_)
208       if (it == comm_->rank())
209         locked = 1;
210     if(locked != 1)
211       return MPI_ERR_WIN;
212   }
213
214   if(target_count*target_datatype->get_extent()>recv_win->size_){
215     XBT_WARN("Trying to put more than the window size - Bailing out.");
216     return MPI_ERR_RMA_RANGE;
217   }
218
219   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
220
221   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
222     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
223     // prepare send_request
224     MPI_Request sreq =
225         // TODO cheinrich Check for rank / pid conversion
226         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
227                                comm_, MPI_OP_NULL);
228
229     //prepare receiver request
230     // TODO cheinrich Check for rank / pid conversion
231     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
232                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
233
234     //start send
235     sreq->start();
236
237     if(request!=nullptr){
238       *request=sreq;
239     }else{
240       mut_->lock();
241       requests_.push_back(sreq);
242       mut_->unlock();
243     }
244
245     //push request to receiver's win
246     recv_win->mut_->lock();
247     recv_win->requests_.push_back(rreq);
248     rreq->start();
249     recv_win->mut_->unlock();
250   } else {
251     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
252     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
253     if(request!=nullptr)
254       *request = MPI_REQUEST_NULL;
255   }
256
257   return MPI_SUCCESS;
258 }
259
260 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
261               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
262 {
263   //get sender pointer
264   Win* send_win = connected_wins_[target_rank];
265
266   if(opened_==0){//check that post/start has been done
267     // no fence or start .. lock ok ?
268     int locked=0;
269     for (auto const& it : send_win->lockers_)
270       if (it == comm_->rank())
271         locked = 1;
272     if(locked != 1)
273       return MPI_ERR_WIN;
274   }
275
276   if(target_count*target_datatype->get_extent()>send_win->size_){
277     XBT_WARN("Trying to get more than the window size - Bailing out.");
278     return MPI_ERR_RMA_RANGE;
279   }
280
281   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
282   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
283
284   if(target_rank != comm_->rank()){
285     //prepare send_request
286     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
287                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
288
289     //prepare receiver request
290     MPI_Request rreq = Request::rma_recv_init(
291         origin_addr, origin_count, origin_datatype, target_rank,
292         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
293         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
294
295     //start the send, with another process than us as sender.
296     sreq->start();
297     //push request to receiver's win
298     send_win->mut_->lock();
299     send_win->requests_.push_back(sreq);
300     send_win->mut_->unlock();
301
302     //start recv
303     rreq->start();
304
305     if(request!=nullptr){
306       *request=rreq;
307     }else{
308       mut_->lock();
309       requests_.push_back(rreq);
310       mut_->unlock();
311     }
312   } else {
313     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
314     if(request!=nullptr)
315       *request=MPI_REQUEST_NULL;
316   }
317   return MPI_SUCCESS;
318 }
319
320 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
321               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
322 {
323   XBT_DEBUG("Entering MPI_Win_Accumulate");
324   //get receiver pointer
325   Win* recv_win = connected_wins_[target_rank];
326
327   if(opened_==0){//check that post/start has been done
328     // no fence or start .. lock ok ?
329     int locked=0;
330     for (auto const& it : recv_win->lockers_)
331       if (it == comm_->rank())
332         locked = 1;
333     if(locked != 1)
334       return MPI_ERR_WIN;
335   }
336   //FIXME: local version
337
338   if(target_count*target_datatype->get_extent()>recv_win->size_){
339     XBT_WARN("Trying to accumulate more than the window size - Bailing out.");
340     return MPI_ERR_RMA_RANGE;
341   }
342
343   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
344   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
345   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
346   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
347   // prepare send_request
348
349   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
350                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
351
352   // prepare receiver request
353   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
354                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
355
356   count_++;
357
358   // start send
359   sreq->start();
360   // push request to receiver's win
361   recv_win->mut_->lock();
362   recv_win->requests_.push_back(rreq);
363   rreq->start();
364   recv_win->mut_->unlock();
365
366   if (request != nullptr) {
367     *request = sreq;
368   } else {
369     mut_->lock();
370     requests_.push_back(sreq);
371     mut_->unlock();
372   }
373
374   XBT_DEBUG("Leaving MPI_Win_Accumulate");
375   return MPI_SUCCESS;
376 }
377
378 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
379                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
380                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
381 {
382   //get sender pointer
383   const Win* send_win = connected_wins_[target_rank];
384
385   if(opened_==0){//check that post/start has been done
386     // no fence or start .. lock ok ?
387     int locked=0;
388     for (auto const& it : send_win->lockers_)
389       if (it == comm_->rank())
390         locked = 1;
391     if(locked != 1)
392       return MPI_ERR_WIN;
393   }
394
395   if(target_count*target_datatype->get_extent()>send_win->size_){
396     XBT_WARN("Trying to get_accumulate more than the window size - Bailing out.");
397     return MPI_ERR_RMA_RANGE;
398   }
399
400
401   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
402   //need to be sure ops are correctly ordered, so finish request here ? slow.
403   MPI_Request req;
404   send_win->atomic_mut_->lock();
405   get(result_addr, result_count, result_datatype, target_rank,
406               target_disp, target_count, target_datatype, &req);
407   if (req != MPI_REQUEST_NULL)
408     Request::wait(&req, MPI_STATUS_IGNORE);
409   if(op!=MPI_NO_OP)
410     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
411               target_disp, target_count, target_datatype, op, &req);
412   if (req != MPI_REQUEST_NULL)
413     Request::wait(&req, MPI_STATUS_IGNORE);
414   send_win->atomic_mut_->unlock();
415   return MPI_SUCCESS;
416 }
417
418 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
419                           int target_rank, MPI_Aint target_disp)
420 {
421   //get sender pointer
422   const Win* send_win = connected_wins_[target_rank];
423
424   if(opened_==0){//check that post/start has been done
425     // no fence or start .. lock ok ?
426     int locked=0;
427     for (auto const& it : send_win->lockers_)
428       if (it == comm_->rank())
429         locked = 1;
430     if(locked != 1)
431       return MPI_ERR_WIN;
432   }
433
434   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
435   MPI_Request req = MPI_REQUEST_NULL;
436   send_win->atomic_mut_->lock();
437   get(result_addr, 1, datatype, target_rank,
438               target_disp, 1, datatype, &req);
439   if (req != MPI_REQUEST_NULL)
440     Request::wait(&req, MPI_STATUS_IGNORE);
441   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
442     put(origin_addr, 1, datatype, target_rank,
443               target_disp, 1, datatype);
444   }
445   send_win->atomic_mut_->unlock();
446   return MPI_SUCCESS;
447 }
448
449 int Win::start(MPI_Group group, int /*assert*/)
450 {
451   /* From MPI forum advices
452   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
453   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
454   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
455   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
456   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
457   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
458   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
459   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
460   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
461   must complete, without further dependencies.  */
462
463   //naive, blocking implementation.
464   int i             = 0;
465   int j             = 0;
466   int size          = group->size();
467   std::vector<MPI_Request> reqs(size);
468
469   XBT_DEBUG("Entering MPI_Win_Start");
470   while (j != size) {
471     int src = comm_->group()->rank(group->actor(j));
472     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
473       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
474       i++;
475     }
476     j++;
477   }
478   size = i;
479   Request::startall(size, reqs.data());
480   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
481   for (i = 0; i < size; i++) {
482     Request::unref(&reqs[i]);
483   }
484   opened_++; //we're open for business !
485   group_=group;
486   group->ref();
487   XBT_DEBUG("Leaving MPI_Win_Start");
488   return MPI_SUCCESS;
489 }
490
491 int Win::post(MPI_Group group, int /*assert*/)
492 {
493   //let's make a synchronous send here
494   int i             = 0;
495   int j             = 0;
496   int size = group->size();
497   std::vector<MPI_Request> reqs(size);
498
499   XBT_DEBUG("Entering MPI_Win_Post");
500   while(j!=size){
501     int dst = comm_->group()->rank(group->actor(j));
502     if (dst != rank_ && dst != MPI_UNDEFINED) {
503       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
504       i++;
505     }
506     j++;
507   }
508   size=i;
509
510   Request::startall(size, reqs.data());
511   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
512   for(i=0;i<size;i++){
513     Request::unref(&reqs[i]);
514   }
515   opened_++; //we're open for business !
516   group_=group;
517   group->ref();
518   XBT_DEBUG("Leaving MPI_Win_Post");
519   return MPI_SUCCESS;
520 }
521
522 int Win::complete(){
523   if(opened_==0)
524     xbt_die("Complete called on already opened MPI_Win");
525
526   XBT_DEBUG("Entering MPI_Win_Complete");
527   int i             = 0;
528   int j             = 0;
529   int size          = group_->size();
530   std::vector<MPI_Request> reqs(size);
531
532   while(j!=size){
533     int dst = comm_->group()->rank(group_->actor(j));
534     if (dst != rank_ && dst != MPI_UNDEFINED) {
535       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
536       i++;
537     }
538     j++;
539   }
540   size=i;
541   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
542   Request::startall(size, reqs.data());
543   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
544
545   for(i=0;i<size;i++){
546     Request::unref(&reqs[i]);
547   }
548
549   int finished = finish_comms();
550   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
551
552   Group::unref(group_);
553   opened_--; //we're closed for business !
554   return MPI_SUCCESS;
555 }
556
557 int Win::wait(){
558   //naive, blocking implementation.
559   XBT_DEBUG("Entering MPI_Win_Wait");
560   int i             = 0;
561   int j             = 0;
562   int size          = group_->size();
563   std::vector<MPI_Request> reqs(size);
564
565   while(j!=size){
566     int src = comm_->group()->rank(group_->actor(j));
567     if (src != rank_ && src != MPI_UNDEFINED) {
568       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
569       i++;
570     }
571     j++;
572   }
573   size=i;
574   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
575   Request::startall(size, reqs.data());
576   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
577   for(i=0;i<size;i++){
578     Request::unref(&reqs[i]);
579   }
580   int finished = finish_comms();
581   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
582
583   Group::unref(group_);
584   opened_--; //we're opened for business !
585   return MPI_SUCCESS;
586 }
587
588 int Win::lock(int lock_type, int rank, int /*assert*/)
589 {
590   MPI_Win target_win = connected_wins_[rank];
591
592   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
593     target_win->lock_mut_->lock();
594     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
595     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
596       target_win->lock_mut_->unlock();
597    }
598   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
599     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
600
601   target_win->lockers_.push_back(comm_->rank());
602
603   int finished = finish_comms(rank);
604   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
605   finished = target_win->finish_comms(rank_);
606   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
607   return MPI_SUCCESS;
608 }
609
610 int Win::lock_all(int assert){
611   int retval = MPI_SUCCESS;
612   for (int i = 0; i < comm_->size(); i++) {
613     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
614     if (ret != MPI_SUCCESS)
615       retval = ret;
616   }
617   return retval;
618 }
619
620 int Win::unlock(int rank){
621   MPI_Win target_win = connected_wins_[rank];
622   int target_mode = target_win->mode_;
623   target_win->mode_= 0;
624   target_win->lockers_.remove(comm_->rank());
625   if (target_mode==MPI_LOCK_EXCLUSIVE){
626     target_win->lock_mut_->unlock();
627   }
628
629   int finished = finish_comms(rank);
630   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
631   finished = target_win->finish_comms(rank_);
632   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
633   return MPI_SUCCESS;
634 }
635
636 int Win::unlock_all(){
637   int retval = MPI_SUCCESS;
638   for (int i = 0; i < comm_->size(); i++) {
639     int ret = this->unlock(i);
640     if (ret != MPI_SUCCESS)
641       retval = ret;
642   }
643   return retval;
644 }
645
646 int Win::flush(int rank){
647   MPI_Win target_win = connected_wins_[rank];
648   int finished       = finish_comms(rank_);
649   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
650   finished = target_win->finish_comms(rank);
651   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
652   return MPI_SUCCESS;
653 }
654
655 int Win::flush_local(int rank){
656   int finished = finish_comms(rank);
657   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
658   return MPI_SUCCESS;
659 }
660
661 int Win::flush_all(){
662   int finished = finish_comms();
663   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
664   for (int i = 0; i < comm_->size(); i++) {
665     finished = connected_wins_[i]->finish_comms(rank_);
666     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
667   }
668   return MPI_SUCCESS;
669 }
670
671 int Win::flush_local_all(){
672   int finished = finish_comms();
673   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
674   return MPI_SUCCESS;
675 }
676
677 Win* Win::f2c(int id){
678   return static_cast<Win*>(F2C::f2c(id));
679 }
680
681 int Win::finish_comms(){
682   mut_->lock();
683   //Finish own requests
684   int size = static_cast<int>(requests_.size());
685   if (size > 0) {
686     MPI_Request* treqs = requests_.data();
687     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
688     requests_.clear();
689   }
690   mut_->unlock();
691   return size;
692 }
693
694 int Win::finish_comms(int rank){
695   mut_->lock();
696   // Finish own requests
697   // Let's see if we're either the destination or the sender of this request
698   // because we only wait for requests that we are responsible for.
699   // Also use the process id here since the request itself returns from src()
700   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
701   int proc_id = comm_->group()->actor(rank)->get_pid();
702   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
703     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
704   });
705   std::vector<MPI_Request> myreqqs(it, end(requests_));
706   requests_.erase(it, end(requests_));
707   int size = static_cast<int>(myreqqs.size());
708   if (size > 0) {
709     MPI_Request* treqs = myreqqs.data();
710     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
711     myreqqs.clear();
712   }
713   mut_->unlock();
714   return size;
715 }
716
717 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
718 {
719   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
720   for (int i = 0; not target_win && i < comm_->size(); i++) {
721     if (connected_wins_[i]->size_ > 0)
722       target_win = connected_wins_[i];
723   }
724   if (target_win) {
725     *size                         = target_win->size_;
726     *disp_unit                    = target_win->disp_unit_;
727     *static_cast<void**>(baseptr) = target_win->base_;
728   } else {
729     *size                         = 0;
730     *static_cast<void**>(baseptr) = nullptr;
731   }
732   return MPI_SUCCESS;
733 }
734
735 MPI_Errhandler Win::errhandler()
736 {
737   if (errhandler_ != MPI_ERRHANDLER_NULL)
738     errhandler_->ref();
739   return errhandler_;
740 }
741
742 void Win::set_errhandler(MPI_Errhandler errhandler)
743 {
744   if (errhandler_ != MPI_ERRHANDLER_NULL)
745     simgrid::smpi::Errhandler::unref(errhandler_);
746   errhandler_ = errhandler;
747   if (errhandler_ != MPI_ERRHANDLER_NULL)
748     errhandler_->ref();
749 }
750 } // namespace smpi
751 } // namespace simgrid