Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'factor_in_actions' into 'master'
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)                                                                                          \
30   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
31     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
32     if (not locked)                                                                                                    \
33       return MPI_ERR_WIN;                                                                                              \
34   }
35
36 namespace simgrid{
37 namespace smpi{
38 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
39 int Win::keyval_id_=0;
40
41 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
42     : base_(base)
43     , size_(size)
44     , disp_unit_(disp_unit)
45     , info_(info)
46     , comm_(comm)
47     , connected_wins_(comm->size())
48     , rank_(comm->rank())
49     , allocated_(allocated)
50     , dynamic_(dynamic)
51 {
52   XBT_DEBUG("Creating window");
53   if(info!=MPI_INFO_NULL)
54     info->ref();
55   connected_wins_[rank_] = this;
56   if(rank_==0){
57     bar_ = new s4u::Barrier(comm->size());
58   }
59   errhandler_->ref();
60   comm->add_rma_win(this);
61   comm->ref();
62
63   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
64                    MPI_BYTE, comm);
65
66   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
67
68   colls::barrier(comm);
69   this->add_f();
70 }
71
72 Win::~Win(){
73   //As per the standard, perform a barrier to ensure every async comm is finished
74   bar_->wait();
75
76   flush_local_all();
77
78   if (info_ != MPI_INFO_NULL)
79     simgrid::smpi::Info::unref(info_);
80   if (errhandler_ != MPI_ERRHANDLER_NULL)
81     simgrid::smpi::Errhandler::unref(errhandler_);
82
83   comm_->remove_rma_win(this);
84
85   colls::barrier(comm_);
86   Comm::unref(comm_);
87   
88   if (rank_ == 0)
89     delete bar_;
90
91   if (allocated_)
92     xbt_free(base_);
93
94   F2C::free_f(this->f2c_id());
95   cleanup_attr<Win>();
96 }
97
98 int Win::attach(void* /*base*/, MPI_Aint size)
99 {
100   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
101     return MPI_ERR_ARG;
102   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
103   size_+=size;
104   return MPI_SUCCESS;
105 }
106
107 int Win::detach(const void* /*base*/)
108 {
109   base_=MPI_BOTTOM;
110   size_=-1;
111   return MPI_SUCCESS;
112 }
113
114 void Win::get_name(char* name, int* length) const
115 {
116   *length = static_cast<int>(name_.length());
117   if (not name_.empty()) {
118     name_.copy(name, *length);
119     name[*length] = '\0';
120   }
121 }
122
123 void Win::get_group(MPI_Group* group){
124   if(comm_ != MPI_COMM_NULL){
125     *group = comm_->group();
126   } else {
127     *group = MPI_GROUP_NULL;
128   }
129 }
130
131 MPI_Info Win::info()
132 {
133   if (info_ == MPI_INFO_NULL)
134     info_ = new Info();
135   info_->ref();
136   return info_;
137 }
138
139 int Win::rank() const
140 {
141   return rank_;
142 }
143
144 MPI_Comm Win::comm() const
145 {
146   return comm_;
147 }
148
149 MPI_Aint Win::size() const
150 {
151   return size_;
152 }
153
154 void* Win::base() const
155 {
156   return base_;
157 }
158
159 int Win::disp_unit() const
160 {
161   return disp_unit_;
162 }
163
164 bool Win::dynamic() const
165 {
166   return dynamic_;
167 }
168
169 void Win::set_info(MPI_Info info)
170 {
171   if (info_ != MPI_INFO_NULL)
172     simgrid::smpi::Info::unref(info_);
173   info_ = info;
174   if (info_ != MPI_INFO_NULL)
175     info_->ref();
176 }
177
178 void Win::set_name(const char* name){
179   name_ = name;
180 }
181
182 int Win::fence(int assert)
183 {
184   XBT_DEBUG("Entering fence");
185   opened_++;
186   if (not (assert & MPI_MODE_NOPRECEDE)) {
187     // This is not the first fence => finalize what came before
188     bar_->wait();
189     flush_local_all();
190     count_=0;
191   }
192
193   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
194     opened_=0;
195   assert_ = assert;
196
197   bar_->wait();
198   XBT_DEBUG("Leaving fence");
199
200   return MPI_SUCCESS;
201 }
202
203 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
204               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
205 {
206   //get receiver pointer
207   Win* recv_win = connected_wins_[target_rank];
208
209   CHECK_WIN_LOCKED(recv_win)
210   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
211
212   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
213
214   if (target_rank != rank_) { // This is not for myself, so we need to send messages
215     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
216     // prepare send_request
217     MPI_Request sreq =
218         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
219                                MPI_OP_NULL);
220
221     //prepare receiver request
222     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
223                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
224
225     //start send
226     sreq->start();
227
228     if(request!=nullptr){
229       *request=sreq;
230     }else{
231       mut_->lock();
232       requests_.push_back(sreq);
233       mut_->unlock();
234     }
235
236     //push request to receiver's win
237     recv_win->mut_->lock();
238     recv_win->requests_.push_back(rreq);
239     rreq->start();
240     recv_win->mut_->unlock();
241   } else {
242     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
243     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
244     if(request!=nullptr)
245       *request = MPI_REQUEST_NULL;
246   }
247
248   return MPI_SUCCESS;
249 }
250
251 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
252               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
253 {
254   //get sender pointer
255   Win* send_win = connected_wins_[target_rank];
256
257   CHECK_WIN_LOCKED(send_win)
258   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
259
260   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
261   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
262
263   if (target_rank != rank_) {
264     //prepare send_request
265     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
266                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
267
268     //prepare receiver request
269     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
270                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
271
272     //start the send, with another process than us as sender.
273     sreq->start();
274     // push request to sender's win
275     send_win->mut_->lock();
276     send_win->requests_.push_back(sreq);
277     send_win->mut_->unlock();
278
279     //start recv
280     rreq->start();
281
282     if(request!=nullptr){
283       *request=rreq;
284     }else{
285       mut_->lock();
286       requests_.push_back(rreq);
287       mut_->unlock();
288     }
289   } else {
290     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
291     if(request!=nullptr)
292       *request=MPI_REQUEST_NULL;
293   }
294   return MPI_SUCCESS;
295 }
296
297 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
298               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
299 {
300   XBT_DEBUG("Entering MPI_Win_Accumulate");
301   //get receiver pointer
302   Win* recv_win = connected_wins_[target_rank];
303
304   //FIXME: local version
305   CHECK_WIN_LOCKED(recv_win)
306   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
307
308   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
309   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
310   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
311   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
312   // prepare send_request
313
314   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
315                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
316
317   // prepare receiver request
318   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
319                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
320
321   count_++;
322
323   // start send
324   sreq->start();
325   // push request to receiver's win
326   recv_win->mut_->lock();
327   recv_win->requests_.push_back(rreq);
328   rreq->start();
329   recv_win->mut_->unlock();
330
331   if (request != nullptr) {
332     *request = sreq;
333   } else {
334     mut_->lock();
335     requests_.push_back(sreq);
336     mut_->unlock();
337   }
338
339   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
340   // 'flush' is a workaround to fix that.
341   flush(target_rank);
342   XBT_DEBUG("Leaving MPI_Win_Accumulate");
343   return MPI_SUCCESS;
344 }
345
346 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
347                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
348                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
349 {
350   //get sender pointer
351   const Win* send_win = connected_wins_[target_rank];
352
353   CHECK_WIN_LOCKED(send_win)
354   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
355
356   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
357   //need to be sure ops are correctly ordered, so finish request here ? slow.
358   MPI_Request req = MPI_REQUEST_NULL;
359   send_win->atomic_mut_->lock();
360   get(result_addr, result_count, result_datatype, target_rank,
361               target_disp, target_count, target_datatype, &req);
362   if (req != MPI_REQUEST_NULL)
363     Request::wait(&req, MPI_STATUS_IGNORE);
364   if(op!=MPI_NO_OP)
365     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
366               target_disp, target_count, target_datatype, op, &req);
367   if (req != MPI_REQUEST_NULL)
368     Request::wait(&req, MPI_STATUS_IGNORE);
369   send_win->atomic_mut_->unlock();
370   return MPI_SUCCESS;
371 }
372
373 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
374                           int target_rank, MPI_Aint target_disp)
375 {
376   //get sender pointer
377   const Win* send_win = connected_wins_[target_rank];
378
379   CHECK_WIN_LOCKED(send_win)
380
381   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
382   MPI_Request req = MPI_REQUEST_NULL;
383   send_win->atomic_mut_->lock();
384   get(result_addr, 1, datatype, target_rank,
385               target_disp, 1, datatype, &req);
386   if (req != MPI_REQUEST_NULL)
387     Request::wait(&req, MPI_STATUS_IGNORE);
388   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
389     put(origin_addr, 1, datatype, target_rank,
390               target_disp, 1, datatype);
391   }
392   send_win->atomic_mut_->unlock();
393   return MPI_SUCCESS;
394 }
395
396 int Win::start(MPI_Group group, int /*assert*/)
397 {
398   /* From MPI forum advices
399   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
400   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
401   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
402   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
403   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
404   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
405   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
406   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
407   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
408   must complete, without further dependencies.  */
409
410   //naive, blocking implementation.
411   XBT_DEBUG("Entering MPI_Win_Start");
412   std::vector<MPI_Request> reqs;
413   for (int i = 0; i < group->size(); i++) {
414     int src = comm_->group()->rank(group->actor(i));
415     xbt_assert(src != MPI_UNDEFINED);
416     if (src != rank_)
417       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
418   }
419   int size = static_cast<int>(reqs.size());
420
421   Request::startall(size, reqs.data());
422   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
423   for (auto& req : reqs)
424     Request::unref(&req);
425
426   group->ref();
427   dst_group_ = group;
428   opened_++; // we're open for business !
429   XBT_DEBUG("Leaving MPI_Win_Start");
430   return MPI_SUCCESS;
431 }
432
433 int Win::post(MPI_Group group, int /*assert*/)
434 {
435   //let's make a synchronous send here
436   XBT_DEBUG("Entering MPI_Win_Post");
437   std::vector<MPI_Request> reqs;
438   for (int i = 0; i < group->size(); i++) {
439     int dst = comm_->group()->rank(group->actor(i));
440     xbt_assert(dst != MPI_UNDEFINED);
441     if (dst != rank_)
442       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
443   }
444   int size = static_cast<int>(reqs.size());
445
446   Request::startall(size, reqs.data());
447   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
448   for (auto& req : reqs)
449     Request::unref(&req);
450
451   group->ref();
452   src_group_ = group;
453   opened_++; // we're open for business !
454   XBT_DEBUG("Leaving MPI_Win_Post");
455   return MPI_SUCCESS;
456 }
457
458 int Win::complete(){
459   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
460
461   XBT_DEBUG("Entering MPI_Win_Complete");
462   std::vector<MPI_Request> reqs;
463   for (int i = 0; i < dst_group_->size(); i++) {
464     int dst = comm_->group()->rank(dst_group_->actor(i));
465     xbt_assert(dst != MPI_UNDEFINED);
466     if (dst != rank_)
467       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
468   }
469   int size = static_cast<int>(reqs.size());
470
471   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
472   Request::startall(size, reqs.data());
473   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
474   for (auto& req : reqs)
475     Request::unref(&req);
476
477   flush_local_all();
478
479   opened_--; //we're closed for business !
480   Group::unref(dst_group_);
481   dst_group_ = MPI_GROUP_NULL;
482   return MPI_SUCCESS;
483 }
484
485 int Win::wait(){
486   //naive, blocking implementation.
487   XBT_DEBUG("Entering MPI_Win_Wait");
488   std::vector<MPI_Request> reqs;
489   for (int i = 0; i < src_group_->size(); i++) {
490     int src = comm_->group()->rank(src_group_->actor(i));
491     xbt_assert(src != MPI_UNDEFINED);
492     if (src != rank_)
493       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
494   }
495   int size = static_cast<int>(reqs.size());
496
497   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
498   Request::startall(size, reqs.data());
499   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
500   for (auto& req : reqs)
501     Request::unref(&req);
502
503   flush_local_all();
504
505   opened_--; //we're closed for business !
506   Group::unref(src_group_);
507   src_group_ = MPI_GROUP_NULL;
508   return MPI_SUCCESS;
509 }
510
511 int Win::lock(int lock_type, int rank, int /*assert*/)
512 {
513   MPI_Win target_win = connected_wins_[rank];
514
515   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
516     target_win->lock_mut_->lock();
517     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
518     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
519       target_win->lock_mut_->unlock();
520    }
521   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
522     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
523
524   target_win->lockers_.push_back(rank_);
525
526   flush(rank);
527   return MPI_SUCCESS;
528 }
529
530 int Win::lock_all(int assert){
531   int retval = MPI_SUCCESS;
532   for (int i = 0; i < comm_->size(); i++) {
533     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
534     if (ret != MPI_SUCCESS)
535       retval = ret;
536   }
537   return retval;
538 }
539
540 int Win::unlock(int rank){
541   MPI_Win target_win = connected_wins_[rank];
542   int target_mode = target_win->mode_;
543   target_win->mode_= 0;
544   target_win->lockers_.remove(rank_);
545   if (target_mode==MPI_LOCK_EXCLUSIVE){
546     target_win->lock_mut_->unlock();
547   }
548
549   flush(rank);
550   return MPI_SUCCESS;
551 }
552
553 int Win::unlock_all(){
554   int retval = MPI_SUCCESS;
555   for (int i = 0; i < comm_->size(); i++) {
556     int ret = this->unlock(i);
557     if (ret != MPI_SUCCESS)
558       retval = ret;
559   }
560   return retval;
561 }
562
563 int Win::flush(int rank){
564   int finished = finish_comms(rank);
565   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
566   if (rank != rank_) {
567     finished = connected_wins_[rank]->finish_comms(rank_);
568     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
569   }
570   return MPI_SUCCESS;
571 }
572
573 int Win::flush_local(int rank){
574   int finished = finish_comms(rank);
575   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
576   return MPI_SUCCESS;
577 }
578
579 int Win::flush_all(){
580   int finished = finish_comms();
581   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
582   for (int i = 0; i < comm_->size(); i++) {
583     if (i != rank_) {
584       finished = connected_wins_[i]->finish_comms(rank_);
585       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
586     }
587   }
588   return MPI_SUCCESS;
589 }
590
591 int Win::flush_local_all(){
592   int finished = finish_comms();
593   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
594   return MPI_SUCCESS;
595 }
596
597 Win* Win::f2c(int id){
598   return static_cast<Win*>(F2C::f2c(id));
599 }
600
601 int Win::finish_comms(){
602   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
603   // Without this, the vector could get redimensioned when another process pushes.
604   // This would result in the array used by Request::waitall() to be invalidated.
605   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
606   mut_->lock();
607   //Finish own requests
608   int size = static_cast<int>(requests_.size());
609   if (size > 0) {
610     MPI_Request* treqs = requests_.data();
611     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
612     requests_.clear();
613   }
614   mut_->unlock();
615   return size;
616 }
617
618 int Win::finish_comms(int rank){
619   // See comment about the mutex in finish_comms() above
620   mut_->lock();
621   // Finish own requests
622   // Let's see if we're either the destination or the sender of this request
623   // because we only wait for requests that we are responsible for.
624   // Also use the process id here since the request itself returns from src()
625   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
626   aid_t proc_id = comm_->group()->actor(rank);
627   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
628     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
629   });
630   std::vector<MPI_Request> myreqqs(it, end(requests_));
631   requests_.erase(it, end(requests_));
632   int size = static_cast<int>(myreqqs.size());
633   if (size > 0) {
634     MPI_Request* treqs = myreqqs.data();
635     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
636     myreqqs.clear();
637   }
638   mut_->unlock();
639   return size;
640 }
641
642 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
643 {
644   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
645   for (int i = 0; not target_win && i < comm_->size(); i++) {
646     if (connected_wins_[i]->size_ > 0)
647       target_win = connected_wins_[i];
648   }
649   if (target_win) {
650     *size                         = target_win->size_;
651     *disp_unit                    = target_win->disp_unit_;
652     *static_cast<void**>(baseptr) = target_win->base_;
653   } else {
654     *size                         = 0;
655     *static_cast<void**>(baseptr) = nullptr;
656   }
657   return MPI_SUCCESS;
658 }
659
660 MPI_Errhandler Win::errhandler()
661 {
662   if (errhandler_ != MPI_ERRHANDLER_NULL)
663     errhandler_->ref();
664   return errhandler_;
665 }
666
667 void Win::set_errhandler(MPI_Errhandler errhandler)
668 {
669   if (errhandler_ != MPI_ERRHANDLER_NULL)
670     simgrid::smpi::Errhandler::unref(errhandler_);
671   errhandler_ = errhandler;
672   if (errhandler_ != MPI_ERRHANDLER_NULL)
673     errhandler_->ref();
674 }
675 } // namespace smpi
676 } // namespace simgrid