/* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include <simgrid/modelchecker.h>
#include "smpi_win.hpp"

#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "src/mc/mc_replay.hpp"

#include <algorithm> // std::any_of, std::stable_partition
#include <mutex>     // std::scoped_lock

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }

#define CHECK_WIN_LOCKED(win)\
  if (opened_ == 0) { /* check that post/start has been done */\
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; });\
    if (not locked)\
      return MPI_ERR_WIN;\
  }

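// Both helpers above expand inside Win member functions: they rely on target_count, target_datatype and
// target_rank being in scope at the expansion site, and on opened_/rank_/lockers_ belonging to the calling
// window. A failed check aborts the RMA call with the corresponding MPI error code.
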
namespace simgrid::smpi {
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();

  // Exchange the local window handles so that each rank can directly address every peer's window
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  if (MC_is_active() || MC_record_replay_is_active()){
    s4u::Barrier* bar_ptr;
    if (rank_ == 0) {
      bar_ = s4u::Barrier::create(comm->size());
      bar_ptr = bar_.get();
    }
    colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
    if (rank_ != 0)
      bar_ = s4u::BarrierPtr(bar_ptr);
  }
  this->add_f();
}

int Win::del(Win* win){
  // As per the standard, perform a barrier to ensure every async comm is finished
  if (MC_is_active() || MC_record_replay_is_active())
    win->bar_->wait();
  else
    colls::barrier(win->comm_);
  win->flush_local_all();

  if (win->info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(win->info_);
  if (win->errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(win->errhandler_);

  win->comm_->remove_rma_win(win);

  colls::barrier(win->comm_);
  Comm::unref(win->comm_);
  if (not win->lockers_.empty() || win->opened_ < 0) {
    XBT_WARN("Freeing a locked or opened window");
    return MPI_ERR_WIN;
  }
  if (win->allocated_)
    xbt_free(win->base_);

  F2C::free_f(win->f2c_id());
  win->cleanup_attr<Win>();

  delete win;
  return MPI_SUCCESS;
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info()
{
  return info_;
}

int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

bool Win::dynamic() const
{
  return dynamic_;
}

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    if (MC_is_active() || MC_record_replay_is_active())
      bar_->wait();
    else
      colls::barrier(comm_);
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // no RMA calls may follow this fence: mark the window as closed
    opened_ = 0;
  assert_ = assert;
  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}

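/* Typical fence-bounded epoch at user level (sketch; 'buf', 'src', 'n' and 'peer' are placeholders):
 *   MPI_Win_create(buf, size, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &win);
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);              // opens the access epoch
 *   MPI_Put(src, n, MPI_INT, peer, 0, n, MPI_INT, win);  // forwarded to Win::put below
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);              // closes it: barrier + flush_local_all()
 *   MPI_Win_free(&win);
 */
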
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    // prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    // start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      const std::scoped_lock lock(*mut_);
      requests_.push_back(sreq);
    }

    // push request to receiver's win
    const std::scoped_lock recv_lock(*recv_win->mut_);
    recv_win->requests_.push_back(rreq);
    rreq->start();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    // prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    // prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    if (const std::scoped_lock send_lock(*send_win->mut_); true) {
      send_win->requests_.push_back(sreq);
    }

    // start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      const std::scoped_lock lock(*mut_);
      requests_.push_back(rreq);
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count_ from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request

  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  if (const std::scoped_lock recv_lock(*recv_win->mut_); true) {
    recv_win->requests_.push_back(rreq);
    rreq->start();
  }

  if (request != nullptr) {
    *request = sreq;
  } else {
    const std::scoped_lock lock(*mut_);
    requests_.push_back(sreq);
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

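// Worked example of the tag scheme above (derived from the code, not an additional SMPI convention): the first
// accumulate issued on a window is matched on tag SMPI_RMA_TAG - 3, the second on SMPI_RMA_TAG - 4, and so on,
// since count_ grows by one per operation. The distinct tags keep each send paired with its own receive; the
// flush() workaround above is what actually serializes the operations.
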
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  // get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // need to be sure ops are correctly ordered, so finish request here? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  const std::scoped_lock lock(*send_win->atomic_mut_);
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  return MPI_SUCCESS;
}

int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  // get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  const std::scoped_lock lock(*send_win->atomic_mut_);
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  return MPI_SUCCESS;
}

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
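
  /* User-level pattern implemented by start/post/complete/wait (sketch; 'grp' names the remote group on each side):
   *   Origin:                          Target:
   *     MPI_Win_start(grp, 0, win);      MPI_Win_post(grp, 0, win);
   *     MPI_Put(..., win);               ...local work...
   *     MPI_Win_complete(win);           MPI_Win_wait(win);
   * Below, start() blocks until it has received one post() message (tag SMPI_RMA_TAG + 4) per member of the
   * group, and complete()/wait() later pair up on tag SMPI_RMA_TAG + 5. */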

  // naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_--; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

int Win::post(MPI_Group group, int /*assert*/)
{
  // let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_--; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on a MPI_Win that was not started");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; // we're closed for business!
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait(){
  // naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; // we're closed for business!
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we switch from EXCLUSIVE to
                                    // SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  flush(rank);
  return MPI_SUCCESS;
}

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  flush(rank);
  return MPI_SUCCESS;
}

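/* Passive-target epoch at user level (sketch; 'peer' is whichever rank owns the data):
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, peer, 0, win);
 *   MPI_Put(buf, n, MPI_INT, peer, 0, n, MPI_INT, win);
 *   MPI_Win_unlock(peer, win);  // releases the lock, then flush(peer) completes pending RMA on both sides
 */
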
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  if (rank != rank_) {
    finished = connected_wins_[rank]->finish_comms(rank_);
    XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_) {
      finished = connected_wins_[i]->finish_comms(rank_);
      XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
    }
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without it, the vector could get reallocated when another process pushes,
  // which would invalidate the array used by Request::waitall().
  // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
  const std::scoped_lock lock(*mut_);
  // Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  return size;
}

int Win::finish_comms(int rank){
  // See the comment about the mutex in finish_comms() above
  const std::scoped_lock lock(*mut_);
  // Finish own requests
  // Let's see whether we're either the destination or the sender of each request,
  // because we only wait for requests that we are responsible for.
  // Also use the process id here, since the request's src() and dst() return
  // the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  return size;
}

int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  // If a specific rank is asked for, query its window; otherwise fall back to the first window
  // with a non-empty allocation.
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win != nullptr) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace simgrid::smpi