Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
48b992dbefd9f7006e98ebb8af939b780ed66f88
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   atomic_mut_=xbt_mutex_init();
30   connected_wins_ = new MPI_Win[comm_size];
31   connected_wins_[rank_] = this;
32   count_ = 0;
33   if(rank_==0){
34     bar_ = MSG_barrier_init(comm_size);
35   }
36   mode_=0;
37
38   comm->add_rma_win(this);
39
40   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
41                          MPI_BYTE, comm);
42
43   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
44
45   Colls::barrier(comm);
46 }
47
48 Win::~Win(){
49   //As per the standard, perform a barrier to ensure every async comm is finished
50   MSG_barrier_wait(bar_);
51
52   int finished = finish_comms();
53   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
54
55   delete requests_;
56   delete[] connected_wins_;
57   if (name_ != nullptr){
58     xbt_free(name_);
59   }
60   if(info_!=MPI_INFO_NULL){
61     MPI_Info_free(&info_);
62   }
63
64   comm_->remove_rma_win(this);
65
66   Colls::barrier(comm_);
67   int rank=comm_->rank();
68   if(rank == 0)
69     MSG_barrier_destroy(bar_);
70   xbt_mutex_destroy(mut_);
71   xbt_mutex_destroy(lock_mut_);
72   xbt_mutex_destroy(atomic_mut_);
73
74   if(allocated_ !=0)
75     xbt_free(base_);
76
77   cleanup_attr<Win>();
78 }
79
80 int Win::attach (void *base, MPI_Aint size){
81   if (!(base_ == MPI_BOTTOM || base_ == 0))
82     return MPI_ERR_ARG;
83   base_=0;//actually the address will be given in the RMA calls, as being the disp.
84   size_+=size;
85   return MPI_SUCCESS;
86 }
87
88 int Win::detach (void *base){
89   base_=MPI_BOTTOM;
90   size_=-1;
91   return MPI_SUCCESS;
92 }
93
94 void Win::get_name(char* name, int* length){
95   if(name_==nullptr){
96     *length=0;
97     name=nullptr;
98     return;
99   }
100   *length = strlen(name_);
101   strncpy(name, name_, *length+1);
102 }
103
104 void Win::get_group(MPI_Group* group){
105   if(comm_ != MPI_COMM_NULL){
106     *group = comm_->group();
107   } else {
108     *group = MPI_GROUP_NULL;
109   }
110 }
111
112 MPI_Info Win::info(){
113   if(info_== MPI_INFO_NULL)
114     info_ = new Info();
115   info_->ref();
116   return info_;
117 }
118
119 int Win::rank(){
120   return rank_;
121 }
122
123 MPI_Aint Win::size(){
124   return size_;
125 }
126
127 void* Win::base(){
128   return base_;
129 }
130
131 int Win::disp_unit(){
132   return disp_unit_;
133 }
134
135 int Win::dynamic(){
136   return dynamic_;
137 }
138
139 void Win::set_info(MPI_Info info){
140   if(info_!= MPI_INFO_NULL)
141     info->ref();
142   info_=info;
143 }
144
145 void Win::set_name(char* name){
146   name_ = xbt_strdup(name);
147 }
148
149 int Win::fence(int assert)
150 {
151   XBT_DEBUG("Entering fence");
152   if (opened_ == 0)
153     opened_=1;
154   if (assert != MPI_MODE_NOPRECEDE) {
155     // This is not the first fence => finalize what came before
156     MSG_barrier_wait(bar_);
157     xbt_mutex_acquire(mut_);
158     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
159     // Without this, the vector could get redimensionned when another process pushes.
160     // This would result in the array used by Request::waitall() to be invalidated.
161     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
162     std::vector<MPI_Request> *reqs = requests_;
163     int size = static_cast<int>(reqs->size());
164     // start all requests that have been prepared by another process
165     if (size > 0) {
166       MPI_Request* treqs = &(*reqs)[0];
167       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
168     }
169     count_=0;
170     xbt_mutex_release(mut_);
171   }
172
173   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
174     opened_=0;
175   assert_ = assert;
176
177   MSG_barrier_wait(bar_);
178   XBT_DEBUG("Leaving fence");
179
180   return MPI_SUCCESS;
181 }
182
183 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
184               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
185 {
186   //get receiver pointer
187   MPI_Win recv_win = connected_wins_[target_rank];
188
189   if(opened_==0){//check that post/start has been done
190     // no fence or start .. lock ok ?
191     int locked=0;
192     for(auto it : recv_win->lockers_)
193       if (it == comm_->rank())
194         locked = 1;
195     if(locked != 1)
196       return MPI_ERR_WIN;
197   }
198
199   if(target_count*target_datatype->get_extent()>recv_win->size_)
200     return MPI_ERR_ARG;
201
202   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
203   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
204
205   if(target_rank != comm_->rank()){
206     //prepare send_request
207     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
208         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
209
210     //prepare receiver request
211     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
212         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
213
214     //start send
215     sreq->start();
216     //push request to receiver's win
217     xbt_mutex_acquire(recv_win->mut_);
218     recv_win->requests_->push_back(rreq);
219     rreq->start();
220     xbt_mutex_release(recv_win->mut_);
221     //push request to sender's win
222     xbt_mutex_acquire(mut_);
223     requests_->push_back(sreq);
224     xbt_mutex_release(mut_);
225   }else{
226     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
227   }
228
229   return MPI_SUCCESS;
230 }
231
232 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
233               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
234 {
235   //get sender pointer
236   MPI_Win send_win = connected_wins_[target_rank];
237
238   if(opened_==0){//check that post/start has been done
239     // no fence or start .. lock ok ?
240     int locked=0;
241     for(auto it : send_win->lockers_)
242       if (it == comm_->rank())
243         locked = 1;
244     if(locked != 1)
245       return MPI_ERR_WIN;
246   }
247
248   if(target_count*target_datatype->get_extent()>send_win->size_)
249     return MPI_ERR_ARG;
250
251   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
252   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
253
254   if(target_rank != comm_->rank()){
255     //prepare send_request
256     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
257         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
258         MPI_OP_NULL);
259
260     //prepare receiver request
261     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
262         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
263         MPI_OP_NULL);
264
265     //start the send, with another process than us as sender. 
266     sreq->start();
267     //push request to receiver's win
268     xbt_mutex_acquire(send_win->mut_);
269     send_win->requests_->push_back(sreq);
270     xbt_mutex_release(send_win->mut_);
271
272     //start recv
273     rreq->start();
274     //push request to sender's win
275     xbt_mutex_acquire(mut_);
276     requests_->push_back(rreq);
277     xbt_mutex_release(mut_);
278   }else{
279     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
280   }
281
282   return MPI_SUCCESS;
283 }
284
285
286 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
287               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
288 {
289
290   //get receiver pointer
291   MPI_Win recv_win = connected_wins_[target_rank];
292
293   if(opened_==0){//check that post/start has been done
294     // no fence or start .. lock ok ?
295     int locked=0;
296     for(auto it : recv_win->lockers_)
297       if (it == comm_->rank())
298         locked = 1;
299     if(locked != 1)
300       return MPI_ERR_WIN;
301   }
302   //FIXME: local version 
303
304   if(target_count*target_datatype->get_extent()>recv_win->size_)
305     return MPI_ERR_ARG;
306
307   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
308   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
309     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
310     //prepare send_request
311
312     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
313         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
314
315     //prepare receiver request
316     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
317         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
318
319     count_++;
320
321     //start send
322     sreq->start();
323     //push request to receiver's win
324     xbt_mutex_acquire(recv_win->mut_);
325     recv_win->requests_->push_back(rreq);
326     rreq->start();
327     xbt_mutex_release(recv_win->mut_);
328     //push request to sender's win
329     xbt_mutex_acquire(mut_);
330     requests_->push_back(sreq);
331     xbt_mutex_release(mut_);
332
333   return MPI_SUCCESS;
334 }
335
336 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
337               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
338               MPI_Datatype target_datatype, MPI_Op op){
339
340   //get sender pointer
341   MPI_Win send_win = connected_wins_[target_rank];
342
343   if(opened_==0){//check that post/start has been done
344     // no fence or start .. lock ok ?
345     int locked=0;
346     for(auto it : send_win->lockers_)
347       if (it == comm_->rank())
348         locked = 1;
349     if(locked != 1)
350       return MPI_ERR_WIN;
351   }
352
353   if(target_count*target_datatype->get_extent()>send_win->size_)
354     return MPI_ERR_ARG;
355
356   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
357   //need to be sure ops are correctly ordered, so finish request here ? slow.
358   MPI_Request req;
359   xbt_mutex_acquire(send_win->atomic_mut_);
360   get(result_addr, result_count, result_datatype, target_rank,
361               target_disp, target_count, target_datatype, &req);
362   if (req != MPI_REQUEST_NULL)
363     Request::wait(&req, MPI_STATUS_IGNORE);
364   if(op!=MPI_NO_OP)
365     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
366               target_disp, target_count, target_datatype, op, &req);
367   if (req != MPI_REQUEST_NULL)
368     Request::wait(&req, MPI_STATUS_IGNORE);
369   xbt_mutex_release(send_win->atomic_mut_);
370   return MPI_SUCCESS;
371
372 }
373
374 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
375         void *result_addr, MPI_Datatype datatype, int target_rank,
376         MPI_Aint target_disp){
377   //get sender pointer
378   MPI_Win send_win = connected_wins_[target_rank];
379
380   if(opened_==0){//check that post/start has been done
381     // no fence or start .. lock ok ?
382     int locked=0;
383     for(auto it : send_win->lockers_)
384       if (it == comm_->rank())
385         locked = 1;
386     if(locked != 1)
387       return MPI_ERR_WIN;
388   }
389
390   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
391   MPI_Request req;
392   xbt_mutex_acquire(send_win->atomic_mut_);
393   get(result_addr, 1, datatype, target_rank,
394               target_disp, 1, datatype, &req);
395   if (req != MPI_REQUEST_NULL)
396     Request::wait(&req, MPI_STATUS_IGNORE);
397   if(! memcmp (result_addr, compare_addr, datatype->get_extent() )){
398     put(origin_addr, 1, datatype, target_rank,
399               target_disp, 1, datatype);
400   }
401   xbt_mutex_release(send_win->atomic_mut_);
402   return MPI_SUCCESS;
403 }
404
405 int Win::start(MPI_Group group, int assert){
406     /* From MPI forum advices
407     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
408     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
409     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
410     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
411     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
412     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
413     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
414     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
415     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
416     must complete, without further dependencies.  */
417
418   //naive, blocking implementation.
419     int i             = 0;
420     int j             = 0;
421     int size          = group->size();
422     MPI_Request* reqs = xbt_new0(MPI_Request, size);
423
424     while (j != size) {
425       int src = group->index(j);
426       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
427         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
428         i++;
429       }
430       j++;
431   }
432   size=i;
433   Request::startall(size, reqs);
434   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
435   for(i=0;i<size;i++){
436     Request::unref(&reqs[i]);
437   }
438   xbt_free(reqs);
439   opened_++; //we're open for business !
440   group_=group;
441   group->ref();
442   return MPI_SUCCESS;
443 }
444
445 int Win::post(MPI_Group group, int assert){
446   //let's make a synchronous send here
447   int i             = 0;
448   int j             = 0;
449   int size = group->size();
450   MPI_Request* reqs = xbt_new0(MPI_Request, size);
451
452   while(j!=size){
453     int dst=group->index(j);
454     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
455       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
456       i++;
457     }
458     j++;
459   }
460   size=i;
461
462   Request::startall(size, reqs);
463   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
464   for(i=0;i<size;i++){
465     Request::unref(&reqs[i]);
466   }
467   xbt_free(reqs);
468   opened_++; //we're open for business !
469   group_=group;
470   group->ref();
471   return MPI_SUCCESS;
472 }
473
474 int Win::complete(){
475   if(opened_==0)
476     xbt_die("Complete called on already opened MPI_Win");
477
478   XBT_DEBUG("Entering MPI_Win_Complete");
479   int i             = 0;
480   int j             = 0;
481   int size = group_->size();
482   MPI_Request* reqs = xbt_new0(MPI_Request, size);
483
484   while(j!=size){
485     int dst=group_->index(j);
486     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
487       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
488       i++;
489     }
490     j++;
491   }
492   size=i;
493   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
494   Request::startall(size, reqs);
495   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
496
497   for(i=0;i<size;i++){
498     Request::unref(&reqs[i]);
499   }
500   xbt_free(reqs);
501
502   int finished = finish_comms();
503   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
504
505   Group::unref(group_);
506   opened_--; //we're closed for business !
507   return MPI_SUCCESS;
508 }
509
510 int Win::wait(){
511   //naive, blocking implementation.
512   XBT_DEBUG("Entering MPI_Win_Wait");
513   int i             = 0;
514   int j             = 0;
515   int size          = group_->size();
516   MPI_Request* reqs = xbt_new0(MPI_Request, size);
517
518   while(j!=size){
519     int src=group_->index(j);
520     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
521       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
522       i++;
523     }
524     j++;
525   }
526   size=i;
527   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
528   Request::startall(size, reqs);
529   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
530   for(i=0;i<size;i++){
531     Request::unref(&reqs[i]);
532   }
533   xbt_free(reqs);
534   int finished = finish_comms();
535   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
536
537   Group::unref(group_);
538   opened_--; //we're opened for business !
539   return MPI_SUCCESS;
540 }
541
542 int Win::lock(int lock_type, int rank, int assert){
543   if(opened_!=0)
544     return MPI_ERR_WIN;
545
546   MPI_Win target_win = connected_wins_[rank];
547
548   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
549     xbt_mutex_acquire(target_win->lock_mut_);
550     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
551     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
552       xbt_mutex_release(target_win->lock_mut_);
553    }
554   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
555         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
556
557   target_win->lockers_.push_back(comm_->rank());
558
559   int finished = finish_comms();
560   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
561
562   return MPI_SUCCESS;
563 }
564
565 int Win::lock_all(int assert){
566   int i=0;
567   int retval = MPI_SUCCESS;
568   for (i=0; i<comm_->size();i++){
569       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
570       if(ret != MPI_SUCCESS)
571         retval = ret;
572   }
573   return retval;
574 }
575
576 int Win::unlock(int rank){
577   if(opened_!=0)
578     return MPI_ERR_WIN;
579
580   MPI_Win target_win = connected_wins_[rank];
581   int target_mode = target_win->mode_;
582   target_win->mode_= 0;
583   target_win->lockers_.remove(comm_->rank());
584   if (target_mode==MPI_LOCK_EXCLUSIVE){
585     xbt_mutex_release(target_win->lock_mut_);
586   }
587
588   int finished = finish_comms();
589   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
590
591   return MPI_SUCCESS;
592 }
593
594 int Win::unlock_all(){
595   int i=0;
596   int retval = MPI_SUCCESS;
597   for (i=0; i<comm_->size();i++){
598       int ret = this->unlock(i);
599       if(ret != MPI_SUCCESS)
600         retval = ret;
601   }
602   return retval;
603 }
604
605 int Win::flush(int rank){
606   MPI_Win target_win = connected_wins_[rank];
607   int finished = finish_comms(rank);
608   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
609   finished = target_win->finish_comms(rank_);
610   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
611   return MPI_SUCCESS;
612 }
613
614 int Win::flush_local(int rank){
615   int finished = finish_comms(rank);
616   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
617   return MPI_SUCCESS;
618 }
619
620 int Win::flush_all(){
621   int i=0;
622   int finished = 0;
623   finished = finish_comms();
624   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
625   for (i=0; i<comm_->size();i++){
626     finished = connected_wins_[i]->finish_comms(rank_);
627     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
628   }
629   return MPI_SUCCESS;
630 }
631
632 int Win::flush_local_all(){
633   int finished = finish_comms();
634   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
635   return MPI_SUCCESS;
636 }
637
638 Win* Win::f2c(int id){
639   return static_cast<Win*>(F2C::f2c(id));
640 }
641
642
643 int Win::finish_comms(){
644   xbt_mutex_acquire(mut_);
645   //Finish own requests
646   std::vector<MPI_Request> *reqqs = requests_;
647   int size = static_cast<int>(reqqs->size());
648   if (size > 0) {
649     MPI_Request* treqs = &(*reqqs)[0];
650     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
651     reqqs->clear();
652   }
653   xbt_mutex_release(mut_);
654   return size;
655 }
656
657 int Win::finish_comms(int rank){
658   xbt_mutex_acquire(mut_);
659   //Finish own requests
660   std::vector<MPI_Request> *reqqs = requests_;
661   int size = static_cast<int>(reqqs->size());
662   if (size > 0) {
663     size = 0;
664     std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
665     std::vector<MPI_Request>::iterator iter = reqqs->begin();
666     while (iter != reqqs->end()){
667       if(((*iter)->src() == rank) || ((*iter)->dst() == rank)){
668           myreqqs->push_back(*iter);
669           iter = reqqs->erase(iter);
670           size++;
671       } else {
672         ++iter;
673       }
674     }
675     if(size >0){
676       MPI_Request* treqs = &(*myreqqs)[0];
677       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
678       myreqqs->clear();
679       delete myreqqs;
680     }
681   }
682   xbt_mutex_release(mut_);
683   return size;
684 }
685
686
687 }
688 }