Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9398095d5b0b744f4a818cdc43f2db7f544460d9
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18   int comm_size = comm->size();
19   rank_      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   lock_mut_=xbt_mutex_init();
29   connected_wins_ = new MPI_Win[comm_size];
30   connected_wins_[rank_] = this;
31   count_ = 0;
32   if(rank_==0){
33     bar_ = MSG_barrier_init(comm_size);
34   }
35   mode_=0;
36
37   comm->add_rma_win(this);
38
39   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
40                          MPI_BYTE, comm);
41
42   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
43
44   Colls::barrier(comm);
45 }
46
47 Win::~Win(){
48   //As per the standard, perform a barrier to ensure every async comm is finished
49   MSG_barrier_wait(bar_);
50
51   int finished = finish_comms();
52   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
53
54   delete requests_;
55   delete[] connected_wins_;
56   if (name_ != nullptr){
57     xbt_free(name_);
58   }
59   if(info_!=MPI_INFO_NULL){
60     MPI_Info_free(&info_);
61   }
62
63   comm_->remove_rma_win(this);
64
65   Colls::barrier(comm_);
66   int rank=comm_->rank();
67   if(rank == 0)
68     MSG_barrier_destroy(bar_);
69   xbt_mutex_destroy(mut_);
70   xbt_mutex_destroy(lock_mut_);
71
72   cleanup_attr<Win>();
73 }
74
75 void Win::get_name(char* name, int* length){
76   if(name_==nullptr){
77     *length=0;
78     name=nullptr;
79     return;
80   }
81   *length = strlen(name_);
82   strncpy(name, name_, *length+1);
83 }
84
85 void Win::get_group(MPI_Group* group){
86   if(comm_ != MPI_COMM_NULL){
87     *group = comm_->group();
88   } else {
89     *group = MPI_GROUP_NULL;
90   }
91 }
92
93 int Win::rank(){
94   return rank_;
95 }
96
97 MPI_Aint Win::size(){
98   return size_;
99 }
100
101 void* Win::base(){
102   return base_;
103 }
104
105 int Win::disp_unit(){
106   return disp_unit_;
107 }
108
109
110 void Win::set_name(char* name){
111   name_ = xbt_strdup(name);
112 }
113
114 int Win::fence(int assert)
115 {
116   XBT_DEBUG("Entering fence");
117   if (opened_ == 0)
118     opened_=1;
119   if (assert != MPI_MODE_NOPRECEDE) {
120     // This is not the first fence => finalize what came before
121     MSG_barrier_wait(bar_);
122     xbt_mutex_acquire(mut_);
123     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
124     // Without this, the vector could get redimensionned when another process pushes.
125     // This would result in the array used by Request::waitall() to be invalidated.
126     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
127     std::vector<MPI_Request> *reqs = requests_;
128     int size = static_cast<int>(reqs->size());
129     // start all requests that have been prepared by another process
130     if (size > 0) {
131       for (const auto& req : *reqs) {
132         if (req && (req->flags() & PREPARED))
133           req->start();
134       }
135
136       MPI_Request* treqs = &(*reqs)[0];
137
138       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
139     }
140     count_=0;
141     xbt_mutex_release(mut_);
142   }
143   assert_ = assert;
144
145   MSG_barrier_wait(bar_);
146   XBT_DEBUG("Leaving fence");
147
148   return MPI_SUCCESS;
149 }
150
151 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
152               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
153 {
154   //get receiver pointer
155   MPI_Win recv_win = connected_wins_[target_rank];
156
157   if(opened_==0){//check that post/start has been done
158     // no fence or start .. lock ok ?
159     int locked=0;
160     for(auto it : recv_win->lockers_)
161       if (it == comm_->rank())
162         locked = 1;
163     if(locked != 1)
164       return MPI_ERR_WIN;
165   }
166
167   if(target_count*target_datatype->get_extent()>recv_win->size_)
168     return MPI_ERR_ARG;
169
170   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
171   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
172
173   if(target_rank != comm_->rank()){
174     //prepare send_request
175     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
176         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
177
178     //prepare receiver request
179     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
180         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
181
182     //push request to receiver's win
183     xbt_mutex_acquire(recv_win->mut_);
184     recv_win->requests_->push_back(rreq);
185     xbt_mutex_release(recv_win->mut_);
186     //start send
187     sreq->start();
188
189     //push request to sender's win
190     xbt_mutex_acquire(mut_);
191     requests_->push_back(sreq);
192     xbt_mutex_release(mut_);
193   }else{
194     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
195   }
196
197   return MPI_SUCCESS;
198 }
199
200 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
201               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
202 {
203   //get sender pointer
204   MPI_Win send_win = connected_wins_[target_rank];
205
206   if(opened_==0){//check that post/start has been done
207     // no fence or start .. lock ok ?
208     int locked=0;
209     for(auto it : send_win->lockers_)
210       if (it == comm_->rank())
211         locked = 1;
212     if(locked != 1)
213       return MPI_ERR_WIN;
214   }
215
216   if(target_count*target_datatype->get_extent()>send_win->size_)
217     return MPI_ERR_ARG;
218
219   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
220   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
221
222   if(target_rank != comm_->rank()){
223     //prepare send_request
224     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
225         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
226         MPI_OP_NULL);
227
228     //prepare receiver request
229     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
230         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
231         MPI_OP_NULL);
232
233     //start the send, with another process than us as sender. 
234     sreq->start();
235     //push request to receiver's win
236     xbt_mutex_acquire(send_win->mut_);
237     send_win->requests_->push_back(sreq);
238     xbt_mutex_release(send_win->mut_);
239
240     //start recv
241     rreq->start();
242     //push request to sender's win
243     xbt_mutex_acquire(mut_);
244     requests_->push_back(rreq);
245     xbt_mutex_release(mut_);
246   }else{
247     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
248   }
249
250   return MPI_SUCCESS;
251 }
252
253
254 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
255               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
256 {
257
258   //get receiver pointer
259   MPI_Win recv_win = connected_wins_[target_rank];
260
261   if(opened_==0){//check that post/start has been done
262     // no fence or start .. lock ok ?
263     int locked=0;
264     for(auto it : recv_win->lockers_)
265       if (it == comm_->rank())
266         locked = 1;
267     if(locked != 1)
268       return MPI_ERR_WIN;
269   }
270   //FIXME: local version 
271
272   if(target_count*target_datatype->get_extent()>recv_win->size_)
273     return MPI_ERR_ARG;
274
275   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
276   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
277     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
278     //prepare send_request
279
280     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
281         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
282
283     //prepare receiver request
284     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
285         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
286
287     count_++;
288     //push request to receiver's win
289     xbt_mutex_acquire(recv_win->mut_);
290     recv_win->requests_->push_back(rreq);
291     xbt_mutex_release(recv_win->mut_);
292     //start send
293     sreq->start();
294
295     //push request to sender's win
296     xbt_mutex_acquire(mut_);
297     requests_->push_back(sreq);
298     xbt_mutex_release(mut_);
299
300   return MPI_SUCCESS;
301 }
302
303 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, 
304               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, 
305               MPI_Datatype target_datatype, MPI_Op op){
306
307   //get sender pointer
308   MPI_Win send_win = connected_wins_[target_rank];
309
310   if(opened_==0){//check that post/start has been done
311     // no fence or start .. lock ok ?
312     int locked=0;
313     for(auto it : send_win->lockers_)
314       if (it == comm_->rank())
315         locked = 1;
316     if(locked != 1)
317       return MPI_ERR_WIN;
318   }
319
320   if(target_count*target_datatype->get_extent()>send_win->size_)
321     return MPI_ERR_ARG;
322
323   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
324
325   get(result_addr, result_count, result_datatype, target_rank,
326               target_disp, target_count, target_datatype);
327   accumulate(origin_addr, origin_count, origin_datatype, target_rank,
328               target_disp, target_count, target_datatype, op);
329
330   return MPI_SUCCESS;
331
332 }
333
334 int Win::start(MPI_Group group, int assert){
335     /* From MPI forum advices
336     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
337     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
338     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
339     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
340     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
341     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
342     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
343     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
344     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
345     must complete, without further dependencies.  */
346
347   //naive, blocking implementation.
348     int i             = 0;
349     int j             = 0;
350     int size          = group->size();
351     MPI_Request* reqs = xbt_new0(MPI_Request, size);
352
353     while (j != size) {
354       int src = group->index(j);
355       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
356         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
357         i++;
358       }
359       j++;
360   }
361   size=i;
362   Request::startall(size, reqs);
363   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
364   for(i=0;i<size;i++){
365     Request::unref(&reqs[i]);
366   }
367   xbt_free(reqs);
368   opened_++; //we're open for business !
369   group_=group;
370   group->ref();
371   return MPI_SUCCESS;
372 }
373
374 int Win::post(MPI_Group group, int assert){
375   //let's make a synchronous send here
376   int i             = 0;
377   int j             = 0;
378   int size = group->size();
379   MPI_Request* reqs = xbt_new0(MPI_Request, size);
380
381   while(j!=size){
382     int dst=group->index(j);
383     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
384       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
385       i++;
386     }
387     j++;
388   }
389   size=i;
390
391   Request::startall(size, reqs);
392   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
393   for(i=0;i<size;i++){
394     Request::unref(&reqs[i]);
395   }
396   xbt_free(reqs);
397   opened_++; //we're open for business !
398   group_=group;
399   group->ref();
400   return MPI_SUCCESS;
401 }
402
403 int Win::complete(){
404   if(opened_==0)
405     xbt_die("Complete called on already opened MPI_Win");
406
407   XBT_DEBUG("Entering MPI_Win_Complete");
408   int i             = 0;
409   int j             = 0;
410   int size = group_->size();
411   MPI_Request* reqs = xbt_new0(MPI_Request, size);
412
413   while(j!=size){
414     int dst=group_->index(j);
415     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
416       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
417       i++;
418     }
419     j++;
420   }
421   size=i;
422   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
423   Request::startall(size, reqs);
424   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
425
426   for(i=0;i<size;i++){
427     Request::unref(&reqs[i]);
428   }
429   xbt_free(reqs);
430
431   int finished = finish_comms();
432   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
433
434   Group::unref(group_);
435   opened_--; //we're closed for business !
436   return MPI_SUCCESS;
437 }
438
439 int Win::wait(){
440   //naive, blocking implementation.
441   XBT_DEBUG("Entering MPI_Win_Wait");
442   int i             = 0;
443   int j             = 0;
444   int size          = group_->size();
445   MPI_Request* reqs = xbt_new0(MPI_Request, size);
446
447   while(j!=size){
448     int src=group_->index(j);
449     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
450       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
451       i++;
452     }
453     j++;
454   }
455   size=i;
456   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
457   Request::startall(size, reqs);
458   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
459   for(i=0;i<size;i++){
460     Request::unref(&reqs[i]);
461   }
462   xbt_free(reqs);
463   int finished = finish_comms();
464   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
465
466   Group::unref(group_);
467   opened_--; //we're opened for business !
468   return MPI_SUCCESS;
469 }
470
471 int Win::lock(int lock_type, int rank, int assert){
472   if(opened_!=0)
473     return MPI_ERR_WIN;
474
475   MPI_Win target_win = connected_wins_[rank];
476
477   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
478     xbt_mutex_acquire(target_win->lock_mut_);
479     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
480     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
481       xbt_mutex_release(target_win->lock_mut_);
482    }
483   } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
484         target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
485
486   target_win->lockers_.push_back(comm_->rank());
487
488   int finished = finish_comms();
489   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
490
491   return MPI_SUCCESS;
492 }
493
494 int Win::unlock(int rank){
495   if(opened_!=0)
496     return MPI_ERR_WIN;
497
498   MPI_Win target_win = connected_wins_[rank];
499   int target_mode = target_win->mode_;
500   target_win->mode_= 0;
501   target_win->lockers_.remove(comm_->rank());
502   if (target_mode==MPI_LOCK_EXCLUSIVE){
503     xbt_mutex_release(target_win->lock_mut_);
504   }
505
506   int finished = finish_comms();
507   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
508
509   return MPI_SUCCESS;
510 }
511
512 Win* Win::f2c(int id){
513   return static_cast<Win*>(F2C::f2c(id));
514 }
515
516
517 int Win::finish_comms(){
518   xbt_mutex_acquire(mut_);
519   //Finish own requests
520   std::vector<MPI_Request> *reqqs = requests_;
521   int size = static_cast<int>(reqqs->size());
522   if (size > 0) {
523     // start all requests that have been prepared by another process
524     for (const auto& req : *reqqs) {
525       if (req && (req->flags() & PREPARED))
526         req->start();
527     }
528
529     MPI_Request* treqs = &(*reqqs)[0];
530     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
531     reqqs->clear();
532   }
533   xbt_mutex_release(mut_);
534   return size;
535 }
536
537
538 }
539 }