Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Little simplifications in loops.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)                                                                                          \
30   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
31     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
32     if (not locked)                                                                                                    \
33       return MPI_ERR_WIN;                                                                                              \
34   }
35
36 namespace simgrid{
37 namespace smpi{
38 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
39 int Win::keyval_id_=0;
40
41 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
42     : base_(base)
43     , size_(size)
44     , disp_unit_(disp_unit)
45     , info_(info)
46     , comm_(comm)
47     , connected_wins_(comm->size())
48     , rank_(comm->rank())
49     , allocated_(allocated)
50     , dynamic_(dynamic)
51 {
52   XBT_DEBUG("Creating window");
53   if(info!=MPI_INFO_NULL)
54     info->ref();
55   connected_wins_[rank_] = this;
56   if(rank_==0){
57     bar_ = new s4u::Barrier(comm->size());
58   }
59   errhandler_->ref();
60   comm->add_rma_win(this);
61   comm->ref();
62
63   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
64                    MPI_BYTE, comm);
65
66   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
67
68   colls::barrier(comm);
69   this->add_f();
70 }
71
72 Win::~Win(){
73   //As per the standard, perform a barrier to ensure every async comm is finished
74   bar_->wait();
75
76   int finished = finish_comms();
77   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
78
79   if (info_ != MPI_INFO_NULL)
80     simgrid::smpi::Info::unref(info_);
81   if (errhandler_ != MPI_ERRHANDLER_NULL)
82     simgrid::smpi::Errhandler::unref(errhandler_);
83
84   comm_->remove_rma_win(this);
85
86   colls::barrier(comm_);
87   Comm::unref(comm_);
88   
89   if (rank_ == 0)
90     delete bar_;
91
92   if (allocated_)
93     xbt_free(base_);
94
95   F2C::free_f(this->f2c_id());
96   cleanup_attr<Win>();
97 }
98
99 int Win::attach(void* /*base*/, MPI_Aint size)
100 {
101   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
102     return MPI_ERR_ARG;
103   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
104   size_+=size;
105   return MPI_SUCCESS;
106 }
107
108 int Win::detach(const void* /*base*/)
109 {
110   base_=MPI_BOTTOM;
111   size_=-1;
112   return MPI_SUCCESS;
113 }
114
115 void Win::get_name(char* name, int* length) const
116 {
117   *length = static_cast<int>(name_.length());
118   if (not name_.empty()) {
119     name_.copy(name, *length);
120     name[*length] = '\0';
121   }
122 }
123
124 void Win::get_group(MPI_Group* group){
125   if(comm_ != MPI_COMM_NULL){
126     *group = comm_->group();
127   } else {
128     *group = MPI_GROUP_NULL;
129   }
130 }
131
132 MPI_Info Win::info()
133 {
134   if (info_ == MPI_INFO_NULL)
135     info_ = new Info();
136   info_->ref();
137   return info_;
138 }
139
140 int Win::rank() const
141 {
142   return rank_;
143 }
144
145 MPI_Comm Win::comm() const
146 {
147   return comm_;
148 }
149
150 MPI_Aint Win::size() const
151 {
152   return size_;
153 }
154
155 void* Win::base() const
156 {
157   return base_;
158 }
159
160 int Win::disp_unit() const
161 {
162   return disp_unit_;
163 }
164
165 bool Win::dynamic() const
166 {
167   return dynamic_;
168 }
169
170 void Win::set_info(MPI_Info info)
171 {
172   if (info_ != MPI_INFO_NULL)
173     simgrid::smpi::Info::unref(info_);
174   info_ = info;
175   if (info_ != MPI_INFO_NULL)
176     info_->ref();
177 }
178
179 void Win::set_name(const char* name){
180   name_ = name;
181 }
182
183 int Win::fence(int assert)
184 {
185   XBT_DEBUG("Entering fence");
186   if (opened_ == 0)
187     opened_=1;
188   if (not (assert & MPI_MODE_NOPRECEDE)) {
189     // This is not the first fence => finalize what came before
190     bar_->wait();
191     mut_->lock();
192     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
193     // Without this, the vector could get redimensioned when another process pushes.
194     // This would result in the array used by Request::waitall() to be invalidated.
195     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
196
197     // start all requests that have been prepared by another process
198     if (not requests_.empty()) {
199       int size           = static_cast<int>(requests_.size());
200       MPI_Request* treqs = requests_.data();
201       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
202     }
203     count_=0;
204     mut_->unlock();
205   }
206
207   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
208     opened_=0;
209   assert_ = assert;
210
211   bar_->wait();
212   XBT_DEBUG("Leaving fence");
213
214   return MPI_SUCCESS;
215 }
216
217 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
218               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
219 {
220   //get receiver pointer
221   Win* recv_win = connected_wins_[target_rank];
222
223   CHECK_WIN_LOCKED(recv_win)
224   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
225
226   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
227
228   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
229     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
230     // prepare send_request
231     MPI_Request sreq =
232         // TODO cheinrich Check for rank / pid conversion
233         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
234                                comm_, MPI_OP_NULL);
235
236     //prepare receiver request
237     // TODO cheinrich Check for rank / pid conversion
238     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
239                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
240
241     //start send
242     sreq->start();
243
244     if(request!=nullptr){
245       *request=sreq;
246     }else{
247       mut_->lock();
248       requests_.push_back(sreq);
249       mut_->unlock();
250     }
251
252     //push request to receiver's win
253     recv_win->mut_->lock();
254     recv_win->requests_.push_back(rreq);
255     rreq->start();
256     recv_win->mut_->unlock();
257   } else {
258     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
259     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
260     if(request!=nullptr)
261       *request = MPI_REQUEST_NULL;
262   }
263
264   return MPI_SUCCESS;
265 }
266
267 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
268               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
269 {
270   //get sender pointer
271   Win* send_win = connected_wins_[target_rank];
272
273   CHECK_WIN_LOCKED(send_win)
274   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
275
276   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
277   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
278
279   if(target_rank != comm_->rank()){
280     //prepare send_request
281     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
282                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
283
284     //prepare receiver request
285     MPI_Request rreq = Request::rma_recv_init(
286         origin_addr, origin_count, origin_datatype, target_rank,
287         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
288         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
289
290     //start the send, with another process than us as sender.
291     sreq->start();
292     //push request to receiver's win
293     send_win->mut_->lock();
294     send_win->requests_.push_back(sreq);
295     send_win->mut_->unlock();
296
297     //start recv
298     rreq->start();
299
300     if(request!=nullptr){
301       *request=rreq;
302     }else{
303       mut_->lock();
304       requests_.push_back(rreq);
305       mut_->unlock();
306     }
307   } else {
308     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
309     if(request!=nullptr)
310       *request=MPI_REQUEST_NULL;
311   }
312   return MPI_SUCCESS;
313 }
314
315 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
316               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
317 {
318   XBT_DEBUG("Entering MPI_Win_Accumulate");
319   //get receiver pointer
320   Win* recv_win = connected_wins_[target_rank];
321
322   //FIXME: local version
323   CHECK_WIN_LOCKED(recv_win)
324   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
325
326   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
327   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
328   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
329   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
330   // prepare send_request
331
332   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
333                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
334
335   // prepare receiver request
336   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
337                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
338                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
339
340   count_++;
341
342   // start send
343   sreq->start();
344   // push request to receiver's win
345   recv_win->mut_->lock();
346   recv_win->requests_.push_back(rreq);
347   rreq->start();
348   recv_win->mut_->unlock();
349
350   if (request != nullptr) {
351     *request = sreq;
352   } else {
353     mut_->lock();
354     requests_.push_back(sreq);
355     mut_->unlock();
356   }
357
358   XBT_DEBUG("Leaving MPI_Win_Accumulate");
359   return MPI_SUCCESS;
360 }
361
362 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
363                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
364                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
365 {
366   //get sender pointer
367   const Win* send_win = connected_wins_[target_rank];
368
369   CHECK_WIN_LOCKED(send_win)
370   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
371
372   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
373   //need to be sure ops are correctly ordered, so finish request here ? slow.
374   MPI_Request req;
375   send_win->atomic_mut_->lock();
376   get(result_addr, result_count, result_datatype, target_rank,
377               target_disp, target_count, target_datatype, &req);
378   if (req != MPI_REQUEST_NULL)
379     Request::wait(&req, MPI_STATUS_IGNORE);
380   if(op!=MPI_NO_OP)
381     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
382               target_disp, target_count, target_datatype, op, &req);
383   if (req != MPI_REQUEST_NULL)
384     Request::wait(&req, MPI_STATUS_IGNORE);
385   send_win->atomic_mut_->unlock();
386   return MPI_SUCCESS;
387 }
388
389 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
390                           int target_rank, MPI_Aint target_disp)
391 {
392   //get sender pointer
393   const Win* send_win = connected_wins_[target_rank];
394
395   CHECK_WIN_LOCKED(send_win)
396
397   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
398   MPI_Request req = MPI_REQUEST_NULL;
399   send_win->atomic_mut_->lock();
400   get(result_addr, 1, datatype, target_rank,
401               target_disp, 1, datatype, &req);
402   if (req != MPI_REQUEST_NULL)
403     Request::wait(&req, MPI_STATUS_IGNORE);
404   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
405     put(origin_addr, 1, datatype, target_rank,
406               target_disp, 1, datatype);
407   }
408   send_win->atomic_mut_->unlock();
409   return MPI_SUCCESS;
410 }
411
412 int Win::start(MPI_Group group, int /*assert*/)
413 {
414   /* From MPI forum advices
415   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
416   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
417   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
418   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
419   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
420   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
421   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
422   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
423   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
424   must complete, without further dependencies.  */
425
426   //naive, blocking implementation.
427   XBT_DEBUG("Entering MPI_Win_Start");
428   std::vector<MPI_Request> reqs;
429   for (int i = 0; i < group->size(); i++) {
430     int src = comm_->group()->rank(group->actor(i));
431     xbt_assert(src != MPI_UNDEFINED);
432     if (src != rank_)
433       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
434   }
435   int size = static_cast<int>(reqs.size());
436
437   Request::startall(size, reqs.data());
438   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
439   for (auto& req : reqs)
440     Request::unref(&req);
441
442   group->ref();
443   dst_group_ = group;
444   opened_++; // we're open for business !
445   XBT_DEBUG("Leaving MPI_Win_Start");
446   return MPI_SUCCESS;
447 }
448
449 int Win::post(MPI_Group group, int /*assert*/)
450 {
451   //let's make a synchronous send here
452   XBT_DEBUG("Entering MPI_Win_Post");
453   std::vector<MPI_Request> reqs;
454   for (int i = 0; i < group->size(); i++) {
455     int dst = comm_->group()->rank(group->actor(i));
456     xbt_assert(dst != MPI_UNDEFINED);
457     if (dst != rank_)
458       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
459   }
460   int size = static_cast<int>(reqs.size());
461
462   Request::startall(size, reqs.data());
463   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
464   for (auto& req : reqs)
465     Request::unref(&req);
466
467   group->ref();
468   src_group_ = group;
469   opened_++; // we're open for business !
470   XBT_DEBUG("Leaving MPI_Win_Post");
471   return MPI_SUCCESS;
472 }
473
474 int Win::complete(){
475   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
476
477   XBT_DEBUG("Entering MPI_Win_Complete");
478   std::vector<MPI_Request> reqs;
479   for (int i = 0; i < dst_group_->size(); i++) {
480     int dst = comm_->group()->rank(dst_group_->actor(i));
481     xbt_assert(dst != MPI_UNDEFINED);
482     if (dst != rank_)
483       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
484   }
485   int size = static_cast<int>(reqs.size());
486
487   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
488   Request::startall(size, reqs.data());
489   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
490   for (auto& req : reqs)
491     Request::unref(&req);
492
493   int finished = finish_comms();
494   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
495
496   opened_--; //we're closed for business !
497   Group::unref(dst_group_);
498   dst_group_ = MPI_GROUP_NULL;
499   return MPI_SUCCESS;
500 }
501
502 int Win::wait(){
503   //naive, blocking implementation.
504   XBT_DEBUG("Entering MPI_Win_Wait");
505   std::vector<MPI_Request> reqs;
506   for (int i = 0; i < src_group_->size(); i++) {
507     int src = comm_->group()->rank(src_group_->actor(i));
508     xbt_assert(src != MPI_UNDEFINED);
509     if (src != rank_)
510       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
511   }
512   int size = static_cast<int>(reqs.size());
513
514   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
515   Request::startall(size, reqs.data());
516   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
517   for (auto& req : reqs)
518     Request::unref(&req);
519
520   int finished = finish_comms();
521   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
522
523   opened_--; //we're closed for business !
524   Group::unref(src_group_);
525   src_group_ = MPI_GROUP_NULL;
526   return MPI_SUCCESS;
527 }
528
529 int Win::lock(int lock_type, int rank, int /*assert*/)
530 {
531   MPI_Win target_win = connected_wins_[rank];
532
533   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
534     target_win->lock_mut_->lock();
535     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
536     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
537       target_win->lock_mut_->unlock();
538    }
539   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
540     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
541
542   target_win->lockers_.push_back(comm_->rank());
543
544   int finished = finish_comms(rank);
545   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
546   finished = target_win->finish_comms(rank_);
547   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
548   return MPI_SUCCESS;
549 }
550
551 int Win::lock_all(int assert){
552   int retval = MPI_SUCCESS;
553   for (int i = 0; i < comm_->size(); i++) {
554     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
555     if (ret != MPI_SUCCESS)
556       retval = ret;
557   }
558   return retval;
559 }
560
561 int Win::unlock(int rank){
562   MPI_Win target_win = connected_wins_[rank];
563   int target_mode = target_win->mode_;
564   target_win->mode_= 0;
565   target_win->lockers_.remove(comm_->rank());
566   if (target_mode==MPI_LOCK_EXCLUSIVE){
567     target_win->lock_mut_->unlock();
568   }
569
570   int finished = finish_comms(rank);
571   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
572   finished = target_win->finish_comms(rank_);
573   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
574   return MPI_SUCCESS;
575 }
576
577 int Win::unlock_all(){
578   int retval = MPI_SUCCESS;
579   for (int i = 0; i < comm_->size(); i++) {
580     int ret = this->unlock(i);
581     if (ret != MPI_SUCCESS)
582       retval = ret;
583   }
584   return retval;
585 }
586
587 int Win::flush(int rank){
588   MPI_Win target_win = connected_wins_[rank];
589   int finished       = finish_comms(rank_);
590   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
591   finished = target_win->finish_comms(rank);
592   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
593   return MPI_SUCCESS;
594 }
595
596 int Win::flush_local(int rank){
597   int finished = finish_comms(rank);
598   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
599   return MPI_SUCCESS;
600 }
601
602 int Win::flush_all(){
603   int finished = finish_comms();
604   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
605   for (int i = 0; i < comm_->size(); i++) {
606     finished = connected_wins_[i]->finish_comms(rank_);
607     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
608   }
609   return MPI_SUCCESS;
610 }
611
612 int Win::flush_local_all(){
613   int finished = finish_comms();
614   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
615   return MPI_SUCCESS;
616 }
617
618 Win* Win::f2c(int id){
619   return static_cast<Win*>(F2C::f2c(id));
620 }
621
622 int Win::finish_comms(){
623   mut_->lock();
624   //Finish own requests
625   int size = static_cast<int>(requests_.size());
626   if (size > 0) {
627     MPI_Request* treqs = requests_.data();
628     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
629     requests_.clear();
630   }
631   mut_->unlock();
632   return size;
633 }
634
635 int Win::finish_comms(int rank){
636   mut_->lock();
637   // Finish own requests
638   // Let's see if we're either the destination or the sender of this request
639   // because we only wait for requests that we are responsible for.
640   // Also use the process id here since the request itself returns from src()
641   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
642   aid_t proc_id = comm_->group()->actor(rank);
643   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
644     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
645   });
646   std::vector<MPI_Request> myreqqs(it, end(requests_));
647   requests_.erase(it, end(requests_));
648   int size = static_cast<int>(myreqqs.size());
649   if (size > 0) {
650     MPI_Request* treqs = myreqqs.data();
651     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
652     myreqqs.clear();
653   }
654   mut_->unlock();
655   return size;
656 }
657
658 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
659 {
660   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
661   for (int i = 0; not target_win && i < comm_->size(); i++) {
662     if (connected_wins_[i]->size_ > 0)
663       target_win = connected_wins_[i];
664   }
665   if (target_win) {
666     *size                         = target_win->size_;
667     *disp_unit                    = target_win->disp_unit_;
668     *static_cast<void**>(baseptr) = target_win->base_;
669   } else {
670     *size                         = 0;
671     *static_cast<void**>(baseptr) = nullptr;
672   }
673   return MPI_SUCCESS;
674 }
675
676 MPI_Errhandler Win::errhandler()
677 {
678   if (errhandler_ != MPI_ERRHANDLER_NULL)
679     errhandler_->ref();
680   return errhandler_;
681 }
682
683 void Win::set_errhandler(MPI_Errhandler errhandler)
684 {
685   if (errhandler_ != MPI_ERRHANDLER_NULL)
686     simgrid::smpi::Errhandler::unref(errhandler_);
687   errhandler_ = errhandler;
688   if (errhandler_ != MPI_ERRHANDLER_NULL)
689     errhandler_->ref();
690 }
691 } // namespace smpi
692 } // namespace simgrid