Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
put back s4u barrier for MC only (untested)
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
22
23 #define CHECK_RMA_REMOTE_WIN(fun, win)\
24   if(target_count*target_datatype->get_extent()>win->size_){\
25     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
26     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
27     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
28     return MPI_ERR_RMA_RANGE;\
29   }
30
31 #define CHECK_WIN_LOCKED(win)                                                                                          \
32   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
33     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
34     if (not locked)                                                                                                    \
35       return MPI_ERR_WIN;                                                                                              \
36   }
37
38 namespace simgrid{
39 namespace smpi{
40 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
41 int Win::keyval_id_=0;
42
43 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
44     : base_(base)
45     , size_(size)
46     , disp_unit_(disp_unit)
47     , info_(info)
48     , comm_(comm)
49     , connected_wins_(comm->size())
50     , rank_(comm->rank())
51     , allocated_(allocated)
52     , dynamic_(dynamic)
53 {
54   XBT_DEBUG("Creating window");
55   if(info!=MPI_INFO_NULL)
56     info->ref();
57   connected_wins_[rank_] = this;
58   errhandler_->ref();
59   comm->add_rma_win(this);
60   comm->ref();
61
62   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
63                    MPI_BYTE, comm);
64   if  (MC_is_active() || MC_record_replay_is_active()){
65     if(rank_==0){
66       bar_ = new s4u::Barrier(comm->size());
67     }
68     colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
69     bar_->wait();
70   }else{
71     colls::barrier(comm);
72   }
73   this->add_f();
74 }
75
76 Win::~Win(){
77   //As per the standard, perform a barrier to ensure every async comm is finished
78   if  (MC_is_active() || MC_record_replay_is_active())
79     bar_->wait();
80   else
81     colls::barrier(comm_);
82   flush_local_all();
83
84   if (info_ != MPI_INFO_NULL)
85     simgrid::smpi::Info::unref(info_);
86   if (errhandler_ != MPI_ERRHANDLER_NULL)
87     simgrid::smpi::Errhandler::unref(errhandler_);
88
89   comm_->remove_rma_win(this);
90
91   colls::barrier(comm_);
92   Comm::unref(comm_);
93
94   if (rank_ == 0)
95     delete bar_;
96
97   if (allocated_)
98     xbt_free(base_);
99
100   F2C::free_f(this->f2c_id());
101   cleanup_attr<Win>();
102 }
103
104 int Win::attach(void* /*base*/, MPI_Aint size)
105 {
106   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
107     return MPI_ERR_ARG;
108   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
109   size_+=size;
110   return MPI_SUCCESS;
111 }
112
113 int Win::detach(const void* /*base*/)
114 {
115   base_=MPI_BOTTOM;
116   size_=-1;
117   return MPI_SUCCESS;
118 }
119
120 void Win::get_name(char* name, int* length) const
121 {
122   *length = static_cast<int>(name_.length());
123   if (not name_.empty()) {
124     name_.copy(name, *length);
125     name[*length] = '\0';
126   }
127 }
128
129 void Win::get_group(MPI_Group* group){
130   if(comm_ != MPI_COMM_NULL){
131     *group = comm_->group();
132   } else {
133     *group = MPI_GROUP_NULL;
134   }
135 }
136
137 MPI_Info Win::info()
138 {
139   return info_;
140 }
141
142 int Win::rank() const
143 {
144   return rank_;
145 }
146
147 MPI_Comm Win::comm() const
148 {
149   return comm_;
150 }
151
152 MPI_Aint Win::size() const
153 {
154   return size_;
155 }
156
157 void* Win::base() const
158 {
159   return base_;
160 }
161
162 int Win::disp_unit() const
163 {
164   return disp_unit_;
165 }
166
167 bool Win::dynamic() const
168 {
169   return dynamic_;
170 }
171
172 void Win::set_info(MPI_Info info)
173 {
174   if (info_ != MPI_INFO_NULL)
175     simgrid::smpi::Info::unref(info_);
176   info_ = info;
177   if (info_ != MPI_INFO_NULL)
178     info_->ref();
179 }
180
181 void Win::set_name(const char* name){
182   name_ = name;
183 }
184
185 int Win::fence(int assert)
186 {
187   XBT_DEBUG("Entering fence");
188   opened_++;
189   if (not (assert & MPI_MODE_NOPRECEDE)) {
190     // This is not the first fence => finalize what came before
191     if (MC_is_active() || MC_record_replay_is_active())
192       bar_->wait();
193     else
194       colls::barrier(comm_);
195     flush_local_all();
196     count_=0;
197   }
198
199   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
200     opened_=0;
201   assert_ = assert;
202   if (MC_is_active() || MC_record_replay_is_active())
203     bar_->wait();
204   else
205     colls::barrier(comm_);
206   XBT_DEBUG("Leaving fence");
207
208   return MPI_SUCCESS;
209 }
210
211 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
212               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
213 {
214   //get receiver pointer
215   Win* recv_win = connected_wins_[target_rank];
216
217   CHECK_WIN_LOCKED(recv_win)
218   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
219
220   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
221
222   if (target_rank != rank_) { // This is not for myself, so we need to send messages
223     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
224     // prepare send_request
225     MPI_Request sreq =
226         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
227                                MPI_OP_NULL);
228
229     //prepare receiver request
230     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
231                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
232
233     //start send
234     sreq->start();
235
236     if(request!=nullptr){
237       *request=sreq;
238     }else{
239       mut_->lock();
240       requests_.push_back(sreq);
241       mut_->unlock();
242     }
243
244     //push request to receiver's win
245     recv_win->mut_->lock();
246     recv_win->requests_.push_back(rreq);
247     rreq->start();
248     recv_win->mut_->unlock();
249   } else {
250     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
251     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
252     if(request!=nullptr)
253       *request = MPI_REQUEST_NULL;
254   }
255
256   return MPI_SUCCESS;
257 }
258
259 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
260               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
261 {
262   //get sender pointer
263   Win* send_win = connected_wins_[target_rank];
264
265   CHECK_WIN_LOCKED(send_win)
266   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
267
268   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
269   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
270
271   if (target_rank != rank_) {
272     //prepare send_request
273     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
274                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
275
276     //prepare receiver request
277     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
278                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
279
280     //start the send, with another process than us as sender.
281     sreq->start();
282     // push request to sender's win
283     send_win->mut_->lock();
284     send_win->requests_.push_back(sreq);
285     send_win->mut_->unlock();
286
287     //start recv
288     rreq->start();
289
290     if(request!=nullptr){
291       *request=rreq;
292     }else{
293       mut_->lock();
294       requests_.push_back(rreq);
295       mut_->unlock();
296     }
297   } else {
298     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
299     if(request!=nullptr)
300       *request=MPI_REQUEST_NULL;
301   }
302   return MPI_SUCCESS;
303 }
304
305 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
306               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
307 {
308   XBT_DEBUG("Entering MPI_Win_Accumulate");
309   //get receiver pointer
310   Win* recv_win = connected_wins_[target_rank];
311
312   //FIXME: local version
313   CHECK_WIN_LOCKED(recv_win)
314   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
315
316   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
317   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
318   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
319   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
320   // prepare send_request
321
322   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
323                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
324
325   // prepare receiver request
326   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
327                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
328
329   count_++;
330
331   // start send
332   sreq->start();
333   // push request to receiver's win
334   recv_win->mut_->lock();
335   recv_win->requests_.push_back(rreq);
336   rreq->start();
337   recv_win->mut_->unlock();
338
339   if (request != nullptr) {
340     *request = sreq;
341   } else {
342     mut_->lock();
343     requests_.push_back(sreq);
344     mut_->unlock();
345   }
346
347   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
348   // 'flush' is a workaround to fix that.
349   flush(target_rank);
350   XBT_DEBUG("Leaving MPI_Win_Accumulate");
351   return MPI_SUCCESS;
352 }
353
354 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
355                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
356                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
357 {
358   //get sender pointer
359   const Win* send_win = connected_wins_[target_rank];
360
361   CHECK_WIN_LOCKED(send_win)
362   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
363
364   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
365   //need to be sure ops are correctly ordered, so finish request here ? slow.
366   MPI_Request req = MPI_REQUEST_NULL;
367   send_win->atomic_mut_->lock();
368   get(result_addr, result_count, result_datatype, target_rank,
369               target_disp, target_count, target_datatype, &req);
370   if (req != MPI_REQUEST_NULL)
371     Request::wait(&req, MPI_STATUS_IGNORE);
372   if(op!=MPI_NO_OP)
373     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
374               target_disp, target_count, target_datatype, op, &req);
375   if (req != MPI_REQUEST_NULL)
376     Request::wait(&req, MPI_STATUS_IGNORE);
377   send_win->atomic_mut_->unlock();
378   return MPI_SUCCESS;
379 }
380
381 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
382                           int target_rank, MPI_Aint target_disp)
383 {
384   //get sender pointer
385   const Win* send_win = connected_wins_[target_rank];
386
387   CHECK_WIN_LOCKED(send_win)
388
389   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
390   MPI_Request req = MPI_REQUEST_NULL;
391   send_win->atomic_mut_->lock();
392   get(result_addr, 1, datatype, target_rank,
393               target_disp, 1, datatype, &req);
394   if (req != MPI_REQUEST_NULL)
395     Request::wait(&req, MPI_STATUS_IGNORE);
396   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
397     put(origin_addr, 1, datatype, target_rank,
398               target_disp, 1, datatype);
399   }
400   send_win->atomic_mut_->unlock();
401   return MPI_SUCCESS;
402 }
403
404 int Win::start(MPI_Group group, int /*assert*/)
405 {
406   /* From MPI forum advices
407   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
408   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
409   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
410   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
411   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
412   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
413   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
414   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
415   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
416   must complete, without further dependencies.  */
417
418   //naive, blocking implementation.
419   XBT_DEBUG("Entering MPI_Win_Start");
420   std::vector<MPI_Request> reqs;
421   for (int i = 0; i < group->size(); i++) {
422     int src = comm_->group()->rank(group->actor(i));
423     xbt_assert(src != MPI_UNDEFINED);
424     if (src != rank_)
425       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
426   }
427   int size = static_cast<int>(reqs.size());
428
429   Request::startall(size, reqs.data());
430   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
431   for (auto& req : reqs)
432     Request::unref(&req);
433
434   group->ref();
435   dst_group_ = group;
436   opened_++; // we're open for business !
437   XBT_DEBUG("Leaving MPI_Win_Start");
438   return MPI_SUCCESS;
439 }
440
441 int Win::post(MPI_Group group, int /*assert*/)
442 {
443   //let's make a synchronous send here
444   XBT_DEBUG("Entering MPI_Win_Post");
445   std::vector<MPI_Request> reqs;
446   for (int i = 0; i < group->size(); i++) {
447     int dst = comm_->group()->rank(group->actor(i));
448     xbt_assert(dst != MPI_UNDEFINED);
449     if (dst != rank_)
450       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
451   }
452   int size = static_cast<int>(reqs.size());
453
454   Request::startall(size, reqs.data());
455   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
456   for (auto& req : reqs)
457     Request::unref(&req);
458
459   group->ref();
460   src_group_ = group;
461   opened_++; // we're open for business !
462   XBT_DEBUG("Leaving MPI_Win_Post");
463   return MPI_SUCCESS;
464 }
465
466 int Win::complete(){
467   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
468
469   XBT_DEBUG("Entering MPI_Win_Complete");
470   std::vector<MPI_Request> reqs;
471   for (int i = 0; i < dst_group_->size(); i++) {
472     int dst = comm_->group()->rank(dst_group_->actor(i));
473     xbt_assert(dst != MPI_UNDEFINED);
474     if (dst != rank_)
475       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
476   }
477   int size = static_cast<int>(reqs.size());
478
479   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
480   Request::startall(size, reqs.data());
481   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
482   for (auto& req : reqs)
483     Request::unref(&req);
484
485   flush_local_all();
486
487   opened_--; //we're closed for business !
488   Group::unref(dst_group_);
489   dst_group_ = MPI_GROUP_NULL;
490   return MPI_SUCCESS;
491 }
492
493 int Win::wait(){
494   //naive, blocking implementation.
495   XBT_DEBUG("Entering MPI_Win_Wait");
496   std::vector<MPI_Request> reqs;
497   for (int i = 0; i < src_group_->size(); i++) {
498     int src = comm_->group()->rank(src_group_->actor(i));
499     xbt_assert(src != MPI_UNDEFINED);
500     if (src != rank_)
501       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
502   }
503   int size = static_cast<int>(reqs.size());
504
505   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
506   Request::startall(size, reqs.data());
507   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
508   for (auto& req : reqs)
509     Request::unref(&req);
510
511   flush_local_all();
512
513   opened_--; //we're closed for business !
514   Group::unref(src_group_);
515   src_group_ = MPI_GROUP_NULL;
516   return MPI_SUCCESS;
517 }
518
519 int Win::lock(int lock_type, int rank, int /*assert*/)
520 {
521   MPI_Win target_win = connected_wins_[rank];
522
523   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
524     target_win->lock_mut_->lock();
525     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
526     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
527       target_win->lock_mut_->unlock();
528    }
529   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
530     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
531
532   target_win->lockers_.push_back(rank_);
533
534   flush(rank);
535   return MPI_SUCCESS;
536 }
537
538 int Win::lock_all(int assert){
539   int retval = MPI_SUCCESS;
540   for (int i = 0; i < comm_->size(); i++) {
541     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
542     if (ret != MPI_SUCCESS)
543       retval = ret;
544   }
545   return retval;
546 }
547
548 int Win::unlock(int rank){
549   MPI_Win target_win = connected_wins_[rank];
550   int target_mode = target_win->mode_;
551   target_win->mode_= 0;
552   target_win->lockers_.remove(rank_);
553   if (target_mode==MPI_LOCK_EXCLUSIVE){
554     target_win->lock_mut_->unlock();
555   }
556
557   flush(rank);
558   return MPI_SUCCESS;
559 }
560
561 int Win::unlock_all(){
562   int retval = MPI_SUCCESS;
563   for (int i = 0; i < comm_->size(); i++) {
564     int ret = this->unlock(i);
565     if (ret != MPI_SUCCESS)
566       retval = ret;
567   }
568   return retval;
569 }
570
571 int Win::flush(int rank){
572   int finished = finish_comms(rank);
573   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
574   if (rank != rank_) {
575     finished = connected_wins_[rank]->finish_comms(rank_);
576     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
577   }
578   return MPI_SUCCESS;
579 }
580
581 int Win::flush_local(int rank){
582   int finished = finish_comms(rank);
583   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
584   return MPI_SUCCESS;
585 }
586
587 int Win::flush_all(){
588   int finished = finish_comms();
589   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
590   for (int i = 0; i < comm_->size(); i++) {
591     if (i != rank_) {
592       finished = connected_wins_[i]->finish_comms(rank_);
593       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
594     }
595   }
596   return MPI_SUCCESS;
597 }
598
599 int Win::flush_local_all(){
600   int finished = finish_comms();
601   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
602   return MPI_SUCCESS;
603 }
604
605 Win* Win::f2c(int id){
606   return static_cast<Win*>(F2C::f2c(id));
607 }
608
609 int Win::finish_comms(){
610   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
611   // Without this, the vector could get redimensioned when another process pushes.
612   // This would result in the array used by Request::waitall() to be invalidated.
613   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
614   mut_->lock();
615   //Finish own requests
616   int size = static_cast<int>(requests_.size());
617   if (size > 0) {
618     MPI_Request* treqs = requests_.data();
619     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
620     requests_.clear();
621   }
622   mut_->unlock();
623   return size;
624 }
625
626 int Win::finish_comms(int rank){
627   // See comment about the mutex in finish_comms() above
628   mut_->lock();
629   // Finish own requests
630   // Let's see if we're either the destination or the sender of this request
631   // because we only wait for requests that we are responsible for.
632   // Also use the process id here since the request itself returns from src()
633   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
634   aid_t proc_id = comm_->group()->actor(rank);
635   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
636     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
637   });
638   std::vector<MPI_Request> myreqqs(it, end(requests_));
639   requests_.erase(it, end(requests_));
640   int size = static_cast<int>(myreqqs.size());
641   if (size > 0) {
642     MPI_Request* treqs = myreqqs.data();
643     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
644     myreqqs.clear();
645   }
646   mut_->unlock();
647   return size;
648 }
649
650 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
651 {
652   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
653   for (int i = 0; not target_win && i < comm_->size(); i++) {
654     if (connected_wins_[i]->size_ > 0)
655       target_win = connected_wins_[i];
656   }
657   if (target_win) {
658     *size                         = target_win->size_;
659     *disp_unit                    = target_win->disp_unit_;
660     *static_cast<void**>(baseptr) = target_win->base_;
661   } else {
662     *size                         = 0;
663     *static_cast<void**>(baseptr) = nullptr;
664   }
665   return MPI_SUCCESS;
666 }
667
668 MPI_Errhandler Win::errhandler()
669 {
670   if (errhandler_ != MPI_ERRHANDLER_NULL)
671     errhandler_->ref();
672   return errhandler_;
673 }
674
675 void Win::set_errhandler(MPI_Errhandler errhandler)
676 {
677   if (errhandler_ != MPI_ERRHANDLER_NULL)
678     simgrid::smpi::Errhandler::unref(errhandler_);
679   errhandler_ = errhandler;
680   if (errhandler_ != MPI_ERRHANDLER_NULL)
681     errhandler_->ref();
682 }
683 } // namespace smpi
684 } // namespace simgrid