Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use C++17's std::scoped_lock where appropriate.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20 #include <mutex> // std::scoped_lock
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
23
24 #define CHECK_RMA_REMOTE_WIN(fun, win)\
25   if(target_count*target_datatype->get_extent()>win->size_){\
26     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
27     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
28     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
29     return MPI_ERR_RMA_RANGE;\
30   }
31
32 #define CHECK_WIN_LOCKED(win)                                                                                          \
33   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
34     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
35     if (not locked)                                                                                                    \
36       return MPI_ERR_WIN;                                                                                              \
37   }
38
39 namespace simgrid::smpi {
40 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
41 int Win::keyval_id_=0;
42
43 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
44     : base_(base)
45     , size_(size)
46     , disp_unit_(disp_unit)
47     , info_(info)
48     , comm_(comm)
49     , connected_wins_(comm->size())
50     , rank_(comm->rank())
51     , allocated_(allocated)
52     , dynamic_(dynamic)
53 {
54   XBT_DEBUG("Creating window");
55   if(info!=MPI_INFO_NULL)
56     info->ref();
57   connected_wins_[rank_] = this;
58   errhandler_->ref();
59   comm->add_rma_win(this);
60   comm->ref();
61
62   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
63                    MPI_BYTE, comm);
64   if  (MC_is_active() || MC_record_replay_is_active()){
65     s4u::Barrier* bar_ptr;
66     if (rank_ == 0) {
67       bar_ = s4u::Barrier::create(comm->size());
68       bar_ptr = bar_.get();
69     }
70     colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
71     if (rank_ != 0)
72       bar_ = s4u::BarrierPtr(bar_ptr);
73   }
74   this->add_f();
75 }
76
77 int Win::del(Win* win){
78   //As per the standard, perform a barrier to ensure every async comm is finished
79   if  (MC_is_active() || MC_record_replay_is_active())
80     win->bar_->wait();
81   else
82     colls::barrier(win->comm_);
83   win->flush_local_all();
84
85   if (win->info_ != MPI_INFO_NULL)
86     simgrid::smpi::Info::unref(win->info_);
87   if (win->errhandler_ != MPI_ERRHANDLER_NULL)
88     simgrid::smpi::Errhandler::unref(win->errhandler_);
89
90   win->comm_->remove_rma_win(win);
91
92   colls::barrier(win->comm_);
93   Comm::unref(win->comm_);
94   if (not win->lockers_.empty() || win->opened_ < 0) {
95     XBT_WARN("Freeing a locked or opened window");
96     return MPI_ERR_WIN;
97   }
98   if (win->allocated_)
99     xbt_free(win->base_);
100
101   F2C::free_f(win->f2c_id());
102   win->cleanup_attr<Win>();
103
104   delete win;
105   return MPI_SUCCESS;
106 }
107
108 int Win::attach(void* /*base*/, MPI_Aint size)
109 {
110   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
111     return MPI_ERR_ARG;
112   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
113   size_+=size;
114   return MPI_SUCCESS;
115 }
116
117 int Win::detach(const void* /*base*/)
118 {
119   base_=MPI_BOTTOM;
120   size_=-1;
121   return MPI_SUCCESS;
122 }
123
124 void Win::get_name(char* name, int* length) const
125 {
126   *length = static_cast<int>(name_.length());
127   if (not name_.empty()) {
128     name_.copy(name, *length);
129     name[*length] = '\0';
130   }
131 }
132
133 void Win::get_group(MPI_Group* group){
134   if(comm_ != MPI_COMM_NULL){
135     *group = comm_->group();
136   } else {
137     *group = MPI_GROUP_NULL;
138   }
139 }
140
141 MPI_Info Win::info()
142 {
143   return info_;
144 }
145
146 int Win::rank() const
147 {
148   return rank_;
149 }
150
151 MPI_Comm Win::comm() const
152 {
153   return comm_;
154 }
155
156 MPI_Aint Win::size() const
157 {
158   return size_;
159 }
160
161 void* Win::base() const
162 {
163   return base_;
164 }
165
166 int Win::disp_unit() const
167 {
168   return disp_unit_;
169 }
170
171 bool Win::dynamic() const
172 {
173   return dynamic_;
174 }
175
176 void Win::set_info(MPI_Info info)
177 {
178   if (info_ != MPI_INFO_NULL)
179     simgrid::smpi::Info::unref(info_);
180   info_ = info;
181   if (info_ != MPI_INFO_NULL)
182     info_->ref();
183 }
184
185 void Win::set_name(const char* name){
186   name_ = name;
187 }
188
189 int Win::fence(int assert)
190 {
191   XBT_DEBUG("Entering fence");
192   opened_++;
193   if (not (assert & MPI_MODE_NOPRECEDE)) {
194     // This is not the first fence => finalize what came before
195     if (MC_is_active() || MC_record_replay_is_active())
196       bar_->wait();
197     else
198       colls::barrier(comm_);
199     flush_local_all();
200     count_=0;
201   }
202
203   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
204     opened_=0;
205   assert_ = assert;
206   if (MC_is_active() || MC_record_replay_is_active())
207     bar_->wait();
208   else
209     colls::barrier(comm_);
210   XBT_DEBUG("Leaving fence");
211
212   return MPI_SUCCESS;
213 }
214
215 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
216               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
217 {
218   //get receiver pointer
219   Win* recv_win = connected_wins_[target_rank];
220
221   CHECK_WIN_LOCKED(recv_win)
222   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
223
224   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
225
226   if (target_rank != rank_) { // This is not for myself, so we need to send messages
227     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
228     // prepare send_request
229     MPI_Request sreq =
230         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
231                                MPI_OP_NULL);
232
233     //prepare receiver request
234     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
235                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
236
237     //start send
238     sreq->start();
239
240     if(request!=nullptr){
241       *request=sreq;
242     }else{
243       const std::scoped_lock lock(*mut_);
244       requests_.push_back(sreq);
245     }
246
247     //push request to receiver's win
248     const std::scoped_lock recv_lock(*recv_win->mut_);
249     recv_win->requests_.push_back(rreq);
250     rreq->start();
251   } else {
252     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
253     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
254     if(request!=nullptr)
255       *request = MPI_REQUEST_NULL;
256   }
257
258   return MPI_SUCCESS;
259 }
260
261 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
262               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
263 {
264   //get sender pointer
265   Win* send_win = connected_wins_[target_rank];
266
267   CHECK_WIN_LOCKED(send_win)
268   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
269
270   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
271   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
272
273   if (target_rank != rank_) {
274     //prepare send_request
275     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
276                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
277
278     //prepare receiver request
279     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
280                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
281
282     //start the send, with another process than us as sender.
283     sreq->start();
284     // push request to sender's win
285     if (const std::scoped_lock send_lock(*send_win->mut_); true) {
286       send_win->requests_.push_back(sreq);
287     }
288
289     //start recv
290     rreq->start();
291
292     if(request!=nullptr){
293       *request=rreq;
294     }else{
295       const std::scoped_lock lock(*mut_);
296       requests_.push_back(rreq);
297     }
298   } else {
299     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
300     if(request!=nullptr)
301       *request=MPI_REQUEST_NULL;
302   }
303   return MPI_SUCCESS;
304 }
305
306 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
307               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
308 {
309   XBT_DEBUG("Entering MPI_Win_Accumulate");
310   //get receiver pointer
311   Win* recv_win = connected_wins_[target_rank];
312
313   //FIXME: local version
314   CHECK_WIN_LOCKED(recv_win)
315   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
316
317   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
318   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
319   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
320   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
321   // prepare send_request
322
323   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
324                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
325
326   // prepare receiver request
327   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
328                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
329
330   count_++;
331
332   // start send
333   sreq->start();
334   // push request to receiver's win
335   if (const std::scoped_lock recv_lock(*recv_win->mut_); true) {
336     recv_win->requests_.push_back(rreq);
337     rreq->start();
338   }
339
340   if (request != nullptr) {
341     *request = sreq;
342   } else {
343     const std::scoped_lock lock(*mut_);
344     requests_.push_back(sreq);
345   }
346
347   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
348   // 'flush' is a workaround to fix that.
349   flush(target_rank);
350   XBT_DEBUG("Leaving MPI_Win_Accumulate");
351   return MPI_SUCCESS;
352 }
353
354 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
355                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
356                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
357 {
358   //get sender pointer
359   const Win* send_win = connected_wins_[target_rank];
360
361   CHECK_WIN_LOCKED(send_win)
362   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
363
364   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
365   //need to be sure ops are correctly ordered, so finish request here ? slow.
366   MPI_Request req = MPI_REQUEST_NULL;
367   const std::scoped_lock lock(*send_win->atomic_mut_);
368   get(result_addr, result_count, result_datatype, target_rank,
369               target_disp, target_count, target_datatype, &req);
370   if (req != MPI_REQUEST_NULL)
371     Request::wait(&req, MPI_STATUS_IGNORE);
372   if(op!=MPI_NO_OP)
373     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
374               target_disp, target_count, target_datatype, op, &req);
375   if (req != MPI_REQUEST_NULL)
376     Request::wait(&req, MPI_STATUS_IGNORE);
377   return MPI_SUCCESS;
378 }
379
380 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
381                           int target_rank, MPI_Aint target_disp)
382 {
383   //get sender pointer
384   const Win* send_win = connected_wins_[target_rank];
385
386   CHECK_WIN_LOCKED(send_win)
387
388   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
389   MPI_Request req = MPI_REQUEST_NULL;
390   const std::scoped_lock lock(*send_win->atomic_mut_);
391   get(result_addr, 1, datatype, target_rank,
392               target_disp, 1, datatype, &req);
393   if (req != MPI_REQUEST_NULL)
394     Request::wait(&req, MPI_STATUS_IGNORE);
395   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
396     put(origin_addr, 1, datatype, target_rank,
397               target_disp, 1, datatype);
398   }
399   return MPI_SUCCESS;
400 }
401
402 int Win::start(MPI_Group group, int /*assert*/)
403 {
404   /* From MPI forum advices
405   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
406   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
407   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
408   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
409   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
410   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
411   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
412   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
413   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
414   must complete, without further dependencies.  */
415
416   //naive, blocking implementation.
417   XBT_DEBUG("Entering MPI_Win_Start");
418   std::vector<MPI_Request> reqs;
419   for (int i = 0; i < group->size(); i++) {
420     int src = comm_->group()->rank(group->actor(i));
421     xbt_assert(src != MPI_UNDEFINED);
422     if (src != rank_)
423       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
424   }
425   int size = static_cast<int>(reqs.size());
426
427   Request::startall(size, reqs.data());
428   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
429   for (auto& req : reqs)
430     Request::unref(&req);
431
432   group->ref();
433   dst_group_ = group;
434   opened_--; // we're open for business !
435   XBT_DEBUG("Leaving MPI_Win_Start");
436   return MPI_SUCCESS;
437 }
438
439 int Win::post(MPI_Group group, int /*assert*/)
440 {
441   //let's make a synchronous send here
442   XBT_DEBUG("Entering MPI_Win_Post");
443   std::vector<MPI_Request> reqs;
444   for (int i = 0; i < group->size(); i++) {
445     int dst = comm_->group()->rank(group->actor(i));
446     xbt_assert(dst != MPI_UNDEFINED);
447     if (dst != rank_)
448       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
449   }
450   int size = static_cast<int>(reqs.size());
451
452   Request::startall(size, reqs.data());
453   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
454   for (auto& req : reqs)
455     Request::unref(&req);
456
457   group->ref();
458   src_group_ = group;
459   opened_--; // we're open for business !
460   XBT_DEBUG("Leaving MPI_Win_Post");
461   return MPI_SUCCESS;
462 }
463
464 int Win::complete(){
465   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
466
467   XBT_DEBUG("Entering MPI_Win_Complete");
468   std::vector<MPI_Request> reqs;
469   for (int i = 0; i < dst_group_->size(); i++) {
470     int dst = comm_->group()->rank(dst_group_->actor(i));
471     xbt_assert(dst != MPI_UNDEFINED);
472     if (dst != rank_)
473       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
474   }
475   int size = static_cast<int>(reqs.size());
476
477   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
478   Request::startall(size, reqs.data());
479   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
480   for (auto& req : reqs)
481     Request::unref(&req);
482
483   flush_local_all();
484
485   opened_++; //we're closed for business !
486   Group::unref(dst_group_);
487   dst_group_ = MPI_GROUP_NULL;
488   return MPI_SUCCESS;
489 }
490
491 int Win::wait(){
492   //naive, blocking implementation.
493   XBT_DEBUG("Entering MPI_Win_Wait");
494   std::vector<MPI_Request> reqs;
495   for (int i = 0; i < src_group_->size(); i++) {
496     int src = comm_->group()->rank(src_group_->actor(i));
497     xbt_assert(src != MPI_UNDEFINED);
498     if (src != rank_)
499       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
500   }
501   int size = static_cast<int>(reqs.size());
502
503   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
504   Request::startall(size, reqs.data());
505   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
506   for (auto& req : reqs)
507     Request::unref(&req);
508
509   flush_local_all();
510
511   opened_++; //we're closed for business !
512   Group::unref(src_group_);
513   src_group_ = MPI_GROUP_NULL;
514   return MPI_SUCCESS;
515 }
516
517 int Win::lock(int lock_type, int rank, int /*assert*/)
518 {
519   MPI_Win target_win = connected_wins_[rank];
520
521   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
522     target_win->lock_mut_->lock();
523     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
524     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
525       target_win->lock_mut_->unlock();
526    }
527   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
528     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
529
530   target_win->lockers_.push_back(rank_);
531
532   flush(rank);
533   return MPI_SUCCESS;
534 }
535
536 int Win::lock_all(int assert){
537   int retval = MPI_SUCCESS;
538   for (int i = 0; i < comm_->size(); i++) {
539     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
540     if (ret != MPI_SUCCESS)
541       retval = ret;
542   }
543   return retval;
544 }
545
546 int Win::unlock(int rank){
547   MPI_Win target_win = connected_wins_[rank];
548   int target_mode = target_win->mode_;
549   target_win->mode_= 0;
550   target_win->lockers_.remove(rank_);
551   if (target_mode==MPI_LOCK_EXCLUSIVE){
552     target_win->lock_mut_->unlock();
553   }
554
555   flush(rank);
556   return MPI_SUCCESS;
557 }
558
559 int Win::unlock_all(){
560   int retval = MPI_SUCCESS;
561   for (int i = 0; i < comm_->size(); i++) {
562     int ret = this->unlock(i);
563     if (ret != MPI_SUCCESS)
564       retval = ret;
565   }
566   return retval;
567 }
568
569 int Win::flush(int rank){
570   int finished = finish_comms(rank);
571   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
572   if (rank != rank_) {
573     finished = connected_wins_[rank]->finish_comms(rank_);
574     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
575   }
576   return MPI_SUCCESS;
577 }
578
579 int Win::flush_local(int rank){
580   int finished = finish_comms(rank);
581   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
582   return MPI_SUCCESS;
583 }
584
585 int Win::flush_all(){
586   int finished = finish_comms();
587   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
588   for (int i = 0; i < comm_->size(); i++) {
589     if (i != rank_) {
590       finished = connected_wins_[i]->finish_comms(rank_);
591       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
592     }
593   }
594   return MPI_SUCCESS;
595 }
596
597 int Win::flush_local_all(){
598   int finished = finish_comms();
599   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
600   return MPI_SUCCESS;
601 }
602
603 Win* Win::f2c(int id){
604   return static_cast<Win*>(F2C::f2c(id));
605 }
606
607 int Win::finish_comms(){
608   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
609   // Without this, the vector could get redimensioned when another process pushes.
610   // This would result in the array used by Request::waitall() to be invalidated.
611   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
612   const std::scoped_lock lock(*mut_);
613   //Finish own requests
614   int size = static_cast<int>(requests_.size());
615   if (size > 0) {
616     MPI_Request* treqs = requests_.data();
617     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
618     requests_.clear();
619   }
620   return size;
621 }
622
623 int Win::finish_comms(int rank){
624   // See comment about the mutex in finish_comms() above
625   const std::scoped_lock lock(*mut_);
626   // Finish own requests
627   // Let's see if we're either the destination or the sender of this request
628   // because we only wait for requests that we are responsible for.
629   // Also use the process id here since the request itself returns from src()
630   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
631   aid_t proc_id = comm_->group()->actor(rank);
632   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
633     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
634   });
635   std::vector<MPI_Request> myreqqs(it, end(requests_));
636   requests_.erase(it, end(requests_));
637   int size = static_cast<int>(myreqqs.size());
638   if (size > 0) {
639     MPI_Request* treqs = myreqqs.data();
640     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
641     myreqqs.clear();
642   }
643   return size;
644 }
645
646 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
647 {
648   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
649   for (int i = 0; not target_win && i < comm_->size(); i++) {
650     if (connected_wins_[i]->size_ > 0)
651       target_win = connected_wins_[i];
652   }
653   if (target_win) {
654     *size                         = target_win->size_;
655     *disp_unit                    = target_win->disp_unit_;
656     *static_cast<void**>(baseptr) = target_win->base_;
657   } else {
658     *size                         = 0;
659     *static_cast<void**>(baseptr) = nullptr;
660   }
661   return MPI_SUCCESS;
662 }
663
664 MPI_Errhandler Win::errhandler()
665 {
666   if (errhandler_ != MPI_ERRHANDLER_NULL)
667     errhandler_->ref();
668   return errhandler_;
669 }
670
671 void Win::set_errhandler(MPI_Errhandler errhandler)
672 {
673   if (errhandler_ != MPI_ERRHANDLER_NULL)
674     simgrid::smpi::Errhandler::unref(errhandler_);
675   errhandler_ = errhandler;
676   if (errhandler_ != MPI_ERRHANDLER_NULL)
677     errhandler_->ref();
678 }
679 } // namespace simgrid::smpi