Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Big move of all SMPI files in subfolders because it was a mess.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.h"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_info.hpp"
11 #include "smpi_keyvals.hpp"
12 #include "smpi_process.hpp"
13 #include "smpi_request.hpp"
14 #include "smpi_win.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
17
18 namespace simgrid{
19 namespace smpi{
20 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
21 int Win::keyval_id_=0;
22
23 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
24   int comm_size = comm->size();
25   rank_      = comm->rank();
26   XBT_DEBUG("Creating window");
27   if(info!=MPI_INFO_NULL)
28     info->ref();
29   name_ = nullptr;
30   opened_ = 0;
31   group_ = MPI_GROUP_NULL;
32   requests_ = new std::vector<MPI_Request>();
33   mut_=xbt_mutex_init();
34   lock_mut_=xbt_mutex_init();
35   atomic_mut_=xbt_mutex_init();
36   connected_wins_ = new MPI_Win[comm_size];
37   connected_wins_[rank_] = this;
38   count_ = 0;
39   if(rank_==0){
40     bar_ = MSG_barrier_init(comm_size);
41   }
42   mode_=0;
43
44   comm->add_rma_win(this);
45
46   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
47                          MPI_BYTE, comm);
48
49   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
50
51   Colls::barrier(comm);
52 }
53
54 Win::~Win(){
55   //As per the standard, perform a barrier to ensure every async comm is finished
56   MSG_barrier_wait(bar_);
57
58   int finished = finish_comms();
59   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
60
61   delete requests_;
62   delete[] connected_wins_;
63   if (name_ != nullptr){
64     xbt_free(name_);
65   }
66   if(info_!=MPI_INFO_NULL){
67     MPI_Info_free(&info_);
68   }
69
70   comm_->remove_rma_win(this);
71
72   Colls::barrier(comm_);
73   int rank=comm_->rank();
74   if(rank == 0)
75     MSG_barrier_destroy(bar_);
76   xbt_mutex_destroy(mut_);
77   xbt_mutex_destroy(lock_mut_);
78   xbt_mutex_destroy(atomic_mut_);
79
80   if(allocated_ !=0)
81     xbt_free(base_);
82
83   cleanup_attr<Win>();
84 }
85
86 int Win::attach (void *base, MPI_Aint size){
87   if (not(base_ == MPI_BOTTOM || base_ == 0))
88     return MPI_ERR_ARG;
89   base_=0;//actually the address will be given in the RMA calls, as being the disp.
90   size_+=size;
91   return MPI_SUCCESS;
92 }
93
94 int Win::detach (void *base){
95   base_=MPI_BOTTOM;
96   size_=-1;
97   return MPI_SUCCESS;
98 }
99
100 void Win::get_name(char* name, int* length){
101   if(name_==nullptr){
102     *length=0;
103     name=nullptr;
104     return;
105   }
106   *length = strlen(name_);
107   strncpy(name, name_, *length+1);
108 }
109
110 void Win::get_group(MPI_Group* group){
111   if(comm_ != MPI_COMM_NULL){
112     *group = comm_->group();
113   } else {
114     *group = MPI_GROUP_NULL;
115   }
116 }
117
118 MPI_Info Win::info(){
119   if(info_== MPI_INFO_NULL)
120     info_ = new Info();
121   info_->ref();
122   return info_;
123 }
124
125 int Win::rank(){
126   return rank_;
127 }
128
129 MPI_Aint Win::size(){
130   return size_;
131 }
132
133 void* Win::base(){
134   return base_;
135 }
136
137 int Win::disp_unit(){
138   return disp_unit_;
139 }
140
141 int Win::dynamic(){
142   return dynamic_;
143 }
144
145 void Win::set_info(MPI_Info info){
146   if(info_!= MPI_INFO_NULL)
147     info->ref();
148   info_=info;
149 }
150
151 void Win::set_name(char* name){
152   name_ = xbt_strdup(name);
153 }
154
155 int Win::fence(int assert)
156 {
157   XBT_DEBUG("Entering fence");
158   if (opened_ == 0)
159     opened_=1;
160   if (assert != MPI_MODE_NOPRECEDE) {
161     // This is not the first fence => finalize what came before
162     MSG_barrier_wait(bar_);
163     xbt_mutex_acquire(mut_);
164     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
165     // Without this, the vector could get redimensionned when another process pushes.
166     // This would result in the array used by Request::waitall() to be invalidated.
167     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
168     std::vector<MPI_Request> *reqs = requests_;
169     int size = static_cast<int>(reqs->size());
170     // start all requests that have been prepared by another process
171     if (size > 0) {
172       MPI_Request* treqs = &(*reqs)[0];
173       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
174     }
175     count_=0;
176     xbt_mutex_release(mut_);
177   }
178
179   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
180     opened_=0;
181   assert_ = assert;
182
183   MSG_barrier_wait(bar_);
184   XBT_DEBUG("Leaving fence");
185
186   return MPI_SUCCESS;
187 }
188
189 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
190               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
191 {
192   //get receiver pointer
193   MPI_Win recv_win = connected_wins_[target_rank];
194
195   if(opened_==0){//check that post/start has been done
196     // no fence or start .. lock ok ?
197     int locked=0;
198     for(auto it : recv_win->lockers_)
199       if (it == comm_->rank())
200         locked = 1;
201     if(locked != 1)
202       return MPI_ERR_WIN;
203   }
204
205   if(target_count*target_datatype->get_extent()>recv_win->size_)
206     return MPI_ERR_ARG;
207
208   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
209   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
210
211   if(target_rank != comm_->rank()){
212     //prepare send_request
213     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
214         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
215
216     //prepare receiver request
217     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
218         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
219
220     //start send
221     sreq->start();
222
223     if(request!=nullptr){
224       *request=sreq;
225     }else{
226       xbt_mutex_acquire(mut_);
227       requests_->push_back(sreq);
228       xbt_mutex_release(mut_);
229     }
230
231     //push request to receiver's win
232     xbt_mutex_acquire(recv_win->mut_);
233     recv_win->requests_->push_back(rreq);
234     rreq->start();
235     xbt_mutex_release(recv_win->mut_);
236
237   }else{
238     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
239     if(request!=nullptr)
240       *request = MPI_REQUEST_NULL;
241   }
242
243   return MPI_SUCCESS;
244 }
245
246 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
247               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
248 {
249   //get sender pointer
250   MPI_Win send_win = connected_wins_[target_rank];
251
252   if(opened_==0){//check that post/start has been done
253     // no fence or start .. lock ok ?
254     int locked=0;
255     for(auto it : send_win->lockers_)
256       if (it == comm_->rank())
257         locked = 1;
258     if(locked != 1)
259       return MPI_ERR_WIN;
260   }
261
262   if(target_count*target_datatype->get_extent()>send_win->size_)
263     return MPI_ERR_ARG;
264
265   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
266   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
267
268   if(target_rank != comm_->rank()){
269     //prepare send_request
270     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
271         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
272         MPI_OP_NULL);
273
274     //prepare receiver request
275     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
276         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
277         MPI_OP_NULL);
278
279     //start the send, with another process than us as sender.
280     sreq->start();
281     //push request to receiver's win
282     xbt_mutex_acquire(send_win->mut_);
283     send_win->requests_->push_back(sreq);
284     xbt_mutex_release(send_win->mut_);
285
286     //start recv
287     rreq->start();
288
289     if(request!=nullptr){
290       *request=rreq;
291     }else{
292       xbt_mutex_acquire(mut_);
293       requests_->push_back(rreq);
294       xbt_mutex_release(mut_);
295     }
296
297   }else{
298     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
299     if(request!=nullptr)
300       *request=MPI_REQUEST_NULL;
301   }
302
303   return MPI_SUCCESS;
304 }
305
306
307 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
308               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
309 {
310
311   //get receiver pointer
312   MPI_Win recv_win = connected_wins_[target_rank];
313
314   if(opened_==0){//check that post/start has been done
315     // no fence or start .. lock ok ?
316     int locked=0;
317     for(auto it : recv_win->lockers_)
318       if (it == comm_->rank())
319         locked = 1;
320     if(locked != 1)
321       return MPI_ERR_WIN;
322   }
323   //FIXME: local version
324
325   if(target_count*target_datatype->get_extent()>recv_win->size_)
326     return MPI_ERR_ARG;
327
328   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
329   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
330     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
331     //prepare send_request
332
333     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
334         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
335
336     //prepare receiver request
337     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
338         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
339
340     count_++;
341
342     //start send
343     sreq->start();
344     //push request to receiver's win
345     xbt_mutex_acquire(recv_win->mut_);
346     recv_win->requests_->push_back(rreq);
347     rreq->start();
348     xbt_mutex_release(recv_win->mut_);
349
350     if(request!=nullptr){
351       *request=sreq;
352     }else{
353       xbt_mutex_acquire(mut_);
354       requests_->push_back(sreq);
355       xbt_mutex_release(mut_);
356     }
357
358   return MPI_SUCCESS;
359 }
360
361 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
362               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
363               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
364
365   //get sender pointer
366   MPI_Win send_win = connected_wins_[target_rank];
367
368   if(opened_==0){//check that post/start has been done
369     // no fence or start .. lock ok ?
370     int locked=0;
371     for(auto it : send_win->lockers_)
372       if (it == comm_->rank())
373         locked = 1;
374     if(locked != 1)
375       return MPI_ERR_WIN;
376   }
377
378   if(target_count*target_datatype->get_extent()>send_win->size_)
379     return MPI_ERR_ARG;
380
381   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
382   //need to be sure ops are correctly ordered, so finish request here ? slow.
383   MPI_Request req;
384   xbt_mutex_acquire(send_win->atomic_mut_);
385   get(result_addr, result_count, result_datatype, target_rank,
386               target_disp, target_count, target_datatype, &req);
387   if (req != MPI_REQUEST_NULL)
388     Request::wait(&req, MPI_STATUS_IGNORE);
389   if(op!=MPI_NO_OP)
390     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
391               target_disp, target_count, target_datatype, op, &req);
392   if (req != MPI_REQUEST_NULL)
393     Request::wait(&req, MPI_STATUS_IGNORE);
394   xbt_mutex_release(send_win->atomic_mut_);
395   return MPI_SUCCESS;
396
397 }
398
399 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
400         void *result_addr, MPI_Datatype datatype, int target_rank,
401         MPI_Aint target_disp){
402   //get sender pointer
403   MPI_Win send_win = connected_wins_[target_rank];
404
405   if(opened_==0){//check that post/start has been done
406     // no fence or start .. lock ok ?
407     int locked=0;
408     for(auto it : send_win->lockers_)
409       if (it == comm_->rank())
410         locked = 1;
411     if(locked != 1)
412       return MPI_ERR_WIN;
413   }
414
415   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
416   MPI_Request req;
417   xbt_mutex_acquire(send_win->atomic_mut_);
418   get(result_addr, 1, datatype, target_rank,
419               target_disp, 1, datatype, &req);
420   if (req != MPI_REQUEST_NULL)
421     Request::wait(&req, MPI_STATUS_IGNORE);
422   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
423     put(origin_addr, 1, datatype, target_rank,
424               target_disp, 1, datatype);
425   }
426   xbt_mutex_release(send_win->atomic_mut_);
427   return MPI_SUCCESS;
428 }
429
430 int Win::start(MPI_Group group, int assert){
431     /* From MPI forum advices
432     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
433     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
434     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
435     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
436     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
437     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
438     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
439     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
440     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
441     must complete, without further dependencies.  */
442
443   //naive, blocking implementation.
444     int i             = 0;
445     int j             = 0;
446     int size          = group->size();
447     MPI_Request* reqs = xbt_new0(MPI_Request, size);
448
449     while (j != size) {
450       int src = group->index(j);
451       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
452         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
453         i++;
454       }
455       j++;
456   }
457   size=i;
458   Request::startall(size, reqs);
459   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
460   for(i=0;i<size;i++){
461     Request::unref(&reqs[i]);
462   }
463   xbt_free(reqs);
464   opened_++; //we're open for business !
465   group_=group;
466   group->ref();
467   return MPI_SUCCESS;
468 }
469
470 int Win::post(MPI_Group group, int assert){
471   //let's make a synchronous send here
472   int i             = 0;
473   int j             = 0;
474   int size = group->size();
475   MPI_Request* reqs = xbt_new0(MPI_Request, size);
476
477   while(j!=size){
478     int dst=group->index(j);
479     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
480       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
481       i++;
482     }
483     j++;
484   }
485   size=i;
486
487   Request::startall(size, reqs);
488   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
489   for(i=0;i<size;i++){
490     Request::unref(&reqs[i]);
491   }
492   xbt_free(reqs);
493   opened_++; //we're open for business !
494   group_=group;
495   group->ref();
496   return MPI_SUCCESS;
497 }
498
499 int Win::complete(){
500   if(opened_==0)
501     xbt_die("Complete called on already opened MPI_Win");
502
503   XBT_DEBUG("Entering MPI_Win_Complete");
504   int i             = 0;
505   int j             = 0;
506   int size = group_->size();
507   MPI_Request* reqs = xbt_new0(MPI_Request, size);
508
509   while(j!=size){
510     int dst=group_->index(j);
511     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
512       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
513       i++;
514     }
515     j++;
516   }
517   size=i;
518   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
519   Request::startall(size, reqs);
520   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
521
522   for(i=0;i<size;i++){
523     Request::unref(&reqs[i]);
524   }
525   xbt_free(reqs);
526
527   int finished = finish_comms();
528   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
529
530   Group::unref(group_);
531   opened_--; //we're closed for business !
532   return MPI_SUCCESS;
533 }
534
535 int Win::wait(){
536   //naive, blocking implementation.
537   XBT_DEBUG("Entering MPI_Win_Wait");
538   int i             = 0;
539   int j             = 0;
540   int size          = group_->size();
541   MPI_Request* reqs = xbt_new0(MPI_Request, size);
542
543   while(j!=size){
544     int src=group_->index(j);
545     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
546       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
547       i++;
548     }
549     j++;
550   }
551   size=i;
552   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
553   Request::startall(size, reqs);
554   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
555   for(i=0;i<size;i++){
556     Request::unref(&reqs[i]);
557   }
558   xbt_free(reqs);
559   int finished = finish_comms();
560   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
561
562   Group::unref(group_);
563   opened_--; //we're opened for business !
564   return MPI_SUCCESS;
565 }
566
567 int Win::lock(int lock_type, int rank, int assert){
568   MPI_Win target_win = connected_wins_[rank];
569
570   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
571     xbt_mutex_acquire(target_win->lock_mut_);
572     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
573     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
574       xbt_mutex_release(target_win->lock_mut_);
575    }
576   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
577     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
578
579   target_win->lockers_.push_back(comm_->rank());
580
581   int finished = finish_comms(rank);
582   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
583   finished = target_win->finish_comms(rank_);
584   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
585   return MPI_SUCCESS;
586 }
587
588 int Win::lock_all(int assert){
589   int i=0;
590   int retval = MPI_SUCCESS;
591   for (i=0; i<comm_->size();i++){
592       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
593       if(ret != MPI_SUCCESS)
594         retval = ret;
595   }
596   return retval;
597 }
598
599 int Win::unlock(int rank){
600   MPI_Win target_win = connected_wins_[rank];
601   int target_mode = target_win->mode_;
602   target_win->mode_= 0;
603   target_win->lockers_.remove(comm_->rank());
604   if (target_mode==MPI_LOCK_EXCLUSIVE){
605     xbt_mutex_release(target_win->lock_mut_);
606   }
607
608   int finished = finish_comms(rank);
609   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
610   finished = target_win->finish_comms(rank_);
611   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
612   return MPI_SUCCESS;
613 }
614
615 int Win::unlock_all(){
616   int i=0;
617   int retval = MPI_SUCCESS;
618   for (i=0; i<comm_->size();i++){
619       int ret = this->unlock(i);
620       if(ret != MPI_SUCCESS)
621         retval = ret;
622   }
623   return retval;
624 }
625
626 int Win::flush(int rank){
627   MPI_Win target_win = connected_wins_[rank];
628   int finished = finish_comms(rank);
629   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
630   finished = target_win->finish_comms(rank_);
631   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
632   return MPI_SUCCESS;
633 }
634
635 int Win::flush_local(int rank){
636   int finished = finish_comms(rank);
637   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
638   return MPI_SUCCESS;
639 }
640
641 int Win::flush_all(){
642   int i=0;
643   int finished = 0;
644   finished = finish_comms();
645   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
646   for (i=0; i<comm_->size();i++){
647     finished = connected_wins_[i]->finish_comms(rank_);
648     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
649   }
650   return MPI_SUCCESS;
651 }
652
653 int Win::flush_local_all(){
654   int finished = finish_comms();
655   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
656   return MPI_SUCCESS;
657 }
658
659 Win* Win::f2c(int id){
660   return static_cast<Win*>(F2C::f2c(id));
661 }
662
663
664 int Win::finish_comms(){
665   xbt_mutex_acquire(mut_);
666   //Finish own requests
667   std::vector<MPI_Request> *reqqs = requests_;
668   int size = static_cast<int>(reqqs->size());
669   if (size > 0) {
670     MPI_Request* treqs = &(*reqqs)[0];
671     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
672     reqqs->clear();
673   }
674   xbt_mutex_release(mut_);
675   return size;
676 }
677
678 int Win::finish_comms(int rank){
679   xbt_mutex_acquire(mut_);
680   //Finish own requests
681   std::vector<MPI_Request> *reqqs = requests_;
682   int size = static_cast<int>(reqqs->size());
683   if (size > 0) {
684     size = 0;
685     std::vector<MPI_Request> myreqqs;
686     std::vector<MPI_Request>::iterator iter = reqqs->begin();
687     while (iter != reqqs->end()){
688       if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
689         myreqqs.push_back(*iter);
690         iter = reqqs->erase(iter);
691         size++;
692       } else {
693         ++iter;
694       }
695     }
696     if(size >0){
697       MPI_Request* treqs = &myreqqs[0];
698       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
699       myreqqs.clear();
700     }
701   }
702   xbt_mutex_release(mut_);
703   return size;
704 }
705
706
707 }
708 }