Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add MPI_Win_lock_all MPI_Win_unlock_all MPI_Win_flush MPI_Win_flush_local MPI_Win_flu...
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35   mode_=0;
36
37   comm->add_rma_win(this);
38
39   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
40                          MPI_BYTE, comm);
41
42   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
43
44   Colls::barrier(comm);
45 }
46
47 Win::~Win(){
48   //As per the standard, perform a barrier to ensure every async comm is finished
49   MSG_barrier_wait(bar_);
50
51   int finished = finish_comms();
52   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
53
54   delete requests_;
55   delete[] connected_wins_;
56   if (name_ != nullptr){
57     xbt_free(name_);
58   }
59   if(info_!=MPI_INFO_NULL){
60     MPI_Info_free(&info_);
61   }
62
63   comm_->remove_rma_win(this);
64
65   Colls::barrier(comm_);
66   int rank=comm_->rank();
67   if(rank == 0)
68     MSG_barrier_destroy(bar_);
69   xbt_mutex_destroy(mut_);
70   xbt_mutex_destroy(lock_mut_);
71
72   if(allocated_ !=0)
73     xbt_free(base_);
74
75   cleanup_attr<Win>();
76 }
77
78 int Win::attach (void *base, MPI_Aint size){
79   if (!(base_ == MPI_BOTTOM || base_ == 0))
80     return MPI_ERR_ARG;
81   base_=0;//actually the address will be given in the RMA calls, as being the disp.
82   size_+=size;
83   return MPI_SUCCESS;
84 }
85
86 int Win::detach (void *base){
87   base_=MPI_BOTTOM;
88   size_=-1;
89   return MPI_SUCCESS;
90 }
91
92 void Win::get_name(char* name, int* length){
93   if(name_==nullptr){
94     *length=0;
95     name=nullptr;
96     return;
97   }
98   *length = strlen(name_);
99   strncpy(name, name_, *length+1);
100 }
101
102 void Win::get_group(MPI_Group* group){
103   if(comm_ != MPI_COMM_NULL){
104     *group = comm_->group();
105   } else {
106     *group = MPI_GROUP_NULL;
107   }
108 }
109
110 MPI_Info Win::info(){
111   if(info_== MPI_INFO_NULL)
112     info_ = new Info();
113   info_->ref();
114   return info_;
115 }
116
117 int Win::rank(){
118   return rank_;
119 }
120
121 MPI_Aint Win::size(){
122   return size_;
123 }
124
125 void* Win::base(){
126   return base_;
127 }
128
129 int Win::disp_unit(){
130   return disp_unit_;
131 }
132
133 int Win::dynamic(){
134   return dynamic_;
135 }
136
137 void Win::set_info(MPI_Info info){
138   if(info_!= MPI_INFO_NULL)
139     info->ref();
140   info_=info;
141 }
142
143 void Win::set_name(char* name){
144   name_ = xbt_strdup(name);
145 }
146
147 int Win::fence(int assert)
148 {
149   XBT_DEBUG("Entering fence");
150   if (opened_ == 0)
151     opened_=1;
152   if (assert != MPI_MODE_NOPRECEDE) {
153     // This is not the first fence => finalize what came before
154     MSG_barrier_wait(bar_);
155     xbt_mutex_acquire(mut_);
156     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
157     // Without this, the vector could get redimensionned when another process pushes.
158     // This would result in the array used by Request::waitall() to be invalidated.
159     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
160     std::vector<MPI_Request> *reqs = requests_;
161     int size = static_cast<int>(reqs->size());
162     // start all requests that have been prepared by another process
163     if (size > 0) {
164       MPI_Request* treqs = &(*reqs)[0];
165       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
166     }
167     count_=0;
168     xbt_mutex_release(mut_);
169   }
170
171   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
172     opened_=0;
173   assert_ = assert;
174
175   MSG_barrier_wait(bar_);
176   XBT_DEBUG("Leaving fence");
177
178   return MPI_SUCCESS;
179 }
180
181 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
182               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
183 {
184   //get receiver pointer
185   MPI_Win recv_win = connected_wins_[target_rank];
186
187   if(opened_==0){//check that post/start has been done
188     // no fence or start .. lock ok ?
189     int locked=0;
190     for(auto it : recv_win->lockers_)
191       if (it == comm_->rank())
192         locked = 1;
193     if(locked != 1)
194       return MPI_ERR_WIN;
195   }
196
197   if(target_count*target_datatype->get_extent()>recv_win->size_)
198     return MPI_ERR_ARG;
199
200   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
201   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
202
203   if(target_rank != comm_->rank()){
204     //prepare send_request
205     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
206         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
207
208     //prepare receiver request
209     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
210         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
211
212     //start send
213     sreq->start();
214     //push request to receiver's win
215     xbt_mutex_acquire(recv_win->mut_);
216     recv_win->requests_->push_back(rreq);
217     rreq->start();
218     xbt_mutex_release(recv_win->mut_);
219     //push request to sender's win
220     xbt_mutex_acquire(mut_);
221     requests_->push_back(sreq);
222     xbt_mutex_release(mut_);
223   }else{
224     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
225   }
226
227   return MPI_SUCCESS;
228 }
229
230 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
231               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
232 {
233   //get sender pointer
234   MPI_Win send_win = connected_wins_[target_rank];
235
236   if(opened_==0){//check that post/start has been done
237     // no fence or start .. lock ok ?
238     int locked=0;
239     for(auto it : send_win->lockers_)
240       if (it == comm_->rank())
241         locked = 1;
242     if(locked != 1)
243       return MPI_ERR_WIN;
244   }
245
246   if(target_count*target_datatype->get_extent()>send_win->size_)
247     return MPI_ERR_ARG;
248
249   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
250   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
251
252   if(target_rank != comm_->rank()){
253     //prepare send_request
254     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
255         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
256         MPI_OP_NULL);
257
258     //prepare receiver request
259     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
260         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
261         MPI_OP_NULL);
262
263     //start the send, with another process than us as sender. 
264     sreq->start();
265     //push request to receiver's win
266     xbt_mutex_acquire(send_win->mut_);
267     send_win->requests_->push_back(sreq);
268     xbt_mutex_release(send_win->mut_);
269
270     //start recv
271     rreq->start();
272     //push request to sender's win
273     xbt_mutex_acquire(mut_);
274     requests_->push_back(rreq);
275     xbt_mutex_release(mut_);
276   }else{
277     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
278   }
279
280   return MPI_SUCCESS;
281 }
282
283
284 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
285               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
286 {
287
288   //get receiver pointer
289   MPI_Win recv_win = connected_wins_[target_rank];
290
291   if(opened_==0){//check that post/start has been done
292     // no fence or start .. lock ok ?
293     int locked=0;
294     for(auto it : recv_win->lockers_)
295       if (it == comm_->rank())
296         locked = 1;
297     if(locked != 1)
298       return MPI_ERR_WIN;
299   }
300   //FIXME: local version 
301
302   if(target_count*target_datatype->get_extent()>recv_win->size_)
303     return MPI_ERR_ARG;
304
305   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
306   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
307     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
308     //prepare send_request
309
310     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
311         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
312
313     //prepare receiver request
314     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
315         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
316
317     count_++;
318
319     //start send
320     sreq->start();
321     //push request to receiver's win
322     xbt_mutex_acquire(recv_win->mut_);
323     recv_win->requests_->push_back(rreq);
324     rreq->start();
325     xbt_mutex_release(recv_win->mut_);
326     //push request to sender's win
327     xbt_mutex_acquire(mut_);
328     requests_->push_back(sreq);
329     xbt_mutex_release(mut_);
330
331   return MPI_SUCCESS;
332 }
333
334 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
335               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
336               MPI_Datatype target_datatype, MPI_Op op){
337
338   //get sender pointer
339   MPI_Win send_win = connected_wins_[target_rank];
340
341   if(opened_==0){//check that post/start has been done
342     // no fence or start .. lock ok ?
343     int locked=0;
344     for(auto it : send_win->lockers_)
345       if (it == comm_->rank())
346         locked = 1;
347     if(locked != 1)
348       return MPI_ERR_WIN;
349   }
350
351   if(target_count*target_datatype->get_extent()>send_win->size_)
352     return MPI_ERR_ARG;
353
354   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
355
356   get(result_addr, result_count, result_datatype, target_rank,
357               target_disp, target_count, target_datatype);
358   if(op!=MPI_NO_OP)
359     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
360               target_disp, target_count, target_datatype, op);
361
362   return MPI_SUCCESS;
363
364 }
365
366 int Win::start(MPI_Group group, int assert){
367     /* From MPI forum advices
368     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
369     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
370     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
371     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
372     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
373     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
374     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
375     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
376     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
377     must complete, without further dependencies.  */
378
379   //naive, blocking implementation.
380     int i             = 0;
381     int j             = 0;
382     int size          = group->size();
383     MPI_Request* reqs = xbt_new0(MPI_Request, size);
384
385     while (j != size) {
386       int src = group->index(j);
387       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
388         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
389         i++;
390       }
391       j++;
392   }
393   size=i;
394   Request::startall(size, reqs);
395   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
396   for(i=0;i<size;i++){
397     Request::unref(&reqs[i]);
398   }
399   xbt_free(reqs);
400   opened_++; //we're open for business !
401   group_=group;
402   group->ref();
403   return MPI_SUCCESS;
404 }
405
406 int Win::post(MPI_Group group, int assert){
407   //let's make a synchronous send here
408   int i             = 0;
409   int j             = 0;
410   int size = group->size();
411   MPI_Request* reqs = xbt_new0(MPI_Request, size);
412
413   while(j!=size){
414     int dst=group->index(j);
415     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
416       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
417       i++;
418     }
419     j++;
420   }
421   size=i;
422
423   Request::startall(size, reqs);
424   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
425   for(i=0;i<size;i++){
426     Request::unref(&reqs[i]);
427   }
428   xbt_free(reqs);
429   opened_++; //we're open for business !
430   group_=group;
431   group->ref();
432   return MPI_SUCCESS;
433 }
434
435 int Win::complete(){
436   if(opened_==0)
437     xbt_die("Complete called on already opened MPI_Win");
438
439   XBT_DEBUG("Entering MPI_Win_Complete");
440   int i             = 0;
441   int j             = 0;
442   int size = group_->size();
443   MPI_Request* reqs = xbt_new0(MPI_Request, size);
444
445   while(j!=size){
446     int dst=group_->index(j);
447     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
448       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
449       i++;
450     }
451     j++;
452   }
453   size=i;
454   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
455   Request::startall(size, reqs);
456   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
457
458   for(i=0;i<size;i++){
459     Request::unref(&reqs[i]);
460   }
461   xbt_free(reqs);
462
463   int finished = finish_comms();
464   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
465
466   Group::unref(group_);
467   opened_--; //we're closed for business !
468   return MPI_SUCCESS;
469 }
470
471 int Win::wait(){
472   //naive, blocking implementation.
473   XBT_DEBUG("Entering MPI_Win_Wait");
474   int i             = 0;
475   int j             = 0;
476   int size          = group_->size();
477   MPI_Request* reqs = xbt_new0(MPI_Request, size);
478
479   while(j!=size){
480     int src=group_->index(j);
481     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
482       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
483       i++;
484     }
485     j++;
486   }
487   size=i;
488   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
489   Request::startall(size, reqs);
490   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
491   for(i=0;i<size;i++){
492     Request::unref(&reqs[i]);
493   }
494   xbt_free(reqs);
495   int finished = finish_comms();
496   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
497
498   Group::unref(group_);
499   opened_--; //we're opened for business !
500   return MPI_SUCCESS;
501 }
502
503 int Win::lock(int lock_type, int rank, int assert){
504   if(opened_!=0)
505     return MPI_ERR_WIN;
506
507   MPI_Win target_win = connected_wins_[rank];
508
509   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
510     xbt_mutex_acquire(target_win->lock_mut_);
511     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
512     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
513       xbt_mutex_release(target_win->lock_mut_);
514    }
515   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
516         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
517
518   target_win->lockers_.push_back(comm_->rank());
519
520   int finished = finish_comms();
521   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
522
523   return MPI_SUCCESS;
524 }
525
526 int Win::lock_all(int assert){
527   int i=0;
528   int retval = MPI_SUCCESS;
529   for (i=0; i<comm_->size();i++){
530       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
531       if(ret != MPI_SUCCESS)
532         retval = ret;
533   }
534   return retval;
535 }
536
537 int Win::unlock(int rank){
538   if(opened_!=0)
539     return MPI_ERR_WIN;
540
541   MPI_Win target_win = connected_wins_[rank];
542   int target_mode = target_win->mode_;
543   target_win->mode_= 0;
544   target_win->lockers_.remove(comm_->rank());
545   if (target_mode==MPI_LOCK_EXCLUSIVE){
546     xbt_mutex_release(target_win->lock_mut_);
547   }
548
549   int finished = finish_comms();
550   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
551
552   return MPI_SUCCESS;
553 }
554
555 int Win::unlock_all(){
556   int i=0;
557   int retval = MPI_SUCCESS;
558   for (i=0; i<comm_->size();i++){
559       int ret = this->unlock(i);
560       if(ret != MPI_SUCCESS)
561         retval = ret;
562   }
563   return retval;
564 }
565
566 int Win::flush(int rank){
567   MPI_Win target_win = connected_wins_[rank];
568   int finished = finish_comms(rank);
569   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
570   finished = target_win->finish_comms(rank_);
571   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
572   return MPI_SUCCESS;
573 }
574
575 int Win::flush_local(int rank){
576   int finished = finish_comms(rank);
577   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
578   return MPI_SUCCESS;
579 }
580
581 int Win::flush_all(){
582   int i=0;
583   int finished = 0;
584   finished = finish_comms();
585   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
586   for (i=0; i<comm_->size();i++){
587     finished = connected_wins_[i]->finish_comms(rank_);
588     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
589   }
590   return MPI_SUCCESS;
591 }
592
593 int Win::flush_local_all(){
594   int finished = finish_comms();
595   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
596   return MPI_SUCCESS;
597 }
598
599 Win* Win::f2c(int id){
600   return static_cast<Win*>(F2C::f2c(id));
601 }
602
603
604 int Win::finish_comms(){
605   xbt_mutex_acquire(mut_);
606   //Finish own requests
607   std::vector<MPI_Request> *reqqs = requests_;
608   int size = static_cast<int>(reqqs->size());
609   if (size > 0) {
610     MPI_Request* treqs = &(*reqqs)[0];
611     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
612     reqqs->clear();
613   }
614   xbt_mutex_release(mut_);
615   return size;
616 }
617
618 int Win::finish_comms(int rank){
619   xbt_mutex_acquire(mut_);
620   //Finish own requests
621   std::vector<MPI_Request> *reqqs = requests_;
622   int size = static_cast<int>(reqqs->size());
623   if (size > 0) {
624     size = 0;
625     std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
626     std::vector<MPI_Request>::iterator iter = reqqs->begin();
627     while (iter != reqqs->end()){
628       if(((*iter)->src() == rank) || ((*iter)->dst() == rank)){
629           myreqqs->push_back(*iter);
630           iter = reqqs->erase(iter);
631           size++;
632       } else {
633         ++iter;
634       }
635     }
636     if(size >0){
637       MPI_Request* treqs = &(*myreqqs)[0];
638       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
639       myreqqs->clear();
640       delete myreqqs;
641     }
642   }
643   xbt_mutex_release(mut_);
644   return size;
645 }
646
647
648 }
649 }