/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }

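// Illustrative sketch of what the check above catches (user-level MPI; buf/big are
// placeholders): with a 16-byte window, an RMA call moving 32 bytes to the target fails
// early with MPI_ERR_RMA_RANGE instead of corrupting simulated memory.
//
//   char buf[16];
//   MPI_Win win;
//   MPI_Win_create(buf, 16, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &win);
//   MPI_Win_fence(0, win);
//   char big[32];
//   MPI_Put(big, 32, MPI_CHAR, 1, 0, 32, MPI_CHAR, win); // 32 bytes > 16-byte target window
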
#define CHECK_WIN_LOCKED(win)\
  if (opened_ == 0) { /*check that post/start has been done*/\
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; });\
    if (not locked)\
      return MPI_ERR_WIN;\
  }

namespace simgrid::smpi {

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if (rank_ == 0) // rank 0 allocates the barrier shared by the whole window
    bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);
  // every process publishes its Win* so that peers can queue requests on it directly
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
}

Win::~Win()
{
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  F2C::free_f(this->f2c_id());
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

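// Sketch of the matching user-level pattern for dynamic windows (standard MPI-3 calls;
// mybuf/len/peer/dst are placeholders): the target publishes an absolute address and the
// origin passes it as target_disp, which is why base_ can stay null here.
//
//   MPI_Win win;
//   MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);
//   MPI_Win_attach(win, mybuf, len);
//   MPI_Aint addr;
//   MPI_Get_address(mybuf, &addr); // send addr to the peers by regular point-to-point
//   ...
//   MPI_Get(dst, len, MPI_BYTE, peer, addr /* disp = address */, len, MPI_BYTE, win);
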
int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

bool Win::dynamic() const { return dynamic_; }

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without this, the vector could get redimensioned when another process pushes.
    // This would result in the array used by Request::waitall() to be invalidated.
    // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
      requests_.clear();
    }
    count_ = 0;
    mut_->unlock();
  }
  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;
  assert_ = assert;

  bar_->wait();
  XBT_DEBUG("Leaving fence");
  return MPI_SUCCESS;
}

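// User-level view of the fence logic above (minimal sketch; src/n/peer/win are
// placeholders): each fence closes the previous epoch (waitall on the queued requests) and
// opens the next one, with the assert flags trimming the first and last synchronizations.
//
//   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);             // first epoch: nothing to finalize
//   MPI_Put(src, n, MPI_INT, peer, 0, n, MPI_INT, win); // queued, not yet completed
//   MPI_Win_fence(0, win);                              // completes the put on both sides
//   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);             // last epoch: window marked closed
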
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);
    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
    //start send
    sreq->start();
    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

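// Design note: a put is simulated as a plain send/receive pair. The origin starts the send
// and keeps (or hands back) the send request, while the matching receive request is pushed,
// under the target's mutex, into the *target's* request vector; it is completed by whatever
// synchronization the target issues next (fence, complete/wait, unlock or flush).
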
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
    //start the send, with another process than us as sender.
    sreq->start();
    //push request to receiver's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();
    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

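// Design note: get mirrors put with the roles swapped. The send request reads from the
// target's window and is queued on the target's side, while the origin keeps the receive
// request in its own vector, so the origin's next synchronization completes the transfer
// locally.
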
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count_ from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);
  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

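// Ordering sketch: count_ grows with each accumulate issued by this process, so successive
// operations get distinct, strictly decreasing tags and cannot be matched out of order:
//
//   1st accumulate: tag = SMPI_RMA_TAG - 3 - 0
//   2nd accumulate: tag = SMPI_RMA_TAG - 3 - 1
//   ...
//
// Keeping SMPI_RMA_TAG below every other tag SMPI uses avoids collisions with regular traffic.
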
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

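// User-level sketch of an atomic fetch-and-add built on this call (standard MPI-3 API;
// 'target' and 'win' are placeholders). The atomic_mut_ held above is what makes the
// get + accumulate pair atomic, at the cost of finishing both requests inline:
//
//   int one = 1, old;
//   MPI_Win_lock(MPI_LOCK_SHARED, target, 0, win);
//   MPI_Get_accumulate(&one, 1, MPI_INT, &old, 1, MPI_INT,
//                      target, 0, 1, MPI_INT, MPI_SUM, win);
//   MPI_Win_unlock(target, win); // 'old' holds the value before the increment
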
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

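// User-level sketch of a spinlock built on compare-and-swap (standard MPI-3 API; 'target'
// and 'win' are placeholders): swap 0 -> 1 at the target, retrying while another rank
// already holds the lock.
//
//   int locked = 1, unlocked = 0, prev;
//   do {
//     MPI_Win_lock(MPI_LOCK_SHARED, target, 0, win);
//     MPI_Compare_and_swap(&locked, &unlocked, &prev, MPI_INT, target, 0, win);
//     MPI_Win_unlock(target, win);
//   } while (prev != unlocked);
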
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

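// start() above, together with post()/complete()/wait() below, implements MPI's generalized
// active target synchronization. From the user's side the calls pair up as follows (minimal
// sketch; origin on the left, target on the right):
//
//   MPI_Win_start(target_grp, 0, win);   |   MPI_Win_post(origin_grp, 0, win);
//   MPI_Put(buf, n, MPI_INT,             |   ... window exposed; local work allowed
//           target, 0, n, MPI_INT, win); |       as long as it leaves the window alone
//   MPI_Win_complete(win);               |   MPI_Win_wait(win);
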
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete()
{
  xbt_assert(opened_ != 0, "Complete called on a MPI_Win that wasn't opened with MPI_Win_start");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait()
{
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) ||
      target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->lock();
    // add the lock_type to differentiate the case where we are switching from EXCLUSIVE to SHARED
    // (no release needed in the unlock then)
    target_win->mode_ += lock_type;
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

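// User-level view of the lock/unlock pair above (minimal sketch; target/val/win are
// placeholders): only the origin makes MPI calls, and unlock() drains the pending requests
// on both sides via finish_comms().
//
//   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);
//   MPI_Get(&val, 1, MPI_INT, target, 0, 1, MPI_INT, win);
//   val++;
//   MPI_Put(&val, 1, MPI_INT, target, 0, 1, MPI_INT, win);
//   MPI_Win_unlock(target, win); // transfers are done once unlock returns
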
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}

int Win::finish_comms(int rank){
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}

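// Example of the partitioning in finish_comms(rank) above: if requests_ holds
// [r0 (dst=proc 5), r1 (dst=proc 7), r2 (src=proc 5)] and 'rank' maps to process id 5,
// stable_partition keeps r1 in requests_ and moves r0 and r2, in their original order,
// into myreqqs, which is then waited on and discarded.
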
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace simgrid::smpi