1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
// Logging category used by all RMA (one-sided communication) code in this file.
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Per-class storage for MPI_Win attribute keyvals (shared by all windows).
20 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
// Next keyval id to hand out when a new attribute key is created.
21 int Win::keyval_id_=0;
// Window constructor: records the local memory region (base/size/disp_unit) and
// sets up the per-window synchronization objects (mutexes, barrier) plus the
// table of every rank's window object, exchanged via an allgather below.
// NOTE(review): interior lines are elided here; member init beyond what is
// visible should be confirmed against the full source.
23 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
24 int comm_size = comm->size();
26 XBT_DEBUG("Creating window");
27 if(info!=MPI_INFO_NULL)
31 group_ = MPI_GROUP_NULL;
32 requests_ = new std::vector<MPI_Request>();
// mut_ guards requests_; lock_mut_ implements MPI_Win_lock; atomic_mut_
// serializes atomic RMA ops (get_accumulate / compare_and_swap).
33 mut_=xbt_mutex_init();
34 lock_mut_=xbt_mutex_init();
35 atomic_mut_=xbt_mutex_init();
// One slot per rank; each process stores its own Win* then allgathers so
// every rank can reach any peer's window directly.
36 connected_wins_ = new MPI_Win[comm_size];
37 connected_wins_[rank_] = this;
40 bar_ = MSG_barrier_init(comm_size);
44 comm->add_rma_win(this);
46 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// All ranks share rank 0's barrier handle so MSG_barrier_wait synchronizes the group.
49 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Destructor body (signature elided in this extract): drains pending RMA
// communications, then tears down the structures built in the constructor.
55 //As per the standard, perform a barrier to ensure every async comm is finished
56 MSG_barrier_wait(bar_);
58 int finished = finish_comms();
59 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
62 delete[] connected_wins_;
63 if (name_ != nullptr){
66 if(info_!=MPI_INFO_NULL){
67 MPI_Info_free(&info_);
70 comm_->remove_rma_win(this);
// Second collective sync before destroying shared objects, so no peer is
// still using bar_ / the mutexes when they are freed.
72 Colls::barrier(comm_);
73 int rank=comm_->rank();
// NOTE(review): bar_ is shared (bcast from rank 0) — presumably only one rank
// actually destroys it; the guarding condition is elided here, confirm upstream.
75 MSG_barrier_destroy(bar_);
76 xbt_mutex_destroy(mut_);
77 xbt_mutex_destroy(lock_mut_);
78 xbt_mutex_destroy(atomic_mut_);
// Attach a new region to a dynamic window. Only legal when the window has no
// real base yet (MPI_BOTTOM or 0); the actual address is supplied later as the
// displacement in each RMA call.
86 int Win::attach (void *base, MPI_Aint size){
87 if (not(base_ == MPI_BOTTOM || base_ == 0))
89 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach a region from a dynamic window (body elided in this extract).
94 int Win::detach (void *base){
// Copy the window's name into the caller's buffer and report its length.
// NOTE(review): a name_ == nullptr guard is presumably in the elided lines —
// strlen/strncpy here require name_ to be non-null.
100 void Win::get_name(char* name, int* length){
106 *length = strlen(name_);
// +1 so the terminating NUL is copied as well.
107 strncpy(name, name_, *length+1);
// Return the group of the communicator the window was created on, or
// MPI_GROUP_NULL if the communicator is gone.
110 void Win::get_group(MPI_Group* group){
111 if(comm_ != MPI_COMM_NULL){
112 *group = comm_->group();
114 *group = MPI_GROUP_NULL;
// Return the window's info object; lazily created when still MPI_INFO_NULL
// (creation line elided in this extract).
118 MPI_Info Win::info(){
119 if(info_== MPI_INFO_NULL)
// Accessor: size in bytes of the local window region (body elided).
129 MPI_Aint Win::size(){
// Accessor: displacement unit used to scale target_disp in RMA calls (body elided).
137 int Win::disp_unit(){
// Replace the window's info object; the old one is released first when set
// (release call elided in this extract).
145 void Win::set_info(MPI_Info info){
146 if(info_!= MPI_INFO_NULL)
// Store a private copy of the given name (freed in the destructor).
151 void Win::set_name(char* name){
152 name_ = xbt_strdup(name);
// MPI_Win_fence: collective synchronization that separates RMA epochs.
// Unless MPI_MODE_NOPRECEDE is asserted, first waits for the previous epoch's
// requests to complete; a trailing barrier opens the next epoch for everyone.
155 int Win::fence(int assert)
157 XBT_DEBUG("Entering fence");
160 if (assert != MPI_MODE_NOPRECEDE) {
161 // This is not the first fence => finalize what came before
162 MSG_barrier_wait(bar_);
163 xbt_mutex_acquire(mut_);
164 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
165 // Without this, the vector could get redimensionned when another process pushes.
166 // This would result in the array used by Request::waitall() to be invalidated.
167 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
168 std::vector<MPI_Request> *reqs = requests_;
169 int size = static_cast<int>(reqs->size());
170 // start all requests that have been prepared by another process
172 MPI_Request* treqs = &(*reqs)[0];
173 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
176 xbt_mutex_release(mut_);
179 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
// Final barrier: no process leaves the fence before all requests are queued/done.
183 MSG_barrier_wait(bar_);
184 XBT_DEBUG("Leaving fence");
// MPI_Put: write origin data into the target rank's window.
// Remote case: pairs an rma_send on our side with an rma_recv pushed onto the
// target window's request vector (both tagged SMPI_RMA_TAG+1).
// Local case: a direct Datatype::copy, no requests involved.
189 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
190 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
192 //get receiver pointer
193 MPI_Win recv_win = connected_wins_[target_rank];
195 if(opened_==0){//check that post/start has been done
196 // no fence or start .. lock ok ?
// Outside an access epoch, the op is only legal if we hold a lock on the target.
198 for (auto const& it : recv_win->lockers_)
199 if (it == comm_->rank())
// Bounds check: transfer must fit inside the target window.
205 if(target_count*target_datatype->get_extent()>recv_win->size_)
208 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
209 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
211 if(target_rank != comm_->rank()){
212 //prepare send_request
213 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
214 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
216 //prepare receiver request
// The receive is posted on the *target* window's communicator.
217 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
218 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
223 if(request!=nullptr){
// Track the send locally so fence/flush can later wait on it.
226 xbt_mutex_acquire(mut_);
227 requests_->push_back(sreq);
228 xbt_mutex_release(mut_);
231 //push request to receiver's win
232 xbt_mutex_acquire(recv_win->mut_);
233 recv_win->requests_->push_back(rreq);
235 xbt_mutex_release(recv_win->mut_);
// Self-targeted put: plain local copy.
238 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
240 *request = MPI_REQUEST_NULL;
// MPI_Get: read from the target rank's window into origin_addr.
// Mirror image of put(): the *target* window gets the send request (it is the
// data source), we keep the receive request (tag SMPI_RMA_TAG+2).
246 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
247 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
250 MPI_Win send_win = connected_wins_[target_rank];
252 if(opened_==0){//check that post/start has been done
253 // no fence or start .. lock ok ?
255 for (auto const& it : send_win->lockers_)
256 if (it == comm_->rank())
262 if(target_count*target_datatype->get_extent()>send_win->size_)
265 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
266 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
268 if(target_rank != comm_->rank()){
269 //prepare send_request
// Note the swapped src/dst vs. put(): the target rank is the sender here.
270 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
271 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
274 //prepare receiver request
275 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
276 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
279 //start the send, with another process than us as sender.
281 //push request to receiver's win
282 xbt_mutex_acquire(send_win->mut_);
283 send_win->requests_->push_back(sreq);
284 xbt_mutex_release(send_win->mut_);
289 if(request!=nullptr){
// Track our receive locally so fence/flush can wait on it later.
292 xbt_mutex_acquire(mut_);
293 requests_->push_back(rreq);
294 xbt_mutex_release(mut_);
// Self-targeted get: plain local copy.
298 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
300 *request=MPI_REQUEST_NULL;
// MPI_Accumulate: like put(), but the receive request carries a reduction op
// applied at the target. The tag is SMPI_RMA_TAG-3-count_ so successive
// accumulates get distinct, ordered tags (count_ presumably incremented in an
// elided line — confirm against full source).
307 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
308 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
310 XBT_DEBUG("Entering MPI_Win_Accumulate");
311 //get receiver pointer
312 MPI_Win recv_win = connected_wins_[target_rank];
314 if(opened_==0){//check that post/start has been done
315 // no fence or start .. lock ok ?
317 for (auto const& it : recv_win->lockers_)
318 if (it == comm_->rank())
323 //FIXME: local version
325 if(target_count*target_datatype->get_extent()>recv_win->size_)
328 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
329 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
330 //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
331 //prepare send_request
333 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
334 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
336 //prepare receiver request
337 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
338 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
344 //push request to receiver's win
345 xbt_mutex_acquire(recv_win->mut_);
346 recv_win->requests_->push_back(rreq);
348 xbt_mutex_release(recv_win->mut_);
350 if(request!=nullptr){
// Track the send locally, same pattern as put().
353 xbt_mutex_acquire(mut_);
354 requests_->push_back(sreq);
355 xbt_mutex_release(mut_);
358 XBT_DEBUG("Leaving MPI_Win_Accumulate");
// MPI_Get_accumulate: fetch the old target value, then accumulate the origin
// data. Atomicity is obtained by serializing the get+accumulate pair under the
// target window's atomic_mut_, waiting on each half before releasing.
362 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
363 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
364 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
367 MPI_Win send_win = connected_wins_[target_rank];
369 if(opened_==0){//check that post/start has been done
370 // no fence or start .. lock ok ?
372 for (auto const& it : send_win->lockers_)
373 if (it == comm_->rank())
379 if(target_count*target_datatype->get_extent()>send_win->size_)
382 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
383 //need to be sure ops are correctly ordered, so finish request here ? slow.
385 xbt_mutex_acquire(send_win->atomic_mut_);
// Step 1: read the current target content into result_addr.
386 get(result_addr, result_count, result_datatype, target_rank,
387 target_disp, target_count, target_datatype, &req);
388 if (req != MPI_REQUEST_NULL)
389 Request::wait(&req, MPI_STATUS_IGNORE);
// Step 2: apply op(origin) to the target, only after the read completed.
391 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
392 target_disp, target_count, target_datatype, op, &req);
393 if (req != MPI_REQUEST_NULL)
394 Request::wait(&req, MPI_STATUS_IGNORE);
395 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Compare_and_swap: atomically (under the target's atomic_mut_) read one
// element, and if it byte-compares equal to *compare_addr, overwrite it with
// *origin_addr. The old value is always returned in *result_addr.
400 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
401 void *result_addr, MPI_Datatype datatype, int target_rank,
402 MPI_Aint target_disp){
404 MPI_Win send_win = connected_wins_[target_rank];
406 if(opened_==0){//check that post/start has been done
407 // no fence or start .. lock ok ?
409 for (auto const& it : send_win->lockers_)
410 if (it == comm_->rank())
416 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
417 MPI_Request req = MPI_REQUEST_NULL;
418 xbt_mutex_acquire(send_win->atomic_mut_);
419 get(result_addr, 1, datatype, target_rank,
420 target_disp, 1, datatype, &req);
421 if (req != MPI_REQUEST_NULL)
422 Request::wait(&req, MPI_STATUS_IGNORE);
// Raw byte comparison over the datatype extent decides whether to swap.
423 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
424 put(origin_addr, 1, datatype, target_rank,
425 target_disp, 1, datatype);
427 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Win_start: open an access epoch toward `group`. Naive blocking scheme:
// wait for a zero-byte "post" message (tag SMPI_RMA_TAG+4) from every member
// of the group before returning.
431 int Win::start(MPI_Group group, int assert){
432 /* From MPI forum advices
433 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
434 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
435 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
436 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
437 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
438 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
439 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
440 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
441 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
442 must complete, without further dependencies. */
444 //naive, blocking implementation.
447 int size = group->size();
448 MPI_Request* reqs = xbt_new0(MPI_Request, size);
450 XBT_DEBUG("Entering MPI_Win_Start");
452 int src = group->index(j);
// Skip ourselves and ranks absent from MPI_COMM_WORLD.
453 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
454 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
460 Request::startall(size, reqs);
461 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
463 Request::unref(&reqs[i]);
466 opened_++; //we're open for business !
469 XBT_DEBUG("Leaving MPI_Win_Start");
// MPI_Win_post: open an exposure epoch for `group` by sending each member a
// zero-byte notification (tag SMPI_RMA_TAG+4) matched by their start().
473 int Win::post(MPI_Group group, int assert){
474 //let's make a synchronous send here
477 int size = group->size();
478 MPI_Request* reqs = xbt_new0(MPI_Request, size);
480 XBT_DEBUG("Entering MPI_Win_Post");
482 int dst=group->index(j);
483 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
484 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
491 Request::startall(size, reqs);
492 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
494 Request::unref(&reqs[i]);
497 opened_++; //we're open for business !
500 XBT_DEBUG("Leaving MPI_Win_Post");
// MPI_Win_complete body (signature elided in this extract): close the access
// epoch opened by start(). Notifies every group member with a zero-byte send
// (tag SMPI_RMA_TAG+5, matched by their wait()), then drains our own requests.
506 xbt_die("Complete called on already opened MPI_Win");
508 XBT_DEBUG("Entering MPI_Win_Complete");
511 int size = group_->size();
512 MPI_Request* reqs = xbt_new0(MPI_Request, size);
515 int dst=group_->index(j);
516 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
517 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
523 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
524 Request::startall(size, reqs);
525 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
528 Request::unref(&reqs[i]);
// Local completion: wait for all RMA requests queued on this window.
532 int finished = finish_comms();
533 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
535 Group::unref(group_);
536 opened_--; //we're closed for business !
// MPI_Win_wait body (signature elided in this extract): close the exposure
// epoch opened by post(). Blocks for the SMPI_RMA_TAG+5 message from every
// group member's complete(), then finishes the RMA requests targeting us.
541 //naive, blocking implementation.
542 XBT_DEBUG("Entering MPI_Win_Wait");
545 int size = group_->size();
546 MPI_Request* reqs = xbt_new0(MPI_Request, size);
549 int src=group_->index(j);
550 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
551 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
557 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
558 Request::startall(size, reqs);
559 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
561 Request::unref(&reqs[i]);
564 int finished = finish_comms();
565 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
567 Group::unref(group_);
568 opened_--; //we're opened for business !
// MPI_Win_lock: acquire a (simulated) lock on the target rank's window.
// mode_ accumulates lock types so an exclusive->shared transition can be told
// apart at unlock time; the real mutex is only held for exclusive access.
572 int Win::lock(int lock_type, int rank, int assert){
573 MPI_Win target_win = connected_wins_[rank];
// Block if we want exclusive (and it isn't merely shared) or it is already exclusive.
575 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
576 xbt_mutex_acquire(target_win->lock_mut_);
577 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
578 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
579 xbt_mutex_release(target_win->lock_mut_);
581 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
582 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
// Record us as a locker so peers' put/get/accumulate pass the epoch check.
584 target_win->lockers_.push_back(comm_->rank());
// Flush pending traffic in both directions for this pair of ranks.
586 int finished = finish_comms(rank);
587 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
588 finished = target_win->finish_comms(rank_);
589 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: take a shared lock on every rank's window, reporting the
// first failure (if any) to the caller.
593 int Win::lock_all(int assert){
595 int retval = MPI_SUCCESS;
596 for (i=0; i<comm_->size();i++){
597 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
598 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: release our lock on the target rank's window. The real
// mutex is only released when the previous mode was purely exclusive
// (an exclusive->shared switch already released it in lock()).
604 int Win::unlock(int rank){
605 MPI_Win target_win = connected_wins_[rank];
606 int target_mode = target_win->mode_;
607 target_win->mode_= 0;
608 target_win->lockers_.remove(comm_->rank());
609 if (target_mode==MPI_LOCK_EXCLUSIVE){
610 xbt_mutex_release(target_win->lock_mut_);
// Flush pending traffic in both directions before returning.
613 int finished = finish_comms(rank);
614 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
615 finished = target_win->finish_comms(rank_);
616 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: release the lock on every rank's window, reporting the
// first failure (if any) to the caller.
620 int Win::unlock_all(){
622 int retval = MPI_SUCCESS;
623 for (i=0; i<comm_->size();i++){
624 int ret = this->unlock(i);
625 if(ret != MPI_SUCCESS)
// MPI_Win_flush: complete all outstanding RMA operations between us and
// `rank`, on both the local and the remote window's request queues.
631 int Win::flush(int rank){
632 MPI_Win target_win = connected_wins_[rank];
633 int finished = finish_comms(rank);
634 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
635 finished = target_win->finish_comms(rank_);
636 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: complete only our locally-queued requests involving `rank`.
640 int Win::flush_local(int rank){
641 int finished = finish_comms(rank);
642 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: finish all of our own requests, then every peer window's
// requests that involve us.
646 int Win::flush_all(){
649 finished = finish_comms();
650 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
651 for (i=0; i<comm_->size();i++){
652 finished = connected_wins_[i]->finish_comms(rank_);
653 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: complete every locally-queued request on this window.
658 int Win::flush_local_all(){
659 int finished = finish_comms();
660 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Translate a Fortran window handle (integer id) back to its C++ Win object.
664 Win* Win::f2c(int id){
665 return static_cast<Win*>(F2C::f2c(id));
// Wait for every request queued on this window; returns the number finished.
// mut_ prevents peers from pushing into requests_ while waitall runs over the
// vector's raw storage (same invalidation concern as in fence()).
669 int Win::finish_comms(){
670 xbt_mutex_acquire(mut_);
671 //Finish own requests
672 std::vector<MPI_Request> *reqqs = requests_;
673 int size = static_cast<int>(reqqs->size());
675 MPI_Request* treqs = &(*reqqs)[0];
676 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
679 xbt_mutex_release(mut_);
// Rank-filtered variant of finish_comms(): extract from requests_ only those
// whose source or destination is `rank`, wait on that subset, and return how
// many were finished. Held under mut_ for the same invalidation reasons.
683 int Win::finish_comms(int rank){
684 xbt_mutex_acquire(mut_);
685 //Finish own requests
686 std::vector<MPI_Request> *reqqs = requests_;
687 int size = static_cast<int>(reqqs->size());
690 std::vector<MPI_Request> myreqqs;
691 std::vector<MPI_Request>::iterator iter = reqqs->begin();
692 while (iter != reqqs->end()){
// Move matching requests into the temporary vector; erase() advances iter.
693 if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
694 myreqqs.push_back(*iter);
695 iter = reqqs->erase(iter);
702 MPI_Request* treqs = &myreqqs[0];
703 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
707 xbt_mutex_release(mut_);