Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Finally rename smpi::Group::actor_pid() back to actor().
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)\
30   if(opened_==0){ /*check that post/start has been done*/\
31     int locked=0;\
32     for (auto const& it : win->lockers_)\
33       if (it == comm_->rank())\
34         locked = 1;\
35     if(locked != 1)\
36       return MPI_ERR_WIN;\
37   }
38
39 namespace simgrid{
40 namespace smpi{
41 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
42 int Win::keyval_id_=0;
43
44 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
45     : base_(base)
46     , size_(size)
47     , disp_unit_(disp_unit)
48     , info_(info)
49     , comm_(comm)
50     , connected_wins_(comm->size())
51     , rank_(comm->rank())
52     , allocated_(allocated)
53     , dynamic_(dynamic)
54 {
55   XBT_DEBUG("Creating window");
56   if(info!=MPI_INFO_NULL)
57     info->ref();
58   connected_wins_[rank_] = this;
59   if(rank_==0){
60     bar_ = new s4u::Barrier(comm->size());
61   }
62   errhandler_->ref();
63   comm->add_rma_win(this);
64   comm->ref();
65
66   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
67                    MPI_BYTE, comm);
68
69   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
70
71   colls::barrier(comm);
72   this->add_f();
73 }
74
75 Win::~Win(){
76   //As per the standard, perform a barrier to ensure every async comm is finished
77   bar_->wait();
78
79   int finished = finish_comms();
80   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
81
82   if (info_ != MPI_INFO_NULL)
83     simgrid::smpi::Info::unref(info_);
84   if (errhandler_ != MPI_ERRHANDLER_NULL)
85     simgrid::smpi::Errhandler::unref(errhandler_);
86
87   comm_->remove_rma_win(this);
88
89   colls::barrier(comm_);
90   Comm::unref(comm_);
91   
92   if (rank_ == 0)
93     delete bar_;
94
95   if(allocated_ !=0)
96     xbt_free(base_);
97
98   F2C::free_f(this->f2c_id());
99   cleanup_attr<Win>();
100 }
101
102 int Win::attach(void* /*base*/, MPI_Aint size)
103 {
104   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
105     return MPI_ERR_ARG;
106   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
107   size_+=size;
108   return MPI_SUCCESS;
109 }
110
111 int Win::detach(const void* /*base*/)
112 {
113   base_=MPI_BOTTOM;
114   size_=-1;
115   return MPI_SUCCESS;
116 }
117
118 void Win::get_name(char* name, int* length) const
119 {
120   *length = static_cast<int>(name_.length());
121   if (not name_.empty()) {
122     name_.copy(name, *length);
123     name[*length] = '\0';
124   }
125 }
126
127 void Win::get_group(MPI_Group* group){
128   if(comm_ != MPI_COMM_NULL){
129     *group = comm_->group();
130   } else {
131     *group = MPI_GROUP_NULL;
132   }
133 }
134
135 MPI_Info Win::info()
136 {
137   if (info_ == MPI_INFO_NULL)
138     info_ = new Info();
139   info_->ref();
140   return info_;
141 }
142
143 int Win::rank() const
144 {
145   return rank_;
146 }
147
148 MPI_Comm Win::comm() const
149 {
150   return comm_;
151 }
152
153 MPI_Aint Win::size() const
154 {
155   return size_;
156 }
157
158 void* Win::base() const
159 {
160   return base_;
161 }
162
163 int Win::disp_unit() const
164 {
165   return disp_unit_;
166 }
167
168 int Win::dynamic() const
169 {
170   return dynamic_;
171 }
172
173 void Win::set_info(MPI_Info info)
174 {
175   if (info_ != MPI_INFO_NULL)
176     simgrid::smpi::Info::unref(info_);
177   info_ = info;
178   if (info_ != MPI_INFO_NULL)
179     info_->ref();
180 }
181
182 void Win::set_name(const char* name){
183   name_ = name;
184 }
185
186 int Win::fence(int assert)
187 {
188   XBT_DEBUG("Entering fence");
189   if (opened_ == 0)
190     opened_=1;
191   if (assert != MPI_MODE_NOPRECEDE) {
192     // This is not the first fence => finalize what came before
193     bar_->wait();
194     mut_->lock();
195     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
196     // Without this, the vector could get redimensioned when another process pushes.
197     // This would result in the array used by Request::waitall() to be invalidated.
198     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
199
200     // start all requests that have been prepared by another process
201     if (not requests_.empty()) {
202       int size           = static_cast<int>(requests_.size());
203       MPI_Request* treqs = requests_.data();
204       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
205     }
206     count_=0;
207     mut_->unlock();
208   }
209
210   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
211     opened_=0;
212   assert_ = assert;
213
214   bar_->wait();
215   XBT_DEBUG("Leaving fence");
216
217   return MPI_SUCCESS;
218 }
219
220 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
221               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
222 {
223   //get receiver pointer
224   Win* recv_win = connected_wins_[target_rank];
225
226   if(opened_==0){//check that post/start has been done
227     // no fence or start .. lock ok ?
228     int locked=0;
229     for (auto const& it : recv_win->lockers_)
230       if (it == comm_->rank())
231         locked = 1;
232     if(locked != 1)
233       return MPI_ERR_WIN;
234   }
235
236   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
237
238   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
239
240   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
241     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
242     // prepare send_request
243     MPI_Request sreq =
244         // TODO cheinrich Check for rank / pid conversion
245         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
246                                comm_, MPI_OP_NULL);
247
248     //prepare receiver request
249     // TODO cheinrich Check for rank / pid conversion
250     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
251                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
252
253     //start send
254     sreq->start();
255
256     if(request!=nullptr){
257       *request=sreq;
258     }else{
259       mut_->lock();
260       requests_.push_back(sreq);
261       mut_->unlock();
262     }
263
264     //push request to receiver's win
265     recv_win->mut_->lock();
266     recv_win->requests_.push_back(rreq);
267     rreq->start();
268     recv_win->mut_->unlock();
269   } else {
270     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
271     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
272     if(request!=nullptr)
273       *request = MPI_REQUEST_NULL;
274   }
275
276   return MPI_SUCCESS;
277 }
278
279 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
280               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
281 {
282   //get sender pointer
283   Win* send_win = connected_wins_[target_rank];
284
285   CHECK_WIN_LOCKED(send_win)
286   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
287
288   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
289   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
290
291   if(target_rank != comm_->rank()){
292     //prepare send_request
293     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
294                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
295
296     //prepare receiver request
297     MPI_Request rreq = Request::rma_recv_init(
298         origin_addr, origin_count, origin_datatype, target_rank,
299         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
300         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
301
302     //start the send, with another process than us as sender.
303     sreq->start();
304     //push request to receiver's win
305     send_win->mut_->lock();
306     send_win->requests_.push_back(sreq);
307     send_win->mut_->unlock();
308
309     //start recv
310     rreq->start();
311
312     if(request!=nullptr){
313       *request=rreq;
314     }else{
315       mut_->lock();
316       requests_.push_back(rreq);
317       mut_->unlock();
318     }
319   } else {
320     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
321     if(request!=nullptr)
322       *request=MPI_REQUEST_NULL;
323   }
324   return MPI_SUCCESS;
325 }
326
327 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
328               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
329 {
330   XBT_DEBUG("Entering MPI_Win_Accumulate");
331   //get receiver pointer
332   Win* recv_win = connected_wins_[target_rank];
333
334   //FIXME: local version
335   CHECK_WIN_LOCKED(recv_win)
336   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
337
338   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
339   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
340   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
341   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
342   // prepare send_request
343
344   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
345                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
346
347   // prepare receiver request
348   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
349                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
350                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
351
352   count_++;
353
354   // start send
355   sreq->start();
356   // push request to receiver's win
357   recv_win->mut_->lock();
358   recv_win->requests_.push_back(rreq);
359   rreq->start();
360   recv_win->mut_->unlock();
361
362   if (request != nullptr) {
363     *request = sreq;
364   } else {
365     mut_->lock();
366     requests_.push_back(sreq);
367     mut_->unlock();
368   }
369
370   XBT_DEBUG("Leaving MPI_Win_Accumulate");
371   return MPI_SUCCESS;
372 }
373
374 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
375                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
376                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
377 {
378   //get sender pointer
379   const Win* send_win = connected_wins_[target_rank];
380
381   CHECK_WIN_LOCKED(send_win)
382   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
383
384   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
385   //need to be sure ops are correctly ordered, so finish request here ? slow.
386   MPI_Request req;
387   send_win->atomic_mut_->lock();
388   get(result_addr, result_count, result_datatype, target_rank,
389               target_disp, target_count, target_datatype, &req);
390   if (req != MPI_REQUEST_NULL)
391     Request::wait(&req, MPI_STATUS_IGNORE);
392   if(op!=MPI_NO_OP)
393     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
394               target_disp, target_count, target_datatype, op, &req);
395   if (req != MPI_REQUEST_NULL)
396     Request::wait(&req, MPI_STATUS_IGNORE);
397   send_win->atomic_mut_->unlock();
398   return MPI_SUCCESS;
399 }
400
401 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
402                           int target_rank, MPI_Aint target_disp)
403 {
404   //get sender pointer
405   const Win* send_win = connected_wins_[target_rank];
406
407   CHECK_WIN_LOCKED(send_win)
408
409   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
410   MPI_Request req = MPI_REQUEST_NULL;
411   send_win->atomic_mut_->lock();
412   get(result_addr, 1, datatype, target_rank,
413               target_disp, 1, datatype, &req);
414   if (req != MPI_REQUEST_NULL)
415     Request::wait(&req, MPI_STATUS_IGNORE);
416   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
417     put(origin_addr, 1, datatype, target_rank,
418               target_disp, 1, datatype);
419   }
420   send_win->atomic_mut_->unlock();
421   return MPI_SUCCESS;
422 }
423
424 int Win::start(MPI_Group group, int /*assert*/)
425 {
426   /* From MPI forum advices
427   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
428   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
429   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
430   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
431   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
432   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
433   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
434   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
435   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
436   must complete, without further dependencies.  */
437
438   //naive, blocking implementation.
439   int i             = 0;
440   int j             = 0;
441   int size          = group->size();
442   std::vector<MPI_Request> reqs(size);
443
444   XBT_DEBUG("Entering MPI_Win_Start");
445   while (j != size) {
446     int src = comm_->group()->rank(group->actor(j));
447     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
448       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
449       i++;
450     }
451     j++;
452   }
453   size = i;
454   Request::startall(size, reqs.data());
455   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
456   for (i = 0; i < size; i++) {
457     Request::unref(&reqs[i]);
458   }
459   opened_++; //we're open for business !
460   group_=group;
461   group->ref();
462   XBT_DEBUG("Leaving MPI_Win_Start");
463   return MPI_SUCCESS;
464 }
465
466 int Win::post(MPI_Group group, int /*assert*/)
467 {
468   //let's make a synchronous send here
469   int i             = 0;
470   int j             = 0;
471   int size = group->size();
472   std::vector<MPI_Request> reqs(size);
473
474   XBT_DEBUG("Entering MPI_Win_Post");
475   while(j!=size){
476     int dst = comm_->group()->rank(group->actor(j));
477     if (dst != rank_ && dst != MPI_UNDEFINED) {
478       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
479       i++;
480     }
481     j++;
482   }
483   size=i;
484
485   Request::startall(size, reqs.data());
486   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
487   for(i=0;i<size;i++){
488     Request::unref(&reqs[i]);
489   }
490   opened_++; //we're open for business !
491   group_=group;
492   group->ref();
493   XBT_DEBUG("Leaving MPI_Win_Post");
494   return MPI_SUCCESS;
495 }
496
497 int Win::complete(){
498   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
499
500   XBT_DEBUG("Entering MPI_Win_Complete");
501   int i             = 0;
502   int j             = 0;
503   int size          = group_->size();
504   std::vector<MPI_Request> reqs(size);
505
506   while(j!=size){
507     int dst = comm_->group()->rank(group_->actor(j));
508     if (dst != rank_ && dst != MPI_UNDEFINED) {
509       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
510       i++;
511     }
512     j++;
513   }
514   size=i;
515   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
516   Request::startall(size, reqs.data());
517   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
518
519   for(i=0;i<size;i++){
520     Request::unref(&reqs[i]);
521   }
522
523   int finished = finish_comms();
524   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
525
526   Group::unref(group_);
527   opened_--; //we're closed for business !
528   return MPI_SUCCESS;
529 }
530
531 int Win::wait(){
532   //naive, blocking implementation.
533   XBT_DEBUG("Entering MPI_Win_Wait");
534   int i             = 0;
535   int j             = 0;
536   int size          = group_->size();
537   std::vector<MPI_Request> reqs(size);
538
539   while(j!=size){
540     int src = comm_->group()->rank(group_->actor(j));
541     if (src != rank_ && src != MPI_UNDEFINED) {
542       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
543       i++;
544     }
545     j++;
546   }
547   size=i;
548   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
549   Request::startall(size, reqs.data());
550   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
551   for(i=0;i<size;i++){
552     Request::unref(&reqs[i]);
553   }
554   int finished = finish_comms();
555   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
556
557   Group::unref(group_);
558   opened_--; //we're closed for business !
559   return MPI_SUCCESS;
560 }
561
562 int Win::lock(int lock_type, int rank, int /*assert*/)
563 {
564   MPI_Win target_win = connected_wins_[rank];
565
566   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
567     target_win->lock_mut_->lock();
568     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
569     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
570       target_win->lock_mut_->unlock();
571    }
572   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
573     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
574
575   target_win->lockers_.push_back(comm_->rank());
576
577   int finished = finish_comms(rank);
578   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
579   finished = target_win->finish_comms(rank_);
580   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
581   return MPI_SUCCESS;
582 }
583
584 int Win::lock_all(int assert){
585   int retval = MPI_SUCCESS;
586   for (int i = 0; i < comm_->size(); i++) {
587     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
588     if (ret != MPI_SUCCESS)
589       retval = ret;
590   }
591   return retval;
592 }
593
594 int Win::unlock(int rank){
595   MPI_Win target_win = connected_wins_[rank];
596   int target_mode = target_win->mode_;
597   target_win->mode_= 0;
598   target_win->lockers_.remove(comm_->rank());
599   if (target_mode==MPI_LOCK_EXCLUSIVE){
600     target_win->lock_mut_->unlock();
601   }
602
603   int finished = finish_comms(rank);
604   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
605   finished = target_win->finish_comms(rank_);
606   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
607   return MPI_SUCCESS;
608 }
609
610 int Win::unlock_all(){
611   int retval = MPI_SUCCESS;
612   for (int i = 0; i < comm_->size(); i++) {
613     int ret = this->unlock(i);
614     if (ret != MPI_SUCCESS)
615       retval = ret;
616   }
617   return retval;
618 }
619
620 int Win::flush(int rank){
621   MPI_Win target_win = connected_wins_[rank];
622   int finished       = finish_comms(rank_);
623   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
624   finished = target_win->finish_comms(rank);
625   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
626   return MPI_SUCCESS;
627 }
628
629 int Win::flush_local(int rank){
630   int finished = finish_comms(rank);
631   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
632   return MPI_SUCCESS;
633 }
634
635 int Win::flush_all(){
636   int finished = finish_comms();
637   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
638   for (int i = 0; i < comm_->size(); i++) {
639     finished = connected_wins_[i]->finish_comms(rank_);
640     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
641   }
642   return MPI_SUCCESS;
643 }
644
645 int Win::flush_local_all(){
646   int finished = finish_comms();
647   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
648   return MPI_SUCCESS;
649 }
650
651 Win* Win::f2c(int id){
652   return static_cast<Win*>(F2C::f2c(id));
653 }
654
655 int Win::finish_comms(){
656   mut_->lock();
657   //Finish own requests
658   int size = static_cast<int>(requests_.size());
659   if (size > 0) {
660     MPI_Request* treqs = requests_.data();
661     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
662     requests_.clear();
663   }
664   mut_->unlock();
665   return size;
666 }
667
668 int Win::finish_comms(int rank){
669   mut_->lock();
670   // Finish own requests
671   // Let's see if we're either the destination or the sender of this request
672   // because we only wait for requests that we are responsible for.
673   // Also use the process id here since the request itself returns from src()
674   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
675   int proc_id = comm_->group()->actor(rank);
676   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
677     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
678   });
679   std::vector<MPI_Request> myreqqs(it, end(requests_));
680   requests_.erase(it, end(requests_));
681   int size = static_cast<int>(myreqqs.size());
682   if (size > 0) {
683     MPI_Request* treqs = myreqqs.data();
684     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
685     myreqqs.clear();
686   }
687   mut_->unlock();
688   return size;
689 }
690
691 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
692 {
693   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
694   for (int i = 0; not target_win && i < comm_->size(); i++) {
695     if (connected_wins_[i]->size_ > 0)
696       target_win = connected_wins_[i];
697   }
698   if (target_win) {
699     *size                         = target_win->size_;
700     *disp_unit                    = target_win->disp_unit_;
701     *static_cast<void**>(baseptr) = target_win->base_;
702   } else {
703     *size                         = 0;
704     *static_cast<void**>(baseptr) = nullptr;
705   }
706   return MPI_SUCCESS;
707 }
708
709 MPI_Errhandler Win::errhandler()
710 {
711   if (errhandler_ != MPI_ERRHANDLER_NULL)
712     errhandler_->ref();
713   return errhandler_;
714 }
715
716 void Win::set_errhandler(MPI_Errhandler errhandler)
717 {
718   if (errhandler_ != MPI_ERRHANDLER_NULL)
719     simgrid::smpi::Errhandler::unref(errhandler_);
720   errhandler_ = errhandler;
721   if (errhandler_ != MPI_ERRHANDLER_NULL)
722     errhandler_->ref();
723 }
724 } // namespace smpi
725 } // namespace simgrid