/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    , disp_unit_(disp_unit)
    , connected_wins_(comm->size())
    , allocated_(allocated)
{
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if (rank_ == 0)
    bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);
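
  // Share the address of every process' Win object with all peers, so that RMA calls can
  // directly reach the target's window, and broadcast the barrier created by rank 0 so that
  // every process synchronizes on the same object.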
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);

Win::~Win()
{
  // As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  F2C::free_f(this->c2f());

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // the actual addresses will be given in the RMA calls, as displacements

int Win::detach(const void* /*base*/)

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL) {
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();

int Win::rank() const

MPI_Aint Win::size() const

void* Win::base() const

int Win::disp_unit() const

int Win::dynamic() const

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
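
// A fence opens and/or closes an RMA access epoch on this window. With MPI_MODE_NOPRECEDE
// there is nothing to finalize; otherwise every request that peers have pushed onto this
// window since the previous fence is completed before the epoch is closed or reopened.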
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without this, the vector could get resized while another process pushes to it,
    // which would invalidate the array used by Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* calling Request::waitall.
    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

  if (assert == MPI_MODE_NOSUCCEED) // no operation should follow this fence, so mark the window as closed
    opened_ = 0;

  XBT_DEBUG("Leaving fence");
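
// A put that targets a remote rank is simulated with a pair of prepared point-to-point requests:
// the origin keeps the send request in its own list, while the matching receive is pushed, under
// the target's mutex, onto the target window's request list. A put to ourselves degenerates to a
// plain Datatype::copy.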
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get the receiver's window pointer
  Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start was called; check whether the caller holds a lock on the target instead
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > recv_win->size_) {
    XBT_WARN("Trying to put more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  }

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                              SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);

    // prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    if (request != nullptr) {
      *request = sreq;
    } else {
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    // push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }
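
// Get mirrors put with the roles reversed: the target window owns the send request that reads
// from its memory, while the origin keeps the receive request that fills origin_addr. A get from
// ourselves is again a plain Datatype::copy.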
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get the sender's window pointer
  Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start was called; check whether the caller holds a lock on the target instead
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > send_win->size_) {
    XBT_WARN("Trying to get more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  }

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != comm_->rank()) {
    // prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    // prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with another process than us as sender
    sreq->start();

    // push request to the sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    if (request != nullptr) {
      *request = rreq;
    } else {
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get the receiver's window pointer
  Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start was called; check whether the caller holds a lock on the target instead
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  // FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_) {
    XBT_WARN("Trying to accumulate more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  }

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count_ from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use).
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
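
// Get_accumulate is implemented as a get followed by an accumulate, both performed while holding
// the target window's atomic_mut_ so that the read-modify-write pair stays atomic with respect to
// the other atomic RMA calls on that window.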
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start was called; check whether the caller holds a lock on the target instead
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > send_win->size_) {
    XBT_WARN("Trying to get_accumulate more than the window size - Bailing out.");
    return MPI_ERR_RMA_RANGE;
  }

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // need to be sure the operations are correctly ordered, so finish the requests here? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);

  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
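
// Compare-and-swap follows the same pattern: under the target's atomic_mut_, fetch the current
// value into result_addr, compare it locally with compare_addr, and only put origin_addr to the
// target if the two match.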
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start was called; check whether the caller holds a lock on the target instead
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI Forum's advice to implementors:
     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
     must complete, without further dependencies. */
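
  /* For reference, the general active target pattern that start/post/complete/wait implement looks
     like this on the user side (a sketch, not code from this file):

         MPI_Win_post(group, 0, win);    // target: open an exposure epoch for the origins in group
         MPI_Win_start(group, 0, win);   // origin: open an access epoch towards the targets in group
         MPI_Put(..., win);              // origin: issue RMA operations
         MPI_Win_complete(win);          // origin: close the access epoch
         MPI_Win_wait(win);              // target: wait until every origin has completed
  */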

  // naive, blocking implementation.
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Start");
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Start");

int Win::post(MPI_Group group, int /*assert*/)
{
  // let's make a synchronous send here
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Post");
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Post");

int Win::complete()
{
  if (opened_ == 0)
    xbt_die("Complete called on an MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!

int Win::wait()
{
  // naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
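
// Passive-target synchronization. An exclusive lock takes the target's lock_mut_ and keeps it
// until the matching unlock; a shared lock only waits on that mutex when the window is currently
// held exclusively, and releases it right after switching the mode, so that several shared lockers
// can proceed concurrently. The caller's rank is recorded in the target's lockers_ list either way.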
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive; it is now shared
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
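
// The flush family does not transfer anything by itself: each variant simply waits, through
// finish_comms(), for the pending requests held locally and/or on the target window(s).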
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
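
// Wait for every request still attached to this window and return how many were completed.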
int Win::finish_comms(){
  // Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  return size;
}

int Win::finish_comms(int rank){
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request,
  // because we only wait for requests that we are responsible for.
  // Also use the process id here, since src() and dst() on the request return the process id,
  // NOT the rank (which only exists in the context of a communicator).
  int proc_id = comm_->group()->actor(rank)->get_pid();
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
  }
  return size;
}
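
// MPI_Win_shared_query: when rank is MPI_PROC_NULL, fall back to the first window in the
// communicator that has a non-zero size (i.e. the lowest rank that actually exposes memory),
// which is what the standard prescribes for that case.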
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace simgrid