Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Actually we can have locks and fences (erk) in the same program.
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   atomic_mut_=xbt_mutex_init();
30   connected_wins_ = new MPI_Win[comm_size];
31   connected_wins_[rank_] = this;
32   count_ = 0;
33   if(rank_==0){
34     bar_ = MSG_barrier_init(comm_size);
35   }
36   mode_=0;
37
38   comm->add_rma_win(this);
39
40   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
41                          MPI_BYTE, comm);
42
43   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
44
45   Colls::barrier(comm);
46 }
47
48 Win::~Win(){
49   //As per the standard, perform a barrier to ensure every async comm is finished
50   MSG_barrier_wait(bar_);
51
52   int finished = finish_comms();
53   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
54
55   delete requests_;
56   delete[] connected_wins_;
57   if (name_ != nullptr){
58     xbt_free(name_);
59   }
60   if(info_!=MPI_INFO_NULL){
61     MPI_Info_free(&info_);
62   }
63
64   comm_->remove_rma_win(this);
65
66   Colls::barrier(comm_);
67   int rank=comm_->rank();
68   if(rank == 0)
69     MSG_barrier_destroy(bar_);
70   xbt_mutex_destroy(mut_);
71   xbt_mutex_destroy(lock_mut_);
72   xbt_mutex_destroy(atomic_mut_);
73
74   if(allocated_ !=0)
75     xbt_free(base_);
76
77   cleanup_attr<Win>();
78 }
79
80 int Win::attach (void *base, MPI_Aint size){
81   if (!(base_ == MPI_BOTTOM || base_ == 0))
82     return MPI_ERR_ARG;
83   base_=0;//actually the address will be given in the RMA calls, as being the disp.
84   size_+=size;
85   return MPI_SUCCESS;
86 }
87
88 int Win::detach (void *base){
89   base_=MPI_BOTTOM;
90   size_=-1;
91   return MPI_SUCCESS;
92 }
93
94 void Win::get_name(char* name, int* length){
95   if(name_==nullptr){
96     *length=0;
97     name=nullptr;
98     return;
99   }
100   *length = strlen(name_);
101   strncpy(name, name_, *length+1);
102 }
103
104 void Win::get_group(MPI_Group* group){
105   if(comm_ != MPI_COMM_NULL){
106     *group = comm_->group();
107   } else {
108     *group = MPI_GROUP_NULL;
109   }
110 }
111
112 MPI_Info Win::info(){
113   if(info_== MPI_INFO_NULL)
114     info_ = new Info();
115   info_->ref();
116   return info_;
117 }
118
119 int Win::rank(){
120   return rank_;
121 }
122
123 MPI_Aint Win::size(){
124   return size_;
125 }
126
127 void* Win::base(){
128   return base_;
129 }
130
131 int Win::disp_unit(){
132   return disp_unit_;
133 }
134
135 int Win::dynamic(){
136   return dynamic_;
137 }
138
139 void Win::set_info(MPI_Info info){
140   if(info_!= MPI_INFO_NULL)
141     info->ref();
142   info_=info;
143 }
144
145 void Win::set_name(char* name){
146   name_ = xbt_strdup(name);
147 }
148
149 int Win::fence(int assert)
150 {
151   XBT_DEBUG("Entering fence");
152   if (opened_ == 0)
153     opened_=1;
154   if (assert != MPI_MODE_NOPRECEDE) {
155     // This is not the first fence => finalize what came before
156     MSG_barrier_wait(bar_);
157     xbt_mutex_acquire(mut_);
158     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
159     // Without this, the vector could get redimensionned when another process pushes.
160     // This would result in the array used by Request::waitall() to be invalidated.
161     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
162     std::vector<MPI_Request> *reqs = requests_;
163     int size = static_cast<int>(reqs->size());
164     // start all requests that have been prepared by another process
165     if (size > 0) {
166       MPI_Request* treqs = &(*reqs)[0];
167       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
168     }
169     count_=0;
170     xbt_mutex_release(mut_);
171   }
172
173   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
174     opened_=0;
175   assert_ = assert;
176
177   MSG_barrier_wait(bar_);
178   XBT_DEBUG("Leaving fence");
179
180   return MPI_SUCCESS;
181 }
182
183 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
184               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
185 {
186   //get receiver pointer
187   MPI_Win recv_win = connected_wins_[target_rank];
188
189   if(opened_==0){//check that post/start has been done
190     // no fence or start .. lock ok ?
191     int locked=0;
192     for(auto it : recv_win->lockers_)
193       if (it == comm_->rank())
194         locked = 1;
195     if(locked != 1)
196       return MPI_ERR_WIN;
197   }
198
199   if(target_count*target_datatype->get_extent()>recv_win->size_)
200     return MPI_ERR_ARG;
201
202   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
203   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
204
205   if(target_rank != comm_->rank()){
206     //prepare send_request
207     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
208         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
209
210     //prepare receiver request
211     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
212         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
213
214     //start send
215     sreq->start();
216
217     if(request!=nullptr){
218       *request=sreq;
219     }else{
220       xbt_mutex_acquire(mut_);
221       requests_->push_back(sreq);
222       xbt_mutex_release(mut_);
223     }
224
225     //push request to receiver's win
226     xbt_mutex_acquire(recv_win->mut_);
227     recv_win->requests_->push_back(rreq);
228     rreq->start();
229     xbt_mutex_release(recv_win->mut_);
230
231   }else{
232     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
233     if(request!=nullptr)
234       *request = MPI_REQUEST_NULL;
235   }
236
237   return MPI_SUCCESS;
238 }
239
240 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
241               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
242 {
243   //get sender pointer
244   MPI_Win send_win = connected_wins_[target_rank];
245
246   if(opened_==0){//check that post/start has been done
247     // no fence or start .. lock ok ?
248     int locked=0;
249     for(auto it : send_win->lockers_)
250       if (it == comm_->rank())
251         locked = 1;
252     if(locked != 1)
253       return MPI_ERR_WIN;
254   }
255
256   if(target_count*target_datatype->get_extent()>send_win->size_)
257     return MPI_ERR_ARG;
258
259   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
260   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
261
262   if(target_rank != comm_->rank()){
263     //prepare send_request
264     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
265         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
266         MPI_OP_NULL);
267
268     //prepare receiver request
269     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
270         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
271         MPI_OP_NULL);
272
273     //start the send, with another process than us as sender. 
274     sreq->start();
275     //push request to receiver's win
276     xbt_mutex_acquire(send_win->mut_);
277     send_win->requests_->push_back(sreq);
278     xbt_mutex_release(send_win->mut_);
279
280     //start recv
281     rreq->start();
282
283     if(request!=nullptr){
284       *request=rreq;
285     }else{
286       xbt_mutex_acquire(mut_);
287       requests_->push_back(rreq);
288       xbt_mutex_release(mut_);
289     }
290
291   }else{
292     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
293     if(request!=nullptr)
294       *request=MPI_REQUEST_NULL;
295   }
296
297   return MPI_SUCCESS;
298 }
299
300
301 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
302               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
303 {
304
305   //get receiver pointer
306   MPI_Win recv_win = connected_wins_[target_rank];
307
308   if(opened_==0){//check that post/start has been done
309     // no fence or start .. lock ok ?
310     int locked=0;
311     for(auto it : recv_win->lockers_)
312       if (it == comm_->rank())
313         locked = 1;
314     if(locked != 1)
315       return MPI_ERR_WIN;
316   }
317   //FIXME: local version 
318
319   if(target_count*target_datatype->get_extent()>recv_win->size_)
320     return MPI_ERR_ARG;
321
322   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
323   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
324     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
325     //prepare send_request
326
327     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
328         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
329
330     //prepare receiver request
331     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
332         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
333
334     count_++;
335
336     //start send
337     sreq->start();
338     //push request to receiver's win
339     xbt_mutex_acquire(recv_win->mut_);
340     recv_win->requests_->push_back(rreq);
341     rreq->start();
342     xbt_mutex_release(recv_win->mut_);
343
344     if(request!=nullptr){
345       *request=sreq;
346     }else{
347       xbt_mutex_acquire(mut_);
348       requests_->push_back(sreq);
349       xbt_mutex_release(mut_);
350     }
351
352   return MPI_SUCCESS;
353 }
354
355 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
356               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
357               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
358
359   //get sender pointer
360   MPI_Win send_win = connected_wins_[target_rank];
361
362   if(opened_==0){//check that post/start has been done
363     // no fence or start .. lock ok ?
364     int locked=0;
365     for(auto it : send_win->lockers_)
366       if (it == comm_->rank())
367         locked = 1;
368     if(locked != 1)
369       return MPI_ERR_WIN;
370   }
371
372   if(target_count*target_datatype->get_extent()>send_win->size_)
373     return MPI_ERR_ARG;
374
375   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
376   //need to be sure ops are correctly ordered, so finish request here ? slow.
377   MPI_Request req;
378   xbt_mutex_acquire(send_win->atomic_mut_);
379   get(result_addr, result_count, result_datatype, target_rank,
380               target_disp, target_count, target_datatype, &req);
381   if (req != MPI_REQUEST_NULL)
382     Request::wait(&req, MPI_STATUS_IGNORE);
383   if(op!=MPI_NO_OP)
384     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
385               target_disp, target_count, target_datatype, op, &req);
386   if (req != MPI_REQUEST_NULL)
387     Request::wait(&req, MPI_STATUS_IGNORE);
388   xbt_mutex_release(send_win->atomic_mut_);
389   return MPI_SUCCESS;
390
391 }
392
393 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
394         void *result_addr, MPI_Datatype datatype, int target_rank,
395         MPI_Aint target_disp){
396   //get sender pointer
397   MPI_Win send_win = connected_wins_[target_rank];
398
399   if(opened_==0){//check that post/start has been done
400     // no fence or start .. lock ok ?
401     int locked=0;
402     for(auto it : send_win->lockers_)
403       if (it == comm_->rank())
404         locked = 1;
405     if(locked != 1)
406       return MPI_ERR_WIN;
407   }
408
409   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
410   MPI_Request req;
411   xbt_mutex_acquire(send_win->atomic_mut_);
412   get(result_addr, 1, datatype, target_rank,
413               target_disp, 1, datatype, &req);
414   if (req != MPI_REQUEST_NULL)
415     Request::wait(&req, MPI_STATUS_IGNORE);
416   if(! memcmp (result_addr, compare_addr, datatype->get_extent() )){
417     put(origin_addr, 1, datatype, target_rank,
418               target_disp, 1, datatype);
419   }
420   xbt_mutex_release(send_win->atomic_mut_);
421   return MPI_SUCCESS;
422 }
423
424 int Win::start(MPI_Group group, int assert){
425     /* From MPI forum advices
426     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
427     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
428     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
429     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
430     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
431     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
432     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
433     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
434     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
435     must complete, without further dependencies.  */
436
437   //naive, blocking implementation.
438     int i             = 0;
439     int j             = 0;
440     int size          = group->size();
441     MPI_Request* reqs = xbt_new0(MPI_Request, size);
442
443     while (j != size) {
444       int src = group->index(j);
445       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
446         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
447         i++;
448       }
449       j++;
450   }
451   size=i;
452   Request::startall(size, reqs);
453   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
454   for(i=0;i<size;i++){
455     Request::unref(&reqs[i]);
456   }
457   xbt_free(reqs);
458   opened_++; //we're open for business !
459   group_=group;
460   group->ref();
461   return MPI_SUCCESS;
462 }
463
464 int Win::post(MPI_Group group, int assert){
465   //let's make a synchronous send here
466   int i             = 0;
467   int j             = 0;
468   int size = group->size();
469   MPI_Request* reqs = xbt_new0(MPI_Request, size);
470
471   while(j!=size){
472     int dst=group->index(j);
473     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
474       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
475       i++;
476     }
477     j++;
478   }
479   size=i;
480
481   Request::startall(size, reqs);
482   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
483   for(i=0;i<size;i++){
484     Request::unref(&reqs[i]);
485   }
486   xbt_free(reqs);
487   opened_++; //we're open for business !
488   group_=group;
489   group->ref();
490   return MPI_SUCCESS;
491 }
492
493 int Win::complete(){
494   if(opened_==0)
495     xbt_die("Complete called on already opened MPI_Win");
496
497   XBT_DEBUG("Entering MPI_Win_Complete");
498   int i             = 0;
499   int j             = 0;
500   int size = group_->size();
501   MPI_Request* reqs = xbt_new0(MPI_Request, size);
502
503   while(j!=size){
504     int dst=group_->index(j);
505     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
506       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
507       i++;
508     }
509     j++;
510   }
511   size=i;
512   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
513   Request::startall(size, reqs);
514   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
515
516   for(i=0;i<size;i++){
517     Request::unref(&reqs[i]);
518   }
519   xbt_free(reqs);
520
521   int finished = finish_comms();
522   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
523
524   Group::unref(group_);
525   opened_--; //we're closed for business !
526   return MPI_SUCCESS;
527 }
528
529 int Win::wait(){
530   //naive, blocking implementation.
531   XBT_DEBUG("Entering MPI_Win_Wait");
532   int i             = 0;
533   int j             = 0;
534   int size          = group_->size();
535   MPI_Request* reqs = xbt_new0(MPI_Request, size);
536
537   while(j!=size){
538     int src=group_->index(j);
539     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
540       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
541       i++;
542     }
543     j++;
544   }
545   size=i;
546   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
547   Request::startall(size, reqs);
548   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
549   for(i=0;i<size;i++){
550     Request::unref(&reqs[i]);
551   }
552   xbt_free(reqs);
553   int finished = finish_comms();
554   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
555
556   Group::unref(group_);
557   opened_--; //we're opened for business !
558   return MPI_SUCCESS;
559 }
560
561 int Win::lock(int lock_type, int rank, int assert){
562   MPI_Win target_win = connected_wins_[rank];
563
564   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
565     xbt_mutex_acquire(target_win->lock_mut_);
566     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
567     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
568       xbt_mutex_release(target_win->lock_mut_);
569    }
570   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
571         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
572
573   target_win->lockers_.push_back(comm_->rank());
574
575   int finished = finish_comms();
576   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
577
578   return MPI_SUCCESS;
579 }
580
581 int Win::lock_all(int assert){
582   int i=0;
583   int retval = MPI_SUCCESS;
584   for (i=0; i<comm_->size();i++){
585       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
586       if(ret != MPI_SUCCESS)
587         retval = ret;
588   }
589   return retval;
590 }
591
592 int Win::unlock(int rank){
593   MPI_Win target_win = connected_wins_[rank];
594   int target_mode = target_win->mode_;
595   target_win->mode_= 0;
596   target_win->lockers_.remove(comm_->rank());
597   if (target_mode==MPI_LOCK_EXCLUSIVE){
598     xbt_mutex_release(target_win->lock_mut_);
599   }
600
601   int finished = finish_comms();
602   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
603
604   return MPI_SUCCESS;
605 }
606
607 int Win::unlock_all(){
608   int i=0;
609   int retval = MPI_SUCCESS;
610   for (i=0; i<comm_->size();i++){
611       int ret = this->unlock(i);
612       if(ret != MPI_SUCCESS)
613         retval = ret;
614   }
615   return retval;
616 }
617
618 int Win::flush(int rank){
619   MPI_Win target_win = connected_wins_[rank];
620   int finished = finish_comms(rank);
621   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
622   finished = target_win->finish_comms(rank_);
623   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
624   return MPI_SUCCESS;
625 }
626
627 int Win::flush_local(int rank){
628   int finished = finish_comms(rank);
629   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
630   return MPI_SUCCESS;
631 }
632
633 int Win::flush_all(){
634   int i=0;
635   int finished = 0;
636   finished = finish_comms();
637   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
638   for (i=0; i<comm_->size();i++){
639     finished = connected_wins_[i]->finish_comms(rank_);
640     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
641   }
642   return MPI_SUCCESS;
643 }
644
645 int Win::flush_local_all(){
646   int finished = finish_comms();
647   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
648   return MPI_SUCCESS;
649 }
650
651 Win* Win::f2c(int id){
652   return static_cast<Win*>(F2C::f2c(id));
653 }
654
655
656 int Win::finish_comms(){
657   xbt_mutex_acquire(mut_);
658   //Finish own requests
659   std::vector<MPI_Request> *reqqs = requests_;
660   int size = static_cast<int>(reqqs->size());
661   if (size > 0) {
662     MPI_Request* treqs = &(*reqqs)[0];
663     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
664     reqqs->clear();
665   }
666   xbt_mutex_release(mut_);
667   return size;
668 }
669
670 int Win::finish_comms(int rank){
671   xbt_mutex_acquire(mut_);
672   //Finish own requests
673   std::vector<MPI_Request> *reqqs = requests_;
674   int size = static_cast<int>(reqqs->size());
675   if (size > 0) {
676     size = 0;
677     std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
678     std::vector<MPI_Request>::iterator iter = reqqs->begin();
679     while (iter != reqqs->end()){
680       if(((*iter)->src() == rank) || ((*iter)->dst() == rank)){
681           myreqqs->push_back(*iter);
682           iter = reqqs->erase(iter);
683           size++;
684       } else {
685         ++iter;
686       }
687     }
688     if(size >0){
689       MPI_Request* treqs = &(*myreqqs)[0];
690       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
691       myreqqs->clear();
692       delete myreqqs;
693     }
694   }
695   xbt_mutex_release(mut_);
696   return size;
697 }
698
699
700 }
701 }