Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
33b7f66e63a113b79bd97219284f75520a533d34
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7 #include "private.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
17
18 namespace simgrid{
19 namespace smpi{
20 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
21 int Win::keyval_id_=0;
22
23 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
24   int comm_size = comm->size();
25   rank_      = comm->rank();
26   XBT_DEBUG("Creating window");
27   if(info!=MPI_INFO_NULL)
28     info->ref();
29   name_ = nullptr;
30   opened_ = 0;
31   group_ = MPI_GROUP_NULL;
32   requests_ = new std::vector<MPI_Request>();
33   mut_=xbt_mutex_init();
34   lock_mut_=xbt_mutex_init();
35   atomic_mut_=xbt_mutex_init();
36   connected_wins_ = new MPI_Win[comm_size];
37   connected_wins_[rank_] = this;
38   count_ = 0;
39   if(rank_==0){
40     bar_ = MSG_barrier_init(comm_size);
41   }
42   mode_=0;
43
44   comm->add_rma_win(this);
45
46   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
47                          MPI_BYTE, comm);
48
49   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
50
51   Colls::barrier(comm);
52 }
53
54 Win::~Win(){
55   //As per the standard, perform a barrier to ensure every async comm is finished
56   MSG_barrier_wait(bar_);
57
58   int finished = finish_comms();
59   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
60
61   delete requests_;
62   delete[] connected_wins_;
63   if (name_ != nullptr){
64     xbt_free(name_);
65   }
66   if(info_!=MPI_INFO_NULL){
67     MPI_Info_free(&info_);
68   }
69
70   comm_->remove_rma_win(this);
71
72   Colls::barrier(comm_);
73   int rank=comm_->rank();
74   if(rank == 0)
75     MSG_barrier_destroy(bar_);
76   xbt_mutex_destroy(mut_);
77   xbt_mutex_destroy(lock_mut_);
78   xbt_mutex_destroy(atomic_mut_);
79
80   if(allocated_ !=0)
81     xbt_free(base_);
82
83   cleanup_attr<Win>();
84 }
85
86 int Win::attach (void *base, MPI_Aint size){
87   if (not(base_ == MPI_BOTTOM || base_ == 0))
88     return MPI_ERR_ARG;
89   base_=0;//actually the address will be given in the RMA calls, as being the disp.
90   size_+=size;
91   return MPI_SUCCESS;
92 }
93
94 int Win::detach (void *base){
95   base_=MPI_BOTTOM;
96   size_=-1;
97   return MPI_SUCCESS;
98 }
99
100 void Win::get_name(char* name, int* length){
101   if(name_==nullptr){
102     *length=0;
103     name=nullptr;
104     return;
105   }
106   *length = strlen(name_);
107   strncpy(name, name_, *length+1);
108 }
109
110 void Win::get_group(MPI_Group* group){
111   if(comm_ != MPI_COMM_NULL){
112     *group = comm_->group();
113   } else {
114     *group = MPI_GROUP_NULL;
115   }
116 }
117
118 MPI_Info Win::info(){
119   if(info_== MPI_INFO_NULL)
120     info_ = new Info();
121   info_->ref();
122   return info_;
123 }
124
125 int Win::rank(){
126   return rank_;
127 }
128
129 MPI_Aint Win::size(){
130   return size_;
131 }
132
133 void* Win::base(){
134   return base_;
135 }
136
137 int Win::disp_unit(){
138   return disp_unit_;
139 }
140
141 int Win::dynamic(){
142   return dynamic_;
143 }
144
145 void Win::set_info(MPI_Info info){
146   if(info_!= MPI_INFO_NULL)
147     info->ref();
148   info_=info;
149 }
150
151 void Win::set_name(char* name){
152   name_ = xbt_strdup(name);
153 }
154
155 int Win::fence(int assert)
156 {
157   XBT_DEBUG("Entering fence");
158   if (opened_ == 0)
159     opened_=1;
160   if (assert != MPI_MODE_NOPRECEDE) {
161     // This is not the first fence => finalize what came before
162     MSG_barrier_wait(bar_);
163     xbt_mutex_acquire(mut_);
164     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
165     // Without this, the vector could get redimensionned when another process pushes.
166     // This would result in the array used by Request::waitall() to be invalidated.
167     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
168     std::vector<MPI_Request> *reqs = requests_;
169     int size = static_cast<int>(reqs->size());
170     // start all requests that have been prepared by another process
171     if (size > 0) {
172       MPI_Request* treqs = &(*reqs)[0];
173       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
174     }
175     count_=0;
176     xbt_mutex_release(mut_);
177   }
178
179   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
180     opened_=0;
181   assert_ = assert;
182
183   MSG_barrier_wait(bar_);
184   XBT_DEBUG("Leaving fence");
185
186   return MPI_SUCCESS;
187 }
188
189 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
190               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
191 {
192   //get receiver pointer
193   MPI_Win recv_win = connected_wins_[target_rank];
194
195   if(opened_==0){//check that post/start has been done
196     // no fence or start .. lock ok ?
197     int locked=0;
198     for (auto const& it : recv_win->lockers_)
199       if (it == comm_->rank())
200         locked = 1;
201     if(locked != 1)
202       return MPI_ERR_WIN;
203   }
204
205   if(target_count*target_datatype->get_extent()>recv_win->size_)
206     return MPI_ERR_ARG;
207
208   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
209   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
210
211   if(target_rank != comm_->rank()){
212     //prepare send_request
213     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
214         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
215
216     //prepare receiver request
217     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
218         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
219
220     //start send
221     sreq->start();
222
223     if(request!=nullptr){
224       *request=sreq;
225     }else{
226       xbt_mutex_acquire(mut_);
227       requests_->push_back(sreq);
228       xbt_mutex_release(mut_);
229     }
230
231     //push request to receiver's win
232     xbt_mutex_acquire(recv_win->mut_);
233     recv_win->requests_->push_back(rreq);
234     rreq->start();
235     xbt_mutex_release(recv_win->mut_);
236
237   }else{
238     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
239     if(request!=nullptr)
240       *request = MPI_REQUEST_NULL;
241   }
242
243   return MPI_SUCCESS;
244 }
245
246 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
247               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
248 {
249   //get sender pointer
250   MPI_Win send_win = connected_wins_[target_rank];
251
252   if(opened_==0){//check that post/start has been done
253     // no fence or start .. lock ok ?
254     int locked=0;
255     for (auto const& it : send_win->lockers_)
256       if (it == comm_->rank())
257         locked = 1;
258     if(locked != 1)
259       return MPI_ERR_WIN;
260   }
261
262   if(target_count*target_datatype->get_extent()>send_win->size_)
263     return MPI_ERR_ARG;
264
265   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
266   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
267
268   if(target_rank != comm_->rank()){
269     //prepare send_request
270     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
271         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
272         MPI_OP_NULL);
273
274     //prepare receiver request
275     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
276         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
277         MPI_OP_NULL);
278
279     //start the send, with another process than us as sender.
280     sreq->start();
281     //push request to receiver's win
282     xbt_mutex_acquire(send_win->mut_);
283     send_win->requests_->push_back(sreq);
284     xbt_mutex_release(send_win->mut_);
285
286     //start recv
287     rreq->start();
288
289     if(request!=nullptr){
290       *request=rreq;
291     }else{
292       xbt_mutex_acquire(mut_);
293       requests_->push_back(rreq);
294       xbt_mutex_release(mut_);
295     }
296
297   }else{
298     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
299     if(request!=nullptr)
300       *request=MPI_REQUEST_NULL;
301   }
302
303   return MPI_SUCCESS;
304 }
305
306
307 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
308               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
309 {
310   XBT_DEBUG("Entering MPI_Win_Accumulate");
311   //get receiver pointer
312   MPI_Win recv_win = connected_wins_[target_rank];
313
314   if(opened_==0){//check that post/start has been done
315     // no fence or start .. lock ok ?
316     int locked=0;
317     for (auto const& it : recv_win->lockers_)
318       if (it == comm_->rank())
319         locked = 1;
320     if(locked != 1)
321       return MPI_ERR_WIN;
322   }
323   //FIXME: local version
324
325   if(target_count*target_datatype->get_extent()>recv_win->size_)
326     return MPI_ERR_ARG;
327
328   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
329   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
330     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
331     //prepare send_request
332
333     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
334         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
335
336     //prepare receiver request
337     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
338         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
339
340     count_++;
341
342     //start send
343     sreq->start();
344     //push request to receiver's win
345     xbt_mutex_acquire(recv_win->mut_);
346     recv_win->requests_->push_back(rreq);
347     rreq->start();
348     xbt_mutex_release(recv_win->mut_);
349
350     if(request!=nullptr){
351       *request=sreq;
352     }else{
353       xbt_mutex_acquire(mut_);
354       requests_->push_back(sreq);
355       xbt_mutex_release(mut_);
356     }
357
358   XBT_DEBUG("Leaving MPI_Win_Accumulate");
359   return MPI_SUCCESS;
360 }
361
362 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
363               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
364               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
365
366   //get sender pointer
367   MPI_Win send_win = connected_wins_[target_rank];
368
369   if(opened_==0){//check that post/start has been done
370     // no fence or start .. lock ok ?
371     int locked=0;
372     for (auto const& it : send_win->lockers_)
373       if (it == comm_->rank())
374         locked = 1;
375     if(locked != 1)
376       return MPI_ERR_WIN;
377   }
378
379   if(target_count*target_datatype->get_extent()>send_win->size_)
380     return MPI_ERR_ARG;
381
382   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
383   //need to be sure ops are correctly ordered, so finish request here ? slow.
384   MPI_Request req;
385   xbt_mutex_acquire(send_win->atomic_mut_);
386   get(result_addr, result_count, result_datatype, target_rank,
387               target_disp, target_count, target_datatype, &req);
388   if (req != MPI_REQUEST_NULL)
389     Request::wait(&req, MPI_STATUS_IGNORE);
390   if(op!=MPI_NO_OP)
391     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
392               target_disp, target_count, target_datatype, op, &req);
393   if (req != MPI_REQUEST_NULL)
394     Request::wait(&req, MPI_STATUS_IGNORE);
395   xbt_mutex_release(send_win->atomic_mut_);
396   return MPI_SUCCESS;
397
398 }
399
400 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
401         void *result_addr, MPI_Datatype datatype, int target_rank,
402         MPI_Aint target_disp){
403   //get sender pointer
404   MPI_Win send_win = connected_wins_[target_rank];
405
406   if(opened_==0){//check that post/start has been done
407     // no fence or start .. lock ok ?
408     int locked=0;
409     for (auto const& it : send_win->lockers_)
410       if (it == comm_->rank())
411         locked = 1;
412     if(locked != 1)
413       return MPI_ERR_WIN;
414   }
415
416   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
417   MPI_Request req = MPI_REQUEST_NULL;
418   xbt_mutex_acquire(send_win->atomic_mut_);
419   get(result_addr, 1, datatype, target_rank,
420               target_disp, 1, datatype, &req);
421   if (req != MPI_REQUEST_NULL)
422     Request::wait(&req, MPI_STATUS_IGNORE);
423   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
424     put(origin_addr, 1, datatype, target_rank,
425               target_disp, 1, datatype);
426   }
427   xbt_mutex_release(send_win->atomic_mut_);
428   return MPI_SUCCESS;
429 }
430
431 int Win::start(MPI_Group group, int assert){
432     /* From MPI forum advices
433     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
434     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
435     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
436     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
437     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
438     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
439     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
440     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
441     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
442     must complete, without further dependencies.  */
443
444   //naive, blocking implementation.
445     int i             = 0;
446     int j             = 0;
447     int size          = group->size();
448     MPI_Request* reqs = xbt_new0(MPI_Request, size);
449
450   XBT_DEBUG("Entering MPI_Win_Start");
451     while (j != size) {
452       int src = group->index(j);
453       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
454         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
455         i++;
456       }
457       j++;
458   }
459   size=i;
460   Request::startall(size, reqs);
461   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
462   for(i=0;i<size;i++){
463     Request::unref(&reqs[i]);
464   }
465   xbt_free(reqs);
466   opened_++; //we're open for business !
467   group_=group;
468   group->ref();
469   XBT_DEBUG("Leaving MPI_Win_Start");
470   return MPI_SUCCESS;
471 }
472
473 int Win::post(MPI_Group group, int assert){
474   //let's make a synchronous send here
475   int i             = 0;
476   int j             = 0;
477   int size = group->size();
478   MPI_Request* reqs = xbt_new0(MPI_Request, size);
479
480   XBT_DEBUG("Entering MPI_Win_Post");
481   while(j!=size){
482     int dst=group->index(j);
483     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
484       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
485       i++;
486     }
487     j++;
488   }
489   size=i;
490
491   Request::startall(size, reqs);
492   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
493   for(i=0;i<size;i++){
494     Request::unref(&reqs[i]);
495   }
496   xbt_free(reqs);
497   opened_++; //we're open for business !
498   group_=group;
499   group->ref();
500   XBT_DEBUG("Leaving MPI_Win_Post");
501   return MPI_SUCCESS;
502 }
503
504 int Win::complete(){
505   if(opened_==0)
506     xbt_die("Complete called on already opened MPI_Win");
507
508   XBT_DEBUG("Entering MPI_Win_Complete");
509   int i             = 0;
510   int j             = 0;
511   int size = group_->size();
512   MPI_Request* reqs = xbt_new0(MPI_Request, size);
513
514   while(j!=size){
515     int dst=group_->index(j);
516     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
517       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
518       i++;
519     }
520     j++;
521   }
522   size=i;
523   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
524   Request::startall(size, reqs);
525   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
526
527   for(i=0;i<size;i++){
528     Request::unref(&reqs[i]);
529   }
530   xbt_free(reqs);
531
532   int finished = finish_comms();
533   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
534
535   Group::unref(group_);
536   opened_--; //we're closed for business !
537   return MPI_SUCCESS;
538 }
539
540 int Win::wait(){
541   //naive, blocking implementation.
542   XBT_DEBUG("Entering MPI_Win_Wait");
543   int i             = 0;
544   int j             = 0;
545   int size          = group_->size();
546   MPI_Request* reqs = xbt_new0(MPI_Request, size);
547
548   while(j!=size){
549     int src=group_->index(j);
550     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
551       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
552       i++;
553     }
554     j++;
555   }
556   size=i;
557   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
558   Request::startall(size, reqs);
559   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
560   for(i=0;i<size;i++){
561     Request::unref(&reqs[i]);
562   }
563   xbt_free(reqs);
564   int finished = finish_comms();
565   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
566
567   Group::unref(group_);
568   opened_--; //we're opened for business !
569   return MPI_SUCCESS;
570 }
571
572 int Win::lock(int lock_type, int rank, int assert){
573   MPI_Win target_win = connected_wins_[rank];
574
575   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
576     xbt_mutex_acquire(target_win->lock_mut_);
577     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
578     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
579       xbt_mutex_release(target_win->lock_mut_);
580    }
581   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
582     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
583
584   target_win->lockers_.push_back(comm_->rank());
585
586   int finished = finish_comms(rank);
587   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
588   finished = target_win->finish_comms(rank_);
589   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
590   return MPI_SUCCESS;
591 }
592
593 int Win::lock_all(int assert){
594   int i=0;
595   int retval = MPI_SUCCESS;
596   for (i=0; i<comm_->size();i++){
597       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
598       if(ret != MPI_SUCCESS)
599         retval = ret;
600   }
601   return retval;
602 }
603
604 int Win::unlock(int rank){
605   MPI_Win target_win = connected_wins_[rank];
606   int target_mode = target_win->mode_;
607   target_win->mode_= 0;
608   target_win->lockers_.remove(comm_->rank());
609   if (target_mode==MPI_LOCK_EXCLUSIVE){
610     xbt_mutex_release(target_win->lock_mut_);
611   }
612
613   int finished = finish_comms(rank);
614   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
615   finished = target_win->finish_comms(rank_);
616   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
617   return MPI_SUCCESS;
618 }
619
620 int Win::unlock_all(){
621   int i=0;
622   int retval = MPI_SUCCESS;
623   for (i=0; i<comm_->size();i++){
624       int ret = this->unlock(i);
625       if(ret != MPI_SUCCESS)
626         retval = ret;
627   }
628   return retval;
629 }
630
631 int Win::flush(int rank){
632   MPI_Win target_win = connected_wins_[rank];
633   int finished = finish_comms(rank);
634   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
635   finished = target_win->finish_comms(rank_);
636   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
637   return MPI_SUCCESS;
638 }
639
640 int Win::flush_local(int rank){
641   int finished = finish_comms(rank);
642   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
643   return MPI_SUCCESS;
644 }
645
646 int Win::flush_all(){
647   int i=0;
648   int finished = 0;
649   finished = finish_comms();
650   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
651   for (i=0; i<comm_->size();i++){
652     finished = connected_wins_[i]->finish_comms(rank_);
653     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
654   }
655   return MPI_SUCCESS;
656 }
657
658 int Win::flush_local_all(){
659   int finished = finish_comms();
660   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
661   return MPI_SUCCESS;
662 }
663
664 Win* Win::f2c(int id){
665   return static_cast<Win*>(F2C::f2c(id));
666 }
667
668
669 int Win::finish_comms(){
670   xbt_mutex_acquire(mut_);
671   //Finish own requests
672   std::vector<MPI_Request> *reqqs = requests_;
673   int size = static_cast<int>(reqqs->size());
674   if (size > 0) {
675     MPI_Request* treqs = &(*reqqs)[0];
676     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
677     reqqs->clear();
678   }
679   xbt_mutex_release(mut_);
680   return size;
681 }
682
683 int Win::finish_comms(int rank){
684   xbt_mutex_acquire(mut_);
685   //Finish own requests
686   std::vector<MPI_Request> *reqqs = requests_;
687   int size = static_cast<int>(reqqs->size());
688   if (size > 0) {
689     size = 0;
690     std::vector<MPI_Request> myreqqs;
691     std::vector<MPI_Request>::iterator iter = reqqs->begin();
692     while (iter != reqqs->end()){
693       if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
694         myreqqs.push_back(*iter);
695         iter = reqqs->erase(iter);
696         size++;
697       } else {
698         ++iter;
699       }
700     }
701     if(size >0){
702       MPI_Request* treqs = &myreqqs[0];
703       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
704       myreqqs.clear();
705     }
706   }
707   xbt_mutex_release(mut_);
708   return size;
709 }
710
711
712 }
713 }