/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

namespace simgrid {
namespace smpi {

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
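
/* A window is created collectively: each process builds its local Win object, the peers' window pointers are then
 * exchanged with an allgather into connected_wins_, and the barrier created by rank 0 is broadcast so that every
 * process synchronizes on the same object. */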
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  int comm_size = comm->size();
  group_      = MPI_GROUP_NULL;
  requests_   = new std::vector<MPI_Request>();
  mut_        = s4u::Mutex::create();
  lock_mut_   = s4u::Mutex::create();
  atomic_mut_ = s4u::Mutex::create();
  connected_wins_        = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;
  if (rank_ == 0)
    bar_ = new s4u::Barrier(comm_size);
  errhandler_ = MPI_ERRORS_ARE_FATAL;
  comm->add_rma_win(this);

  colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
                   comm);

  colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
}

Win::~Win()
{
  // As per the standard, perform a barrier to ensure every asynchronous communication is finished
  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete[] connected_wins_;
  if (name_ != nullptr) {
    xbt_free(name_);
  }
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
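
/* attach/detach only matter for dynamic windows: SMPI does not track the attached regions, it simply resets the base
 * and lets the RMA calls address the target memory through displacements. */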
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // the target addresses will be given directly in the RMA calls, as displacements
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)

void Win::get_name(char* name, int* length) const
{
  *length = strlen(name_);
  strncpy(name, name_, *length + 1);
}

void Win::get_group(MPI_Group* group)
{
  if (comm_ != MPI_COMM_NULL)
    *group = comm_->group();
  else
    *group = MPI_GROUP_NULL;
}

MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

int Win::rank() const { return rank_; }
MPI_Aint Win::size() const { return size_; }
void* Win::base() const { return base_; }
int Win::disp_unit() const { return disp_unit_; }
int Win::dynamic() const { return dynamic_; }

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name) { name_ = xbt_strdup(name); }
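
/* Fence is the simplest active-target synchronization: unless MPI_MODE_NOPRECEDE promises that nothing preceded it,
 * every process completes the RMA requests queued on its window, and all processes meet on the window barrier before
 * the next epoch starts. */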
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes into the vector of requests during the waitall.
    // Without it, the vector could get resized while another process pushes, which would invalidate the array used
    // by Request::waitall(). Another solution would be to copy the data and clean up the vector *before* the waitall.
    std::vector<MPI_Request>* reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start all requests that have been prepared by another process
    if (size > 0) {
      MPI_Request* treqs = &(*reqs)[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
      reqs->clear();
    }
    mut_->unlock();
  }

  if (assert == MPI_MODE_NOSUCCEED) // there should be no operation after this one: tell we are closed
    opened_ = 0;

  bar_->wait();
  XBT_DEBUG("Leaving fence");
  return MPI_SUCCESS;
}
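
/* A put to a remote rank is turned into a pair of prepared requests: the send is queued on the origin window and the
 * matching receive is pushed into the target window's request vector, under the target's mutex. A put to ourselves
 * is a plain local datatype copy. */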
int Win::put(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get the pointer to the receiver's window
  const Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    bool locked = false;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = true;
    if (not locked)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare the send request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                               SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);

    // prepare the receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
    // start the send
    sreq->start();

    if (request != nullptr) {
      *request = sreq;
    } else {
      mut_->lock();
      requests_->push_back(sreq);
      mut_->unlock();
    }

    // push the receive request to the receiver's window
    recv_win->mut_->lock();
    recv_win->requests_->push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
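
/* get mirrors put: the send request is prepared and queued on the target window (which owns the data) while the
 * receive is posted on the origin; a get from ourselves degenerates into a local copy. */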
int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get the pointer to the sender's window
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    bool locked = false;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = true;
    if (not locked)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != comm_->rank()) {
    // prepare the send request, posted on the target's side
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
    // prepare the receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with a process other than us as the sender
    sreq->start();
    // push the send request to the sender's window
    send_win->mut_->lock();
    send_win->requests_->push_back(sreq);
    send_win->mut_->unlock();

    // start the receive on our side
    rreq->start();

    if (request != nullptr) {
      *request = rreq;
    } else {
      mut_->lock();
      requests_->push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
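
/* accumulate reuses the put machinery but attaches the reduction operation to the requests; the message tag encodes
 * the number of operations issued so far (count_), so that successive accumulates on the same target are matched and
 * applied in order. */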
int Win::accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op,
                    MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get the pointer to the receiver's window
  const Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    bool locked = false;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = true;
    if (not locked)
      return MPI_ERR_WIN;
  }
  // FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used to order the operations, so make it depend on the operation count. SMPI_RMA_TAG is set below all
  // the other tags we use, which avoids collisions.
  // prepare the send request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare the receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
  count_++;

  // start the send
  sreq->start();
  // push the receive request to the receiver's window
  recv_win->mut_->lock();
  recv_win->requests_->push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_->push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
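
/* get_accumulate is implemented as a get of the current target value followed by an accumulate, both completed while
 * holding the target window's atomic mutex so that concurrent atomic operations do not interleave. */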
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  // get the pointer to the sender's window
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    bool locked = false;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = true;
    if (not locked)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // to be sure the operations are correctly ordered, finish the requests here: this is slow
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);

  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
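
/* compare_and_swap fetches the target value, compares it locally with compare_addr, and only writes origin_addr back
 * when the two match; the whole read-compare-write sequence holds the target window's atomic mutex. */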
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  // get the pointer to the sender's window
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start: check whether we hold a lock on the target instead
    bool locked = false;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = true;
    if (not locked)
      return MPI_ERR_WIN;
  }

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank, target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank, target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
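
/* Illustrative MPI-level pairing for the general active-target synchronization implemented below:
 *   origin: MPI_Win_start(group, 0, win); MPI_Put(...); MPI_Win_complete(win);
 *   target: MPI_Win_post(group, 0, win);                MPI_Win_wait(win);
 * In this implementation, start() blocks until every process of the group has issued the matching post(). */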
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI Forum advice:
   * The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target
   * window will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to
   * MPI_WIN_POST by the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can
   * block until the matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations
   * where the call to MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to
   * MPI_WIN_POST occurred; or implementations where the first two calls are nonblocking, but the call to
   * MPI_WIN_COMPLETE blocks until the call to MPI_WIN_POST occurred; or even implementations where all three calls
   * can complete before any target process called MPI_WIN_POST --- the data put must be buffered, in this last case,
   * so as to allow the put to complete at the origin ahead of its completion at the target. However, once the call
   * to MPI_WIN_POST is issued, the sequence above must complete, without further dependencies. */

  // Naive, blocking implementation.
  int i = 0;
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Start");
  for (int j = 0; j < size; j++) {
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; // we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
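
/* post opens the exposure epoch on the target side: it notifies every process of the origin group with an empty
 * message (tag SMPI_RMA_TAG + 4), which is exactly what the irecv posted by start() is waiting for. */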
int Win::post(MPI_Group group, int /*assert*/)
{
  // let's make a synchronous send here
  int i = 0;
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);
  XBT_DEBUG("Entering MPI_Win_Post");
  for (int j = 0; j < size; j++) {
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
      i++;
    }
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);
  opened_++; // we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
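
/* complete closes the access epoch opened by start(): every target is notified with an empty message
 * (tag SMPI_RMA_TAG + 5), and all RMA requests issued on this window are then completed. */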
int Win::complete()
{
  if (opened_ == 0)
    xbt_die("Complete called on an MPI_Win with no pending start");
  XBT_DEBUG("Entering MPI_Win_Complete");
  int i = 0;
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);
  for (int j = 0; j < size; j++) {
    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  return MPI_SUCCESS;
}
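
/* wait closes the exposure epoch opened by post(): it receives the notifications sent by complete()
 * (tag SMPI_RMA_TAG + 5) and then finishes the RMA requests that targeted this window. */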
int Win::wait()
{
  // Naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i = 0;
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);
  for (int j = 0; j < size; j++) {
    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
      i++;
    }
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  return MPI_SUCCESS;
}
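
/* Passive-target synchronization. Exclusive locks are emulated with the target window's lock mutex; shared locks only
 * block while an exclusive lock is held. The calling rank also registers itself in the target's lockers_ list so that
 * its later RMA calls are accepted outside of an access epoch. */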
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) ||
      target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->lock();
    // add the lock_type to differentiate the case where we switch from EXCLUSIVE to SHARED (no release needed in the
    // unlock)
    target_win->mode_ += lock_type;
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive, it is now shared
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE)) {
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
  }
  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::lock_all(int assert)
{
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank)
{
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::unlock_all()
{
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
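
/* The flush family completes outstanding RMA operations without releasing any lock: flush() finishes the requests
 * pending locally and on the target, flush_local() only the local ones, and the *_all variants repeat this for every
 * rank of the window. */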
int Win::flush(int rank)
{
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_local(int rank)
{
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all()
{
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all()
{
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id)
{
  return static_cast<Win*>(F2C::f2c(id));
}
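
/* finish_comms waits for the RMA requests attached to this window. The overload taking a rank only waits for the
 * requests whose source or destination is that peer, so that flushing or unlocking one target does not synchronize
 * with the others. */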
int Win::finish_comms()
{
  // Finish our own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    reqqs->clear();
  }
  return size;
}

int Win::finish_comms(int rank)
{
  // Finish our own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    size = 0;
    std::vector<MPI_Request> myreqqs;
    auto iter = reqqs->begin();
    int proc_id = comm_->group()->actor(rank)->get_pid();
    while (iter != reqqs->end()) {
      // Let's see if we're either the destination or the sender of this request, because we only wait for requests
      // that we are responsible for. Also use the process id here, since src() and dst() return the process id of
      // the request, NOT the rank (which only exists in the context of a communicator).
      if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
        myreqqs.push_back(*iter);
        iter = reqqs->erase(iter);
        size++;
      } else {
        ++iter;
      }
    }
    if (size > 0) {
      MPI_Request* treqs = &myreqqs[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
      myreqqs.clear();
    }
  }
  return size;
}
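
/* shared_query returns the size, displacement unit and base address of the target process' window; with MPI_PROC_NULL
 * as the target it falls back to the first window of the communicator that has a non-zero size. */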
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid