Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
No_op has a specific effect, actually, and turns the get_accumulate in a get.
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35   mode_=0;
36
37   comm->add_rma_win(this);
38
39   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
40                          MPI_BYTE, comm);
41
42   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
43
44   Colls::barrier(comm);
45 }
46
47 Win::~Win(){
48   //As per the standard, perform a barrier to ensure every async comm is finished
49   MSG_barrier_wait(bar_);
50
51   int finished = finish_comms();
52   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
53
54   delete requests_;
55   delete[] connected_wins_;
56   if (name_ != nullptr){
57     xbt_free(name_);
58   }
59   if(info_!=MPI_INFO_NULL){
60     MPI_Info_free(&info_);
61   }
62
63   comm_->remove_rma_win(this);
64
65   Colls::barrier(comm_);
66   int rank=comm_->rank();
67   if(rank == 0)
68     MSG_barrier_destroy(bar_);
69   xbt_mutex_destroy(mut_);
70   xbt_mutex_destroy(lock_mut_);
71
72   if(allocated_ !=0)
73     xbt_free(base_);
74
75   cleanup_attr<Win>();
76 }
77
78 int Win::attach (void *base, MPI_Aint size){
79   if (!(base_ == MPI_BOTTOM || base_ == 0))
80     return MPI_ERR_ARG;
81   base_=0;//actually the address will be given in the RMA calls, as being the disp.
82   size_+=size;
83   return MPI_SUCCESS;
84 }
85
86 int Win::detach (void *base){
87   base_=MPI_BOTTOM;
88   size_=-1;
89   return MPI_SUCCESS;
90 }
91
92 void Win::get_name(char* name, int* length){
93   if(name_==nullptr){
94     *length=0;
95     name=nullptr;
96     return;
97   }
98   *length = strlen(name_);
99   strncpy(name, name_, *length+1);
100 }
101
102 void Win::get_group(MPI_Group* group){
103   if(comm_ != MPI_COMM_NULL){
104     *group = comm_->group();
105   } else {
106     *group = MPI_GROUP_NULL;
107   }
108 }
109
110 MPI_Info Win::info(){
111   if(info_== MPI_INFO_NULL)
112     info_ = new Info();
113   info_->ref();
114   return info_;
115 }
116
117 int Win::rank(){
118   return rank_;
119 }
120
121 MPI_Aint Win::size(){
122   return size_;
123 }
124
125 void* Win::base(){
126   return base_;
127 }
128
129 int Win::disp_unit(){
130   return disp_unit_;
131 }
132
133 int Win::dynamic(){
134   return dynamic_;
135 }
136
137 void Win::set_info(MPI_Info info){
138   if(info_!= MPI_INFO_NULL)
139     info->ref();
140   info_=info;
141 }
142
143 void Win::set_name(char* name){
144   name_ = xbt_strdup(name);
145 }
146
147 int Win::fence(int assert)
148 {
149   XBT_DEBUG("Entering fence");
150   if (opened_ == 0)
151     opened_=1;
152   if (assert != MPI_MODE_NOPRECEDE) {
153     // This is not the first fence => finalize what came before
154     MSG_barrier_wait(bar_);
155     xbt_mutex_acquire(mut_);
156     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
157     // Without this, the vector could get redimensionned when another process pushes.
158     // This would result in the array used by Request::waitall() to be invalidated.
159     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
160     std::vector<MPI_Request> *reqs = requests_;
161     int size = static_cast<int>(reqs->size());
162     // start all requests that have been prepared by another process
163     if (size > 0) {
164       MPI_Request* treqs = &(*reqs)[0];
165       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
166     }
167     count_=0;
168     xbt_mutex_release(mut_);
169   }
170
171   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
172     opened_=0;
173   assert_ = assert;
174
175   MSG_barrier_wait(bar_);
176   XBT_DEBUG("Leaving fence");
177
178   return MPI_SUCCESS;
179 }
180
181 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
182               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
183 {
184   //get receiver pointer
185   MPI_Win recv_win = connected_wins_[target_rank];
186
187   if(opened_==0){//check that post/start has been done
188     // no fence or start .. lock ok ?
189     int locked=0;
190     for(auto it : recv_win->lockers_)
191       if (it == comm_->rank())
192         locked = 1;
193     if(locked != 1)
194       return MPI_ERR_WIN;
195   }
196
197   if(target_count*target_datatype->get_extent()>recv_win->size_)
198     return MPI_ERR_ARG;
199
200   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
201   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
202
203   if(target_rank != comm_->rank()){
204     //prepare send_request
205     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
206         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
207
208     //prepare receiver request
209     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
210         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
211
212     //push request to receiver's win
213     xbt_mutex_acquire(recv_win->mut_);
214     recv_win->requests_->push_back(rreq);
215     xbt_mutex_release(recv_win->mut_);
216     //start send
217     sreq->start();
218     rreq->start();
219     //push request to sender's win
220     xbt_mutex_acquire(mut_);
221     requests_->push_back(sreq);
222     xbt_mutex_release(mut_);
223   }else{
224     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
225   }
226
227   return MPI_SUCCESS;
228 }
229
230 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
231               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
232 {
233   //get sender pointer
234   MPI_Win send_win = connected_wins_[target_rank];
235
236   if(opened_==0){//check that post/start has been done
237     // no fence or start .. lock ok ?
238     int locked=0;
239     for(auto it : send_win->lockers_)
240       if (it == comm_->rank())
241         locked = 1;
242     if(locked != 1)
243       return MPI_ERR_WIN;
244   }
245
246   if(target_count*target_datatype->get_extent()>send_win->size_)
247     return MPI_ERR_ARG;
248
249   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
250   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
251
252   if(target_rank != comm_->rank()){
253     //prepare send_request
254     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
255         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
256         MPI_OP_NULL);
257
258     //prepare receiver request
259     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
260         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
261         MPI_OP_NULL);
262
263     //start the send, with another process than us as sender. 
264     sreq->start();
265     //push request to receiver's win
266     xbt_mutex_acquire(send_win->mut_);
267     send_win->requests_->push_back(sreq);
268     xbt_mutex_release(send_win->mut_);
269
270     //start recv
271     rreq->start();
272     //push request to sender's win
273     xbt_mutex_acquire(mut_);
274     requests_->push_back(rreq);
275     xbt_mutex_release(mut_);
276   }else{
277     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
278   }
279
280   return MPI_SUCCESS;
281 }
282
283
284 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
285               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
286 {
287
288   //get receiver pointer
289   MPI_Win recv_win = connected_wins_[target_rank];
290
291   if(opened_==0){//check that post/start has been done
292     // no fence or start .. lock ok ?
293     int locked=0;
294     for(auto it : recv_win->lockers_)
295       if (it == comm_->rank())
296         locked = 1;
297     if(locked != 1)
298       return MPI_ERR_WIN;
299   }
300   //FIXME: local version 
301
302   if(target_count*target_datatype->get_extent()>recv_win->size_)
303     return MPI_ERR_ARG;
304
305   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
306   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
307     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
308     //prepare send_request
309
310     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
311         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
312
313     //prepare receiver request
314     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
315         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
316
317     count_++;
318     //push request to receiver's win
319     xbt_mutex_acquire(recv_win->mut_);
320     recv_win->requests_->push_back(rreq);
321     xbt_mutex_release(recv_win->mut_);
322     //start send
323     sreq->start();
324     rreq->start();
325     //push request to sender's win
326     xbt_mutex_acquire(mut_);
327     requests_->push_back(sreq);
328     xbt_mutex_release(mut_);
329
330   return MPI_SUCCESS;
331 }
332
333 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
334               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
335               MPI_Datatype target_datatype, MPI_Op op){
336
337   //get sender pointer
338   MPI_Win send_win = connected_wins_[target_rank];
339
340   if(opened_==0){//check that post/start has been done
341     // no fence or start .. lock ok ?
342     int locked=0;
343     for(auto it : send_win->lockers_)
344       if (it == comm_->rank())
345         locked = 1;
346     if(locked != 1)
347       return MPI_ERR_WIN;
348   }
349
350   if(target_count*target_datatype->get_extent()>send_win->size_)
351     return MPI_ERR_ARG;
352
353   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
354
355   get(result_addr, result_count, result_datatype, target_rank,
356               target_disp, target_count, target_datatype);
357   if(op!=MPI_NO_OP)
358     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
359               target_disp, target_count, target_datatype, op);
360
361   return MPI_SUCCESS;
362
363 }
364
365 int Win::start(MPI_Group group, int assert){
366     /* From MPI forum advices
367     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
368     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
369     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
370     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
371     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
372     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
373     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
374     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
375     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
376     must complete, without further dependencies.  */
377
378   //naive, blocking implementation.
379     int i             = 0;
380     int j             = 0;
381     int size          = group->size();
382     MPI_Request* reqs = xbt_new0(MPI_Request, size);
383
384     while (j != size) {
385       int src = group->index(j);
386       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
387         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
388         i++;
389       }
390       j++;
391   }
392   size=i;
393   Request::startall(size, reqs);
394   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
395   for(i=0;i<size;i++){
396     Request::unref(&reqs[i]);
397   }
398   xbt_free(reqs);
399   opened_++; //we're open for business !
400   group_=group;
401   group->ref();
402   return MPI_SUCCESS;
403 }
404
405 int Win::post(MPI_Group group, int assert){
406   //let's make a synchronous send here
407   int i             = 0;
408   int j             = 0;
409   int size = group->size();
410   MPI_Request* reqs = xbt_new0(MPI_Request, size);
411
412   while(j!=size){
413     int dst=group->index(j);
414     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
415       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
416       i++;
417     }
418     j++;
419   }
420   size=i;
421
422   Request::startall(size, reqs);
423   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
424   for(i=0;i<size;i++){
425     Request::unref(&reqs[i]);
426   }
427   xbt_free(reqs);
428   opened_++; //we're open for business !
429   group_=group;
430   group->ref();
431   return MPI_SUCCESS;
432 }
433
434 int Win::complete(){
435   if(opened_==0)
436     xbt_die("Complete called on already opened MPI_Win");
437
438   XBT_DEBUG("Entering MPI_Win_Complete");
439   int i             = 0;
440   int j             = 0;
441   int size = group_->size();
442   MPI_Request* reqs = xbt_new0(MPI_Request, size);
443
444   while(j!=size){
445     int dst=group_->index(j);
446     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
447       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
448       i++;
449     }
450     j++;
451   }
452   size=i;
453   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
454   Request::startall(size, reqs);
455   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
456
457   for(i=0;i<size;i++){
458     Request::unref(&reqs[i]);
459   }
460   xbt_free(reqs);
461
462   int finished = finish_comms();
463   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
464
465   Group::unref(group_);
466   opened_--; //we're closed for business !
467   return MPI_SUCCESS;
468 }
469
470 int Win::wait(){
471   //naive, blocking implementation.
472   XBT_DEBUG("Entering MPI_Win_Wait");
473   int i             = 0;
474   int j             = 0;
475   int size          = group_->size();
476   MPI_Request* reqs = xbt_new0(MPI_Request, size);
477
478   while(j!=size){
479     int src=group_->index(j);
480     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
481       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
482       i++;
483     }
484     j++;
485   }
486   size=i;
487   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
488   Request::startall(size, reqs);
489   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
490   for(i=0;i<size;i++){
491     Request::unref(&reqs[i]);
492   }
493   xbt_free(reqs);
494   int finished = finish_comms();
495   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
496
497   Group::unref(group_);
498   opened_--; //we're opened for business !
499   return MPI_SUCCESS;
500 }
501
502 int Win::lock(int lock_type, int rank, int assert){
503   if(opened_!=0)
504     return MPI_ERR_WIN;
505
506   MPI_Win target_win = connected_wins_[rank];
507
508   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
509     xbt_mutex_acquire(target_win->lock_mut_);
510     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
511     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
512       xbt_mutex_release(target_win->lock_mut_);
513    }
514   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
515         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
516
517   target_win->lockers_.push_back(comm_->rank());
518
519   int finished = finish_comms();
520   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
521
522   return MPI_SUCCESS;
523 }
524
525 int Win::unlock(int rank){
526   if(opened_!=0)
527     return MPI_ERR_WIN;
528
529   MPI_Win target_win = connected_wins_[rank];
530   int target_mode = target_win->mode_;
531   target_win->mode_= 0;
532   target_win->lockers_.remove(comm_->rank());
533   if (target_mode==MPI_LOCK_EXCLUSIVE){
534     xbt_mutex_release(target_win->lock_mut_);
535   }
536
537   int finished = finish_comms();
538   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
539
540   return MPI_SUCCESS;
541 }
542
543 Win* Win::f2c(int id){
544   return static_cast<Win*>(F2C::f2c(id));
545 }
546
547
548 int Win::finish_comms(){
549   xbt_mutex_acquire(mut_);
550   //Finish own requests
551   std::vector<MPI_Request> *reqqs = requests_;
552   int size = static_cast<int>(reqqs->size());
553   if (size > 0) {
554     MPI_Request* treqs = &(*reqqs)[0];
555     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
556     reqqs->clear();
557   }
558   xbt_mutex_release(mut_);
559   return size;
560 }
561
562
563 }
564 }