Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
39d2e8ae3f90ff513ae987f0ec777a9e233650ee
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7 #include "private.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
17
18 using simgrid::s4u::Actor;
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_      = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_ = nullptr;
32   opened_ = 0;
33   group_ = MPI_GROUP_NULL;
34   requests_ = new std::vector<MPI_Request>();
35   mut_=xbt_mutex_init();
36   lock_mut_=xbt_mutex_init();
37   atomic_mut_=xbt_mutex_init();
38   connected_wins_ = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_ = 0;
41   if(rank_==0){
42     bar_ = MSG_barrier_init(comm_size);
43   }
44   mode_=0;
45
46   comm->add_rma_win(this);
47
48   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
49                          MPI_BYTE, comm);
50
51   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
52
53   Colls::barrier(comm);
54 }
55
56 Win::~Win(){
57   //As per the standard, perform a barrier to ensure every async comm is finished
58   MSG_barrier_wait(bar_);
59
60   int finished = finish_comms();
61   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
62
63   delete requests_;
64   delete[] connected_wins_;
65   if (name_ != nullptr){
66     xbt_free(name_);
67   }
68   if(info_!=MPI_INFO_NULL){
69     MPI_Info_free(&info_);
70   }
71
72   comm_->remove_rma_win(this);
73
74   Colls::barrier(comm_);
75   if (rank_ == 0)
76     MSG_barrier_destroy(bar_);
77   xbt_mutex_destroy(mut_);
78   xbt_mutex_destroy(lock_mut_);
79   xbt_mutex_destroy(atomic_mut_);
80
81   if(allocated_ !=0)
82     xbt_free(base_);
83
84   cleanup_attr<Win>();
85 }
86
87 int Win::attach (void *base, MPI_Aint size){
88   if (not(base_ == MPI_BOTTOM || base_ == 0))
89     return MPI_ERR_ARG;
90   base_=0;//actually the address will be given in the RMA calls, as being the disp.
91   size_+=size;
92   return MPI_SUCCESS;
93 }
94
95 int Win::detach (void *base){
96   base_=MPI_BOTTOM;
97   size_=-1;
98   return MPI_SUCCESS;
99 }
100
101 void Win::get_name(char* name, int* length){
102   if(name_==nullptr){
103     *length=0;
104     name=nullptr;
105     return;
106   }
107   *length = strlen(name_);
108   strncpy(name, name_, *length+1);
109 }
110
111 void Win::get_group(MPI_Group* group){
112   if(comm_ != MPI_COMM_NULL){
113     *group = comm_->group();
114   } else {
115     *group = MPI_GROUP_NULL;
116   }
117 }
118
119 MPI_Info Win::info(){
120   if(info_== MPI_INFO_NULL)
121     info_ = new Info();
122   info_->ref();
123   return info_;
124 }
125
126 int Win::rank(){
127   return rank_;
128 }
129
130 MPI_Aint Win::size(){
131   return size_;
132 }
133
134 void* Win::base(){
135   return base_;
136 }
137
138 int Win::disp_unit(){
139   return disp_unit_;
140 }
141
142 int Win::dynamic(){
143   return dynamic_;
144 }
145
146 void Win::set_info(MPI_Info info){
147   if(info_!= MPI_INFO_NULL)
148     info->ref();
149   info_=info;
150 }
151
152 void Win::set_name(char* name){
153   name_ = xbt_strdup(name);
154 }
155
156 int Win::fence(int assert)
157 {
158   XBT_DEBUG("Entering fence");
159   if (opened_ == 0)
160     opened_=1;
161   if (assert != MPI_MODE_NOPRECEDE) {
162     // This is not the first fence => finalize what came before
163     MSG_barrier_wait(bar_);
164     xbt_mutex_acquire(mut_);
165     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
166     // Without this, the vector could get redimensionned when another process pushes.
167     // This would result in the array used by Request::waitall() to be invalidated.
168     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
169     std::vector<MPI_Request> *reqs = requests_;
170     int size = static_cast<int>(reqs->size());
171     // start all requests that have been prepared by another process
172     if (size > 0) {
173       MPI_Request* treqs = &(*reqs)[0];
174       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
175     }
176     count_=0;
177     xbt_mutex_release(mut_);
178   }
179
180   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
181     opened_=0;
182   assert_ = assert;
183
184   MSG_barrier_wait(bar_);
185   XBT_DEBUG("Leaving fence");
186
187   return MPI_SUCCESS;
188 }
189
190 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
191               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
192 {
193   //get receiver pointer
194   MPI_Win recv_win = connected_wins_[target_rank];
195
196   if(opened_==0){//check that post/start has been done
197     // no fence or start .. lock ok ?
198     int locked=0;
199     for (auto const& it : recv_win->lockers_)
200       if (it == comm_->rank())
201         locked = 1;
202     if(locked != 1)
203       return MPI_ERR_WIN;
204   }
205
206   if(target_count*target_datatype->get_extent()>recv_win->size_)
207     return MPI_ERR_ARG;
208
209   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
210   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
211
212   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
213     // prepare send_request
214     MPI_Request sreq =
215         // TODO cheinrich Check for rank / pid conversion
216         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
217                                comm_, MPI_OP_NULL);
218
219     //prepare receiver request
220     // TODO cheinrich Check for rank / pid conversion
221     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
222                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
223
224     //start send
225     sreq->start();
226
227     if(request!=nullptr){
228       *request=sreq;
229     }else{
230       xbt_mutex_acquire(mut_);
231       requests_->push_back(sreq);
232       xbt_mutex_release(mut_);
233     }
234
235     //push request to receiver's win
236     xbt_mutex_acquire(recv_win->mut_);
237     recv_win->requests_->push_back(rreq);
238     rreq->start();
239     xbt_mutex_release(recv_win->mut_);
240
241   }else{
242     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
243     if(request!=nullptr)
244       *request = MPI_REQUEST_NULL;
245   }
246
247   return MPI_SUCCESS;
248 }
249
250 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
251               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
252 {
253   //get sender pointer
254   MPI_Win send_win = connected_wins_[target_rank];
255
256   if(opened_==0){//check that post/start has been done
257     // no fence or start .. lock ok ?
258     int locked=0;
259     for (auto const& it : send_win->lockers_)
260       if (it == comm_->rank())
261         locked = 1;
262     if(locked != 1)
263       return MPI_ERR_WIN;
264   }
265
266   if(target_count*target_datatype->get_extent()>send_win->size_)
267     return MPI_ERR_ARG;
268
269   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
270   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
271
272   if(target_rank != comm_->rank()){
273     //prepare send_request
274     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
275                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
276
277     //prepare receiver request
278     MPI_Request rreq = Request::rma_recv_init(
279         origin_addr, origin_count, origin_datatype, target_rank,
280         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
281         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
282
283     //start the send, with another process than us as sender.
284     sreq->start();
285     //push request to receiver's win
286     xbt_mutex_acquire(send_win->mut_);
287     send_win->requests_->push_back(sreq);
288     xbt_mutex_release(send_win->mut_);
289
290     //start recv
291     rreq->start();
292
293     if(request!=nullptr){
294       *request=rreq;
295     }else{
296       xbt_mutex_acquire(mut_);
297       requests_->push_back(rreq);
298       xbt_mutex_release(mut_);
299     }
300
301   }else{
302     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
303     if(request!=nullptr)
304       *request=MPI_REQUEST_NULL;
305   }
306
307   return MPI_SUCCESS;
308 }
309
310
311 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
312               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
313 {
314   XBT_DEBUG("Entering MPI_Win_Accumulate");
315   //get receiver pointer
316   MPI_Win recv_win = connected_wins_[target_rank];
317
318   if(opened_==0){//check that post/start has been done
319     // no fence or start .. lock ok ?
320     int locked=0;
321     for (auto const& it : recv_win->lockers_)
322       if (it == comm_->rank())
323         locked = 1;
324     if(locked != 1)
325       return MPI_ERR_WIN;
326   }
327   //FIXME: local version
328
329   if(target_count*target_datatype->get_extent()>recv_win->size_)
330     return MPI_ERR_ARG;
331
332   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
333   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
334     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
335     //prepare send_request
336
337   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
338                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
339
340   // prepare receiver request
341   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
342                                             target_rank, SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
343
344   count_++;
345
346   // start send
347   sreq->start();
348   // push request to receiver's win
349   xbt_mutex_acquire(recv_win->mut_);
350   recv_win->requests_->push_back(rreq);
351   rreq->start();
352   xbt_mutex_release(recv_win->mut_);
353
354   if (request != nullptr) {
355     *request = sreq;
356   } else {
357     xbt_mutex_acquire(mut_);
358     requests_->push_back(sreq);
359     xbt_mutex_release(mut_);
360   }
361
362   XBT_DEBUG("Leaving MPI_Win_Accumulate");
363   return MPI_SUCCESS;
364 }
365
366 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
367               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
368               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
369
370   //get sender pointer
371   MPI_Win send_win = connected_wins_[target_rank];
372
373   if(opened_==0){//check that post/start has been done
374     // no fence or start .. lock ok ?
375     int locked=0;
376     for (auto const& it : send_win->lockers_)
377       if (it == comm_->rank())
378         locked = 1;
379     if(locked != 1)
380       return MPI_ERR_WIN;
381   }
382
383   if(target_count*target_datatype->get_extent()>send_win->size_)
384     return MPI_ERR_ARG;
385
386   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
387   //need to be sure ops are correctly ordered, so finish request here ? slow.
388   MPI_Request req;
389   xbt_mutex_acquire(send_win->atomic_mut_);
390   get(result_addr, result_count, result_datatype, target_rank,
391               target_disp, target_count, target_datatype, &req);
392   if (req != MPI_REQUEST_NULL)
393     Request::wait(&req, MPI_STATUS_IGNORE);
394   if(op!=MPI_NO_OP)
395     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
396               target_disp, target_count, target_datatype, op, &req);
397   if (req != MPI_REQUEST_NULL)
398     Request::wait(&req, MPI_STATUS_IGNORE);
399   xbt_mutex_release(send_win->atomic_mut_);
400   return MPI_SUCCESS;
401
402 }
403
404 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
405         void *result_addr, MPI_Datatype datatype, int target_rank,
406         MPI_Aint target_disp){
407   //get sender pointer
408   MPI_Win send_win = connected_wins_[target_rank];
409
410   if(opened_==0){//check that post/start has been done
411     // no fence or start .. lock ok ?
412     int locked=0;
413     for (auto const& it : send_win->lockers_)
414       if (it == comm_->rank())
415         locked = 1;
416     if(locked != 1)
417       return MPI_ERR_WIN;
418   }
419
420   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
421   MPI_Request req = MPI_REQUEST_NULL;
422   xbt_mutex_acquire(send_win->atomic_mut_);
423   get(result_addr, 1, datatype, target_rank,
424               target_disp, 1, datatype, &req);
425   if (req != MPI_REQUEST_NULL)
426     Request::wait(&req, MPI_STATUS_IGNORE);
427   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
428     put(origin_addr, 1, datatype, target_rank,
429               target_disp, 1, datatype);
430   }
431   xbt_mutex_release(send_win->atomic_mut_);
432   return MPI_SUCCESS;
433 }
434
435 int Win::start(MPI_Group group, int assert){
436   /* From MPI forum advices
437   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
438   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
439   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
440   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
441   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
442   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
443   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
444   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
445   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
446   must complete, without further dependencies.  */
447
448   //naive, blocking implementation.
449   int i             = 0;
450   int j             = 0;
451   int size          = group->size();
452   MPI_Request* reqs = xbt_new0(MPI_Request, size);
453
454   XBT_DEBUG("Entering MPI_Win_Start");
455   while (j != size) {
456     int src = comm_->group()->rank(group->actor(j));
457     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
458       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
459       i++;
460     }
461     j++;
462   }
463   size = i;
464   Request::startall(size, reqs);
465   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
466   for (i = 0; i < size; i++) {
467     Request::unref(&reqs[i]);
468   }
469   xbt_free(reqs);
470   opened_++; //we're open for business !
471   group_=group;
472   group->ref();
473   XBT_DEBUG("Leaving MPI_Win_Start");
474   return MPI_SUCCESS;
475 }
476
477 int Win::post(MPI_Group group, int assert){
478   //let's make a synchronous send here
479   int i             = 0;
480   int j             = 0;
481   int size = group->size();
482   MPI_Request* reqs = xbt_new0(MPI_Request, size);
483
484   XBT_DEBUG("Entering MPI_Win_Post");
485   while(j!=size){
486     int dst = comm_->group()->rank(group->actor(j));
487     if (dst != rank_ && dst != MPI_UNDEFINED) {
488       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
489       i++;
490     }
491     j++;
492   }
493   size=i;
494
495   Request::startall(size, reqs);
496   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
497   for(i=0;i<size;i++){
498     Request::unref(&reqs[i]);
499   }
500   xbt_free(reqs);
501   opened_++; //we're open for business !
502   group_=group;
503   group->ref();
504   XBT_DEBUG("Leaving MPI_Win_Post");
505   return MPI_SUCCESS;
506 }
507
508 int Win::complete(){
509   if(opened_==0)
510     xbt_die("Complete called on already opened MPI_Win");
511
512   XBT_DEBUG("Entering MPI_Win_Complete");
513   int i             = 0;
514   int j             = 0;
515   int size = group_->size();
516   MPI_Request* reqs = xbt_new0(MPI_Request, size);
517
518   while(j!=size){
519     int dst = comm_->group()->rank(group_->actor(j));
520     if (dst != rank_ && dst != MPI_UNDEFINED) {
521       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
522       i++;
523     }
524     j++;
525   }
526   size=i;
527   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
528   Request::startall(size, reqs);
529   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
530
531   for(i=0;i<size;i++){
532     Request::unref(&reqs[i]);
533   }
534   xbt_free(reqs);
535
536   int finished = finish_comms();
537   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
538
539   Group::unref(group_);
540   opened_--; //we're closed for business !
541   return MPI_SUCCESS;
542 }
543
544 int Win::wait(){
545   //naive, blocking implementation.
546   XBT_DEBUG("Entering MPI_Win_Wait");
547   int i             = 0;
548   int j             = 0;
549   int size          = group_->size();
550   MPI_Request* reqs = xbt_new0(MPI_Request, size);
551
552   while(j!=size){
553     int src = comm_->group()->rank(group_->actor(j));
554     if (src != rank_ && src != MPI_UNDEFINED) {
555       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
556       i++;
557     }
558     j++;
559   }
560   size=i;
561   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
562   Request::startall(size, reqs);
563   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
564   for(i=0;i<size;i++){
565     Request::unref(&reqs[i]);
566   }
567   xbt_free(reqs);
568   int finished = finish_comms();
569   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
570
571   Group::unref(group_);
572   opened_--; //we're opened for business !
573   return MPI_SUCCESS;
574 }
575
576 int Win::lock(int lock_type, int rank, int assert){
577   MPI_Win target_win = connected_wins_[rank];
578
579   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
580     xbt_mutex_acquire(target_win->lock_mut_);
581     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
582     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
583       xbt_mutex_release(target_win->lock_mut_);
584    }
585   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
586     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
587
588   target_win->lockers_.push_back(comm_->rank());
589
590   int finished = finish_comms(rank);
591   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
592   finished = target_win->finish_comms(rank_);
593   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
594   return MPI_SUCCESS;
595 }
596
597 int Win::lock_all(int assert){
598   int i=0;
599   int retval = MPI_SUCCESS;
600   for (i=0; i<comm_->size();i++){
601       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
602       if(ret != MPI_SUCCESS)
603         retval = ret;
604   }
605   return retval;
606 }
607
608 int Win::unlock(int rank){
609   MPI_Win target_win = connected_wins_[rank];
610   int target_mode = target_win->mode_;
611   target_win->mode_= 0;
612   target_win->lockers_.remove(comm_->rank());
613   if (target_mode==MPI_LOCK_EXCLUSIVE){
614     xbt_mutex_release(target_win->lock_mut_);
615   }
616
617   int finished = finish_comms(rank);
618   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
619   finished = target_win->finish_comms(rank_);
620   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
621   return MPI_SUCCESS;
622 }
623
624 int Win::unlock_all(){
625   int i=0;
626   int retval = MPI_SUCCESS;
627   for (i=0; i<comm_->size();i++){
628       int ret = this->unlock(i);
629       if(ret != MPI_SUCCESS)
630         retval = ret;
631   }
632   return retval;
633 }
634
635 int Win::flush(int rank){
636   MPI_Win target_win = connected_wins_[rank];
637   int finished = finish_comms(rank);
638   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
639   finished = target_win->finish_comms(rank_);
640   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
641   return MPI_SUCCESS;
642 }
643
644 int Win::flush_local(int rank){
645   int finished = finish_comms(rank);
646   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
647   return MPI_SUCCESS;
648 }
649
650 int Win::flush_all(){
651   int i=0;
652   int finished = 0;
653   finished = finish_comms();
654   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
655   for (i=0; i<comm_->size();i++){
656     finished = connected_wins_[i]->finish_comms(rank_);
657     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
658   }
659   return MPI_SUCCESS;
660 }
661
662 int Win::flush_local_all(){
663   int finished = finish_comms();
664   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
665   return MPI_SUCCESS;
666 }
667
668 Win* Win::f2c(int id){
669   return static_cast<Win*>(F2C::f2c(id));
670 }
671
672
673 int Win::finish_comms(){
674   xbt_mutex_acquire(mut_);
675   //Finish own requests
676   std::vector<MPI_Request> *reqqs = requests_;
677   int size = static_cast<int>(reqqs->size());
678   if (size > 0) {
679     MPI_Request* treqs = &(*reqqs)[0];
680     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
681     reqqs->clear();
682   }
683   xbt_mutex_release(mut_);
684   return size;
685 }
686
687 int Win::finish_comms(int rank){
688   xbt_mutex_acquire(mut_);
689   //Finish own requests
690   std::vector<MPI_Request> *reqqs = requests_;
691   int size = static_cast<int>(reqqs->size());
692   if (size > 0) {
693     size = 0;
694     std::vector<MPI_Request> myreqqs;
695     std::vector<MPI_Request>::iterator iter = reqqs->begin();
696     while (iter != reqqs->end()){
697       if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
698         myreqqs.push_back(*iter);
699         iter = reqqs->erase(iter);
700         size++;
701       } else {
702         ++iter;
703       }
704     }
705     if(size >0){
706       MPI_Request* treqs = &myreqqs[0];
707       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
708       myreqqs.clear();
709     }
710   }
711   xbt_mutex_release(mut_);
712   return size;
713 }
714
715
716 }
717 }