Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Review usage of rank/rank_/rank() is smpi_win.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)                                                                                          \
30   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
31     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
32     if (not locked)                                                                                                    \
33       return MPI_ERR_WIN;                                                                                              \
34   }
35
36 namespace simgrid{
37 namespace smpi{
38 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
39 int Win::keyval_id_=0;
40
41 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
42     : base_(base)
43     , size_(size)
44     , disp_unit_(disp_unit)
45     , info_(info)
46     , comm_(comm)
47     , connected_wins_(comm->size())
48     , rank_(comm->rank())
49     , allocated_(allocated)
50     , dynamic_(dynamic)
51 {
52   XBT_DEBUG("Creating window");
53   if(info!=MPI_INFO_NULL)
54     info->ref();
55   connected_wins_[rank_] = this;
56   if(rank_==0){
57     bar_ = new s4u::Barrier(comm->size());
58   }
59   errhandler_->ref();
60   comm->add_rma_win(this);
61   comm->ref();
62
63   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
64                    MPI_BYTE, comm);
65
66   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
67
68   colls::barrier(comm);
69   this->add_f();
70 }
71
72 Win::~Win(){
73   //As per the standard, perform a barrier to ensure every async comm is finished
74   bar_->wait();
75
76   int finished = finish_comms();
77   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
78
79   if (info_ != MPI_INFO_NULL)
80     simgrid::smpi::Info::unref(info_);
81   if (errhandler_ != MPI_ERRHANDLER_NULL)
82     simgrid::smpi::Errhandler::unref(errhandler_);
83
84   comm_->remove_rma_win(this);
85
86   colls::barrier(comm_);
87   Comm::unref(comm_);
88   
89   if (rank_ == 0)
90     delete bar_;
91
92   if (allocated_)
93     xbt_free(base_);
94
95   F2C::free_f(this->f2c_id());
96   cleanup_attr<Win>();
97 }
98
99 int Win::attach(void* /*base*/, MPI_Aint size)
100 {
101   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
102     return MPI_ERR_ARG;
103   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
104   size_+=size;
105   return MPI_SUCCESS;
106 }
107
108 int Win::detach(const void* /*base*/)
109 {
110   base_=MPI_BOTTOM;
111   size_=-1;
112   return MPI_SUCCESS;
113 }
114
115 void Win::get_name(char* name, int* length) const
116 {
117   *length = static_cast<int>(name_.length());
118   if (not name_.empty()) {
119     name_.copy(name, *length);
120     name[*length] = '\0';
121   }
122 }
123
124 void Win::get_group(MPI_Group* group){
125   if(comm_ != MPI_COMM_NULL){
126     *group = comm_->group();
127   } else {
128     *group = MPI_GROUP_NULL;
129   }
130 }
131
132 MPI_Info Win::info()
133 {
134   if (info_ == MPI_INFO_NULL)
135     info_ = new Info();
136   info_->ref();
137   return info_;
138 }
139
140 int Win::rank() const
141 {
142   return rank_;
143 }
144
145 MPI_Comm Win::comm() const
146 {
147   return comm_;
148 }
149
150 MPI_Aint Win::size() const
151 {
152   return size_;
153 }
154
155 void* Win::base() const
156 {
157   return base_;
158 }
159
160 int Win::disp_unit() const
161 {
162   return disp_unit_;
163 }
164
165 bool Win::dynamic() const
166 {
167   return dynamic_;
168 }
169
170 void Win::set_info(MPI_Info info)
171 {
172   if (info_ != MPI_INFO_NULL)
173     simgrid::smpi::Info::unref(info_);
174   info_ = info;
175   if (info_ != MPI_INFO_NULL)
176     info_->ref();
177 }
178
179 void Win::set_name(const char* name){
180   name_ = name;
181 }
182
183 int Win::fence(int assert)
184 {
185   XBT_DEBUG("Entering fence");
186   if (opened_ == 0)
187     opened_=1;
188   if (not (assert & MPI_MODE_NOPRECEDE)) {
189     // This is not the first fence => finalize what came before
190     bar_->wait();
191     mut_->lock();
192     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
193     // Without this, the vector could get redimensioned when another process pushes.
194     // This would result in the array used by Request::waitall() to be invalidated.
195     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
196
197     // start all requests that have been prepared by another process
198     if (not requests_.empty()) {
199       int size           = static_cast<int>(requests_.size());
200       MPI_Request* treqs = requests_.data();
201       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
202     }
203     count_=0;
204     mut_->unlock();
205   }
206
207   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
208     opened_=0;
209   assert_ = assert;
210
211   bar_->wait();
212   XBT_DEBUG("Leaving fence");
213
214   return MPI_SUCCESS;
215 }
216
217 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
218               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
219 {
220   //get receiver pointer
221   Win* recv_win = connected_wins_[target_rank];
222
223   CHECK_WIN_LOCKED(recv_win)
224   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
225
226   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
227
228   if (target_rank != rank_) { // This is not for myself, so we need to send messages
229     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
230     // prepare send_request
231     MPI_Request sreq =
232         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
233                                MPI_OP_NULL);
234
235     //prepare receiver request
236     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
237                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
238
239     //start send
240     sreq->start();
241
242     if(request!=nullptr){
243       *request=sreq;
244     }else{
245       mut_->lock();
246       requests_.push_back(sreq);
247       mut_->unlock();
248     }
249
250     //push request to receiver's win
251     recv_win->mut_->lock();
252     recv_win->requests_.push_back(rreq);
253     rreq->start();
254     recv_win->mut_->unlock();
255   } else {
256     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
257     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
258     if(request!=nullptr)
259       *request = MPI_REQUEST_NULL;
260   }
261
262   return MPI_SUCCESS;
263 }
264
265 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
266               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
267 {
268   //get sender pointer
269   Win* send_win = connected_wins_[target_rank];
270
271   CHECK_WIN_LOCKED(send_win)
272   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
273
274   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
275   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
276
277   if (target_rank != rank_) {
278     //prepare send_request
279     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
280                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
281
282     //prepare receiver request
283     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
284                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
285
286     //start the send, with another process than us as sender.
287     sreq->start();
288     //push request to receiver's win
289     send_win->mut_->lock();
290     send_win->requests_.push_back(sreq);
291     send_win->mut_->unlock();
292
293     //start recv
294     rreq->start();
295
296     if(request!=nullptr){
297       *request=rreq;
298     }else{
299       mut_->lock();
300       requests_.push_back(rreq);
301       mut_->unlock();
302     }
303   } else {
304     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
305     if(request!=nullptr)
306       *request=MPI_REQUEST_NULL;
307   }
308   return MPI_SUCCESS;
309 }
310
311 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
312               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
313 {
314   XBT_DEBUG("Entering MPI_Win_Accumulate");
315   //get receiver pointer
316   Win* recv_win = connected_wins_[target_rank];
317
318   //FIXME: local version
319   CHECK_WIN_LOCKED(recv_win)
320   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
321
322   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
323   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
324   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
325   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
326   // prepare send_request
327
328   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
329                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
330
331   // prepare receiver request
332   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
333                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
334
335   count_++;
336
337   // start send
338   sreq->start();
339   // push request to receiver's win
340   recv_win->mut_->lock();
341   recv_win->requests_.push_back(rreq);
342   rreq->start();
343   recv_win->mut_->unlock();
344
345   if (request != nullptr) {
346     *request = sreq;
347   } else {
348     mut_->lock();
349     requests_.push_back(sreq);
350     mut_->unlock();
351   }
352
353   XBT_DEBUG("Leaving MPI_Win_Accumulate");
354   return MPI_SUCCESS;
355 }
356
357 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
358                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
359                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
360 {
361   //get sender pointer
362   const Win* send_win = connected_wins_[target_rank];
363
364   CHECK_WIN_LOCKED(send_win)
365   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
366
367   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
368   //need to be sure ops are correctly ordered, so finish request here ? slow.
369   MPI_Request req;
370   send_win->atomic_mut_->lock();
371   get(result_addr, result_count, result_datatype, target_rank,
372               target_disp, target_count, target_datatype, &req);
373   if (req != MPI_REQUEST_NULL)
374     Request::wait(&req, MPI_STATUS_IGNORE);
375   if(op!=MPI_NO_OP)
376     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
377               target_disp, target_count, target_datatype, op, &req);
378   if (req != MPI_REQUEST_NULL)
379     Request::wait(&req, MPI_STATUS_IGNORE);
380   send_win->atomic_mut_->unlock();
381   return MPI_SUCCESS;
382 }
383
384 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
385                           int target_rank, MPI_Aint target_disp)
386 {
387   //get sender pointer
388   const Win* send_win = connected_wins_[target_rank];
389
390   CHECK_WIN_LOCKED(send_win)
391
392   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
393   MPI_Request req = MPI_REQUEST_NULL;
394   send_win->atomic_mut_->lock();
395   get(result_addr, 1, datatype, target_rank,
396               target_disp, 1, datatype, &req);
397   if (req != MPI_REQUEST_NULL)
398     Request::wait(&req, MPI_STATUS_IGNORE);
399   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
400     put(origin_addr, 1, datatype, target_rank,
401               target_disp, 1, datatype);
402   }
403   send_win->atomic_mut_->unlock();
404   return MPI_SUCCESS;
405 }
406
407 int Win::start(MPI_Group group, int /*assert*/)
408 {
409   /* From MPI forum advices
410   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
411   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
412   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
413   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
414   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
415   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
416   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
417   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
418   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
419   must complete, without further dependencies.  */
420
421   //naive, blocking implementation.
422   XBT_DEBUG("Entering MPI_Win_Start");
423   std::vector<MPI_Request> reqs;
424   for (int i = 0; i < group->size(); i++) {
425     int src = comm_->group()->rank(group->actor(i));
426     xbt_assert(src != MPI_UNDEFINED);
427     if (src != rank_)
428       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
429   }
430   int size = static_cast<int>(reqs.size());
431
432   Request::startall(size, reqs.data());
433   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
434   for (auto& req : reqs)
435     Request::unref(&req);
436
437   group->ref();
438   dst_group_ = group;
439   opened_++; // we're open for business !
440   XBT_DEBUG("Leaving MPI_Win_Start");
441   return MPI_SUCCESS;
442 }
443
444 int Win::post(MPI_Group group, int /*assert*/)
445 {
446   //let's make a synchronous send here
447   XBT_DEBUG("Entering MPI_Win_Post");
448   std::vector<MPI_Request> reqs;
449   for (int i = 0; i < group->size(); i++) {
450     int dst = comm_->group()->rank(group->actor(i));
451     xbt_assert(dst != MPI_UNDEFINED);
452     if (dst != rank_)
453       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
454   }
455   int size = static_cast<int>(reqs.size());
456
457   Request::startall(size, reqs.data());
458   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
459   for (auto& req : reqs)
460     Request::unref(&req);
461
462   group->ref();
463   src_group_ = group;
464   opened_++; // we're open for business !
465   XBT_DEBUG("Leaving MPI_Win_Post");
466   return MPI_SUCCESS;
467 }
468
469 int Win::complete(){
470   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
471
472   XBT_DEBUG("Entering MPI_Win_Complete");
473   std::vector<MPI_Request> reqs;
474   for (int i = 0; i < dst_group_->size(); i++) {
475     int dst = comm_->group()->rank(dst_group_->actor(i));
476     xbt_assert(dst != MPI_UNDEFINED);
477     if (dst != rank_)
478       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
479   }
480   int size = static_cast<int>(reqs.size());
481
482   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
483   Request::startall(size, reqs.data());
484   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
485   for (auto& req : reqs)
486     Request::unref(&req);
487
488   int finished = finish_comms();
489   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
490
491   opened_--; //we're closed for business !
492   Group::unref(dst_group_);
493   dst_group_ = MPI_GROUP_NULL;
494   return MPI_SUCCESS;
495 }
496
497 int Win::wait(){
498   //naive, blocking implementation.
499   XBT_DEBUG("Entering MPI_Win_Wait");
500   std::vector<MPI_Request> reqs;
501   for (int i = 0; i < src_group_->size(); i++) {
502     int src = comm_->group()->rank(src_group_->actor(i));
503     xbt_assert(src != MPI_UNDEFINED);
504     if (src != rank_)
505       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
506   }
507   int size = static_cast<int>(reqs.size());
508
509   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
510   Request::startall(size, reqs.data());
511   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
512   for (auto& req : reqs)
513     Request::unref(&req);
514
515   int finished = finish_comms();
516   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
517
518   opened_--; //we're closed for business !
519   Group::unref(src_group_);
520   src_group_ = MPI_GROUP_NULL;
521   return MPI_SUCCESS;
522 }
523
524 int Win::lock(int lock_type, int rank, int /*assert*/)
525 {
526   MPI_Win target_win = connected_wins_[rank];
527
528   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
529     target_win->lock_mut_->lock();
530     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
531     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
532       target_win->lock_mut_->unlock();
533    }
534   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
535     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
536
537   target_win->lockers_.push_back(rank_);
538
539   int finished = finish_comms(rank);
540   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
541   finished = target_win->finish_comms(rank_);
542   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
543   return MPI_SUCCESS;
544 }
545
546 int Win::lock_all(int assert){
547   int retval = MPI_SUCCESS;
548   for (int i = 0; i < comm_->size(); i++) {
549     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
550     if (ret != MPI_SUCCESS)
551       retval = ret;
552   }
553   return retval;
554 }
555
556 int Win::unlock(int rank){
557   MPI_Win target_win = connected_wins_[rank];
558   int target_mode = target_win->mode_;
559   target_win->mode_= 0;
560   target_win->lockers_.remove(rank_);
561   if (target_mode==MPI_LOCK_EXCLUSIVE){
562     target_win->lock_mut_->unlock();
563   }
564
565   int finished = finish_comms(rank);
566   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
567   finished = target_win->finish_comms(rank_);
568   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
569   return MPI_SUCCESS;
570 }
571
572 int Win::unlock_all(){
573   int retval = MPI_SUCCESS;
574   for (int i = 0; i < comm_->size(); i++) {
575     int ret = this->unlock(i);
576     if (ret != MPI_SUCCESS)
577       retval = ret;
578   }
579   return retval;
580 }
581
582 int Win::flush(int rank){
583   MPI_Win target_win = connected_wins_[rank];
584   int finished       = finish_comms(rank_);
585   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
586   finished = target_win->finish_comms(rank);
587   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
588   return MPI_SUCCESS;
589 }
590
591 int Win::flush_local(int rank){
592   int finished = finish_comms(rank);
593   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
594   return MPI_SUCCESS;
595 }
596
597 int Win::flush_all(){
598   int finished = finish_comms();
599   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
600   for (int i = 0; i < comm_->size(); i++) {
601     finished = connected_wins_[i]->finish_comms(rank_);
602     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
603   }
604   return MPI_SUCCESS;
605 }
606
607 int Win::flush_local_all(){
608   int finished = finish_comms();
609   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
610   return MPI_SUCCESS;
611 }
612
613 Win* Win::f2c(int id){
614   return static_cast<Win*>(F2C::f2c(id));
615 }
616
617 int Win::finish_comms(){
618   mut_->lock();
619   //Finish own requests
620   int size = static_cast<int>(requests_.size());
621   if (size > 0) {
622     MPI_Request* treqs = requests_.data();
623     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
624     requests_.clear();
625   }
626   mut_->unlock();
627   return size;
628 }
629
630 int Win::finish_comms(int rank){
631   mut_->lock();
632   // Finish own requests
633   // Let's see if we're either the destination or the sender of this request
634   // because we only wait for requests that we are responsible for.
635   // Also use the process id here since the request itself returns from src()
636   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
637   aid_t proc_id = comm_->group()->actor(rank);
638   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
639     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
640   });
641   std::vector<MPI_Request> myreqqs(it, end(requests_));
642   requests_.erase(it, end(requests_));
643   int size = static_cast<int>(myreqqs.size());
644   if (size > 0) {
645     MPI_Request* treqs = myreqqs.data();
646     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
647     myreqqs.clear();
648   }
649   mut_->unlock();
650   return size;
651 }
652
653 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
654 {
655   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
656   for (int i = 0; not target_win && i < comm_->size(); i++) {
657     if (connected_wins_[i]->size_ > 0)
658       target_win = connected_wins_[i];
659   }
660   if (target_win) {
661     *size                         = target_win->size_;
662     *disp_unit                    = target_win->disp_unit_;
663     *static_cast<void**>(baseptr) = target_win->base_;
664   } else {
665     *size                         = 0;
666     *static_cast<void**>(baseptr) = nullptr;
667   }
668   return MPI_SUCCESS;
669 }
670
671 MPI_Errhandler Win::errhandler()
672 {
673   if (errhandler_ != MPI_ERRHANDLER_NULL)
674     errhandler_->ref();
675   return errhandler_;
676 }
677
678 void Win::set_errhandler(MPI_Errhandler errhandler)
679 {
680   if (errhandler_ != MPI_ERRHANDLER_NULL)
681     simgrid::smpi::Errhandler::unref(errhandler_);
682   errhandler_ = errhandler;
683   if (errhandler_ != MPI_ERRHANDLER_NULL)
684     errhandler_->ref();
685 }
686 } // namespace smpi
687 } // namespace simgrid