Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
use previous buffer check feature in MPI checks, to crash when a buffer overflow...
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)\
30   if(opened_==0){ /*check that post/start has been done*/\
31     int locked=0;\
32     for (auto const& it : win->lockers_)\
33       if (it == comm_->rank())\
34         locked = 1;\
35     if(locked != 1)\
36       return MPI_ERR_WIN;\
37   }
38
39 namespace simgrid{
40 namespace smpi{
41 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
42 int Win::keyval_id_=0;
43
44 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
45     : base_(base)
46     , size_(size)
47     , disp_unit_(disp_unit)
48     , info_(info)
49     , comm_(comm)
50     , connected_wins_(comm->size())
51     , rank_(comm->rank())
52     , allocated_(allocated)
53     , dynamic_(dynamic)
54 {
55   XBT_DEBUG("Creating window");
56   if(info!=MPI_INFO_NULL)
57     info->ref();
58   connected_wins_[rank_] = this;
59   if(rank_==0){
60     bar_ = new s4u::Barrier(comm->size());
61   }
62   errhandler_->ref();
63   comm->add_rma_win(this);
64   comm->ref();
65
66   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
67                    MPI_BYTE, comm);
68
69   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
70
71   colls::barrier(comm);
72   this->add_f();
73 }
74
75 Win::~Win(){
76   //As per the standard, perform a barrier to ensure every async comm is finished
77   bar_->wait();
78
79   int finished = finish_comms();
80   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
81
82   if (info_ != MPI_INFO_NULL)
83     simgrid::smpi::Info::unref(info_);
84   if (errhandler_ != MPI_ERRHANDLER_NULL)
85     simgrid::smpi::Errhandler::unref(errhandler_);
86
87   comm_->remove_rma_win(this);
88
89   colls::barrier(comm_);
90   Comm::unref(comm_);
91   
92   if (rank_ == 0)
93     delete bar_;
94
95   if(allocated_ !=0)
96     xbt_free(base_);
97
98   F2C::free_f(this->f2c_id());
99   cleanup_attr<Win>();
100 }
101
102 int Win::attach(void* /*base*/, MPI_Aint size)
103 {
104   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
105     return MPI_ERR_ARG;
106   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
107   size_+=size;
108   return MPI_SUCCESS;
109 }
110
111 int Win::detach(const void* /*base*/)
112 {
113   base_=MPI_BOTTOM;
114   size_=-1;
115   return MPI_SUCCESS;
116 }
117
118 void Win::get_name(char* name, int* length) const
119 {
120   *length = static_cast<int>(name_.length());
121   if (not name_.empty()) {
122     name_.copy(name, *length);
123     name[*length] = '\0';
124   }
125 }
126
127 void Win::get_group(MPI_Group* group){
128   if(comm_ != MPI_COMM_NULL){
129     *group = comm_->group();
130   } else {
131     *group = MPI_GROUP_NULL;
132   }
133 }
134
135 MPI_Info Win::info()
136 {
137   if (info_ == MPI_INFO_NULL)
138     info_ = new Info();
139   info_->ref();
140   return info_;
141 }
142
143 int Win::rank() const
144 {
145   return rank_;
146 }
147
148 MPI_Comm Win::comm() const
149 {
150   return comm_;
151 }
152
153 MPI_Aint Win::size() const
154 {
155   return size_;
156 }
157
158 void* Win::base() const
159 {
160   return base_;
161 }
162
163 int Win::disp_unit() const
164 {
165   return disp_unit_;
166 }
167
168 int Win::dynamic() const
169 {
170   return dynamic_;
171 }
172
173 void Win::set_info(MPI_Info info)
174 {
175   if (info_ != MPI_INFO_NULL)
176     simgrid::smpi::Info::unref(info_);
177   info_ = info;
178   if (info_ != MPI_INFO_NULL)
179     info_->ref();
180 }
181
182 void Win::set_name(const char* name){
183   name_ = name;
184 }
185
186 int Win::fence(int assert)
187 {
188   XBT_DEBUG("Entering fence");
189   if (opened_ == 0)
190     opened_=1;
191   if (assert != MPI_MODE_NOPRECEDE) {
192     // This is not the first fence => finalize what came before
193     bar_->wait();
194     mut_->lock();
195     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
196     // Without this, the vector could get redimensioned when another process pushes.
197     // This would result in the array used by Request::waitall() to be invalidated.
198     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
199
200     // start all requests that have been prepared by another process
201     if (not requests_.empty()) {
202       int size           = static_cast<int>(requests_.size());
203       MPI_Request* treqs = requests_.data();
204       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
205     }
206     count_=0;
207     mut_->unlock();
208   }
209
210   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
211     opened_=0;
212   assert_ = assert;
213
214   bar_->wait();
215   XBT_DEBUG("Leaving fence");
216
217   return MPI_SUCCESS;
218 }
219
220 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
221               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
222 {
223   //get receiver pointer
224   Win* recv_win = connected_wins_[target_rank];
225
226   if(opened_==0){//check that post/start has been done
227     // no fence or start .. lock ok ?
228     int locked=0;
229     for (auto const& it : recv_win->lockers_)
230       if (it == comm_->rank())
231         locked = 1;
232     if(locked != 1)
233       return MPI_ERR_WIN;
234   }
235
236   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
237
238   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
239
240   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
241     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
242     // prepare send_request
243     MPI_Request sreq =
244         // TODO cheinrich Check for rank / pid conversion
245         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
246                                comm_, MPI_OP_NULL);
247
248     //prepare receiver request
249     // TODO cheinrich Check for rank / pid conversion
250     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
251                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
252
253     //start send
254     sreq->start();
255
256     if(request!=nullptr){
257       *request=sreq;
258     }else{
259       mut_->lock();
260       requests_.push_back(sreq);
261       mut_->unlock();
262     }
263
264     //push request to receiver's win
265     recv_win->mut_->lock();
266     recv_win->requests_.push_back(rreq);
267     rreq->start();
268     recv_win->mut_->unlock();
269   } else {
270     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
271     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
272     if(request!=nullptr)
273       *request = MPI_REQUEST_NULL;
274   }
275
276   return MPI_SUCCESS;
277 }
278
279 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
280               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
281 {
282   //get sender pointer
283   Win* send_win = connected_wins_[target_rank];
284
285   CHECK_WIN_LOCKED(send_win)
286   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
287
288   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
289   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
290
291   if(target_rank != comm_->rank()){
292     //prepare send_request
293     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
294                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
295
296     //prepare receiver request
297     MPI_Request rreq = Request::rma_recv_init(
298         origin_addr, origin_count, origin_datatype, target_rank,
299         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
300         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
301
302     //start the send, with another process than us as sender.
303     sreq->start();
304     //push request to receiver's win
305     send_win->mut_->lock();
306     send_win->requests_.push_back(sreq);
307     send_win->mut_->unlock();
308
309     //start recv
310     rreq->start();
311
312     if(request!=nullptr){
313       *request=rreq;
314     }else{
315       mut_->lock();
316       requests_.push_back(rreq);
317       mut_->unlock();
318     }
319   } else {
320     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
321     if(request!=nullptr)
322       *request=MPI_REQUEST_NULL;
323   }
324   return MPI_SUCCESS;
325 }
326
327 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
328               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
329 {
330   XBT_DEBUG("Entering MPI_Win_Accumulate");
331   //get receiver pointer
332   Win* recv_win = connected_wins_[target_rank];
333
334   //FIXME: local version
335   CHECK_WIN_LOCKED(recv_win)
336   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
337
338   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
339   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
340   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
341   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
342   // prepare send_request
343
344   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
345                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
346
347   // prepare receiver request
348   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
349                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
350
351   count_++;
352
353   // start send
354   sreq->start();
355   // push request to receiver's win
356   recv_win->mut_->lock();
357   recv_win->requests_.push_back(rreq);
358   rreq->start();
359   recv_win->mut_->unlock();
360
361   if (request != nullptr) {
362     *request = sreq;
363   } else {
364     mut_->lock();
365     requests_.push_back(sreq);
366     mut_->unlock();
367   }
368
369   XBT_DEBUG("Leaving MPI_Win_Accumulate");
370   return MPI_SUCCESS;
371 }
372
373 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
374                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
375                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
376 {
377   //get sender pointer
378   const Win* send_win = connected_wins_[target_rank];
379
380   CHECK_WIN_LOCKED(send_win)
381   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
382
383   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
384   //need to be sure ops are correctly ordered, so finish request here ? slow.
385   MPI_Request req;
386   send_win->atomic_mut_->lock();
387   get(result_addr, result_count, result_datatype, target_rank,
388               target_disp, target_count, target_datatype, &req);
389   if (req != MPI_REQUEST_NULL)
390     Request::wait(&req, MPI_STATUS_IGNORE);
391   if(op!=MPI_NO_OP)
392     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
393               target_disp, target_count, target_datatype, op, &req);
394   if (req != MPI_REQUEST_NULL)
395     Request::wait(&req, MPI_STATUS_IGNORE);
396   send_win->atomic_mut_->unlock();
397   return MPI_SUCCESS;
398 }
399
400 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
401                           int target_rank, MPI_Aint target_disp)
402 {
403   //get sender pointer
404   const Win* send_win = connected_wins_[target_rank];
405
406   CHECK_WIN_LOCKED(send_win)
407
408   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
409   MPI_Request req = MPI_REQUEST_NULL;
410   send_win->atomic_mut_->lock();
411   get(result_addr, 1, datatype, target_rank,
412               target_disp, 1, datatype, &req);
413   if (req != MPI_REQUEST_NULL)
414     Request::wait(&req, MPI_STATUS_IGNORE);
415   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
416     put(origin_addr, 1, datatype, target_rank,
417               target_disp, 1, datatype);
418   }
419   send_win->atomic_mut_->unlock();
420   return MPI_SUCCESS;
421 }
422
423 int Win::start(MPI_Group group, int /*assert*/)
424 {
425   /* From MPI forum advices
426   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
427   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
428   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
429   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
430   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
431   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
432   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
433   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
434   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
435   must complete, without further dependencies.  */
436
437   //naive, blocking implementation.
438   int i             = 0;
439   int j             = 0;
440   int size          = group->size();
441   std::vector<MPI_Request> reqs(size);
442
443   XBT_DEBUG("Entering MPI_Win_Start");
444   while (j != size) {
445     int src = comm_->group()->rank(group->actor(j));
446     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
447       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
448       i++;
449     }
450     j++;
451   }
452   size = i;
453   Request::startall(size, reqs.data());
454   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
455   for (i = 0; i < size; i++) {
456     Request::unref(&reqs[i]);
457   }
458   opened_++; //we're open for business !
459   group_=group;
460   group->ref();
461   XBT_DEBUG("Leaving MPI_Win_Start");
462   return MPI_SUCCESS;
463 }
464
465 int Win::post(MPI_Group group, int /*assert*/)
466 {
467   //let's make a synchronous send here
468   int i             = 0;
469   int j             = 0;
470   int size = group->size();
471   std::vector<MPI_Request> reqs(size);
472
473   XBT_DEBUG("Entering MPI_Win_Post");
474   while(j!=size){
475     int dst = comm_->group()->rank(group->actor(j));
476     if (dst != rank_ && dst != MPI_UNDEFINED) {
477       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
478       i++;
479     }
480     j++;
481   }
482   size=i;
483
484   Request::startall(size, reqs.data());
485   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
486   for(i=0;i<size;i++){
487     Request::unref(&reqs[i]);
488   }
489   opened_++; //we're open for business !
490   group_=group;
491   group->ref();
492   XBT_DEBUG("Leaving MPI_Win_Post");
493   return MPI_SUCCESS;
494 }
495
496 int Win::complete(){
497   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
498
499   XBT_DEBUG("Entering MPI_Win_Complete");
500   int i             = 0;
501   int j             = 0;
502   int size          = group_->size();
503   std::vector<MPI_Request> reqs(size);
504
505   while(j!=size){
506     int dst = comm_->group()->rank(group_->actor(j));
507     if (dst != rank_ && dst != MPI_UNDEFINED) {
508       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
509       i++;
510     }
511     j++;
512   }
513   size=i;
514   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
515   Request::startall(size, reqs.data());
516   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
517
518   for(i=0;i<size;i++){
519     Request::unref(&reqs[i]);
520   }
521
522   int finished = finish_comms();
523   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
524
525   Group::unref(group_);
526   opened_--; //we're closed for business !
527   return MPI_SUCCESS;
528 }
529
530 int Win::wait(){
531   //naive, blocking implementation.
532   XBT_DEBUG("Entering MPI_Win_Wait");
533   int i             = 0;
534   int j             = 0;
535   int size          = group_->size();
536   std::vector<MPI_Request> reqs(size);
537
538   while(j!=size){
539     int src = comm_->group()->rank(group_->actor(j));
540     if (src != rank_ && src != MPI_UNDEFINED) {
541       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
542       i++;
543     }
544     j++;
545   }
546   size=i;
547   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
548   Request::startall(size, reqs.data());
549   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
550   for(i=0;i<size;i++){
551     Request::unref(&reqs[i]);
552   }
553   int finished = finish_comms();
554   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
555
556   Group::unref(group_);
557   opened_--; //we're opened for business !
558   return MPI_SUCCESS;
559 }
560
561 int Win::lock(int lock_type, int rank, int /*assert*/)
562 {
563   MPI_Win target_win = connected_wins_[rank];
564
565   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
566     target_win->lock_mut_->lock();
567     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
568     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
569       target_win->lock_mut_->unlock();
570    }
571   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
572     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
573
574   target_win->lockers_.push_back(comm_->rank());
575
576   int finished = finish_comms(rank);
577   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
578   finished = target_win->finish_comms(rank_);
579   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
580   return MPI_SUCCESS;
581 }
582
583 int Win::lock_all(int assert){
584   int retval = MPI_SUCCESS;
585   for (int i = 0; i < comm_->size(); i++) {
586     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
587     if (ret != MPI_SUCCESS)
588       retval = ret;
589   }
590   return retval;
591 }
592
593 int Win::unlock(int rank){
594   MPI_Win target_win = connected_wins_[rank];
595   int target_mode = target_win->mode_;
596   target_win->mode_= 0;
597   target_win->lockers_.remove(comm_->rank());
598   if (target_mode==MPI_LOCK_EXCLUSIVE){
599     target_win->lock_mut_->unlock();
600   }
601
602   int finished = finish_comms(rank);
603   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
604   finished = target_win->finish_comms(rank_);
605   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
606   return MPI_SUCCESS;
607 }
608
609 int Win::unlock_all(){
610   int retval = MPI_SUCCESS;
611   for (int i = 0; i < comm_->size(); i++) {
612     int ret = this->unlock(i);
613     if (ret != MPI_SUCCESS)
614       retval = ret;
615   }
616   return retval;
617 }
618
619 int Win::flush(int rank){
620   MPI_Win target_win = connected_wins_[rank];
621   int finished       = finish_comms(rank_);
622   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
623   finished = target_win->finish_comms(rank);
624   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
625   return MPI_SUCCESS;
626 }
627
628 int Win::flush_local(int rank){
629   int finished = finish_comms(rank);
630   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
631   return MPI_SUCCESS;
632 }
633
634 int Win::flush_all(){
635   int finished = finish_comms();
636   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
637   for (int i = 0; i < comm_->size(); i++) {
638     finished = connected_wins_[i]->finish_comms(rank_);
639     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
640   }
641   return MPI_SUCCESS;
642 }
643
644 int Win::flush_local_all(){
645   int finished = finish_comms();
646   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
647   return MPI_SUCCESS;
648 }
649
650 Win* Win::f2c(int id){
651   return static_cast<Win*>(F2C::f2c(id));
652 }
653
654 int Win::finish_comms(){
655   mut_->lock();
656   //Finish own requests
657   int size = static_cast<int>(requests_.size());
658   if (size > 0) {
659     MPI_Request* treqs = requests_.data();
660     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
661     requests_.clear();
662   }
663   mut_->unlock();
664   return size;
665 }
666
667 int Win::finish_comms(int rank){
668   mut_->lock();
669   // Finish own requests
670   // Let's see if we're either the destination or the sender of this request
671   // because we only wait for requests that we are responsible for.
672   // Also use the process id here since the request itself returns from src()
673   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
674   int proc_id = comm_->group()->actor(rank)->get_pid();
675   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
676     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
677   });
678   std::vector<MPI_Request> myreqqs(it, end(requests_));
679   requests_.erase(it, end(requests_));
680   int size = static_cast<int>(myreqqs.size());
681   if (size > 0) {
682     MPI_Request* treqs = myreqqs.data();
683     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
684     myreqqs.clear();
685   }
686   mut_->unlock();
687   return size;
688 }
689
690 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
691 {
692   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
693   for (int i = 0; not target_win && i < comm_->size(); i++) {
694     if (connected_wins_[i]->size_ > 0)
695       target_win = connected_wins_[i];
696   }
697   if (target_win) {
698     *size                         = target_win->size_;
699     *disp_unit                    = target_win->disp_unit_;
700     *static_cast<void**>(baseptr) = target_win->base_;
701   } else {
702     *size                         = 0;
703     *static_cast<void**>(baseptr) = nullptr;
704   }
705   return MPI_SUCCESS;
706 }
707
708 MPI_Errhandler Win::errhandler()
709 {
710   if (errhandler_ != MPI_ERRHANDLER_NULL)
711     errhandler_->ref();
712   return errhandler_;
713 }
714
715 void Win::set_errhandler(MPI_Errhandler errhandler)
716 {
717   if (errhandler_ != MPI_ERRHANDLER_NULL)
718     simgrid::smpi::Errhandler::unref(errhandler_);
719   errhandler_ = errhandler;
720   if (errhandler_ != MPI_ERRHANDLER_NULL)
721     errhandler_->ref();
722 }
723 } // namespace smpi
724 } // namespace simgrid