Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'add_remaining_comm_sync_bindings' into 'master'
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
22
23 #define CHECK_RMA_REMOTE_WIN(fun, win)\
24   if(target_count*target_datatype->get_extent()>win->size_){\
25     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
26     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
27     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
28     return MPI_ERR_RMA_RANGE;\
29   }
30
31 #define CHECK_WIN_LOCKED(win)                                                                                          \
32   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
33     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
34     if (not locked)                                                                                                    \
35       return MPI_ERR_WIN;                                                                                              \
36   }
37
38 namespace simgrid{
39 namespace smpi{
40 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
41 int Win::keyval_id_=0;
42
43 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
44     : base_(base)
45     , size_(size)
46     , disp_unit_(disp_unit)
47     , info_(info)
48     , comm_(comm)
49     , connected_wins_(comm->size())
50     , rank_(comm->rank())
51     , allocated_(allocated)
52     , dynamic_(dynamic)
53 {
54   XBT_DEBUG("Creating window");
55   if(info!=MPI_INFO_NULL)
56     info->ref();
57   connected_wins_[rank_] = this;
58   errhandler_->ref();
59   comm->add_rma_win(this);
60   comm->ref();
61
62   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
63                    MPI_BYTE, comm);
64   if  (MC_is_active() || MC_record_replay_is_active()){
65     if (bar_.get() == nullptr) // First to arrive on the barrier
66       bar_ = s4u::Barrier::create(comm->size());
67     bar_->wait();
68   }else{
69     colls::barrier(comm);
70   }
71   this->add_f();
72 }
73
74 Win::~Win(){
75   //As per the standard, perform a barrier to ensure every async comm is finished
76   if  (MC_is_active() || MC_record_replay_is_active())
77     bar_->wait();
78   else
79     colls::barrier(comm_);
80   flush_local_all();
81
82   if (info_ != MPI_INFO_NULL)
83     simgrid::smpi::Info::unref(info_);
84   if (errhandler_ != MPI_ERRHANDLER_NULL)
85     simgrid::smpi::Errhandler::unref(errhandler_);
86
87   comm_->remove_rma_win(this);
88
89   colls::barrier(comm_);
90   Comm::unref(comm_);
91
92   if (allocated_)
93     xbt_free(base_);
94
95   F2C::free_f(this->f2c_id());
96   cleanup_attr<Win>();
97 }
98
99 int Win::attach(void* /*base*/, MPI_Aint size)
100 {
101   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
102     return MPI_ERR_ARG;
103   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
104   size_+=size;
105   return MPI_SUCCESS;
106 }
107
108 int Win::detach(const void* /*base*/)
109 {
110   base_=MPI_BOTTOM;
111   size_=-1;
112   return MPI_SUCCESS;
113 }
114
115 void Win::get_name(char* name, int* length) const
116 {
117   *length = static_cast<int>(name_.length());
118   if (not name_.empty()) {
119     name_.copy(name, *length);
120     name[*length] = '\0';
121   }
122 }
123
124 void Win::get_group(MPI_Group* group){
125   if(comm_ != MPI_COMM_NULL){
126     *group = comm_->group();
127   } else {
128     *group = MPI_GROUP_NULL;
129   }
130 }
131
132 MPI_Info Win::info()
133 {
134   return info_;
135 }
136
137 int Win::rank() const
138 {
139   return rank_;
140 }
141
142 MPI_Comm Win::comm() const
143 {
144   return comm_;
145 }
146
147 MPI_Aint Win::size() const
148 {
149   return size_;
150 }
151
152 void* Win::base() const
153 {
154   return base_;
155 }
156
157 int Win::disp_unit() const
158 {
159   return disp_unit_;
160 }
161
162 bool Win::dynamic() const
163 {
164   return dynamic_;
165 }
166
167 void Win::set_info(MPI_Info info)
168 {
169   if (info_ != MPI_INFO_NULL)
170     simgrid::smpi::Info::unref(info_);
171   info_ = info;
172   if (info_ != MPI_INFO_NULL)
173     info_->ref();
174 }
175
176 void Win::set_name(const char* name){
177   name_ = name;
178 }
179
180 int Win::fence(int assert)
181 {
182   XBT_DEBUG("Entering fence");
183   opened_++;
184   if (not (assert & MPI_MODE_NOPRECEDE)) {
185     // This is not the first fence => finalize what came before
186     if (MC_is_active() || MC_record_replay_is_active())
187       bar_->wait();
188     else
189       colls::barrier(comm_);
190     flush_local_all();
191     count_=0;
192   }
193
194   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
195     opened_=0;
196   assert_ = assert;
197   if (MC_is_active() || MC_record_replay_is_active())
198     bar_->wait();
199   else
200     colls::barrier(comm_);
201   XBT_DEBUG("Leaving fence");
202
203   return MPI_SUCCESS;
204 }
205
206 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
207               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
208 {
209   //get receiver pointer
210   Win* recv_win = connected_wins_[target_rank];
211
212   CHECK_WIN_LOCKED(recv_win)
213   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
214
215   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
216
217   if (target_rank != rank_) { // This is not for myself, so we need to send messages
218     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
219     // prepare send_request
220     MPI_Request sreq =
221         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
222                                MPI_OP_NULL);
223
224     //prepare receiver request
225     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
226                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
227
228     //start send
229     sreq->start();
230
231     if(request!=nullptr){
232       *request=sreq;
233     }else{
234       mut_->lock();
235       requests_.push_back(sreq);
236       mut_->unlock();
237     }
238
239     //push request to receiver's win
240     recv_win->mut_->lock();
241     recv_win->requests_.push_back(rreq);
242     rreq->start();
243     recv_win->mut_->unlock();
244   } else {
245     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
246     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
247     if(request!=nullptr)
248       *request = MPI_REQUEST_NULL;
249   }
250
251   return MPI_SUCCESS;
252 }
253
254 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
255               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
256 {
257   //get sender pointer
258   Win* send_win = connected_wins_[target_rank];
259
260   CHECK_WIN_LOCKED(send_win)
261   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
262
263   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
264   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
265
266   if (target_rank != rank_) {
267     //prepare send_request
268     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
269                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
270
271     //prepare receiver request
272     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
273                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
274
275     //start the send, with another process than us as sender.
276     sreq->start();
277     // push request to sender's win
278     send_win->mut_->lock();
279     send_win->requests_.push_back(sreq);
280     send_win->mut_->unlock();
281
282     //start recv
283     rreq->start();
284
285     if(request!=nullptr){
286       *request=rreq;
287     }else{
288       mut_->lock();
289       requests_.push_back(rreq);
290       mut_->unlock();
291     }
292   } else {
293     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
294     if(request!=nullptr)
295       *request=MPI_REQUEST_NULL;
296   }
297   return MPI_SUCCESS;
298 }
299
300 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
301               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
302 {
303   XBT_DEBUG("Entering MPI_Win_Accumulate");
304   //get receiver pointer
305   Win* recv_win = connected_wins_[target_rank];
306
307   //FIXME: local version
308   CHECK_WIN_LOCKED(recv_win)
309   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
310
311   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
312   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
313   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
314   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
315   // prepare send_request
316
317   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
318                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
319
320   // prepare receiver request
321   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
322                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
323
324   count_++;
325
326   // start send
327   sreq->start();
328   // push request to receiver's win
329   recv_win->mut_->lock();
330   recv_win->requests_.push_back(rreq);
331   rreq->start();
332   recv_win->mut_->unlock();
333
334   if (request != nullptr) {
335     *request = sreq;
336   } else {
337     mut_->lock();
338     requests_.push_back(sreq);
339     mut_->unlock();
340   }
341
342   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
343   // 'flush' is a workaround to fix that.
344   flush(target_rank);
345   XBT_DEBUG("Leaving MPI_Win_Accumulate");
346   return MPI_SUCCESS;
347 }
348
349 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
350                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
351                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
352 {
353   //get sender pointer
354   const Win* send_win = connected_wins_[target_rank];
355
356   CHECK_WIN_LOCKED(send_win)
357   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
358
359   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
360   //need to be sure ops are correctly ordered, so finish request here ? slow.
361   MPI_Request req = MPI_REQUEST_NULL;
362   send_win->atomic_mut_->lock();
363   get(result_addr, result_count, result_datatype, target_rank,
364               target_disp, target_count, target_datatype, &req);
365   if (req != MPI_REQUEST_NULL)
366     Request::wait(&req, MPI_STATUS_IGNORE);
367   if(op!=MPI_NO_OP)
368     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
369               target_disp, target_count, target_datatype, op, &req);
370   if (req != MPI_REQUEST_NULL)
371     Request::wait(&req, MPI_STATUS_IGNORE);
372   send_win->atomic_mut_->unlock();
373   return MPI_SUCCESS;
374 }
375
376 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
377                           int target_rank, MPI_Aint target_disp)
378 {
379   //get sender pointer
380   const Win* send_win = connected_wins_[target_rank];
381
382   CHECK_WIN_LOCKED(send_win)
383
384   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
385   MPI_Request req = MPI_REQUEST_NULL;
386   send_win->atomic_mut_->lock();
387   get(result_addr, 1, datatype, target_rank,
388               target_disp, 1, datatype, &req);
389   if (req != MPI_REQUEST_NULL)
390     Request::wait(&req, MPI_STATUS_IGNORE);
391   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
392     put(origin_addr, 1, datatype, target_rank,
393               target_disp, 1, datatype);
394   }
395   send_win->atomic_mut_->unlock();
396   return MPI_SUCCESS;
397 }
398
399 int Win::start(MPI_Group group, int /*assert*/)
400 {
401   /* From MPI forum advices
402   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
403   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
404   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
405   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
406   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
407   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
408   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
409   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
410   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
411   must complete, without further dependencies.  */
412
413   //naive, blocking implementation.
414   XBT_DEBUG("Entering MPI_Win_Start");
415   std::vector<MPI_Request> reqs;
416   for (int i = 0; i < group->size(); i++) {
417     int src = comm_->group()->rank(group->actor(i));
418     xbt_assert(src != MPI_UNDEFINED);
419     if (src != rank_)
420       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
421   }
422   int size = static_cast<int>(reqs.size());
423
424   Request::startall(size, reqs.data());
425   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
426   for (auto& req : reqs)
427     Request::unref(&req);
428
429   group->ref();
430   dst_group_ = group;
431   opened_++; // we're open for business !
432   XBT_DEBUG("Leaving MPI_Win_Start");
433   return MPI_SUCCESS;
434 }
435
436 int Win::post(MPI_Group group, int /*assert*/)
437 {
438   //let's make a synchronous send here
439   XBT_DEBUG("Entering MPI_Win_Post");
440   std::vector<MPI_Request> reqs;
441   for (int i = 0; i < group->size(); i++) {
442     int dst = comm_->group()->rank(group->actor(i));
443     xbt_assert(dst != MPI_UNDEFINED);
444     if (dst != rank_)
445       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
446   }
447   int size = static_cast<int>(reqs.size());
448
449   Request::startall(size, reqs.data());
450   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
451   for (auto& req : reqs)
452     Request::unref(&req);
453
454   group->ref();
455   src_group_ = group;
456   opened_++; // we're open for business !
457   XBT_DEBUG("Leaving MPI_Win_Post");
458   return MPI_SUCCESS;
459 }
460
461 int Win::complete(){
462   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
463
464   XBT_DEBUG("Entering MPI_Win_Complete");
465   std::vector<MPI_Request> reqs;
466   for (int i = 0; i < dst_group_->size(); i++) {
467     int dst = comm_->group()->rank(dst_group_->actor(i));
468     xbt_assert(dst != MPI_UNDEFINED);
469     if (dst != rank_)
470       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
471   }
472   int size = static_cast<int>(reqs.size());
473
474   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
475   Request::startall(size, reqs.data());
476   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
477   for (auto& req : reqs)
478     Request::unref(&req);
479
480   flush_local_all();
481
482   opened_--; //we're closed for business !
483   Group::unref(dst_group_);
484   dst_group_ = MPI_GROUP_NULL;
485   return MPI_SUCCESS;
486 }
487
488 int Win::wait(){
489   //naive, blocking implementation.
490   XBT_DEBUG("Entering MPI_Win_Wait");
491   std::vector<MPI_Request> reqs;
492   for (int i = 0; i < src_group_->size(); i++) {
493     int src = comm_->group()->rank(src_group_->actor(i));
494     xbt_assert(src != MPI_UNDEFINED);
495     if (src != rank_)
496       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
497   }
498   int size = static_cast<int>(reqs.size());
499
500   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
501   Request::startall(size, reqs.data());
502   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
503   for (auto& req : reqs)
504     Request::unref(&req);
505
506   flush_local_all();
507
508   opened_--; //we're closed for business !
509   Group::unref(src_group_);
510   src_group_ = MPI_GROUP_NULL;
511   return MPI_SUCCESS;
512 }
513
514 int Win::lock(int lock_type, int rank, int /*assert*/)
515 {
516   MPI_Win target_win = connected_wins_[rank];
517
518   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
519     target_win->lock_mut_->lock();
520     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
521     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
522       target_win->lock_mut_->unlock();
523    }
524   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
525     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
526
527   target_win->lockers_.push_back(rank_);
528
529   flush(rank);
530   return MPI_SUCCESS;
531 }
532
533 int Win::lock_all(int assert){
534   int retval = MPI_SUCCESS;
535   for (int i = 0; i < comm_->size(); i++) {
536     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
537     if (ret != MPI_SUCCESS)
538       retval = ret;
539   }
540   return retval;
541 }
542
543 int Win::unlock(int rank){
544   MPI_Win target_win = connected_wins_[rank];
545   int target_mode = target_win->mode_;
546   target_win->mode_= 0;
547   target_win->lockers_.remove(rank_);
548   if (target_mode==MPI_LOCK_EXCLUSIVE){
549     target_win->lock_mut_->unlock();
550   }
551
552   flush(rank);
553   return MPI_SUCCESS;
554 }
555
556 int Win::unlock_all(){
557   int retval = MPI_SUCCESS;
558   for (int i = 0; i < comm_->size(); i++) {
559     int ret = this->unlock(i);
560     if (ret != MPI_SUCCESS)
561       retval = ret;
562   }
563   return retval;
564 }
565
566 int Win::flush(int rank){
567   int finished = finish_comms(rank);
568   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
569   if (rank != rank_) {
570     finished = connected_wins_[rank]->finish_comms(rank_);
571     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
572   }
573   return MPI_SUCCESS;
574 }
575
576 int Win::flush_local(int rank){
577   int finished = finish_comms(rank);
578   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
579   return MPI_SUCCESS;
580 }
581
582 int Win::flush_all(){
583   int finished = finish_comms();
584   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
585   for (int i = 0; i < comm_->size(); i++) {
586     if (i != rank_) {
587       finished = connected_wins_[i]->finish_comms(rank_);
588       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
589     }
590   }
591   return MPI_SUCCESS;
592 }
593
594 int Win::flush_local_all(){
595   int finished = finish_comms();
596   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
597   return MPI_SUCCESS;
598 }
599
600 Win* Win::f2c(int id){
601   return static_cast<Win*>(F2C::f2c(id));
602 }
603
604 int Win::finish_comms(){
605   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
606   // Without this, the vector could get redimensioned when another process pushes.
607   // This would result in the array used by Request::waitall() to be invalidated.
608   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
609   mut_->lock();
610   //Finish own requests
611   int size = static_cast<int>(requests_.size());
612   if (size > 0) {
613     MPI_Request* treqs = requests_.data();
614     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
615     requests_.clear();
616   }
617   mut_->unlock();
618   return size;
619 }
620
621 int Win::finish_comms(int rank){
622   // See comment about the mutex in finish_comms() above
623   mut_->lock();
624   // Finish own requests
625   // Let's see if we're either the destination or the sender of this request
626   // because we only wait for requests that we are responsible for.
627   // Also use the process id here since the request itself returns from src()
628   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
629   aid_t proc_id = comm_->group()->actor(rank);
630   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
631     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
632   });
633   std::vector<MPI_Request> myreqqs(it, end(requests_));
634   requests_.erase(it, end(requests_));
635   int size = static_cast<int>(myreqqs.size());
636   if (size > 0) {
637     MPI_Request* treqs = myreqqs.data();
638     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
639     myreqqs.clear();
640   }
641   mut_->unlock();
642   return size;
643 }
644
645 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
646 {
647   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
648   for (int i = 0; not target_win && i < comm_->size(); i++) {
649     if (connected_wins_[i]->size_ > 0)
650       target_win = connected_wins_[i];
651   }
652   if (target_win) {
653     *size                         = target_win->size_;
654     *disp_unit                    = target_win->disp_unit_;
655     *static_cast<void**>(baseptr) = target_win->base_;
656   } else {
657     *size                         = 0;
658     *static_cast<void**>(baseptr) = nullptr;
659   }
660   return MPI_SUCCESS;
661 }
662
663 MPI_Errhandler Win::errhandler()
664 {
665   if (errhandler_ != MPI_ERRHANDLER_NULL)
666     errhandler_->ref();
667   return errhandler_;
668 }
669
670 void Win::set_errhandler(MPI_Errhandler errhandler)
671 {
672   if (errhandler_ != MPI_ERRHANDLER_NULL)
673     simgrid::smpi::Errhandler::unref(errhandler_);
674   errhandler_ = errhandler;
675   if (errhandler_ != MPI_ERRHANDLER_NULL)
676     errhandler_->ref();
677 }
678 } // namespace smpi
679 } // namespace simgrid