Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into CRTP
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_         = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_                  = nullptr;
32   opened_                = 0;
33   group_                 = MPI_GROUP_NULL;
34   requests_              = new std::vector<MPI_Request>();
35   mut_                   = s4u::Mutex::create();
36   lock_mut_              = s4u::Mutex::create();
37   atomic_mut_            = s4u::Mutex::create();
38   connected_wins_        = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_                 = 0;
41   if(rank_==0){
42     bar_ = new s4u::Barrier(comm_size);
43   }
44   mode_=0;
45   errhandler_=MPI_ERRORS_ARE_FATAL;
46   comm->add_rma_win(this);
47   comm->ref();
48
49   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
50                          MPI_BYTE, comm);
51
52   Colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   Colls::barrier(comm);
55 }
56
57 Win::~Win(){
58   //As per the standard, perform a barrier to ensure every async comm is finished
59   bar_->wait();
60
61   int finished = finish_comms();
62   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
63
64   delete requests_;
65   delete[] connected_wins_;
66   if (name_ != nullptr){
67     xbt_free(name_);
68   }
69   if(info_!=MPI_INFO_NULL){
70     MPI_Info_free(&info_);
71   }
72
73   comm_->remove_rma_win(this);
74
75   Colls::barrier(comm_);
76   Comm::unref(comm_);
77   
78   if (rank_ == 0)
79     delete bar_;
80
81   if(allocated_ !=0)
82     xbt_free(base_);
83
84   cleanup_attr<Win>();
85 }
86
87 int Win::attach(void* /*base*/, MPI_Aint size)
88 {
89   if (not(base_ == MPI_BOTTOM || base_ == 0))
90     return MPI_ERR_ARG;
91   base_=0;//actually the address will be given in the RMA calls, as being the disp.
92   size_+=size;
93   return MPI_SUCCESS;
94 }
95
96 int Win::detach(const void* /*base*/)
97 {
98   base_=MPI_BOTTOM;
99   size_=-1;
100   return MPI_SUCCESS;
101 }
102
103 void Win::get_name(char* name, int* length){
104   if(name_==nullptr){
105     *length=0;
106     name=nullptr;
107     return;
108   }
109   *length = strlen(name_);
110   strncpy(name, name_, *length+1);
111 }
112
113 void Win::get_group(MPI_Group* group){
114   if(comm_ != MPI_COMM_NULL){
115     *group = comm_->group();
116   } else {
117     *group = MPI_GROUP_NULL;
118   }
119 }
120
121 MPI_Info Win::info(){
122   if(info_== MPI_INFO_NULL)
123     info_ = new Info();
124   info_->ref();
125   return info_;
126 }
127
128 int Win::rank(){
129   return rank_;
130 }
131
132 MPI_Aint Win::size(){
133   return size_;
134 }
135
136 void* Win::base(){
137   return base_;
138 }
139
140 int Win::disp_unit(){
141   return disp_unit_;
142 }
143
144 int Win::dynamic(){
145   return dynamic_;
146 }
147
148 void Win::set_info(MPI_Info info){
149   if(info_!= MPI_INFO_NULL)
150     info->ref();
151   info_=info;
152 }
153
154 void Win::set_name(const char* name){
155   name_ = xbt_strdup(name);
156 }
157
158 int Win::fence(int assert)
159 {
160   XBT_DEBUG("Entering fence");
161   if (opened_ == 0)
162     opened_=1;
163   if (assert != MPI_MODE_NOPRECEDE) {
164     // This is not the first fence => finalize what came before
165     bar_->wait();
166     mut_->lock();
167     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
168     // Without this, the vector could get redimensionned when another process pushes.
169     // This would result in the array used by Request::waitall() to be invalidated.
170     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
171     std::vector<MPI_Request> *reqs = requests_;
172     int size = static_cast<int>(reqs->size());
173     // start all requests that have been prepared by another process
174     if (size > 0) {
175       MPI_Request* treqs = &(*reqs)[0];
176       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
177     }
178     count_=0;
179     mut_->unlock();
180   }
181
182   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
183     opened_=0;
184   assert_ = assert;
185
186   bar_->wait();
187   XBT_DEBUG("Leaving fence");
188
189   return MPI_SUCCESS;
190 }
191
192 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
193               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
194 {
195   //get receiver pointer
196   MPI_Win recv_win = connected_wins_[target_rank];
197
198   if(opened_==0){//check that post/start has been done
199     // no fence or start .. lock ok ?
200     int locked=0;
201     for (auto const& it : recv_win->lockers_)
202       if (it == comm_->rank())
203         locked = 1;
204     if(locked != 1)
205       return MPI_ERR_WIN;
206   }
207
208   if(target_count*target_datatype->get_extent()>recv_win->size_)
209     return MPI_ERR_ARG;
210
211   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
212
213   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
214     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
215     // prepare send_request
216     MPI_Request sreq =
217         // TODO cheinrich Check for rank / pid conversion
218         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
219                                comm_, MPI_OP_NULL);
220
221     //prepare receiver request
222     // TODO cheinrich Check for rank / pid conversion
223     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
224                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
225
226     //start send
227     sreq->start();
228
229     if(request!=nullptr){
230       *request=sreq;
231     }else{
232       mut_->lock();
233       requests_->push_back(sreq);
234       mut_->unlock();
235     }
236
237     //push request to receiver's win
238     recv_win->mut_->lock();
239     recv_win->requests_->push_back(rreq);
240     rreq->start();
241     recv_win->mut_->unlock();
242
243   }else{
244     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
245     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
246     if(request!=nullptr)
247       *request = MPI_REQUEST_NULL;
248   }
249
250   return MPI_SUCCESS;
251 }
252
253 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
254               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
255 {
256   //get sender pointer
257   MPI_Win send_win = connected_wins_[target_rank];
258
259   if(opened_==0){//check that post/start has been done
260     // no fence or start .. lock ok ?
261     int locked=0;
262     for (auto const& it : send_win->lockers_)
263       if (it == comm_->rank())
264         locked = 1;
265     if(locked != 1)
266       return MPI_ERR_WIN;
267   }
268
269   if(target_count*target_datatype->get_extent()>send_win->size_)
270     return MPI_ERR_ARG;
271
272   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
273   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
274
275   if(target_rank != comm_->rank()){
276     //prepare send_request
277     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
278                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
279
280     //prepare receiver request
281     MPI_Request rreq = Request::rma_recv_init(
282         origin_addr, origin_count, origin_datatype, target_rank,
283         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
284         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
285
286     //start the send, with another process than us as sender.
287     sreq->start();
288     //push request to receiver's win
289     send_win->mut_->lock();
290     send_win->requests_->push_back(sreq);
291     send_win->mut_->unlock();
292
293     //start recv
294     rreq->start();
295
296     if(request!=nullptr){
297       *request=rreq;
298     }else{
299       mut_->lock();
300       requests_->push_back(rreq);
301       mut_->unlock();
302     }
303
304   }else{
305     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
306     if(request!=nullptr)
307       *request=MPI_REQUEST_NULL;
308   }
309
310   return MPI_SUCCESS;
311 }
312
313
314 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
315               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
316 {
317   XBT_DEBUG("Entering MPI_Win_Accumulate");
318   //get receiver pointer
319   MPI_Win recv_win = connected_wins_[target_rank];
320
321   if(opened_==0){//check that post/start has been done
322     // no fence or start .. lock ok ?
323     int locked=0;
324     for (auto const& it : recv_win->lockers_)
325       if (it == comm_->rank())
326         locked = 1;
327     if(locked != 1)
328       return MPI_ERR_WIN;
329   }
330   //FIXME: local version
331
332   if(target_count*target_datatype->get_extent()>recv_win->size_)
333     return MPI_ERR_ARG;
334
335   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
336   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
337     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
338     //prepare send_request
339
340   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
341                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
342
343   // prepare receiver request
344   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
345                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
346
347   count_++;
348
349   // start send
350   sreq->start();
351   // push request to receiver's win
352   recv_win->mut_->lock();
353   recv_win->requests_->push_back(rreq);
354   rreq->start();
355   recv_win->mut_->unlock();
356
357   if (request != nullptr) {
358     *request = sreq;
359   } else {
360     mut_->lock();
361     requests_->push_back(sreq);
362     mut_->unlock();
363   }
364
365   XBT_DEBUG("Leaving MPI_Win_Accumulate");
366   return MPI_SUCCESS;
367 }
368
369 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
370                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
371                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
372 {
373   //get sender pointer
374   MPI_Win send_win = connected_wins_[target_rank];
375
376   if(opened_==0){//check that post/start has been done
377     // no fence or start .. lock ok ?
378     int locked=0;
379     for (auto const& it : send_win->lockers_)
380       if (it == comm_->rank())
381         locked = 1;
382     if(locked != 1)
383       return MPI_ERR_WIN;
384   }
385
386   if(target_count*target_datatype->get_extent()>send_win->size_)
387     return MPI_ERR_ARG;
388
389   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
390   //need to be sure ops are correctly ordered, so finish request here ? slow.
391   MPI_Request req;
392   send_win->atomic_mut_->lock();
393   get(result_addr, result_count, result_datatype, target_rank,
394               target_disp, target_count, target_datatype, &req);
395   if (req != MPI_REQUEST_NULL)
396     Request::wait(&req, MPI_STATUS_IGNORE);
397   if(op!=MPI_NO_OP)
398     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
399               target_disp, target_count, target_datatype, op, &req);
400   if (req != MPI_REQUEST_NULL)
401     Request::wait(&req, MPI_STATUS_IGNORE);
402   send_win->atomic_mut_->unlock();
403   return MPI_SUCCESS;
404
405 }
406
407 int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
408         void *result_addr, MPI_Datatype datatype, int target_rank,
409         MPI_Aint target_disp){
410   //get sender pointer
411   MPI_Win send_win = connected_wins_[target_rank];
412
413   if(opened_==0){//check that post/start has been done
414     // no fence or start .. lock ok ?
415     int locked=0;
416     for (auto const& it : send_win->lockers_)
417       if (it == comm_->rank())
418         locked = 1;
419     if(locked != 1)
420       return MPI_ERR_WIN;
421   }
422
423   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
424   MPI_Request req = MPI_REQUEST_NULL;
425   send_win->atomic_mut_->lock();
426   get(result_addr, 1, datatype, target_rank,
427               target_disp, 1, datatype, &req);
428   if (req != MPI_REQUEST_NULL)
429     Request::wait(&req, MPI_STATUS_IGNORE);
430   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
431     put(origin_addr, 1, datatype, target_rank,
432               target_disp, 1, datatype);
433   }
434   send_win->atomic_mut_->unlock();
435   return MPI_SUCCESS;
436 }
437
438 int Win::start(MPI_Group group, int /*assert*/)
439 {
440   /* From MPI forum advices
441   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
442   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
443   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
444   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
445   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
446   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
447   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
448   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
449   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
450   must complete, without further dependencies.  */
451
452   //naive, blocking implementation.
453   int i             = 0;
454   int j             = 0;
455   int size          = group->size();
456   MPI_Request* reqs = xbt_new0(MPI_Request, size);
457
458   XBT_DEBUG("Entering MPI_Win_Start");
459   while (j != size) {
460     int src = comm_->group()->rank(group->actor(j));
461     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
462       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
463       i++;
464     }
465     j++;
466   }
467   size = i;
468   Request::startall(size, reqs);
469   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
470   for (i = 0; i < size; i++) {
471     Request::unref(&reqs[i]);
472   }
473   xbt_free(reqs);
474   opened_++; //we're open for business !
475   group_=group;
476   group->ref();
477   XBT_DEBUG("Leaving MPI_Win_Start");
478   return MPI_SUCCESS;
479 }
480
481 int Win::post(MPI_Group group, int /*assert*/)
482 {
483   //let's make a synchronous send here
484   int i             = 0;
485   int j             = 0;
486   int size = group->size();
487   MPI_Request* reqs = xbt_new0(MPI_Request, size);
488
489   XBT_DEBUG("Entering MPI_Win_Post");
490   while(j!=size){
491     int dst = comm_->group()->rank(group->actor(j));
492     if (dst != rank_ && dst != MPI_UNDEFINED) {
493       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
494       i++;
495     }
496     j++;
497   }
498   size=i;
499
500   Request::startall(size, reqs);
501   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
502   for(i=0;i<size;i++){
503     Request::unref(&reqs[i]);
504   }
505   xbt_free(reqs);
506   opened_++; //we're open for business !
507   group_=group;
508   group->ref();
509   XBT_DEBUG("Leaving MPI_Win_Post");
510   return MPI_SUCCESS;
511 }
512
513 int Win::complete(){
514   if(opened_==0)
515     xbt_die("Complete called on already opened MPI_Win");
516
517   XBT_DEBUG("Entering MPI_Win_Complete");
518   int i             = 0;
519   int j             = 0;
520   int size          = group_->size();
521   MPI_Request* reqs = xbt_new0(MPI_Request, size);
522
523   while(j!=size){
524     int dst = comm_->group()->rank(group_->actor(j));
525     if (dst != rank_ && dst != MPI_UNDEFINED) {
526       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
527       i++;
528     }
529     j++;
530   }
531   size=i;
532   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
533   Request::startall(size, reqs);
534   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
535
536   for(i=0;i<size;i++){
537     Request::unref(&reqs[i]);
538   }
539   xbt_free(reqs);
540
541   int finished = finish_comms();
542   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
543
544   Group::unref(group_);
545   opened_--; //we're closed for business !
546   return MPI_SUCCESS;
547 }
548
549 int Win::wait(){
550   //naive, blocking implementation.
551   XBT_DEBUG("Entering MPI_Win_Wait");
552   int i             = 0;
553   int j             = 0;
554   int size          = group_->size();
555   MPI_Request* reqs = xbt_new0(MPI_Request, size);
556
557   while(j!=size){
558     int src = comm_->group()->rank(group_->actor(j));
559     if (src != rank_ && src != MPI_UNDEFINED) {
560       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
561       i++;
562     }
563     j++;
564   }
565   size=i;
566   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
567   Request::startall(size, reqs);
568   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
569   for(i=0;i<size;i++){
570     Request::unref(&reqs[i]);
571   }
572   xbt_free(reqs);
573   int finished = finish_comms();
574   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
575
576   Group::unref(group_);
577   opened_--; //we're opened for business !
578   return MPI_SUCCESS;
579 }
580
581 int Win::lock(int lock_type, int rank, int /*assert*/)
582 {
583   MPI_Win target_win = connected_wins_[rank];
584
585   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
586     target_win->lock_mut_->lock();
587     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
588     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
589       target_win->lock_mut_->unlock();
590    }
591   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
592     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
593
594   target_win->lockers_.push_back(comm_->rank());
595
596   int finished = finish_comms(rank);
597   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
598   finished = target_win->finish_comms(rank_);
599   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
600   return MPI_SUCCESS;
601 }
602
603 int Win::lock_all(int assert){
604   int i=0;
605   int retval = MPI_SUCCESS;
606   for (i=0; i<comm_->size();i++){
607       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
608       if(ret != MPI_SUCCESS)
609         retval = ret;
610   }
611   return retval;
612 }
613
614 int Win::unlock(int rank){
615   MPI_Win target_win = connected_wins_[rank];
616   int target_mode = target_win->mode_;
617   target_win->mode_= 0;
618   target_win->lockers_.remove(comm_->rank());
619   if (target_mode==MPI_LOCK_EXCLUSIVE){
620     target_win->lock_mut_->unlock();
621   }
622
623   int finished = finish_comms(rank);
624   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
625   finished = target_win->finish_comms(rank_);
626   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
627   return MPI_SUCCESS;
628 }
629
630 int Win::unlock_all(){
631   int i=0;
632   int retval = MPI_SUCCESS;
633   for (i=0; i<comm_->size();i++){
634     int ret = this->unlock(i);
635     if (ret != MPI_SUCCESS)
636       retval = ret;
637   }
638   return retval;
639 }
640
641 int Win::flush(int rank){
642   MPI_Win target_win = connected_wins_[rank];
643   int finished       = finish_comms(rank_);
644   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
645   finished = target_win->finish_comms(rank);
646   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
647   return MPI_SUCCESS;
648 }
649
650 int Win::flush_local(int rank){
651   int finished = finish_comms(rank);
652   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
653   return MPI_SUCCESS;
654 }
655
656 int Win::flush_all(){
657   int finished = finish_comms();
658   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
659   for (int i = 0; i < comm_->size(); i++) {
660     finished = connected_wins_[i]->finish_comms(rank_);
661     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
662   }
663   return MPI_SUCCESS;
664 }
665
666 int Win::flush_local_all(){
667   int finished = finish_comms();
668   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
669   return MPI_SUCCESS;
670 }
671
672 Win* Win::f2c(int id){
673   return static_cast<Win*>(F2C::f2c(id));
674 }
675
676 int Win::finish_comms(){
677   mut_->lock();
678   //Finish own requests
679   std::vector<MPI_Request> *reqqs = requests_;
680   int size = static_cast<int>(reqqs->size());
681   if (size > 0) {
682     MPI_Request* treqs = &(*reqqs)[0];
683     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
684     reqqs->clear();
685   }
686   mut_->unlock();
687   return size;
688 }
689
690 int Win::finish_comms(int rank){
691   mut_->lock();
692   //Finish own requests
693   std::vector<MPI_Request> *reqqs = requests_;
694   int size = static_cast<int>(reqqs->size());
695   if (size > 0) {
696     size = 0;
697     std::vector<MPI_Request> myreqqs;
698     std::vector<MPI_Request>::iterator iter = reqqs->begin();
699     int proc_id                             = comm_->group()->actor(rank)->get_pid();
700     while (iter != reqqs->end()){
701       // Let's see if we're either the destination or the sender of this request
702       // because we only wait for requests that we are responsible for.
703       // Also use the process id here since the request itself returns from src()
704       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
705       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
706         myreqqs.push_back(*iter);
707         iter = reqqs->erase(iter);
708         size++;
709       } else {
710         ++iter;
711       }
712     }
713     if(size >0){
714       MPI_Request* treqs = &myreqqs[0];
715       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
716       myreqqs.clear();
717     }
718   }
719   mut_->unlock();
720   return size;
721 }
722
723 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
724 {
725   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
726   for (int i = 0; not target_win && i < comm_->size(); i++) {
727     if (connected_wins_[i]->size_ > 0)
728       target_win = connected_wins_[i];
729   }
730   if (target_win) {
731     *size                         = target_win->size_;
732     *disp_unit                    = target_win->disp_unit_;
733     *static_cast<void**>(baseptr) = target_win->base_;
734   } else {
735     *size                         = 0;
736     *static_cast<void**>(baseptr) = xbt_malloc(0);
737   }
738   return MPI_SUCCESS;
739 }
740
741 MPI_Errhandler Win::errhandler(){
742   return errhandler_;
743 }
744
745 void Win::set_errhandler(MPI_Errhandler errhandler){
746   errhandler_=errhandler;
747   if(errhandler_!= MPI_ERRHANDLER_NULL)
748     errhandler->ref();
749 }
750
751 }
752 }