1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Storage for MPI attribute keyvals attached to windows (MPI_Win_create_keyval
// and friends); shared across all Win instances, hence static.
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
// Next keyval id to hand out; incremented elsewhere when a keyval is created.
23 int Win::keyval_id_=0;
// Window constructor: record the local exposure region (base/size/disp_unit),
// create the synchronization primitives, then exchange window pointers with
// every peer so each rank can reach the others' Win objects directly.
// NOTE(review): this excerpt omits several original lines (e.g. the body of
// the info refcount branch and the allgather's trailing arguments) — verify
// against the full file before relying on details here.
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26 int comm_size = comm->size();
28 XBT_DEBUG("Creating window");
29 if(info!=MPI_INFO_NULL)
33 group_ = MPI_GROUP_NULL;
34 requests_ = new std::vector<MPI_Request>();
// mut_ guards requests_; lock_mut_ backs MPI_Win_lock; atomic_mut_ serializes
// the read-modify-write paths (get_accumulate / compare_and_swap).
35 mut_ = s4u::Mutex::create();
36 lock_mut_ = s4u::Mutex::create();
37 atomic_mut_ = s4u::Mutex::create();
// One slot per rank in comm; our own slot is filled locally, the rest are
// obtained via the allgather below.
38 connected_wins_ = new MPI_Win[comm_size];
39 connected_wins_[rank_] = this;
42 bar_ = new s4u::Barrier(comm_size);
45 errhandler_=MPI_ERRORS_ARE_FATAL;
46 comm->add_rma_win(this);
// Share every rank's Win* so remote ranks can push requests into our queues.
49 colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
// All ranks adopt rank 0's barrier object so fence() synchronizes on one
// shared s4u::Barrier.
52 colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
// Destructor body (the ~Win() signature line is not visible in this excerpt).
// Drains pending RMA requests, releases owned resources, deregisters from the
// communicator, then barriers so no peer still references this window.
58 //As per the standard, perform a barrier to ensure every async comm is finished
61 int finished = finish_comms();
62 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
65 delete[] connected_wins_;
66 if (name_ != nullptr){
69 if(info_!=MPI_INFO_NULL){
70 MPI_Info_free(&info_);
73 comm_->remove_rma_win(this);
// Collective: peers may still hold our Win* until everyone reaches this point.
75 colls::barrier(comm_);
// Attach memory to a dynamic window. Only legal when the window currently has
// no base (MPI_BOTTOM or null); the real address is supplied per-RMA-call as
// the displacement. Bodies are truncated in this excerpt.
87 int Win::attach(void* /*base*/, MPI_Aint size)
89 if (not(base_ == MPI_BOTTOM || base_ == 0))
91 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach counterpart (body not visible in this excerpt).
96 int Win::detach(const void* /*base*/)
// Copy the window's name into caller-provided storage; *length gets the
// name's length (strncpy with *length+1 also copies the terminating NUL).
103 void Win::get_name(char* name, int* length){
109 *length = strlen(name_);
110 strncpy(name, name_, *length+1);
// Return the group of the attached communicator, or MPI_GROUP_NULL when the
// window has no communicator.
113 void Win::get_group(MPI_Group* group){
114 if(comm_ != MPI_COMM_NULL){
115 *group = comm_->group();
117 *group = MPI_GROUP_NULL;
// Lazy accessor for the window's info object (creation path truncated here).
121 MPI_Info Win::info(){
122 if(info_== MPI_INFO_NULL)
// Trivial getters for the exposure region (bodies truncated in this excerpt).
132 MPI_Aint Win::size(){
140 int Win::disp_unit(){
// Replace the info object; the old one is released first when present.
148 void Win::set_info(MPI_Info info){
149 if(info_!= MPI_INFO_NULL)
// Store a private copy of the name (xbt_strdup allocates).
154 void Win::set_name(const char* name){
155 name_ = xbt_strdup(name);
// MPI_Win_fence: active-target epoch boundary. Unless the caller asserted
// MPI_MODE_NOPRECEDE (nothing to finish), wait for every request queued on
// this window, holding mut_ so peers cannot grow the vector underneath
// waitall. Several lines (mutex lock/unlock, barrier, return) are omitted
// from this excerpt.
158 int Win::fence(int assert)
160 XBT_DEBUG("Entering fence");
163 if (assert != MPI_MODE_NOPRECEDE) {
164 // This is not the first fence => finalize what came before
167 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
168 // Without this, the vector could get redimensioned when another process pushes.
169 // This would result in the array used by Request::waitall() to be invalidated.
170 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
171 std::vector<MPI_Request> *reqs = requests_;
172 int size = static_cast<int>(reqs->size());
173 // start all requests that have been prepared by another process
175 MPI_Request* treqs = &(*reqs)[0];
176 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
182 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
187 XBT_DEBUG("Leaving fence");
// MPI_Put: write origin data into the target rank's window. Remote case is
// modeled as a matched rma_send/rma_recv pair on SMPI_RMA_TAG+1; the receive
// request is pushed into the *target's* request queue under its mutex. The
// local (self) case degenerates to a plain datatype copy. Error branches and
// request start/refcount lines are omitted from this excerpt.
192 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
193 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
195 //get receiver pointer
196 MPI_Win recv_win = connected_wins_[target_rank];
198 if(opened_==0){//check that post/start has been done
199 // no fence or start .. lock ok ?
// Outside an active epoch, the access is only legal if we hold a lock on the
// target window.
201 for (auto const& it : recv_win->lockers_)
202 if (it == comm_->rank())
// Bounds check: the access must fit inside the target's exposure region.
208 if(target_count*target_datatype->get_extent()>recv_win->size_)
211 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
213 if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
214 XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
215 // prepare send_request
217 // TODO cheinrich Check for rank / pid conversion
218 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
221 //prepare receiver request
222 // TODO cheinrich Check for rank / pid conversion
223 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
224 target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
229 if(request!=nullptr){
// Keep the send request in our own queue so fence/flush can complete it.
233 requests_->push_back(sreq);
237 //push request to receiver's win
238 recv_win->mut_->lock();
239 recv_win->requests_->push_back(rreq);
241 recv_win->mut_->unlock();
243 XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
// Self-put: no messaging needed, just convert/copy between datatypes.
244 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
246 *request = MPI_REQUEST_NULL;
// MPI_Get: mirror of put() with data flowing target->origin on SMPI_RMA_TAG+2.
// The send request lands in the target window's queue; the receive request is
// kept locally. Error branches and request start lines are omitted from this
// excerpt.
252 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
253 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
256 MPI_Win send_win = connected_wins_[target_rank];
258 if(opened_==0){//check that post/start has been done
259 // no fence or start .. lock ok ?
// Outside an epoch, require that we hold a lock on the target window.
261 for (auto const& it : send_win->lockers_)
262 if (it == comm_->rank())
// Bounds check against the target's exposure region.
268 if(target_count*target_datatype->get_extent()>send_win->size_)
271 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
272 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
274 if(target_rank != comm_->rank()){
275 //prepare send_request
276 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
277 send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
279 //prepare receiver request
280 MPI_Request rreq = Request::rma_recv_init(
281 origin_addr, origin_count, origin_datatype, target_rank,
282 comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
283 SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
285 //start the send, with another process than us as sender.
287 //push request to receiver's win
288 send_win->mut_->lock();
289 send_win->requests_->push_back(sreq);
290 send_win->mut_->unlock();
295 if(request!=nullptr){
// Track the receive locally so fence/flush can complete it.
299 requests_->push_back(rreq);
// Self-get: direct datatype copy, no messaging.
303 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
305 *request=MPI_REQUEST_NULL;
// MPI_Accumulate: like put() but the receive side applies reduction `op`.
// Uses tag SMPI_RMA_TAG - 3 - count_ so successive accumulates get distinct,
// ordered tags (count_ increment happens on a line omitted here). Error
// branches and request start lines are also omitted from this excerpt.
310 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
311 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
313 XBT_DEBUG("Entering MPI_Win_Accumulate");
314 //get receiver pointer
315 MPI_Win recv_win = connected_wins_[target_rank];
317 if(opened_==0){//check that post/start has been done
318 // no fence or start .. lock ok ?
320 for (auto const& it : recv_win->lockers_)
321 if (it == comm_->rank())
326 //FIXME: local version
// Bounds check against the target's exposure region.
328 if(target_count*target_datatype->get_extent()>recv_win->size_)
331 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
332 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
333 // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
334 // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
335 // prepare send_request
337 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
338 SMPI_RMA_TAG - 3 - count_, comm_, op);
340 // prepare receiver request
341 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
342 recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
348 // push request to receiver's win
349 recv_win->mut_->lock();
350 recv_win->requests_->push_back(rreq);
352 recv_win->mut_->unlock();
354 if (request != nullptr) {
// Keep our send request locally so fence/flush can complete it.
358 requests_->push_back(sreq);
362 XBT_DEBUG("Leaving MPI_Win_Accumulate");
// MPI_Get_accumulate, implemented as get() followed by accumulate(), both
// completed immediately and serialized through the target's atomic_mut_ so
// the fetch-and-op pair is atomic with respect to other atomic RMA ops on
// that window. The declaration of `req` is on a line omitted from this
// excerpt. This synchronous implementation is simple but slow, as the inline
// comment notes.
366 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
367 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
368 int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
371 MPI_Win send_win = connected_wins_[target_rank];
373 if(opened_==0){//check that post/start has been done
374 // no fence or start .. lock ok ?
376 for (auto const& it : send_win->lockers_)
377 if (it == comm_->rank())
// Bounds check against the target's exposure region.
383 if(target_count*target_datatype->get_extent()>send_win->size_)
386 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
387 //need to be sure ops are correctly ordered, so finish request here ? slow.
389 send_win->atomic_mut_->lock();
// Fetch the old value first...
390 get(result_addr, result_count, result_datatype, target_rank,
391 target_disp, target_count, target_datatype, &req);
392 if (req != MPI_REQUEST_NULL)
393 Request::wait(&req, MPI_STATUS_IGNORE);
// ...then apply the reduction to the target buffer.
395 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
396 target_disp, target_count, target_datatype, op, &req);
397 if (req != MPI_REQUEST_NULL)
398 Request::wait(&req, MPI_STATUS_IGNORE);
399 send_win->atomic_mut_->unlock();
// MPI_Compare_and_swap: fetch the target value into result_addr, and if it
// byte-compares equal to *compare_addr, write *origin_addr back. The whole
// sequence runs under the target's atomic_mut_ to stay atomic w.r.t. other
// atomic RMA operations on that window.
403 int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
404 void *result_addr, MPI_Datatype datatype, int target_rank,
405 MPI_Aint target_disp){
407 MPI_Win send_win = connected_wins_[target_rank];
409 if(opened_==0){//check that post/start has been done
410 // no fence or start .. lock ok ?
412 for (auto const& it : send_win->lockers_)
413 if (it == comm_->rank())
419 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
420 MPI_Request req = MPI_REQUEST_NULL;
421 send_win->atomic_mut_->lock();
422 get(result_addr, 1, datatype, target_rank,
423 target_disp, 1, datatype, &req);
424 if (req != MPI_REQUEST_NULL)
425 Request::wait(&req, MPI_STATUS_IGNORE);
// memcmp == 0 means the fetched value matched the expected one: do the swap.
426 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
427 put(origin_addr, 1, datatype, target_rank,
428 target_disp, 1, datatype);
430 send_win->atomic_mut_->unlock();
// MPI_Win_start: open an access epoch toward `group`. Naive blocking scheme:
// wait for one zero-byte sync message (tag SMPI_RMA_TAG+4) from each member
// of the group, sent by their matching post(). Loop headers, the counter
// declarations (i/j), group bookkeeping and the return are on lines omitted
// from this excerpt.
434 int Win::start(MPI_Group group, int /*assert*/)
436 /* From MPI forum advices
437 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
438 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
439 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
440 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
441 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
442 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
443 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
444 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
445 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
446 must complete, without further dependencies. */
448 //naive, blocking implementation.
451 int size = group->size();
452 MPI_Request* reqs = xbt_new0(MPI_Request, size);
454 XBT_DEBUG("Entering MPI_Win_Start");
// Translate each group member into a rank of our communicator; skip ourself.
456 int src = comm_->group()->rank(group->actor(j));
457 if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
458 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
// Block until every member of the group has posted.
464 Request::startall(size, reqs);
465 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
466 for (i = 0; i < size; i++) {
467 Request::unref(&reqs[i]);
470 opened_++; //we're open for business !
473 XBT_DEBUG("Leaving MPI_Win_Start");
// MPI_Win_post: open an exposure epoch toward `group` by sending one
// zero-byte sync message (tag SMPI_RMA_TAG+4) to each member; matched by
// their start(). Loop headers, counter declarations and the return are on
// lines omitted from this excerpt.
477 int Win::post(MPI_Group group, int /*assert*/)
479 //let's make a synchronous send here
482 int size = group->size();
483 MPI_Request* reqs = xbt_new0(MPI_Request, size);
485 XBT_DEBUG("Entering MPI_Win_Post");
// Translate each group member into a rank of our communicator; skip ourself.
487 int dst = comm_->group()->rank(group->actor(j));
488 if (dst != rank_ && dst != MPI_UNDEFINED) {
489 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
496 Request::startall(size, reqs);
497 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
499 Request::unref(&reqs[i]);
502 opened_++; //we're open for business !
505 XBT_DEBUG("Leaving MPI_Win_Post");
// Body of MPI_Win_complete (the signature and the guard's condition are on
// lines omitted from this excerpt). Closes the access epoch opened by
// start(): notify each member of group_ with a zero-byte message on tag
// SMPI_RMA_TAG+5 (matched by their wait()), finish all outstanding RMA
// requests, then release the group and decrement the epoch counter.
511 xbt_die("Complete called on already opened MPI_Win");
513 XBT_DEBUG("Entering MPI_Win_Complete");
516 int size = group_->size();
517 MPI_Request* reqs = xbt_new0(MPI_Request, size);
520 int dst = comm_->group()->rank(group_->actor(j));
521 if (dst != rank_ && dst != MPI_UNDEFINED) {
522 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
528 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
529 Request::startall(size, reqs);
530 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
533 Request::unref(&reqs[i]);
// All local RMA operations must be done before complete() may return.
537 int finished = finish_comms();
538 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
540 Group::unref(group_);
541 opened_--; //we're closed for business !
// Body of MPI_Win_wait (the signature is on a line omitted from this
// excerpt). Closes the exposure epoch opened by post(): block for the
// zero-byte SMPI_RMA_TAG+5 message each accessor sends from complete(),
// finish the RMA requests targeting us, release the group and decrement the
// epoch counter.
546 //naive, blocking implementation.
547 XBT_DEBUG("Entering MPI_Win_Wait");
550 int size = group_->size();
551 MPI_Request* reqs = xbt_new0(MPI_Request, size);
554 int src = comm_->group()->rank(group_->actor(j));
555 if (src != rank_ && src != MPI_UNDEFINED) {
556 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
562 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
563 Request::startall(size, reqs);
564 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
566 Request::unref(&reqs[i]);
569 int finished = finish_comms();
570 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
572 Group::unref(group_);
// NOTE(review): comment says "opened" here while complete() says "closed";
// both decrement the same epoch counter.
573 opened_--; //we're opened for business !
// MPI_Win_lock (passive target). mode_ acts as a small state machine: an
// exclusive lock (or locking over an existing exclusive) takes the target's
// lock_mut_; shared lockers merely bump mode_ and register themselves in
// lockers_. Pending RMA calls in both directions are drained so the epoch
// starts clean. The return statement is on a line omitted from this excerpt.
577 int Win::lock(int lock_type, int rank, int /*assert*/)
579 MPI_Win target_win = connected_wins_[rank];
581 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
582 target_win->lock_mut_->lock();
583 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
584 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
585 target_win->lock_mut_->unlock();
587 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
588 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a locker so put/get/accumulate pass their epoch check.
590 target_win->lockers_.push_back(comm_->rank());
592 int finished = finish_comms(rank);
593 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
594 finished = target_win->finish_comms(rank_);
595 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: take a shared lock on every rank's window in turn,
// remembering the first failure. The error capture and return are on lines
// omitted from this excerpt.
599 int Win::lock_all(int assert){
601 int retval = MPI_SUCCESS;
602 for (i=0; i<comm_->size();i++){
603 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
604 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: undo lock(). Reset the target's mode, deregister from its
// lockers_ list, release lock_mut_ only if the epoch was exclusive, then
// drain pending RMA calls in both directions. The return statement is on a
// line omitted from this excerpt.
610 int Win::unlock(int rank){
611 MPI_Win target_win = connected_wins_[rank];
612 int target_mode = target_win->mode_;
613 target_win->mode_= 0;
614 target_win->lockers_.remove(comm_->rank());
615 if (target_mode==MPI_LOCK_EXCLUSIVE){
616 target_win->lock_mut_->unlock();
619 int finished = finish_comms(rank);
620 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
621 finished = target_win->finish_comms(rank_);
622 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: release the per-rank locks taken by lock_all(),
// remembering the first failure. The error capture and return are on lines
// omitted from this excerpt.
626 int Win::unlock_all(){
628 int retval = MPI_SUCCESS;
629 for (i=0; i<comm_->size();i++){
630 int ret = this->unlock(i);
631 if (ret != MPI_SUCCESS)
// MPI_Win_flush: complete pending RMA calls involving `rank` on both sides —
// our queue first, then the target window's queue. Return statements of this
// family are on lines omitted from this excerpt.
637 int Win::flush(int rank){
638 MPI_Win target_win = connected_wins_[rank];
639 int finished = finish_comms(rank_);
640 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
641 finished = target_win->finish_comms(rank);
642 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: only complete the locally-queued requests for `rank`.
646 int Win::flush_local(int rank){
647 int finished = finish_comms(rank);
648 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: drain our queue, then every peer window's requests that
// involve us.
652 int Win::flush_all(){
653 int finished = finish_comms();
654 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
655 for (int i = 0; i < comm_->size(); i++) {
656 finished = connected_wins_[i]->finish_comms(rank_);
657 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: complete everything queued locally.
662 int Win::flush_local_all(){
663 int finished = finish_comms();
664 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Fortran-to-C handle conversion: look up the Win registered under integer
// id via the shared F2C table.
668 Win* Win::f2c(int id){
669 return static_cast<Win*>(F2C::f2c(id));
// Complete every request queued on this window and return how many there
// were. The mutex lock/unlock around the waitall and the vector cleanup are
// on lines omitted from this excerpt.
672 int Win::finish_comms(){
674 //Finish own requests
675 std::vector<MPI_Request> *reqqs = requests_;
676 int size = static_cast<int>(reqqs->size());
678 MPI_Request* treqs = &(*reqqs)[0];
679 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
// Rank-filtered variant of finish_comms(): extract from the queue only the
// requests whose source or destination is `rank` (compared by process id,
// since requests store pids, not communicator ranks), wait on just those,
// and return their count. Mutex handling, the iterator's else-increment and
// the size recomputation are on lines omitted from this excerpt.
686 int Win::finish_comms(int rank){
688 //Finish own requests
689 std::vector<MPI_Request> *reqqs = requests_;
690 int size = static_cast<int>(reqqs->size());
693 std::vector<MPI_Request> myreqqs;
694 std::vector<MPI_Request>::iterator iter = reqqs->begin();
695 int proc_id = comm_->group()->actor(rank)->get_pid();
696 while (iter != reqqs->end()){
697 // Let's see if we're either the destination or the sender of this request
698 // because we only wait for requests that we are responsible for.
699 // Also use the process id here since the request itself returns from src()
700 // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
701 if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
702 myreqqs.push_back(*iter);
// erase() returns the next iterator, so the matched element is removed from
// the window's queue and ownership moves to myreqqs.
703 iter = reqqs->erase(iter);
710 MPI_Request* treqs = &myreqqs[0];
711 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
// MPI_Win_shared_query: report size/disp_unit/base of the target rank's
// window. With MPI_PROC_NULL, fall back to the first window with a non-empty
// region, as the standard allows. The branch structure around the zero-size
// fallback (xbt_malloc(0)) and the return are on lines omitted from this
// excerpt.
719 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
721 MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
722 for (int i = 0; not target_win && i < comm_->size(); i++) {
723 if (connected_wins_[i]->size_ > 0)
724 target_win = connected_wins_[i];
727 *size = target_win->size_;
728 *disp_unit = target_win->disp_unit_;
729 *static_cast<void**>(baseptr) = target_win->base_;
// No window with data found: hand back a unique (zero-length) allocation.
732 *static_cast<void**>(baseptr) = xbt_malloc(0);
// Error-handler accessor (body on lines omitted from this excerpt) and
// setter; the setter's refcount handling of the non-null handler is also
// truncated here.
737 MPI_Errhandler Win::errhandler()
742 void Win::set_errhandler(MPI_Errhandler errhandler)
744 errhandler_ = errhandler;
745 if (errhandler_ != MPI_ERRHANDLER_NULL)
749 } // namespace simgrid