Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9850c2a2f2b87974abef6ac3c4274f91dd54f4b2
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21
22 namespace simgrid{
23 namespace smpi{
24 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
25 int Win::keyval_id_=0;
26
27 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
28     : base_(base)
29     , size_(size)
30     , disp_unit_(disp_unit)
31     , info_(info)
32     , comm_(comm)
33     , connected_wins_(comm->size())
34     , rank_(comm->rank())
35     , allocated_(allocated)
36     , dynamic_(dynamic)
37 {
38   XBT_DEBUG("Creating window");
39   if(info!=MPI_INFO_NULL)
40     info->ref();
41   connected_wins_[rank_] = this;
42   if(rank_==0){
43     bar_ = new s4u::Barrier(comm->size());
44   }
45   errhandler_->ref();
46   comm->add_rma_win(this);
47   comm->ref();
48
49   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
50                    MPI_BYTE, comm);
51
52   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   colls::barrier(comm);
55   this->add_f();
56 }
57
58 Win::~Win(){
59   //As per the standard, perform a barrier to ensure every async comm is finished
60   bar_->wait();
61
62   int finished = finish_comms();
63   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64
65   if (info_ != MPI_INFO_NULL)
66     simgrid::smpi::Info::unref(info_);
67   if (errhandler_ != MPI_ERRHANDLER_NULL)
68     simgrid::smpi::Errhandler::unref(errhandler_);
69
70   comm_->remove_rma_win(this);
71
72   colls::barrier(comm_);
73   Comm::unref(comm_);
74   
75   if (rank_ == 0)
76     delete bar_;
77
78   if(allocated_ !=0)
79     xbt_free(base_);
80
81   F2C::free_f(this->c2f());
82   cleanup_attr<Win>();
83 }
84
85 int Win::attach(void* /*base*/, MPI_Aint size)
86 {
87   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
88     return MPI_ERR_ARG;
89   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
90   size_+=size;
91   return MPI_SUCCESS;
92 }
93
94 int Win::detach(const void* /*base*/)
95 {
96   base_=MPI_BOTTOM;
97   size_=-1;
98   return MPI_SUCCESS;
99 }
100
101 void Win::get_name(char* name, int* length) const
102 {
103   *length = static_cast<int>(name_.length());
104   if (not name_.empty()) {
105     name_.copy(name, *length);
106     name[*length] = '\0';
107   }
108 }
109
110 void Win::get_group(MPI_Group* group){
111   if(comm_ != MPI_COMM_NULL){
112     *group = comm_->group();
113   } else {
114     *group = MPI_GROUP_NULL;
115   }
116 }
117
118 MPI_Info Win::info()
119 {
120   if (info_ == MPI_INFO_NULL)
121     info_ = new Info();
122   info_->ref();
123   return info_;
124 }
125
126 int Win::rank() const
127 {
128   return rank_;
129 }
130
131 MPI_Comm Win::comm() const
132 {
133   return comm_;
134 }
135
136 MPI_Aint Win::size() const
137 {
138   return size_;
139 }
140
141 void* Win::base() const
142 {
143   return base_;
144 }
145
146 int Win::disp_unit() const
147 {
148   return disp_unit_;
149 }
150
151 int Win::dynamic() const
152 {
153   return dynamic_;
154 }
155
156 void Win::set_info(MPI_Info info)
157 {
158   if (info_ != MPI_INFO_NULL)
159     simgrid::smpi::Info::unref(info_);
160   info_ = info;
161   if (info_ != MPI_INFO_NULL)
162     info_->ref();
163 }
164
165 void Win::set_name(const char* name){
166   name_ = name;
167 }
168
169 int Win::fence(int assert)
170 {
171   XBT_DEBUG("Entering fence");
172   if (opened_ == 0)
173     opened_=1;
174   if (assert != MPI_MODE_NOPRECEDE) {
175     // This is not the first fence => finalize what came before
176     bar_->wait();
177     mut_->lock();
178     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
179     // Without this, the vector could get redimensioned when another process pushes.
180     // This would result in the array used by Request::waitall() to be invalidated.
181     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
182
183     // start all requests that have been prepared by another process
184     if (not requests_.empty()) {
185       int size           = static_cast<int>(requests_.size());
186       MPI_Request* treqs = requests_.data();
187       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
188     }
189     count_=0;
190     mut_->unlock();
191   }
192
193   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
194     opened_=0;
195   assert_ = assert;
196
197   bar_->wait();
198   XBT_DEBUG("Leaving fence");
199
200   return MPI_SUCCESS;
201 }
202
203 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
204               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
205 {
206   //get receiver pointer
207   Win* recv_win = connected_wins_[target_rank];
208
209   if(opened_==0){//check that post/start has been done
210     // no fence or start .. lock ok ?
211     int locked=0;
212     for (auto const& it : recv_win->lockers_)
213       if (it == comm_->rank())
214         locked = 1;
215     if(locked != 1)
216       return MPI_ERR_WIN;
217   }
218
219   if(target_count*target_datatype->get_extent()>recv_win->size_){
220     XBT_WARN("Trying to put more than the window size - Bailing out.");
221     return MPI_ERR_RMA_RANGE;
222   }
223
224   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
225
226   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
227     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
228     // prepare send_request
229     MPI_Request sreq =
230         // TODO cheinrich Check for rank / pid conversion
231         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
232                                comm_, MPI_OP_NULL);
233
234     //prepare receiver request
235     // TODO cheinrich Check for rank / pid conversion
236     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
237                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
238
239     //start send
240     sreq->start();
241
242     if(request!=nullptr){
243       *request=sreq;
244     }else{
245       mut_->lock();
246       requests_.push_back(sreq);
247       mut_->unlock();
248     }
249
250     //push request to receiver's win
251     recv_win->mut_->lock();
252     recv_win->requests_.push_back(rreq);
253     rreq->start();
254     recv_win->mut_->unlock();
255   } else {
256     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
257     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
258     if(request!=nullptr)
259       *request = MPI_REQUEST_NULL;
260   }
261
262   return MPI_SUCCESS;
263 }
264
265 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
266               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
267 {
268   //get sender pointer
269   Win* send_win = connected_wins_[target_rank];
270
271   if(opened_==0){//check that post/start has been done
272     // no fence or start .. lock ok ?
273     int locked=0;
274     for (auto const& it : send_win->lockers_)
275       if (it == comm_->rank())
276         locked = 1;
277     if(locked != 1)
278       return MPI_ERR_WIN;
279   }
280
281   if(target_count*target_datatype->get_extent()>send_win->size_){
282     XBT_WARN("Trying to get more than the window size - Bailing out.");
283     return MPI_ERR_RMA_RANGE;
284   }
285
286   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
287   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
288
289   if(target_rank != comm_->rank()){
290     //prepare send_request
291     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
292                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
293
294     //prepare receiver request
295     MPI_Request rreq = Request::rma_recv_init(
296         origin_addr, origin_count, origin_datatype, target_rank,
297         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
298         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
299
300     //start the send, with another process than us as sender.
301     sreq->start();
302     //push request to receiver's win
303     send_win->mut_->lock();
304     send_win->requests_.push_back(sreq);
305     send_win->mut_->unlock();
306
307     //start recv
308     rreq->start();
309
310     if(request!=nullptr){
311       *request=rreq;
312     }else{
313       mut_->lock();
314       requests_.push_back(rreq);
315       mut_->unlock();
316     }
317   } else {
318     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
319     if(request!=nullptr)
320       *request=MPI_REQUEST_NULL;
321   }
322   return MPI_SUCCESS;
323 }
324
325 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
326               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
327 {
328   XBT_DEBUG("Entering MPI_Win_Accumulate");
329   //get receiver pointer
330   Win* recv_win = connected_wins_[target_rank];
331
332   if(opened_==0){//check that post/start has been done
333     // no fence or start .. lock ok ?
334     int locked=0;
335     for (auto const& it : recv_win->lockers_)
336       if (it == comm_->rank())
337         locked = 1;
338     if(locked != 1)
339       return MPI_ERR_WIN;
340   }
341   //FIXME: local version
342
343   if(target_count*target_datatype->get_extent()>recv_win->size_){
344     XBT_WARN("Trying to accumulate more than the window size - Bailing out.");
345     return MPI_ERR_RMA_RANGE;
346   }
347
348   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
349   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
350   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
351   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
352   // prepare send_request
353
354   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
355                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
356
357   // prepare receiver request
358   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
359                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
360
361   count_++;
362
363   // start send
364   sreq->start();
365   // push request to receiver's win
366   recv_win->mut_->lock();
367   recv_win->requests_.push_back(rreq);
368   rreq->start();
369   recv_win->mut_->unlock();
370
371   if (request != nullptr) {
372     *request = sreq;
373   } else {
374     mut_->lock();
375     requests_.push_back(sreq);
376     mut_->unlock();
377   }
378
379   XBT_DEBUG("Leaving MPI_Win_Accumulate");
380   return MPI_SUCCESS;
381 }
382
383 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
384                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
385                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
386 {
387   //get sender pointer
388   const Win* send_win = connected_wins_[target_rank];
389
390   if(opened_==0){//check that post/start has been done
391     // no fence or start .. lock ok ?
392     int locked=0;
393     for (auto const& it : send_win->lockers_)
394       if (it == comm_->rank())
395         locked = 1;
396     if(locked != 1)
397       return MPI_ERR_WIN;
398   }
399
400   if(target_count*target_datatype->get_extent()>send_win->size_){
401     XBT_WARN("Trying to get_accumulate more than the window size - Bailing out.");
402     return MPI_ERR_RMA_RANGE;
403   }
404
405
406   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
407   //need to be sure ops are correctly ordered, so finish request here ? slow.
408   MPI_Request req;
409   send_win->atomic_mut_->lock();
410   get(result_addr, result_count, result_datatype, target_rank,
411               target_disp, target_count, target_datatype, &req);
412   if (req != MPI_REQUEST_NULL)
413     Request::wait(&req, MPI_STATUS_IGNORE);
414   if(op!=MPI_NO_OP)
415     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
416               target_disp, target_count, target_datatype, op, &req);
417   if (req != MPI_REQUEST_NULL)
418     Request::wait(&req, MPI_STATUS_IGNORE);
419   send_win->atomic_mut_->unlock();
420   return MPI_SUCCESS;
421 }
422
423 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
424                           int target_rank, MPI_Aint target_disp)
425 {
426   //get sender pointer
427   const Win* send_win = connected_wins_[target_rank];
428
429   if(opened_==0){//check that post/start has been done
430     // no fence or start .. lock ok ?
431     int locked=0;
432     for (auto const& it : send_win->lockers_)
433       if (it == comm_->rank())
434         locked = 1;
435     if(locked != 1)
436       return MPI_ERR_WIN;
437   }
438
439   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
440   MPI_Request req = MPI_REQUEST_NULL;
441   send_win->atomic_mut_->lock();
442   get(result_addr, 1, datatype, target_rank,
443               target_disp, 1, datatype, &req);
444   if (req != MPI_REQUEST_NULL)
445     Request::wait(&req, MPI_STATUS_IGNORE);
446   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
447     put(origin_addr, 1, datatype, target_rank,
448               target_disp, 1, datatype);
449   }
450   send_win->atomic_mut_->unlock();
451   return MPI_SUCCESS;
452 }
453
454 int Win::start(MPI_Group group, int /*assert*/)
455 {
456   /* From MPI forum advices
457   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
458   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
459   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
460   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
461   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
462   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
463   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
464   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
465   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
466   must complete, without further dependencies.  */
467
468   //naive, blocking implementation.
469   int i             = 0;
470   int j             = 0;
471   int size          = group->size();
472   std::vector<MPI_Request> reqs(size);
473
474   XBT_DEBUG("Entering MPI_Win_Start");
475   while (j != size) {
476     int src = comm_->group()->rank(group->actor(j));
477     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
478       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
479       i++;
480     }
481     j++;
482   }
483   size = i;
484   Request::startall(size, reqs.data());
485   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
486   for (i = 0; i < size; i++) {
487     Request::unref(&reqs[i]);
488   }
489   opened_++; //we're open for business !
490   group_=group;
491   group->ref();
492   XBT_DEBUG("Leaving MPI_Win_Start");
493   return MPI_SUCCESS;
494 }
495
496 int Win::post(MPI_Group group, int /*assert*/)
497 {
498   //let's make a synchronous send here
499   int i             = 0;
500   int j             = 0;
501   int size = group->size();
502   std::vector<MPI_Request> reqs(size);
503
504   XBT_DEBUG("Entering MPI_Win_Post");
505   while(j!=size){
506     int dst = comm_->group()->rank(group->actor(j));
507     if (dst != rank_ && dst != MPI_UNDEFINED) {
508       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
509       i++;
510     }
511     j++;
512   }
513   size=i;
514
515   Request::startall(size, reqs.data());
516   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
517   for(i=0;i<size;i++){
518     Request::unref(&reqs[i]);
519   }
520   opened_++; //we're open for business !
521   group_=group;
522   group->ref();
523   XBT_DEBUG("Leaving MPI_Win_Post");
524   return MPI_SUCCESS;
525 }
526
527 int Win::complete(){
528   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
529
530   XBT_DEBUG("Entering MPI_Win_Complete");
531   int i             = 0;
532   int j             = 0;
533   int size          = group_->size();
534   std::vector<MPI_Request> reqs(size);
535
536   while(j!=size){
537     int dst = comm_->group()->rank(group_->actor(j));
538     if (dst != rank_ && dst != MPI_UNDEFINED) {
539       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
540       i++;
541     }
542     j++;
543   }
544   size=i;
545   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
546   Request::startall(size, reqs.data());
547   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
548
549   for(i=0;i<size;i++){
550     Request::unref(&reqs[i]);
551   }
552
553   int finished = finish_comms();
554   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
555
556   Group::unref(group_);
557   opened_--; //we're closed for business !
558   return MPI_SUCCESS;
559 }
560
561 int Win::wait(){
562   //naive, blocking implementation.
563   XBT_DEBUG("Entering MPI_Win_Wait");
564   int i             = 0;
565   int j             = 0;
566   int size          = group_->size();
567   std::vector<MPI_Request> reqs(size);
568
569   while(j!=size){
570     int src = comm_->group()->rank(group_->actor(j));
571     if (src != rank_ && src != MPI_UNDEFINED) {
572       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
573       i++;
574     }
575     j++;
576   }
577   size=i;
578   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
579   Request::startall(size, reqs.data());
580   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
581   for(i=0;i<size;i++){
582     Request::unref(&reqs[i]);
583   }
584   int finished = finish_comms();
585   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
586
587   Group::unref(group_);
588   opened_--; //we're opened for business !
589   return MPI_SUCCESS;
590 }
591
592 int Win::lock(int lock_type, int rank, int /*assert*/)
593 {
594   MPI_Win target_win = connected_wins_[rank];
595
596   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
597     target_win->lock_mut_->lock();
598     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
599     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
600       target_win->lock_mut_->unlock();
601    }
602   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
603     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
604
605   target_win->lockers_.push_back(comm_->rank());
606
607   int finished = finish_comms(rank);
608   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
609   finished = target_win->finish_comms(rank_);
610   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
611   return MPI_SUCCESS;
612 }
613
614 int Win::lock_all(int assert){
615   int retval = MPI_SUCCESS;
616   for (int i = 0; i < comm_->size(); i++) {
617     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
618     if (ret != MPI_SUCCESS)
619       retval = ret;
620   }
621   return retval;
622 }
623
624 int Win::unlock(int rank){
625   MPI_Win target_win = connected_wins_[rank];
626   int target_mode = target_win->mode_;
627   target_win->mode_= 0;
628   target_win->lockers_.remove(comm_->rank());
629   if (target_mode==MPI_LOCK_EXCLUSIVE){
630     target_win->lock_mut_->unlock();
631   }
632
633   int finished = finish_comms(rank);
634   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
635   finished = target_win->finish_comms(rank_);
636   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
637   return MPI_SUCCESS;
638 }
639
640 int Win::unlock_all(){
641   int retval = MPI_SUCCESS;
642   for (int i = 0; i < comm_->size(); i++) {
643     int ret = this->unlock(i);
644     if (ret != MPI_SUCCESS)
645       retval = ret;
646   }
647   return retval;
648 }
649
650 int Win::flush(int rank){
651   MPI_Win target_win = connected_wins_[rank];
652   int finished       = finish_comms(rank_);
653   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
654   finished = target_win->finish_comms(rank);
655   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
656   return MPI_SUCCESS;
657 }
658
659 int Win::flush_local(int rank){
660   int finished = finish_comms(rank);
661   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
662   return MPI_SUCCESS;
663 }
664
665 int Win::flush_all(){
666   int finished = finish_comms();
667   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
668   for (int i = 0; i < comm_->size(); i++) {
669     finished = connected_wins_[i]->finish_comms(rank_);
670     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
671   }
672   return MPI_SUCCESS;
673 }
674
675 int Win::flush_local_all(){
676   int finished = finish_comms();
677   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
678   return MPI_SUCCESS;
679 }
680
681 Win* Win::f2c(int id){
682   return static_cast<Win*>(F2C::f2c(id));
683 }
684
685 int Win::finish_comms(){
686   mut_->lock();
687   //Finish own requests
688   int size = static_cast<int>(requests_.size());
689   if (size > 0) {
690     MPI_Request* treqs = requests_.data();
691     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
692     requests_.clear();
693   }
694   mut_->unlock();
695   return size;
696 }
697
698 int Win::finish_comms(int rank){
699   mut_->lock();
700   // Finish own requests
701   // Let's see if we're either the destination or the sender of this request
702   // because we only wait for requests that we are responsible for.
703   // Also use the process id here since the request itself returns from src()
704   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
705   int proc_id = comm_->group()->actor(rank)->get_pid();
706   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
707     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
708   });
709   std::vector<MPI_Request> myreqqs(it, end(requests_));
710   requests_.erase(it, end(requests_));
711   int size = static_cast<int>(myreqqs.size());
712   if (size > 0) {
713     MPI_Request* treqs = myreqqs.data();
714     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
715     myreqqs.clear();
716   }
717   mut_->unlock();
718   return size;
719 }
720
721 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
722 {
723   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
724   for (int i = 0; not target_win && i < comm_->size(); i++) {
725     if (connected_wins_[i]->size_ > 0)
726       target_win = connected_wins_[i];
727   }
728   if (target_win) {
729     *size                         = target_win->size_;
730     *disp_unit                    = target_win->disp_unit_;
731     *static_cast<void**>(baseptr) = target_win->base_;
732   } else {
733     *size                         = 0;
734     *static_cast<void**>(baseptr) = nullptr;
735   }
736   return MPI_SUCCESS;
737 }
738
739 MPI_Errhandler Win::errhandler()
740 {
741   if (errhandler_ != MPI_ERRHANDLER_NULL)
742     errhandler_->ref();
743   return errhandler_;
744 }
745
746 void Win::set_errhandler(MPI_Errhandler errhandler)
747 {
748   if (errhandler_ != MPI_ERRHANDLER_NULL)
749     simgrid::smpi::Errhandler::unref(errhandler_);
750   errhandler_ = errhandler;
751   if (errhandler_ != MPI_ERRHANDLER_NULL)
752     errhandler_->ref();
753 }
754 } // namespace smpi
755 } // namespace simgrid