Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
appease 32 bits
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21
22 namespace simgrid{
23 namespace smpi{
24 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
25 int Win::keyval_id_=0;
26
27 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
28     : base_(base)
29     , size_(size)
30     , disp_unit_(disp_unit)
31     , info_(info)
32     , comm_(comm)
33     , connected_wins_(comm->size())
34     , rank_(comm->rank())
35     , allocated_(allocated)
36     , dynamic_(dynamic)
37 {
38   XBT_DEBUG("Creating window");
39   if(info!=MPI_INFO_NULL)
40     info->ref();
41   connected_wins_[rank_] = this;
42   if(rank_==0){
43     bar_ = new s4u::Barrier(comm->size());
44   }
45   errhandler_->ref();
46   comm->add_rma_win(this);
47   comm->ref();
48
49   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
50                    MPI_BYTE, comm);
51
52   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   colls::barrier(comm);
55   this->add_f();
56 }
57
58 Win::~Win(){
59   //As per the standard, perform a barrier to ensure every async comm is finished
60   bar_->wait();
61
62   int finished = finish_comms();
63   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64
65   if (info_ != MPI_INFO_NULL)
66     simgrid::smpi::Info::unref(info_);
67   if (errhandler_ != MPI_ERRHANDLER_NULL)
68     simgrid::smpi::Errhandler::unref(errhandler_);
69
70   comm_->remove_rma_win(this);
71
72   colls::barrier(comm_);
73   Comm::unref(comm_);
74   
75   if (rank_ == 0)
76     delete bar_;
77
78   if(allocated_ !=0)
79     xbt_free(base_);
80
81   F2C::free_f(this->f2c_id());
82   cleanup_attr<Win>();
83 }
84
85 int Win::attach(void* /*base*/, MPI_Aint size)
86 {
87   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
88     return MPI_ERR_ARG;
89   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
90   size_+=size;
91   return MPI_SUCCESS;
92 }
93
94 int Win::detach(const void* /*base*/)
95 {
96   base_=MPI_BOTTOM;
97   size_=-1;
98   return MPI_SUCCESS;
99 }
100
101 void Win::get_name(char* name, int* length) const
102 {
103   *length = static_cast<int>(name_.length());
104   if (not name_.empty()) {
105     name_.copy(name, *length);
106     name[*length] = '\0';
107   }
108 }
109
110 void Win::get_group(MPI_Group* group){
111   if(comm_ != MPI_COMM_NULL){
112     *group = comm_->group();
113   } else {
114     *group = MPI_GROUP_NULL;
115   }
116 }
117
118 MPI_Info Win::info()
119 {
120   if (info_ == MPI_INFO_NULL)
121     info_ = new Info();
122   info_->ref();
123   return info_;
124 }
125
126 int Win::rank() const
127 {
128   return rank_;
129 }
130
131 MPI_Comm Win::comm() const
132 {
133   return comm_;
134 }
135
136 MPI_Aint Win::size() const
137 {
138   return size_;
139 }
140
141 void* Win::base() const
142 {
143   return base_;
144 }
145
146 int Win::disp_unit() const
147 {
148   return disp_unit_;
149 }
150
151 int Win::dynamic() const
152 {
153   return dynamic_;
154 }
155
156 void Win::set_info(MPI_Info info)
157 {
158   if (info_ != MPI_INFO_NULL)
159     simgrid::smpi::Info::unref(info_);
160   info_ = info;
161   if (info_ != MPI_INFO_NULL)
162     info_->ref();
163 }
164
165 void Win::set_name(const char* name){
166   name_ = name;
167 }
168
169 int Win::fence(int assert)
170 {
171   XBT_DEBUG("Entering fence");
172   if (opened_ == 0)
173     opened_=1;
174   if (assert != MPI_MODE_NOPRECEDE) {
175     // This is not the first fence => finalize what came before
176     bar_->wait();
177     mut_->lock();
178     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
179     // Without this, the vector could get redimensioned when another process pushes.
180     // This would result in the array used by Request::waitall() to be invalidated.
181     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
182
183     // start all requests that have been prepared by another process
184     if (not requests_.empty()) {
185       int size           = static_cast<int>(requests_.size());
186       MPI_Request* treqs = requests_.data();
187       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
188     }
189     count_=0;
190     mut_->unlock();
191   }
192
193   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
194     opened_=0;
195   assert_ = assert;
196
197   bar_->wait();
198   XBT_DEBUG("Leaving fence");
199
200   return MPI_SUCCESS;
201 }
202
203 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
204               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
205 {
206   //get receiver pointer
207   Win* recv_win = connected_wins_[target_rank];
208
209   if(opened_==0){//check that post/start has been done
210     // no fence or start .. lock ok ?
211     int locked=0;
212     for (auto const& it : recv_win->lockers_)
213       if (it == comm_->rank())
214         locked = 1;
215     if(locked != 1)
216       return MPI_ERR_WIN;
217   }
218
219   if(target_count*target_datatype->get_extent()>recv_win->size_){
220     XBT_WARN("MPI_Put: Trying to put %zd, which is more than the window size on target process %d : %zd - Bailing out.",
221     target_count*target_datatype->get_extent(), target_rank, recv_win->size_);
222     return MPI_ERR_RMA_RANGE;
223   }
224
225   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
226
227   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
228     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
229     // prepare send_request
230     MPI_Request sreq =
231         // TODO cheinrich Check for rank / pid conversion
232         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
233                                comm_, MPI_OP_NULL);
234
235     //prepare receiver request
236     // TODO cheinrich Check for rank / pid conversion
237     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
238                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
239
240     //start send
241     sreq->start();
242
243     if(request!=nullptr){
244       *request=sreq;
245     }else{
246       mut_->lock();
247       requests_.push_back(sreq);
248       mut_->unlock();
249     }
250
251     //push request to receiver's win
252     recv_win->mut_->lock();
253     recv_win->requests_.push_back(rreq);
254     rreq->start();
255     recv_win->mut_->unlock();
256   } else {
257     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
258     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
259     if(request!=nullptr)
260       *request = MPI_REQUEST_NULL;
261   }
262
263   return MPI_SUCCESS;
264 }
265
266 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
267               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
268 {
269   //get sender pointer
270   Win* send_win = connected_wins_[target_rank];
271
272   if(opened_==0){//check that post/start has been done
273     // no fence or start .. lock ok ?
274     int locked=0;
275     for (auto const& it : send_win->lockers_)
276       if (it == comm_->rank())
277         locked = 1;
278     if(locked != 1)
279       return MPI_ERR_WIN;
280   }
281
282   if(target_count*target_datatype->get_extent()>send_win->size_){
283     XBT_WARN("MPI_Get: Trying to get %zd, which is more than the window size on target process %d : %zd - Bailing out.",
284     target_count*target_datatype->get_extent(), target_rank, send_win->size_);
285     return MPI_ERR_RMA_RANGE;
286   }
287
288   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
289   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
290
291   if(target_rank != comm_->rank()){
292     //prepare send_request
293     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
294                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
295
296     //prepare receiver request
297     MPI_Request rreq = Request::rma_recv_init(
298         origin_addr, origin_count, origin_datatype, target_rank,
299         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
300         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
301
302     //start the send, with another process than us as sender.
303     sreq->start();
304     //push request to receiver's win
305     send_win->mut_->lock();
306     send_win->requests_.push_back(sreq);
307     send_win->mut_->unlock();
308
309     //start recv
310     rreq->start();
311
312     if(request!=nullptr){
313       *request=rreq;
314     }else{
315       mut_->lock();
316       requests_.push_back(rreq);
317       mut_->unlock();
318     }
319   } else {
320     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
321     if(request!=nullptr)
322       *request=MPI_REQUEST_NULL;
323   }
324   return MPI_SUCCESS;
325 }
326
327 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
328               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
329 {
330   XBT_DEBUG("Entering MPI_Win_Accumulate");
331   //get receiver pointer
332   Win* recv_win = connected_wins_[target_rank];
333
334   if(opened_==0){//check that post/start has been done
335     // no fence or start .. lock ok ?
336     int locked=0;
337     for (auto const& it : recv_win->lockers_)
338       if (it == comm_->rank())
339         locked = 1;
340     if(locked != 1)
341       return MPI_ERR_WIN;
342   }
343   //FIXME: local version
344
345   if(target_count*target_datatype->get_extent()>recv_win->size_){
346     XBT_WARN("MPI_Accumulate: Trying to accumulate %zd, which is more than the window size on target process %d : %zd - Bailing out.",
347     target_count*target_datatype->get_extent(), target_rank, recv_win->size_);
348     return MPI_ERR_RMA_RANGE;
349   }
350
351   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
352   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
353   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
354   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
355   // prepare send_request
356
357   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
358                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
359
360   // prepare receiver request
361   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
362                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
363
364   count_++;
365
366   // start send
367   sreq->start();
368   // push request to receiver's win
369   recv_win->mut_->lock();
370   recv_win->requests_.push_back(rreq);
371   rreq->start();
372   recv_win->mut_->unlock();
373
374   if (request != nullptr) {
375     *request = sreq;
376   } else {
377     mut_->lock();
378     requests_.push_back(sreq);
379     mut_->unlock();
380   }
381
382   XBT_DEBUG("Leaving MPI_Win_Accumulate");
383   return MPI_SUCCESS;
384 }
385
386 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
387                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
388                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
389 {
390   //get sender pointer
391   const Win* send_win = connected_wins_[target_rank];
392
393   if(opened_==0){//check that post/start has been done
394     // no fence or start .. lock ok ?
395     int locked=0;
396     for (auto const& it : send_win->lockers_)
397       if (it == comm_->rank())
398         locked = 1;
399     if(locked != 1)
400       return MPI_ERR_WIN;
401   }
402
403   if(target_count*target_datatype->get_extent()>send_win->size_){
404     XBT_WARN("MPI_Get_accumulate: Trying to get_accumulate %zd, which is more than the window size on target process %d : %zd - Bailing out.",
405     target_count*target_datatype->get_extent(), target_rank, send_win->size_);
406     return MPI_ERR_RMA_RANGE;
407   }
408
409
410   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
411   //need to be sure ops are correctly ordered, so finish request here ? slow.
412   MPI_Request req;
413   send_win->atomic_mut_->lock();
414   get(result_addr, result_count, result_datatype, target_rank,
415               target_disp, target_count, target_datatype, &req);
416   if (req != MPI_REQUEST_NULL)
417     Request::wait(&req, MPI_STATUS_IGNORE);
418   if(op!=MPI_NO_OP)
419     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
420               target_disp, target_count, target_datatype, op, &req);
421   if (req != MPI_REQUEST_NULL)
422     Request::wait(&req, MPI_STATUS_IGNORE);
423   send_win->atomic_mut_->unlock();
424   return MPI_SUCCESS;
425 }
426
427 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
428                           int target_rank, MPI_Aint target_disp)
429 {
430   //get sender pointer
431   const Win* send_win = connected_wins_[target_rank];
432
433   if(opened_==0){//check that post/start has been done
434     // no fence or start .. lock ok ?
435     int locked=0;
436     for (auto const& it : send_win->lockers_)
437       if (it == comm_->rank())
438         locked = 1;
439     if(locked != 1)
440       return MPI_ERR_WIN;
441   }
442
443   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
444   MPI_Request req = MPI_REQUEST_NULL;
445   send_win->atomic_mut_->lock();
446   get(result_addr, 1, datatype, target_rank,
447               target_disp, 1, datatype, &req);
448   if (req != MPI_REQUEST_NULL)
449     Request::wait(&req, MPI_STATUS_IGNORE);
450   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
451     put(origin_addr, 1, datatype, target_rank,
452               target_disp, 1, datatype);
453   }
454   send_win->atomic_mut_->unlock();
455   return MPI_SUCCESS;
456 }
457
458 int Win::start(MPI_Group group, int /*assert*/)
459 {
460   /* From MPI forum advices
461   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
462   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
463   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
464   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
465   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
466   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
467   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
468   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
469   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
470   must complete, without further dependencies.  */
471
472   //naive, blocking implementation.
473   int i             = 0;
474   int j             = 0;
475   int size          = group->size();
476   std::vector<MPI_Request> reqs(size);
477
478   XBT_DEBUG("Entering MPI_Win_Start");
479   while (j != size) {
480     int src = comm_->group()->rank(group->actor(j));
481     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
482       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
483       i++;
484     }
485     j++;
486   }
487   size = i;
488   Request::startall(size, reqs.data());
489   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
490   for (i = 0; i < size; i++) {
491     Request::unref(&reqs[i]);
492   }
493   opened_++; //we're open for business !
494   group_=group;
495   group->ref();
496   XBT_DEBUG("Leaving MPI_Win_Start");
497   return MPI_SUCCESS;
498 }
499
500 int Win::post(MPI_Group group, int /*assert*/)
501 {
502   //let's make a synchronous send here
503   int i             = 0;
504   int j             = 0;
505   int size = group->size();
506   std::vector<MPI_Request> reqs(size);
507
508   XBT_DEBUG("Entering MPI_Win_Post");
509   while(j!=size){
510     int dst = comm_->group()->rank(group->actor(j));
511     if (dst != rank_ && dst != MPI_UNDEFINED) {
512       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
513       i++;
514     }
515     j++;
516   }
517   size=i;
518
519   Request::startall(size, reqs.data());
520   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
521   for(i=0;i<size;i++){
522     Request::unref(&reqs[i]);
523   }
524   opened_++; //we're open for business !
525   group_=group;
526   group->ref();
527   XBT_DEBUG("Leaving MPI_Win_Post");
528   return MPI_SUCCESS;
529 }
530
531 int Win::complete(){
532   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
533
534   XBT_DEBUG("Entering MPI_Win_Complete");
535   int i             = 0;
536   int j             = 0;
537   int size          = group_->size();
538   std::vector<MPI_Request> reqs(size);
539
540   while(j!=size){
541     int dst = comm_->group()->rank(group_->actor(j));
542     if (dst != rank_ && dst != MPI_UNDEFINED) {
543       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
544       i++;
545     }
546     j++;
547   }
548   size=i;
549   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
550   Request::startall(size, reqs.data());
551   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
552
553   for(i=0;i<size;i++){
554     Request::unref(&reqs[i]);
555   }
556
557   int finished = finish_comms();
558   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
559
560   Group::unref(group_);
561   opened_--; //we're closed for business !
562   return MPI_SUCCESS;
563 }
564
565 int Win::wait(){
566   //naive, blocking implementation.
567   XBT_DEBUG("Entering MPI_Win_Wait");
568   int i             = 0;
569   int j             = 0;
570   int size          = group_->size();
571   std::vector<MPI_Request> reqs(size);
572
573   while(j!=size){
574     int src = comm_->group()->rank(group_->actor(j));
575     if (src != rank_ && src != MPI_UNDEFINED) {
576       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
577       i++;
578     }
579     j++;
580   }
581   size=i;
582   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
583   Request::startall(size, reqs.data());
584   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
585   for(i=0;i<size;i++){
586     Request::unref(&reqs[i]);
587   }
588   int finished = finish_comms();
589   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
590
591   Group::unref(group_);
592   opened_--; //we're opened for business !
593   return MPI_SUCCESS;
594 }
595
596 int Win::lock(int lock_type, int rank, int /*assert*/)
597 {
598   MPI_Win target_win = connected_wins_[rank];
599
600   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
601     target_win->lock_mut_->lock();
602     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
603     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
604       target_win->lock_mut_->unlock();
605    }
606   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
607     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
608
609   target_win->lockers_.push_back(comm_->rank());
610
611   int finished = finish_comms(rank);
612   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
613   finished = target_win->finish_comms(rank_);
614   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
615   return MPI_SUCCESS;
616 }
617
618 int Win::lock_all(int assert){
619   int retval = MPI_SUCCESS;
620   for (int i = 0; i < comm_->size(); i++) {
621     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
622     if (ret != MPI_SUCCESS)
623       retval = ret;
624   }
625   return retval;
626 }
627
628 int Win::unlock(int rank){
629   MPI_Win target_win = connected_wins_[rank];
630   int target_mode = target_win->mode_;
631   target_win->mode_= 0;
632   target_win->lockers_.remove(comm_->rank());
633   if (target_mode==MPI_LOCK_EXCLUSIVE){
634     target_win->lock_mut_->unlock();
635   }
636
637   int finished = finish_comms(rank);
638   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
639   finished = target_win->finish_comms(rank_);
640   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
641   return MPI_SUCCESS;
642 }
643
644 int Win::unlock_all(){
645   int retval = MPI_SUCCESS;
646   for (int i = 0; i < comm_->size(); i++) {
647     int ret = this->unlock(i);
648     if (ret != MPI_SUCCESS)
649       retval = ret;
650   }
651   return retval;
652 }
653
654 int Win::flush(int rank){
655   MPI_Win target_win = connected_wins_[rank];
656   int finished       = finish_comms(rank_);
657   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
658   finished = target_win->finish_comms(rank);
659   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
660   return MPI_SUCCESS;
661 }
662
663 int Win::flush_local(int rank){
664   int finished = finish_comms(rank);
665   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
666   return MPI_SUCCESS;
667 }
668
669 int Win::flush_all(){
670   int finished = finish_comms();
671   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
672   for (int i = 0; i < comm_->size(); i++) {
673     finished = connected_wins_[i]->finish_comms(rank_);
674     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
675   }
676   return MPI_SUCCESS;
677 }
678
679 int Win::flush_local_all(){
680   int finished = finish_comms();
681   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
682   return MPI_SUCCESS;
683 }
684
685 Win* Win::f2c(int id){
686   return static_cast<Win*>(F2C::f2c(id));
687 }
688
689 int Win::finish_comms(){
690   mut_->lock();
691   //Finish own requests
692   int size = static_cast<int>(requests_.size());
693   if (size > 0) {
694     MPI_Request* treqs = requests_.data();
695     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
696     requests_.clear();
697   }
698   mut_->unlock();
699   return size;
700 }
701
702 int Win::finish_comms(int rank){
703   mut_->lock();
704   // Finish own requests
705   // Let's see if we're either the destination or the sender of this request
706   // because we only wait for requests that we are responsible for.
707   // Also use the process id here since the request itself returns from src()
708   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
709   int proc_id = comm_->group()->actor(rank)->get_pid();
710   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
711     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
712   });
713   std::vector<MPI_Request> myreqqs(it, end(requests_));
714   requests_.erase(it, end(requests_));
715   int size = static_cast<int>(myreqqs.size());
716   if (size > 0) {
717     MPI_Request* treqs = myreqqs.data();
718     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
719     myreqqs.clear();
720   }
721   mut_->unlock();
722   return size;
723 }
724
725 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
726 {
727   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
728   for (int i = 0; not target_win && i < comm_->size(); i++) {
729     if (connected_wins_[i]->size_ > 0)
730       target_win = connected_wins_[i];
731   }
732   if (target_win) {
733     *size                         = target_win->size_;
734     *disp_unit                    = target_win->disp_unit_;
735     *static_cast<void**>(baseptr) = target_win->base_;
736   } else {
737     *size                         = 0;
738     *static_cast<void**>(baseptr) = nullptr;
739   }
740   return MPI_SUCCESS;
741 }
742
743 MPI_Errhandler Win::errhandler()
744 {
745   if (errhandler_ != MPI_ERRHANDLER_NULL)
746     errhandler_->ref();
747   return errhandler_;
748 }
749
750 void Win::set_errhandler(MPI_Errhandler errhandler)
751 {
752   if (errhandler_ != MPI_ERRHANDLER_NULL)
753     simgrid::smpi::Errhandler::unref(errhandler_);
754   errhandler_ = errhandler;
755   if (errhandler_ != MPI_ERRHANDLER_NULL)
756     errhandler_->ref();
757 }
758 } // namespace smpi
759 } // namespace simgrid