Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
6c95f11d93ef2e9420e21ebfcaaff4591d1b23e3
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21
22 namespace simgrid{
23 namespace smpi{
24 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
25 int Win::keyval_id_=0;
26
27 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
28     : base_(base)
29     , size_(size)
30     , disp_unit_(disp_unit)
31     , info_(info)
32     , comm_(comm)
33     , connected_wins_(comm->size())
34     , rank_(comm->rank())
35     , allocated_(allocated)
36     , dynamic_(dynamic)
37 {
38   XBT_DEBUG("Creating window");
39   if(info!=MPI_INFO_NULL)
40     info->ref();
41   connected_wins_[rank_] = this;
42   if(rank_==0){
43     bar_ = new s4u::Barrier(comm->size());
44   }
45   errhandler_->ref();
46   comm->add_rma_win(this);
47   comm->ref();
48
49   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
50                    MPI_BYTE, comm);
51
52   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   colls::barrier(comm);
55 }
56
57 Win::~Win(){
58   //As per the standard, perform a barrier to ensure every async comm is finished
59   bar_->wait();
60
61   int finished = finish_comms();
62   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
63
64   if (info_ != MPI_INFO_NULL)
65     simgrid::smpi::Info::unref(info_);
66   if (errhandler_ != MPI_ERRHANDLER_NULL)
67     simgrid::smpi::Errhandler::unref(errhandler_);
68
69   comm_->remove_rma_win(this);
70
71   colls::barrier(comm_);
72   Comm::unref(comm_);
73   
74   if (rank_ == 0)
75     delete bar_;
76
77   if(allocated_ !=0)
78     xbt_free(base_);
79
80   cleanup_attr<Win>();
81 }
82
83 int Win::attach(void* /*base*/, MPI_Aint size)
84 {
85   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
86     return MPI_ERR_ARG;
87   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
88   size_+=size;
89   return MPI_SUCCESS;
90 }
91
92 int Win::detach(const void* /*base*/)
93 {
94   base_=MPI_BOTTOM;
95   size_=-1;
96   return MPI_SUCCESS;
97 }
98
99 void Win::get_name(char* name, int* length) const
100 {
101   *length = static_cast<int>(name_.length());
102   if (not name_.empty()) {
103     name_.copy(name, *length);
104     name[*length] = '\0';
105   }
106 }
107
108 void Win::get_group(MPI_Group* group){
109   if(comm_ != MPI_COMM_NULL){
110     *group = comm_->group();
111   } else {
112     *group = MPI_GROUP_NULL;
113   }
114 }
115
116 MPI_Info Win::info()
117 {
118   if (info_ == MPI_INFO_NULL)
119     info_ = new Info();
120   info_->ref();
121   return info_;
122 }
123
124 int Win::rank() const
125 {
126   return rank_;
127 }
128
129 MPI_Aint Win::size() const
130 {
131   return size_;
132 }
133
134 void* Win::base() const
135 {
136   return base_;
137 }
138
139 int Win::disp_unit() const
140 {
141   return disp_unit_;
142 }
143
144 int Win::dynamic() const
145 {
146   return dynamic_;
147 }
148
149 void Win::set_info(MPI_Info info)
150 {
151   if (info_ != MPI_INFO_NULL)
152     simgrid::smpi::Info::unref(info_);
153   info_ = info;
154   if (info_ != MPI_INFO_NULL)
155     info_->ref();
156 }
157
158 void Win::set_name(const char* name){
159   name_ = name;
160 }
161
162 int Win::fence(int assert)
163 {
164   XBT_DEBUG("Entering fence");
165   if (opened_ == 0)
166     opened_=1;
167   if (assert != MPI_MODE_NOPRECEDE) {
168     // This is not the first fence => finalize what came before
169     bar_->wait();
170     mut_->lock();
171     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
172     // Without this, the vector could get redimensioned when another process pushes.
173     // This would result in the array used by Request::waitall() to be invalidated.
174     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
175
176     // start all requests that have been prepared by another process
177     if (not requests_.empty()) {
178       int size           = static_cast<int>(requests_.size());
179       MPI_Request* treqs = requests_.data();
180       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
181     }
182     count_=0;
183     mut_->unlock();
184   }
185
186   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
187     opened_=0;
188   assert_ = assert;
189
190   bar_->wait();
191   XBT_DEBUG("Leaving fence");
192
193   return MPI_SUCCESS;
194 }
195
196 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
197               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
198 {
199   //get receiver pointer
200   Win* recv_win = connected_wins_[target_rank];
201
202   if(opened_==0){//check that post/start has been done
203     // no fence or start .. lock ok ?
204     int locked=0;
205     for (auto const& it : recv_win->lockers_)
206       if (it == comm_->rank())
207         locked = 1;
208     if(locked != 1)
209       return MPI_ERR_WIN;
210   }
211
212   if(target_count*target_datatype->get_extent()>recv_win->size_){
213     XBT_WARN("Trying to put more than the window size - Bailing out.");
214     return MPI_ERR_RMA_RANGE;
215   }
216
217   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
218
219   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
220     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
221     // prepare send_request
222     MPI_Request sreq =
223         // TODO cheinrich Check for rank / pid conversion
224         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
225                                comm_, MPI_OP_NULL);
226
227     //prepare receiver request
228     // TODO cheinrich Check for rank / pid conversion
229     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
230                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
231
232     //start send
233     sreq->start();
234
235     if(request!=nullptr){
236       *request=sreq;
237     }else{
238       mut_->lock();
239       requests_.push_back(sreq);
240       mut_->unlock();
241     }
242
243     //push request to receiver's win
244     recv_win->mut_->lock();
245     recv_win->requests_.push_back(rreq);
246     rreq->start();
247     recv_win->mut_->unlock();
248   } else {
249     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
250     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
251     if(request!=nullptr)
252       *request = MPI_REQUEST_NULL;
253   }
254
255   return MPI_SUCCESS;
256 }
257
258 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
259               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
260 {
261   //get sender pointer
262   Win* send_win = connected_wins_[target_rank];
263
264   if(opened_==0){//check that post/start has been done
265     // no fence or start .. lock ok ?
266     int locked=0;
267     for (auto const& it : send_win->lockers_)
268       if (it == comm_->rank())
269         locked = 1;
270     if(locked != 1)
271       return MPI_ERR_WIN;
272   }
273
274   if(target_count*target_datatype->get_extent()>send_win->size_){
275     XBT_WARN("Trying to get more than the window size - Bailing out.");
276     return MPI_ERR_RMA_RANGE;
277   }
278
279   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
280   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
281
282   if(target_rank != comm_->rank()){
283     //prepare send_request
284     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
285                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
286
287     //prepare receiver request
288     MPI_Request rreq = Request::rma_recv_init(
289         origin_addr, origin_count, origin_datatype, target_rank,
290         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
291         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
292
293     //start the send, with another process than us as sender.
294     sreq->start();
295     //push request to receiver's win
296     send_win->mut_->lock();
297     send_win->requests_.push_back(sreq);
298     send_win->mut_->unlock();
299
300     //start recv
301     rreq->start();
302
303     if(request!=nullptr){
304       *request=rreq;
305     }else{
306       mut_->lock();
307       requests_.push_back(rreq);
308       mut_->unlock();
309     }
310   } else {
311     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
312     if(request!=nullptr)
313       *request=MPI_REQUEST_NULL;
314   }
315   return MPI_SUCCESS;
316 }
317
318 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
319               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
320 {
321   XBT_DEBUG("Entering MPI_Win_Accumulate");
322   //get receiver pointer
323   Win* recv_win = connected_wins_[target_rank];
324
325   if(opened_==0){//check that post/start has been done
326     // no fence or start .. lock ok ?
327     int locked=0;
328     for (auto const& it : recv_win->lockers_)
329       if (it == comm_->rank())
330         locked = 1;
331     if(locked != 1)
332       return MPI_ERR_WIN;
333   }
334   //FIXME: local version
335
336   if(target_count*target_datatype->get_extent()>recv_win->size_){
337     XBT_WARN("Trying to accumulate more than the window size - Bailing out.");
338     return MPI_ERR_RMA_RANGE;
339   }
340
341   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
342   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
343   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
344   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
345   // prepare send_request
346
347   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
348                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
349
350   // prepare receiver request
351   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
352                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
353
354   count_++;
355
356   // start send
357   sreq->start();
358   // push request to receiver's win
359   recv_win->mut_->lock();
360   recv_win->requests_.push_back(rreq);
361   rreq->start();
362   recv_win->mut_->unlock();
363
364   if (request != nullptr) {
365     *request = sreq;
366   } else {
367     mut_->lock();
368     requests_.push_back(sreq);
369     mut_->unlock();
370   }
371
372   XBT_DEBUG("Leaving MPI_Win_Accumulate");
373   return MPI_SUCCESS;
374 }
375
376 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
377                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
378                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
379 {
380   //get sender pointer
381   const Win* send_win = connected_wins_[target_rank];
382
383   if(opened_==0){//check that post/start has been done
384     // no fence or start .. lock ok ?
385     int locked=0;
386     for (auto const& it : send_win->lockers_)
387       if (it == comm_->rank())
388         locked = 1;
389     if(locked != 1)
390       return MPI_ERR_WIN;
391   }
392
393   if(target_count*target_datatype->get_extent()>send_win->size_){
394     XBT_WARN("Trying to get_accumulate more than the window size - Bailing out.");
395     return MPI_ERR_RMA_RANGE;
396   }
397
398
399   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
400   //need to be sure ops are correctly ordered, so finish request here ? slow.
401   MPI_Request req;
402   send_win->atomic_mut_->lock();
403   get(result_addr, result_count, result_datatype, target_rank,
404               target_disp, target_count, target_datatype, &req);
405   if (req != MPI_REQUEST_NULL)
406     Request::wait(&req, MPI_STATUS_IGNORE);
407   if(op!=MPI_NO_OP)
408     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
409               target_disp, target_count, target_datatype, op, &req);
410   if (req != MPI_REQUEST_NULL)
411     Request::wait(&req, MPI_STATUS_IGNORE);
412   send_win->atomic_mut_->unlock();
413   return MPI_SUCCESS;
414 }
415
416 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
417                           int target_rank, MPI_Aint target_disp)
418 {
419   //get sender pointer
420   const Win* send_win = connected_wins_[target_rank];
421
422   if(opened_==0){//check that post/start has been done
423     // no fence or start .. lock ok ?
424     int locked=0;
425     for (auto const& it : send_win->lockers_)
426       if (it == comm_->rank())
427         locked = 1;
428     if(locked != 1)
429       return MPI_ERR_WIN;
430   }
431
432   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
433   MPI_Request req = MPI_REQUEST_NULL;
434   send_win->atomic_mut_->lock();
435   get(result_addr, 1, datatype, target_rank,
436               target_disp, 1, datatype, &req);
437   if (req != MPI_REQUEST_NULL)
438     Request::wait(&req, MPI_STATUS_IGNORE);
439   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
440     put(origin_addr, 1, datatype, target_rank,
441               target_disp, 1, datatype);
442   }
443   send_win->atomic_mut_->unlock();
444   return MPI_SUCCESS;
445 }
446
447 int Win::start(MPI_Group group, int /*assert*/)
448 {
449   /* From MPI forum advices
450   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
451   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
452   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
453   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
454   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
455   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
456   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
457   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
458   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
459   must complete, without further dependencies.  */
460
461   //naive, blocking implementation.
462   int i             = 0;
463   int j             = 0;
464   int size          = group->size();
465   std::vector<MPI_Request> reqs(size);
466
467   XBT_DEBUG("Entering MPI_Win_Start");
468   while (j != size) {
469     int src = comm_->group()->rank(group->actor(j));
470     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
471       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
472       i++;
473     }
474     j++;
475   }
476   size = i;
477   Request::startall(size, reqs.data());
478   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
479   for (i = 0; i < size; i++) {
480     Request::unref(&reqs[i]);
481   }
482   opened_++; //we're open for business !
483   group_=group;
484   group->ref();
485   XBT_DEBUG("Leaving MPI_Win_Start");
486   return MPI_SUCCESS;
487 }
488
489 int Win::post(MPI_Group group, int /*assert*/)
490 {
491   //let's make a synchronous send here
492   int i             = 0;
493   int j             = 0;
494   int size = group->size();
495   std::vector<MPI_Request> reqs(size);
496
497   XBT_DEBUG("Entering MPI_Win_Post");
498   while(j!=size){
499     int dst = comm_->group()->rank(group->actor(j));
500     if (dst != rank_ && dst != MPI_UNDEFINED) {
501       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
502       i++;
503     }
504     j++;
505   }
506   size=i;
507
508   Request::startall(size, reqs.data());
509   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
510   for(i=0;i<size;i++){
511     Request::unref(&reqs[i]);
512   }
513   opened_++; //we're open for business !
514   group_=group;
515   group->ref();
516   XBT_DEBUG("Leaving MPI_Win_Post");
517   return MPI_SUCCESS;
518 }
519
520 int Win::complete(){
521   if(opened_==0)
522     xbt_die("Complete called on already opened MPI_Win");
523
524   XBT_DEBUG("Entering MPI_Win_Complete");
525   int i             = 0;
526   int j             = 0;
527   int size          = group_->size();
528   std::vector<MPI_Request> reqs(size);
529
530   while(j!=size){
531     int dst = comm_->group()->rank(group_->actor(j));
532     if (dst != rank_ && dst != MPI_UNDEFINED) {
533       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
534       i++;
535     }
536     j++;
537   }
538   size=i;
539   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
540   Request::startall(size, reqs.data());
541   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
542
543   for(i=0;i<size;i++){
544     Request::unref(&reqs[i]);
545   }
546
547   int finished = finish_comms();
548   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
549
550   Group::unref(group_);
551   opened_--; //we're closed for business !
552   return MPI_SUCCESS;
553 }
554
555 int Win::wait(){
556   //naive, blocking implementation.
557   XBT_DEBUG("Entering MPI_Win_Wait");
558   int i             = 0;
559   int j             = 0;
560   int size          = group_->size();
561   std::vector<MPI_Request> reqs(size);
562
563   while(j!=size){
564     int src = comm_->group()->rank(group_->actor(j));
565     if (src != rank_ && src != MPI_UNDEFINED) {
566       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
567       i++;
568     }
569     j++;
570   }
571   size=i;
572   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
573   Request::startall(size, reqs.data());
574   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
575   for(i=0;i<size;i++){
576     Request::unref(&reqs[i]);
577   }
578   int finished = finish_comms();
579   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
580
581   Group::unref(group_);
582   opened_--; //we're opened for business !
583   return MPI_SUCCESS;
584 }
585
586 int Win::lock(int lock_type, int rank, int /*assert*/)
587 {
588   MPI_Win target_win = connected_wins_[rank];
589
590   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
591     target_win->lock_mut_->lock();
592     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
593     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
594       target_win->lock_mut_->unlock();
595    }
596   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
597     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
598
599   target_win->lockers_.push_back(comm_->rank());
600
601   int finished = finish_comms(rank);
602   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
603   finished = target_win->finish_comms(rank_);
604   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
605   return MPI_SUCCESS;
606 }
607
608 int Win::lock_all(int assert){
609   int retval = MPI_SUCCESS;
610   for (int i = 0; i < comm_->size(); i++) {
611     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
612     if (ret != MPI_SUCCESS)
613       retval = ret;
614   }
615   return retval;
616 }
617
618 int Win::unlock(int rank){
619   MPI_Win target_win = connected_wins_[rank];
620   int target_mode = target_win->mode_;
621   target_win->mode_= 0;
622   target_win->lockers_.remove(comm_->rank());
623   if (target_mode==MPI_LOCK_EXCLUSIVE){
624     target_win->lock_mut_->unlock();
625   }
626
627   int finished = finish_comms(rank);
628   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
629   finished = target_win->finish_comms(rank_);
630   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
631   return MPI_SUCCESS;
632 }
633
634 int Win::unlock_all(){
635   int retval = MPI_SUCCESS;
636   for (int i = 0; i < comm_->size(); i++) {
637     int ret = this->unlock(i);
638     if (ret != MPI_SUCCESS)
639       retval = ret;
640   }
641   return retval;
642 }
643
644 int Win::flush(int rank){
645   MPI_Win target_win = connected_wins_[rank];
646   int finished       = finish_comms(rank_);
647   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
648   finished = target_win->finish_comms(rank);
649   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
650   return MPI_SUCCESS;
651 }
652
653 int Win::flush_local(int rank){
654   int finished = finish_comms(rank);
655   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
656   return MPI_SUCCESS;
657 }
658
659 int Win::flush_all(){
660   int finished = finish_comms();
661   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
662   for (int i = 0; i < comm_->size(); i++) {
663     finished = connected_wins_[i]->finish_comms(rank_);
664     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
665   }
666   return MPI_SUCCESS;
667 }
668
669 int Win::flush_local_all(){
670   int finished = finish_comms();
671   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
672   return MPI_SUCCESS;
673 }
674
675 Win* Win::f2c(int id){
676   return static_cast<Win*>(F2C::f2c(id));
677 }
678
679 int Win::finish_comms(){
680   mut_->lock();
681   //Finish own requests
682   int size = static_cast<int>(requests_.size());
683   if (size > 0) {
684     MPI_Request* treqs = requests_.data();
685     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
686     requests_.clear();
687   }
688   mut_->unlock();
689   return size;
690 }
691
692 int Win::finish_comms(int rank){
693   mut_->lock();
694   // Finish own requests
695   // Let's see if we're either the destination or the sender of this request
696   // because we only wait for requests that we are responsible for.
697   // Also use the process id here since the request itself returns from src()
698   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
699   int proc_id = comm_->group()->actor(rank)->get_pid();
700   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
701     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
702   });
703   std::vector<MPI_Request> myreqqs(it, end(requests_));
704   requests_.erase(it, end(requests_));
705   int size = static_cast<int>(myreqqs.size());
706   if (size > 0) {
707     MPI_Request* treqs = myreqqs.data();
708     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
709     myreqqs.clear();
710   }
711   mut_->unlock();
712   return size;
713 }
714
715 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
716 {
717   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
718   for (int i = 0; not target_win && i < comm_->size(); i++) {
719     if (connected_wins_[i]->size_ > 0)
720       target_win = connected_wins_[i];
721   }
722   if (target_win) {
723     *size                         = target_win->size_;
724     *disp_unit                    = target_win->disp_unit_;
725     *static_cast<void**>(baseptr) = target_win->base_;
726   } else {
727     *size                         = 0;
728     *static_cast<void**>(baseptr) = nullptr;
729   }
730   return MPI_SUCCESS;
731 }
732
733 MPI_Errhandler Win::errhandler()
734 {
735   if (errhandler_ != MPI_ERRHANDLER_NULL)
736     errhandler_->ref();
737   return errhandler_;
738 }
739
740 void Win::set_errhandler(MPI_Errhandler errhandler)
741 {
742   if (errhandler_ != MPI_ERRHANDLER_NULL)
743     simgrid::smpi::Errhandler::unref(errhandler_);
744   errhandler_ = errhandler;
745   if (errhandler_ != MPI_ERRHANDLER_NULL)
746     errhandler_->ref();
747 }
748 } // namespace smpi
749 } // namespace simgrid