/* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

namespace simgrid{
namespace smpi{
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic)
{
  int comm_size = comm->size();
  rank_         = comm->rank();
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  name_                  = nullptr;
  opened_                = 0;
  group_                 = MPI_GROUP_NULL;
  requests_              = new std::vector<MPI_Request>();
  mut_                   = s4u::Mutex::create();
  lock_mut_              = s4u::Mutex::create();
  atomic_mut_            = s4u::Mutex::create();
  connected_wins_        = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;
  count_                 = 0;
  if (rank_ == 0) {
    bar_ = new s4u::Barrier(comm_size);
  }
  mode_       = 0;
  errhandler_ = MPI_ERRORS_ARE_FATAL;
  comm->add_rma_win(this);
  comm->ref();
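
  // Exchange the addresses of everybody's Win instance: since SMPI simulates all
  // ranks within one OS process, each rank can afterwards reach the window
  // metadata (request queue, mutexes, base address) of its peers directly.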
  Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                   MPI_BYTE, comm);

  Colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);

  Colls::barrier(comm);
}

Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete requests_;
  delete[] connected_wins_;
  if (name_ != nullptr){
    xbt_free(name_);
  }
  if (info_ != MPI_INFO_NULL){
    MPI_Info_free(&info_);
  }

  comm_->remove_rma_win(this);

  Colls::barrier(comm_);
  Comm::unref(comm_);

  if (rank_ == 0)
    delete bar_;

  if (allocated_ != 0)
    xbt_free(base_);

  cleanup_attr<Win>();
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == 0))
    return MPI_ERR_ARG;
  base_ = 0; // the address will actually be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length){
  if (name_ == nullptr) {
    *length = 0;
    return;
  }
  *length = strlen(name_);
  strncpy(name, name_, *length + 1);
}

void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info(){
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

MPI_Aint Win::size(){
  return size_;
}

int Win::disp_unit(){
  return disp_unit_;
}

void Win::set_info(MPI_Info info){
  if (info_ != MPI_INFO_NULL)
    info->ref();
  info_ = info;
}

void Win::set_name(const char* name){
  name_ = xbt_strdup(name);
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_ = 1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get resized when another process pushes, which would
    // invalidate the array that Request::waitall() is iterating over.
    // Another solution would be to copy the data and clean the vector *before* Request::waitall.
    std::vector<MPI_Request>* reqs = requests_;
    int size                       = static_cast<int>(reqs->size());
    // start all requests that have been prepared by another process
    if (size > 0) {
      MPI_Request* treqs = &(*reqs)[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0;
    mut_->unlock();
  }

  if (assert == MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;
  assert_ = assert;

  bar_->wait();
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
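
/* For reference, the user-level pattern this fence serves (a plain MPI sketch, not
 * SimGrid-specific code): every process of the window's communicator brackets its
 * RMA epoch with matching collective fences, e.g.
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);
 *   MPI_Put(buf, n, MPI_INT, target, disp, n, MPI_INT, win);
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);
 * The barrier / waitall / barrier sequence above provides these collective semantics. */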

int Win::put(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        // TODO cheinrich Check for rank / pid conversion
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                               SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);

    //prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if (request != nullptr){
      *request = sreq;
    } else {
      mut_->lock();
      requests_->push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_->push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
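
/* Note that MPI_Put is emulated with an ordinary message pair: the origin posts a
 * send tagged SMPI_RMA_TAG+1 and the matching receive is pushed into the target
 * window's request queue; both are started right away but only waited for at the
 * next synchronization (fence, complete/wait, unlock or flush). */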

int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    //push request to the sender's win
    send_win->mut_->lock();
    send_win->requests_->push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if (request != nullptr){
      *request = rreq;
    } else {
      mut_->lock();
      requests_->push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }
  //FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count_ from it
  // (SMPI_RMA_TAG is set below all the other tags we use, so this avoids collisions).
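  // E.g. successive accumulates on this window get tags SMPI_RMA_TAG-3, SMPI_RMA_TAG-4, ...
  // (count_ is incremented below), so matching send/recv pairs stay ordered and distinct.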
  //prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_->push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_->push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // need to be sure ops are correctly ordered, so finish the request here? slow.
  MPI_Request req;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
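
/* Both the get and the (optional) accumulate above run while holding the target
 * window's atomic_mut_, so concurrent read-modify-write calls aimed at the same
 * window are serialized: that serialization is what makes them atomic here. */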

int Win::compare_and_swap(const void* origin_addr, void* compare_addr,
                          void* result_addr, MPI_Datatype datatype, int target_rank,
                          MPI_Aint target_disp){
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked != 1)
      return MPI_ERR_WIN;
  }

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
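
  /* A sketch of the general active target pattern that this serves (plain MPI,
   * from the user's point of view):
   *   target:  MPI_Win_post(origin_group, 0, win);  ...  MPI_Win_wait(win);
   *   origin:  MPI_Win_start(target_group, 0, win); MPI_Put(...); MPI_Win_complete(win);
   * In this implementation, start() blocks until every target in 'group' has posted. */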
  //naive, blocking implementation.
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Start");
  while (j != size) {
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; //we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Post");
  while (j != size) {
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
    j++;
  }
  size = i;

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; //we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete(){
  if (opened_ == 0)
    xbt_die("Complete called on an MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  while (j != size) {
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business!
  return MPI_SUCCESS;
}
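
/* start()/post() pair up through zero-byte messages tagged SMPI_RMA_TAG+4, while
 * complete()/wait() pair up through SMPI_RMA_TAG+5: complete() notifies each target
 * that the access epoch is over, and wait() blocks until all those notifications
 * have arrived, then reaps the pending RMA requests. */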

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  while (j != size) {
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business!
  return MPI_SUCCESS;
}

int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
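
/* A note on the mode_ bookkeeping above: mode_ accumulates lock type constants, so
 * after an EXCLUSIVE lock is followed by a SHARED one, mode_ equals
 * MPI_LOCK_EXCLUSIVE + MPI_LOCK_SHARED rather than MPI_LOCK_EXCLUSIVE alone. This
 * lets unlock() (below) see that the mutex was already released when the window
 * switched to shared mode, and avoid unlocking it twice. */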

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++){
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++){
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  mut_->lock();
  //Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    reqqs->clear();
  }
  mut_->unlock();
  return size;
}

int Win::finish_comms(int rank){
  mut_->lock();
  //Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    size = 0;
    std::vector<MPI_Request> myreqqs;
    std::vector<MPI_Request>::iterator iter = reqqs->begin();
    int proc_id                             = comm_->group()->actor(rank)->get_pid();
    while (iter != reqqs->end()){
      // Let's see if we're either the destination or the sender of this request,
      // because we only wait for requests that we are responsible for.
      // Also use the process id here: src() and dst() on the request return process
      // ids, NOT ranks (which only exist in the context of a communicator).
      if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
        myreqqs.push_back(*iter);
        iter = reqqs->erase(iter);
        size++;
      } else {
        ++iter;
      }
    }
    if (size > 0) {
      MPI_Request* treqs = &myreqqs[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
      myreqqs.clear();
    }
  }
  mut_->unlock();
  return size;
}

int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
{
  MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = xbt_malloc(0);
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler(){
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler){
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler->ref();
}

} // namespace smpi
} // namespace simgrid