Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'issue105' into 'master'
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
22
23 #define CHECK_RMA_REMOTE_WIN(fun, win)\
24   if(target_count*target_datatype->get_extent()>win->size_){\
25     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
26     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
27     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
28     return MPI_ERR_RMA_RANGE;\
29   }
30
31 #define CHECK_WIN_LOCKED(win)                                                                                          \
32   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
33     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
34     if (not locked)                                                                                                    \
35       return MPI_ERR_WIN;                                                                                              \
36   }
37
38 namespace simgrid{
39 namespace smpi{
40 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
41 int Win::keyval_id_=0;
42
43 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
44     : base_(base)
45     , size_(size)
46     , disp_unit_(disp_unit)
47     , info_(info)
48     , comm_(comm)
49     , connected_wins_(comm->size())
50     , rank_(comm->rank())
51     , allocated_(allocated)
52     , dynamic_(dynamic)
53 {
54   XBT_DEBUG("Creating window");
55   if(info!=MPI_INFO_NULL)
56     info->ref();
57   connected_wins_[rank_] = this;
58   errhandler_->ref();
59   comm->add_rma_win(this);
60   comm->ref();
61
62   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
63                    MPI_BYTE, comm);
64   if  (MC_is_active() || MC_record_replay_is_active()){
65     s4u::Barrier* bar_ptr;
66     if (rank_ == 0) {
67       bar_ = s4u::Barrier::create(comm->size());
68       bar_ptr = bar_.get();
69     }
70     colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
71     if (rank_ != 0)
72       bar_ = s4u::BarrierPtr(bar_ptr);
73   }
74   this->add_f();
75 }
76
77 int Win::del(Win* win){
78   //As per the standard, perform a barrier to ensure every async comm is finished
79   if  (MC_is_active() || MC_record_replay_is_active())
80     win->bar_->wait();
81   else
82     colls::barrier(win->comm_);
83   win->flush_local_all();
84
85   if (win->info_ != MPI_INFO_NULL)
86     simgrid::smpi::Info::unref(win->info_);
87   if (win->errhandler_ != MPI_ERRHANDLER_NULL)
88     simgrid::smpi::Errhandler::unref(win->errhandler_);
89
90   win->comm_->remove_rma_win(win);
91
92   colls::barrier(win->comm_);
93   Comm::unref(win->comm_);
94   if (!win->lockers_.empty() || win->opened_ < 0){
95     XBT_WARN("Freeing a locked or opened window");
96     return MPI_ERR_WIN;
97   }
98   if (win->allocated_)
99     xbt_free(win->base_);
100
101   F2C::free_f(win->f2c_id());
102   win->cleanup_attr<Win>();
103
104   delete win;
105   return MPI_SUCCESS;
106 }
107
108 int Win::attach(void* /*base*/, MPI_Aint size)
109 {
110   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
111     return MPI_ERR_ARG;
112   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
113   size_+=size;
114   return MPI_SUCCESS;
115 }
116
117 int Win::detach(const void* /*base*/)
118 {
119   base_=MPI_BOTTOM;
120   size_=-1;
121   return MPI_SUCCESS;
122 }
123
124 void Win::get_name(char* name, int* length) const
125 {
126   *length = static_cast<int>(name_.length());
127   if (not name_.empty()) {
128     name_.copy(name, *length);
129     name[*length] = '\0';
130   }
131 }
132
133 void Win::get_group(MPI_Group* group){
134   if(comm_ != MPI_COMM_NULL){
135     *group = comm_->group();
136   } else {
137     *group = MPI_GROUP_NULL;
138   }
139 }
140
141 MPI_Info Win::info()
142 {
143   return info_;
144 }
145
146 int Win::rank() const
147 {
148   return rank_;
149 }
150
151 MPI_Comm Win::comm() const
152 {
153   return comm_;
154 }
155
156 MPI_Aint Win::size() const
157 {
158   return size_;
159 }
160
161 void* Win::base() const
162 {
163   return base_;
164 }
165
166 int Win::disp_unit() const
167 {
168   return disp_unit_;
169 }
170
171 bool Win::dynamic() const
172 {
173   return dynamic_;
174 }
175
176 void Win::set_info(MPI_Info info)
177 {
178   if (info_ != MPI_INFO_NULL)
179     simgrid::smpi::Info::unref(info_);
180   info_ = info;
181   if (info_ != MPI_INFO_NULL)
182     info_->ref();
183 }
184
185 void Win::set_name(const char* name){
186   name_ = name;
187 }
188
189 int Win::fence(int assert)
190 {
191   XBT_DEBUG("Entering fence");
192   opened_++;
193   if (not (assert & MPI_MODE_NOPRECEDE)) {
194     // This is not the first fence => finalize what came before
195     if (MC_is_active() || MC_record_replay_is_active())
196       bar_->wait();
197     else
198       colls::barrier(comm_);
199     flush_local_all();
200     count_=0;
201   }
202
203   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
204     opened_=0;
205   assert_ = assert;
206   if (MC_is_active() || MC_record_replay_is_active())
207     bar_->wait();
208   else
209     colls::barrier(comm_);
210   XBT_DEBUG("Leaving fence");
211
212   return MPI_SUCCESS;
213 }
214
215 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
216               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
217 {
218   //get receiver pointer
219   Win* recv_win = connected_wins_[target_rank];
220
221   CHECK_WIN_LOCKED(recv_win)
222   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
223
224   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
225
226   if (target_rank != rank_) { // This is not for myself, so we need to send messages
227     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
228     // prepare send_request
229     MPI_Request sreq =
230         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
231                                MPI_OP_NULL);
232
233     //prepare receiver request
234     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
235                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
236
237     //start send
238     sreq->start();
239
240     if(request!=nullptr){
241       *request=sreq;
242     }else{
243       mut_->lock();
244       requests_.push_back(sreq);
245       mut_->unlock();
246     }
247
248     //push request to receiver's win
249     recv_win->mut_->lock();
250     recv_win->requests_.push_back(rreq);
251     rreq->start();
252     recv_win->mut_->unlock();
253   } else {
254     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
255     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
256     if(request!=nullptr)
257       *request = MPI_REQUEST_NULL;
258   }
259
260   return MPI_SUCCESS;
261 }
262
263 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
264               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
265 {
266   //get sender pointer
267   Win* send_win = connected_wins_[target_rank];
268
269   CHECK_WIN_LOCKED(send_win)
270   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
271
272   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
273   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
274
275   if (target_rank != rank_) {
276     //prepare send_request
277     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
278                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
279
280     //prepare receiver request
281     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
282                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
283
284     //start the send, with another process than us as sender.
285     sreq->start();
286     // push request to sender's win
287     send_win->mut_->lock();
288     send_win->requests_.push_back(sreq);
289     send_win->mut_->unlock();
290
291     //start recv
292     rreq->start();
293
294     if(request!=nullptr){
295       *request=rreq;
296     }else{
297       mut_->lock();
298       requests_.push_back(rreq);
299       mut_->unlock();
300     }
301   } else {
302     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
303     if(request!=nullptr)
304       *request=MPI_REQUEST_NULL;
305   }
306   return MPI_SUCCESS;
307 }
308
309 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
310               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
311 {
312   XBT_DEBUG("Entering MPI_Win_Accumulate");
313   //get receiver pointer
314   Win* recv_win = connected_wins_[target_rank];
315
316   //FIXME: local version
317   CHECK_WIN_LOCKED(recv_win)
318   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
319
320   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
321   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
322   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
323   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
324   // prepare send_request
325
326   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
327                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
328
329   // prepare receiver request
330   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
331                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
332
333   count_++;
334
335   // start send
336   sreq->start();
337   // push request to receiver's win
338   recv_win->mut_->lock();
339   recv_win->requests_.push_back(rreq);
340   rreq->start();
341   recv_win->mut_->unlock();
342
343   if (request != nullptr) {
344     *request = sreq;
345   } else {
346     mut_->lock();
347     requests_.push_back(sreq);
348     mut_->unlock();
349   }
350
351   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
352   // 'flush' is a workaround to fix that.
353   flush(target_rank);
354   XBT_DEBUG("Leaving MPI_Win_Accumulate");
355   return MPI_SUCCESS;
356 }
357
358 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
359                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
360                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
361 {
362   //get sender pointer
363   const Win* send_win = connected_wins_[target_rank];
364
365   CHECK_WIN_LOCKED(send_win)
366   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
367
368   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
369   //need to be sure ops are correctly ordered, so finish request here ? slow.
370   MPI_Request req = MPI_REQUEST_NULL;
371   send_win->atomic_mut_->lock();
372   get(result_addr, result_count, result_datatype, target_rank,
373               target_disp, target_count, target_datatype, &req);
374   if (req != MPI_REQUEST_NULL)
375     Request::wait(&req, MPI_STATUS_IGNORE);
376   if(op!=MPI_NO_OP)
377     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
378               target_disp, target_count, target_datatype, op, &req);
379   if (req != MPI_REQUEST_NULL)
380     Request::wait(&req, MPI_STATUS_IGNORE);
381   send_win->atomic_mut_->unlock();
382   return MPI_SUCCESS;
383 }
384
385 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
386                           int target_rank, MPI_Aint target_disp)
387 {
388   //get sender pointer
389   const Win* send_win = connected_wins_[target_rank];
390
391   CHECK_WIN_LOCKED(send_win)
392
393   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
394   MPI_Request req = MPI_REQUEST_NULL;
395   send_win->atomic_mut_->lock();
396   get(result_addr, 1, datatype, target_rank,
397               target_disp, 1, datatype, &req);
398   if (req != MPI_REQUEST_NULL)
399     Request::wait(&req, MPI_STATUS_IGNORE);
400   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
401     put(origin_addr, 1, datatype, target_rank,
402               target_disp, 1, datatype);
403   }
404   send_win->atomic_mut_->unlock();
405   return MPI_SUCCESS;
406 }
407
408 int Win::start(MPI_Group group, int /*assert*/)
409 {
410   /* From MPI forum advices
411   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
412   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
413   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
414   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
415   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
416   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
417   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
418   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
419   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
420   must complete, without further dependencies.  */
421
422   //naive, blocking implementation.
423   XBT_DEBUG("Entering MPI_Win_Start");
424   std::vector<MPI_Request> reqs;
425   for (int i = 0; i < group->size(); i++) {
426     int src = comm_->group()->rank(group->actor(i));
427     xbt_assert(src != MPI_UNDEFINED);
428     if (src != rank_)
429       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
430   }
431   int size = static_cast<int>(reqs.size());
432
433   Request::startall(size, reqs.data());
434   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
435   for (auto& req : reqs)
436     Request::unref(&req);
437
438   group->ref();
439   dst_group_ = group;
440   opened_--; // we're open for business !
441   XBT_DEBUG("Leaving MPI_Win_Start");
442   return MPI_SUCCESS;
443 }
444
445 int Win::post(MPI_Group group, int /*assert*/)
446 {
447   //let's make a synchronous send here
448   XBT_DEBUG("Entering MPI_Win_Post");
449   std::vector<MPI_Request> reqs;
450   for (int i = 0; i < group->size(); i++) {
451     int dst = comm_->group()->rank(group->actor(i));
452     xbt_assert(dst != MPI_UNDEFINED);
453     if (dst != rank_)
454       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
455   }
456   int size = static_cast<int>(reqs.size());
457
458   Request::startall(size, reqs.data());
459   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
460   for (auto& req : reqs)
461     Request::unref(&req);
462
463   group->ref();
464   src_group_ = group;
465   opened_--; // we're open for business !
466   XBT_DEBUG("Leaving MPI_Win_Post");
467   return MPI_SUCCESS;
468 }
469
470 int Win::complete(){
471   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
472
473   XBT_DEBUG("Entering MPI_Win_Complete");
474   std::vector<MPI_Request> reqs;
475   for (int i = 0; i < dst_group_->size(); i++) {
476     int dst = comm_->group()->rank(dst_group_->actor(i));
477     xbt_assert(dst != MPI_UNDEFINED);
478     if (dst != rank_)
479       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
480   }
481   int size = static_cast<int>(reqs.size());
482
483   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
484   Request::startall(size, reqs.data());
485   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
486   for (auto& req : reqs)
487     Request::unref(&req);
488
489   flush_local_all();
490
491   opened_++; //we're closed for business !
492   Group::unref(dst_group_);
493   dst_group_ = MPI_GROUP_NULL;
494   return MPI_SUCCESS;
495 }
496
497 int Win::wait(){
498   //naive, blocking implementation.
499   XBT_DEBUG("Entering MPI_Win_Wait");
500   std::vector<MPI_Request> reqs;
501   for (int i = 0; i < src_group_->size(); i++) {
502     int src = comm_->group()->rank(src_group_->actor(i));
503     xbt_assert(src != MPI_UNDEFINED);
504     if (src != rank_)
505       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
506   }
507   int size = static_cast<int>(reqs.size());
508
509   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
510   Request::startall(size, reqs.data());
511   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
512   for (auto& req : reqs)
513     Request::unref(&req);
514
515   flush_local_all();
516
517   opened_++; //we're closed for business !
518   Group::unref(src_group_);
519   src_group_ = MPI_GROUP_NULL;
520   return MPI_SUCCESS;
521 }
522
523 int Win::lock(int lock_type, int rank, int /*assert*/)
524 {
525   MPI_Win target_win = connected_wins_[rank];
526
527   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
528     target_win->lock_mut_->lock();
529     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
530     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
531       target_win->lock_mut_->unlock();
532    }
533   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
534     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
535
536   target_win->lockers_.push_back(rank_);
537
538   flush(rank);
539   return MPI_SUCCESS;
540 }
541
542 int Win::lock_all(int assert){
543   int retval = MPI_SUCCESS;
544   for (int i = 0; i < comm_->size(); i++) {
545     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
546     if (ret != MPI_SUCCESS)
547       retval = ret;
548   }
549   return retval;
550 }
551
552 int Win::unlock(int rank){
553   MPI_Win target_win = connected_wins_[rank];
554   int target_mode = target_win->mode_;
555   target_win->mode_= 0;
556   target_win->lockers_.remove(rank_);
557   if (target_mode==MPI_LOCK_EXCLUSIVE){
558     target_win->lock_mut_->unlock();
559   }
560
561   flush(rank);
562   return MPI_SUCCESS;
563 }
564
565 int Win::unlock_all(){
566   int retval = MPI_SUCCESS;
567   for (int i = 0; i < comm_->size(); i++) {
568     int ret = this->unlock(i);
569     if (ret != MPI_SUCCESS)
570       retval = ret;
571   }
572   return retval;
573 }
574
575 int Win::flush(int rank){
576   int finished = finish_comms(rank);
577   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
578   if (rank != rank_) {
579     finished = connected_wins_[rank]->finish_comms(rank_);
580     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
581   }
582   return MPI_SUCCESS;
583 }
584
585 int Win::flush_local(int rank){
586   int finished = finish_comms(rank);
587   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
588   return MPI_SUCCESS;
589 }
590
591 int Win::flush_all(){
592   int finished = finish_comms();
593   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
594   for (int i = 0; i < comm_->size(); i++) {
595     if (i != rank_) {
596       finished = connected_wins_[i]->finish_comms(rank_);
597       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
598     }
599   }
600   return MPI_SUCCESS;
601 }
602
603 int Win::flush_local_all(){
604   int finished = finish_comms();
605   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
606   return MPI_SUCCESS;
607 }
608
609 Win* Win::f2c(int id){
610   return static_cast<Win*>(F2C::f2c(id));
611 }
612
613 int Win::finish_comms(){
614   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
615   // Without this, the vector could get redimensioned when another process pushes.
616   // This would result in the array used by Request::waitall() to be invalidated.
617   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
618   mut_->lock();
619   //Finish own requests
620   int size = static_cast<int>(requests_.size());
621   if (size > 0) {
622     MPI_Request* treqs = requests_.data();
623     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
624     requests_.clear();
625   }
626   mut_->unlock();
627   return size;
628 }
629
630 int Win::finish_comms(int rank){
631   // See comment about the mutex in finish_comms() above
632   mut_->lock();
633   // Finish own requests
634   // Let's see if we're either the destination or the sender of this request
635   // because we only wait for requests that we are responsible for.
636   // Also use the process id here since the request itself returns from src()
637   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
638   aid_t proc_id = comm_->group()->actor(rank);
639   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
640     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
641   });
642   std::vector<MPI_Request> myreqqs(it, end(requests_));
643   requests_.erase(it, end(requests_));
644   int size = static_cast<int>(myreqqs.size());
645   if (size > 0) {
646     MPI_Request* treqs = myreqqs.data();
647     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
648     myreqqs.clear();
649   }
650   mut_->unlock();
651   return size;
652 }
653
654 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
655 {
656   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
657   for (int i = 0; not target_win && i < comm_->size(); i++) {
658     if (connected_wins_[i]->size_ > 0)
659       target_win = connected_wins_[i];
660   }
661   if (target_win) {
662     *size                         = target_win->size_;
663     *disp_unit                    = target_win->disp_unit_;
664     *static_cast<void**>(baseptr) = target_win->base_;
665   } else {
666     *size                         = 0;
667     *static_cast<void**>(baseptr) = nullptr;
668   }
669   return MPI_SUCCESS;
670 }
671
672 MPI_Errhandler Win::errhandler()
673 {
674   if (errhandler_ != MPI_ERRHANDLER_NULL)
675     errhandler_->ref();
676   return errhandler_;
677 }
678
679 void Win::set_errhandler(MPI_Errhandler errhandler)
680 {
681   if (errhandler_ != MPI_ERRHANDLER_NULL)
682     simgrid::smpi::Errhandler::unref(errhandler_);
683   errhandler_ = errhandler;
684   if (errhandler_ != MPI_ERRHANDLER_NULL)
685     errhandler_->ref();
686 }
687 } // namespace smpi
688 } // namespace simgrid