Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Spell check.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_         = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_                  = nullptr;
32   opened_                = 0;
33   group_                 = MPI_GROUP_NULL;
34   requests_              = new std::vector<MPI_Request>();
35   mut_                   = s4u::Mutex::create();
36   lock_mut_              = s4u::Mutex::create();
37   atomic_mut_            = s4u::Mutex::create();
38   connected_wins_        = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_                 = 0;
41   if(rank_==0){
42     bar_ = new s4u::Barrier(comm_size);
43   }
44   mode_=0;
45   errhandler_=MPI_ERRORS_ARE_FATAL;
46   comm->add_rma_win(this);
47   comm->ref();
48
49   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
50                          MPI_BYTE, comm);
51
52   Colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   Colls::barrier(comm);
55 }
56
57 Win::~Win(){
58   //As per the standard, perform a barrier to ensure every async comm is finished
59   bar_->wait();
60
61   int finished = finish_comms();
62   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
63
64   delete requests_;
65   delete[] connected_wins_;
66   if (name_ != nullptr){
67     xbt_free(name_);
68   }
69   if(info_!=MPI_INFO_NULL){
70     MPI_Info_free(&info_);
71   }
72
73   comm_->remove_rma_win(this);
74
75   Colls::barrier(comm_);
76   Comm::unref(comm_);
77   
78   if (rank_ == 0)
79     delete bar_;
80
81   if(allocated_ !=0)
82     xbt_free(base_);
83
84   cleanup_attr<Win>();
85 }
86
87 int Win::attach(void* /*base*/, MPI_Aint size)
88 {
89   if (not(base_ == MPI_BOTTOM || base_ == 0))
90     return MPI_ERR_ARG;
91   base_=0;//actually the address will be given in the RMA calls, as being the disp.
92   size_+=size;
93   return MPI_SUCCESS;
94 }
95
96 int Win::detach(const void* /*base*/)
97 {
98   base_=MPI_BOTTOM;
99   size_=-1;
100   return MPI_SUCCESS;
101 }
102
103 void Win::get_name(char* name, int* length){
104   if(name_==nullptr){
105     *length=0;
106     name=nullptr;
107     return;
108   }
109   *length = strlen(name_);
110   strncpy(name, name_, *length+1);
111 }
112
113 void Win::get_group(MPI_Group* group){
114   if(comm_ != MPI_COMM_NULL){
115     *group = comm_->group();
116   } else {
117     *group = MPI_GROUP_NULL;
118   }
119 }
120
121 MPI_Info Win::info(){
122   if(info_== MPI_INFO_NULL)
123     info_ = new Info();
124   info_->ref();
125   return info_;
126 }
127
128 int Win::rank(){
129   return rank_;
130 }
131
132 MPI_Aint Win::size(){
133   return size_;
134 }
135
136 void* Win::base(){
137   return base_;
138 }
139
140 int Win::disp_unit(){
141   return disp_unit_;
142 }
143
144 int Win::dynamic(){
145   return dynamic_;
146 }
147
148 void Win::set_info(MPI_Info info){
149   if(info_!= MPI_INFO_NULL)
150     info->ref();
151   info_=info;
152 }
153
154 void Win::set_name(const char* name){
155   name_ = xbt_strdup(name);
156 }
157
158 int Win::fence(int assert)
159 {
160   XBT_DEBUG("Entering fence");
161   if (opened_ == 0)
162     opened_=1;
163   if (assert != MPI_MODE_NOPRECEDE) {
164     // This is not the first fence => finalize what came before
165     bar_->wait();
166     mut_->lock();
167     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
168     // Without this, the vector could get redimensioned when another process pushes.
169     // This would result in the array used by Request::waitall() to be invalidated.
170     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
171     std::vector<MPI_Request> *reqs = requests_;
172     int size = static_cast<int>(reqs->size());
173     // start all requests that have been prepared by another process
174     if (size > 0) {
175       MPI_Request* treqs = &(*reqs)[0];
176       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
177     }
178     count_=0;
179     mut_->unlock();
180   }
181
182   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
183     opened_=0;
184   assert_ = assert;
185
186   bar_->wait();
187   XBT_DEBUG("Leaving fence");
188
189   return MPI_SUCCESS;
190 }
191
192 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
193               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
194 {
195   //get receiver pointer
196   MPI_Win recv_win = connected_wins_[target_rank];
197
198   if(opened_==0){//check that post/start has been done
199     // no fence or start .. lock ok ?
200     int locked=0;
201     for (auto const& it : recv_win->lockers_)
202       if (it == comm_->rank())
203         locked = 1;
204     if(locked != 1)
205       return MPI_ERR_WIN;
206   }
207
208   if(target_count*target_datatype->get_extent()>recv_win->size_)
209     return MPI_ERR_ARG;
210
211   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
212
213   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
214     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
215     // prepare send_request
216     MPI_Request sreq =
217         // TODO cheinrich Check for rank / pid conversion
218         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
219                                comm_, MPI_OP_NULL);
220
221     //prepare receiver request
222     // TODO cheinrich Check for rank / pid conversion
223     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
224                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
225
226     //start send
227     sreq->start();
228
229     if(request!=nullptr){
230       *request=sreq;
231     }else{
232       mut_->lock();
233       requests_->push_back(sreq);
234       mut_->unlock();
235     }
236
237     //push request to receiver's win
238     recv_win->mut_->lock();
239     recv_win->requests_->push_back(rreq);
240     rreq->start();
241     recv_win->mut_->unlock();
242
243   }else{
244     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
245     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
246     if(request!=nullptr)
247       *request = MPI_REQUEST_NULL;
248   }
249
250   return MPI_SUCCESS;
251 }
252
253 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
254               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
255 {
256   //get sender pointer
257   MPI_Win send_win = connected_wins_[target_rank];
258
259   if(opened_==0){//check that post/start has been done
260     // no fence or start .. lock ok ?
261     int locked=0;
262     for (auto const& it : send_win->lockers_)
263       if (it == comm_->rank())
264         locked = 1;
265     if(locked != 1)
266       return MPI_ERR_WIN;
267   }
268
269   if(target_count*target_datatype->get_extent()>send_win->size_)
270     return MPI_ERR_ARG;
271
272   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
273   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
274
275   if(target_rank != comm_->rank()){
276     //prepare send_request
277     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
278                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
279
280     //prepare receiver request
281     MPI_Request rreq = Request::rma_recv_init(
282         origin_addr, origin_count, origin_datatype, target_rank,
283         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
284         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
285
286     //start the send, with another process than us as sender.
287     sreq->start();
288     //push request to receiver's win
289     send_win->mut_->lock();
290     send_win->requests_->push_back(sreq);
291     send_win->mut_->unlock();
292
293     //start recv
294     rreq->start();
295
296     if(request!=nullptr){
297       *request=rreq;
298     }else{
299       mut_->lock();
300       requests_->push_back(rreq);
301       mut_->unlock();
302     }
303
304   }else{
305     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
306     if(request!=nullptr)
307       *request=MPI_REQUEST_NULL;
308   }
309
310   return MPI_SUCCESS;
311 }
312
313
314 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
315               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
316 {
317   XBT_DEBUG("Entering MPI_Win_Accumulate");
318   //get receiver pointer
319   MPI_Win recv_win = connected_wins_[target_rank];
320
321   if(opened_==0){//check that post/start has been done
322     // no fence or start .. lock ok ?
323     int locked=0;
324     for (auto const& it : recv_win->lockers_)
325       if (it == comm_->rank())
326         locked = 1;
327     if(locked != 1)
328       return MPI_ERR_WIN;
329   }
330   //FIXME: local version
331
332   if(target_count*target_datatype->get_extent()>recv_win->size_)
333     return MPI_ERR_ARG;
334
335   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
336   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
337   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
338   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
339   // prepare send_request
340
341   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
342                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
343
344   // prepare receiver request
345   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
346                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
347
348   count_++;
349
350   // start send
351   sreq->start();
352   // push request to receiver's win
353   recv_win->mut_->lock();
354   recv_win->requests_->push_back(rreq);
355   rreq->start();
356   recv_win->mut_->unlock();
357
358   if (request != nullptr) {
359     *request = sreq;
360   } else {
361     mut_->lock();
362     requests_->push_back(sreq);
363     mut_->unlock();
364   }
365
366   XBT_DEBUG("Leaving MPI_Win_Accumulate");
367   return MPI_SUCCESS;
368 }
369
370 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
371                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
372                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
373 {
374   //get sender pointer
375   MPI_Win send_win = connected_wins_[target_rank];
376
377   if(opened_==0){//check that post/start has been done
378     // no fence or start .. lock ok ?
379     int locked=0;
380     for (auto const& it : send_win->lockers_)
381       if (it == comm_->rank())
382         locked = 1;
383     if(locked != 1)
384       return MPI_ERR_WIN;
385   }
386
387   if(target_count*target_datatype->get_extent()>send_win->size_)
388     return MPI_ERR_ARG;
389
390   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
391   //need to be sure ops are correctly ordered, so finish request here ? slow.
392   MPI_Request req;
393   send_win->atomic_mut_->lock();
394   get(result_addr, result_count, result_datatype, target_rank,
395               target_disp, target_count, target_datatype, &req);
396   if (req != MPI_REQUEST_NULL)
397     Request::wait(&req, MPI_STATUS_IGNORE);
398   if(op!=MPI_NO_OP)
399     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
400               target_disp, target_count, target_datatype, op, &req);
401   if (req != MPI_REQUEST_NULL)
402     Request::wait(&req, MPI_STATUS_IGNORE);
403   send_win->atomic_mut_->unlock();
404   return MPI_SUCCESS;
405
406 }
407
408 int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
409         void *result_addr, MPI_Datatype datatype, int target_rank,
410         MPI_Aint target_disp){
411   //get sender pointer
412   MPI_Win send_win = connected_wins_[target_rank];
413
414   if(opened_==0){//check that post/start has been done
415     // no fence or start .. lock ok ?
416     int locked=0;
417     for (auto const& it : send_win->lockers_)
418       if (it == comm_->rank())
419         locked = 1;
420     if(locked != 1)
421       return MPI_ERR_WIN;
422   }
423
424   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
425   MPI_Request req = MPI_REQUEST_NULL;
426   send_win->atomic_mut_->lock();
427   get(result_addr, 1, datatype, target_rank,
428               target_disp, 1, datatype, &req);
429   if (req != MPI_REQUEST_NULL)
430     Request::wait(&req, MPI_STATUS_IGNORE);
431   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
432     put(origin_addr, 1, datatype, target_rank,
433               target_disp, 1, datatype);
434   }
435   send_win->atomic_mut_->unlock();
436   return MPI_SUCCESS;
437 }
438
439 int Win::start(MPI_Group group, int /*assert*/)
440 {
441   /* From MPI forum advices
442   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
443   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
444   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
445   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
446   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
447   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
448   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
449   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
450   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
451   must complete, without further dependencies.  */
452
453   //naive, blocking implementation.
454   int i             = 0;
455   int j             = 0;
456   int size          = group->size();
457   MPI_Request* reqs = xbt_new0(MPI_Request, size);
458
459   XBT_DEBUG("Entering MPI_Win_Start");
460   while (j != size) {
461     int src = comm_->group()->rank(group->actor(j));
462     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
463       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
464       i++;
465     }
466     j++;
467   }
468   size = i;
469   Request::startall(size, reqs);
470   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
471   for (i = 0; i < size; i++) {
472     Request::unref(&reqs[i]);
473   }
474   xbt_free(reqs);
475   opened_++; //we're open for business !
476   group_=group;
477   group->ref();
478   XBT_DEBUG("Leaving MPI_Win_Start");
479   return MPI_SUCCESS;
480 }
481
482 int Win::post(MPI_Group group, int /*assert*/)
483 {
484   //let's make a synchronous send here
485   int i             = 0;
486   int j             = 0;
487   int size = group->size();
488   MPI_Request* reqs = xbt_new0(MPI_Request, size);
489
490   XBT_DEBUG("Entering MPI_Win_Post");
491   while(j!=size){
492     int dst = comm_->group()->rank(group->actor(j));
493     if (dst != rank_ && dst != MPI_UNDEFINED) {
494       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
495       i++;
496     }
497     j++;
498   }
499   size=i;
500
501   Request::startall(size, reqs);
502   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
503   for(i=0;i<size;i++){
504     Request::unref(&reqs[i]);
505   }
506   xbt_free(reqs);
507   opened_++; //we're open for business !
508   group_=group;
509   group->ref();
510   XBT_DEBUG("Leaving MPI_Win_Post");
511   return MPI_SUCCESS;
512 }
513
514 int Win::complete(){
515   if(opened_==0)
516     xbt_die("Complete called on already opened MPI_Win");
517
518   XBT_DEBUG("Entering MPI_Win_Complete");
519   int i             = 0;
520   int j             = 0;
521   int size          = group_->size();
522   MPI_Request* reqs = xbt_new0(MPI_Request, size);
523
524   while(j!=size){
525     int dst = comm_->group()->rank(group_->actor(j));
526     if (dst != rank_ && dst != MPI_UNDEFINED) {
527       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
528       i++;
529     }
530     j++;
531   }
532   size=i;
533   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
534   Request::startall(size, reqs);
535   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
536
537   for(i=0;i<size;i++){
538     Request::unref(&reqs[i]);
539   }
540   xbt_free(reqs);
541
542   int finished = finish_comms();
543   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
544
545   Group::unref(group_);
546   opened_--; //we're closed for business !
547   return MPI_SUCCESS;
548 }
549
550 int Win::wait(){
551   //naive, blocking implementation.
552   XBT_DEBUG("Entering MPI_Win_Wait");
553   int i             = 0;
554   int j             = 0;
555   int size          = group_->size();
556   MPI_Request* reqs = xbt_new0(MPI_Request, size);
557
558   while(j!=size){
559     int src = comm_->group()->rank(group_->actor(j));
560     if (src != rank_ && src != MPI_UNDEFINED) {
561       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
562       i++;
563     }
564     j++;
565   }
566   size=i;
567   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
568   Request::startall(size, reqs);
569   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
570   for(i=0;i<size;i++){
571     Request::unref(&reqs[i]);
572   }
573   xbt_free(reqs);
574   int finished = finish_comms();
575   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
576
577   Group::unref(group_);
578   opened_--; //we're opened for business !
579   return MPI_SUCCESS;
580 }
581
582 int Win::lock(int lock_type, int rank, int /*assert*/)
583 {
584   MPI_Win target_win = connected_wins_[rank];
585
586   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
587     target_win->lock_mut_->lock();
588     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
589     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
590       target_win->lock_mut_->unlock();
591    }
592   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
593     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
594
595   target_win->lockers_.push_back(comm_->rank());
596
597   int finished = finish_comms(rank);
598   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
599   finished = target_win->finish_comms(rank_);
600   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
601   return MPI_SUCCESS;
602 }
603
604 int Win::lock_all(int assert){
605   int i=0;
606   int retval = MPI_SUCCESS;
607   for (i=0; i<comm_->size();i++){
608       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
609       if(ret != MPI_SUCCESS)
610         retval = ret;
611   }
612   return retval;
613 }
614
615 int Win::unlock(int rank){
616   MPI_Win target_win = connected_wins_[rank];
617   int target_mode = target_win->mode_;
618   target_win->mode_= 0;
619   target_win->lockers_.remove(comm_->rank());
620   if (target_mode==MPI_LOCK_EXCLUSIVE){
621     target_win->lock_mut_->unlock();
622   }
623
624   int finished = finish_comms(rank);
625   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
626   finished = target_win->finish_comms(rank_);
627   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
628   return MPI_SUCCESS;
629 }
630
631 int Win::unlock_all(){
632   int i=0;
633   int retval = MPI_SUCCESS;
634   for (i=0; i<comm_->size();i++){
635     int ret = this->unlock(i);
636     if (ret != MPI_SUCCESS)
637       retval = ret;
638   }
639   return retval;
640 }
641
642 int Win::flush(int rank){
643   MPI_Win target_win = connected_wins_[rank];
644   int finished       = finish_comms(rank_);
645   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
646   finished = target_win->finish_comms(rank);
647   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
648   return MPI_SUCCESS;
649 }
650
651 int Win::flush_local(int rank){
652   int finished = finish_comms(rank);
653   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
654   return MPI_SUCCESS;
655 }
656
657 int Win::flush_all(){
658   int finished = finish_comms();
659   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
660   for (int i = 0; i < comm_->size(); i++) {
661     finished = connected_wins_[i]->finish_comms(rank_);
662     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
663   }
664   return MPI_SUCCESS;
665 }
666
667 int Win::flush_local_all(){
668   int finished = finish_comms();
669   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
670   return MPI_SUCCESS;
671 }
672
673 Win* Win::f2c(int id){
674   return static_cast<Win*>(F2C::f2c(id));
675 }
676
677 int Win::finish_comms(){
678   mut_->lock();
679   //Finish own requests
680   std::vector<MPI_Request> *reqqs = requests_;
681   int size = static_cast<int>(reqqs->size());
682   if (size > 0) {
683     MPI_Request* treqs = &(*reqqs)[0];
684     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
685     reqqs->clear();
686   }
687   mut_->unlock();
688   return size;
689 }
690
691 int Win::finish_comms(int rank){
692   mut_->lock();
693   //Finish own requests
694   std::vector<MPI_Request> *reqqs = requests_;
695   int size = static_cast<int>(reqqs->size());
696   if (size > 0) {
697     size = 0;
698     std::vector<MPI_Request> myreqqs;
699     std::vector<MPI_Request>::iterator iter = reqqs->begin();
700     int proc_id                             = comm_->group()->actor(rank)->get_pid();
701     while (iter != reqqs->end()){
702       // Let's see if we're either the destination or the sender of this request
703       // because we only wait for requests that we are responsible for.
704       // Also use the process id here since the request itself returns from src()
705       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
706       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
707         myreqqs.push_back(*iter);
708         iter = reqqs->erase(iter);
709         size++;
710       } else {
711         ++iter;
712       }
713     }
714     if(size >0){
715       MPI_Request* treqs = &myreqqs[0];
716       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
717       myreqqs.clear();
718     }
719   }
720   mut_->unlock();
721   return size;
722 }
723
724 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
725 {
726   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
727   for (int i = 0; not target_win && i < comm_->size(); i++) {
728     if (connected_wins_[i]->size_ > 0)
729       target_win = connected_wins_[i];
730   }
731   if (target_win) {
732     *size                         = target_win->size_;
733     *disp_unit                    = target_win->disp_unit_;
734     *static_cast<void**>(baseptr) = target_win->base_;
735   } else {
736     *size                         = 0;
737     *static_cast<void**>(baseptr) = xbt_malloc(0);
738   }
739   return MPI_SUCCESS;
740 }
741
742 MPI_Errhandler Win::errhandler(){
743   return errhandler_;
744 }
745
746 void Win::set_errhandler(MPI_Errhandler errhandler){
747   errhandler_=errhandler;
748   if(errhandler_!= MPI_ERRHANDLER_NULL)
749     errhandler->ref();
750 }
751
752 }
753 }