Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
close the window if we explicitely tell it's closed
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35   mode_=0;
36
37   comm->add_rma_win(this);
38
39   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
40                          MPI_BYTE, comm);
41
42   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
43
44   Colls::barrier(comm);
45 }
46
47 Win::~Win(){
48   //As per the standard, perform a barrier to ensure every async comm is finished
49   MSG_barrier_wait(bar_);
50
51   int finished = finish_comms();
52   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
53
54   delete requests_;
55   delete[] connected_wins_;
56   if (name_ != nullptr){
57     xbt_free(name_);
58   }
59   if(info_!=MPI_INFO_NULL){
60     MPI_Info_free(&info_);
61   }
62
63   comm_->remove_rma_win(this);
64
65   Colls::barrier(comm_);
66   int rank=comm_->rank();
67   if(rank == 0)
68     MSG_barrier_destroy(bar_);
69   xbt_mutex_destroy(mut_);
70   xbt_mutex_destroy(lock_mut_);
71
72   if(allocated_ !=0)
73     xbt_free(base_);
74
75   cleanup_attr<Win>();
76 }
77
78 int Win::attach (void *base, MPI_Aint size){
79   if (!(base_ == MPI_BOTTOM || base_ == 0))
80     return MPI_ERR_ARG;
81   base_=0;//actually the address will be given in the RMA calls, as being the disp.
82   size_+=size;
83   return MPI_SUCCESS;
84 }
85
86 int Win::detach (void *base){
87   base_=MPI_BOTTOM;
88   size_=-1;
89   return MPI_SUCCESS;
90 }
91
92 void Win::get_name(char* name, int* length){
93   if(name_==nullptr){
94     *length=0;
95     name=nullptr;
96     return;
97   }
98   *length = strlen(name_);
99   strncpy(name, name_, *length+1);
100 }
101
102 void Win::get_group(MPI_Group* group){
103   if(comm_ != MPI_COMM_NULL){
104     *group = comm_->group();
105   } else {
106     *group = MPI_GROUP_NULL;
107   }
108 }
109
110 MPI_Info Win::info(){
111   if(info_== MPI_INFO_NULL)
112     info_ = new Info();
113   info_->ref();
114   return info_;
115 }
116
117 int Win::rank(){
118   return rank_;
119 }
120
121 MPI_Aint Win::size(){
122   return size_;
123 }
124
125 void* Win::base(){
126   return base_;
127 }
128
129 int Win::disp_unit(){
130   return disp_unit_;
131 }
132
133 int Win::dynamic(){
134   return dynamic_;
135 }
136
137 void Win::set_info(MPI_Info info){
138   if(info_!= MPI_INFO_NULL)
139     info->ref();
140   info_=info;
141 }
142
143 void Win::set_name(char* name){
144   name_ = xbt_strdup(name);
145 }
146
147 int Win::fence(int assert)
148 {
149   XBT_DEBUG("Entering fence");
150   if (opened_ == 0)
151     opened_=1;
152   if (assert != MPI_MODE_NOPRECEDE) {
153     // This is not the first fence => finalize what came before
154     MSG_barrier_wait(bar_);
155     xbt_mutex_acquire(mut_);
156     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
157     // Without this, the vector could get redimensionned when another process pushes.
158     // This would result in the array used by Request::waitall() to be invalidated.
159     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
160     std::vector<MPI_Request> *reqs = requests_;
161     int size = static_cast<int>(reqs->size());
162     // start all requests that have been prepared by another process
163     if (size > 0) {
164       for (const auto& req : *reqs) {
165         if (req && (req->flags() & PREPARED))
166           req->start();
167       }
168
169       MPI_Request* treqs = &(*reqs)[0];
170
171       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
172     }
173     count_=0;
174     xbt_mutex_release(mut_);
175   }
176
177   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
178     opened_=0;
179   assert_ = assert;
180
181   MSG_barrier_wait(bar_);
182   XBT_DEBUG("Leaving fence");
183
184   return MPI_SUCCESS;
185 }
186
187 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
188               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
189 {
190   //get receiver pointer
191   MPI_Win recv_win = connected_wins_[target_rank];
192
193   if(opened_==0){//check that post/start has been done
194     // no fence or start .. lock ok ?
195     int locked=0;
196     for(auto it : recv_win->lockers_)
197       if (it == comm_->rank())
198         locked = 1;
199     if(locked != 1)
200       return MPI_ERR_WIN;
201   }
202
203   if(target_count*target_datatype->get_extent()>recv_win->size_)
204     return MPI_ERR_ARG;
205
206   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
207   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
208
209   if(target_rank != comm_->rank()){
210     //prepare send_request
211     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
212         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
213
214     //prepare receiver request
215     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
216         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
217
218     //push request to receiver's win
219     xbt_mutex_acquire(recv_win->mut_);
220     recv_win->requests_->push_back(rreq);
221     xbt_mutex_release(recv_win->mut_);
222     //start send
223     sreq->start();
224
225     //push request to sender's win
226     xbt_mutex_acquire(mut_);
227     requests_->push_back(sreq);
228     xbt_mutex_release(mut_);
229   }else{
230     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
231   }
232
233   return MPI_SUCCESS;
234 }
235
236 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
237               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
238 {
239   //get sender pointer
240   MPI_Win send_win = connected_wins_[target_rank];
241
242   if(opened_==0){//check that post/start has been done
243     // no fence or start .. lock ok ?
244     int locked=0;
245     for(auto it : send_win->lockers_)
246       if (it == comm_->rank())
247         locked = 1;
248     if(locked != 1)
249       return MPI_ERR_WIN;
250   }
251
252   if(target_count*target_datatype->get_extent()>send_win->size_)
253     return MPI_ERR_ARG;
254
255   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
256   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
257
258   if(target_rank != comm_->rank()){
259     //prepare send_request
260     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
261         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
262         MPI_OP_NULL);
263
264     //prepare receiver request
265     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
266         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
267         MPI_OP_NULL);
268
269     //start the send, with another process than us as sender. 
270     sreq->start();
271     //push request to receiver's win
272     xbt_mutex_acquire(send_win->mut_);
273     send_win->requests_->push_back(sreq);
274     xbt_mutex_release(send_win->mut_);
275
276     //start recv
277     rreq->start();
278     //push request to sender's win
279     xbt_mutex_acquire(mut_);
280     requests_->push_back(rreq);
281     xbt_mutex_release(mut_);
282   }else{
283     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
284   }
285
286   return MPI_SUCCESS;
287 }
288
289
290 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
291               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
292 {
293
294   //get receiver pointer
295   MPI_Win recv_win = connected_wins_[target_rank];
296
297   if(opened_==0){//check that post/start has been done
298     // no fence or start .. lock ok ?
299     int locked=0;
300     for(auto it : recv_win->lockers_)
301       if (it == comm_->rank())
302         locked = 1;
303     if(locked != 1)
304       return MPI_ERR_WIN;
305   }
306   //FIXME: local version 
307
308   if(target_count*target_datatype->get_extent()>recv_win->size_)
309     return MPI_ERR_ARG;
310
311   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
312   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
313     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
314     //prepare send_request
315
316     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
317         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
318
319     //prepare receiver request
320     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
321         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
322
323     count_++;
324     //push request to receiver's win
325     xbt_mutex_acquire(recv_win->mut_);
326     recv_win->requests_->push_back(rreq);
327     xbt_mutex_release(recv_win->mut_);
328     //start send
329     sreq->start();
330
331     //push request to sender's win
332     xbt_mutex_acquire(mut_);
333     requests_->push_back(sreq);
334     xbt_mutex_release(mut_);
335
336   return MPI_SUCCESS;
337 }
338
339 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
340               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
341               MPI_Datatype target_datatype, MPI_Op op){
342
343   //get sender pointer
344   MPI_Win send_win = connected_wins_[target_rank];
345
346   if(opened_==0){//check that post/start has been done
347     // no fence or start .. lock ok ?
348     int locked=0;
349     for(auto it : send_win->lockers_)
350       if (it == comm_->rank())
351         locked = 1;
352     if(locked != 1)
353       return MPI_ERR_WIN;
354   }
355
356   if(target_count*target_datatype->get_extent()>send_win->size_)
357     return MPI_ERR_ARG;
358
359   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
360
361   get(result_addr, result_count, result_datatype, target_rank,
362               target_disp, target_count, target_datatype);
363   accumulate(origin_addr, origin_count, origin_datatype, target_rank,
364               target_disp, target_count, target_datatype, op);
365
366   return MPI_SUCCESS;
367
368 }
369
370 int Win::start(MPI_Group group, int assert){
371     /* From MPI forum advices
372     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
373     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
374     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
375     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
376     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
377     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
378     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
379     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
380     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
381     must complete, without further dependencies.  */
382
383   //naive, blocking implementation.
384     int i             = 0;
385     int j             = 0;
386     int size          = group->size();
387     MPI_Request* reqs = xbt_new0(MPI_Request, size);
388
389     while (j != size) {
390       int src = group->index(j);
391       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
392         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
393         i++;
394       }
395       j++;
396   }
397   size=i;
398   Request::startall(size, reqs);
399   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
400   for(i=0;i<size;i++){
401     Request::unref(&reqs[i]);
402   }
403   xbt_free(reqs);
404   opened_++; //we're open for business !
405   group_=group;
406   group->ref();
407   return MPI_SUCCESS;
408 }
409
410 int Win::post(MPI_Group group, int assert){
411   //let's make a synchronous send here
412   int i             = 0;
413   int j             = 0;
414   int size = group->size();
415   MPI_Request* reqs = xbt_new0(MPI_Request, size);
416
417   while(j!=size){
418     int dst=group->index(j);
419     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
420       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
421       i++;
422     }
423     j++;
424   }
425   size=i;
426
427   Request::startall(size, reqs);
428   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
429   for(i=0;i<size;i++){
430     Request::unref(&reqs[i]);
431   }
432   xbt_free(reqs);
433   opened_++; //we're open for business !
434   group_=group;
435   group->ref();
436   return MPI_SUCCESS;
437 }
438
439 int Win::complete(){
440   if(opened_==0)
441     xbt_die("Complete called on already opened MPI_Win");
442
443   XBT_DEBUG("Entering MPI_Win_Complete");
444   int i             = 0;
445   int j             = 0;
446   int size = group_->size();
447   MPI_Request* reqs = xbt_new0(MPI_Request, size);
448
449   while(j!=size){
450     int dst=group_->index(j);
451     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
452       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
453       i++;
454     }
455     j++;
456   }
457   size=i;
458   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
459   Request::startall(size, reqs);
460   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
461
462   for(i=0;i<size;i++){
463     Request::unref(&reqs[i]);
464   }
465   xbt_free(reqs);
466
467   int finished = finish_comms();
468   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
469
470   Group::unref(group_);
471   opened_--; //we're closed for business !
472   return MPI_SUCCESS;
473 }
474
475 int Win::wait(){
476   //naive, blocking implementation.
477   XBT_DEBUG("Entering MPI_Win_Wait");
478   int i             = 0;
479   int j             = 0;
480   int size          = group_->size();
481   MPI_Request* reqs = xbt_new0(MPI_Request, size);
482
483   while(j!=size){
484     int src=group_->index(j);
485     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
486       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
487       i++;
488     }
489     j++;
490   }
491   size=i;
492   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
493   Request::startall(size, reqs);
494   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
495   for(i=0;i<size;i++){
496     Request::unref(&reqs[i]);
497   }
498   xbt_free(reqs);
499   int finished = finish_comms();
500   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
501
502   Group::unref(group_);
503   opened_--; //we're opened for business !
504   return MPI_SUCCESS;
505 }
506
507 int Win::lock(int lock_type, int rank, int assert){
508   if(opened_!=0)
509     return MPI_ERR_WIN;
510
511   MPI_Win target_win = connected_wins_[rank];
512
513   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
514     xbt_mutex_acquire(target_win->lock_mut_);
515     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
516     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
517       xbt_mutex_release(target_win->lock_mut_);
518    }
519   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
520         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
521
522   target_win->lockers_.push_back(comm_->rank());
523
524   int finished = finish_comms();
525   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
526
527   return MPI_SUCCESS;
528 }
529
530 int Win::unlock(int rank){
531   if(opened_!=0)
532     return MPI_ERR_WIN;
533
534   MPI_Win target_win = connected_wins_[rank];
535   int target_mode = target_win->mode_;
536   target_win->mode_= 0;
537   target_win->lockers_.remove(comm_->rank());
538   if (target_mode==MPI_LOCK_EXCLUSIVE){
539     xbt_mutex_release(target_win->lock_mut_);
540   }
541
542   int finished = finish_comms();
543   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
544
545   return MPI_SUCCESS;
546 }
547
548 Win* Win::f2c(int id){
549   return static_cast<Win*>(F2C::f2c(id));
550 }
551
552
553 int Win::finish_comms(){
554   xbt_mutex_acquire(mut_);
555   //Finish own requests
556   std::vector<MPI_Request> *reqqs = requests_;
557   int size = static_cast<int>(reqqs->size());
558   if (size > 0) {
559     // start all requests that have been prepared by another process
560     for (const auto& req : *reqqs) {
561       if (req && (req->flags() & PREPARED))
562         req->start();
563     }
564
565     MPI_Request* treqs = &(*reqqs)[0];
566     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
567     reqqs->clear();
568   }
569   xbt_mutex_release(mut_);
570   return size;
571 }
572
573
574 }
575 }