Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Coding style: prefer keyword "not" to "!".
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
22
23 #define CHECK_RMA_REMOTE_WIN(fun, win)\
24   if(target_count*target_datatype->get_extent()>win->size_){\
25     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
26     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
27     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
28     return MPI_ERR_RMA_RANGE;\
29   }
30
31 #define CHECK_WIN_LOCKED(win)                                                                                          \
32   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
33     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
34     if (not locked)                                                                                                    \
35       return MPI_ERR_WIN;                                                                                              \
36   }
37
38 namespace simgrid::smpi {
39 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
40 int Win::keyval_id_=0;
41
42 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
43     : base_(base)
44     , size_(size)
45     , disp_unit_(disp_unit)
46     , info_(info)
47     , comm_(comm)
48     , connected_wins_(comm->size())
49     , rank_(comm->rank())
50     , allocated_(allocated)
51     , dynamic_(dynamic)
52 {
53   XBT_DEBUG("Creating window");
54   if(info!=MPI_INFO_NULL)
55     info->ref();
56   connected_wins_[rank_] = this;
57   errhandler_->ref();
58   comm->add_rma_win(this);
59   comm->ref();
60
61   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
62                    MPI_BYTE, comm);
63   if  (MC_is_active() || MC_record_replay_is_active()){
64     s4u::Barrier* bar_ptr;
65     if (rank_ == 0) {
66       bar_ = s4u::Barrier::create(comm->size());
67       bar_ptr = bar_.get();
68     }
69     colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
70     if (rank_ != 0)
71       bar_ = s4u::BarrierPtr(bar_ptr);
72   }
73   this->add_f();
74 }
75
76 int Win::del(Win* win){
77   //As per the standard, perform a barrier to ensure every async comm is finished
78   if  (MC_is_active() || MC_record_replay_is_active())
79     win->bar_->wait();
80   else
81     colls::barrier(win->comm_);
82   win->flush_local_all();
83
84   if (win->info_ != MPI_INFO_NULL)
85     simgrid::smpi::Info::unref(win->info_);
86   if (win->errhandler_ != MPI_ERRHANDLER_NULL)
87     simgrid::smpi::Errhandler::unref(win->errhandler_);
88
89   win->comm_->remove_rma_win(win);
90
91   colls::barrier(win->comm_);
92   Comm::unref(win->comm_);
93   if (not win->lockers_.empty() || win->opened_ < 0) {
94     XBT_WARN("Freeing a locked or opened window");
95     return MPI_ERR_WIN;
96   }
97   if (win->allocated_)
98     xbt_free(win->base_);
99
100   F2C::free_f(win->f2c_id());
101   win->cleanup_attr<Win>();
102
103   delete win;
104   return MPI_SUCCESS;
105 }
106
107 int Win::attach(void* /*base*/, MPI_Aint size)
108 {
109   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
110     return MPI_ERR_ARG;
111   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
112   size_+=size;
113   return MPI_SUCCESS;
114 }
115
116 int Win::detach(const void* /*base*/)
117 {
118   base_=MPI_BOTTOM;
119   size_=-1;
120   return MPI_SUCCESS;
121 }
122
123 void Win::get_name(char* name, int* length) const
124 {
125   *length = static_cast<int>(name_.length());
126   if (not name_.empty()) {
127     name_.copy(name, *length);
128     name[*length] = '\0';
129   }
130 }
131
132 void Win::get_group(MPI_Group* group){
133   if(comm_ != MPI_COMM_NULL){
134     *group = comm_->group();
135   } else {
136     *group = MPI_GROUP_NULL;
137   }
138 }
139
140 MPI_Info Win::info()
141 {
142   return info_;
143 }
144
145 int Win::rank() const
146 {
147   return rank_;
148 }
149
150 MPI_Comm Win::comm() const
151 {
152   return comm_;
153 }
154
155 MPI_Aint Win::size() const
156 {
157   return size_;
158 }
159
160 void* Win::base() const
161 {
162   return base_;
163 }
164
165 int Win::disp_unit() const
166 {
167   return disp_unit_;
168 }
169
170 bool Win::dynamic() const
171 {
172   return dynamic_;
173 }
174
175 void Win::set_info(MPI_Info info)
176 {
177   if (info_ != MPI_INFO_NULL)
178     simgrid::smpi::Info::unref(info_);
179   info_ = info;
180   if (info_ != MPI_INFO_NULL)
181     info_->ref();
182 }
183
184 void Win::set_name(const char* name){
185   name_ = name;
186 }
187
188 int Win::fence(int assert)
189 {
190   XBT_DEBUG("Entering fence");
191   opened_++;
192   if (not (assert & MPI_MODE_NOPRECEDE)) {
193     // This is not the first fence => finalize what came before
194     if (MC_is_active() || MC_record_replay_is_active())
195       bar_->wait();
196     else
197       colls::barrier(comm_);
198     flush_local_all();
199     count_=0;
200   }
201
202   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
203     opened_=0;
204   assert_ = assert;
205   if (MC_is_active() || MC_record_replay_is_active())
206     bar_->wait();
207   else
208     colls::barrier(comm_);
209   XBT_DEBUG("Leaving fence");
210
211   return MPI_SUCCESS;
212 }
213
214 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
215               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
216 {
217   //get receiver pointer
218   Win* recv_win = connected_wins_[target_rank];
219
220   CHECK_WIN_LOCKED(recv_win)
221   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
222
223   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
224
225   if (target_rank != rank_) { // This is not for myself, so we need to send messages
226     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
227     // prepare send_request
228     MPI_Request sreq =
229         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
230                                MPI_OP_NULL);
231
232     //prepare receiver request
233     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
234                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
235
236     //start send
237     sreq->start();
238
239     if(request!=nullptr){
240       *request=sreq;
241     }else{
242       mut_->lock();
243       requests_.push_back(sreq);
244       mut_->unlock();
245     }
246
247     //push request to receiver's win
248     recv_win->mut_->lock();
249     recv_win->requests_.push_back(rreq);
250     rreq->start();
251     recv_win->mut_->unlock();
252   } else {
253     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
254     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
255     if(request!=nullptr)
256       *request = MPI_REQUEST_NULL;
257   }
258
259   return MPI_SUCCESS;
260 }
261
262 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
263               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
264 {
265   //get sender pointer
266   Win* send_win = connected_wins_[target_rank];
267
268   CHECK_WIN_LOCKED(send_win)
269   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
270
271   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
272   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
273
274   if (target_rank != rank_) {
275     //prepare send_request
276     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
277                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
278
279     //prepare receiver request
280     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
281                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
282
283     //start the send, with another process than us as sender.
284     sreq->start();
285     // push request to sender's win
286     send_win->mut_->lock();
287     send_win->requests_.push_back(sreq);
288     send_win->mut_->unlock();
289
290     //start recv
291     rreq->start();
292
293     if(request!=nullptr){
294       *request=rreq;
295     }else{
296       mut_->lock();
297       requests_.push_back(rreq);
298       mut_->unlock();
299     }
300   } else {
301     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
302     if(request!=nullptr)
303       *request=MPI_REQUEST_NULL;
304   }
305   return MPI_SUCCESS;
306 }
307
308 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
309               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
310 {
311   XBT_DEBUG("Entering MPI_Win_Accumulate");
312   //get receiver pointer
313   Win* recv_win = connected_wins_[target_rank];
314
315   //FIXME: local version
316   CHECK_WIN_LOCKED(recv_win)
317   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
318
319   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
320   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
321   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
322   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
323   // prepare send_request
324
325   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
326                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
327
328   // prepare receiver request
329   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
330                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
331
332   count_++;
333
334   // start send
335   sreq->start();
336   // push request to receiver's win
337   recv_win->mut_->lock();
338   recv_win->requests_.push_back(rreq);
339   rreq->start();
340   recv_win->mut_->unlock();
341
342   if (request != nullptr) {
343     *request = sreq;
344   } else {
345     mut_->lock();
346     requests_.push_back(sreq);
347     mut_->unlock();
348   }
349
350   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
351   // 'flush' is a workaround to fix that.
352   flush(target_rank);
353   XBT_DEBUG("Leaving MPI_Win_Accumulate");
354   return MPI_SUCCESS;
355 }
356
357 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
358                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
359                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
360 {
361   //get sender pointer
362   const Win* send_win = connected_wins_[target_rank];
363
364   CHECK_WIN_LOCKED(send_win)
365   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
366
367   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
368   //need to be sure ops are correctly ordered, so finish request here ? slow.
369   MPI_Request req = MPI_REQUEST_NULL;
370   send_win->atomic_mut_->lock();
371   get(result_addr, result_count, result_datatype, target_rank,
372               target_disp, target_count, target_datatype, &req);
373   if (req != MPI_REQUEST_NULL)
374     Request::wait(&req, MPI_STATUS_IGNORE);
375   if(op!=MPI_NO_OP)
376     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
377               target_disp, target_count, target_datatype, op, &req);
378   if (req != MPI_REQUEST_NULL)
379     Request::wait(&req, MPI_STATUS_IGNORE);
380   send_win->atomic_mut_->unlock();
381   return MPI_SUCCESS;
382 }
383
384 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
385                           int target_rank, MPI_Aint target_disp)
386 {
387   //get sender pointer
388   const Win* send_win = connected_wins_[target_rank];
389
390   CHECK_WIN_LOCKED(send_win)
391
392   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
393   MPI_Request req = MPI_REQUEST_NULL;
394   send_win->atomic_mut_->lock();
395   get(result_addr, 1, datatype, target_rank,
396               target_disp, 1, datatype, &req);
397   if (req != MPI_REQUEST_NULL)
398     Request::wait(&req, MPI_STATUS_IGNORE);
399   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
400     put(origin_addr, 1, datatype, target_rank,
401               target_disp, 1, datatype);
402   }
403   send_win->atomic_mut_->unlock();
404   return MPI_SUCCESS;
405 }
406
407 int Win::start(MPI_Group group, int /*assert*/)
408 {
409   /* From MPI forum advices
410   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
411   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
412   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
413   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
414   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
415   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
416   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
417   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
418   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
419   must complete, without further dependencies.  */
420
421   //naive, blocking implementation.
422   XBT_DEBUG("Entering MPI_Win_Start");
423   std::vector<MPI_Request> reqs;
424   for (int i = 0; i < group->size(); i++) {
425     int src = comm_->group()->rank(group->actor(i));
426     xbt_assert(src != MPI_UNDEFINED);
427     if (src != rank_)
428       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
429   }
430   int size = static_cast<int>(reqs.size());
431
432   Request::startall(size, reqs.data());
433   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
434   for (auto& req : reqs)
435     Request::unref(&req);
436
437   group->ref();
438   dst_group_ = group;
439   opened_--; // we're open for business !
440   XBT_DEBUG("Leaving MPI_Win_Start");
441   return MPI_SUCCESS;
442 }
443
444 int Win::post(MPI_Group group, int /*assert*/)
445 {
446   //let's make a synchronous send here
447   XBT_DEBUG("Entering MPI_Win_Post");
448   std::vector<MPI_Request> reqs;
449   for (int i = 0; i < group->size(); i++) {
450     int dst = comm_->group()->rank(group->actor(i));
451     xbt_assert(dst != MPI_UNDEFINED);
452     if (dst != rank_)
453       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
454   }
455   int size = static_cast<int>(reqs.size());
456
457   Request::startall(size, reqs.data());
458   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
459   for (auto& req : reqs)
460     Request::unref(&req);
461
462   group->ref();
463   src_group_ = group;
464   opened_--; // we're open for business !
465   XBT_DEBUG("Leaving MPI_Win_Post");
466   return MPI_SUCCESS;
467 }
468
469 int Win::complete(){
470   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
471
472   XBT_DEBUG("Entering MPI_Win_Complete");
473   std::vector<MPI_Request> reqs;
474   for (int i = 0; i < dst_group_->size(); i++) {
475     int dst = comm_->group()->rank(dst_group_->actor(i));
476     xbt_assert(dst != MPI_UNDEFINED);
477     if (dst != rank_)
478       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
479   }
480   int size = static_cast<int>(reqs.size());
481
482   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
483   Request::startall(size, reqs.data());
484   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
485   for (auto& req : reqs)
486     Request::unref(&req);
487
488   flush_local_all();
489
490   opened_++; //we're closed for business !
491   Group::unref(dst_group_);
492   dst_group_ = MPI_GROUP_NULL;
493   return MPI_SUCCESS;
494 }
495
496 int Win::wait(){
497   //naive, blocking implementation.
498   XBT_DEBUG("Entering MPI_Win_Wait");
499   std::vector<MPI_Request> reqs;
500   for (int i = 0; i < src_group_->size(); i++) {
501     int src = comm_->group()->rank(src_group_->actor(i));
502     xbt_assert(src != MPI_UNDEFINED);
503     if (src != rank_)
504       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
505   }
506   int size = static_cast<int>(reqs.size());
507
508   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
509   Request::startall(size, reqs.data());
510   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
511   for (auto& req : reqs)
512     Request::unref(&req);
513
514   flush_local_all();
515
516   opened_++; //we're closed for business !
517   Group::unref(src_group_);
518   src_group_ = MPI_GROUP_NULL;
519   return MPI_SUCCESS;
520 }
521
522 int Win::lock(int lock_type, int rank, int /*assert*/)
523 {
524   MPI_Win target_win = connected_wins_[rank];
525
526   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
527     target_win->lock_mut_->lock();
528     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
529     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
530       target_win->lock_mut_->unlock();
531    }
532   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
533     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
534
535   target_win->lockers_.push_back(rank_);
536
537   flush(rank);
538   return MPI_SUCCESS;
539 }
540
541 int Win::lock_all(int assert){
542   int retval = MPI_SUCCESS;
543   for (int i = 0; i < comm_->size(); i++) {
544     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
545     if (ret != MPI_SUCCESS)
546       retval = ret;
547   }
548   return retval;
549 }
550
551 int Win::unlock(int rank){
552   MPI_Win target_win = connected_wins_[rank];
553   int target_mode = target_win->mode_;
554   target_win->mode_= 0;
555   target_win->lockers_.remove(rank_);
556   if (target_mode==MPI_LOCK_EXCLUSIVE){
557     target_win->lock_mut_->unlock();
558   }
559
560   flush(rank);
561   return MPI_SUCCESS;
562 }
563
564 int Win::unlock_all(){
565   int retval = MPI_SUCCESS;
566   for (int i = 0; i < comm_->size(); i++) {
567     int ret = this->unlock(i);
568     if (ret != MPI_SUCCESS)
569       retval = ret;
570   }
571   return retval;
572 }
573
574 int Win::flush(int rank){
575   int finished = finish_comms(rank);
576   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
577   if (rank != rank_) {
578     finished = connected_wins_[rank]->finish_comms(rank_);
579     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
580   }
581   return MPI_SUCCESS;
582 }
583
584 int Win::flush_local(int rank){
585   int finished = finish_comms(rank);
586   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
587   return MPI_SUCCESS;
588 }
589
590 int Win::flush_all(){
591   int finished = finish_comms();
592   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
593   for (int i = 0; i < comm_->size(); i++) {
594     if (i != rank_) {
595       finished = connected_wins_[i]->finish_comms(rank_);
596       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
597     }
598   }
599   return MPI_SUCCESS;
600 }
601
602 int Win::flush_local_all(){
603   int finished = finish_comms();
604   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
605   return MPI_SUCCESS;
606 }
607
608 Win* Win::f2c(int id){
609   return static_cast<Win*>(F2C::f2c(id));
610 }
611
612 int Win::finish_comms(){
613   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
614   // Without this, the vector could get redimensioned when another process pushes.
615   // This would result in the array used by Request::waitall() to be invalidated.
616   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
617   mut_->lock();
618   //Finish own requests
619   int size = static_cast<int>(requests_.size());
620   if (size > 0) {
621     MPI_Request* treqs = requests_.data();
622     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
623     requests_.clear();
624   }
625   mut_->unlock();
626   return size;
627 }
628
629 int Win::finish_comms(int rank){
630   // See comment about the mutex in finish_comms() above
631   mut_->lock();
632   // Finish own requests
633   // Let's see if we're either the destination or the sender of this request
634   // because we only wait for requests that we are responsible for.
635   // Also use the process id here since the request itself returns from src()
636   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
637   aid_t proc_id = comm_->group()->actor(rank);
638   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
639     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
640   });
641   std::vector<MPI_Request> myreqqs(it, end(requests_));
642   requests_.erase(it, end(requests_));
643   int size = static_cast<int>(myreqqs.size());
644   if (size > 0) {
645     MPI_Request* treqs = myreqqs.data();
646     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
647     myreqqs.clear();
648   }
649   mut_->unlock();
650   return size;
651 }
652
653 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
654 {
655   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
656   for (int i = 0; not target_win && i < comm_->size(); i++) {
657     if (connected_wins_[i]->size_ > 0)
658       target_win = connected_wins_[i];
659   }
660   if (target_win) {
661     *size                         = target_win->size_;
662     *disp_unit                    = target_win->disp_unit_;
663     *static_cast<void**>(baseptr) = target_win->base_;
664   } else {
665     *size                         = 0;
666     *static_cast<void**>(baseptr) = nullptr;
667   }
668   return MPI_SUCCESS;
669 }
670
671 MPI_Errhandler Win::errhandler()
672 {
673   if (errhandler_ != MPI_ERRHANDLER_NULL)
674     errhandler_->ref();
675   return errhandler_;
676 }
677
678 void Win::set_errhandler(MPI_Errhandler errhandler)
679 {
680   if (errhandler_ != MPI_ERRHANDLER_NULL)
681     simgrid::smpi::Errhandler::unref(errhandler_);
682   errhandler_ = errhandler;
683   if (errhandler_ != MPI_ERRHANDLER_NULL)
684     errhandler_->ref();
685 }
686 } // namespace simgrid::smpi