Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Mainly on 32 bits systems, target_disp can be seen as negative in one case.
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35   mode_=0;
36
37   comm->add_rma_win(this);
38
39   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
40                          MPI_BYTE, comm);
41
42   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
43
44   Colls::barrier(comm);
45 }
46
47 Win::~Win(){
48   //As per the standard, perform a barrier to ensure every async comm is finished
49   MSG_barrier_wait(bar_);
50
51   int finished = finish_comms();
52   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
53
54   delete requests_;
55   delete[] connected_wins_;
56   if (name_ != nullptr){
57     xbt_free(name_);
58   }
59   if(info_!=MPI_INFO_NULL){
60     MPI_Info_free(&info_);
61   }
62
63   comm_->remove_rma_win(this);
64
65   Colls::barrier(comm_);
66   int rank=comm_->rank();
67   if(rank == 0)
68     MSG_barrier_destroy(bar_);
69   xbt_mutex_destroy(mut_);
70   xbt_mutex_destroy(lock_mut_);
71
72   if(allocated_ !=0)
73     xbt_free(base_);
74
75   cleanup_attr<Win>();
76 }
77
78 int Win::attach (void *base, MPI_Aint size){
79   if (!(base_ == MPI_BOTTOM || base_ == 0))
80     return MPI_ERR_ARG;
81   base_=0;//actually the address will be given in the RMA calls, as being the disp.
82   size_+=size;
83   return MPI_SUCCESS;
84 }
85
86 int Win::detach (void *base){
87   base_=MPI_BOTTOM;
88   size_=-1;
89   return MPI_SUCCESS;
90 }
91
92 void Win::get_name(char* name, int* length){
93   if(name_==nullptr){
94     *length=0;
95     name=nullptr;
96     return;
97   }
98   *length = strlen(name_);
99   strncpy(name, name_, *length+1);
100 }
101
102 void Win::get_group(MPI_Group* group){
103   if(comm_ != MPI_COMM_NULL){
104     *group = comm_->group();
105   } else {
106     *group = MPI_GROUP_NULL;
107   }
108 }
109
110 MPI_Info Win::info(){
111   if(info_== MPI_INFO_NULL)
112     info_ = new Info();
113   info_->ref();
114   return info_;
115 }
116
117 int Win::rank(){
118   return rank_;
119 }
120
121 MPI_Aint Win::size(){
122   return size_;
123 }
124
125 void* Win::base(){
126   return base_;
127 }
128
129 int Win::disp_unit(){
130   return disp_unit_;
131 }
132
133 int Win::dynamic(){
134   return dynamic_;
135 }
136
137 void Win::set_info(MPI_Info info){
138   if(info_!= MPI_INFO_NULL)
139     info->ref();
140   info_=info;
141 }
142
143 void Win::set_name(char* name){
144   name_ = xbt_strdup(name);
145 }
146
147 int Win::fence(int assert)
148 {
149   XBT_DEBUG("Entering fence");
150   if (opened_ == 0)
151     opened_=1;
152   if (assert != MPI_MODE_NOPRECEDE) {
153     // This is not the first fence => finalize what came before
154     MSG_barrier_wait(bar_);
155     xbt_mutex_acquire(mut_);
156     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
157     // Without this, the vector could get redimensionned when another process pushes.
158     // This would result in the array used by Request::waitall() to be invalidated.
159     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
160     std::vector<MPI_Request> *reqs = requests_;
161     int size = static_cast<int>(reqs->size());
162     // start all requests that have been prepared by another process
163     if (size > 0) {
164       for (const auto& req : *reqs) {
165         if (req && (req->flags() & PREPARED))
166           req->start();
167       }
168
169       MPI_Request* treqs = &(*reqs)[0];
170
171       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
172     }
173     count_=0;
174     xbt_mutex_release(mut_);
175   }
176   assert_ = assert;
177
178   MSG_barrier_wait(bar_);
179   XBT_DEBUG("Leaving fence");
180
181   return MPI_SUCCESS;
182 }
183
184 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
185               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
186 {
187   //get receiver pointer
188   MPI_Win recv_win = connected_wins_[target_rank];
189
190   if(opened_==0){//check that post/start has been done
191     // no fence or start .. lock ok ?
192     int locked=0;
193     for(auto it : recv_win->lockers_)
194       if (it == comm_->rank())
195         locked = 1;
196     if(locked != 1)
197       return MPI_ERR_WIN;
198   }
199
200   if(target_count*target_datatype->get_extent()>recv_win->size_)
201     return MPI_ERR_ARG;
202
203   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
204   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
205
206   if(target_rank != comm_->rank()){
207     //prepare send_request
208     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
209         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
210
211     //prepare receiver request
212     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
213         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
214
215     //push request to receiver's win
216     xbt_mutex_acquire(recv_win->mut_);
217     recv_win->requests_->push_back(rreq);
218     xbt_mutex_release(recv_win->mut_);
219     //start send
220     sreq->start();
221
222     //push request to sender's win
223     xbt_mutex_acquire(mut_);
224     requests_->push_back(sreq);
225     xbt_mutex_release(mut_);
226   }else{
227     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
228   }
229
230   return MPI_SUCCESS;
231 }
232
233 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
234               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
235 {
236   //get sender pointer
237   MPI_Win send_win = connected_wins_[target_rank];
238
239   if(opened_==0){//check that post/start has been done
240     // no fence or start .. lock ok ?
241     int locked=0;
242     for(auto it : send_win->lockers_)
243       if (it == comm_->rank())
244         locked = 1;
245     if(locked != 1)
246       return MPI_ERR_WIN;
247   }
248
249   if(target_count*target_datatype->get_extent()>send_win->size_)
250     return MPI_ERR_ARG;
251
252   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
253   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
254
255   if(target_rank != comm_->rank()){
256     //prepare send_request
257     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
258         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
259         MPI_OP_NULL);
260
261     //prepare receiver request
262     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
263         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
264         MPI_OP_NULL);
265
266     //start the send, with another process than us as sender. 
267     sreq->start();
268     //push request to receiver's win
269     xbt_mutex_acquire(send_win->mut_);
270     send_win->requests_->push_back(sreq);
271     xbt_mutex_release(send_win->mut_);
272
273     //start recv
274     rreq->start();
275     //push request to sender's win
276     xbt_mutex_acquire(mut_);
277     requests_->push_back(rreq);
278     xbt_mutex_release(mut_);
279   }else{
280     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
281   }
282
283   return MPI_SUCCESS;
284 }
285
286
287 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
288               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
289 {
290
291   //get receiver pointer
292   MPI_Win recv_win = connected_wins_[target_rank];
293
294   if(opened_==0){//check that post/start has been done
295     // no fence or start .. lock ok ?
296     int locked=0;
297     for(auto it : recv_win->lockers_)
298       if (it == comm_->rank())
299         locked = 1;
300     if(locked != 1)
301       return MPI_ERR_WIN;
302   }
303   //FIXME: local version 
304
305   if(target_count*target_datatype->get_extent()>recv_win->size_)
306     return MPI_ERR_ARG;
307
308   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
309   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
310     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
311     //prepare send_request
312
313     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
314         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
315
316     //prepare receiver request
317     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
318         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
319
320     count_++;
321     //push request to receiver's win
322     xbt_mutex_acquire(recv_win->mut_);
323     recv_win->requests_->push_back(rreq);
324     xbt_mutex_release(recv_win->mut_);
325     //start send
326     sreq->start();
327
328     //push request to sender's win
329     xbt_mutex_acquire(mut_);
330     requests_->push_back(sreq);
331     xbt_mutex_release(mut_);
332
333   return MPI_SUCCESS;
334 }
335
336 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
337               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
338               MPI_Datatype target_datatype, MPI_Op op){
339
340   //get sender pointer
341   MPI_Win send_win = connected_wins_[target_rank];
342
343   if(opened_==0){//check that post/start has been done
344     // no fence or start .. lock ok ?
345     int locked=0;
346     for(auto it : send_win->lockers_)
347       if (it == comm_->rank())
348         locked = 1;
349     if(locked != 1)
350       return MPI_ERR_WIN;
351   }
352
353   if(target_count*target_datatype->get_extent()>send_win->size_)
354     return MPI_ERR_ARG;
355
356   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
357
358   get(result_addr, result_count, result_datatype, target_rank,
359               target_disp, target_count, target_datatype);
360   accumulate(origin_addr, origin_count, origin_datatype, target_rank,
361               target_disp, target_count, target_datatype, op);
362
363   return MPI_SUCCESS;
364
365 }
366
367 int Win::start(MPI_Group group, int assert){
368     /* From MPI forum advices
369     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
370     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
371     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
372     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
373     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
374     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
375     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
376     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
377     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
378     must complete, without further dependencies.  */
379
380   //naive, blocking implementation.
381     int i             = 0;
382     int j             = 0;
383     int size          = group->size();
384     MPI_Request* reqs = xbt_new0(MPI_Request, size);
385
386     while (j != size) {
387       int src = group->index(j);
388       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
389         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
390         i++;
391       }
392       j++;
393   }
394   size=i;
395   Request::startall(size, reqs);
396   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
397   for(i=0;i<size;i++){
398     Request::unref(&reqs[i]);
399   }
400   xbt_free(reqs);
401   opened_++; //we're open for business !
402   group_=group;
403   group->ref();
404   return MPI_SUCCESS;
405 }
406
407 int Win::post(MPI_Group group, int assert){
408   //let's make a synchronous send here
409   int i             = 0;
410   int j             = 0;
411   int size = group->size();
412   MPI_Request* reqs = xbt_new0(MPI_Request, size);
413
414   while(j!=size){
415     int dst=group->index(j);
416     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
417       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
418       i++;
419     }
420     j++;
421   }
422   size=i;
423
424   Request::startall(size, reqs);
425   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
426   for(i=0;i<size;i++){
427     Request::unref(&reqs[i]);
428   }
429   xbt_free(reqs);
430   opened_++; //we're open for business !
431   group_=group;
432   group->ref();
433   return MPI_SUCCESS;
434 }
435
436 int Win::complete(){
437   if(opened_==0)
438     xbt_die("Complete called on already opened MPI_Win");
439
440   XBT_DEBUG("Entering MPI_Win_Complete");
441   int i             = 0;
442   int j             = 0;
443   int size = group_->size();
444   MPI_Request* reqs = xbt_new0(MPI_Request, size);
445
446   while(j!=size){
447     int dst=group_->index(j);
448     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
449       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
450       i++;
451     }
452     j++;
453   }
454   size=i;
455   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
456   Request::startall(size, reqs);
457   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
458
459   for(i=0;i<size;i++){
460     Request::unref(&reqs[i]);
461   }
462   xbt_free(reqs);
463
464   int finished = finish_comms();
465   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
466
467   Group::unref(group_);
468   opened_--; //we're closed for business !
469   return MPI_SUCCESS;
470 }
471
472 int Win::wait(){
473   //naive, blocking implementation.
474   XBT_DEBUG("Entering MPI_Win_Wait");
475   int i             = 0;
476   int j             = 0;
477   int size          = group_->size();
478   MPI_Request* reqs = xbt_new0(MPI_Request, size);
479
480   while(j!=size){
481     int src=group_->index(j);
482     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
483       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
484       i++;
485     }
486     j++;
487   }
488   size=i;
489   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
490   Request::startall(size, reqs);
491   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
492   for(i=0;i<size;i++){
493     Request::unref(&reqs[i]);
494   }
495   xbt_free(reqs);
496   int finished = finish_comms();
497   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
498
499   Group::unref(group_);
500   opened_--; //we're opened for business !
501   return MPI_SUCCESS;
502 }
503
504 int Win::lock(int lock_type, int rank, int assert){
505   if(opened_!=0)
506     return MPI_ERR_WIN;
507
508   MPI_Win target_win = connected_wins_[rank];
509
510   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
511     xbt_mutex_acquire(target_win->lock_mut_);
512     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
513     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
514       xbt_mutex_release(target_win->lock_mut_);
515    }
516   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
517         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
518
519   target_win->lockers_.push_back(comm_->rank());
520
521   int finished = finish_comms();
522   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
523
524   return MPI_SUCCESS;
525 }
526
527 int Win::unlock(int rank){
528   if(opened_!=0)
529     return MPI_ERR_WIN;
530
531   MPI_Win target_win = connected_wins_[rank];
532   int target_mode = target_win->mode_;
533   target_win->mode_= 0;
534   target_win->lockers_.remove(comm_->rank());
535   if (target_mode==MPI_LOCK_EXCLUSIVE){
536     xbt_mutex_release(target_win->lock_mut_);
537   }
538
539   int finished = finish_comms();
540   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
541
542   return MPI_SUCCESS;
543 }
544
545 Win* Win::f2c(int id){
546   return static_cast<Win*>(F2C::f2c(id));
547 }
548
549
550 int Win::finish_comms(){
551   xbt_mutex_acquire(mut_);
552   //Finish own requests
553   std::vector<MPI_Request> *reqqs = requests_;
554   int size = static_cast<int>(reqqs->size());
555   if (size > 0) {
556     // start all requests that have been prepared by another process
557     for (const auto& req : *reqqs) {
558       if (req && (req->flags() & PREPARED))
559         req->start();
560     }
561
562     MPI_Request* treqs = &(*reqqs)[0];
563     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
564     reqqs->clear();
565   }
566   xbt_mutex_release(mut_);
567   return size;
568 }
569
570
571 }
572 }