Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Cosmetics.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_         = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_                  = nullptr;
32   opened_                = 0;
33   group_                 = MPI_GROUP_NULL;
34   requests_              = new std::vector<MPI_Request>();
35   mut_                   = s4u::Mutex::create();
36   lock_mut_              = s4u::Mutex::create();
37   atomic_mut_            = s4u::Mutex::create();
38   connected_wins_        = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_                 = 0;
41   if(rank_==0){
42     bar_ = new s4u::Barrier(comm_size);
43   }
44   mode_=0;
45
46   comm->add_rma_win(this);
47   comm->ref();
48
49   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
50                          MPI_BYTE, comm);
51
52   Colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   Colls::barrier(comm);
55 }
56
57 Win::~Win(){
58   //As per the standard, perform a barrier to ensure every async comm is finished
59   bar_->wait();
60
61   int finished = finish_comms();
62   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
63
64   delete requests_;
65   delete[] connected_wins_;
66   if (name_ != nullptr){
67     xbt_free(name_);
68   }
69   if(info_!=MPI_INFO_NULL){
70     MPI_Info_free(&info_);
71   }
72
73   comm_->remove_rma_win(this);
74
75   Colls::barrier(comm_);
76   Comm::unref(comm_);
77   
78   if (rank_ == 0)
79     delete bar_;
80
81   if(allocated_ !=0)
82     xbt_free(base_);
83
84   cleanup_attr<Win>();
85 }
86
87 int Win::attach (void *base, MPI_Aint size){
88   if (not(base_ == MPI_BOTTOM || base_ == 0))
89     return MPI_ERR_ARG;
90   base_=0;//actually the address will be given in the RMA calls, as being the disp.
91   size_+=size;
92   return MPI_SUCCESS;
93 }
94
95 int Win::detach (void *base){
96   base_=MPI_BOTTOM;
97   size_=-1;
98   return MPI_SUCCESS;
99 }
100
101 void Win::get_name(char* name, int* length){
102   if(name_==nullptr){
103     *length=0;
104     name=nullptr;
105     return;
106   }
107   *length = strlen(name_);
108   strncpy(name, name_, *length+1);
109 }
110
111 void Win::get_group(MPI_Group* group){
112   if(comm_ != MPI_COMM_NULL){
113     *group = comm_->group();
114   } else {
115     *group = MPI_GROUP_NULL;
116   }
117 }
118
119 MPI_Info Win::info(){
120   if(info_== MPI_INFO_NULL)
121     info_ = new Info();
122   info_->ref();
123   return info_;
124 }
125
126 int Win::rank(){
127   return rank_;
128 }
129
130 MPI_Aint Win::size(){
131   return size_;
132 }
133
134 void* Win::base(){
135   return base_;
136 }
137
138 int Win::disp_unit(){
139   return disp_unit_;
140 }
141
142 int Win::dynamic(){
143   return dynamic_;
144 }
145
146 void Win::set_info(MPI_Info info){
147   if(info_!= MPI_INFO_NULL)
148     info->ref();
149   info_=info;
150 }
151
152 void Win::set_name(char* name){
153   name_ = xbt_strdup(name);
154 }
155
156 int Win::fence(int assert)
157 {
158   XBT_DEBUG("Entering fence");
159   if (opened_ == 0)
160     opened_=1;
161   if (assert != MPI_MODE_NOPRECEDE) {
162     // This is not the first fence => finalize what came before
163     bar_->wait();
164     mut_->lock();
165     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
166     // Without this, the vector could get redimensionned when another process pushes.
167     // This would result in the array used by Request::waitall() to be invalidated.
168     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
169     std::vector<MPI_Request> *reqs = requests_;
170     int size = static_cast<int>(reqs->size());
171     // start all requests that have been prepared by another process
172     if (size > 0) {
173       MPI_Request* treqs = &(*reqs)[0];
174       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
175     }
176     count_=0;
177     mut_->unlock();
178   }
179
180   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
181     opened_=0;
182   assert_ = assert;
183
184   bar_->wait();
185   XBT_DEBUG("Leaving fence");
186
187   return MPI_SUCCESS;
188 }
189
190 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
191               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
192 {
193   //get receiver pointer
194   MPI_Win recv_win = connected_wins_[target_rank];
195
196   if(opened_==0){//check that post/start has been done
197     // no fence or start .. lock ok ?
198     int locked=0;
199     for (auto const& it : recv_win->lockers_)
200       if (it == comm_->rank())
201         locked = 1;
202     if(locked != 1)
203       return MPI_ERR_WIN;
204   }
205
206   if(target_count*target_datatype->get_extent()>recv_win->size_)
207     return MPI_ERR_ARG;
208
209   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
210
211   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
212     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
213     // prepare send_request
214     MPI_Request sreq =
215         // TODO cheinrich Check for rank / pid conversion
216         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
217                                comm_, MPI_OP_NULL);
218
219     //prepare receiver request
220     // TODO cheinrich Check for rank / pid conversion
221     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
222                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
223
224     //start send
225     sreq->start();
226
227     if(request!=nullptr){
228       *request=sreq;
229     }else{
230       mut_->lock();
231       requests_->push_back(sreq);
232       mut_->unlock();
233     }
234
235     //push request to receiver's win
236     recv_win->mut_->lock();
237     recv_win->requests_->push_back(rreq);
238     rreq->start();
239     recv_win->mut_->unlock();
240
241   }else{
242     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
243     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
244     if(request!=nullptr)
245       *request = MPI_REQUEST_NULL;
246   }
247
248   return MPI_SUCCESS;
249 }
250
251 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
252               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
253 {
254   //get sender pointer
255   MPI_Win send_win = connected_wins_[target_rank];
256
257   if(opened_==0){//check that post/start has been done
258     // no fence or start .. lock ok ?
259     int locked=0;
260     for (auto const& it : send_win->lockers_)
261       if (it == comm_->rank())
262         locked = 1;
263     if(locked != 1)
264       return MPI_ERR_WIN;
265   }
266
267   if(target_count*target_datatype->get_extent()>send_win->size_)
268     return MPI_ERR_ARG;
269
270   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
271   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
272
273   if(target_rank != comm_->rank()){
274     //prepare send_request
275     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
276                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
277
278     //prepare receiver request
279     MPI_Request rreq = Request::rma_recv_init(
280         origin_addr, origin_count, origin_datatype, target_rank,
281         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
282         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
283
284     //start the send, with another process than us as sender.
285     sreq->start();
286     //push request to receiver's win
287     send_win->mut_->lock();
288     send_win->requests_->push_back(sreq);
289     send_win->mut_->unlock();
290
291     //start recv
292     rreq->start();
293
294     if(request!=nullptr){
295       *request=rreq;
296     }else{
297       mut_->lock();
298       requests_->push_back(rreq);
299       mut_->unlock();
300     }
301
302   }else{
303     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
304     if(request!=nullptr)
305       *request=MPI_REQUEST_NULL;
306   }
307
308   return MPI_SUCCESS;
309 }
310
311
312 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
313               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
314 {
315   XBT_DEBUG("Entering MPI_Win_Accumulate");
316   //get receiver pointer
317   MPI_Win recv_win = connected_wins_[target_rank];
318
319   if(opened_==0){//check that post/start has been done
320     // no fence or start .. lock ok ?
321     int locked=0;
322     for (auto const& it : recv_win->lockers_)
323       if (it == comm_->rank())
324         locked = 1;
325     if(locked != 1)
326       return MPI_ERR_WIN;
327   }
328   //FIXME: local version
329
330   if(target_count*target_datatype->get_extent()>recv_win->size_)
331     return MPI_ERR_ARG;
332
333   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
334   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
335     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
336     //prepare send_request
337
338   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
339                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
340
341   // prepare receiver request
342   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
343                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
344
345   count_++;
346
347   // start send
348   sreq->start();
349   // push request to receiver's win
350   recv_win->mut_->lock();
351   recv_win->requests_->push_back(rreq);
352   rreq->start();
353   recv_win->mut_->unlock();
354
355   if (request != nullptr) {
356     *request = sreq;
357   } else {
358     mut_->lock();
359     requests_->push_back(sreq);
360     mut_->unlock();
361   }
362
363   XBT_DEBUG("Leaving MPI_Win_Accumulate");
364   return MPI_SUCCESS;
365 }
366
367 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
368               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
369               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
370
371   //get sender pointer
372   MPI_Win send_win = connected_wins_[target_rank];
373
374   if(opened_==0){//check that post/start has been done
375     // no fence or start .. lock ok ?
376     int locked=0;
377     for (auto const& it : send_win->lockers_)
378       if (it == comm_->rank())
379         locked = 1;
380     if(locked != 1)
381       return MPI_ERR_WIN;
382   }
383
384   if(target_count*target_datatype->get_extent()>send_win->size_)
385     return MPI_ERR_ARG;
386
387   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
388   //need to be sure ops are correctly ordered, so finish request here ? slow.
389   MPI_Request req;
390   send_win->atomic_mut_->lock();
391   get(result_addr, result_count, result_datatype, target_rank,
392               target_disp, target_count, target_datatype, &req);
393   if (req != MPI_REQUEST_NULL)
394     Request::wait(&req, MPI_STATUS_IGNORE);
395   if(op!=MPI_NO_OP)
396     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
397               target_disp, target_count, target_datatype, op, &req);
398   if (req != MPI_REQUEST_NULL)
399     Request::wait(&req, MPI_STATUS_IGNORE);
400   send_win->atomic_mut_->unlock();
401   return MPI_SUCCESS;
402
403 }
404
405 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
406         void *result_addr, MPI_Datatype datatype, int target_rank,
407         MPI_Aint target_disp){
408   //get sender pointer
409   MPI_Win send_win = connected_wins_[target_rank];
410
411   if(opened_==0){//check that post/start has been done
412     // no fence or start .. lock ok ?
413     int locked=0;
414     for (auto const& it : send_win->lockers_)
415       if (it == comm_->rank())
416         locked = 1;
417     if(locked != 1)
418       return MPI_ERR_WIN;
419   }
420
421   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
422   MPI_Request req = MPI_REQUEST_NULL;
423   send_win->atomic_mut_->lock();
424   get(result_addr, 1, datatype, target_rank,
425               target_disp, 1, datatype, &req);
426   if (req != MPI_REQUEST_NULL)
427     Request::wait(&req, MPI_STATUS_IGNORE);
428   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
429     put(origin_addr, 1, datatype, target_rank,
430               target_disp, 1, datatype);
431   }
432   send_win->atomic_mut_->unlock();
433   return MPI_SUCCESS;
434 }
435
436 int Win::start(MPI_Group group, int assert){
437   /* From MPI forum advices
438   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
439   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
440   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
441   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
442   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
443   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
444   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
445   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
446   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
447   must complete, without further dependencies.  */
448
449   //naive, blocking implementation.
450   int i             = 0;
451   int j             = 0;
452   int size          = group->size();
453   MPI_Request* reqs = xbt_new0(MPI_Request, size);
454
455   XBT_DEBUG("Entering MPI_Win_Start");
456   while (j != size) {
457     int src = comm_->group()->rank(group->actor(j));
458     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
459       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
460       i++;
461     }
462     j++;
463   }
464   size = i;
465   Request::startall(size, reqs);
466   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
467   for (i = 0; i < size; i++) {
468     Request::unref(&reqs[i]);
469   }
470   xbt_free(reqs);
471   opened_++; //we're open for business !
472   group_=group;
473   group->ref();
474   XBT_DEBUG("Leaving MPI_Win_Start");
475   return MPI_SUCCESS;
476 }
477
478 int Win::post(MPI_Group group, int assert){
479   //let's make a synchronous send here
480   int i             = 0;
481   int j             = 0;
482   int size = group->size();
483   MPI_Request* reqs = xbt_new0(MPI_Request, size);
484
485   XBT_DEBUG("Entering MPI_Win_Post");
486   while(j!=size){
487     int dst = comm_->group()->rank(group->actor(j));
488     if (dst != rank_ && dst != MPI_UNDEFINED) {
489       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
490       i++;
491     }
492     j++;
493   }
494   size=i;
495
496   Request::startall(size, reqs);
497   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
498   for(i=0;i<size;i++){
499     Request::unref(&reqs[i]);
500   }
501   xbt_free(reqs);
502   opened_++; //we're open for business !
503   group_=group;
504   group->ref();
505   XBT_DEBUG("Leaving MPI_Win_Post");
506   return MPI_SUCCESS;
507 }
508
509 int Win::complete(){
510   if(opened_==0)
511     xbt_die("Complete called on already opened MPI_Win");
512
513   XBT_DEBUG("Entering MPI_Win_Complete");
514   int i             = 0;
515   int j             = 0;
516   int size          = group_->size();
517   MPI_Request* reqs = xbt_new0(MPI_Request, size);
518
519   while(j!=size){
520     int dst = comm_->group()->rank(group_->actor(j));
521     if (dst != rank_ && dst != MPI_UNDEFINED) {
522       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
523       i++;
524     }
525     j++;
526   }
527   size=i;
528   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
529   Request::startall(size, reqs);
530   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
531
532   for(i=0;i<size;i++){
533     Request::unref(&reqs[i]);
534   }
535   xbt_free(reqs);
536
537   int finished = finish_comms();
538   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
539
540   Group::unref(group_);
541   opened_--; //we're closed for business !
542   return MPI_SUCCESS;
543 }
544
545 int Win::wait(){
546   //naive, blocking implementation.
547   XBT_DEBUG("Entering MPI_Win_Wait");
548   int i             = 0;
549   int j             = 0;
550   int size          = group_->size();
551   MPI_Request* reqs = xbt_new0(MPI_Request, size);
552
553   while(j!=size){
554     int src = comm_->group()->rank(group_->actor(j));
555     if (src != rank_ && src != MPI_UNDEFINED) {
556       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
557       i++;
558     }
559     j++;
560   }
561   size=i;
562   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
563   Request::startall(size, reqs);
564   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
565   for(i=0;i<size;i++){
566     Request::unref(&reqs[i]);
567   }
568   xbt_free(reqs);
569   int finished = finish_comms();
570   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
571
572   Group::unref(group_);
573   opened_--; //we're opened for business !
574   return MPI_SUCCESS;
575 }
576
577 int Win::lock(int lock_type, int rank, int assert){
578   MPI_Win target_win = connected_wins_[rank];
579
580   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
581     target_win->lock_mut_->lock();
582     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
583     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
584       target_win->lock_mut_->unlock();
585    }
586   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
587     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
588
589   target_win->lockers_.push_back(comm_->rank());
590
591   int finished = finish_comms(rank);
592   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
593   finished = target_win->finish_comms(rank_);
594   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
595   return MPI_SUCCESS;
596 }
597
598 int Win::lock_all(int assert){
599   int i=0;
600   int retval = MPI_SUCCESS;
601   for (i=0; i<comm_->size();i++){
602       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
603       if(ret != MPI_SUCCESS)
604         retval = ret;
605   }
606   return retval;
607 }
608
609 int Win::unlock(int rank){
610   MPI_Win target_win = connected_wins_[rank];
611   int target_mode = target_win->mode_;
612   target_win->mode_= 0;
613   target_win->lockers_.remove(comm_->rank());
614   if (target_mode==MPI_LOCK_EXCLUSIVE){
615     target_win->lock_mut_->unlock();
616   }
617
618   int finished = finish_comms(rank);
619   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
620   finished = target_win->finish_comms(rank_);
621   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
622   return MPI_SUCCESS;
623 }
624
625 int Win::unlock_all(){
626   int i=0;
627   int retval = MPI_SUCCESS;
628   for (i=0; i<comm_->size();i++){
629     int ret = this->unlock(i);
630     if (ret != MPI_SUCCESS)
631       retval = ret;
632   }
633   return retval;
634 }
635
636 int Win::flush(int rank){
637   MPI_Win target_win = connected_wins_[rank];
638   int finished       = finish_comms(rank_);
639   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
640   finished = target_win->finish_comms(rank);
641   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
642   return MPI_SUCCESS;
643 }
644
645 int Win::flush_local(int rank){
646   int finished = finish_comms(rank);
647   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
648   return MPI_SUCCESS;
649 }
650
651 int Win::flush_all(){
652   int finished = finish_comms();
653   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
654   for (int i = 0; i < comm_->size(); i++) {
655     finished = connected_wins_[i]->finish_comms(rank_);
656     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
657   }
658   return MPI_SUCCESS;
659 }
660
661 int Win::flush_local_all(){
662   int finished = finish_comms();
663   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
664   return MPI_SUCCESS;
665 }
666
667 Win* Win::f2c(int id){
668   return static_cast<Win*>(F2C::f2c(id));
669 }
670
671 int Win::finish_comms(){
672   mut_->lock();
673   //Finish own requests
674   std::vector<MPI_Request> *reqqs = requests_;
675   int size = static_cast<int>(reqqs->size());
676   if (size > 0) {
677     MPI_Request* treqs = &(*reqqs)[0];
678     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
679     reqqs->clear();
680   }
681   mut_->unlock();
682   return size;
683 }
684
685 int Win::finish_comms(int rank){
686   mut_->lock();
687   //Finish own requests
688   std::vector<MPI_Request> *reqqs = requests_;
689   int size = static_cast<int>(reqqs->size());
690   if (size > 0) {
691     size = 0;
692     std::vector<MPI_Request> myreqqs;
693     std::vector<MPI_Request>::iterator iter = reqqs->begin();
694     int proc_id                             = comm_->group()->actor(rank)->get_pid();
695     while (iter != reqqs->end()){
696       // Let's see if we're either the destination or the sender of this request
697       // because we only wait for requests that we are responsible for.
698       // Also use the process id here since the request itself returns from src()
699       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
700       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
701         myreqqs.push_back(*iter);
702         iter = reqqs->erase(iter);
703         size++;
704       } else {
705         ++iter;
706       }
707     }
708     if(size >0){
709       MPI_Request* treqs = &myreqqs[0];
710       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
711       myreqqs.clear();
712     }
713   }
714   mut_->unlock();
715   return size;
716 }
717
718 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
719 {
720   MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
721   for (int i = 0; not target_win && i < comm_->size(); i++) {
722     if (connected_wins_[i]->size_ > 0)
723       target_win = connected_wins_[i];
724   }
725   if (target_win) {
726     *size                         = target_win->size_;
727     *disp_unit                    = target_win->disp_unit_;
728     *static_cast<void**>(baseptr) = target_win->base_;
729   } else {
730     *size                         = 0;
731     *static_cast<void**>(baseptr) = xbt_malloc(0);
732   }
733   return MPI_SUCCESS;
734 }
735 }
736 }