Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[sonar] Constify pointer and reference parameters in src/smpi/.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
26     : base_(base)
27     , size_(size)
28     , disp_unit_(disp_unit)
29     , info_(info)
30     , comm_(comm)
31     , rank_(comm->rank())
32     , allocated_(allocated)
33     , dynamic_(dynamic)
34 {
35   XBT_DEBUG("Creating window");
36   if(info!=MPI_INFO_NULL)
37     info->ref();
38   int comm_size          = comm->size();
39   name_                  = nullptr;
40   opened_                = 0;
41   group_                 = MPI_GROUP_NULL;
42   requests_              = new std::vector<MPI_Request>();
43   mut_                   = s4u::Mutex::create();
44   lock_mut_              = s4u::Mutex::create();
45   atomic_mut_            = s4u::Mutex::create();
46   connected_wins_        = new MPI_Win[comm_size];
47   connected_wins_[rank_] = this;
48   count_                 = 0;
49   if(rank_==0){
50     bar_ = new s4u::Barrier(comm_size);
51   }
52   mode_=0;
53   errhandler_=MPI_ERRORS_ARE_FATAL;
54   errhandler_->ref();
55   comm->add_rma_win(this);
56   comm->ref();
57
58   colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
59                    comm);
60
61   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
62
63   colls::barrier(comm);
64 }
65
66 Win::~Win(){
67   //As per the standard, perform a barrier to ensure every async comm is finished
68   bar_->wait();
69
70   int finished = finish_comms();
71   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
72
73   delete requests_;
74   delete[] connected_wins_;
75   if (name_ != nullptr){
76     xbt_free(name_);
77   }
78   if (info_ != MPI_INFO_NULL)
79     simgrid::smpi::Info::unref(info_);
80   if (errhandler_ != MPI_ERRHANDLER_NULL)
81     simgrid::smpi::Errhandler::unref(errhandler_);
82
83   comm_->remove_rma_win(this);
84
85   colls::barrier(comm_);
86   Comm::unref(comm_);
87   
88   if (rank_ == 0)
89     delete bar_;
90
91   if(allocated_ !=0)
92     xbt_free(base_);
93
94   cleanup_attr<Win>();
95 }
96
97 int Win::attach(void* /*base*/, MPI_Aint size)
98 {
99   if (not(base_ == MPI_BOTTOM || base_ == 0))
100     return MPI_ERR_ARG;
101   base_=0;//actually the address will be given in the RMA calls, as being the disp.
102   size_+=size;
103   return MPI_SUCCESS;
104 }
105
106 int Win::detach(const void* /*base*/)
107 {
108   base_=MPI_BOTTOM;
109   size_=-1;
110   return MPI_SUCCESS;
111 }
112
113 void Win::get_name(char* name, int* length){
114   if(name_==nullptr){
115     *length=0;
116     name=nullptr;
117     return;
118   }
119   *length = strlen(name_);
120   strncpy(name, name_, *length+1);
121 }
122
123 void Win::get_group(MPI_Group* group){
124   if(comm_ != MPI_COMM_NULL){
125     *group = comm_->group();
126   } else {
127     *group = MPI_GROUP_NULL;
128   }
129 }
130
131 MPI_Info Win::info()
132 {
133   if (info_ == MPI_INFO_NULL)
134     info_ = new Info();
135   info_->ref();
136   return info_;
137 }
138
139 int Win::rank(){
140   return rank_;
141 }
142
143 MPI_Aint Win::size(){
144   return size_;
145 }
146
147 void* Win::base(){
148   return base_;
149 }
150
151 int Win::disp_unit(){
152   return disp_unit_;
153 }
154
155 int Win::dynamic(){
156   return dynamic_;
157 }
158
159 void Win::set_info(MPI_Info info)
160 {
161   if (info_ != MPI_INFO_NULL)
162     simgrid::smpi::Info::unref(info_);
163   info_ = info;
164   if (info_ != MPI_INFO_NULL)
165     info_->ref();
166 }
167
168 void Win::set_name(const char* name){
169   name_ = xbt_strdup(name);
170 }
171
172 int Win::fence(int assert)
173 {
174   XBT_DEBUG("Entering fence");
175   if (opened_ == 0)
176     opened_=1;
177   if (assert != MPI_MODE_NOPRECEDE) {
178     // This is not the first fence => finalize what came before
179     bar_->wait();
180     mut_->lock();
181     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
182     // Without this, the vector could get redimensioned when another process pushes.
183     // This would result in the array used by Request::waitall() to be invalidated.
184     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
185     std::vector<MPI_Request> *reqs = requests_;
186     int size = static_cast<int>(reqs->size());
187     // start all requests that have been prepared by another process
188     if (size > 0) {
189       MPI_Request* treqs = &(*reqs)[0];
190       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
191     }
192     count_=0;
193     mut_->unlock();
194   }
195
196   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
197     opened_=0;
198   assert_ = assert;
199
200   bar_->wait();
201   XBT_DEBUG("Leaving fence");
202
203   return MPI_SUCCESS;
204 }
205
206 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
207               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
208 {
209   //get receiver pointer
210   const Win* recv_win = connected_wins_[target_rank];
211
212   if(opened_==0){//check that post/start has been done
213     // no fence or start .. lock ok ?
214     int locked=0;
215     for (auto const& it : recv_win->lockers_)
216       if (it == comm_->rank())
217         locked = 1;
218     if(locked != 1)
219       return MPI_ERR_WIN;
220   }
221
222   if(target_count*target_datatype->get_extent()>recv_win->size_)
223     return MPI_ERR_ARG;
224
225   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
226
227   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
228     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
229     // prepare send_request
230     MPI_Request sreq =
231         // TODO cheinrich Check for rank / pid conversion
232         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
233                                comm_, MPI_OP_NULL);
234
235     //prepare receiver request
236     // TODO cheinrich Check for rank / pid conversion
237     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
238                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
239
240     //start send
241     sreq->start();
242
243     if(request!=nullptr){
244       *request=sreq;
245     }else{
246       mut_->lock();
247       requests_->push_back(sreq);
248       mut_->unlock();
249     }
250
251     //push request to receiver's win
252     recv_win->mut_->lock();
253     recv_win->requests_->push_back(rreq);
254     rreq->start();
255     recv_win->mut_->unlock();
256   } else {
257     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
258     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
259     if(request!=nullptr)
260       *request = MPI_REQUEST_NULL;
261   }
262
263   return MPI_SUCCESS;
264 }
265
266 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
267               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
268 {
269   //get sender pointer
270   const Win* send_win = connected_wins_[target_rank];
271
272   if(opened_==0){//check that post/start has been done
273     // no fence or start .. lock ok ?
274     int locked=0;
275     for (auto const& it : send_win->lockers_)
276       if (it == comm_->rank())
277         locked = 1;
278     if(locked != 1)
279       return MPI_ERR_WIN;
280   }
281
282   if(target_count*target_datatype->get_extent()>send_win->size_)
283     return MPI_ERR_ARG;
284
285   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
286   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
287
288   if(target_rank != comm_->rank()){
289     //prepare send_request
290     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
291                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
292
293     //prepare receiver request
294     MPI_Request rreq = Request::rma_recv_init(
295         origin_addr, origin_count, origin_datatype, target_rank,
296         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
297         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
298
299     //start the send, with another process than us as sender.
300     sreq->start();
301     //push request to receiver's win
302     send_win->mut_->lock();
303     send_win->requests_->push_back(sreq);
304     send_win->mut_->unlock();
305
306     //start recv
307     rreq->start();
308
309     if(request!=nullptr){
310       *request=rreq;
311     }else{
312       mut_->lock();
313       requests_->push_back(rreq);
314       mut_->unlock();
315     }
316   } else {
317     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
318     if(request!=nullptr)
319       *request=MPI_REQUEST_NULL;
320   }
321   return MPI_SUCCESS;
322 }
323
324 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
325               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
326 {
327   XBT_DEBUG("Entering MPI_Win_Accumulate");
328   //get receiver pointer
329   const Win* recv_win = connected_wins_[target_rank];
330
331   if(opened_==0){//check that post/start has been done
332     // no fence or start .. lock ok ?
333     int locked=0;
334     for (auto const& it : recv_win->lockers_)
335       if (it == comm_->rank())
336         locked = 1;
337     if(locked != 1)
338       return MPI_ERR_WIN;
339   }
340   //FIXME: local version
341
342   if(target_count*target_datatype->get_extent()>recv_win->size_)
343     return MPI_ERR_ARG;
344
345   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
346   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
347   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
348   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
349   // prepare send_request
350
351   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
352                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
353
354   // prepare receiver request
355   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
356                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
357
358   count_++;
359
360   // start send
361   sreq->start();
362   // push request to receiver's win
363   recv_win->mut_->lock();
364   recv_win->requests_->push_back(rreq);
365   rreq->start();
366   recv_win->mut_->unlock();
367
368   if (request != nullptr) {
369     *request = sreq;
370   } else {
371     mut_->lock();
372     requests_->push_back(sreq);
373     mut_->unlock();
374   }
375
376   XBT_DEBUG("Leaving MPI_Win_Accumulate");
377   return MPI_SUCCESS;
378 }
379
380 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
381                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
382                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
383 {
384   //get sender pointer
385   const Win* send_win = connected_wins_[target_rank];
386
387   if(opened_==0){//check that post/start has been done
388     // no fence or start .. lock ok ?
389     int locked=0;
390     for (auto const& it : send_win->lockers_)
391       if (it == comm_->rank())
392         locked = 1;
393     if(locked != 1)
394       return MPI_ERR_WIN;
395   }
396
397   if(target_count*target_datatype->get_extent()>send_win->size_)
398     return MPI_ERR_ARG;
399
400   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
401   //need to be sure ops are correctly ordered, so finish request here ? slow.
402   MPI_Request req;
403   send_win->atomic_mut_->lock();
404   get(result_addr, result_count, result_datatype, target_rank,
405               target_disp, target_count, target_datatype, &req);
406   if (req != MPI_REQUEST_NULL)
407     Request::wait(&req, MPI_STATUS_IGNORE);
408   if(op!=MPI_NO_OP)
409     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
410               target_disp, target_count, target_datatype, op, &req);
411   if (req != MPI_REQUEST_NULL)
412     Request::wait(&req, MPI_STATUS_IGNORE);
413   send_win->atomic_mut_->unlock();
414   return MPI_SUCCESS;
415 }
416
417 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
418                           int target_rank, MPI_Aint target_disp)
419 {
420   //get sender pointer
421   const Win* send_win = connected_wins_[target_rank];
422
423   if(opened_==0){//check that post/start has been done
424     // no fence or start .. lock ok ?
425     int locked=0;
426     for (auto const& it : send_win->lockers_)
427       if (it == comm_->rank())
428         locked = 1;
429     if(locked != 1)
430       return MPI_ERR_WIN;
431   }
432
433   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
434   MPI_Request req = MPI_REQUEST_NULL;
435   send_win->atomic_mut_->lock();
436   get(result_addr, 1, datatype, target_rank,
437               target_disp, 1, datatype, &req);
438   if (req != MPI_REQUEST_NULL)
439     Request::wait(&req, MPI_STATUS_IGNORE);
440   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
441     put(origin_addr, 1, datatype, target_rank,
442               target_disp, 1, datatype);
443   }
444   send_win->atomic_mut_->unlock();
445   return MPI_SUCCESS;
446 }
447
448 int Win::start(MPI_Group group, int /*assert*/)
449 {
450   /* From MPI forum advices
451   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
452   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
453   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
454   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
455   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
456   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
457   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
458   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
459   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
460   must complete, without further dependencies.  */
461
462   //naive, blocking implementation.
463   int i             = 0;
464   int j             = 0;
465   int size          = group->size();
466   MPI_Request* reqs = xbt_new0(MPI_Request, size);
467
468   XBT_DEBUG("Entering MPI_Win_Start");
469   while (j != size) {
470     int src = comm_->group()->rank(group->actor(j));
471     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
472       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
473       i++;
474     }
475     j++;
476   }
477   size = i;
478   Request::startall(size, reqs);
479   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
480   for (i = 0; i < size; i++) {
481     Request::unref(&reqs[i]);
482   }
483   xbt_free(reqs);
484   opened_++; //we're open for business !
485   group_=group;
486   group->ref();
487   XBT_DEBUG("Leaving MPI_Win_Start");
488   return MPI_SUCCESS;
489 }
490
491 int Win::post(MPI_Group group, int /*assert*/)
492 {
493   //let's make a synchronous send here
494   int i             = 0;
495   int j             = 0;
496   int size = group->size();
497   MPI_Request* reqs = xbt_new0(MPI_Request, size);
498
499   XBT_DEBUG("Entering MPI_Win_Post");
500   while(j!=size){
501     int dst = comm_->group()->rank(group->actor(j));
502     if (dst != rank_ && dst != MPI_UNDEFINED) {
503       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
504       i++;
505     }
506     j++;
507   }
508   size=i;
509
510   Request::startall(size, reqs);
511   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
512   for(i=0;i<size;i++){
513     Request::unref(&reqs[i]);
514   }
515   xbt_free(reqs);
516   opened_++; //we're open for business !
517   group_=group;
518   group->ref();
519   XBT_DEBUG("Leaving MPI_Win_Post");
520   return MPI_SUCCESS;
521 }
522
523 int Win::complete(){
524   if(opened_==0)
525     xbt_die("Complete called on already opened MPI_Win");
526
527   XBT_DEBUG("Entering MPI_Win_Complete");
528   int i             = 0;
529   int j             = 0;
530   int size          = group_->size();
531   MPI_Request* reqs = xbt_new0(MPI_Request, size);
532
533   while(j!=size){
534     int dst = comm_->group()->rank(group_->actor(j));
535     if (dst != rank_ && dst != MPI_UNDEFINED) {
536       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
537       i++;
538     }
539     j++;
540   }
541   size=i;
542   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
543   Request::startall(size, reqs);
544   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
545
546   for(i=0;i<size;i++){
547     Request::unref(&reqs[i]);
548   }
549   xbt_free(reqs);
550
551   int finished = finish_comms();
552   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
553
554   Group::unref(group_);
555   opened_--; //we're closed for business !
556   return MPI_SUCCESS;
557 }
558
559 int Win::wait(){
560   //naive, blocking implementation.
561   XBT_DEBUG("Entering MPI_Win_Wait");
562   int i             = 0;
563   int j             = 0;
564   int size          = group_->size();
565   MPI_Request* reqs = xbt_new0(MPI_Request, size);
566
567   while(j!=size){
568     int src = comm_->group()->rank(group_->actor(j));
569     if (src != rank_ && src != MPI_UNDEFINED) {
570       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
571       i++;
572     }
573     j++;
574   }
575   size=i;
576   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
577   Request::startall(size, reqs);
578   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
579   for(i=0;i<size;i++){
580     Request::unref(&reqs[i]);
581   }
582   xbt_free(reqs);
583   int finished = finish_comms();
584   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
585
586   Group::unref(group_);
587   opened_--; //we're opened for business !
588   return MPI_SUCCESS;
589 }
590
591 int Win::lock(int lock_type, int rank, int /*assert*/)
592 {
593   MPI_Win target_win = connected_wins_[rank];
594
595   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
596     target_win->lock_mut_->lock();
597     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
598     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
599       target_win->lock_mut_->unlock();
600    }
601   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
602     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
603
604   target_win->lockers_.push_back(comm_->rank());
605
606   int finished = finish_comms(rank);
607   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
608   finished = target_win->finish_comms(rank_);
609   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
610   return MPI_SUCCESS;
611 }
612
613 int Win::lock_all(int assert){
614   int i=0;
615   int retval = MPI_SUCCESS;
616   for (i=0; i<comm_->size();i++){
617       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
618       if(ret != MPI_SUCCESS)
619         retval = ret;
620   }
621   return retval;
622 }
623
624 int Win::unlock(int rank){
625   MPI_Win target_win = connected_wins_[rank];
626   int target_mode = target_win->mode_;
627   target_win->mode_= 0;
628   target_win->lockers_.remove(comm_->rank());
629   if (target_mode==MPI_LOCK_EXCLUSIVE){
630     target_win->lock_mut_->unlock();
631   }
632
633   int finished = finish_comms(rank);
634   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
635   finished = target_win->finish_comms(rank_);
636   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
637   return MPI_SUCCESS;
638 }
639
640 int Win::unlock_all(){
641   int i=0;
642   int retval = MPI_SUCCESS;
643   for (i=0; i<comm_->size();i++){
644     int ret = this->unlock(i);
645     if (ret != MPI_SUCCESS)
646       retval = ret;
647   }
648   return retval;
649 }
650
651 int Win::flush(int rank){
652   MPI_Win target_win = connected_wins_[rank];
653   int finished       = finish_comms(rank_);
654   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
655   finished = target_win->finish_comms(rank);
656   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
657   return MPI_SUCCESS;
658 }
659
660 int Win::flush_local(int rank){
661   int finished = finish_comms(rank);
662   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
663   return MPI_SUCCESS;
664 }
665
666 int Win::flush_all(){
667   int finished = finish_comms();
668   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
669   for (int i = 0; i < comm_->size(); i++) {
670     finished = connected_wins_[i]->finish_comms(rank_);
671     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
672   }
673   return MPI_SUCCESS;
674 }
675
676 int Win::flush_local_all(){
677   int finished = finish_comms();
678   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
679   return MPI_SUCCESS;
680 }
681
682 Win* Win::f2c(int id){
683   return static_cast<Win*>(F2C::f2c(id));
684 }
685
686 int Win::finish_comms(){
687   mut_->lock();
688   //Finish own requests
689   std::vector<MPI_Request> *reqqs = requests_;
690   int size = static_cast<int>(reqqs->size());
691   if (size > 0) {
692     MPI_Request* treqs = &(*reqqs)[0];
693     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
694     reqqs->clear();
695   }
696   mut_->unlock();
697   return size;
698 }
699
700 int Win::finish_comms(int rank){
701   mut_->lock();
702   //Finish own requests
703   std::vector<MPI_Request> *reqqs = requests_;
704   int size = static_cast<int>(reqqs->size());
705   if (size > 0) {
706     size = 0;
707     std::vector<MPI_Request> myreqqs;
708     std::vector<MPI_Request>::iterator iter = reqqs->begin();
709     int proc_id                             = comm_->group()->actor(rank)->get_pid();
710     while (iter != reqqs->end()){
711       // Let's see if we're either the destination or the sender of this request
712       // because we only wait for requests that we are responsible for.
713       // Also use the process id here since the request itself returns from src()
714       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
715       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
716         myreqqs.push_back(*iter);
717         iter = reqqs->erase(iter);
718         size++;
719       } else {
720         ++iter;
721       }
722     }
723     if(size >0){
724       MPI_Request* treqs = &myreqqs[0];
725       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
726       myreqqs.clear();
727     }
728   }
729   mut_->unlock();
730   return size;
731 }
732
733 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
734 {
735   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
736   for (int i = 0; not target_win && i < comm_->size(); i++) {
737     if (connected_wins_[i]->size_ > 0)
738       target_win = connected_wins_[i];
739   }
740   if (target_win) {
741     *size                         = target_win->size_;
742     *disp_unit                    = target_win->disp_unit_;
743     *static_cast<void**>(baseptr) = target_win->base_;
744   } else {
745     *size                         = 0;
746     *static_cast<void**>(baseptr) = nullptr;
747   }
748   return MPI_SUCCESS;
749 }
750
751 MPI_Errhandler Win::errhandler()
752 {
753   if (errhandler_ != MPI_ERRHANDLER_NULL)
754     errhandler_->ref();
755   return errhandler_;
756 }
757
758 void Win::set_errhandler(MPI_Errhandler errhandler)
759 {
760   if (errhandler_ != MPI_ERRHANDLER_NULL)
761     simgrid::smpi::Errhandler::unref(errhandler_);
762   errhandler_ = errhandler;
763   if (errhandler_ != MPI_ERRHANDLER_NULL)
764     errhandler_->ref();
765 }
766 } // namespace smpi
767 } // namespace simgrid