1 /* Copyright (c) 2007-2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Log category "smpi_rma", declared as a sub-category of "smpi"; it is the default category of this file.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Definition of Win's static keyval table: maps an int id to an smpi_key_elem.
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
// Static counter, initialised to 0. Presumably the next keyval id to hand out -- TODO confirm against the keyval code.
15 int Win::keyval_id_=0;
// Constructor. Stores the window parameters (base address, size, displacement unit, info, communicator, and the
// allocated/dynamic flags) through the initializer list, with assert_ starting at 0, then creates the
// per-window synchronization objects and exchanges window pointers with the other ranks of comm.
// NOTE(review): several original lines (e.g. the statement guarded by the info check, the assignment of rank_)
// are not visible in this excerpt.
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18 int comm_size = comm->size();
20 XBT_DEBUG("Creating window");
21 if(info!=MPI_INFO_NULL)
// No access/exposure group until one is set elsewhere.
25 group_ = MPI_GROUP_NULL;
// List of the RMA requests attached to this window; waited on in fence() and finish_comms().
26 requests_ = new std::vector<MPI_Request>();
// mut_ is taken around every access to requests_ in this file; lock_mut_ is used by lock()/unlock().
27 mut_=xbt_mutex_init();
28 lock_mut_=xbt_mutex_init();
// One slot per rank of comm; our own slot points to this window.
29 connected_wins_ = new MPI_Win[comm_size];
30 connected_wins_[rank_] = this;
// Barrier sized for comm_size participants.
33 bar_ = MSG_barrier_init(comm_size);
37 comm->add_rma_win(this);
// Gather every rank's window pointer (sent as raw bytes) into connected_wins_.
39 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// Broadcast the bar_ handle as raw bytes from root 0 over comm, presumably so that all ranks wait on the same
// barrier object -- TODO confirm.
42 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// NOTE(review): the header of this definition is not visible in this excerpt; the debug message below
// identifies it as the Win destructor.
48 //As per the standard, perform a barrier to ensure every async comm is finished
49 MSG_barrier_wait(bar_);
// Wait for the requests still queued on this window.
51 int finished = finish_comms();
52 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
55 delete[] connected_wins_;
// The statement guarded by this check is not visible here.
56 if (name_ != nullptr){
59 if(info_!=MPI_INFO_NULL){
60 MPI_Info_free(&info_);
// Unregister this window from its communicator.
63 comm_->remove_rma_win(this);
65 Colls::barrier(comm_);
// NOTE(review): how rank is used is not visible in this excerpt.
66 int rank=comm_->rank();
// Release the synchronization objects created in the constructor.
68 MSG_barrier_destroy(bar_);
69 xbt_mutex_destroy(mut_);
70 xbt_mutex_destroy(lock_mut_);
// Attach a memory region to the window.
// The visible code tests whether base_ is something other than MPI_BOTTOM or 0 (the action taken in that
// case is not visible in this excerpt) and then resets base_ to 0.
78 int Win::attach (void *base, MPI_Aint size){
79 if (!(base_ == MPI_BOTTOM || base_ == 0))
81 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach counterpart of attach(); takes the base address of the region.
// NOTE(review): the body is not visible in this excerpt.
86 int Win::detach (void *base){
// Copy the window name into the caller-provided buffer.
//   name   : destination buffer; it must hold at least strlen(name_)+1 bytes, since the terminating NUL
//            is copied as well.
//   length : receives strlen(name_), i.e. the length without the terminating NUL.
// NOTE(review): any handling of an unset name_ is not visible in this excerpt.
92 void Win::get_name(char* name, int* length){
98 *length = strlen(name_);
99 strncpy(name, name_, *length+1);
// Return through *group the group of the window's communicator when comm_ is set.
// MPI_GROUP_NULL is written otherwise (presumably the else branch; the else line is not visible here).
102 void Win::get_group(MPI_Group* group){
103 if(comm_ != MPI_COMM_NULL){
104 *group = comm_->group();
106 *group = MPI_GROUP_NULL;
// Accessor for the MPI_Info object of the window.
// A special case exists when info_ is MPI_INFO_NULL; what it does is not visible in this excerpt.
110 MPI_Info Win::info(){
111 if(info_== MPI_INFO_NULL)
// Accessor returning an MPI_Aint; presumably the window size size_ -- body not visible in this excerpt.
121 MPI_Aint Win::size(){
// Accessor returning an int; presumably the displacement unit disp_unit_ -- body not visible in this excerpt.
129 int Win::disp_unit(){
// Replace the MPI_Info object attached to the window.
// Something is done with the previous info_ when it is not MPI_INFO_NULL; that statement and the
// assignment of the new value are not visible in this excerpt.
137 void Win::set_info(MPI_Info info){
138 if(info_!= MPI_INFO_NULL)
// Set the window name to a private copy of the given C string (xbt_strdup).
// NOTE(review): the visible code does not release a previously set name_ before overwriting it --
// confirm whether repeated calls leak the old copy.
143 void Win::set_name(char* name){
144 name_ = xbt_strdup(name);
// Fence synchronization on the window.
//   assert : assertion flag of the fence; compared against MPI_MODE_NOPRECEDE and MPI_MODE_NOSUCCEED below.
// Unless assert is MPI_MODE_NOPRECEDE, all ranks first meet on bar_ and the requests queued on this window
// are waited for while holding mut_. Every call ends with a second wait on bar_.
147 int Win::fence(int assert)
149 XBT_DEBUG("Entering fence");
152 if (assert != MPI_MODE_NOPRECEDE) {
153 // This is not the first fence => finalize what came before
154 MSG_barrier_wait(bar_);
155 xbt_mutex_acquire(mut_);
156 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
157 // Without this, the vector could get resized when another process pushes.
158 // This would result in the array used by Request::waitall() to be invalidated.
159 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
160 std::vector<MPI_Request> *reqs = requests_;
161 int size = static_cast<int>(reqs->size());
162 // start all requests that have been prepared by another process
// waitall works directly on the vector's storage, hence the mutex above.
164 MPI_Request* treqs = &(*reqs)[0];
165 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
168 xbt_mutex_release(mut_);
171 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
// Closing barrier, executed on every call.
175 MSG_barrier_wait(bar_);
176 XBT_DEBUG("Leaving fence");
// One-sided put: transfer origin_count elements of origin_datatype from origin_addr into the window of
// target_rank, at base_ + target_disp * disp_unit_ of the target window.
// For a remote target, a send request and a matching receive request (tag SMPI_RMA_TAG+1) are created here;
// the receive request is queued on the target's window and the send request on ours.
// NOTE(review): error returns and some control-flow lines are not visible in this excerpt.
181 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
182 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
184 //get receiver pointer
185 MPI_Win recv_win = connected_wins_[target_rank];
187 if(opened_==0){//check that post/start has been done
188 // no fence or start .. lock ok ?
// Look for our own rank among the ranks recorded as lockers of the target window.
190 for(auto it : recv_win->lockers_)
191 if (it == comm_->rank())
// Bounds check: the target data must not exceed the size of the target window.
197 if(target_count*target_datatype->get_extent()>recv_win->size_)
// Destination address inside the target window.
200 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
201 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
203 if(target_rank != comm_->rank()){
204 //prepare send_request
205 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
206 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
208 //prepare receiver request
209 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
210 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
212 //push request to receiver's win
213 xbt_mutex_acquire(recv_win->mut_);
214 recv_win->requests_->push_back(rreq);
215 xbt_mutex_release(recv_win->mut_);
219 //push request to sender's win
220 xbt_mutex_acquire(mut_);
221 requests_->push_back(sreq);
222 xbt_mutex_release(mut_);
// Direct copy between the two buffers; presumably the target == self path (the else line is not visible here).
224 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// One-sided get: transfer target_count elements of target_datatype from the window of target_rank
// (at base_ + target_disp * disp_unit_ of that window) into origin_addr.
// For a remote target, a send request reading from the target window and a matching receive request
// (tag SMPI_RMA_TAG+2) are created here; the send request is queued on the target's window and the
// receive request on ours.
// NOTE(review): error returns and some control-flow lines are not visible in this excerpt.
230 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
231 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
// Window of the target rank, which acts as the sender of the data.
234 MPI_Win send_win = connected_wins_[target_rank];
236 if(opened_==0){//check that post/start has been done
237 // no fence or start .. lock ok ?
// Look for our own rank among the ranks recorded as lockers of the target window.
239 for(auto it : send_win->lockers_)
240 if (it == comm_->rank())
// Bounds check: the requested data must not exceed the size of the target window.
246 if(target_count*target_datatype->get_extent()>send_win->size_)
// Source address inside the target window.
249 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
250 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
252 if(target_rank != comm_->rank()){
253 //prepare send_request
254 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
255 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
258 //prepare receiver request
259 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
260 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
263 //start the send, with another process than us as sender.
265 //push the send request to the target's win
266 xbt_mutex_acquire(send_win->mut_);
267 send_win->requests_->push_back(sreq);
268 xbt_mutex_release(send_win->mut_);
272 //push the receive request to our own win
273 xbt_mutex_acquire(mut_);
274 requests_->push_back(rreq);
275 xbt_mutex_release(mut_);
// Direct copy between the two buffers; presumably the target == self path (the else line is not visible here).
277 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// One-sided accumulate: send origin data to the window of target_rank, where it is combined with the
// target buffer using op (op is passed to both the send and the receive request).
// The tag is SMPI_RMA_TAG-3-count_ so that successive accumulates get distinct tags; the update of
// count_ itself is not visible in this excerpt.
// NOTE(review): error returns and some control-flow lines are not visible in this excerpt.
284 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
285 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
288 //get receiver pointer
289 MPI_Win recv_win = connected_wins_[target_rank];
291 if(opened_==0){//check that post/start has been done
292 // no fence or start .. lock ok ?
// Look for our own rank among the ranks recorded as lockers of the target window.
294 for(auto it : recv_win->lockers_)
295 if (it == comm_->rank())
300 //FIXME: local version
// Bounds check: the target data must not exceed the size of the target window.
302 if(target_count*target_datatype->get_extent()>recv_win->size_)
// Destination address inside the target window.
305 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
306 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
307 //As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
308 //prepare send_request
310 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
311 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
313 //prepare receiver request
314 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
315 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
318 //push request to receiver's win
319 xbt_mutex_acquire(recv_win->mut_);
320 recv_win->requests_->push_back(rreq);
321 xbt_mutex_release(recv_win->mut_);
325 //push request to sender's win
326 xbt_mutex_acquire(mut_);
327 requests_->push_back(sreq);
328 xbt_mutex_release(mut_);
// One-sided get-accumulate, built from the two primitives of this class: first get() fetches the target
// data into result_addr, then accumulate() combines the origin data into the same target location with op.
// The same access checks as in get() (lockers_ scan when opened_ is 0, window bounds) are done first.
// NOTE(review): error returns are not visible in this excerpt.
333 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
334 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
335 MPI_Datatype target_datatype, MPI_Op op){
338 MPI_Win send_win = connected_wins_[target_rank];
340 if(opened_==0){//check that post/start has been done
341 // no fence or start .. lock ok ?
343 for(auto it : send_win->lockers_)
344 if (it == comm_->rank())
// Bounds check: the target data must not exceed the size of the target window.
350 if(target_count*target_datatype->get_extent()>send_win->size_)
353 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
// Fetch the current target content into the result buffer ...
355 get(result_addr, result_count, result_datatype, target_rank,
356 target_disp, target_count, target_datatype);
// ... then apply the accumulate on the same target location.
357 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
358 target_disp, target_count, target_datatype, op);
// Open an access epoch towards the processes of group.
// Blocking implementation: a zero-length receive (tag SMPI_RMA_TAG + 4, the tag post() sends with) is set
// up for each group member other than ourselves, all of them are started and waited for, and opened_ is
// then incremented.
// NOTE(review): the loop headers and the release of reqs are not visible in this excerpt.
364 int Win::start(MPI_Group group, int assert){
365 /* Advice to implementors from the MPI Forum:
366 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
367 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
368 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
369 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
370 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
371 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
372 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
373 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
374 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
375 must complete, without further dependencies. */
377 //naive, blocking implementation.
380 int size = group->size();
// Zero-initialised array with room for one request per group member.
381 MPI_Request* reqs = xbt_new0(MPI_Request, size);
384 int src = group->index(j);
// Skip ourselves and members that have no valid index.
385 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
386 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
392 Request::startall(size, reqs);
393 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
395 Request::unref(&reqs[i]);
398 opened_++; //we're open for business !
// Open an exposure epoch for the processes of group.
// A zero-length send (tag SMPI_RMA_TAG+4, the tag start() receives with) is set up for each group member
// other than ourselves; all of them are started and waited for, and opened_ is then incremented.
// NOTE(review): the loop headers and the release of reqs are not visible in this excerpt.
404 int Win::post(MPI_Group group, int assert){
405 //let's make a synchronous send here
408 int size = group->size();
// Zero-initialised array with room for one request per group member.
409 MPI_Request* reqs = xbt_new0(MPI_Request, size);
412 int dst=group->index(j);
// Skip ourselves and members that have no valid index.
413 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
414 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
421 Request::startall(size, reqs);
422 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
424 Request::unref(&reqs[i]);
427 opened_++; //we're open for business !
// NOTE(review): the header of this definition and the condition guarding the xbt_die below are not visible
// in this excerpt; the log messages identify this code as the implementation of MPI_Win_complete.
// Closes the access epoch: a zero-length send (tag SMPI_RMA_TAG+5, the tag the Win_wait code receives with)
// goes to each member of group_ other than ourselves, then the requests queued on this window are finished.
435 xbt_die("Complete called on already opened MPI_Win");
437 XBT_DEBUG("Entering MPI_Win_Complete");
440 int size = group_->size();
// Zero-initialised array with room for one request per group member.
441 MPI_Request* reqs = xbt_new0(MPI_Request, size);
444 int dst=group_->index(j);
// Skip ourselves and members that have no valid index.
445 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
446 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
452 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
453 Request::startall(size, reqs);
454 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
457 Request::unref(&reqs[i]);
// Wait for the RMA requests queued on this window.
461 int finished = finish_comms();
462 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
// Drop our reference on the group used for this epoch.
464 Group::unref(group_);
465 opened_--; //we're closed for business !
// NOTE(review): the header of this definition is not visible in this excerpt; the log messages identify
// this code as the implementation of MPI_Win_wait.
// Closes the exposure epoch: a zero-length receive (tag SMPI_RMA_TAG+5, the tag the Win_complete code sends
// with) is set up for each member of group_ other than ourselves, then the requests queued on this window
// are finished.
470 //naive, blocking implementation.
471 XBT_DEBUG("Entering MPI_Win_Wait");
474 int size = group_->size();
// Zero-initialised array with room for one request per group member.
475 MPI_Request* reqs = xbt_new0(MPI_Request, size);
478 int src=group_->index(j);
// Skip ourselves and members that have no valid index.
479 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
480 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
486 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
487 Request::startall(size, reqs);
488 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
490 Request::unref(&reqs[i]);
// Wait for the RMA requests queued on this window.
493 int finished = finish_comms();
494 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
// Drop our reference on the group used for this epoch.
496 Group::unref(group_);
497 opened_--; //we're closed for business !
// Lock the window of the given rank.
//   lock_type : MPI_LOCK_EXCLUSIVE or MPI_LOCK_SHARED (the two values this code compares against).
//   rank      : rank whose window is locked, looked up in connected_wins_.
// The target's lock_mut_ is acquired when an exclusive lock is requested on a window that is not in shared
// mode, or when the window is currently in exclusive mode. In the second case with a shared request, the
// mutex is released again right after mode_ is updated. Our rank is then recorded in the target's lockers_,
// which put/get/accumulate consult when opened_ is 0, and the requests queued on this window are finished.
// NOTE(review): closing braces and the return statement are not visible in this excerpt.
501 int Win::lock(int lock_type, int rank, int assert){
505 MPI_Win target_win = connected_wins_[rank];
507 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
508 xbt_mutex_acquire(target_win->lock_mut_);
509 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
510 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
511 xbt_mutex_release(target_win->lock_mut_);
513 } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
514 target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
516 target_win->lockers_.push_back(comm_->rank());
518 int finished = finish_comms();
519 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
// Unlock the window of the given rank.
// The target's mode_ is reset to 0 and our rank is removed from its lockers_. The target's lock_mut_ is
// released only when the mode read before the reset was exactly MPI_LOCK_EXCLUSIVE. The requests queued on
// this window are then finished.
// NOTE(review): closing braces and the return statement are not visible in this excerpt.
524 int Win::unlock(int rank){
528 MPI_Win target_win = connected_wins_[rank];
// Remember the mode before clearing it; it decides below whether the mutex must be released.
529 int target_mode = target_win->mode_;
530 target_win->mode_= 0;
531 target_win->lockers_.remove(comm_->rank());
532 if (target_mode==MPI_LOCK_EXCLUSIVE){
533 xbt_mutex_release(target_win->lock_mut_);
536 int finished = finish_comms();
537 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
// Convert an integer handle id into a Win pointer: delegates the lookup to F2C::f2c and casts the result.
542 Win* Win::f2c(int id){
543 return static_cast<Win*>(F2C::f2c(id));
// Wait for every request currently queued in requests_, holding mut_ for the whole wait.
// Callers log the returned int as the number of finished RMA calls.
// NOTE(review): the computation of the return value, any cleanup of the vector, and the end of the function
// are not visible in this excerpt.
547 int Win::finish_comms(){
548 xbt_mutex_acquire(mut_);
549 //Finish own requests
550 std::vector<MPI_Request> *reqqs = requests_;
551 int size = static_cast<int>(reqqs->size());
// waitall works directly on the vector's storage, so the vector must not be modified during the wait.
553 MPI_Request* treqs = &(*reqqs)[0];
554 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
557 xbt_mutex_release(mut_);