Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
28f399f4d085c1677ac55c242e6f1334b90d4c09
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   atomic_mut_=xbt_mutex_init();
30   connected_wins_ = new MPI_Win[comm_size];
31   connected_wins_[rank_] = this;
32   count_ = 0;
33   if(rank_==0){
34     bar_ = MSG_barrier_init(comm_size);
35   }
36   mode_=0;
37
38   comm->add_rma_win(this);
39
40   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
41                          MPI_BYTE, comm);
42
43   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
44
45   Colls::barrier(comm);
46 }
47
48 Win::~Win(){
49   //As per the standard, perform a barrier to ensure every async comm is finished
50   MSG_barrier_wait(bar_);
51
52   int finished = finish_comms();
53   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
54
55   delete requests_;
56   delete[] connected_wins_;
57   if (name_ != nullptr){
58     xbt_free(name_);
59   }
60   if(info_!=MPI_INFO_NULL){
61     MPI_Info_free(&info_);
62   }
63
64   comm_->remove_rma_win(this);
65
66   Colls::barrier(comm_);
67   int rank=comm_->rank();
68   if(rank == 0)
69     MSG_barrier_destroy(bar_);
70   xbt_mutex_destroy(mut_);
71   xbt_mutex_destroy(lock_mut_);
72   xbt_mutex_destroy(atomic_mut_);
73
74   if(allocated_ !=0)
75     xbt_free(base_);
76
77   cleanup_attr<Win>();
78 }
79
80 int Win::attach (void *base, MPI_Aint size){
81   if (!(base_ == MPI_BOTTOM || base_ == 0))
82     return MPI_ERR_ARG;
83   base_=0;//actually the address will be given in the RMA calls, as being the disp.
84   size_+=size;
85   return MPI_SUCCESS;
86 }
87
88 int Win::detach (void *base){
89   base_=MPI_BOTTOM;
90   size_=-1;
91   return MPI_SUCCESS;
92 }
93
94 void Win::get_name(char* name, int* length){
95   if(name_==nullptr){
96     *length=0;
97     name=nullptr;
98     return;
99   }
100   *length = strlen(name_);
101   strncpy(name, name_, *length+1);
102 }
103
104 void Win::get_group(MPI_Group* group){
105   if(comm_ != MPI_COMM_NULL){
106     *group = comm_->group();
107   } else {
108     *group = MPI_GROUP_NULL;
109   }
110 }
111
112 MPI_Info Win::info(){
113   if(info_== MPI_INFO_NULL)
114     info_ = new Info();
115   info_->ref();
116   return info_;
117 }
118
119 int Win::rank(){
120   return rank_;
121 }
122
123 MPI_Aint Win::size(){
124   return size_;
125 }
126
127 void* Win::base(){
128   return base_;
129 }
130
131 int Win::disp_unit(){
132   return disp_unit_;
133 }
134
135 int Win::dynamic(){
136   return dynamic_;
137 }
138
139 void Win::set_info(MPI_Info info){
140   if(info_!= MPI_INFO_NULL)
141     info->ref();
142   info_=info;
143 }
144
145 void Win::set_name(char* name){
146   name_ = xbt_strdup(name);
147 }
148
149 int Win::fence(int assert)
150 {
151   XBT_DEBUG("Entering fence");
152   if (opened_ == 0)
153     opened_=1;
154   if (assert != MPI_MODE_NOPRECEDE) {
155     // This is not the first fence => finalize what came before
156     MSG_barrier_wait(bar_);
157     xbt_mutex_acquire(mut_);
158     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
159     // Without this, the vector could get redimensionned when another process pushes.
160     // This would result in the array used by Request::waitall() to be invalidated.
161     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
162     std::vector<MPI_Request> *reqs = requests_;
163     int size = static_cast<int>(reqs->size());
164     // start all requests that have been prepared by another process
165     if (size > 0) {
166       MPI_Request* treqs = &(*reqs)[0];
167       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
168     }
169     count_=0;
170     xbt_mutex_release(mut_);
171   }
172
173   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
174     opened_=0;
175   assert_ = assert;
176
177   MSG_barrier_wait(bar_);
178   XBT_DEBUG("Leaving fence");
179
180   return MPI_SUCCESS;
181 }
182
183 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
184               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
185 {
186   //get receiver pointer
187   MPI_Win recv_win = connected_wins_[target_rank];
188
189   if(opened_==0){//check that post/start has been done
190     // no fence or start .. lock ok ?
191     int locked=0;
192     for(auto it : recv_win->lockers_)
193       if (it == comm_->rank())
194         locked = 1;
195     if(locked != 1)
196       return MPI_ERR_WIN;
197   }
198
199   if(target_count*target_datatype->get_extent()>recv_win->size_)
200     return MPI_ERR_ARG;
201
202   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
203   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
204
205   if(target_rank != comm_->rank()){
206     //prepare send_request
207     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
208         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
209
210     //prepare receiver request
211     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
212         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
213
214     //start send
215     sreq->start();
216
217     if(request!=nullptr){
218       *request=sreq;
219     }else{
220       xbt_mutex_acquire(mut_);
221       requests_->push_back(sreq);
222       xbt_mutex_release(mut_);
223     }
224
225     //push request to receiver's win
226     xbt_mutex_acquire(recv_win->mut_);
227     recv_win->requests_->push_back(rreq);
228     rreq->start();
229     xbt_mutex_release(recv_win->mut_);
230
231   }else{
232     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
233     if(request!=nullptr)
234       *request = MPI_REQUEST_NULL;
235   }
236
237   return MPI_SUCCESS;
238 }
239
240 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
241               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
242 {
243   //get sender pointer
244   MPI_Win send_win = connected_wins_[target_rank];
245
246   if(opened_==0){//check that post/start has been done
247     // no fence or start .. lock ok ?
248     int locked=0;
249     for(auto it : send_win->lockers_)
250       if (it == comm_->rank())
251         locked = 1;
252     if(locked != 1)
253       return MPI_ERR_WIN;
254   }
255
256   if(target_count*target_datatype->get_extent()>send_win->size_)
257     return MPI_ERR_ARG;
258
259   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
260   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
261
262   if(target_rank != comm_->rank()){
263     //prepare send_request
264     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
265         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
266         MPI_OP_NULL);
267
268     //prepare receiver request
269     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
270         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
271         MPI_OP_NULL);
272
273     //start the send, with another process than us as sender. 
274     sreq->start();
275     //push request to receiver's win
276     xbt_mutex_acquire(send_win->mut_);
277     send_win->requests_->push_back(sreq);
278     xbt_mutex_release(send_win->mut_);
279
280     //start recv
281     rreq->start();
282
283     if(request!=nullptr){
284       *request=rreq;
285     }else{
286       xbt_mutex_acquire(mut_);
287       requests_->push_back(rreq);
288       xbt_mutex_release(mut_);
289     }
290
291   }else{
292     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
293     if(request!=nullptr)
294       *request=MPI_REQUEST_NULL;
295   }
296
297   return MPI_SUCCESS;
298 }
299
300
301 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
302               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
303 {
304
305   //get receiver pointer
306   MPI_Win recv_win = connected_wins_[target_rank];
307
308   if(opened_==0){//check that post/start has been done
309     // no fence or start .. lock ok ?
310     int locked=0;
311     for(auto it : recv_win->lockers_)
312       if (it == comm_->rank())
313         locked = 1;
314     if(locked != 1)
315       return MPI_ERR_WIN;
316   }
317   //FIXME: local version 
318
319   if(target_count*target_datatype->get_extent()>recv_win->size_)
320     return MPI_ERR_ARG;
321
322   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
323   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
324     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
325     //prepare send_request
326
327     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
328         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
329
330     //prepare receiver request
331     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
332         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
333
334     count_++;
335
336     //start send
337     sreq->start();
338     //push request to receiver's win
339     xbt_mutex_acquire(recv_win->mut_);
340     recv_win->requests_->push_back(rreq);
341     rreq->start();
342     xbt_mutex_release(recv_win->mut_);
343
344     if(request!=nullptr){
345       *request=sreq;
346     }else{
347       xbt_mutex_acquire(mut_);
348       requests_->push_back(sreq);
349       xbt_mutex_release(mut_);
350     }
351
352   return MPI_SUCCESS;
353 }
354
355 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
356               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
357               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
358
359   //get sender pointer
360   MPI_Win send_win = connected_wins_[target_rank];
361
362   if(opened_==0){//check that post/start has been done
363     // no fence or start .. lock ok ?
364     int locked=0;
365     for(auto it : send_win->lockers_)
366       if (it == comm_->rank())
367         locked = 1;
368     if(locked != 1)
369       return MPI_ERR_WIN;
370   }
371
372   if(target_count*target_datatype->get_extent()>send_win->size_)
373     return MPI_ERR_ARG;
374
375   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
376   //need to be sure ops are correctly ordered, so finish request here ? slow.
377   MPI_Request req;
378   xbt_mutex_acquire(send_win->atomic_mut_);
379   get(result_addr, result_count, result_datatype, target_rank,
380               target_disp, target_count, target_datatype, &req);
381   if (req != MPI_REQUEST_NULL)
382     Request::wait(&req, MPI_STATUS_IGNORE);
383   if(op!=MPI_NO_OP)
384     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
385               target_disp, target_count, target_datatype, op, &req);
386   if (req != MPI_REQUEST_NULL)
387     Request::wait(&req, MPI_STATUS_IGNORE);
388   xbt_mutex_release(send_win->atomic_mut_);
389   return MPI_SUCCESS;
390
391 }
392
393 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
394         void *result_addr, MPI_Datatype datatype, int target_rank,
395         MPI_Aint target_disp){
396   //get sender pointer
397   MPI_Win send_win = connected_wins_[target_rank];
398
399   if(opened_==0){//check that post/start has been done
400     // no fence or start .. lock ok ?
401     int locked=0;
402     for(auto it : send_win->lockers_)
403       if (it == comm_->rank())
404         locked = 1;
405     if(locked != 1)
406       return MPI_ERR_WIN;
407   }
408
409   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
410   MPI_Request req;
411   xbt_mutex_acquire(send_win->atomic_mut_);
412   get(result_addr, 1, datatype, target_rank,
413               target_disp, 1, datatype, &req);
414   if (req != MPI_REQUEST_NULL)
415     Request::wait(&req, MPI_STATUS_IGNORE);
416   if(! memcmp (result_addr, compare_addr, datatype->get_extent() )){
417     put(origin_addr, 1, datatype, target_rank,
418               target_disp, 1, datatype);
419   }
420   xbt_mutex_release(send_win->atomic_mut_);
421   return MPI_SUCCESS;
422 }
423
424 int Win::start(MPI_Group group, int assert){
425     /* From MPI forum advices
426     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
427     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
428     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
429     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
430     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
431     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
432     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
433     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
434     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
435     must complete, without further dependencies.  */
436
437   //naive, blocking implementation.
438     int i             = 0;
439     int j             = 0;
440     int size          = group->size();
441     MPI_Request* reqs = xbt_new0(MPI_Request, size);
442
443     while (j != size) {
444       int src = group->index(j);
445       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
446         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
447         i++;
448       }
449       j++;
450   }
451   size=i;
452   Request::startall(size, reqs);
453   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
454   for(i=0;i<size;i++){
455     Request::unref(&reqs[i]);
456   }
457   xbt_free(reqs);
458   opened_++; //we're open for business !
459   group_=group;
460   group->ref();
461   return MPI_SUCCESS;
462 }
463
464 int Win::post(MPI_Group group, int assert){
465   //let's make a synchronous send here
466   int i             = 0;
467   int j             = 0;
468   int size = group->size();
469   MPI_Request* reqs = xbt_new0(MPI_Request, size);
470
471   while(j!=size){
472     int dst=group->index(j);
473     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
474       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
475       i++;
476     }
477     j++;
478   }
479   size=i;
480
481   Request::startall(size, reqs);
482   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
483   for(i=0;i<size;i++){
484     Request::unref(&reqs[i]);
485   }
486   xbt_free(reqs);
487   opened_++; //we're open for business !
488   group_=group;
489   group->ref();
490   return MPI_SUCCESS;
491 }
492
493 int Win::complete(){
494   if(opened_==0)
495     xbt_die("Complete called on already opened MPI_Win");
496
497   XBT_DEBUG("Entering MPI_Win_Complete");
498   int i             = 0;
499   int j             = 0;
500   int size = group_->size();
501   MPI_Request* reqs = xbt_new0(MPI_Request, size);
502
503   while(j!=size){
504     int dst=group_->index(j);
505     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
506       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
507       i++;
508     }
509     j++;
510   }
511   size=i;
512   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
513   Request::startall(size, reqs);
514   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
515
516   for(i=0;i<size;i++){
517     Request::unref(&reqs[i]);
518   }
519   xbt_free(reqs);
520
521   int finished = finish_comms();
522   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
523
524   Group::unref(group_);
525   opened_--; //we're closed for business !
526   return MPI_SUCCESS;
527 }
528
529 int Win::wait(){
530   //naive, blocking implementation.
531   XBT_DEBUG("Entering MPI_Win_Wait");
532   int i             = 0;
533   int j             = 0;
534   int size          = group_->size();
535   MPI_Request* reqs = xbt_new0(MPI_Request, size);
536
537   while(j!=size){
538     int src=group_->index(j);
539     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
540       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
541       i++;
542     }
543     j++;
544   }
545   size=i;
546   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
547   Request::startall(size, reqs);
548   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
549   for(i=0;i<size;i++){
550     Request::unref(&reqs[i]);
551   }
552   xbt_free(reqs);
553   int finished = finish_comms();
554   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
555
556   Group::unref(group_);
557   opened_--; //we're opened for business !
558   return MPI_SUCCESS;
559 }
560
561 int Win::lock(int lock_type, int rank, int assert){
562   if(opened_!=0)
563     return MPI_ERR_WIN;
564
565   MPI_Win target_win = connected_wins_[rank];
566
567   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
568     xbt_mutex_acquire(target_win->lock_mut_);
569     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
570     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
571       xbt_mutex_release(target_win->lock_mut_);
572    }
573   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
574         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
575
576   target_win->lockers_.push_back(comm_->rank());
577
578   int finished = finish_comms();
579   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
580
581   return MPI_SUCCESS;
582 }
583
584 int Win::lock_all(int assert){
585   int i=0;
586   int retval = MPI_SUCCESS;
587   for (i=0; i<comm_->size();i++){
588       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
589       if(ret != MPI_SUCCESS)
590         retval = ret;
591   }
592   return retval;
593 }
594
595 int Win::unlock(int rank){
596   if(opened_!=0)
597     return MPI_ERR_WIN;
598
599   MPI_Win target_win = connected_wins_[rank];
600   int target_mode = target_win->mode_;
601   target_win->mode_= 0;
602   target_win->lockers_.remove(comm_->rank());
603   if (target_mode==MPI_LOCK_EXCLUSIVE){
604     xbt_mutex_release(target_win->lock_mut_);
605   }
606
607   int finished = finish_comms();
608   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
609
610   return MPI_SUCCESS;
611 }
612
613 int Win::unlock_all(){
614   int i=0;
615   int retval = MPI_SUCCESS;
616   for (i=0; i<comm_->size();i++){
617       int ret = this->unlock(i);
618       if(ret != MPI_SUCCESS)
619         retval = ret;
620   }
621   return retval;
622 }
623
624 int Win::flush(int rank){
625   MPI_Win target_win = connected_wins_[rank];
626   int finished = finish_comms(rank);
627   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
628   finished = target_win->finish_comms(rank_);
629   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
630   return MPI_SUCCESS;
631 }
632
633 int Win::flush_local(int rank){
634   int finished = finish_comms(rank);
635   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
636   return MPI_SUCCESS;
637 }
638
639 int Win::flush_all(){
640   int i=0;
641   int finished = 0;
642   finished = finish_comms();
643   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
644   for (i=0; i<comm_->size();i++){
645     finished = connected_wins_[i]->finish_comms(rank_);
646     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
647   }
648   return MPI_SUCCESS;
649 }
650
651 int Win::flush_local_all(){
652   int finished = finish_comms();
653   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
654   return MPI_SUCCESS;
655 }
656
657 Win* Win::f2c(int id){
658   return static_cast<Win*>(F2C::f2c(id));
659 }
660
661
662 int Win::finish_comms(){
663   xbt_mutex_acquire(mut_);
664   //Finish own requests
665   std::vector<MPI_Request> *reqqs = requests_;
666   int size = static_cast<int>(reqqs->size());
667   if (size > 0) {
668     MPI_Request* treqs = &(*reqqs)[0];
669     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
670     reqqs->clear();
671   }
672   xbt_mutex_release(mut_);
673   return size;
674 }
675
676 int Win::finish_comms(int rank){
677   xbt_mutex_acquire(mut_);
678   //Finish own requests
679   std::vector<MPI_Request> *reqqs = requests_;
680   int size = static_cast<int>(reqqs->size());
681   if (size > 0) {
682     size = 0;
683     std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
684     std::vector<MPI_Request>::iterator iter = reqqs->begin();
685     while (iter != reqqs->end()){
686       if(((*iter)->src() == rank) || ((*iter)->dst() == rank)){
687           myreqqs->push_back(*iter);
688           iter = reqqs->erase(iter);
689           size++;
690       } else {
691         ++iter;
692       }
693     }
694     if(size >0){
695       MPI_Request* treqs = &(*myreqqs)[0];
696       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
697       myreqqs->clear();
698       delete myreqqs;
699     }
700   }
701   xbt_mutex_release(mut_);
702   return size;
703 }
704
705
706 }
707 }