Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into v3.20-expose-simgrid-jni
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19 using simgrid::s4u::Actor;
20
21 namespace simgrid{
22 namespace smpi{
23 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
24 int Win::keyval_id_=0;
25
26 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
27   int comm_size = comm->size();
28   rank_         = comm->rank();
29   XBT_DEBUG("Creating window");
30   if(info!=MPI_INFO_NULL)
31     info->ref();
32   name_                  = nullptr;
33   opened_                = 0;
34   group_                 = MPI_GROUP_NULL;
35   requests_              = new std::vector<MPI_Request>();
36   mut_                   = xbt_mutex_init();
37   lock_mut_              = xbt_mutex_init();
38   atomic_mut_            = xbt_mutex_init();
39   connected_wins_        = new MPI_Win[comm_size];
40   connected_wins_[rank_] = this;
41   count_                 = 0;
42   if(rank_==0){
43     bar_ = new simgrid::s4u::Barrier(comm_size);
44   }
45   mode_=0;
46
47   comm->add_rma_win(this);
48   comm->ref();
49
50   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
51                          MPI_BYTE, comm);
52
53   Colls::bcast(&(bar_), sizeof(simgrid::s4u::Barrier*), MPI_BYTE, 0, comm);
54
55   Colls::barrier(comm);
56 }
57
58 Win::~Win(){
59   //As per the standard, perform a barrier to ensure every async comm is finished
60   bar_->wait();
61
62   int finished = finish_comms();
63   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64
65   delete requests_;
66   delete[] connected_wins_;
67   if (name_ != nullptr){
68     xbt_free(name_);
69   }
70   if(info_!=MPI_INFO_NULL){
71     MPI_Info_free(&info_);
72   }
73
74   comm_->remove_rma_win(this);
75
76   Colls::barrier(comm_);
77   Comm::unref(comm_);
78   
79   if (rank_ == 0)
80     delete bar_;
81   xbt_mutex_destroy(mut_);
82   xbt_mutex_destroy(lock_mut_);
83   xbt_mutex_destroy(atomic_mut_);
84
85   if(allocated_ !=0)
86     xbt_free(base_);
87
88   cleanup_attr<Win>();
89 }
90
91 int Win::attach (void *base, MPI_Aint size){
92   if (not(base_ == MPI_BOTTOM || base_ == 0))
93     return MPI_ERR_ARG;
94   base_=0;//actually the address will be given in the RMA calls, as being the disp.
95   size_+=size;
96   return MPI_SUCCESS;
97 }
98
99 int Win::detach (void *base){
100   base_=MPI_BOTTOM;
101   size_=-1;
102   return MPI_SUCCESS;
103 }
104
105 void Win::get_name(char* name, int* length){
106   if(name_==nullptr){
107     *length=0;
108     name=nullptr;
109     return;
110   }
111   *length = strlen(name_);
112   strncpy(name, name_, *length+1);
113 }
114
115 void Win::get_group(MPI_Group* group){
116   if(comm_ != MPI_COMM_NULL){
117     *group = comm_->group();
118   } else {
119     *group = MPI_GROUP_NULL;
120   }
121 }
122
123 MPI_Info Win::info(){
124   if(info_== MPI_INFO_NULL)
125     info_ = new Info();
126   info_->ref();
127   return info_;
128 }
129
130 int Win::rank(){
131   return rank_;
132 }
133
134 MPI_Aint Win::size(){
135   return size_;
136 }
137
138 void* Win::base(){
139   return base_;
140 }
141
142 int Win::disp_unit(){
143   return disp_unit_;
144 }
145
146 int Win::dynamic(){
147   return dynamic_;
148 }
149
150 void Win::set_info(MPI_Info info){
151   if(info_!= MPI_INFO_NULL)
152     info->ref();
153   info_=info;
154 }
155
156 void Win::set_name(char* name){
157   name_ = xbt_strdup(name);
158 }
159
160 int Win::fence(int assert)
161 {
162   XBT_DEBUG("Entering fence");
163   if (opened_ == 0)
164     opened_=1;
165   if (assert != MPI_MODE_NOPRECEDE) {
166     // This is not the first fence => finalize what came before
167     bar_->wait();
168     xbt_mutex_acquire(mut_);
169     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
170     // Without this, the vector could get redimensionned when another process pushes.
171     // This would result in the array used by Request::waitall() to be invalidated.
172     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
173     std::vector<MPI_Request> *reqs = requests_;
174     int size = static_cast<int>(reqs->size());
175     // start all requests that have been prepared by another process
176     if (size > 0) {
177       MPI_Request* treqs = &(*reqs)[0];
178       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
179     }
180     count_=0;
181     xbt_mutex_release(mut_);
182   }
183
184   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
185     opened_=0;
186   assert_ = assert;
187
188   bar_->wait();
189   XBT_DEBUG("Leaving fence");
190
191   return MPI_SUCCESS;
192 }
193
194 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
195               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
196 {
197   //get receiver pointer
198   MPI_Win recv_win = connected_wins_[target_rank];
199
200   if(opened_==0){//check that post/start has been done
201     // no fence or start .. lock ok ?
202     int locked=0;
203     for (auto const& it : recv_win->lockers_)
204       if (it == comm_->rank())
205         locked = 1;
206     if(locked != 1)
207       return MPI_ERR_WIN;
208   }
209
210   if(target_count*target_datatype->get_extent()>recv_win->size_)
211     return MPI_ERR_ARG;
212
213   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
214
215   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
216     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
217     // prepare send_request
218     MPI_Request sreq =
219         // TODO cheinrich Check for rank / pid conversion
220         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
221                                comm_, MPI_OP_NULL);
222
223     //prepare receiver request
224     // TODO cheinrich Check for rank / pid conversion
225     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
226                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
227
228     //start send
229     sreq->start();
230
231     if(request!=nullptr){
232       *request=sreq;
233     }else{
234       xbt_mutex_acquire(mut_);
235       requests_->push_back(sreq);
236       xbt_mutex_release(mut_);
237     }
238
239     //push request to receiver's win
240     xbt_mutex_acquire(recv_win->mut_);
241     recv_win->requests_->push_back(rreq);
242     rreq->start();
243     xbt_mutex_release(recv_win->mut_);
244
245   }else{
246     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
247     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
248     if(request!=nullptr)
249       *request = MPI_REQUEST_NULL;
250   }
251
252   return MPI_SUCCESS;
253 }
254
255 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
256               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
257 {
258   //get sender pointer
259   MPI_Win send_win = connected_wins_[target_rank];
260
261   if(opened_==0){//check that post/start has been done
262     // no fence or start .. lock ok ?
263     int locked=0;
264     for (auto const& it : send_win->lockers_)
265       if (it == comm_->rank())
266         locked = 1;
267     if(locked != 1)
268       return MPI_ERR_WIN;
269   }
270
271   if(target_count*target_datatype->get_extent()>send_win->size_)
272     return MPI_ERR_ARG;
273
274   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
275   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
276
277   if(target_rank != comm_->rank()){
278     //prepare send_request
279     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
280                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
281
282     //prepare receiver request
283     MPI_Request rreq = Request::rma_recv_init(
284         origin_addr, origin_count, origin_datatype, target_rank,
285         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
286         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
287
288     //start the send, with another process than us as sender.
289     sreq->start();
290     //push request to receiver's win
291     xbt_mutex_acquire(send_win->mut_);
292     send_win->requests_->push_back(sreq);
293     xbt_mutex_release(send_win->mut_);
294
295     //start recv
296     rreq->start();
297
298     if(request!=nullptr){
299       *request=rreq;
300     }else{
301       xbt_mutex_acquire(mut_);
302       requests_->push_back(rreq);
303       xbt_mutex_release(mut_);
304     }
305
306   }else{
307     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
308     if(request!=nullptr)
309       *request=MPI_REQUEST_NULL;
310   }
311
312   return MPI_SUCCESS;
313 }
314
315
316 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
317               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
318 {
319   XBT_DEBUG("Entering MPI_Win_Accumulate");
320   //get receiver pointer
321   MPI_Win recv_win = connected_wins_[target_rank];
322
323   if(opened_==0){//check that post/start has been done
324     // no fence or start .. lock ok ?
325     int locked=0;
326     for (auto const& it : recv_win->lockers_)
327       if (it == comm_->rank())
328         locked = 1;
329     if(locked != 1)
330       return MPI_ERR_WIN;
331   }
332   //FIXME: local version
333
334   if(target_count*target_datatype->get_extent()>recv_win->size_)
335     return MPI_ERR_ARG;
336
337   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
338   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
339     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
340     //prepare send_request
341
342   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
343                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
344
345   // prepare receiver request
346   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
347                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
348
349   count_++;
350
351   // start send
352   sreq->start();
353   // push request to receiver's win
354   xbt_mutex_acquire(recv_win->mut_);
355   recv_win->requests_->push_back(rreq);
356   rreq->start();
357   xbt_mutex_release(recv_win->mut_);
358
359   if (request != nullptr) {
360     *request = sreq;
361   } else {
362     xbt_mutex_acquire(mut_);
363     requests_->push_back(sreq);
364     xbt_mutex_release(mut_);
365   }
366
367   XBT_DEBUG("Leaving MPI_Win_Accumulate");
368   return MPI_SUCCESS;
369 }
370
371 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
372               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
373               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
374
375   //get sender pointer
376   MPI_Win send_win = connected_wins_[target_rank];
377
378   if(opened_==0){//check that post/start has been done
379     // no fence or start .. lock ok ?
380     int locked=0;
381     for (auto const& it : send_win->lockers_)
382       if (it == comm_->rank())
383         locked = 1;
384     if(locked != 1)
385       return MPI_ERR_WIN;
386   }
387
388   if(target_count*target_datatype->get_extent()>send_win->size_)
389     return MPI_ERR_ARG;
390
391   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
392   //need to be sure ops are correctly ordered, so finish request here ? slow.
393   MPI_Request req;
394   xbt_mutex_acquire(send_win->atomic_mut_);
395   get(result_addr, result_count, result_datatype, target_rank,
396               target_disp, target_count, target_datatype, &req);
397   if (req != MPI_REQUEST_NULL)
398     Request::wait(&req, MPI_STATUS_IGNORE);
399   if(op!=MPI_NO_OP)
400     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
401               target_disp, target_count, target_datatype, op, &req);
402   if (req != MPI_REQUEST_NULL)
403     Request::wait(&req, MPI_STATUS_IGNORE);
404   xbt_mutex_release(send_win->atomic_mut_);
405   return MPI_SUCCESS;
406
407 }
408
409 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
410         void *result_addr, MPI_Datatype datatype, int target_rank,
411         MPI_Aint target_disp){
412   //get sender pointer
413   MPI_Win send_win = connected_wins_[target_rank];
414
415   if(opened_==0){//check that post/start has been done
416     // no fence or start .. lock ok ?
417     int locked=0;
418     for (auto const& it : send_win->lockers_)
419       if (it == comm_->rank())
420         locked = 1;
421     if(locked != 1)
422       return MPI_ERR_WIN;
423   }
424
425   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
426   MPI_Request req = MPI_REQUEST_NULL;
427   xbt_mutex_acquire(send_win->atomic_mut_);
428   get(result_addr, 1, datatype, target_rank,
429               target_disp, 1, datatype, &req);
430   if (req != MPI_REQUEST_NULL)
431     Request::wait(&req, MPI_STATUS_IGNORE);
432   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
433     put(origin_addr, 1, datatype, target_rank,
434               target_disp, 1, datatype);
435   }
436   xbt_mutex_release(send_win->atomic_mut_);
437   return MPI_SUCCESS;
438 }
439
440 int Win::start(MPI_Group group, int assert){
441   /* From MPI forum advices
442   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
443   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
444   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
445   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
446   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
447   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
448   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
449   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
450   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
451   must complete, without further dependencies.  */
452
453   //naive, blocking implementation.
454   int i             = 0;
455   int j             = 0;
456   int size          = group->size();
457   MPI_Request* reqs = xbt_new0(MPI_Request, size);
458
459   XBT_DEBUG("Entering MPI_Win_Start");
460   while (j != size) {
461     int src = comm_->group()->rank(group->actor(j));
462     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
463       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
464       i++;
465     }
466     j++;
467   }
468   size = i;
469   Request::startall(size, reqs);
470   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
471   for (i = 0; i < size; i++) {
472     Request::unref(&reqs[i]);
473   }
474   xbt_free(reqs);
475   opened_++; //we're open for business !
476   group_=group;
477   group->ref();
478   XBT_DEBUG("Leaving MPI_Win_Start");
479   return MPI_SUCCESS;
480 }
481
482 int Win::post(MPI_Group group, int assert){
483   //let's make a synchronous send here
484   int i             = 0;
485   int j             = 0;
486   int size = group->size();
487   MPI_Request* reqs = xbt_new0(MPI_Request, size);
488
489   XBT_DEBUG("Entering MPI_Win_Post");
490   while(j!=size){
491     int dst = comm_->group()->rank(group->actor(j));
492     if (dst != rank_ && dst != MPI_UNDEFINED) {
493       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
494       i++;
495     }
496     j++;
497   }
498   size=i;
499
500   Request::startall(size, reqs);
501   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
502   for(i=0;i<size;i++){
503     Request::unref(&reqs[i]);
504   }
505   xbt_free(reqs);
506   opened_++; //we're open for business !
507   group_=group;
508   group->ref();
509   XBT_DEBUG("Leaving MPI_Win_Post");
510   return MPI_SUCCESS;
511 }
512
513 int Win::complete(){
514   if(opened_==0)
515     xbt_die("Complete called on already opened MPI_Win");
516
517   XBT_DEBUG("Entering MPI_Win_Complete");
518   int i             = 0;
519   int j             = 0;
520   int size          = group_->size();
521   MPI_Request* reqs = xbt_new0(MPI_Request, size);
522
523   while(j!=size){
524     int dst = comm_->group()->rank(group_->actor(j));
525     if (dst != rank_ && dst != MPI_UNDEFINED) {
526       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
527       i++;
528     }
529     j++;
530   }
531   size=i;
532   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
533   Request::startall(size, reqs);
534   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
535
536   for(i=0;i<size;i++){
537     Request::unref(&reqs[i]);
538   }
539   xbt_free(reqs);
540
541   int finished = finish_comms();
542   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
543
544   Group::unref(group_);
545   opened_--; //we're closed for business !
546   return MPI_SUCCESS;
547 }
548
549 int Win::wait(){
550   //naive, blocking implementation.
551   XBT_DEBUG("Entering MPI_Win_Wait");
552   int i             = 0;
553   int j             = 0;
554   int size          = group_->size();
555   MPI_Request* reqs = xbt_new0(MPI_Request, size);
556
557   while(j!=size){
558     int src = comm_->group()->rank(group_->actor(j));
559     if (src != rank_ && src != MPI_UNDEFINED) {
560       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
561       i++;
562     }
563     j++;
564   }
565   size=i;
566   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
567   Request::startall(size, reqs);
568   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
569   for(i=0;i<size;i++){
570     Request::unref(&reqs[i]);
571   }
572   xbt_free(reqs);
573   int finished = finish_comms();
574   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
575
576   Group::unref(group_);
577   opened_--; //we're opened for business !
578   return MPI_SUCCESS;
579 }
580
581 int Win::lock(int lock_type, int rank, int assert){
582   MPI_Win target_win = connected_wins_[rank];
583
584   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
585     xbt_mutex_acquire(target_win->lock_mut_);
586     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
587     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
588       xbt_mutex_release(target_win->lock_mut_);
589    }
590   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
591     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
592
593   target_win->lockers_.push_back(comm_->rank());
594
595   int finished = finish_comms(rank);
596   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
597   finished = target_win->finish_comms(rank_);
598   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
599   return MPI_SUCCESS;
600 }
601
602 int Win::lock_all(int assert){
603   int i=0;
604   int retval = MPI_SUCCESS;
605   for (i=0; i<comm_->size();i++){
606       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
607       if(ret != MPI_SUCCESS)
608         retval = ret;
609   }
610   return retval;
611 }
612
613 int Win::unlock(int rank){
614   MPI_Win target_win = connected_wins_[rank];
615   int target_mode = target_win->mode_;
616   target_win->mode_= 0;
617   target_win->lockers_.remove(comm_->rank());
618   if (target_mode==MPI_LOCK_EXCLUSIVE){
619     xbt_mutex_release(target_win->lock_mut_);
620   }
621
622   int finished = finish_comms(rank);
623   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
624   finished = target_win->finish_comms(rank_);
625   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
626   return MPI_SUCCESS;
627 }
628
629 int Win::unlock_all(){
630   int i=0;
631   int retval = MPI_SUCCESS;
632   for (i=0; i<comm_->size();i++){
633     int ret = this->unlock(i);
634     if (ret != MPI_SUCCESS)
635       retval = ret;
636   }
637   return retval;
638 }
639
640 int Win::flush(int rank){
641   MPI_Win target_win = connected_wins_[rank];
642   int finished       = finish_comms(rank_);
643   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
644   finished = target_win->finish_comms(rank);
645   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
646   return MPI_SUCCESS;
647 }
648
649 int Win::flush_local(int rank){
650   int finished = finish_comms(rank);
651   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
652   return MPI_SUCCESS;
653 }
654
655 int Win::flush_all(){
656   int finished = finish_comms();
657   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
658   for (int i = 0; i < comm_->size(); i++) {
659     finished = connected_wins_[i]->finish_comms(rank_);
660     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
661   }
662   return MPI_SUCCESS;
663 }
664
665 int Win::flush_local_all(){
666   int finished = finish_comms();
667   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
668   return MPI_SUCCESS;
669 }
670
671 Win* Win::f2c(int id){
672   return static_cast<Win*>(F2C::f2c(id));
673 }
674
675 int Win::finish_comms(){
676   xbt_mutex_acquire(mut_);
677   //Finish own requests
678   std::vector<MPI_Request> *reqqs = requests_;
679   int size = static_cast<int>(reqqs->size());
680   if (size > 0) {
681     MPI_Request* treqs = &(*reqqs)[0];
682     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
683     reqqs->clear();
684   }
685   xbt_mutex_release(mut_);
686   return size;
687 }
688
689 int Win::finish_comms(int rank){
690   xbt_mutex_acquire(mut_);
691   //Finish own requests
692   std::vector<MPI_Request> *reqqs = requests_;
693   int size = static_cast<int>(reqqs->size());
694   if (size > 0) {
695     size = 0;
696     std::vector<MPI_Request> myreqqs;
697     std::vector<MPI_Request>::iterator iter = reqqs->begin();
698     int proc_id                             = comm_->group()->actor(rank)->get_pid();
699     while (iter != reqqs->end()){
700       // Let's see if we're either the destination or the sender of this request
701       // because we only wait for requests that we are responsible for.
702       // Also use the process id here since the request itself returns from src()
703       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
704       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
705         myreqqs.push_back(*iter);
706         iter = reqqs->erase(iter);
707         size++;
708       } else {
709         ++iter;
710       }
711     }
712     if(size >0){
713       MPI_Request* treqs = &myreqqs[0];
714       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
715       myreqqs.clear();
716     }
717   }
718   xbt_mutex_release(mut_);
719   return size;
720 }
721
722 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
723 {
724   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
725   for (int i = 0; not target_win && i < comm_->size(); i++) {
726     if (connected_wins_[i]->size_ > 0)
727       target_win = connected_wins_[i];
728   }
729   if (target_win) {
730     *size                         = target_win->size_;
731     *disp_unit                    = target_win->disp_unit_;
732     *static_cast<void**>(baseptr) = target_win->base_;
733   } else {
734     *size                         = 0;
735     *static_cast<void**>(baseptr) = xbt_malloc(0);
736   }
737   return MPI_SUCCESS;
738 }
739 }
740 }