Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Link Topo and Comm in both directions, and fix memory leak.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_         = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_                  = nullptr;
32   opened_                = 0;
33   group_                 = MPI_GROUP_NULL;
34   requests_              = new std::vector<MPI_Request>();
35   mut_                   = s4u::Mutex::create();
36   lock_mut_              = s4u::Mutex::create();
37   atomic_mut_            = s4u::Mutex::create();
38   connected_wins_        = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_                 = 0;
41   if(rank_==0){
42     bar_ = new s4u::Barrier(comm_size);
43   }
44   mode_=0;
45   errhandler_=MPI_ERRORS_ARE_FATAL;
46   comm->add_rma_win(this);
47   comm->ref();
48
49   colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
50                    comm);
51
52   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   colls::barrier(comm);
55 }
56
57 Win::~Win(){
58   //As per the standard, perform a barrier to ensure every async comm is finished
59   bar_->wait();
60
61   int finished = finish_comms();
62   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
63
64   delete requests_;
65   delete[] connected_wins_;
66   if (name_ != nullptr){
67     xbt_free(name_);
68   }
69   if(info_!=MPI_INFO_NULL){
70     MPI_Info_free(&info_);
71   }
72
73   comm_->remove_rma_win(this);
74
75   colls::barrier(comm_);
76   Comm::unref(comm_);
77   
78   if (rank_ == 0)
79     delete bar_;
80
81   if(allocated_ !=0)
82     xbt_free(base_);
83
84   cleanup_attr<Win>();
85 }
86
87 int Win::attach(void* /*base*/, MPI_Aint size)
88 {
89   if (not(base_ == MPI_BOTTOM || base_ == 0))
90     return MPI_ERR_ARG;
91   base_=0;//actually the address will be given in the RMA calls, as being the disp.
92   size_+=size;
93   return MPI_SUCCESS;
94 }
95
96 int Win::detach(const void* /*base*/)
97 {
98   base_=MPI_BOTTOM;
99   size_=-1;
100   return MPI_SUCCESS;
101 }
102
103 void Win::get_name(char* name, int* length){
104   if(name_==nullptr){
105     *length=0;
106     name=nullptr;
107     return;
108   }
109   *length = strlen(name_);
110   strncpy(name, name_, *length+1);
111 }
112
113 void Win::get_group(MPI_Group* group){
114   if(comm_ != MPI_COMM_NULL){
115     *group = comm_->group();
116   } else {
117     *group = MPI_GROUP_NULL;
118   }
119 }
120
121 MPI_Info Win::info(){
122   if(info_== MPI_INFO_NULL)
123     info_ = new Info();
124   info_->ref();
125   return info_;
126 }
127
128 int Win::rank(){
129   return rank_;
130 }
131
132 MPI_Aint Win::size(){
133   return size_;
134 }
135
136 void* Win::base(){
137   return base_;
138 }
139
140 int Win::disp_unit(){
141   return disp_unit_;
142 }
143
144 int Win::dynamic(){
145   return dynamic_;
146 }
147
148 void Win::set_info(MPI_Info info){
149   if(info_!= MPI_INFO_NULL)
150     info->ref();
151   info_=info;
152 }
153
154 void Win::set_name(const char* name){
155   name_ = xbt_strdup(name);
156 }
157
158 int Win::fence(int assert)
159 {
160   XBT_DEBUG("Entering fence");
161   if (opened_ == 0)
162     opened_=1;
163   if (assert != MPI_MODE_NOPRECEDE) {
164     // This is not the first fence => finalize what came before
165     bar_->wait();
166     mut_->lock();
167     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
168     // Without this, the vector could get redimensioned when another process pushes.
169     // This would result in the array used by Request::waitall() to be invalidated.
170     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
171     std::vector<MPI_Request> *reqs = requests_;
172     int size = static_cast<int>(reqs->size());
173     // start all requests that have been prepared by another process
174     if (size > 0) {
175       MPI_Request* treqs = &(*reqs)[0];
176       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
177     }
178     count_=0;
179     mut_->unlock();
180   }
181
182   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
183     opened_=0;
184   assert_ = assert;
185
186   bar_->wait();
187   XBT_DEBUG("Leaving fence");
188
189   return MPI_SUCCESS;
190 }
191
192 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
193               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
194 {
195   //get receiver pointer
196   MPI_Win recv_win = connected_wins_[target_rank];
197
198   if(opened_==0){//check that post/start has been done
199     // no fence or start .. lock ok ?
200     int locked=0;
201     for (auto const& it : recv_win->lockers_)
202       if (it == comm_->rank())
203         locked = 1;
204     if(locked != 1)
205       return MPI_ERR_WIN;
206   }
207
208   if(target_count*target_datatype->get_extent()>recv_win->size_)
209     return MPI_ERR_ARG;
210
211   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
212
213   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
214     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
215     // prepare send_request
216     MPI_Request sreq =
217         // TODO cheinrich Check for rank / pid conversion
218         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
219                                comm_, MPI_OP_NULL);
220
221     //prepare receiver request
222     // TODO cheinrich Check for rank / pid conversion
223     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
224                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
225
226     //start send
227     sreq->start();
228
229     if(request!=nullptr){
230       *request=sreq;
231     }else{
232       mut_->lock();
233       requests_->push_back(sreq);
234       mut_->unlock();
235     }
236
237     //push request to receiver's win
238     recv_win->mut_->lock();
239     recv_win->requests_->push_back(rreq);
240     rreq->start();
241     recv_win->mut_->unlock();
242   } else {
243     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
244     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
245     if(request!=nullptr)
246       *request = MPI_REQUEST_NULL;
247   }
248
249   return MPI_SUCCESS;
250 }
251
252 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
253               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
254 {
255   //get sender pointer
256   MPI_Win send_win = connected_wins_[target_rank];
257
258   if(opened_==0){//check that post/start has been done
259     // no fence or start .. lock ok ?
260     int locked=0;
261     for (auto const& it : send_win->lockers_)
262       if (it == comm_->rank())
263         locked = 1;
264     if(locked != 1)
265       return MPI_ERR_WIN;
266   }
267
268   if(target_count*target_datatype->get_extent()>send_win->size_)
269     return MPI_ERR_ARG;
270
271   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
272   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
273
274   if(target_rank != comm_->rank()){
275     //prepare send_request
276     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
277                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
278
279     //prepare receiver request
280     MPI_Request rreq = Request::rma_recv_init(
281         origin_addr, origin_count, origin_datatype, target_rank,
282         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
283         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
284
285     //start the send, with another process than us as sender.
286     sreq->start();
287     //push request to receiver's win
288     send_win->mut_->lock();
289     send_win->requests_->push_back(sreq);
290     send_win->mut_->unlock();
291
292     //start recv
293     rreq->start();
294
295     if(request!=nullptr){
296       *request=rreq;
297     }else{
298       mut_->lock();
299       requests_->push_back(rreq);
300       mut_->unlock();
301     }
302   } else {
303     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
304     if(request!=nullptr)
305       *request=MPI_REQUEST_NULL;
306   }
307   return MPI_SUCCESS;
308 }
309
310 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
311               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
312 {
313   XBT_DEBUG("Entering MPI_Win_Accumulate");
314   //get receiver pointer
315   MPI_Win recv_win = connected_wins_[target_rank];
316
317   if(opened_==0){//check that post/start has been done
318     // no fence or start .. lock ok ?
319     int locked=0;
320     for (auto const& it : recv_win->lockers_)
321       if (it == comm_->rank())
322         locked = 1;
323     if(locked != 1)
324       return MPI_ERR_WIN;
325   }
326   //FIXME: local version
327
328   if(target_count*target_datatype->get_extent()>recv_win->size_)
329     return MPI_ERR_ARG;
330
331   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
332   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
333   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
334   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
335   // prepare send_request
336
337   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
338                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
339
340   // prepare receiver request
341   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
342                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
343
344   count_++;
345
346   // start send
347   sreq->start();
348   // push request to receiver's win
349   recv_win->mut_->lock();
350   recv_win->requests_->push_back(rreq);
351   rreq->start();
352   recv_win->mut_->unlock();
353
354   if (request != nullptr) {
355     *request = sreq;
356   } else {
357     mut_->lock();
358     requests_->push_back(sreq);
359     mut_->unlock();
360   }
361
362   XBT_DEBUG("Leaving MPI_Win_Accumulate");
363   return MPI_SUCCESS;
364 }
365
366 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
367                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
368                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
369 {
370   //get sender pointer
371   MPI_Win send_win = connected_wins_[target_rank];
372
373   if(opened_==0){//check that post/start has been done
374     // no fence or start .. lock ok ?
375     int locked=0;
376     for (auto const& it : send_win->lockers_)
377       if (it == comm_->rank())
378         locked = 1;
379     if(locked != 1)
380       return MPI_ERR_WIN;
381   }
382
383   if(target_count*target_datatype->get_extent()>send_win->size_)
384     return MPI_ERR_ARG;
385
386   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
387   //need to be sure ops are correctly ordered, so finish request here ? slow.
388   MPI_Request req;
389   send_win->atomic_mut_->lock();
390   get(result_addr, result_count, result_datatype, target_rank,
391               target_disp, target_count, target_datatype, &req);
392   if (req != MPI_REQUEST_NULL)
393     Request::wait(&req, MPI_STATUS_IGNORE);
394   if(op!=MPI_NO_OP)
395     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
396               target_disp, target_count, target_datatype, op, &req);
397   if (req != MPI_REQUEST_NULL)
398     Request::wait(&req, MPI_STATUS_IGNORE);
399   send_win->atomic_mut_->unlock();
400   return MPI_SUCCESS;
401 }
402
403 int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
404         void *result_addr, MPI_Datatype datatype, int target_rank,
405         MPI_Aint target_disp){
406   //get sender pointer
407   MPI_Win send_win = connected_wins_[target_rank];
408
409   if(opened_==0){//check that post/start has been done
410     // no fence or start .. lock ok ?
411     int locked=0;
412     for (auto const& it : send_win->lockers_)
413       if (it == comm_->rank())
414         locked = 1;
415     if(locked != 1)
416       return MPI_ERR_WIN;
417   }
418
419   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
420   MPI_Request req = MPI_REQUEST_NULL;
421   send_win->atomic_mut_->lock();
422   get(result_addr, 1, datatype, target_rank,
423               target_disp, 1, datatype, &req);
424   if (req != MPI_REQUEST_NULL)
425     Request::wait(&req, MPI_STATUS_IGNORE);
426   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
427     put(origin_addr, 1, datatype, target_rank,
428               target_disp, 1, datatype);
429   }
430   send_win->atomic_mut_->unlock();
431   return MPI_SUCCESS;
432 }
433
434 int Win::start(MPI_Group group, int /*assert*/)
435 {
436   /* From MPI forum advices
437   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
438   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
439   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
440   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
441   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
442   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
443   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
444   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
445   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
446   must complete, without further dependencies.  */
447
448   //naive, blocking implementation.
449   int i             = 0;
450   int j             = 0;
451   int size          = group->size();
452   MPI_Request* reqs = xbt_new0(MPI_Request, size);
453
454   XBT_DEBUG("Entering MPI_Win_Start");
455   while (j != size) {
456     int src = comm_->group()->rank(group->actor(j));
457     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
458       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
459       i++;
460     }
461     j++;
462   }
463   size = i;
464   Request::startall(size, reqs);
465   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
466   for (i = 0; i < size; i++) {
467     Request::unref(&reqs[i]);
468   }
469   xbt_free(reqs);
470   opened_++; //we're open for business !
471   group_=group;
472   group->ref();
473   XBT_DEBUG("Leaving MPI_Win_Start");
474   return MPI_SUCCESS;
475 }
476
477 int Win::post(MPI_Group group, int /*assert*/)
478 {
479   //let's make a synchronous send here
480   int i             = 0;
481   int j             = 0;
482   int size = group->size();
483   MPI_Request* reqs = xbt_new0(MPI_Request, size);
484
485   XBT_DEBUG("Entering MPI_Win_Post");
486   while(j!=size){
487     int dst = comm_->group()->rank(group->actor(j));
488     if (dst != rank_ && dst != MPI_UNDEFINED) {
489       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
490       i++;
491     }
492     j++;
493   }
494   size=i;
495
496   Request::startall(size, reqs);
497   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
498   for(i=0;i<size;i++){
499     Request::unref(&reqs[i]);
500   }
501   xbt_free(reqs);
502   opened_++; //we're open for business !
503   group_=group;
504   group->ref();
505   XBT_DEBUG("Leaving MPI_Win_Post");
506   return MPI_SUCCESS;
507 }
508
509 int Win::complete(){
510   if(opened_==0)
511     xbt_die("Complete called on already opened MPI_Win");
512
513   XBT_DEBUG("Entering MPI_Win_Complete");
514   int i             = 0;
515   int j             = 0;
516   int size          = group_->size();
517   MPI_Request* reqs = xbt_new0(MPI_Request, size);
518
519   while(j!=size){
520     int dst = comm_->group()->rank(group_->actor(j));
521     if (dst != rank_ && dst != MPI_UNDEFINED) {
522       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
523       i++;
524     }
525     j++;
526   }
527   size=i;
528   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
529   Request::startall(size, reqs);
530   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
531
532   for(i=0;i<size;i++){
533     Request::unref(&reqs[i]);
534   }
535   xbt_free(reqs);
536
537   int finished = finish_comms();
538   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
539
540   Group::unref(group_);
541   opened_--; //we're closed for business !
542   return MPI_SUCCESS;
543 }
544
545 int Win::wait(){
546   //naive, blocking implementation.
547   XBT_DEBUG("Entering MPI_Win_Wait");
548   int i             = 0;
549   int j             = 0;
550   int size          = group_->size();
551   MPI_Request* reqs = xbt_new0(MPI_Request, size);
552
553   while(j!=size){
554     int src = comm_->group()->rank(group_->actor(j));
555     if (src != rank_ && src != MPI_UNDEFINED) {
556       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
557       i++;
558     }
559     j++;
560   }
561   size=i;
562   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
563   Request::startall(size, reqs);
564   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
565   for(i=0;i<size;i++){
566     Request::unref(&reqs[i]);
567   }
568   xbt_free(reqs);
569   int finished = finish_comms();
570   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
571
572   Group::unref(group_);
573   opened_--; //we're opened for business !
574   return MPI_SUCCESS;
575 }
576
577 int Win::lock(int lock_type, int rank, int /*assert*/)
578 {
579   MPI_Win target_win = connected_wins_[rank];
580
581   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
582     target_win->lock_mut_->lock();
583     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
584     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
585       target_win->lock_mut_->unlock();
586    }
587   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
588     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
589
590   target_win->lockers_.push_back(comm_->rank());
591
592   int finished = finish_comms(rank);
593   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
594   finished = target_win->finish_comms(rank_);
595   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
596   return MPI_SUCCESS;
597 }
598
599 int Win::lock_all(int assert){
600   int i=0;
601   int retval = MPI_SUCCESS;
602   for (i=0; i<comm_->size();i++){
603       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
604       if(ret != MPI_SUCCESS)
605         retval = ret;
606   }
607   return retval;
608 }
609
610 int Win::unlock(int rank){
611   MPI_Win target_win = connected_wins_[rank];
612   int target_mode = target_win->mode_;
613   target_win->mode_= 0;
614   target_win->lockers_.remove(comm_->rank());
615   if (target_mode==MPI_LOCK_EXCLUSIVE){
616     target_win->lock_mut_->unlock();
617   }
618
619   int finished = finish_comms(rank);
620   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
621   finished = target_win->finish_comms(rank_);
622   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
623   return MPI_SUCCESS;
624 }
625
626 int Win::unlock_all(){
627   int i=0;
628   int retval = MPI_SUCCESS;
629   for (i=0; i<comm_->size();i++){
630     int ret = this->unlock(i);
631     if (ret != MPI_SUCCESS)
632       retval = ret;
633   }
634   return retval;
635 }
636
637 int Win::flush(int rank){
638   MPI_Win target_win = connected_wins_[rank];
639   int finished       = finish_comms(rank_);
640   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
641   finished = target_win->finish_comms(rank);
642   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
643   return MPI_SUCCESS;
644 }
645
646 int Win::flush_local(int rank){
647   int finished = finish_comms(rank);
648   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
649   return MPI_SUCCESS;
650 }
651
652 int Win::flush_all(){
653   int finished = finish_comms();
654   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
655   for (int i = 0; i < comm_->size(); i++) {
656     finished = connected_wins_[i]->finish_comms(rank_);
657     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
658   }
659   return MPI_SUCCESS;
660 }
661
662 int Win::flush_local_all(){
663   int finished = finish_comms();
664   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
665   return MPI_SUCCESS;
666 }
667
668 Win* Win::f2c(int id){
669   return static_cast<Win*>(F2C::f2c(id));
670 }
671
672 int Win::finish_comms(){
673   mut_->lock();
674   //Finish own requests
675   std::vector<MPI_Request> *reqqs = requests_;
676   int size = static_cast<int>(reqqs->size());
677   if (size > 0) {
678     MPI_Request* treqs = &(*reqqs)[0];
679     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
680     reqqs->clear();
681   }
682   mut_->unlock();
683   return size;
684 }
685
686 int Win::finish_comms(int rank){
687   mut_->lock();
688   //Finish own requests
689   std::vector<MPI_Request> *reqqs = requests_;
690   int size = static_cast<int>(reqqs->size());
691   if (size > 0) {
692     size = 0;
693     std::vector<MPI_Request> myreqqs;
694     std::vector<MPI_Request>::iterator iter = reqqs->begin();
695     int proc_id                             = comm_->group()->actor(rank)->get_pid();
696     while (iter != reqqs->end()){
697       // Let's see if we're either the destination or the sender of this request
698       // because we only wait for requests that we are responsible for.
699       // Also use the process id here since the request itself returns from src()
700       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
701       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
702         myreqqs.push_back(*iter);
703         iter = reqqs->erase(iter);
704         size++;
705       } else {
706         ++iter;
707       }
708     }
709     if(size >0){
710       MPI_Request* treqs = &myreqqs[0];
711       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
712       myreqqs.clear();
713     }
714   }
715   mut_->unlock();
716   return size;
717 }
718
719 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
720 {
721   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
722   for (int i = 0; not target_win && i < comm_->size(); i++) {
723     if (connected_wins_[i]->size_ > 0)
724       target_win = connected_wins_[i];
725   }
726   if (target_win) {
727     *size                         = target_win->size_;
728     *disp_unit                    = target_win->disp_unit_;
729     *static_cast<void**>(baseptr) = target_win->base_;
730   } else {
731     *size                         = 0;
732     *static_cast<void**>(baseptr) = xbt_malloc(0);
733   }
734   return MPI_SUCCESS;
735 }
736
737 MPI_Errhandler Win::errhandler()
738 {
739   return errhandler_;
740 }
741
742 void Win::set_errhandler(MPI_Errhandler errhandler)
743 {
744   errhandler_ = errhandler;
745   if (errhandler_ != MPI_ERRHANDLER_NULL)
746     errhandler->ref();
747 }
748 } // namespace smpi
749 } // namespace simgrid