Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
do not use xbt_mutex_t in SMPI
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19 using simgrid::s4u::Actor;
20
21 namespace simgrid{
22 namespace smpi{
23 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
24 int Win::keyval_id_=0;
25
26 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
27   int comm_size = comm->size();
28   rank_         = comm->rank();
29   XBT_DEBUG("Creating window");
30   if(info!=MPI_INFO_NULL)
31     info->ref();
32   name_                  = nullptr;
33   opened_                = 0;
34   group_                 = MPI_GROUP_NULL;
35   requests_              = new std::vector<MPI_Request>();
36   mut_                   = s4u::Mutex::create();
37   lock_mut_              = s4u::Mutex::create();
38   atomic_mut_            = s4u::Mutex::create();
39   connected_wins_        = new MPI_Win[comm_size];
40   connected_wins_[rank_] = this;
41   count_                 = 0;
42   if(rank_==0){
43     bar_ = new simgrid::s4u::Barrier(comm_size);
44   }
45   mode_=0;
46
47   comm->add_rma_win(this);
48   comm->ref();
49
50   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
51                          MPI_BYTE, comm);
52
53   Colls::bcast(&(bar_), sizeof(simgrid::s4u::Barrier*), MPI_BYTE, 0, comm);
54
55   Colls::barrier(comm);
56 }
57
58 Win::~Win(){
59   //As per the standard, perform a barrier to ensure every async comm is finished
60   bar_->wait();
61
62   int finished = finish_comms();
63   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64
65   delete requests_;
66   delete[] connected_wins_;
67   if (name_ != nullptr){
68     xbt_free(name_);
69   }
70   if(info_!=MPI_INFO_NULL){
71     MPI_Info_free(&info_);
72   }
73
74   comm_->remove_rma_win(this);
75
76   Colls::barrier(comm_);
77   Comm::unref(comm_);
78   
79   if (rank_ == 0)
80     delete bar_;
81
82   if(allocated_ !=0)
83     xbt_free(base_);
84
85   cleanup_attr<Win>();
86 }
87
88 int Win::attach (void *base, MPI_Aint size){
89   if (not(base_ == MPI_BOTTOM || base_ == 0))
90     return MPI_ERR_ARG;
91   base_=0;//actually the address will be given in the RMA calls, as being the disp.
92   size_+=size;
93   return MPI_SUCCESS;
94 }
95
96 int Win::detach (void *base){
97   base_=MPI_BOTTOM;
98   size_=-1;
99   return MPI_SUCCESS;
100 }
101
102 void Win::get_name(char* name, int* length){
103   if(name_==nullptr){
104     *length=0;
105     name=nullptr;
106     return;
107   }
108   *length = strlen(name_);
109   strncpy(name, name_, *length+1);
110 }
111
112 void Win::get_group(MPI_Group* group){
113   if(comm_ != MPI_COMM_NULL){
114     *group = comm_->group();
115   } else {
116     *group = MPI_GROUP_NULL;
117   }
118 }
119
120 MPI_Info Win::info(){
121   if(info_== MPI_INFO_NULL)
122     info_ = new Info();
123   info_->ref();
124   return info_;
125 }
126
127 int Win::rank(){
128   return rank_;
129 }
130
131 MPI_Aint Win::size(){
132   return size_;
133 }
134
135 void* Win::base(){
136   return base_;
137 }
138
139 int Win::disp_unit(){
140   return disp_unit_;
141 }
142
143 int Win::dynamic(){
144   return dynamic_;
145 }
146
147 void Win::set_info(MPI_Info info){
148   if(info_!= MPI_INFO_NULL)
149     info->ref();
150   info_=info;
151 }
152
153 void Win::set_name(char* name){
154   name_ = xbt_strdup(name);
155 }
156
157 int Win::fence(int assert)
158 {
159   XBT_DEBUG("Entering fence");
160   if (opened_ == 0)
161     opened_=1;
162   if (assert != MPI_MODE_NOPRECEDE) {
163     // This is not the first fence => finalize what came before
164     bar_->wait();
165     mut_->lock();
166     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
167     // Without this, the vector could get redimensionned when another process pushes.
168     // This would result in the array used by Request::waitall() to be invalidated.
169     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
170     std::vector<MPI_Request> *reqs = requests_;
171     int size = static_cast<int>(reqs->size());
172     // start all requests that have been prepared by another process
173     if (size > 0) {
174       MPI_Request* treqs = &(*reqs)[0];
175       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
176     }
177     count_=0;
178     mut_->unlock();
179   }
180
181   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
182     opened_=0;
183   assert_ = assert;
184
185   bar_->wait();
186   XBT_DEBUG("Leaving fence");
187
188   return MPI_SUCCESS;
189 }
190
191 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
192               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
193 {
194   //get receiver pointer
195   MPI_Win recv_win = connected_wins_[target_rank];
196
197   if(opened_==0){//check that post/start has been done
198     // no fence or start .. lock ok ?
199     int locked=0;
200     for (auto const& it : recv_win->lockers_)
201       if (it == comm_->rank())
202         locked = 1;
203     if(locked != 1)
204       return MPI_ERR_WIN;
205   }
206
207   if(target_count*target_datatype->get_extent()>recv_win->size_)
208     return MPI_ERR_ARG;
209
210   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
211
212   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
213     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
214     // prepare send_request
215     MPI_Request sreq =
216         // TODO cheinrich Check for rank / pid conversion
217         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
218                                comm_, MPI_OP_NULL);
219
220     //prepare receiver request
221     // TODO cheinrich Check for rank / pid conversion
222     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
223                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
224
225     //start send
226     sreq->start();
227
228     if(request!=nullptr){
229       *request=sreq;
230     }else{
231       mut_->lock();
232       requests_->push_back(sreq);
233       mut_->unlock();
234     }
235
236     //push request to receiver's win
237     recv_win->mut_->lock();
238     recv_win->requests_->push_back(rreq);
239     rreq->start();
240     recv_win->mut_->unlock();
241
242   }else{
243     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
244     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
245     if(request!=nullptr)
246       *request = MPI_REQUEST_NULL;
247   }
248
249   return MPI_SUCCESS;
250 }
251
252 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
253               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
254 {
255   //get sender pointer
256   MPI_Win send_win = connected_wins_[target_rank];
257
258   if(opened_==0){//check that post/start has been done
259     // no fence or start .. lock ok ?
260     int locked=0;
261     for (auto const& it : send_win->lockers_)
262       if (it == comm_->rank())
263         locked = 1;
264     if(locked != 1)
265       return MPI_ERR_WIN;
266   }
267
268   if(target_count*target_datatype->get_extent()>send_win->size_)
269     return MPI_ERR_ARG;
270
271   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
272   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
273
274   if(target_rank != comm_->rank()){
275     //prepare send_request
276     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
277                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
278
279     //prepare receiver request
280     MPI_Request rreq = Request::rma_recv_init(
281         origin_addr, origin_count, origin_datatype, target_rank,
282         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
283         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
284
285     //start the send, with another process than us as sender.
286     sreq->start();
287     //push request to receiver's win
288     send_win->mut_->lock();
289     send_win->requests_->push_back(sreq);
290     send_win->mut_->unlock();
291
292     //start recv
293     rreq->start();
294
295     if(request!=nullptr){
296       *request=rreq;
297     }else{
298       mut_->lock();
299       requests_->push_back(rreq);
300       mut_->unlock();
301     }
302
303   }else{
304     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
305     if(request!=nullptr)
306       *request=MPI_REQUEST_NULL;
307   }
308
309   return MPI_SUCCESS;
310 }
311
312
313 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
314               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
315 {
316   XBT_DEBUG("Entering MPI_Win_Accumulate");
317   //get receiver pointer
318   MPI_Win recv_win = connected_wins_[target_rank];
319
320   if(opened_==0){//check that post/start has been done
321     // no fence or start .. lock ok ?
322     int locked=0;
323     for (auto const& it : recv_win->lockers_)
324       if (it == comm_->rank())
325         locked = 1;
326     if(locked != 1)
327       return MPI_ERR_WIN;
328   }
329   //FIXME: local version
330
331   if(target_count*target_datatype->get_extent()>recv_win->size_)
332     return MPI_ERR_ARG;
333
334   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
335   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
336     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
337     //prepare send_request
338
339   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
340                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
341
342   // prepare receiver request
343   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
344                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
345
346   count_++;
347
348   // start send
349   sreq->start();
350   // push request to receiver's win
351   recv_win->mut_->lock();
352   recv_win->requests_->push_back(rreq);
353   rreq->start();
354   recv_win->mut_->unlock();
355
356   if (request != nullptr) {
357     *request = sreq;
358   } else {
359     mut_->lock();
360     requests_->push_back(sreq);
361     mut_->unlock();
362   }
363
364   XBT_DEBUG("Leaving MPI_Win_Accumulate");
365   return MPI_SUCCESS;
366 }
367
368 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
369               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
370               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
371
372   //get sender pointer
373   MPI_Win send_win = connected_wins_[target_rank];
374
375   if(opened_==0){//check that post/start has been done
376     // no fence or start .. lock ok ?
377     int locked=0;
378     for (auto const& it : send_win->lockers_)
379       if (it == comm_->rank())
380         locked = 1;
381     if(locked != 1)
382       return MPI_ERR_WIN;
383   }
384
385   if(target_count*target_datatype->get_extent()>send_win->size_)
386     return MPI_ERR_ARG;
387
388   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
389   //need to be sure ops are correctly ordered, so finish request here ? slow.
390   MPI_Request req;
391   send_win->atomic_mut_->lock();
392   get(result_addr, result_count, result_datatype, target_rank,
393               target_disp, target_count, target_datatype, &req);
394   if (req != MPI_REQUEST_NULL)
395     Request::wait(&req, MPI_STATUS_IGNORE);
396   if(op!=MPI_NO_OP)
397     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
398               target_disp, target_count, target_datatype, op, &req);
399   if (req != MPI_REQUEST_NULL)
400     Request::wait(&req, MPI_STATUS_IGNORE);
401   send_win->atomic_mut_->unlock();
402   return MPI_SUCCESS;
403
404 }
405
406 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
407         void *result_addr, MPI_Datatype datatype, int target_rank,
408         MPI_Aint target_disp){
409   //get sender pointer
410   MPI_Win send_win = connected_wins_[target_rank];
411
412   if(opened_==0){//check that post/start has been done
413     // no fence or start .. lock ok ?
414     int locked=0;
415     for (auto const& it : send_win->lockers_)
416       if (it == comm_->rank())
417         locked = 1;
418     if(locked != 1)
419       return MPI_ERR_WIN;
420   }
421
422   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
423   MPI_Request req = MPI_REQUEST_NULL;
424   send_win->atomic_mut_->lock();
425   get(result_addr, 1, datatype, target_rank,
426               target_disp, 1, datatype, &req);
427   if (req != MPI_REQUEST_NULL)
428     Request::wait(&req, MPI_STATUS_IGNORE);
429   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
430     put(origin_addr, 1, datatype, target_rank,
431               target_disp, 1, datatype);
432   }
433   send_win->atomic_mut_->unlock();
434   return MPI_SUCCESS;
435 }
436
437 int Win::start(MPI_Group group, int assert){
438   /* From MPI forum advices
439   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
440   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
441   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
442   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
443   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
444   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
445   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
446   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
447   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
448   must complete, without further dependencies.  */
449
450   //naive, blocking implementation.
451   int i             = 0;
452   int j             = 0;
453   int size          = group->size();
454   MPI_Request* reqs = xbt_new0(MPI_Request, size);
455
456   XBT_DEBUG("Entering MPI_Win_Start");
457   while (j != size) {
458     int src = comm_->group()->rank(group->actor(j));
459     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
460       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
461       i++;
462     }
463     j++;
464   }
465   size = i;
466   Request::startall(size, reqs);
467   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
468   for (i = 0; i < size; i++) {
469     Request::unref(&reqs[i]);
470   }
471   xbt_free(reqs);
472   opened_++; //we're open for business !
473   group_=group;
474   group->ref();
475   XBT_DEBUG("Leaving MPI_Win_Start");
476   return MPI_SUCCESS;
477 }
478
479 int Win::post(MPI_Group group, int assert){
480   //let's make a synchronous send here
481   int i             = 0;
482   int j             = 0;
483   int size = group->size();
484   MPI_Request* reqs = xbt_new0(MPI_Request, size);
485
486   XBT_DEBUG("Entering MPI_Win_Post");
487   while(j!=size){
488     int dst = comm_->group()->rank(group->actor(j));
489     if (dst != rank_ && dst != MPI_UNDEFINED) {
490       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
491       i++;
492     }
493     j++;
494   }
495   size=i;
496
497   Request::startall(size, reqs);
498   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
499   for(i=0;i<size;i++){
500     Request::unref(&reqs[i]);
501   }
502   xbt_free(reqs);
503   opened_++; //we're open for business !
504   group_=group;
505   group->ref();
506   XBT_DEBUG("Leaving MPI_Win_Post");
507   return MPI_SUCCESS;
508 }
509
510 int Win::complete(){
511   if(opened_==0)
512     xbt_die("Complete called on already opened MPI_Win");
513
514   XBT_DEBUG("Entering MPI_Win_Complete");
515   int i             = 0;
516   int j             = 0;
517   int size          = group_->size();
518   MPI_Request* reqs = xbt_new0(MPI_Request, size);
519
520   while(j!=size){
521     int dst = comm_->group()->rank(group_->actor(j));
522     if (dst != rank_ && dst != MPI_UNDEFINED) {
523       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
524       i++;
525     }
526     j++;
527   }
528   size=i;
529   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
530   Request::startall(size, reqs);
531   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
532
533   for(i=0;i<size;i++){
534     Request::unref(&reqs[i]);
535   }
536   xbt_free(reqs);
537
538   int finished = finish_comms();
539   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
540
541   Group::unref(group_);
542   opened_--; //we're closed for business !
543   return MPI_SUCCESS;
544 }
545
546 int Win::wait(){
547   //naive, blocking implementation.
548   XBT_DEBUG("Entering MPI_Win_Wait");
549   int i             = 0;
550   int j             = 0;
551   int size          = group_->size();
552   MPI_Request* reqs = xbt_new0(MPI_Request, size);
553
554   while(j!=size){
555     int src = comm_->group()->rank(group_->actor(j));
556     if (src != rank_ && src != MPI_UNDEFINED) {
557       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
558       i++;
559     }
560     j++;
561   }
562   size=i;
563   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
564   Request::startall(size, reqs);
565   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
566   for(i=0;i<size;i++){
567     Request::unref(&reqs[i]);
568   }
569   xbt_free(reqs);
570   int finished = finish_comms();
571   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
572
573   Group::unref(group_);
574   opened_--; //we're opened for business !
575   return MPI_SUCCESS;
576 }
577
578 int Win::lock(int lock_type, int rank, int assert){
579   MPI_Win target_win = connected_wins_[rank];
580
581   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
582     target_win->lock_mut_->lock();
583     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
584     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
585       target_win->lock_mut_->unlock();
586    }
587   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
588     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
589
590   target_win->lockers_.push_back(comm_->rank());
591
592   int finished = finish_comms(rank);
593   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
594   finished = target_win->finish_comms(rank_);
595   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
596   return MPI_SUCCESS;
597 }
598
599 int Win::lock_all(int assert){
600   int i=0;
601   int retval = MPI_SUCCESS;
602   for (i=0; i<comm_->size();i++){
603       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
604       if(ret != MPI_SUCCESS)
605         retval = ret;
606   }
607   return retval;
608 }
609
610 int Win::unlock(int rank){
611   MPI_Win target_win = connected_wins_[rank];
612   int target_mode = target_win->mode_;
613   target_win->mode_= 0;
614   target_win->lockers_.remove(comm_->rank());
615   if (target_mode==MPI_LOCK_EXCLUSIVE){
616     target_win->lock_mut_->unlock();
617   }
618
619   int finished = finish_comms(rank);
620   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
621   finished = target_win->finish_comms(rank_);
622   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
623   return MPI_SUCCESS;
624 }
625
626 int Win::unlock_all(){
627   int i=0;
628   int retval = MPI_SUCCESS;
629   for (i=0; i<comm_->size();i++){
630     int ret = this->unlock(i);
631     if (ret != MPI_SUCCESS)
632       retval = ret;
633   }
634   return retval;
635 }
636
637 int Win::flush(int rank){
638   MPI_Win target_win = connected_wins_[rank];
639   int finished       = finish_comms(rank_);
640   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
641   finished = target_win->finish_comms(rank);
642   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
643   return MPI_SUCCESS;
644 }
645
646 int Win::flush_local(int rank){
647   int finished = finish_comms(rank);
648   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
649   return MPI_SUCCESS;
650 }
651
652 int Win::flush_all(){
653   int finished = finish_comms();
654   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
655   for (int i = 0; i < comm_->size(); i++) {
656     finished = connected_wins_[i]->finish_comms(rank_);
657     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
658   }
659   return MPI_SUCCESS;
660 }
661
662 int Win::flush_local_all(){
663   int finished = finish_comms();
664   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
665   return MPI_SUCCESS;
666 }
667
668 Win* Win::f2c(int id){
669   return static_cast<Win*>(F2C::f2c(id));
670 }
671
672 int Win::finish_comms(){
673   mut_->lock();
674   //Finish own requests
675   std::vector<MPI_Request> *reqqs = requests_;
676   int size = static_cast<int>(reqqs->size());
677   if (size > 0) {
678     MPI_Request* treqs = &(*reqqs)[0];
679     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
680     reqqs->clear();
681   }
682   mut_->unlock();
683   return size;
684 }
685
686 int Win::finish_comms(int rank){
687   mut_->lock();
688   //Finish own requests
689   std::vector<MPI_Request> *reqqs = requests_;
690   int size = static_cast<int>(reqqs->size());
691   if (size > 0) {
692     size = 0;
693     std::vector<MPI_Request> myreqqs;
694     std::vector<MPI_Request>::iterator iter = reqqs->begin();
695     int proc_id                             = comm_->group()->actor(rank)->get_pid();
696     while (iter != reqqs->end()){
697       // Let's see if we're either the destination or the sender of this request
698       // because we only wait for requests that we are responsible for.
699       // Also use the process id here since the request itself returns from src()
700       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
701       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
702         myreqqs.push_back(*iter);
703         iter = reqqs->erase(iter);
704         size++;
705       } else {
706         ++iter;
707       }
708     }
709     if(size >0){
710       MPI_Request* treqs = &myreqqs[0];
711       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
712       myreqqs.clear();
713     }
714   }
715   mut_->unlock();
716   return size;
717 }
718
719 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
720 {
721   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
722   for (int i = 0; not target_win && i < comm_->size(); i++) {
723     if (connected_wins_[i]->size_ > 0)
724       target_win = connected_wins_[i];
725   }
726   if (target_win) {
727     *size                         = target_win->size_;
728     *disp_unit                    = target_win->disp_unit_;
729     *static_cast<void**>(baseptr) = target_win->base_;
730   } else {
731     *size                         = 0;
732     *static_cast<void**>(baseptr) = xbt_malloc(0);
733   }
734   return MPI_SUCCESS;
735 }
736 }
737 }