/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include <simgrid/modelchecker.h>
#include "smpi_win.hpp"

#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "src/mc/mc_replay.hpp"

#include <algorithm> // std::any_of, std::stable_partition
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
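// Sanity check used by the RMA entry points below: verify that the accessed range fits within the remote window,
// and bail out with MPI_ERR_RMA_RANGE (recording the window base as the current buffer for error reports) otherwise.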
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }
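// Check that an epoch is open on `win` before an RMA call: either an active-target epoch (post/start already done,
// i.e. opened_ != 0) or a passive-target one (this rank appears in the window's list of lockers).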
#define CHECK_WIN_LOCKED(win)                                                                                          \
  if (opened_ == 0) { /*check that post/start has been done*/                                                          \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)                                                                                                    \
      return MPI_ERR_WIN;                                                                                              \
  }

namespace simgrid::smpi {
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  if (MC_is_active() || MC_record_replay_is_active()) {
    s4u::Barrier* bar_ptr;
    if (rank_ == 0) {
      bar_ = s4u::Barrier::create(comm->size());
      bar_ptr = bar_.get();
    }
    colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
    if (rank_ != 0)
      bar_ = s4u::BarrierPtr(bar_ptr);
  }
  this->add_f();
}
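
// Destroy a window once every pending asynchronous communication on it has completed. Returns MPI_ERR_WIN when the
// window is still locked or opened, MPI_SUCCESS otherwise.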
int Win::del(Win* win){
  //As per the standard, perform a barrier to ensure every async comm is finished
  if (MC_is_active() || MC_record_replay_is_active())
    win->bar_->wait();
  else
    colls::barrier(win->comm_);
  win->flush_local_all();

  if (win->info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(win->info_);
  if (win->errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(win->errhandler_);

  win->comm_->remove_rma_win(win);

  colls::barrier(win->comm_);
  Comm::unref(win->comm_);
  if (not win->lockers_.empty() || win->opened_ < 0) {
    XBT_WARN("Freeing a locked or opened window");
    return MPI_ERR_WIN;
  }
  if (win->allocated_)
    xbt_free(win->base_);

  F2C::free_f(win->f2c_id());
  win->cleanup_attr<Win>();

  delete win;
  return MPI_SUCCESS;
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

bool Win::dynamic() const
{
  return dynamic_;
}

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}
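
// Collective synchronization: a fence closes the previous access/exposure epoch (unless MPI_MODE_NOPRECEDE is given)
// and opens a new one (unless MPI_MODE_NOSUCCEED is given). Under the model checker, a shared s4u::Barrier stands in
// for the barrier collective, presumably to keep record/replay deterministic.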
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    if (MC_is_active() || MC_record_replay_is_active())
      bar_->wait();
    else
      colls::barrier(comm_);
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;
  assert_ = assert;
  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
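
// One-sided put: the origin-to-target copy is simulated with a pair of RMA send/receive requests (tag
// SMPI_RMA_TAG + 1); the receive request is pushed to the target window so its owner can complete it on
// flush/unlock. A put to self degenerates to a plain Datatype::copy.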
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
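
// One-sided get: symmetric to put(), with the send request pushed to the *remote* window (tag SMPI_RMA_TAG + 2),
// since the target is the one that owns the data being read.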
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
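
// One-sided accumulate: same request pattern as put(), except that the requests carry the reduction operation `op`
// and that a decreasing tag (SMPI_RMA_TAG - 3 - count_) keeps successive accumulates distinct and ordered.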
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");

  return MPI_SUCCESS;
}
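
// Fetch-and-op: performed as a get followed by an accumulate, both completed eagerly while holding the target's
// atomic_mut_, so that the read-modify-write appears atomic to concurrent callers. Simple but slow, as noted below.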
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();

  return MPI_SUCCESS;
}
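
// Compare-and-swap: like get_accumulate(), a get then a conditional put under atomic_mut_, with the comparison done
// locally on the fetched value.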
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();

  return MPI_SUCCESS;
}
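
// Active-target synchronization (start/post/complete/wait): implemented with zero-byte messages, tag
// SMPI_RMA_TAG + 4 for the post->start handshake and SMPI_RMA_TAG + 5 for the complete->wait one.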
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From MPI forum advices
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
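
// Passive-target synchronization. mode_ accumulates the lock types currently granted on the target window; the
// simulated lock_mut_ is only held for exclusive accesses, so concurrent shared locks can proceed.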
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  flush(rank);
  return MPI_SUCCESS;
}

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_= 0;
  target_win->lockers_.remove(rank_);
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  flush(rank);
  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  if (rank != rank_) {
    finished = connected_wins_[rank]->finish_comms(rank_);
    XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_) {
      finished = connected_wins_[i]->finish_comms(rank_);
      XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
    }
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
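
// The two finish_comms() variants below complete the requests attached to this window: all of them, or only those
// that involve a given peer process. Both return the number of completed requests.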
int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without this, the vector could get redimensioned when another process pushes.
  // This would result in the array used by Request::waitall() to be invalidated.
  // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}

int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}

int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace simgrid::smpi