Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e20fdb4c66698a4087cd281afadf74438c4e5399
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/smpi/smpi_coll.hpp"
7 #include "src/smpi/smpi_datatype.hpp"
8 #include "src/smpi/smpi_info.hpp"
9 #include "src/smpi/smpi_keyvals.hpp"
10 #include "src/smpi/smpi_process.hpp"
11 #include "src/smpi/smpi_request.hpp"
12 #include "src/smpi/smpi_win.hpp"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
15
16 namespace simgrid{
17 namespace smpi{
18 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
19 int Win::keyval_id_=0;
20
21 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
22   int comm_size = comm->size();
23   rank_      = comm->rank();
24   XBT_DEBUG("Creating window");
25   if(info!=MPI_INFO_NULL)
26     info->ref();
27   name_ = nullptr;
28   opened_ = 0;
29   group_ = MPI_GROUP_NULL;
30   requests_ = new std::vector<MPI_Request>();
31   mut_=xbt_mutex_init();
32   lock_mut_=xbt_mutex_init();
33   atomic_mut_=xbt_mutex_init();
34   connected_wins_ = new MPI_Win[comm_size];
35   connected_wins_[rank_] = this;
36   count_ = 0;
37   if(rank_==0){
38     bar_ = MSG_barrier_init(comm_size);
39   }
40   mode_=0;
41
42   comm->add_rma_win(this);
43
44   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
45                          MPI_BYTE, comm);
46
47   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
48
49   Colls::barrier(comm);
50 }
51
52 Win::~Win(){
53   //As per the standard, perform a barrier to ensure every async comm is finished
54   MSG_barrier_wait(bar_);
55
56   int finished = finish_comms();
57   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
58
59   delete requests_;
60   delete[] connected_wins_;
61   if (name_ != nullptr){
62     xbt_free(name_);
63   }
64   if(info_!=MPI_INFO_NULL){
65     MPI_Info_free(&info_);
66   }
67
68   comm_->remove_rma_win(this);
69
70   Colls::barrier(comm_);
71   int rank=comm_->rank();
72   if(rank == 0)
73     MSG_barrier_destroy(bar_);
74   xbt_mutex_destroy(mut_);
75   xbt_mutex_destroy(lock_mut_);
76   xbt_mutex_destroy(atomic_mut_);
77
78   if(allocated_ !=0)
79     xbt_free(base_);
80
81   cleanup_attr<Win>();
82 }
83
84 int Win::attach (void *base, MPI_Aint size){
85   if (!(base_ == MPI_BOTTOM || base_ == 0))
86     return MPI_ERR_ARG;
87   base_=0;//actually the address will be given in the RMA calls, as being the disp.
88   size_+=size;
89   return MPI_SUCCESS;
90 }
91
92 int Win::detach (void *base){
93   base_=MPI_BOTTOM;
94   size_=-1;
95   return MPI_SUCCESS;
96 }
97
98 void Win::get_name(char* name, int* length){
99   if(name_==nullptr){
100     *length=0;
101     name=nullptr;
102     return;
103   }
104   *length = strlen(name_);
105   strncpy(name, name_, *length+1);
106 }
107
108 void Win::get_group(MPI_Group* group){
109   if(comm_ != MPI_COMM_NULL){
110     *group = comm_->group();
111   } else {
112     *group = MPI_GROUP_NULL;
113   }
114 }
115
116 MPI_Info Win::info(){
117   if(info_== MPI_INFO_NULL)
118     info_ = new Info();
119   info_->ref();
120   return info_;
121 }
122
123 int Win::rank(){
124   return rank_;
125 }
126
127 MPI_Aint Win::size(){
128   return size_;
129 }
130
131 void* Win::base(){
132   return base_;
133 }
134
135 int Win::disp_unit(){
136   return disp_unit_;
137 }
138
139 int Win::dynamic(){
140   return dynamic_;
141 }
142
143 void Win::set_info(MPI_Info info){
144   if(info_!= MPI_INFO_NULL)
145     info->ref();
146   info_=info;
147 }
148
149 void Win::set_name(char* name){
150   name_ = xbt_strdup(name);
151 }
152
153 int Win::fence(int assert)
154 {
155   XBT_DEBUG("Entering fence");
156   if (opened_ == 0)
157     opened_=1;
158   if (assert != MPI_MODE_NOPRECEDE) {
159     // This is not the first fence => finalize what came before
160     MSG_barrier_wait(bar_);
161     xbt_mutex_acquire(mut_);
162     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
163     // Without this, the vector could get redimensionned when another process pushes.
164     // This would result in the array used by Request::waitall() to be invalidated.
165     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
166     std::vector<MPI_Request> *reqs = requests_;
167     int size = static_cast<int>(reqs->size());
168     // start all requests that have been prepared by another process
169     if (size > 0) {
170       MPI_Request* treqs = &(*reqs)[0];
171       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
172     }
173     count_=0;
174     xbt_mutex_release(mut_);
175   }
176
177   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
178     opened_=0;
179   assert_ = assert;
180
181   MSG_barrier_wait(bar_);
182   XBT_DEBUG("Leaving fence");
183
184   return MPI_SUCCESS;
185 }
186
187 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
188               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
189 {
190   //get receiver pointer
191   MPI_Win recv_win = connected_wins_[target_rank];
192
193   if(opened_==0){//check that post/start has been done
194     // no fence or start .. lock ok ?
195     int locked=0;
196     for(auto it : recv_win->lockers_)
197       if (it == comm_->rank())
198         locked = 1;
199     if(locked != 1)
200       return MPI_ERR_WIN;
201   }
202
203   if(target_count*target_datatype->get_extent()>recv_win->size_)
204     return MPI_ERR_ARG;
205
206   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
207   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
208
209   if(target_rank != comm_->rank()){
210     //prepare send_request
211     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
212         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
213
214     //prepare receiver request
215     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
216         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
217
218     //start send
219     sreq->start();
220
221     if(request!=nullptr){
222       *request=sreq;
223     }else{
224       xbt_mutex_acquire(mut_);
225       requests_->push_back(sreq);
226       xbt_mutex_release(mut_);
227     }
228
229     //push request to receiver's win
230     xbt_mutex_acquire(recv_win->mut_);
231     recv_win->requests_->push_back(rreq);
232     rreq->start();
233     xbt_mutex_release(recv_win->mut_);
234
235   }else{
236     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
237     if(request!=nullptr)
238       *request = MPI_REQUEST_NULL;
239   }
240
241   return MPI_SUCCESS;
242 }
243
244 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
245               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
246 {
247   //get sender pointer
248   MPI_Win send_win = connected_wins_[target_rank];
249
250   if(opened_==0){//check that post/start has been done
251     // no fence or start .. lock ok ?
252     int locked=0;
253     for(auto it : send_win->lockers_)
254       if (it == comm_->rank())
255         locked = 1;
256     if(locked != 1)
257       return MPI_ERR_WIN;
258   }
259
260   if(target_count*target_datatype->get_extent()>send_win->size_)
261     return MPI_ERR_ARG;
262
263   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
264   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
265
266   if(target_rank != comm_->rank()){
267     //prepare send_request
268     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
269         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
270         MPI_OP_NULL);
271
272     //prepare receiver request
273     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
274         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
275         MPI_OP_NULL);
276
277     //start the send, with another process than us as sender. 
278     sreq->start();
279     //push request to receiver's win
280     xbt_mutex_acquire(send_win->mut_);
281     send_win->requests_->push_back(sreq);
282     xbt_mutex_release(send_win->mut_);
283
284     //start recv
285     rreq->start();
286
287     if(request!=nullptr){
288       *request=rreq;
289     }else{
290       xbt_mutex_acquire(mut_);
291       requests_->push_back(rreq);
292       xbt_mutex_release(mut_);
293     }
294
295   }else{
296     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
297     if(request!=nullptr)
298       *request=MPI_REQUEST_NULL;
299   }
300
301   return MPI_SUCCESS;
302 }
303
304
305 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
306               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
307 {
308
309   //get receiver pointer
310   MPI_Win recv_win = connected_wins_[target_rank];
311
312   if(opened_==0){//check that post/start has been done
313     // no fence or start .. lock ok ?
314     int locked=0;
315     for(auto it : recv_win->lockers_)
316       if (it == comm_->rank())
317         locked = 1;
318     if(locked != 1)
319       return MPI_ERR_WIN;
320   }
321   //FIXME: local version 
322
323   if(target_count*target_datatype->get_extent()>recv_win->size_)
324     return MPI_ERR_ARG;
325
326   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
327   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
328     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
329     //prepare send_request
330
331     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
332         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
333
334     //prepare receiver request
335     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
336         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
337
338     count_++;
339
340     //start send
341     sreq->start();
342     //push request to receiver's win
343     xbt_mutex_acquire(recv_win->mut_);
344     recv_win->requests_->push_back(rreq);
345     rreq->start();
346     xbt_mutex_release(recv_win->mut_);
347
348     if(request!=nullptr){
349       *request=sreq;
350     }else{
351       xbt_mutex_acquire(mut_);
352       requests_->push_back(sreq);
353       xbt_mutex_release(mut_);
354     }
355
356   return MPI_SUCCESS;
357 }
358
359 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
360               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
361               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
362
363   //get sender pointer
364   MPI_Win send_win = connected_wins_[target_rank];
365
366   if(opened_==0){//check that post/start has been done
367     // no fence or start .. lock ok ?
368     int locked=0;
369     for(auto it : send_win->lockers_)
370       if (it == comm_->rank())
371         locked = 1;
372     if(locked != 1)
373       return MPI_ERR_WIN;
374   }
375
376   if(target_count*target_datatype->get_extent()>send_win->size_)
377     return MPI_ERR_ARG;
378
379   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
380   //need to be sure ops are correctly ordered, so finish request here ? slow.
381   MPI_Request req;
382   xbt_mutex_acquire(send_win->atomic_mut_);
383   get(result_addr, result_count, result_datatype, target_rank,
384               target_disp, target_count, target_datatype, &req);
385   if (req != MPI_REQUEST_NULL)
386     Request::wait(&req, MPI_STATUS_IGNORE);
387   if(op!=MPI_NO_OP)
388     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
389               target_disp, target_count, target_datatype, op, &req);
390   if (req != MPI_REQUEST_NULL)
391     Request::wait(&req, MPI_STATUS_IGNORE);
392   xbt_mutex_release(send_win->atomic_mut_);
393   return MPI_SUCCESS;
394
395 }
396
397 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
398         void *result_addr, MPI_Datatype datatype, int target_rank,
399         MPI_Aint target_disp){
400   //get sender pointer
401   MPI_Win send_win = connected_wins_[target_rank];
402
403   if(opened_==0){//check that post/start has been done
404     // no fence or start .. lock ok ?
405     int locked=0;
406     for(auto it : send_win->lockers_)
407       if (it == comm_->rank())
408         locked = 1;
409     if(locked != 1)
410       return MPI_ERR_WIN;
411   }
412
413   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
414   MPI_Request req;
415   xbt_mutex_acquire(send_win->atomic_mut_);
416   get(result_addr, 1, datatype, target_rank,
417               target_disp, 1, datatype, &req);
418   if (req != MPI_REQUEST_NULL)
419     Request::wait(&req, MPI_STATUS_IGNORE);
420   if(! memcmp (result_addr, compare_addr, datatype->get_extent() )){
421     put(origin_addr, 1, datatype, target_rank,
422               target_disp, 1, datatype);
423   }
424   xbt_mutex_release(send_win->atomic_mut_);
425   return MPI_SUCCESS;
426 }
427
428 int Win::start(MPI_Group group, int assert){
429     /* From MPI forum advices
430     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
431     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
432     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
433     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
434     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
435     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
436     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
437     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
438     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
439     must complete, without further dependencies.  */
440
441   //naive, blocking implementation.
442     int i             = 0;
443     int j             = 0;
444     int size          = group->size();
445     MPI_Request* reqs = xbt_new0(MPI_Request, size);
446
447     while (j != size) {
448       int src = group->index(j);
449       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
450         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
451         i++;
452       }
453       j++;
454   }
455   size=i;
456   Request::startall(size, reqs);
457   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
458   for(i=0;i<size;i++){
459     Request::unref(&reqs[i]);
460   }
461   xbt_free(reqs);
462   opened_++; //we're open for business !
463   group_=group;
464   group->ref();
465   return MPI_SUCCESS;
466 }
467
468 int Win::post(MPI_Group group, int assert){
469   //let's make a synchronous send here
470   int i             = 0;
471   int j             = 0;
472   int size = group->size();
473   MPI_Request* reqs = xbt_new0(MPI_Request, size);
474
475   while(j!=size){
476     int dst=group->index(j);
477     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
478       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
479       i++;
480     }
481     j++;
482   }
483   size=i;
484
485   Request::startall(size, reqs);
486   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
487   for(i=0;i<size;i++){
488     Request::unref(&reqs[i]);
489   }
490   xbt_free(reqs);
491   opened_++; //we're open for business !
492   group_=group;
493   group->ref();
494   return MPI_SUCCESS;
495 }
496
497 int Win::complete(){
498   if(opened_==0)
499     xbt_die("Complete called on already opened MPI_Win");
500
501   XBT_DEBUG("Entering MPI_Win_Complete");
502   int i             = 0;
503   int j             = 0;
504   int size = group_->size();
505   MPI_Request* reqs = xbt_new0(MPI_Request, size);
506
507   while(j!=size){
508     int dst=group_->index(j);
509     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
510       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
511       i++;
512     }
513     j++;
514   }
515   size=i;
516   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
517   Request::startall(size, reqs);
518   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
519
520   for(i=0;i<size;i++){
521     Request::unref(&reqs[i]);
522   }
523   xbt_free(reqs);
524
525   int finished = finish_comms();
526   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
527
528   Group::unref(group_);
529   opened_--; //we're closed for business !
530   return MPI_SUCCESS;
531 }
532
533 int Win::wait(){
534   //naive, blocking implementation.
535   XBT_DEBUG("Entering MPI_Win_Wait");
536   int i             = 0;
537   int j             = 0;
538   int size          = group_->size();
539   MPI_Request* reqs = xbt_new0(MPI_Request, size);
540
541   while(j!=size){
542     int src=group_->index(j);
543     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
544       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
545       i++;
546     }
547     j++;
548   }
549   size=i;
550   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
551   Request::startall(size, reqs);
552   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
553   for(i=0;i<size;i++){
554     Request::unref(&reqs[i]);
555   }
556   xbt_free(reqs);
557   int finished = finish_comms();
558   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
559
560   Group::unref(group_);
561   opened_--; //we're opened for business !
562   return MPI_SUCCESS;
563 }
564
565 int Win::lock(int lock_type, int rank, int assert){
566   MPI_Win target_win = connected_wins_[rank];
567
568   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
569     xbt_mutex_acquire(target_win->lock_mut_);
570     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
571     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
572       xbt_mutex_release(target_win->lock_mut_);
573    }
574   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
575         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
576
577   target_win->lockers_.push_back(comm_->rank());
578
579   int finished = finish_comms(rank);
580   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
581   finished = target_win->finish_comms(rank_);
582   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
583   return MPI_SUCCESS;
584 }
585
586 int Win::lock_all(int assert){
587   int i=0;
588   int retval = MPI_SUCCESS;
589   for (i=0; i<comm_->size();i++){
590       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
591       if(ret != MPI_SUCCESS)
592         retval = ret;
593   }
594   return retval;
595 }
596
597 int Win::unlock(int rank){
598   MPI_Win target_win = connected_wins_[rank];
599   int target_mode = target_win->mode_;
600   target_win->mode_= 0;
601   target_win->lockers_.remove(comm_->rank());
602   if (target_mode==MPI_LOCK_EXCLUSIVE){
603     xbt_mutex_release(target_win->lock_mut_);
604   }
605
606   int finished = finish_comms(rank);
607   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
608   finished = target_win->finish_comms(rank_);
609   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
610   return MPI_SUCCESS;
611 }
612
613 int Win::unlock_all(){
614   int i=0;
615   int retval = MPI_SUCCESS;
616   for (i=0; i<comm_->size();i++){
617       int ret = this->unlock(i);
618       if(ret != MPI_SUCCESS)
619         retval = ret;
620   }
621   return retval;
622 }
623
624 int Win::flush(int rank){
625   MPI_Win target_win = connected_wins_[rank];
626   int finished = finish_comms(rank);
627   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
628   finished = target_win->finish_comms(rank_);
629   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
630   return MPI_SUCCESS;
631 }
632
633 int Win::flush_local(int rank){
634   int finished = finish_comms(rank);
635   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
636   return MPI_SUCCESS;
637 }
638
639 int Win::flush_all(){
640   int i=0;
641   int finished = 0;
642   finished = finish_comms();
643   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
644   for (i=0; i<comm_->size();i++){
645     finished = connected_wins_[i]->finish_comms(rank_);
646     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
647   }
648   return MPI_SUCCESS;
649 }
650
651 int Win::flush_local_all(){
652   int finished = finish_comms();
653   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
654   return MPI_SUCCESS;
655 }
656
657 Win* Win::f2c(int id){
658   return static_cast<Win*>(F2C::f2c(id));
659 }
660
661
662 int Win::finish_comms(){
663   xbt_mutex_acquire(mut_);
664   //Finish own requests
665   std::vector<MPI_Request> *reqqs = requests_;
666   int size = static_cast<int>(reqqs->size());
667   if (size > 0) {
668     MPI_Request* treqs = &(*reqqs)[0];
669     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
670     reqqs->clear();
671   }
672   xbt_mutex_release(mut_);
673   return size;
674 }
675
676 int Win::finish_comms(int rank){
677   xbt_mutex_acquire(mut_);
678   //Finish own requests
679   std::vector<MPI_Request> *reqqs = requests_;
680   int size = static_cast<int>(reqqs->size());
681   if (size > 0) {
682     size = 0;
683     std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
684     std::vector<MPI_Request>::iterator iter = reqqs->begin();
685     while (iter != reqqs->end()){
686       if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
687           myreqqs->push_back(*iter);
688           iter = reqqs->erase(iter);
689           size++;
690       } else {
691         ++iter;
692       }
693     }
694     if(size >0){
695       MPI_Request* treqs = &(*myreqqs)[0];
696       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
697       myreqqs->clear();
698       delete myreqqs;
699     }
700   }
701   xbt_mutex_release(mut_);
702   return size;
703 }
704
705
706 }
707 }