Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
We can start the recv requests directly now, and don't have to wait for a synchro.
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35   mode_=0;
36
37   comm->add_rma_win(this);
38
39   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
40                          MPI_BYTE, comm);
41
42   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
43
44   Colls::barrier(comm);
45 }
46
47 Win::~Win(){
48   //As per the standard, perform a barrier to ensure every async comm is finished
49   MSG_barrier_wait(bar_);
50
51   int finished = finish_comms();
52   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
53
54   delete requests_;
55   delete[] connected_wins_;
56   if (name_ != nullptr){
57     xbt_free(name_);
58   }
59   if(info_!=MPI_INFO_NULL){
60     MPI_Info_free(&info_);
61   }
62
63   comm_->remove_rma_win(this);
64
65   Colls::barrier(comm_);
66   int rank=comm_->rank();
67   if(rank == 0)
68     MSG_barrier_destroy(bar_);
69   xbt_mutex_destroy(mut_);
70   xbt_mutex_destroy(lock_mut_);
71
72   if(allocated_ !=0)
73     xbt_free(base_);
74
75   cleanup_attr<Win>();
76 }
77
78 int Win::attach (void *base, MPI_Aint size){
79   if (!(base_ == MPI_BOTTOM || base_ == 0))
80     return MPI_ERR_ARG;
81   base_=0;//actually the address will be given in the RMA calls, as being the disp.
82   size_+=size;
83   return MPI_SUCCESS;
84 }
85
86 int Win::detach (void *base){
87   base_=MPI_BOTTOM;
88   size_=-1;
89   return MPI_SUCCESS;
90 }
91
92 void Win::get_name(char* name, int* length){
93   if(name_==nullptr){
94     *length=0;
95     name=nullptr;
96     return;
97   }
98   *length = strlen(name_);
99   strncpy(name, name_, *length+1);
100 }
101
102 void Win::get_group(MPI_Group* group){
103   if(comm_ != MPI_COMM_NULL){
104     *group = comm_->group();
105   } else {
106     *group = MPI_GROUP_NULL;
107   }
108 }
109
110 MPI_Info Win::info(){
111   if(info_== MPI_INFO_NULL)
112     info_ = new Info();
113   info_->ref();
114   return info_;
115 }
116
117 int Win::rank(){
118   return rank_;
119 }
120
121 MPI_Aint Win::size(){
122   return size_;
123 }
124
125 void* Win::base(){
126   return base_;
127 }
128
129 int Win::disp_unit(){
130   return disp_unit_;
131 }
132
133 int Win::dynamic(){
134   return dynamic_;
135 }
136
137 void Win::set_info(MPI_Info info){
138   if(info_!= MPI_INFO_NULL)
139     info->ref();
140   info_=info;
141 }
142
143 void Win::set_name(char* name){
144   name_ = xbt_strdup(name);
145 }
146
147 int Win::fence(int assert)
148 {
149   XBT_DEBUG("Entering fence");
150   if (opened_ == 0)
151     opened_=1;
152   if (assert != MPI_MODE_NOPRECEDE) {
153     // This is not the first fence => finalize what came before
154     MSG_barrier_wait(bar_);
155     xbt_mutex_acquire(mut_);
156     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
157     // Without this, the vector could get redimensionned when another process pushes.
158     // This would result in the array used by Request::waitall() to be invalidated.
159     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
160     std::vector<MPI_Request> *reqs = requests_;
161     int size = static_cast<int>(reqs->size());
162     // start all requests that have been prepared by another process
163     if (size > 0) {
164       MPI_Request* treqs = &(*reqs)[0];
165       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
166     }
167     count_=0;
168     xbt_mutex_release(mut_);
169   }
170
171   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
172     opened_=0;
173   assert_ = assert;
174
175   MSG_barrier_wait(bar_);
176   XBT_DEBUG("Leaving fence");
177
178   return MPI_SUCCESS;
179 }
180
181 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
182               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
183 {
184   //get receiver pointer
185   MPI_Win recv_win = connected_wins_[target_rank];
186
187   if(opened_==0){//check that post/start has been done
188     // no fence or start .. lock ok ?
189     int locked=0;
190     for(auto it : recv_win->lockers_)
191       if (it == comm_->rank())
192         locked = 1;
193     if(locked != 1)
194       return MPI_ERR_WIN;
195   }
196
197   if(target_count*target_datatype->get_extent()>recv_win->size_)
198     return MPI_ERR_ARG;
199
200   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
201   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
202
203   if(target_rank != comm_->rank()){
204     //prepare send_request
205     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
206         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
207
208     //prepare receiver request
209     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
210         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
211
212     //push request to receiver's win
213     xbt_mutex_acquire(recv_win->mut_);
214     recv_win->requests_->push_back(rreq);
215     xbt_mutex_release(recv_win->mut_);
216     //start send
217     sreq->start();
218     rreq->start();
219     //push request to sender's win
220     xbt_mutex_acquire(mut_);
221     requests_->push_back(sreq);
222     xbt_mutex_release(mut_);
223   }else{
224     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
225   }
226
227   return MPI_SUCCESS;
228 }
229
230 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
231               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
232 {
233   //get sender pointer
234   MPI_Win send_win = connected_wins_[target_rank];
235
236   if(opened_==0){//check that post/start has been done
237     // no fence or start .. lock ok ?
238     int locked=0;
239     for(auto it : send_win->lockers_)
240       if (it == comm_->rank())
241         locked = 1;
242     if(locked != 1)
243       return MPI_ERR_WIN;
244   }
245
246   if(target_count*target_datatype->get_extent()>send_win->size_)
247     return MPI_ERR_ARG;
248
249   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
250   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
251
252   if(target_rank != comm_->rank()){
253     //prepare send_request
254     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
255         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
256         MPI_OP_NULL);
257
258     //prepare receiver request
259     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
260         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
261         MPI_OP_NULL);
262
263     //start the send, with another process than us as sender. 
264     sreq->start();
265     //push request to receiver's win
266     xbt_mutex_acquire(send_win->mut_);
267     send_win->requests_->push_back(sreq);
268     xbt_mutex_release(send_win->mut_);
269
270     //start recv
271     rreq->start();
272     //push request to sender's win
273     xbt_mutex_acquire(mut_);
274     requests_->push_back(rreq);
275     xbt_mutex_release(mut_);
276   }else{
277     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
278   }
279
280   return MPI_SUCCESS;
281 }
282
283
284 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
285               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
286 {
287
288   //get receiver pointer
289   MPI_Win recv_win = connected_wins_[target_rank];
290
291   if(opened_==0){//check that post/start has been done
292     // no fence or start .. lock ok ?
293     int locked=0;
294     for(auto it : recv_win->lockers_)
295       if (it == comm_->rank())
296         locked = 1;
297     if(locked != 1)
298       return MPI_ERR_WIN;
299   }
300   //FIXME: local version 
301
302   if(target_count*target_datatype->get_extent()>recv_win->size_)
303     return MPI_ERR_ARG;
304
305   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
306   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
307     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
308     //prepare send_request
309
310     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
311         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
312
313     //prepare receiver request
314     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
315         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
316
317     count_++;
318     //push request to receiver's win
319     xbt_mutex_acquire(recv_win->mut_);
320     recv_win->requests_->push_back(rreq);
321     xbt_mutex_release(recv_win->mut_);
322     //start send
323     sreq->start();
324     rreq->start();
325     //push request to sender's win
326     xbt_mutex_acquire(mut_);
327     requests_->push_back(sreq);
328     xbt_mutex_release(mut_);
329
330   return MPI_SUCCESS;
331 }
332
333 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
334               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
335               MPI_Datatype target_datatype, MPI_Op op){
336
337   //get sender pointer
338   MPI_Win send_win = connected_wins_[target_rank];
339
340   if(opened_==0){//check that post/start has been done
341     // no fence or start .. lock ok ?
342     int locked=0;
343     for(auto it : send_win->lockers_)
344       if (it == comm_->rank())
345         locked = 1;
346     if(locked != 1)
347       return MPI_ERR_WIN;
348   }
349
350   if(target_count*target_datatype->get_extent()>send_win->size_)
351     return MPI_ERR_ARG;
352
353   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
354
355   get(result_addr, result_count, result_datatype, target_rank,
356               target_disp, target_count, target_datatype);
357   accumulate(origin_addr, origin_count, origin_datatype, target_rank,
358               target_disp, target_count, target_datatype, op);
359
360   return MPI_SUCCESS;
361
362 }
363
364 int Win::start(MPI_Group group, int assert){
365     /* From MPI forum advices
366     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
367     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
368     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
369     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
370     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
371     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
372     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
373     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
374     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
375     must complete, without further dependencies.  */
376
377   //naive, blocking implementation.
378     int i             = 0;
379     int j             = 0;
380     int size          = group->size();
381     MPI_Request* reqs = xbt_new0(MPI_Request, size);
382
383     while (j != size) {
384       int src = group->index(j);
385       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
386         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
387         i++;
388       }
389       j++;
390   }
391   size=i;
392   Request::startall(size, reqs);
393   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
394   for(i=0;i<size;i++){
395     Request::unref(&reqs[i]);
396   }
397   xbt_free(reqs);
398   opened_++; //we're open for business !
399   group_=group;
400   group->ref();
401   return MPI_SUCCESS;
402 }
403
404 int Win::post(MPI_Group group, int assert){
405   //let's make a synchronous send here
406   int i             = 0;
407   int j             = 0;
408   int size = group->size();
409   MPI_Request* reqs = xbt_new0(MPI_Request, size);
410
411   while(j!=size){
412     int dst=group->index(j);
413     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
414       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
415       i++;
416     }
417     j++;
418   }
419   size=i;
420
421   Request::startall(size, reqs);
422   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
423   for(i=0;i<size;i++){
424     Request::unref(&reqs[i]);
425   }
426   xbt_free(reqs);
427   opened_++; //we're open for business !
428   group_=group;
429   group->ref();
430   return MPI_SUCCESS;
431 }
432
433 int Win::complete(){
434   if(opened_==0)
435     xbt_die("Complete called on already opened MPI_Win");
436
437   XBT_DEBUG("Entering MPI_Win_Complete");
438   int i             = 0;
439   int j             = 0;
440   int size = group_->size();
441   MPI_Request* reqs = xbt_new0(MPI_Request, size);
442
443   while(j!=size){
444     int dst=group_->index(j);
445     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
446       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
447       i++;
448     }
449     j++;
450   }
451   size=i;
452   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
453   Request::startall(size, reqs);
454   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
455
456   for(i=0;i<size;i++){
457     Request::unref(&reqs[i]);
458   }
459   xbt_free(reqs);
460
461   int finished = finish_comms();
462   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
463
464   Group::unref(group_);
465   opened_--; //we're closed for business !
466   return MPI_SUCCESS;
467 }
468
469 int Win::wait(){
470   //naive, blocking implementation.
471   XBT_DEBUG("Entering MPI_Win_Wait");
472   int i             = 0;
473   int j             = 0;
474   int size          = group_->size();
475   MPI_Request* reqs = xbt_new0(MPI_Request, size);
476
477   while(j!=size){
478     int src=group_->index(j);
479     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
480       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
481       i++;
482     }
483     j++;
484   }
485   size=i;
486   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
487   Request::startall(size, reqs);
488   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
489   for(i=0;i<size;i++){
490     Request::unref(&reqs[i]);
491   }
492   xbt_free(reqs);
493   int finished = finish_comms();
494   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
495
496   Group::unref(group_);
497   opened_--; //we're opened for business !
498   return MPI_SUCCESS;
499 }
500
501 int Win::lock(int lock_type, int rank, int assert){
502   if(opened_!=0)
503     return MPI_ERR_WIN;
504
505   MPI_Win target_win = connected_wins_[rank];
506
507   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
508     xbt_mutex_acquire(target_win->lock_mut_);
509     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
510     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
511       xbt_mutex_release(target_win->lock_mut_);
512    }
513   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
514         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
515
516   target_win->lockers_.push_back(comm_->rank());
517
518   int finished = finish_comms();
519   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
520
521   return MPI_SUCCESS;
522 }
523
524 int Win::unlock(int rank){
525   if(opened_!=0)
526     return MPI_ERR_WIN;
527
528   MPI_Win target_win = connected_wins_[rank];
529   int target_mode = target_win->mode_;
530   target_win->mode_= 0;
531   target_win->lockers_.remove(comm_->rank());
532   if (target_mode==MPI_LOCK_EXCLUSIVE){
533     xbt_mutex_release(target_win->lock_mut_);
534   }
535
536   int finished = finish_comms();
537   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
538
539   return MPI_SUCCESS;
540 }
541
542 Win* Win::f2c(int id){
543   return static_cast<Win*>(F2C::f2c(id));
544 }
545
546
547 int Win::finish_comms(){
548   xbt_mutex_acquire(mut_);
549   //Finish own requests
550   std::vector<MPI_Request> *reqqs = requests_;
551   int size = static_cast<int>(reqqs->size());
552   if (size > 0) {
553     MPI_Request* treqs = &(*reqqs)[0];
554     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
555     reqqs->clear();
556   }
557   xbt_mutex_release(mut_);
558   return size;
559 }
560
561
562 }
563 }