Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Win::~Win(): Use class member rank_ instead of a local var
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7 #include "private.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
17
18 namespace simgrid{
19 namespace smpi{
20 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
21 int Win::keyval_id_=0;
22
23 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
24   int comm_size = comm->size();
25   rank_      = comm->rank();
26   XBT_DEBUG("Creating window");
27   if(info!=MPI_INFO_NULL)
28     info->ref();
29   name_ = nullptr;
30   opened_ = 0;
31   group_ = MPI_GROUP_NULL;
32   requests_ = new std::vector<MPI_Request>();
33   mut_=xbt_mutex_init();
34   lock_mut_=xbt_mutex_init();
35   atomic_mut_=xbt_mutex_init();
36   connected_wins_ = new MPI_Win[comm_size];
37   connected_wins_[rank_] = this;
38   count_ = 0;
39   if(rank_==0){
40     bar_ = MSG_barrier_init(comm_size);
41   }
42   mode_=0;
43
44   comm->add_rma_win(this);
45
46   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
47                          MPI_BYTE, comm);
48
49   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
50
51   Colls::barrier(comm);
52 }
53
54 Win::~Win(){
55   //As per the standard, perform a barrier to ensure every async comm is finished
56   MSG_barrier_wait(bar_);
57
58   int finished = finish_comms();
59   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
60
61   delete requests_;
62   delete[] connected_wins_;
63   if (name_ != nullptr){
64     xbt_free(name_);
65   }
66   if(info_!=MPI_INFO_NULL){
67     MPI_Info_free(&info_);
68   }
69
70   comm_->remove_rma_win(this);
71
72   Colls::barrier(comm_);
73   if (rank_ == 0)
74     MSG_barrier_destroy(bar_);
75   xbt_mutex_destroy(mut_);
76   xbt_mutex_destroy(lock_mut_);
77   xbt_mutex_destroy(atomic_mut_);
78
79   if(allocated_ !=0)
80     xbt_free(base_);
81
82   cleanup_attr<Win>();
83 }
84
85 int Win::attach (void *base, MPI_Aint size){
86   if (not(base_ == MPI_BOTTOM || base_ == 0))
87     return MPI_ERR_ARG;
88   base_=0;//actually the address will be given in the RMA calls, as being the disp.
89   size_+=size;
90   return MPI_SUCCESS;
91 }
92
93 int Win::detach (void *base){
94   base_=MPI_BOTTOM;
95   size_=-1;
96   return MPI_SUCCESS;
97 }
98
99 void Win::get_name(char* name, int* length){
100   if(name_==nullptr){
101     *length=0;
102     name=nullptr;
103     return;
104   }
105   *length = strlen(name_);
106   strncpy(name, name_, *length+1);
107 }
108
109 void Win::get_group(MPI_Group* group){
110   if(comm_ != MPI_COMM_NULL){
111     *group = comm_->group();
112   } else {
113     *group = MPI_GROUP_NULL;
114   }
115 }
116
117 MPI_Info Win::info(){
118   if(info_== MPI_INFO_NULL)
119     info_ = new Info();
120   info_->ref();
121   return info_;
122 }
123
124 int Win::rank(){
125   return rank_;
126 }
127
128 MPI_Aint Win::size(){
129   return size_;
130 }
131
132 void* Win::base(){
133   return base_;
134 }
135
136 int Win::disp_unit(){
137   return disp_unit_;
138 }
139
140 int Win::dynamic(){
141   return dynamic_;
142 }
143
144 void Win::set_info(MPI_Info info){
145   if(info_!= MPI_INFO_NULL)
146     info->ref();
147   info_=info;
148 }
149
150 void Win::set_name(char* name){
151   name_ = xbt_strdup(name);
152 }
153
154 int Win::fence(int assert)
155 {
156   XBT_DEBUG("Entering fence");
157   if (opened_ == 0)
158     opened_=1;
159   if (assert != MPI_MODE_NOPRECEDE) {
160     // This is not the first fence => finalize what came before
161     MSG_barrier_wait(bar_);
162     xbt_mutex_acquire(mut_);
163     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
164     // Without this, the vector could get redimensionned when another process pushes.
165     // This would result in the array used by Request::waitall() to be invalidated.
166     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
167     std::vector<MPI_Request> *reqs = requests_;
168     int size = static_cast<int>(reqs->size());
169     // start all requests that have been prepared by another process
170     if (size > 0) {
171       MPI_Request* treqs = &(*reqs)[0];
172       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
173     }
174     count_=0;
175     xbt_mutex_release(mut_);
176   }
177
178   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
179     opened_=0;
180   assert_ = assert;
181
182   MSG_barrier_wait(bar_);
183   XBT_DEBUG("Leaving fence");
184
185   return MPI_SUCCESS;
186 }
187
188 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
189               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
190 {
191   //get receiver pointer
192   MPI_Win recv_win = connected_wins_[target_rank];
193
194   if(opened_==0){//check that post/start has been done
195     // no fence or start .. lock ok ?
196     int locked=0;
197     for (auto const& it : recv_win->lockers_)
198       if (it == comm_->rank())
199         locked = 1;
200     if(locked != 1)
201       return MPI_ERR_WIN;
202   }
203
204   if(target_count*target_datatype->get_extent()>recv_win->size_)
205     return MPI_ERR_ARG;
206
207   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
208   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
209
210   if(target_rank != comm_->rank()){
211     //prepare send_request
212     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
213         comm_->group()->actor(target_rank)->getPid()-1, SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
214
215     //prepare receiver request
216     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
217         comm_->group()->actor(target_rank)->getPid()-1, SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
218
219     //start send
220     sreq->start();
221
222     if(request!=nullptr){
223       *request=sreq;
224     }else{
225       xbt_mutex_acquire(mut_);
226       requests_->push_back(sreq);
227       xbt_mutex_release(mut_);
228     }
229
230     //push request to receiver's win
231     xbt_mutex_acquire(recv_win->mut_);
232     recv_win->requests_->push_back(rreq);
233     rreq->start();
234     xbt_mutex_release(recv_win->mut_);
235
236   }else{
237     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
238     if(request!=nullptr)
239       *request = MPI_REQUEST_NULL;
240   }
241
242   return MPI_SUCCESS;
243 }
244
245 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
246               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
247 {
248   //get sender pointer
249   MPI_Win send_win = connected_wins_[target_rank];
250
251   if(opened_==0){//check that post/start has been done
252     // no fence or start .. lock ok ?
253     int locked=0;
254     for (auto const& it : send_win->lockers_)
255       if (it == comm_->rank())
256         locked = 1;
257     if(locked != 1)
258       return MPI_ERR_WIN;
259   }
260
261   if(target_count*target_datatype->get_extent()>send_win->size_)
262     return MPI_ERR_ARG;
263
264   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
265   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
266
267   if(target_rank != comm_->rank()){
268     //prepare send_request
269     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
270         comm_->group()->actor(target_rank)->getPid()-1, smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
271         MPI_OP_NULL);
272
273     //prepare receiver request
274     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
275         comm_->group()->actor(target_rank)->getPid()-1, smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
276         MPI_OP_NULL);
277
278     //start the send, with another process than us as sender.
279     sreq->start();
280     //push request to receiver's win
281     xbt_mutex_acquire(send_win->mut_);
282     send_win->requests_->push_back(sreq);
283     xbt_mutex_release(send_win->mut_);
284
285     //start recv
286     rreq->start();
287
288     if(request!=nullptr){
289       *request=rreq;
290     }else{
291       xbt_mutex_acquire(mut_);
292       requests_->push_back(rreq);
293       xbt_mutex_release(mut_);
294     }
295
296   }else{
297     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
298     if(request!=nullptr)
299       *request=MPI_REQUEST_NULL;
300   }
301
302   return MPI_SUCCESS;
303 }
304
305
306 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
307               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
308 {
309   XBT_DEBUG("Entering MPI_Win_Accumulate");
310   //get receiver pointer
311   MPI_Win recv_win = connected_wins_[target_rank];
312
313   if(opened_==0){//check that post/start has been done
314     // no fence or start .. lock ok ?
315     int locked=0;
316     for (auto const& it : recv_win->lockers_)
317       if (it == comm_->rank())
318         locked = 1;
319     if(locked != 1)
320       return MPI_ERR_WIN;
321   }
322   //FIXME: local version
323
324   if(target_count*target_datatype->get_extent()>recv_win->size_)
325     return MPI_ERR_ARG;
326
327   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
328   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
329     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
330     //prepare send_request
331
332     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
333         smpi_process()->index(), comm_->group()->actor(target_rank)->getPid()-1, SMPI_RMA_TAG-3-count_, comm_, op);
334
335     //prepare receiver request
336     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
337         smpi_process()->index(), comm_->group()->actor(target_rank)->getPid()-1, SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
338
339     count_++;
340
341     //start send
342     sreq->start();
343     //push request to receiver's win
344     xbt_mutex_acquire(recv_win->mut_);
345     recv_win->requests_->push_back(rreq);
346     rreq->start();
347     xbt_mutex_release(recv_win->mut_);
348
349     if(request!=nullptr){
350       *request=sreq;
351     }else{
352       xbt_mutex_acquire(mut_);
353       requests_->push_back(sreq);
354       xbt_mutex_release(mut_);
355     }
356
357   XBT_DEBUG("Leaving MPI_Win_Accumulate");
358   return MPI_SUCCESS;
359 }
360
361 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
362               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
363               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
364
365   //get sender pointer
366   MPI_Win send_win = connected_wins_[target_rank];
367
368   if(opened_==0){//check that post/start has been done
369     // no fence or start .. lock ok ?
370     int locked=0;
371     for (auto const& it : send_win->lockers_)
372       if (it == comm_->rank())
373         locked = 1;
374     if(locked != 1)
375       return MPI_ERR_WIN;
376   }
377
378   if(target_count*target_datatype->get_extent()>send_win->size_)
379     return MPI_ERR_ARG;
380
381   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
382   //need to be sure ops are correctly ordered, so finish request here ? slow.
383   MPI_Request req;
384   xbt_mutex_acquire(send_win->atomic_mut_);
385   get(result_addr, result_count, result_datatype, target_rank,
386               target_disp, target_count, target_datatype, &req);
387   if (req != MPI_REQUEST_NULL)
388     Request::wait(&req, MPI_STATUS_IGNORE);
389   if(op!=MPI_NO_OP)
390     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
391               target_disp, target_count, target_datatype, op, &req);
392   if (req != MPI_REQUEST_NULL)
393     Request::wait(&req, MPI_STATUS_IGNORE);
394   xbt_mutex_release(send_win->atomic_mut_);
395   return MPI_SUCCESS;
396
397 }
398
399 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
400         void *result_addr, MPI_Datatype datatype, int target_rank,
401         MPI_Aint target_disp){
402   //get sender pointer
403   MPI_Win send_win = connected_wins_[target_rank];
404
405   if(opened_==0){//check that post/start has been done
406     // no fence or start .. lock ok ?
407     int locked=0;
408     for (auto const& it : send_win->lockers_)
409       if (it == comm_->rank())
410         locked = 1;
411     if(locked != 1)
412       return MPI_ERR_WIN;
413   }
414
415   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
416   MPI_Request req = MPI_REQUEST_NULL;
417   xbt_mutex_acquire(send_win->atomic_mut_);
418   get(result_addr, 1, datatype, target_rank,
419               target_disp, 1, datatype, &req);
420   if (req != MPI_REQUEST_NULL)
421     Request::wait(&req, MPI_STATUS_IGNORE);
422   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
423     put(origin_addr, 1, datatype, target_rank,
424               target_disp, 1, datatype);
425   }
426   xbt_mutex_release(send_win->atomic_mut_);
427   return MPI_SUCCESS;
428 }
429
430 int Win::start(MPI_Group group, int assert){
431     /* From MPI forum advices
432     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
433     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
434     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
435     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
436     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
437     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
438     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
439     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
440     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
441     must complete, without further dependencies.  */
442
443   //naive, blocking implementation.
444     int i             = 0;
445     int j             = 0;
446     int size          = group->size();
447     MPI_Request* reqs = xbt_new0(MPI_Request, size);
448
449   XBT_DEBUG("Entering MPI_Win_Start");
450     while (j != size) {
451       int src = group->actor(j)->getPid()-1;
452       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
453         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
454         i++;
455       }
456       j++;
457   }
458   size=i;
459   Request::startall(size, reqs);
460   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
461   for(i=0;i<size;i++){
462     Request::unref(&reqs[i]);
463   }
464   xbt_free(reqs);
465   opened_++; //we're open for business !
466   group_=group;
467   group->ref();
468   XBT_DEBUG("Leaving MPI_Win_Start");
469   return MPI_SUCCESS;
470 }
471
472 int Win::post(MPI_Group group, int assert){
473   //let's make a synchronous send here
474   int i             = 0;
475   int j             = 0;
476   int size = group->size();
477   MPI_Request* reqs = xbt_new0(MPI_Request, size);
478
479   XBT_DEBUG("Entering MPI_Win_Post");
480   while(j!=size){
481     int dst=group->actor(j)->getPid()-1;
482     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
483       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
484       i++;
485     }
486     j++;
487   }
488   size=i;
489
490   Request::startall(size, reqs);
491   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
492   for(i=0;i<size;i++){
493     Request::unref(&reqs[i]);
494   }
495   xbt_free(reqs);
496   opened_++; //we're open for business !
497   group_=group;
498   group->ref();
499   XBT_DEBUG("Leaving MPI_Win_Post");
500   return MPI_SUCCESS;
501 }
502
503 int Win::complete(){
504   if(opened_==0)
505     xbt_die("Complete called on already opened MPI_Win");
506
507   XBT_DEBUG("Entering MPI_Win_Complete");
508   int i             = 0;
509   int j             = 0;
510   int size = group_->size();
511   MPI_Request* reqs = xbt_new0(MPI_Request, size);
512
513   while(j!=size){
514     int dst=group_->actor(j)->getPid()-1;
515     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
516       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
517       i++;
518     }
519     j++;
520   }
521   size=i;
522   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
523   Request::startall(size, reqs);
524   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
525
526   for(i=0;i<size;i++){
527     Request::unref(&reqs[i]);
528   }
529   xbt_free(reqs);
530
531   int finished = finish_comms();
532   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
533
534   Group::unref(group_);
535   opened_--; //we're closed for business !
536   return MPI_SUCCESS;
537 }
538
539 int Win::wait(){
540   //naive, blocking implementation.
541   XBT_DEBUG("Entering MPI_Win_Wait");
542   int i             = 0;
543   int j             = 0;
544   int size          = group_->size();
545   MPI_Request* reqs = xbt_new0(MPI_Request, size);
546
547   while(j!=size){
548     int src=group_->actor(j)->getPid()-1;
549     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
550       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
551       i++;
552     }
553     j++;
554   }
555   size=i;
556   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
557   Request::startall(size, reqs);
558   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
559   for(i=0;i<size;i++){
560     Request::unref(&reqs[i]);
561   }
562   xbt_free(reqs);
563   int finished = finish_comms();
564   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
565
566   Group::unref(group_);
567   opened_--; //we're opened for business !
568   return MPI_SUCCESS;
569 }
570
571 int Win::lock(int lock_type, int rank, int assert){
572   MPI_Win target_win = connected_wins_[rank];
573
574   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
575     xbt_mutex_acquire(target_win->lock_mut_);
576     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
577     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
578       xbt_mutex_release(target_win->lock_mut_);
579    }
580   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
581     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
582
583   target_win->lockers_.push_back(comm_->rank());
584
585   int finished = finish_comms(rank);
586   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
587   finished = target_win->finish_comms(rank_);
588   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
589   return MPI_SUCCESS;
590 }
591
592 int Win::lock_all(int assert){
593   int i=0;
594   int retval = MPI_SUCCESS;
595   for (i=0; i<comm_->size();i++){
596       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
597       if(ret != MPI_SUCCESS)
598         retval = ret;
599   }
600   return retval;
601 }
602
603 int Win::unlock(int rank){
604   MPI_Win target_win = connected_wins_[rank];
605   int target_mode = target_win->mode_;
606   target_win->mode_= 0;
607   target_win->lockers_.remove(comm_->rank());
608   if (target_mode==MPI_LOCK_EXCLUSIVE){
609     xbt_mutex_release(target_win->lock_mut_);
610   }
611
612   int finished = finish_comms(rank);
613   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
614   finished = target_win->finish_comms(rank_);
615   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
616   return MPI_SUCCESS;
617 }
618
619 int Win::unlock_all(){
620   int i=0;
621   int retval = MPI_SUCCESS;
622   for (i=0; i<comm_->size();i++){
623       int ret = this->unlock(i);
624       if(ret != MPI_SUCCESS)
625         retval = ret;
626   }
627   return retval;
628 }
629
630 int Win::flush(int rank){
631   MPI_Win target_win = connected_wins_[rank];
632   int finished = finish_comms(rank);
633   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
634   finished = target_win->finish_comms(rank_);
635   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
636   return MPI_SUCCESS;
637 }
638
639 int Win::flush_local(int rank){
640   int finished = finish_comms(rank);
641   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
642   return MPI_SUCCESS;
643 }
644
645 int Win::flush_all(){
646   int i=0;
647   int finished = 0;
648   finished = finish_comms();
649   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
650   for (i=0; i<comm_->size();i++){
651     finished = connected_wins_[i]->finish_comms(rank_);
652     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
653   }
654   return MPI_SUCCESS;
655 }
656
657 int Win::flush_local_all(){
658   int finished = finish_comms();
659   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
660   return MPI_SUCCESS;
661 }
662
663 Win* Win::f2c(int id){
664   return static_cast<Win*>(F2C::f2c(id));
665 }
666
667
668 int Win::finish_comms(){
669   xbt_mutex_acquire(mut_);
670   //Finish own requests
671   std::vector<MPI_Request> *reqqs = requests_;
672   int size = static_cast<int>(reqqs->size());
673   if (size > 0) {
674     MPI_Request* treqs = &(*reqqs)[0];
675     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
676     reqqs->clear();
677   }
678   xbt_mutex_release(mut_);
679   return size;
680 }
681
682 int Win::finish_comms(int rank){
683   xbt_mutex_acquire(mut_);
684   //Finish own requests
685   std::vector<MPI_Request> *reqqs = requests_;
686   int size = static_cast<int>(reqqs->size());
687   if (size > 0) {
688     size = 0;
689     std::vector<MPI_Request> myreqqs;
690     std::vector<MPI_Request>::iterator iter = reqqs->begin();
691     while (iter != reqqs->end()){
692       if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
693         myreqqs.push_back(*iter);
694         iter = reqqs->erase(iter);
695         size++;
696       } else {
697         ++iter;
698       }
699     }
700     if(size >0){
701       MPI_Request* treqs = &myreqqs[0];
702       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
703       myreqqs.clear();
704     }
705   }
706   xbt_mutex_release(mut_);
707   return size;
708 }
709
710
711 }
712 }