1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
// NOTE(review): the embedded line numbers in this listing are non-contiguous —
// parts of the original file are elided throughout; documentation below only
// describes what is visible here.
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Attribute keyvals shared by all Win instances (MPI_Win_create_keyval support).
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
// Generator for keyval ids — presumably incremented on each keyval creation (creation code not shown here).
23 int Win::keyval_id_=0;
// Construct an MPI window over `base`/`size` on communicator `comm`.
// Sets up per-window bookkeeping (request vector, mutexes, per-peer window
// table, barrier), then exchanges window pointers across all ranks with an
// allgather so each rank can address every peer's window directly.
// Body is partially elided in this listing (rank_ init, info ref-count, etc.).
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26 int comm_size = comm->size();
28 XBT_DEBUG("Creating window");
29 if(info!=MPI_INFO_NULL)
33 group_ = MPI_GROUP_NULL;
34 requests_ = new std::vector<MPI_Request>();
// mut_ serializes access to requests_; lock_mut_ backs MPI_Win_lock;
// atomic_mut_ serializes atomic RMA ops (get_accumulate / compare_and_swap).
35 mut_ = s4u::Mutex::create();
36 lock_mut_ = s4u::Mutex::create();
37 atomic_mut_ = s4u::Mutex::create();
// One slot per rank; each rank publishes its own Win* then allgathers below.
38 connected_wins_ = new MPI_Win[comm_size];
39 connected_wins_[rank_] = this;
42 bar_ = new s4u::Barrier(comm_size);
45 errhandler_=MPI_ERRORS_ARE_FATAL;
46 comm->add_rma_win(this);
// Share every rank's window pointer so RMA calls can reach the target window.
49 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// All ranks use rank 0's barrier object for window-wide synchronization.
52 Colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
// Destructor body (fragment — the ~Win() signature line is elided in this
// listing): drain outstanding RMA requests, free the per-peer window table,
// release name/info, deregister from the communicator, and barrier so no
// peer still references this window while it is being torn down.
58 //As per the standard, perform a barrier to ensure every async comm is finished
61 int finished = finish_comms();
62 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
65 delete[] connected_wins_;
66 if (name_ != nullptr){
69 if(info_!=MPI_INFO_NULL){
70 MPI_Info_free(&info_);
73 comm_->remove_rma_win(this);
75 Colls::barrier(comm_);
// Attach a memory region to a dynamically-created window (MPI_Win_attach).
87 int Win::attach(void* /*base*/, MPI_Aint size)
89 if (not(base_ == MPI_BOTTOM || base_ == 0))
// Dynamic windows keep no real base: the target address is conveyed as the
// displacement of each RMA call instead.
91 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach a memory region from a dynamic window (body elided in this listing).
96 int Win::detach(const void* /*base*/)
// Copy the window's name into `name` and its length (excluding NUL) into
// `length`. The null-name branch is elided in this listing.
103 void Win::get_name(char* name, int* length){
109 *length = strlen(name_);
// +1 so the terminating NUL is copied along with the characters.
110 strncpy(name, name_, *length+1);
// Return the group of the window's communicator, or MPI_GROUP_NULL if the
// window has no communicator.
113 void Win::get_group(MPI_Group* group){
114 if(comm_ != MPI_COMM_NULL){
115 *group = comm_->group();
117 *group = MPI_GROUP_NULL;
// Return the window's info object — presumably created lazily when still
// MPI_INFO_NULL (the creation/ref-count lines are elided in this listing).
121 MPI_Info Win::info(){
122 if(info_== MPI_INFO_NULL)
// Accessor: window size in bytes (body elided in this listing).
132 MPI_Aint Win::size(){
// Accessor: displacement unit used to scale target_disp in RMA calls
// (body elided in this listing).
140 int Win::disp_unit(){
// Replace the window's info object — the old one is presumably released
// first (release/assignment lines are elided in this listing).
148 void Win::set_info(MPI_Info info){
149 if(info_!= MPI_INFO_NULL)
// Set the window's name; the string is duplicated and owned by the window
// (freed in the destructor when non-null).
154 void Win::set_name(const char* name){
155 name_ = xbt_strdup(name);
// MPI_Win_fence: synchronization point that completes every RMA request
// pushed to this window since the previous fence. With MPI_MODE_NOPRECEDE
// there is nothing to wait for; with MPI_MODE_NOSUCCEED the epoch is closed.
158 int Win::fence(int assert)
160 XBT_DEBUG("Entering fence");
163 if (assert != MPI_MODE_NOPRECEDE) {
164 // This is not the first fence => finalize what came before
167 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
168 // Without this, the vector could get redimensioned when another process pushes.
169 // This would result in the array used by Request::waitall() to be invalidated.
170 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
171 std::vector<MPI_Request> *reqs = requests_;
172 int size = static_cast<int>(reqs->size());
173 // start all requests that have been prepared by another process
// Contiguous storage of std::vector lets waitall treat it as a C array.
175 MPI_Request* treqs = &(*reqs)[0];
176 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
182 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
187 XBT_DEBUG("Leaving fence");
// MPI_Put: write `origin_count` elements of `origin_datatype` from
// origin_addr into the target rank's window at `target_disp` (scaled by the
// target's disp_unit). Remote case is modeled as an rma_send/rma_recv request
// pair (the receive request is pushed onto the target window's own list);
// local case degenerates to a datatype copy. Bounds/access checks are
// partially elided in this listing.
192 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
193 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
195 //get receiver pointer
196 MPI_Win recv_win = connected_wins_[target_rank];
198 if(opened_==0){//check that post/start has been done
199 // no fence or start .. lock ok ?
// Outside an epoch, the access is only legal if we hold a lock on the target.
201 for (auto const& it : recv_win->lockers_)
202 if (it == comm_->rank())
208 if(target_count*target_datatype->get_extent()>recv_win->size_)
211 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
213 if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
214 XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
215 // prepare send_request
217 // TODO cheinrich Check for rank / pid conversion
218 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
221 //prepare receiver request
222 // TODO cheinrich Check for rank / pid conversion
223 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
224 target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
// If the caller wants a request handle, hand back the send side (rput).
229 if(request!=nullptr){
233 requests_->push_back(sreq);
237 //push request to receiver's win
// Lock the target's request vector: see the invalidation note in fence().
238 recv_win->mut_->lock();
239 recv_win->requests_->push_back(rreq);
241 recv_win->mut_->unlock();
244 XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
245 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
247 *request = MPI_REQUEST_NULL;
// MPI_Get: read `target_count` elements from the target rank's window at
// `target_disp` into origin_addr. Mirror image of put(): the *target* window
// owns the send request, we own the receive request; local case is a plain
// datatype copy. Some checks/branches are elided in this listing.
253 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
254 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
257 MPI_Win send_win = connected_wins_[target_rank];
259 if(opened_==0){//check that post/start has been done
260 // no fence or start .. lock ok ?
// Outside an epoch, the access is only legal if we hold a lock on the target.
262 for (auto const& it : send_win->lockers_)
263 if (it == comm_->rank())
269 if(target_count*target_datatype->get_extent()>send_win->size_)
272 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
273 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
275 if(target_rank != comm_->rank()){
276 //prepare send_request
277 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
278 send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
280 //prepare receiver request
281 MPI_Request rreq = Request::rma_recv_init(
282 origin_addr, origin_count, origin_datatype, target_rank,
283 comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
284 SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
286 //start the send, with another process than us as sender.
288 //push request to receiver's win
// Lock the target's request vector: see the invalidation note in fence().
289 send_win->mut_->lock();
290 send_win->requests_->push_back(sreq);
291 send_win->mut_->unlock();
// If the caller wants a request handle, hand back the receive side (rget).
296 if(request!=nullptr){
300 requests_->push_back(rreq);
305 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
307 *request=MPI_REQUEST_NULL;
// MPI_Accumulate: like put(), but the incoming data is combined into the
// target buffer with reduction operator `op` instead of overwriting it.
// Operations must apply in order, hence the per-window count_-based tag.
314 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
315 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
317 XBT_DEBUG("Entering MPI_Win_Accumulate");
318 //get receiver pointer
319 MPI_Win recv_win = connected_wins_[target_rank];
321 if(opened_==0){//check that post/start has been done
322 // no fence or start .. lock ok ?
// Outside an epoch, the access is only legal if we hold a lock on the target.
324 for (auto const& it : recv_win->lockers_)
325 if (it == comm_->rank())
330 //FIXME: local version
332 if(target_count*target_datatype->get_extent()>recv_win->size_)
335 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
336 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
337 // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
338 // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
339 // prepare send_request
341 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
342 SMPI_RMA_TAG - 3 - count_, comm_, op);
344 // prepare receiver request
// The receive is posted on the target's communicator, so translate our
// view of target_rank into the target communicator's rank via the actor.
345 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
346 recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
352 // push request to receiver's win
// Lock the target's request vector: see the invalidation note in fence().
353 recv_win->mut_->lock();
354 recv_win->requests_->push_back(rreq);
356 recv_win->mut_->unlock();
// If the caller wants a request handle, hand back the send side (raccumulate).
358 if (request != nullptr) {
362 requests_->push_back(sreq);
366 XBT_DEBUG("Leaving MPI_Win_Accumulate");
// MPI_Get_accumulate: atomically fetch the target data into result_addr and
// then accumulate origin data into the target. Atomicity is simulated by
// holding the target window's atomic_mut_ around a blocking get + accumulate
// pair — correct but slow, as the comment below notes.
370 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
371 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
372 int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
375 MPI_Win send_win = connected_wins_[target_rank];
377 if(opened_==0){//check that post/start has been done
378 // no fence or start .. lock ok ?
// Outside an epoch, the access is only legal if we hold a lock on the target.
380 for (auto const& it : send_win->lockers_)
381 if (it == comm_->rank())
387 if(target_count*target_datatype->get_extent()>send_win->size_)
390 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
391 //need to be sure ops are correctly ordered, so finish request here ? slow.
393 send_win->atomic_mut_->lock();
// Fetch first (the standard requires the returned value to be the
// pre-accumulate content), then apply the accumulate.
394 get(result_addr, result_count, result_datatype, target_rank,
395 target_disp, target_count, target_datatype, &req);
396 if (req != MPI_REQUEST_NULL)
397 Request::wait(&req, MPI_STATUS_IGNORE);
399 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
400 target_disp, target_count, target_datatype, op, &req);
401 if (req != MPI_REQUEST_NULL)
402 Request::wait(&req, MPI_STATUS_IGNORE);
403 send_win->atomic_mut_->unlock();
// MPI_Compare_and_swap: fetch one element of `datatype` from the target into
// result_addr; if it equals *compare_addr, write *origin_addr back. Atomicity
// is simulated by holding the target window's atomic_mut_ around the
// get / memcmp / put sequence.
408 int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
409 void *result_addr, MPI_Datatype datatype, int target_rank,
410 MPI_Aint target_disp){
412 MPI_Win send_win = connected_wins_[target_rank];
414 if(opened_==0){//check that post/start has been done
415 // no fence or start .. lock ok ?
// Outside an epoch, the access is only legal if we hold a lock on the target.
417 for (auto const& it : send_win->lockers_)
418 if (it == comm_->rank())
424 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
425 MPI_Request req = MPI_REQUEST_NULL;
426 send_win->atomic_mut_->lock();
427 get(result_addr, 1, datatype, target_rank,
428 target_disp, 1, datatype, &req);
429 if (req != MPI_REQUEST_NULL)
430 Request::wait(&req, MPI_STATUS_IGNORE);
// memcmp over the datatype's extent: bytewise equality stands in for the
// typed comparison the standard describes.
431 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
432 put(origin_addr, 1, datatype, target_rank,
433 target_disp, 1, datatype);
435 send_win->atomic_mut_->unlock();
// MPI_Win_start: open an access epoch towards the ranks in `group`. This
// naive implementation blocks until every target has called post(), by
// waiting for one zero-byte sync message (tag SMPI_RMA_TAG+4) per target.
439 int Win::start(MPI_Group group, int /*assert*/)
441 /* From MPI forum advices
442 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
443 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
444 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
445 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
446 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
447 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
448 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
449 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
450 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
451 must complete, without further dependencies. */
453 //naive, blocking implementation.
456 int size = group->size();
457 MPI_Request* reqs = xbt_new0(MPI_Request, size);
459 XBT_DEBUG("Entering MPI_Win_Start");
// Translate each group member into a rank of our communicator; post one
// receive per remote member (self is skipped).
461 int src = comm_->group()->rank(group->actor(j));
462 if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
463 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
469 Request::startall(size, reqs);
470 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
471 for (i = 0; i < size; i++) {
472 Request::unref(&reqs[i]);
475 opened_++; //we're open for business !
478 XBT_DEBUG("Leaving MPI_Win_Start");
// MPI_Win_post: open an exposure epoch towards the ranks in `group` by
// sending each of them the zero-byte sync message (tag SMPI_RMA_TAG+4)
// that their matching start() is waiting for.
482 int Win::post(MPI_Group group, int /*assert*/)
484 //let's make a synchronous send here
487 int size = group->size();
488 MPI_Request* reqs = xbt_new0(MPI_Request, size);
490 XBT_DEBUG("Entering MPI_Win_Post");
// One send per remote group member (self is skipped).
492 int dst = comm_->group()->rank(group->actor(j));
493 if (dst != rank_ && dst != MPI_UNDEFINED) {
494 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
501 Request::startall(size, reqs);
502 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
504 Request::unref(&reqs[i]);
507 opened_++; //we're open for business !
510 XBT_DEBUG("Leaving MPI_Win_Post");
// MPI_Win_complete body (fragment — the complete() signature line is elided
// in this listing): close the access epoch opened by start(). Notifies every
// member of group_ with a sync message (tag SMPI_RMA_TAG+5), then drains the
// RMA requests of the epoch and releases the group.
516 xbt_die("Complete called on already opened MPI_Win");
518 XBT_DEBUG("Entering MPI_Win_Complete");
521 int size = group_->size();
522 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One completion message per remote member of the start() group.
525 int dst = comm_->group()->rank(group_->actor(j));
526 if (dst != rank_ && dst != MPI_UNDEFINED) {
527 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
533 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
534 Request::startall(size, reqs);
535 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
538 Request::unref(&reqs[i]);
542 int finished = finish_comms();
543 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
545 Group::unref(group_);
546 opened_--; //we're closed for business !
// MPI_Win_wait body (fragment — the wait() signature line is elided in this
// listing): close the exposure epoch opened by post(). Blocks until every
// member of group_ has sent its complete() message (tag SMPI_RMA_TAG+5),
// then drains the RMA requests of the epoch and releases the group.
551 //naive, blocking implementation.
552 XBT_DEBUG("Entering MPI_Win_Wait");
555 int size = group_->size();
556 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per remote member of the post() group.
559 int src = comm_->group()->rank(group_->actor(j));
560 if (src != rank_ && src != MPI_UNDEFINED) {
561 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
567 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
568 Request::startall(size, reqs);
569 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
571 Request::unref(&reqs[i]);
574 int finished = finish_comms();
575 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
577 Group::unref(group_);
578 opened_--; //we're closed for business !
// MPI_Win_lock: acquire a shared or exclusive lock on `rank`'s window.
// mode_ accumulates lock_type values to distinguish the exclusive->shared
// transition (where the underlying mutex must be released) from plain
// shared re-locking. Finishes pending RMA calls on both sides afterwards.
582 int Win::lock(int lock_type, int rank, int /*assert*/)
584 MPI_Win target_win = connected_wins_[rank];
// Block on the target's lock mutex if an exclusive lock is requested, or if
// the window is currently held exclusively.
586 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
587 target_win->lock_mut_->lock();
588 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
589 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
590 target_win->lock_mut_->unlock();
592 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
593 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a locker so put/get/accumulate accept accesses
// outside a fence/post-start epoch.
595 target_win->lockers_.push_back(comm_->rank());
597 int finished = finish_comms(rank);
598 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
599 finished = target_win->finish_comms(rank_);
600 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: take a shared lock on every rank's window; returns the
// first non-success code encountered (aggregation lines elided here).
604 int Win::lock_all(int assert){
606 int retval = MPI_SUCCESS;
607 for (i=0; i<comm_->size();i++){
608 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
609 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: release our lock on `rank`'s window. The underlying mutex
// is only released when the window was held exclusively (a shared lock never
// kept it — see lock()). Finishes pending RMA calls on both sides afterwards.
615 int Win::unlock(int rank){
616 MPI_Win target_win = connected_wins_[rank];
617 int target_mode = target_win->mode_;
618 target_win->mode_= 0;
619 target_win->lockers_.remove(comm_->rank());
620 if (target_mode==MPI_LOCK_EXCLUSIVE){
621 target_win->lock_mut_->unlock();
624 int finished = finish_comms(rank);
625 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
626 finished = target_win->finish_comms(rank_);
627 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: release our lock on every rank's window; returns the
// first non-success code encountered (aggregation lines elided here).
631 int Win::unlock_all(){
633 int retval = MPI_SUCCESS;
634 for (i=0; i<comm_->size();i++){
635 int ret = this->unlock(i);
636 if (ret != MPI_SUCCESS)
// MPI_Win_flush: complete all RMA operations between us and `rank`, both the
// requests we hold that involve ourselves and the requests the target window
// holds that involve us.
642 int Win::flush(int rank){
643 MPI_Win target_win = connected_wins_[rank];
644 int finished = finish_comms(rank_);
645 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
646 finished = target_win->finish_comms(rank);
647 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: complete, at the origin only, our pending RMA
// requests that involve `rank`.
651 int Win::flush_local(int rank){
652 int finished = finish_comms(rank);
653 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: complete all of our own pending RMA requests, then the
// requests involving us held by every peer's window.
657 int Win::flush_all(){
658 int finished = finish_comms();
659 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
660 for (int i = 0; i < comm_->size(); i++) {
661 finished = connected_wins_[i]->finish_comms(rank_);
662 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: complete, at the origin only, all of our pending
// RMA requests on this window.
667 int Win::flush_local_all(){
668 int finished = finish_comms();
669 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Translate a Fortran integer handle back into its C++ Win object.
673 Win* Win::f2c(int id){
674 return static_cast<Win*>(F2C::f2c(id));
// Wait for every request queued on this window and return how many were
// completed. Presumably called with mut_ held / cleared afterwards — the
// surrounding locking lines are elided in this listing.
677 int Win::finish_comms(){
679 //Finish own requests
680 std::vector<MPI_Request> *reqqs = requests_;
681 int size = static_cast<int>(reqqs->size());
// Contiguous storage of std::vector lets waitall treat it as a C array.
683 MPI_Request* treqs = &(*reqqs)[0];
684 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
// Wait only for the queued requests whose source or destination is `rank`,
// and return how many were completed. Matching requests are moved out of
// requests_ into a local vector before waiting on them.
691 int Win::finish_comms(int rank){
693 //Finish own requests
694 std::vector<MPI_Request> *reqqs = requests_;
695 int size = static_cast<int>(reqqs->size());
698 std::vector<MPI_Request> myreqqs;
699 std::vector<MPI_Request>::iterator iter = reqqs->begin();
700 int proc_id = comm_->group()->actor(rank)->get_pid();
701 while (iter != reqqs->end()){
702 // Let's see if we're either the destination or the sender of this request
703 // because we only wait for requests that we are responsible for.
704 // Also use the process id here since the request itself returns from src()
705 // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
706 if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
707 myreqqs.push_back(*iter);
// erase() returns the iterator past the removed element, so no ++ here.
708 iter = reqqs->erase(iter);
715 MPI_Request* treqs = &myreqqs[0];
716 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
// MPI_Win_shared_query: report size, disp_unit and base address of `rank`'s
// window. With MPI_PROC_NULL, the first peer window with a non-zero size is
// picked, per the standard's "lowest rank with size > 0" rule.
724 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
726 MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
727 for (int i = 0; not target_win && i < comm_->size(); i++) {
728 if (connected_wins_[i]->size_ > 0)
729 target_win = connected_wins_[i];
732 *size = target_win->size_;
733 *disp_unit = target_win->disp_unit_;
734 *static_cast<void**>(baseptr) = target_win->base_;
// No window found: report a valid (zero-sized) allocation rather than NULL.
737 *static_cast<void**>(baseptr) = xbt_malloc(0);
// Accessor: the window's current error handler (body elided in this listing).
742 MPI_Errhandler Win::errhandler(){
// Install a new error handler on the window — presumably taking a reference
// on it when non-null (the ref line is elided in this listing).
746 void Win::set_errhandler(MPI_Errhandler errhandler){
747 errhandler_=errhandler;
748 if(errhandler_!= MPI_ERRHANDLER_NULL)