Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
588e5cfea792c0c2ea562032bd8c0838e59bec38
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_         = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_                  = nullptr;
32   opened_                = 0;
33   group_                 = MPI_GROUP_NULL;
34   requests_              = new std::vector<MPI_Request>();
35   mut_                   = s4u::Mutex::create();
36   lock_mut_              = s4u::Mutex::create();
37   atomic_mut_            = s4u::Mutex::create();
38   connected_wins_        = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_                 = 0;
41   if(rank_==0){
42     bar_ = new s4u::Barrier(comm_size);
43   }
44   mode_=0;
45   errhandler_=MPI_ERRORS_ARE_FATAL;
46   comm->add_rma_win(this);
47   comm->ref();
48
49   colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
50                    comm);
51
52   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   colls::barrier(comm);
55 }
56
57 Win::~Win(){
58   //As per the standard, perform a barrier to ensure every async comm is finished
59   bar_->wait();
60
61   int finished = finish_comms();
62   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
63
64   delete requests_;
65   delete[] connected_wins_;
66   if (name_ != nullptr){
67     xbt_free(name_);
68   }
69   if (info_ != MPI_INFO_NULL)
70     simgrid::smpi::Info::unref(info_);
71
72   comm_->remove_rma_win(this);
73
74   colls::barrier(comm_);
75   Comm::unref(comm_);
76   
77   if (rank_ == 0)
78     delete bar_;
79
80   if(allocated_ !=0)
81     xbt_free(base_);
82
83   cleanup_attr<Win>();
84 }
85
86 int Win::attach(void* /*base*/, MPI_Aint size)
87 {
88   if (not(base_ == MPI_BOTTOM || base_ == 0))
89     return MPI_ERR_ARG;
90   base_=0;//actually the address will be given in the RMA calls, as being the disp.
91   size_+=size;
92   return MPI_SUCCESS;
93 }
94
95 int Win::detach(const void* /*base*/)
96 {
97   base_=MPI_BOTTOM;
98   size_=-1;
99   return MPI_SUCCESS;
100 }
101
102 void Win::get_name(char* name, int* length){
103   if(name_==nullptr){
104     *length=0;
105     name=nullptr;
106     return;
107   }
108   *length = strlen(name_);
109   strncpy(name, name_, *length+1);
110 }
111
112 void Win::get_group(MPI_Group* group){
113   if(comm_ != MPI_COMM_NULL){
114     *group = comm_->group();
115   } else {
116     *group = MPI_GROUP_NULL;
117   }
118 }
119
120 MPI_Info Win::info()
121 {
122   if (info_ == MPI_INFO_NULL)
123     info_ = new Info();
124   info_->ref();
125   return info_;
126 }
127
128 int Win::rank(){
129   return rank_;
130 }
131
132 MPI_Aint Win::size(){
133   return size_;
134 }
135
136 void* Win::base(){
137   return base_;
138 }
139
140 int Win::disp_unit(){
141   return disp_unit_;
142 }
143
144 int Win::dynamic(){
145   return dynamic_;
146 }
147
148 void Win::set_info(MPI_Info info)
149 {
150   if (info_ != MPI_INFO_NULL)
151     simgrid::smpi::Info::unref(info_);
152   info_ = info;
153   if (info_ != MPI_INFO_NULL)
154     info_->ref();
155 }
156
157 void Win::set_name(const char* name){
158   name_ = xbt_strdup(name);
159 }
160
161 int Win::fence(int assert)
162 {
163   XBT_DEBUG("Entering fence");
164   if (opened_ == 0)
165     opened_=1;
166   if (assert != MPI_MODE_NOPRECEDE) {
167     // This is not the first fence => finalize what came before
168     bar_->wait();
169     mut_->lock();
170     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
171     // Without this, the vector could get redimensioned when another process pushes.
172     // This would result in the array used by Request::waitall() to be invalidated.
173     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
174     std::vector<MPI_Request> *reqs = requests_;
175     int size = static_cast<int>(reqs->size());
176     // start all requests that have been prepared by another process
177     if (size > 0) {
178       MPI_Request* treqs = &(*reqs)[0];
179       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
180     }
181     count_=0;
182     mut_->unlock();
183   }
184
185   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
186     opened_=0;
187   assert_ = assert;
188
189   bar_->wait();
190   XBT_DEBUG("Leaving fence");
191
192   return MPI_SUCCESS;
193 }
194
195 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
196               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
197 {
198   //get receiver pointer
199   MPI_Win recv_win = connected_wins_[target_rank];
200
201   if(opened_==0){//check that post/start has been done
202     // no fence or start .. lock ok ?
203     int locked=0;
204     for (auto const& it : recv_win->lockers_)
205       if (it == comm_->rank())
206         locked = 1;
207     if(locked != 1)
208       return MPI_ERR_WIN;
209   }
210
211   if(target_count*target_datatype->get_extent()>recv_win->size_)
212     return MPI_ERR_ARG;
213
214   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
215
216   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
217     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
218     // prepare send_request
219     MPI_Request sreq =
220         // TODO cheinrich Check for rank / pid conversion
221         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
222                                comm_, MPI_OP_NULL);
223
224     //prepare receiver request
225     // TODO cheinrich Check for rank / pid conversion
226     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
227                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
228
229     //start send
230     sreq->start();
231
232     if(request!=nullptr){
233       *request=sreq;
234     }else{
235       mut_->lock();
236       requests_->push_back(sreq);
237       mut_->unlock();
238     }
239
240     //push request to receiver's win
241     recv_win->mut_->lock();
242     recv_win->requests_->push_back(rreq);
243     rreq->start();
244     recv_win->mut_->unlock();
245   } else {
246     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
247     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
248     if(request!=nullptr)
249       *request = MPI_REQUEST_NULL;
250   }
251
252   return MPI_SUCCESS;
253 }
254
255 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
256               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
257 {
258   //get sender pointer
259   MPI_Win send_win = connected_wins_[target_rank];
260
261   if(opened_==0){//check that post/start has been done
262     // no fence or start .. lock ok ?
263     int locked=0;
264     for (auto const& it : send_win->lockers_)
265       if (it == comm_->rank())
266         locked = 1;
267     if(locked != 1)
268       return MPI_ERR_WIN;
269   }
270
271   if(target_count*target_datatype->get_extent()>send_win->size_)
272     return MPI_ERR_ARG;
273
274   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
275   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
276
277   if(target_rank != comm_->rank()){
278     //prepare send_request
279     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
280                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
281
282     //prepare receiver request
283     MPI_Request rreq = Request::rma_recv_init(
284         origin_addr, origin_count, origin_datatype, target_rank,
285         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
286         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
287
288     //start the send, with another process than us as sender.
289     sreq->start();
290     //push request to receiver's win
291     send_win->mut_->lock();
292     send_win->requests_->push_back(sreq);
293     send_win->mut_->unlock();
294
295     //start recv
296     rreq->start();
297
298     if(request!=nullptr){
299       *request=rreq;
300     }else{
301       mut_->lock();
302       requests_->push_back(rreq);
303       mut_->unlock();
304     }
305   } else {
306     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
307     if(request!=nullptr)
308       *request=MPI_REQUEST_NULL;
309   }
310   return MPI_SUCCESS;
311 }
312
313 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
314               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
315 {
316   XBT_DEBUG("Entering MPI_Win_Accumulate");
317   //get receiver pointer
318   MPI_Win recv_win = connected_wins_[target_rank];
319
320   if(opened_==0){//check that post/start has been done
321     // no fence or start .. lock ok ?
322     int locked=0;
323     for (auto const& it : recv_win->lockers_)
324       if (it == comm_->rank())
325         locked = 1;
326     if(locked != 1)
327       return MPI_ERR_WIN;
328   }
329   //FIXME: local version
330
331   if(target_count*target_datatype->get_extent()>recv_win->size_)
332     return MPI_ERR_ARG;
333
334   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
335   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
336   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
337   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
338   // prepare send_request
339
340   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
341                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
342
343   // prepare receiver request
344   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
345                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
346
347   count_++;
348
349   // start send
350   sreq->start();
351   // push request to receiver's win
352   recv_win->mut_->lock();
353   recv_win->requests_->push_back(rreq);
354   rreq->start();
355   recv_win->mut_->unlock();
356
357   if (request != nullptr) {
358     *request = sreq;
359   } else {
360     mut_->lock();
361     requests_->push_back(sreq);
362     mut_->unlock();
363   }
364
365   XBT_DEBUG("Leaving MPI_Win_Accumulate");
366   return MPI_SUCCESS;
367 }
368
369 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
370                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
371                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
372 {
373   //get sender pointer
374   MPI_Win send_win = connected_wins_[target_rank];
375
376   if(opened_==0){//check that post/start has been done
377     // no fence or start .. lock ok ?
378     int locked=0;
379     for (auto const& it : send_win->lockers_)
380       if (it == comm_->rank())
381         locked = 1;
382     if(locked != 1)
383       return MPI_ERR_WIN;
384   }
385
386   if(target_count*target_datatype->get_extent()>send_win->size_)
387     return MPI_ERR_ARG;
388
389   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
390   //need to be sure ops are correctly ordered, so finish request here ? slow.
391   MPI_Request req;
392   send_win->atomic_mut_->lock();
393   get(result_addr, result_count, result_datatype, target_rank,
394               target_disp, target_count, target_datatype, &req);
395   if (req != MPI_REQUEST_NULL)
396     Request::wait(&req, MPI_STATUS_IGNORE);
397   if(op!=MPI_NO_OP)
398     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
399               target_disp, target_count, target_datatype, op, &req);
400   if (req != MPI_REQUEST_NULL)
401     Request::wait(&req, MPI_STATUS_IGNORE);
402   send_win->atomic_mut_->unlock();
403   return MPI_SUCCESS;
404 }
405
406 int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
407         void *result_addr, MPI_Datatype datatype, int target_rank,
408         MPI_Aint target_disp){
409   //get sender pointer
410   MPI_Win send_win = connected_wins_[target_rank];
411
412   if(opened_==0){//check that post/start has been done
413     // no fence or start .. lock ok ?
414     int locked=0;
415     for (auto const& it : send_win->lockers_)
416       if (it == comm_->rank())
417         locked = 1;
418     if(locked != 1)
419       return MPI_ERR_WIN;
420   }
421
422   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
423   MPI_Request req = MPI_REQUEST_NULL;
424   send_win->atomic_mut_->lock();
425   get(result_addr, 1, datatype, target_rank,
426               target_disp, 1, datatype, &req);
427   if (req != MPI_REQUEST_NULL)
428     Request::wait(&req, MPI_STATUS_IGNORE);
429   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
430     put(origin_addr, 1, datatype, target_rank,
431               target_disp, 1, datatype);
432   }
433   send_win->atomic_mut_->unlock();
434   return MPI_SUCCESS;
435 }
436
437 int Win::start(MPI_Group group, int /*assert*/)
438 {
439   /* From MPI forum advices
440   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
441   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
442   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
443   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
444   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
445   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
446   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
447   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
448   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
449   must complete, without further dependencies.  */
450
451   //naive, blocking implementation.
452   int i             = 0;
453   int j             = 0;
454   int size          = group->size();
455   MPI_Request* reqs = xbt_new0(MPI_Request, size);
456
457   XBT_DEBUG("Entering MPI_Win_Start");
458   while (j != size) {
459     int src = comm_->group()->rank(group->actor(j));
460     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
461       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
462       i++;
463     }
464     j++;
465   }
466   size = i;
467   Request::startall(size, reqs);
468   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
469   for (i = 0; i < size; i++) {
470     Request::unref(&reqs[i]);
471   }
472   xbt_free(reqs);
473   opened_++; //we're open for business !
474   group_=group;
475   group->ref();
476   XBT_DEBUG("Leaving MPI_Win_Start");
477   return MPI_SUCCESS;
478 }
479
480 int Win::post(MPI_Group group, int /*assert*/)
481 {
482   //let's make a synchronous send here
483   int i             = 0;
484   int j             = 0;
485   int size = group->size();
486   MPI_Request* reqs = xbt_new0(MPI_Request, size);
487
488   XBT_DEBUG("Entering MPI_Win_Post");
489   while(j!=size){
490     int dst = comm_->group()->rank(group->actor(j));
491     if (dst != rank_ && dst != MPI_UNDEFINED) {
492       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
493       i++;
494     }
495     j++;
496   }
497   size=i;
498
499   Request::startall(size, reqs);
500   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
501   for(i=0;i<size;i++){
502     Request::unref(&reqs[i]);
503   }
504   xbt_free(reqs);
505   opened_++; //we're open for business !
506   group_=group;
507   group->ref();
508   XBT_DEBUG("Leaving MPI_Win_Post");
509   return MPI_SUCCESS;
510 }
511
512 int Win::complete(){
513   if(opened_==0)
514     xbt_die("Complete called on already opened MPI_Win");
515
516   XBT_DEBUG("Entering MPI_Win_Complete");
517   int i             = 0;
518   int j             = 0;
519   int size          = group_->size();
520   MPI_Request* reqs = xbt_new0(MPI_Request, size);
521
522   while(j!=size){
523     int dst = comm_->group()->rank(group_->actor(j));
524     if (dst != rank_ && dst != MPI_UNDEFINED) {
525       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
526       i++;
527     }
528     j++;
529   }
530   size=i;
531   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
532   Request::startall(size, reqs);
533   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
534
535   for(i=0;i<size;i++){
536     Request::unref(&reqs[i]);
537   }
538   xbt_free(reqs);
539
540   int finished = finish_comms();
541   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
542
543   Group::unref(group_);
544   opened_--; //we're closed for business !
545   return MPI_SUCCESS;
546 }
547
548 int Win::wait(){
549   //naive, blocking implementation.
550   XBT_DEBUG("Entering MPI_Win_Wait");
551   int i             = 0;
552   int j             = 0;
553   int size          = group_->size();
554   MPI_Request* reqs = xbt_new0(MPI_Request, size);
555
556   while(j!=size){
557     int src = comm_->group()->rank(group_->actor(j));
558     if (src != rank_ && src != MPI_UNDEFINED) {
559       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
560       i++;
561     }
562     j++;
563   }
564   size=i;
565   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
566   Request::startall(size, reqs);
567   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
568   for(i=0;i<size;i++){
569     Request::unref(&reqs[i]);
570   }
571   xbt_free(reqs);
572   int finished = finish_comms();
573   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
574
575   Group::unref(group_);
576   opened_--; //we're opened for business !
577   return MPI_SUCCESS;
578 }
579
580 int Win::lock(int lock_type, int rank, int /*assert*/)
581 {
582   MPI_Win target_win = connected_wins_[rank];
583
584   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
585     target_win->lock_mut_->lock();
586     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
587     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
588       target_win->lock_mut_->unlock();
589    }
590   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
591     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
592
593   target_win->lockers_.push_back(comm_->rank());
594
595   int finished = finish_comms(rank);
596   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
597   finished = target_win->finish_comms(rank_);
598   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
599   return MPI_SUCCESS;
600 }
601
602 int Win::lock_all(int assert){
603   int i=0;
604   int retval = MPI_SUCCESS;
605   for (i=0; i<comm_->size();i++){
606       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
607       if(ret != MPI_SUCCESS)
608         retval = ret;
609   }
610   return retval;
611 }
612
613 int Win::unlock(int rank){
614   MPI_Win target_win = connected_wins_[rank];
615   int target_mode = target_win->mode_;
616   target_win->mode_= 0;
617   target_win->lockers_.remove(comm_->rank());
618   if (target_mode==MPI_LOCK_EXCLUSIVE){
619     target_win->lock_mut_->unlock();
620   }
621
622   int finished = finish_comms(rank);
623   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
624   finished = target_win->finish_comms(rank_);
625   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
626   return MPI_SUCCESS;
627 }
628
629 int Win::unlock_all(){
630   int i=0;
631   int retval = MPI_SUCCESS;
632   for (i=0; i<comm_->size();i++){
633     int ret = this->unlock(i);
634     if (ret != MPI_SUCCESS)
635       retval = ret;
636   }
637   return retval;
638 }
639
640 int Win::flush(int rank){
641   MPI_Win target_win = connected_wins_[rank];
642   int finished       = finish_comms(rank_);
643   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
644   finished = target_win->finish_comms(rank);
645   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
646   return MPI_SUCCESS;
647 }
648
649 int Win::flush_local(int rank){
650   int finished = finish_comms(rank);
651   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
652   return MPI_SUCCESS;
653 }
654
655 int Win::flush_all(){
656   int finished = finish_comms();
657   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
658   for (int i = 0; i < comm_->size(); i++) {
659     finished = connected_wins_[i]->finish_comms(rank_);
660     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
661   }
662   return MPI_SUCCESS;
663 }
664
665 int Win::flush_local_all(){
666   int finished = finish_comms();
667   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
668   return MPI_SUCCESS;
669 }
670
671 Win* Win::f2c(int id){
672   return static_cast<Win*>(F2C::f2c(id));
673 }
674
675 int Win::finish_comms(){
676   mut_->lock();
677   //Finish own requests
678   std::vector<MPI_Request> *reqqs = requests_;
679   int size = static_cast<int>(reqqs->size());
680   if (size > 0) {
681     MPI_Request* treqs = &(*reqqs)[0];
682     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
683     reqqs->clear();
684   }
685   mut_->unlock();
686   return size;
687 }
688
689 int Win::finish_comms(int rank){
690   mut_->lock();
691   //Finish own requests
692   std::vector<MPI_Request> *reqqs = requests_;
693   int size = static_cast<int>(reqqs->size());
694   if (size > 0) {
695     size = 0;
696     std::vector<MPI_Request> myreqqs;
697     std::vector<MPI_Request>::iterator iter = reqqs->begin();
698     int proc_id                             = comm_->group()->actor(rank)->get_pid();
699     while (iter != reqqs->end()){
700       // Let's see if we're either the destination or the sender of this request
701       // because we only wait for requests that we are responsible for.
702       // Also use the process id here since the request itself returns from src()
703       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
704       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
705         myreqqs.push_back(*iter);
706         iter = reqqs->erase(iter);
707         size++;
708       } else {
709         ++iter;
710       }
711     }
712     if(size >0){
713       MPI_Request* treqs = &myreqqs[0];
714       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
715       myreqqs.clear();
716     }
717   }
718   mut_->unlock();
719   return size;
720 }
721
722 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
723 {
724   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
725   for (int i = 0; not target_win && i < comm_->size(); i++) {
726     if (connected_wins_[i]->size_ > 0)
727       target_win = connected_wins_[i];
728   }
729   if (target_win) {
730     *size                         = target_win->size_;
731     *disp_unit                    = target_win->disp_unit_;
732     *static_cast<void**>(baseptr) = target_win->base_;
733   } else {
734     *size                         = 0;
735     *static_cast<void**>(baseptr) = xbt_malloc(0);
736   }
737   return MPI_SUCCESS;
738 }
739
740 MPI_Errhandler Win::errhandler()
741 {
742   return errhandler_;
743 }
744
745 void Win::set_errhandler(MPI_Errhandler errhandler)
746 {
747   errhandler_ = errhandler;
748   if (errhandler_ != MPI_ERRHANDLER_NULL)
749     errhandler->ref();
750 }
751 } // namespace smpi
752 } // namespace simgrid