Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'Adrien.Gougeon/simgrid-master'
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21
22 namespace simgrid{
23 namespace smpi{
24 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
25 int Win::keyval_id_=0;
26
27 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
28     : base_(base)
29     , size_(size)
30     , disp_unit_(disp_unit)
31     , info_(info)
32     , comm_(comm)
33     , connected_wins_(comm->size())
34     , rank_(comm->rank())
35     , allocated_(allocated)
36     , dynamic_(dynamic)
37 {
38   XBT_DEBUG("Creating window");
39   if(info!=MPI_INFO_NULL)
40     info->ref();
41   connected_wins_[rank_] = this;
42   if(rank_==0){
43     bar_ = new s4u::Barrier(comm->size());
44   }
45   errhandler_->ref();
46   comm->add_rma_win(this);
47   comm->ref();
48
49   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
50                    MPI_BYTE, comm);
51
52   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   colls::barrier(comm);
55 }
56
57 Win::~Win(){
58   //As per the standard, perform a barrier to ensure every async comm is finished
59   bar_->wait();
60
61   int finished = finish_comms();
62   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
63
64   if (info_ != MPI_INFO_NULL)
65     simgrid::smpi::Info::unref(info_);
66   if (errhandler_ != MPI_ERRHANDLER_NULL)
67     simgrid::smpi::Errhandler::unref(errhandler_);
68
69   comm_->remove_rma_win(this);
70
71   colls::barrier(comm_);
72   Comm::unref(comm_);
73   
74   if (rank_ == 0)
75     delete bar_;
76
77   if(allocated_ !=0)
78     xbt_free(base_);
79
80   cleanup_attr<Win>();
81 }
82
83 int Win::attach(void* /*base*/, MPI_Aint size)
84 {
85   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
86     return MPI_ERR_ARG;
87   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
88   size_+=size;
89   return MPI_SUCCESS;
90 }
91
92 int Win::detach(const void* /*base*/)
93 {
94   base_=MPI_BOTTOM;
95   size_=-1;
96   return MPI_SUCCESS;
97 }
98
99 void Win::get_name(char* name, int* length) const
100 {
101   *length = static_cast<int>(name_.length());
102   if (not name_.empty()) {
103     name_.copy(name, *length);
104     name[*length] = '\0';
105   }
106 }
107
108 void Win::get_group(MPI_Group* group){
109   if(comm_ != MPI_COMM_NULL){
110     *group = comm_->group();
111   } else {
112     *group = MPI_GROUP_NULL;
113   }
114 }
115
116 MPI_Info Win::info()
117 {
118   if (info_ == MPI_INFO_NULL)
119     info_ = new Info();
120   info_->ref();
121   return info_;
122 }
123
124 int Win::rank() const
125 {
126   return rank_;
127 }
128
129 MPI_Aint Win::size() const
130 {
131   return size_;
132 }
133
134 void* Win::base() const
135 {
136   return base_;
137 }
138
139 int Win::disp_unit() const
140 {
141   return disp_unit_;
142 }
143
144 int Win::dynamic() const
145 {
146   return dynamic_;
147 }
148
149 void Win::set_info(MPI_Info info)
150 {
151   if (info_ != MPI_INFO_NULL)
152     simgrid::smpi::Info::unref(info_);
153   info_ = info;
154   if (info_ != MPI_INFO_NULL)
155     info_->ref();
156 }
157
158 void Win::set_name(const char* name){
159   name_ = name;
160 }
161
162 int Win::fence(int assert)
163 {
164   XBT_DEBUG("Entering fence");
165   if (opened_ == 0)
166     opened_=1;
167   if (assert != MPI_MODE_NOPRECEDE) {
168     // This is not the first fence => finalize what came before
169     bar_->wait();
170     mut_->lock();
171     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
172     // Without this, the vector could get redimensioned when another process pushes.
173     // This would result in the array used by Request::waitall() to be invalidated.
174     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
175
176     // start all requests that have been prepared by another process
177     if (not requests_.empty()) {
178       int size           = static_cast<int>(requests_.size());
179       MPI_Request* treqs = requests_.data();
180       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
181     }
182     count_=0;
183     mut_->unlock();
184   }
185
186   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
187     opened_=0;
188   assert_ = assert;
189
190   bar_->wait();
191   XBT_DEBUG("Leaving fence");
192
193   return MPI_SUCCESS;
194 }
195
196 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
197               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
198 {
199   //get receiver pointer
200   Win* recv_win = connected_wins_[target_rank];
201
202   if(opened_==0){//check that post/start has been done
203     // no fence or start .. lock ok ?
204     int locked=0;
205     for (auto const& it : recv_win->lockers_)
206       if (it == comm_->rank())
207         locked = 1;
208     if(locked != 1)
209       return MPI_ERR_WIN;
210   }
211
212   if(target_count*target_datatype->get_extent()>recv_win->size_)
213     return MPI_ERR_ARG;
214
215   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
216
217   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
218     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
219     // prepare send_request
220     MPI_Request sreq =
221         // TODO cheinrich Check for rank / pid conversion
222         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
223                                comm_, MPI_OP_NULL);
224
225     //prepare receiver request
226     // TODO cheinrich Check for rank / pid conversion
227     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
228                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
229
230     //start send
231     sreq->start();
232
233     if(request!=nullptr){
234       *request=sreq;
235     }else{
236       mut_->lock();
237       requests_.push_back(sreq);
238       mut_->unlock();
239     }
240
241     //push request to receiver's win
242     recv_win->mut_->lock();
243     recv_win->requests_.push_back(rreq);
244     rreq->start();
245     recv_win->mut_->unlock();
246   } else {
247     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
248     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
249     if(request!=nullptr)
250       *request = MPI_REQUEST_NULL;
251   }
252
253   return MPI_SUCCESS;
254 }
255
256 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
257               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
258 {
259   //get sender pointer
260   Win* send_win = connected_wins_[target_rank];
261
262   if(opened_==0){//check that post/start has been done
263     // no fence or start .. lock ok ?
264     int locked=0;
265     for (auto const& it : send_win->lockers_)
266       if (it == comm_->rank())
267         locked = 1;
268     if(locked != 1)
269       return MPI_ERR_WIN;
270   }
271
272   if(target_count*target_datatype->get_extent()>send_win->size_)
273     return MPI_ERR_ARG;
274
275   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
276   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
277
278   if(target_rank != comm_->rank()){
279     //prepare send_request
280     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
281                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
282
283     //prepare receiver request
284     MPI_Request rreq = Request::rma_recv_init(
285         origin_addr, origin_count, origin_datatype, target_rank,
286         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
287         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
288
289     //start the send, with another process than us as sender.
290     sreq->start();
291     //push request to receiver's win
292     send_win->mut_->lock();
293     send_win->requests_.push_back(sreq);
294     send_win->mut_->unlock();
295
296     //start recv
297     rreq->start();
298
299     if(request!=nullptr){
300       *request=rreq;
301     }else{
302       mut_->lock();
303       requests_.push_back(rreq);
304       mut_->unlock();
305     }
306   } else {
307     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
308     if(request!=nullptr)
309       *request=MPI_REQUEST_NULL;
310   }
311   return MPI_SUCCESS;
312 }
313
314 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
315               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
316 {
317   XBT_DEBUG("Entering MPI_Win_Accumulate");
318   //get receiver pointer
319   Win* recv_win = connected_wins_[target_rank];
320
321   if(opened_==0){//check that post/start has been done
322     // no fence or start .. lock ok ?
323     int locked=0;
324     for (auto const& it : recv_win->lockers_)
325       if (it == comm_->rank())
326         locked = 1;
327     if(locked != 1)
328       return MPI_ERR_WIN;
329   }
330   //FIXME: local version
331
332   if(target_count*target_datatype->get_extent()>recv_win->size_)
333     return MPI_ERR_ARG;
334
335   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
336   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
337   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
338   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
339   // prepare send_request
340
341   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
342                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
343
344   // prepare receiver request
345   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
346                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
347
348   count_++;
349
350   // start send
351   sreq->start();
352   // push request to receiver's win
353   recv_win->mut_->lock();
354   recv_win->requests_.push_back(rreq);
355   rreq->start();
356   recv_win->mut_->unlock();
357
358   if (request != nullptr) {
359     *request = sreq;
360   } else {
361     mut_->lock();
362     requests_.push_back(sreq);
363     mut_->unlock();
364   }
365
366   XBT_DEBUG("Leaving MPI_Win_Accumulate");
367   return MPI_SUCCESS;
368 }
369
370 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
371                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
372                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
373 {
374   //get sender pointer
375   const Win* send_win = connected_wins_[target_rank];
376
377   if(opened_==0){//check that post/start has been done
378     // no fence or start .. lock ok ?
379     int locked=0;
380     for (auto const& it : send_win->lockers_)
381       if (it == comm_->rank())
382         locked = 1;
383     if(locked != 1)
384       return MPI_ERR_WIN;
385   }
386
387   if(target_count*target_datatype->get_extent()>send_win->size_)
388     return MPI_ERR_ARG;
389
390   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
391   //need to be sure ops are correctly ordered, so finish request here ? slow.
392   MPI_Request req;
393   send_win->atomic_mut_->lock();
394   get(result_addr, result_count, result_datatype, target_rank,
395               target_disp, target_count, target_datatype, &req);
396   if (req != MPI_REQUEST_NULL)
397     Request::wait(&req, MPI_STATUS_IGNORE);
398   if(op!=MPI_NO_OP)
399     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
400               target_disp, target_count, target_datatype, op, &req);
401   if (req != MPI_REQUEST_NULL)
402     Request::wait(&req, MPI_STATUS_IGNORE);
403   send_win->atomic_mut_->unlock();
404   return MPI_SUCCESS;
405 }
406
407 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
408                           int target_rank, MPI_Aint target_disp)
409 {
410   //get sender pointer
411   const Win* send_win = connected_wins_[target_rank];
412
413   if(opened_==0){//check that post/start has been done
414     // no fence or start .. lock ok ?
415     int locked=0;
416     for (auto const& it : send_win->lockers_)
417       if (it == comm_->rank())
418         locked = 1;
419     if(locked != 1)
420       return MPI_ERR_WIN;
421   }
422
423   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
424   MPI_Request req = MPI_REQUEST_NULL;
425   send_win->atomic_mut_->lock();
426   get(result_addr, 1, datatype, target_rank,
427               target_disp, 1, datatype, &req);
428   if (req != MPI_REQUEST_NULL)
429     Request::wait(&req, MPI_STATUS_IGNORE);
430   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
431     put(origin_addr, 1, datatype, target_rank,
432               target_disp, 1, datatype);
433   }
434   send_win->atomic_mut_->unlock();
435   return MPI_SUCCESS;
436 }
437
438 int Win::start(MPI_Group group, int /*assert*/)
439 {
440   /* From MPI forum advices
441   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
442   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
443   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
444   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
445   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
446   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
447   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
448   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
449   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
450   must complete, without further dependencies.  */
451
452   //naive, blocking implementation.
453   int i             = 0;
454   int j             = 0;
455   int size          = group->size();
456   std::vector<MPI_Request> reqs(size);
457
458   XBT_DEBUG("Entering MPI_Win_Start");
459   while (j != size) {
460     int src = comm_->group()->rank(group->actor(j));
461     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
462       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
463       i++;
464     }
465     j++;
466   }
467   size = i;
468   Request::startall(size, reqs.data());
469   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
470   for (i = 0; i < size; i++) {
471     Request::unref(&reqs[i]);
472   }
473   opened_++; //we're open for business !
474   group_=group;
475   group->ref();
476   XBT_DEBUG("Leaving MPI_Win_Start");
477   return MPI_SUCCESS;
478 }
479
480 int Win::post(MPI_Group group, int /*assert*/)
481 {
482   //let's make a synchronous send here
483   int i             = 0;
484   int j             = 0;
485   int size = group->size();
486   std::vector<MPI_Request> reqs(size);
487
488   XBT_DEBUG("Entering MPI_Win_Post");
489   while(j!=size){
490     int dst = comm_->group()->rank(group->actor(j));
491     if (dst != rank_ && dst != MPI_UNDEFINED) {
492       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
493       i++;
494     }
495     j++;
496   }
497   size=i;
498
499   Request::startall(size, reqs.data());
500   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
501   for(i=0;i<size;i++){
502     Request::unref(&reqs[i]);
503   }
504   opened_++; //we're open for business !
505   group_=group;
506   group->ref();
507   XBT_DEBUG("Leaving MPI_Win_Post");
508   return MPI_SUCCESS;
509 }
510
511 int Win::complete(){
512   if(opened_==0)
513     xbt_die("Complete called on already opened MPI_Win");
514
515   XBT_DEBUG("Entering MPI_Win_Complete");
516   int i             = 0;
517   int j             = 0;
518   int size          = group_->size();
519   std::vector<MPI_Request> reqs(size);
520
521   while(j!=size){
522     int dst = comm_->group()->rank(group_->actor(j));
523     if (dst != rank_ && dst != MPI_UNDEFINED) {
524       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
525       i++;
526     }
527     j++;
528   }
529   size=i;
530   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
531   Request::startall(size, reqs.data());
532   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
533
534   for(i=0;i<size;i++){
535     Request::unref(&reqs[i]);
536   }
537
538   int finished = finish_comms();
539   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
540
541   Group::unref(group_);
542   opened_--; //we're closed for business !
543   return MPI_SUCCESS;
544 }
545
546 int Win::wait(){
547   //naive, blocking implementation.
548   XBT_DEBUG("Entering MPI_Win_Wait");
549   int i             = 0;
550   int j             = 0;
551   int size          = group_->size();
552   std::vector<MPI_Request> reqs(size);
553
554   while(j!=size){
555     int src = comm_->group()->rank(group_->actor(j));
556     if (src != rank_ && src != MPI_UNDEFINED) {
557       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
558       i++;
559     }
560     j++;
561   }
562   size=i;
563   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
564   Request::startall(size, reqs.data());
565   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
566   for(i=0;i<size;i++){
567     Request::unref(&reqs[i]);
568   }
569   int finished = finish_comms();
570   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
571
572   Group::unref(group_);
573   opened_--; //we're opened for business !
574   return MPI_SUCCESS;
575 }
576
577 int Win::lock(int lock_type, int rank, int /*assert*/)
578 {
579   MPI_Win target_win = connected_wins_[rank];
580
581   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
582     target_win->lock_mut_->lock();
583     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
584     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
585       target_win->lock_mut_->unlock();
586    }
587   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
588     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
589
590   target_win->lockers_.push_back(comm_->rank());
591
592   int finished = finish_comms(rank);
593   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
594   finished = target_win->finish_comms(rank_);
595   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
596   return MPI_SUCCESS;
597 }
598
599 int Win::lock_all(int assert){
600   int retval = MPI_SUCCESS;
601   for (int i = 0; i < comm_->size(); i++) {
602     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
603     if (ret != MPI_SUCCESS)
604       retval = ret;
605   }
606   return retval;
607 }
608
609 int Win::unlock(int rank){
610   MPI_Win target_win = connected_wins_[rank];
611   int target_mode = target_win->mode_;
612   target_win->mode_= 0;
613   target_win->lockers_.remove(comm_->rank());
614   if (target_mode==MPI_LOCK_EXCLUSIVE){
615     target_win->lock_mut_->unlock();
616   }
617
618   int finished = finish_comms(rank);
619   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
620   finished = target_win->finish_comms(rank_);
621   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
622   return MPI_SUCCESS;
623 }
624
625 int Win::unlock_all(){
626   int retval = MPI_SUCCESS;
627   for (int i = 0; i < comm_->size(); i++) {
628     int ret = this->unlock(i);
629     if (ret != MPI_SUCCESS)
630       retval = ret;
631   }
632   return retval;
633 }
634
635 int Win::flush(int rank){
636   MPI_Win target_win = connected_wins_[rank];
637   int finished       = finish_comms(rank_);
638   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
639   finished = target_win->finish_comms(rank);
640   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
641   return MPI_SUCCESS;
642 }
643
644 int Win::flush_local(int rank){
645   int finished = finish_comms(rank);
646   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
647   return MPI_SUCCESS;
648 }
649
650 int Win::flush_all(){
651   int finished = finish_comms();
652   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
653   for (int i = 0; i < comm_->size(); i++) {
654     finished = connected_wins_[i]->finish_comms(rank_);
655     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
656   }
657   return MPI_SUCCESS;
658 }
659
660 int Win::flush_local_all(){
661   int finished = finish_comms();
662   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
663   return MPI_SUCCESS;
664 }
665
666 Win* Win::f2c(int id){
667   return static_cast<Win*>(F2C::f2c(id));
668 }
669
670 int Win::finish_comms(){
671   mut_->lock();
672   //Finish own requests
673   int size = static_cast<int>(requests_.size());
674   if (size > 0) {
675     MPI_Request* treqs = requests_.data();
676     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
677     requests_.clear();
678   }
679   mut_->unlock();
680   return size;
681 }
682
683 int Win::finish_comms(int rank){
684   mut_->lock();
685   // Finish own requests
686   // Let's see if we're either the destination or the sender of this request
687   // because we only wait for requests that we are responsible for.
688   // Also use the process id here since the request itself returns from src()
689   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
690   int proc_id = comm_->group()->actor(rank)->get_pid();
691   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
692     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
693   });
694   std::vector<MPI_Request> myreqqs(it, end(requests_));
695   requests_.erase(it, end(requests_));
696   int size = static_cast<int>(myreqqs.size());
697   if (size > 0) {
698     MPI_Request* treqs = myreqqs.data();
699     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
700     myreqqs.clear();
701   }
702   mut_->unlock();
703   return size;
704 }
705
706 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
707 {
708   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
709   for (int i = 0; not target_win && i < comm_->size(); i++) {
710     if (connected_wins_[i]->size_ > 0)
711       target_win = connected_wins_[i];
712   }
713   if (target_win) {
714     *size                         = target_win->size_;
715     *disp_unit                    = target_win->disp_unit_;
716     *static_cast<void**>(baseptr) = target_win->base_;
717   } else {
718     *size                         = 0;
719     *static_cast<void**>(baseptr) = nullptr;
720   }
721   return MPI_SUCCESS;
722 }
723
724 MPI_Errhandler Win::errhandler()
725 {
726   if (errhandler_ != MPI_ERRHANDLER_NULL)
727     errhandler_->ref();
728   return errhandler_;
729 }
730
731 void Win::set_errhandler(MPI_Errhandler errhandler)
732 {
733   if (errhandler_ != MPI_ERRHANDLER_NULL)
734     simgrid::smpi::Errhandler::unref(errhandler_);
735   errhandler_ = errhandler;
736   if (errhandler_ != MPI_ERRHANDLER_NULL)
737     errhandler_->ref();
738 }
739 } // namespace smpi
740 } // namespace simgrid