Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'klement/simgrid-klement' into master
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
26     : base_(base)
27     , size_(size)
28     , disp_unit_(disp_unit)
29     , info_(info)
30     , comm_(comm)
31     , rank_(comm->rank())
32     , allocated_(allocated)
33     , dynamic_(dynamic)
34 {
35   XBT_DEBUG("Creating window");
36   if(info!=MPI_INFO_NULL)
37     info->ref();
38   int comm_size          = comm->size();
39   name_                  = nullptr;
40   opened_                = 0;
41   group_                 = MPI_GROUP_NULL;
42   requests_              = new std::vector<MPI_Request>();
43   mut_                   = s4u::Mutex::create();
44   lock_mut_              = s4u::Mutex::create();
45   atomic_mut_            = s4u::Mutex::create();
46   connected_wins_        = new MPI_Win[comm_size];
47   connected_wins_[rank_] = this;
48   count_                 = 0;
49   if(rank_==0){
50     bar_ = new s4u::Barrier(comm_size);
51   }
52   mode_=0;
53   errhandler_=MPI_ERRORS_ARE_FATAL;
54   errhandler_->ref();
55   comm->add_rma_win(this);
56   comm->ref();
57
58   colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
59                    comm);
60
61   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
62
63   colls::barrier(comm);
64 }
65
66 Win::~Win(){
67   //As per the standard, perform a barrier to ensure every async comm is finished
68   bar_->wait();
69
70   int finished = finish_comms();
71   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
72
73   delete requests_;
74   delete[] connected_wins_;
75   if (name_ != nullptr){
76     xbt_free(name_);
77   }
78   if (info_ != MPI_INFO_NULL)
79     simgrid::smpi::Info::unref(info_);
80   if (errhandler_ != MPI_ERRHANDLER_NULL)
81     simgrid::smpi::Errhandler::unref(errhandler_);
82
83   comm_->remove_rma_win(this);
84
85   colls::barrier(comm_);
86   Comm::unref(comm_);
87   
88   if (rank_ == 0)
89     delete bar_;
90
91   if(allocated_ !=0)
92     xbt_free(base_);
93
94   cleanup_attr<Win>();
95 }
96
97 int Win::attach(void* /*base*/, MPI_Aint size)
98 {
99   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
100     return MPI_ERR_ARG;
101   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
102   size_+=size;
103   return MPI_SUCCESS;
104 }
105
106 int Win::detach(const void* /*base*/)
107 {
108   base_=MPI_BOTTOM;
109   size_=-1;
110   return MPI_SUCCESS;
111 }
112
113 void Win::get_name(char* name, int* length) const
114 {
115   if(name_==nullptr){
116     *length=0;
117     name=nullptr;
118     return;
119   }
120   *length = strlen(name_);
121   strncpy(name, name_, *length+1);
122 }
123
124 void Win::get_group(MPI_Group* group){
125   if(comm_ != MPI_COMM_NULL){
126     *group = comm_->group();
127   } else {
128     *group = MPI_GROUP_NULL;
129   }
130 }
131
132 MPI_Info Win::info()
133 {
134   if (info_ == MPI_INFO_NULL)
135     info_ = new Info();
136   info_->ref();
137   return info_;
138 }
139
140 int Win::rank() const
141 {
142   return rank_;
143 }
144
145 MPI_Aint Win::size() const
146 {
147   return size_;
148 }
149
150 void* Win::base() const
151 {
152   return base_;
153 }
154
155 int Win::disp_unit() const
156 {
157   return disp_unit_;
158 }
159
160 int Win::dynamic() const
161 {
162   return dynamic_;
163 }
164
165 void Win::set_info(MPI_Info info)
166 {
167   if (info_ != MPI_INFO_NULL)
168     simgrid::smpi::Info::unref(info_);
169   info_ = info;
170   if (info_ != MPI_INFO_NULL)
171     info_->ref();
172 }
173
174 void Win::set_name(const char* name){
175   name_ = xbt_strdup(name);
176 }
177
178 int Win::fence(int assert)
179 {
180   XBT_DEBUG("Entering fence");
181   if (opened_ == 0)
182     opened_=1;
183   if (assert != MPI_MODE_NOPRECEDE) {
184     // This is not the first fence => finalize what came before
185     bar_->wait();
186     mut_->lock();
187     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
188     // Without this, the vector could get redimensioned when another process pushes.
189     // This would result in the array used by Request::waitall() to be invalidated.
190     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
191     std::vector<MPI_Request> *reqs = requests_;
192     int size = static_cast<int>(reqs->size());
193     // start all requests that have been prepared by another process
194     if (size > 0) {
195       MPI_Request* treqs = &(*reqs)[0];
196       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
197     }
198     count_=0;
199     mut_->unlock();
200   }
201
202   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
203     opened_=0;
204   assert_ = assert;
205
206   bar_->wait();
207   XBT_DEBUG("Leaving fence");
208
209   return MPI_SUCCESS;
210 }
211
212 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
213               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
214 {
215   //get receiver pointer
216   const Win* recv_win = connected_wins_[target_rank];
217
218   if(opened_==0){//check that post/start has been done
219     // no fence or start .. lock ok ?
220     int locked=0;
221     for (auto const& it : recv_win->lockers_)
222       if (it == comm_->rank())
223         locked = 1;
224     if(locked != 1)
225       return MPI_ERR_WIN;
226   }
227
228   if(target_count*target_datatype->get_extent()>recv_win->size_)
229     return MPI_ERR_ARG;
230
231   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
232
233   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
234     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
235     // prepare send_request
236     MPI_Request sreq =
237         // TODO cheinrich Check for rank / pid conversion
238         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
239                                comm_, MPI_OP_NULL);
240
241     //prepare receiver request
242     // TODO cheinrich Check for rank / pid conversion
243     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
244                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
245
246     //start send
247     sreq->start();
248
249     if(request!=nullptr){
250       *request=sreq;
251     }else{
252       mut_->lock();
253       requests_->push_back(sreq);
254       mut_->unlock();
255     }
256
257     //push request to receiver's win
258     recv_win->mut_->lock();
259     recv_win->requests_->push_back(rreq);
260     rreq->start();
261     recv_win->mut_->unlock();
262   } else {
263     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
264     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
265     if(request!=nullptr)
266       *request = MPI_REQUEST_NULL;
267   }
268
269   return MPI_SUCCESS;
270 }
271
272 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
273               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
274 {
275   //get sender pointer
276   const Win* send_win = connected_wins_[target_rank];
277
278   if(opened_==0){//check that post/start has been done
279     // no fence or start .. lock ok ?
280     int locked=0;
281     for (auto const& it : send_win->lockers_)
282       if (it == comm_->rank())
283         locked = 1;
284     if(locked != 1)
285       return MPI_ERR_WIN;
286   }
287
288   if(target_count*target_datatype->get_extent()>send_win->size_)
289     return MPI_ERR_ARG;
290
291   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
292   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
293
294   if(target_rank != comm_->rank()){
295     //prepare send_request
296     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
297                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
298
299     //prepare receiver request
300     MPI_Request rreq = Request::rma_recv_init(
301         origin_addr, origin_count, origin_datatype, target_rank,
302         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
303         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
304
305     //start the send, with another process than us as sender.
306     sreq->start();
307     //push request to receiver's win
308     send_win->mut_->lock();
309     send_win->requests_->push_back(sreq);
310     send_win->mut_->unlock();
311
312     //start recv
313     rreq->start();
314
315     if(request!=nullptr){
316       *request=rreq;
317     }else{
318       mut_->lock();
319       requests_->push_back(rreq);
320       mut_->unlock();
321     }
322   } else {
323     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
324     if(request!=nullptr)
325       *request=MPI_REQUEST_NULL;
326   }
327   return MPI_SUCCESS;
328 }
329
330 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
331               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
332 {
333   XBT_DEBUG("Entering MPI_Win_Accumulate");
334   //get receiver pointer
335   const Win* recv_win = connected_wins_[target_rank];
336
337   if(opened_==0){//check that post/start has been done
338     // no fence or start .. lock ok ?
339     int locked=0;
340     for (auto const& it : recv_win->lockers_)
341       if (it == comm_->rank())
342         locked = 1;
343     if(locked != 1)
344       return MPI_ERR_WIN;
345   }
346   //FIXME: local version
347
348   if(target_count*target_datatype->get_extent()>recv_win->size_)
349     return MPI_ERR_ARG;
350
351   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
352   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
353   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
354   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
355   // prepare send_request
356
357   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
358                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
359
360   // prepare receiver request
361   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
362                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
363
364   count_++;
365
366   // start send
367   sreq->start();
368   // push request to receiver's win
369   recv_win->mut_->lock();
370   recv_win->requests_->push_back(rreq);
371   rreq->start();
372   recv_win->mut_->unlock();
373
374   if (request != nullptr) {
375     *request = sreq;
376   } else {
377     mut_->lock();
378     requests_->push_back(sreq);
379     mut_->unlock();
380   }
381
382   XBT_DEBUG("Leaving MPI_Win_Accumulate");
383   return MPI_SUCCESS;
384 }
385
386 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
387                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
388                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
389 {
390   //get sender pointer
391   const Win* send_win = connected_wins_[target_rank];
392
393   if(opened_==0){//check that post/start has been done
394     // no fence or start .. lock ok ?
395     int locked=0;
396     for (auto const& it : send_win->lockers_)
397       if (it == comm_->rank())
398         locked = 1;
399     if(locked != 1)
400       return MPI_ERR_WIN;
401   }
402
403   if(target_count*target_datatype->get_extent()>send_win->size_)
404     return MPI_ERR_ARG;
405
406   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
407   //need to be sure ops are correctly ordered, so finish request here ? slow.
408   MPI_Request req;
409   send_win->atomic_mut_->lock();
410   get(result_addr, result_count, result_datatype, target_rank,
411               target_disp, target_count, target_datatype, &req);
412   if (req != MPI_REQUEST_NULL)
413     Request::wait(&req, MPI_STATUS_IGNORE);
414   if(op!=MPI_NO_OP)
415     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
416               target_disp, target_count, target_datatype, op, &req);
417   if (req != MPI_REQUEST_NULL)
418     Request::wait(&req, MPI_STATUS_IGNORE);
419   send_win->atomic_mut_->unlock();
420   return MPI_SUCCESS;
421 }
422
423 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
424                           int target_rank, MPI_Aint target_disp)
425 {
426   //get sender pointer
427   const Win* send_win = connected_wins_[target_rank];
428
429   if(opened_==0){//check that post/start has been done
430     // no fence or start .. lock ok ?
431     int locked=0;
432     for (auto const& it : send_win->lockers_)
433       if (it == comm_->rank())
434         locked = 1;
435     if(locked != 1)
436       return MPI_ERR_WIN;
437   }
438
439   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
440   MPI_Request req = MPI_REQUEST_NULL;
441   send_win->atomic_mut_->lock();
442   get(result_addr, 1, datatype, target_rank,
443               target_disp, 1, datatype, &req);
444   if (req != MPI_REQUEST_NULL)
445     Request::wait(&req, MPI_STATUS_IGNORE);
446   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
447     put(origin_addr, 1, datatype, target_rank,
448               target_disp, 1, datatype);
449   }
450   send_win->atomic_mut_->unlock();
451   return MPI_SUCCESS;
452 }
453
454 int Win::start(MPI_Group group, int /*assert*/)
455 {
456   /* From MPI forum advices
457   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
458   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
459   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
460   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
461   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
462   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
463   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
464   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
465   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
466   must complete, without further dependencies.  */
467
468   //naive, blocking implementation.
469   int i             = 0;
470   int j             = 0;
471   int size          = group->size();
472   MPI_Request* reqs = xbt_new0(MPI_Request, size);
473
474   XBT_DEBUG("Entering MPI_Win_Start");
475   while (j != size) {
476     int src = comm_->group()->rank(group->actor(j));
477     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
478       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
479       i++;
480     }
481     j++;
482   }
483   size = i;
484   Request::startall(size, reqs);
485   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
486   for (i = 0; i < size; i++) {
487     Request::unref(&reqs[i]);
488   }
489   xbt_free(reqs);
490   opened_++; //we're open for business !
491   group_=group;
492   group->ref();
493   XBT_DEBUG("Leaving MPI_Win_Start");
494   return MPI_SUCCESS;
495 }
496
497 int Win::post(MPI_Group group, int /*assert*/)
498 {
499   //let's make a synchronous send here
500   int i             = 0;
501   int j             = 0;
502   int size = group->size();
503   MPI_Request* reqs = xbt_new0(MPI_Request, size);
504
505   XBT_DEBUG("Entering MPI_Win_Post");
506   while(j!=size){
507     int dst = comm_->group()->rank(group->actor(j));
508     if (dst != rank_ && dst != MPI_UNDEFINED) {
509       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
510       i++;
511     }
512     j++;
513   }
514   size=i;
515
516   Request::startall(size, reqs);
517   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
518   for(i=0;i<size;i++){
519     Request::unref(&reqs[i]);
520   }
521   xbt_free(reqs);
522   opened_++; //we're open for business !
523   group_=group;
524   group->ref();
525   XBT_DEBUG("Leaving MPI_Win_Post");
526   return MPI_SUCCESS;
527 }
528
529 int Win::complete(){
530   if(opened_==0)
531     xbt_die("Complete called on already opened MPI_Win");
532
533   XBT_DEBUG("Entering MPI_Win_Complete");
534   int i             = 0;
535   int j             = 0;
536   int size          = group_->size();
537   MPI_Request* reqs = xbt_new0(MPI_Request, size);
538
539   while(j!=size){
540     int dst = comm_->group()->rank(group_->actor(j));
541     if (dst != rank_ && dst != MPI_UNDEFINED) {
542       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
543       i++;
544     }
545     j++;
546   }
547   size=i;
548   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
549   Request::startall(size, reqs);
550   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
551
552   for(i=0;i<size;i++){
553     Request::unref(&reqs[i]);
554   }
555   xbt_free(reqs);
556
557   int finished = finish_comms();
558   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
559
560   Group::unref(group_);
561   opened_--; //we're closed for business !
562   return MPI_SUCCESS;
563 }
564
565 int Win::wait(){
566   //naive, blocking implementation.
567   XBT_DEBUG("Entering MPI_Win_Wait");
568   int i             = 0;
569   int j             = 0;
570   int size          = group_->size();
571   MPI_Request* reqs = xbt_new0(MPI_Request, size);
572
573   while(j!=size){
574     int src = comm_->group()->rank(group_->actor(j));
575     if (src != rank_ && src != MPI_UNDEFINED) {
576       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
577       i++;
578     }
579     j++;
580   }
581   size=i;
582   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
583   Request::startall(size, reqs);
584   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
585   for(i=0;i<size;i++){
586     Request::unref(&reqs[i]);
587   }
588   xbt_free(reqs);
589   int finished = finish_comms();
590   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
591
592   Group::unref(group_);
593   opened_--; //we're opened for business !
594   return MPI_SUCCESS;
595 }
596
597 int Win::lock(int lock_type, int rank, int /*assert*/)
598 {
599   MPI_Win target_win = connected_wins_[rank];
600
601   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
602     target_win->lock_mut_->lock();
603     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
604     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
605       target_win->lock_mut_->unlock();
606    }
607   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
608     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
609
610   target_win->lockers_.push_back(comm_->rank());
611
612   int finished = finish_comms(rank);
613   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
614   finished = target_win->finish_comms(rank_);
615   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
616   return MPI_SUCCESS;
617 }
618
619 int Win::lock_all(int assert){
620   int retval = MPI_SUCCESS;
621   for (int i = 0; i < comm_->size(); i++) {
622     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
623     if (ret != MPI_SUCCESS)
624       retval = ret;
625   }
626   return retval;
627 }
628
629 int Win::unlock(int rank){
630   MPI_Win target_win = connected_wins_[rank];
631   int target_mode = target_win->mode_;
632   target_win->mode_= 0;
633   target_win->lockers_.remove(comm_->rank());
634   if (target_mode==MPI_LOCK_EXCLUSIVE){
635     target_win->lock_mut_->unlock();
636   }
637
638   int finished = finish_comms(rank);
639   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
640   finished = target_win->finish_comms(rank_);
641   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
642   return MPI_SUCCESS;
643 }
644
645 int Win::unlock_all(){
646   int retval = MPI_SUCCESS;
647   for (int i = 0; i < comm_->size(); i++) {
648     int ret = this->unlock(i);
649     if (ret != MPI_SUCCESS)
650       retval = ret;
651   }
652   return retval;
653 }
654
655 int Win::flush(int rank){
656   MPI_Win target_win = connected_wins_[rank];
657   int finished       = finish_comms(rank_);
658   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
659   finished = target_win->finish_comms(rank);
660   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
661   return MPI_SUCCESS;
662 }
663
664 int Win::flush_local(int rank){
665   int finished = finish_comms(rank);
666   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
667   return MPI_SUCCESS;
668 }
669
670 int Win::flush_all(){
671   int finished = finish_comms();
672   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
673   for (int i = 0; i < comm_->size(); i++) {
674     finished = connected_wins_[i]->finish_comms(rank_);
675     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
676   }
677   return MPI_SUCCESS;
678 }
679
680 int Win::flush_local_all(){
681   int finished = finish_comms();
682   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
683   return MPI_SUCCESS;
684 }
685
686 Win* Win::f2c(int id){
687   return static_cast<Win*>(F2C::f2c(id));
688 }
689
690 int Win::finish_comms(){
691   mut_->lock();
692   //Finish own requests
693   std::vector<MPI_Request> *reqqs = requests_;
694   int size = static_cast<int>(reqqs->size());
695   if (size > 0) {
696     MPI_Request* treqs = &(*reqqs)[0];
697     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
698     reqqs->clear();
699   }
700   mut_->unlock();
701   return size;
702 }
703
704 int Win::finish_comms(int rank){
705   mut_->lock();
706   //Finish own requests
707   std::vector<MPI_Request> *reqqs = requests_;
708   int size = static_cast<int>(reqqs->size());
709   if (size > 0) {
710     size = 0;
711     std::vector<MPI_Request> myreqqs;
712     auto iter                               = reqqs->begin();
713     int proc_id                             = comm_->group()->actor(rank)->get_pid();
714     while (iter != reqqs->end()){
715       // Let's see if we're either the destination or the sender of this request
716       // because we only wait for requests that we are responsible for.
717       // Also use the process id here since the request itself returns from src()
718       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
719       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
720         myreqqs.push_back(*iter);
721         iter = reqqs->erase(iter);
722         size++;
723       } else {
724         ++iter;
725       }
726     }
727     if(size >0){
728       MPI_Request* treqs = &myreqqs[0];
729       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
730       myreqqs.clear();
731     }
732   }
733   mut_->unlock();
734   return size;
735 }
736
737 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
738 {
739   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
740   for (int i = 0; not target_win && i < comm_->size(); i++) {
741     if (connected_wins_[i]->size_ > 0)
742       target_win = connected_wins_[i];
743   }
744   if (target_win) {
745     *size                         = target_win->size_;
746     *disp_unit                    = target_win->disp_unit_;
747     *static_cast<void**>(baseptr) = target_win->base_;
748   } else {
749     *size                         = 0;
750     *static_cast<void**>(baseptr) = nullptr;
751   }
752   return MPI_SUCCESS;
753 }
754
755 MPI_Errhandler Win::errhandler()
756 {
757   if (errhandler_ != MPI_ERRHANDLER_NULL)
758     errhandler_->ref();
759   return errhandler_;
760 }
761
762 void Win::set_errhandler(MPI_Errhandler errhandler)
763 {
764   if (errhandler_ != MPI_ERRHANDLER_NULL)
765     simgrid::smpi::Errhandler::unref(errhandler_);
766   errhandler_ = errhandler;
767   if (errhandler_ != MPI_ERRHANDLER_NULL)
768     errhandler_->ref();
769 }
770 } // namespace smpi
771 } // namespace simgrid