/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\

#define CHECK_WIN_LOCKED(win)                                                                                          \
  if (opened_ == 0) { /*check that post/start has been done*/                                                          \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    , disp_unit_(disp_unit)
    , connected_wins_(comm->size())
    , allocated_(allocated)
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
  connected_wins_[rank_] = this;
  bar_ = new s4u::Barrier(comm->size());
  comm->add_rma_win(this);
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
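  /* After these two collectives, every rank holds a pointer to each peer's Win in
   * connected_wins_, which is what lets the one-sided calls below reach into the remote
   * window's metadata (base_, disp_unit_, requests_) directly, and all ranks end up
   * sharing the barrier broadcast from rank 0. */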
// As per the standard, perform a barrier to ensure every asynchronous communication is finished
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  comm_->remove_rma_win(this);
  colls::barrier(comm_);
  F2C::free_f(this->f2c_id());
int Win::attach(void* /*base*/, MPI_Aint size)
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
  base_ = nullptr; // the actual address will be given in the RMA calls, as the displacement

int Win::detach(const void* /*base*/)
void Win::get_name(char* name, int* length) const
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
    *group = MPI_GROUP_NULL;

int Win::rank() const

MPI_Comm Win::comm() const

MPI_Aint Win::size() const

void* Win::base() const

int Win::disp_unit() const

bool Win::dynamic() const

void Win::set_info(MPI_Info info)
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (info_ != MPI_INFO_NULL)

void Win::set_name(const char* name){
int Win::fence(int assert)
  XBT_DEBUG("Entering fence");
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
  if (assert & MPI_MODE_NOSUCCEED) // no operations should come after this one, so mark the window as closed
  XBT_DEBUG("Leaving fence");
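
/* For orientation, a minimal user-side sketch of the fence-synchronized (active target)
 * pattern that fence() and the put()/get() below serve; the buffer, count and datatype
 * here are illustrative only and not part of this file:
 *
 *   MPI_Win_fence(0, win);                                          // open the epoch
 *   MPI_Put(buf, 1, MPI_INT, target_rank, disp, 1, MPI_INT, win);   // one-sided write
 *   MPI_Win_fence(0, win);                                          // close it; all RMA is complete
 */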
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    if(request!=nullptr){
      requests_.push_back(sreq);

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    recv_win->mut_->unlock();

    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
      *request = MPI_REQUEST_NULL;
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with a process other than us as the sender
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    if(request!=nullptr){
      requests_.push_back(rreq);

    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
      *request=MPI_REQUEST_NULL;
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used to order the operations, so subtract the running operation count from it
  // (SMPI_RMA_TAG is set below all the other tags we use, which avoids collisions with them).
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  recv_win->mut_->unlock();

  if (request != nullptr) {
    requests_.push_back(sreq);

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // we need to be sure the operations are correctly ordered, so finish the request here? This is slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);

  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
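  /* Note: get_accumulate is emulated here as a get followed by an accumulate, each waited
   * for while the target's atomic_mut_ is held, so concurrent atomic RMA calls on the same
   * window are serialized against each other, which is what provides their atomicity in
   * the simulation. */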
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  send_win->atomic_mut_->unlock();
int Win::start(MPI_Group group, int /*assert*/)
  /* From the MPI Forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
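  /* A minimal user-side sketch of the post/start/complete/wait sequence discussed above
   * (names and counts are illustrative only, not taken from this file):
   *
   *   // target process                        // origin process
   *   MPI_Win_post(origin_grp, 0, win);        MPI_Win_start(target_grp, 0, win);
   *                                            MPI_Put(buf, n, MPI_INT, target, 0, n, MPI_INT, win);
   *   MPI_Win_wait(win);                       MPI_Win_complete(win);
   */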
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));

  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
int Win::post(MPI_Group group, int /*assert*/)
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));

  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  xbt_assert(opened_ != 0, "Complete called on an MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));

  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));

  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
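
/* For the passive-target path implemented by lock()/unlock() below, the user-side calls
 * look roughly like this (illustrative sketch; buffer and displacement names are not from
 * this file):
 *
 *   MPI_Win_lock(MPI_LOCK_SHARED, target_rank, 0, win);
 *   MPI_Get(buf, 1, MPI_DOUBLE, target_rank, disp, 1, MPI_DOUBLE, win);
 *   MPI_Win_unlock(target_rank, win);   // completes the epoch's RMA calls
 */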
int Win::lock(int lock_type, int rank, int /*assert*/)
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to tell apart the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_= 0;
  target_win->lockers_.remove(rank_);
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);

  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without it, the vector could get resized while another process pushes,
  // which would invalidate the array that Request::waitall() is using.
  // Another solution would be to copy the data and clean up the vector *before* Request::waitall.

  //Finish own requests
  int size = static_cast<int>(requests_.size());

  MPI_Request* treqs = requests_.data();
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
int Win::finish_comms(int rank){
  // See the comment about the mutex in finish_comms() above.

  // Finish own requests
  // Let's see if we're either the destination or the sender of this request,
  // because we only wait for requests that we are responsible for.
  // Also use the process id here, since src() and dst() on the request return
  // the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());

  MPI_Request* treqs = myreqqs.data();
  Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];

    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;

    *static_cast<void**>(baseptr) = nullptr;
MPI_Errhandler Win::errhandler()
  if (errhandler_ != MPI_ERRHANDLER_NULL)

void Win::set_errhandler(MPI_Errhandler errhandler)
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)

} // namespace simgrid