/* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/smpi/smpi_coll.hpp"
7 #include "src/smpi/smpi_datatype.hpp"
8 #include "src/smpi/smpi_info.hpp"
9 #include "src/smpi/smpi_keyvals.hpp"
10 #include "src/smpi/smpi_process.hpp"
11 #include "src/smpi/smpi_request.hpp"
12 #include "src/smpi/smpi_win.hpp"
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
19 int Win::keyval_id_=0;
21 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
22 int comm_size = comm->size();
24 XBT_DEBUG("Creating window");
25 if(info!=MPI_INFO_NULL)
29 group_ = MPI_GROUP_NULL;
30 requests_ = new std::vector<MPI_Request>();
31 mut_=xbt_mutex_init();
32 lock_mut_=xbt_mutex_init();
33 atomic_mut_=xbt_mutex_init();
34 connected_wins_ = new MPI_Win[comm_size];
35 connected_wins_[rank_] = this;
38 bar_ = MSG_barrier_init(comm_size);
42 comm->add_rma_win(this);
44 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
47 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
53 //As per the standard, perform a barrier to ensure every async comm is finished
54 MSG_barrier_wait(bar_);
56 int finished = finish_comms();
57 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
60 delete[] connected_wins_;
61 if (name_ != nullptr){
64 if(info_!=MPI_INFO_NULL){
65 MPI_Info_free(&info_);
68 comm_->remove_rma_win(this);
70 Colls::barrier(comm_);
71 int rank=comm_->rank();
73 MSG_barrier_destroy(bar_);
74 xbt_mutex_destroy(mut_);
75 xbt_mutex_destroy(lock_mut_);
76 xbt_mutex_destroy(atomic_mut_);
84 int Win::attach (void *base, MPI_Aint size){
85 if (!(base_ == MPI_BOTTOM || base_ == 0))
87 base_=0;//actually the address will be given in the RMA calls, as being the disp.
92 int Win::detach (void *base){
98 void Win::get_name(char* name, int* length){
104 *length = strlen(name_);
105 strncpy(name, name_, *length+1);
108 void Win::get_group(MPI_Group* group){
109 if(comm_ != MPI_COMM_NULL){
110 *group = comm_->group();
112 *group = MPI_GROUP_NULL;
116 MPI_Info Win::info(){
117 if(info_== MPI_INFO_NULL)
127 MPI_Aint Win::size(){
135 int Win::disp_unit(){
143 void Win::set_info(MPI_Info info){
144 if(info_!= MPI_INFO_NULL)
149 void Win::set_name(char* name){
150 name_ = xbt_strdup(name);
153 int Win::fence(int assert)
155 XBT_DEBUG("Entering fence");
158 if (assert != MPI_MODE_NOPRECEDE) {
159 // This is not the first fence => finalize what came before
160 MSG_barrier_wait(bar_);
161 xbt_mutex_acquire(mut_);
162 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
163 // Without this, the vector could get redimensionned when another process pushes.
164 // This would result in the array used by Request::waitall() to be invalidated.
165 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
166 std::vector<MPI_Request> *reqs = requests_;
167 int size = static_cast<int>(reqs->size());
168 // start all requests that have been prepared by another process
170 MPI_Request* treqs = &(*reqs)[0];
171 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
174 xbt_mutex_release(mut_);
177 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
181 MSG_barrier_wait(bar_);
182 XBT_DEBUG("Leaving fence");
187 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
188 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
190 //get receiver pointer
191 MPI_Win recv_win = connected_wins_[target_rank];
193 if(opened_==0){//check that post/start has been done
194 // no fence or start .. lock ok ?
196 for(auto it : recv_win->lockers_)
197 if (it == comm_->rank())
203 if(target_count*target_datatype->get_extent()>recv_win->size_)
206 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
207 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
209 if(target_rank != comm_->rank()){
210 //prepare send_request
211 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
212 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
214 //prepare receiver request
215 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
216 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
221 if(request!=nullptr){
224 xbt_mutex_acquire(mut_);
225 requests_->push_back(sreq);
226 xbt_mutex_release(mut_);
229 //push request to receiver's win
230 xbt_mutex_acquire(recv_win->mut_);
231 recv_win->requests_->push_back(rreq);
233 xbt_mutex_release(recv_win->mut_);
236 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
238 *request = MPI_REQUEST_NULL;
244 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
245 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
248 MPI_Win send_win = connected_wins_[target_rank];
250 if(opened_==0){//check that post/start has been done
251 // no fence or start .. lock ok ?
253 for(auto it : send_win->lockers_)
254 if (it == comm_->rank())
260 if(target_count*target_datatype->get_extent()>send_win->size_)
263 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
264 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
266 if(target_rank != comm_->rank()){
267 //prepare send_request
268 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
269 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
272 //prepare receiver request
273 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
274 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
277 //start the send, with another process than us as sender.
279 //push request to receiver's win
280 xbt_mutex_acquire(send_win->mut_);
281 send_win->requests_->push_back(sreq);
282 xbt_mutex_release(send_win->mut_);
287 if(request!=nullptr){
290 xbt_mutex_acquire(mut_);
291 requests_->push_back(rreq);
292 xbt_mutex_release(mut_);
296 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
298 *request=MPI_REQUEST_NULL;
305 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
306 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
309 //get receiver pointer
310 MPI_Win recv_win = connected_wins_[target_rank];
312 if(opened_==0){//check that post/start has been done
313 // no fence or start .. lock ok ?
315 for(auto it : recv_win->lockers_)
316 if (it == comm_->rank())
321 //FIXME: local version
323 if(target_count*target_datatype->get_extent()>recv_win->size_)
326 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
327 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
328 //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
329 //prepare send_request
331 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
332 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
334 //prepare receiver request
335 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
336 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
342 //push request to receiver's win
343 xbt_mutex_acquire(recv_win->mut_);
344 recv_win->requests_->push_back(rreq);
346 xbt_mutex_release(recv_win->mut_);
348 if(request!=nullptr){
351 xbt_mutex_acquire(mut_);
352 requests_->push_back(sreq);
353 xbt_mutex_release(mut_);
359 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
360 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
361 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
364 MPI_Win send_win = connected_wins_[target_rank];
366 if(opened_==0){//check that post/start has been done
367 // no fence or start .. lock ok ?
369 for(auto it : send_win->lockers_)
370 if (it == comm_->rank())
376 if(target_count*target_datatype->get_extent()>send_win->size_)
379 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
380 //need to be sure ops are correctly ordered, so finish request here ? slow.
382 xbt_mutex_acquire(send_win->atomic_mut_);
383 get(result_addr, result_count, result_datatype, target_rank,
384 target_disp, target_count, target_datatype, &req);
385 if (req != MPI_REQUEST_NULL)
386 Request::wait(&req, MPI_STATUS_IGNORE);
388 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
389 target_disp, target_count, target_datatype, op, &req);
390 if (req != MPI_REQUEST_NULL)
391 Request::wait(&req, MPI_STATUS_IGNORE);
392 xbt_mutex_release(send_win->atomic_mut_);
397 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
398 void *result_addr, MPI_Datatype datatype, int target_rank,
399 MPI_Aint target_disp){
401 MPI_Win send_win = connected_wins_[target_rank];
403 if(opened_==0){//check that post/start has been done
404 // no fence or start .. lock ok ?
406 for(auto it : send_win->lockers_)
407 if (it == comm_->rank())
413 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
415 xbt_mutex_acquire(send_win->atomic_mut_);
416 get(result_addr, 1, datatype, target_rank,
417 target_disp, 1, datatype, &req);
418 if (req != MPI_REQUEST_NULL)
419 Request::wait(&req, MPI_STATUS_IGNORE);
420 if(! memcmp (result_addr, compare_addr, datatype->get_extent() )){
421 put(origin_addr, 1, datatype, target_rank,
422 target_disp, 1, datatype);
424 xbt_mutex_release(send_win->atomic_mut_);
428 int Win::start(MPI_Group group, int assert){
429 /* From MPI forum advices
430 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
431 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
432 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
433 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
434 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
435 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
436 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
437 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
438 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
439 must complete, without further dependencies. */
441 //naive, blocking implementation.
444 int size = group->size();
445 MPI_Request* reqs = xbt_new0(MPI_Request, size);
448 int src = group->index(j);
449 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
450 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
456 Request::startall(size, reqs);
457 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
459 Request::unref(&reqs[i]);
462 opened_++; //we're open for business !
468 int Win::post(MPI_Group group, int assert){
469 //let's make a synchronous send here
472 int size = group->size();
473 MPI_Request* reqs = xbt_new0(MPI_Request, size);
476 int dst=group->index(j);
477 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
478 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
485 Request::startall(size, reqs);
486 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
488 Request::unref(&reqs[i]);
491 opened_++; //we're open for business !
499 xbt_die("Complete called on already opened MPI_Win");
501 XBT_DEBUG("Entering MPI_Win_Complete");
504 int size = group_->size();
505 MPI_Request* reqs = xbt_new0(MPI_Request, size);
508 int dst=group_->index(j);
509 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
510 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
516 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
517 Request::startall(size, reqs);
518 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
521 Request::unref(&reqs[i]);
525 int finished = finish_comms();
526 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
528 Group::unref(group_);
529 opened_--; //we're closed for business !
534 //naive, blocking implementation.
535 XBT_DEBUG("Entering MPI_Win_Wait");
538 int size = group_->size();
539 MPI_Request* reqs = xbt_new0(MPI_Request, size);
542 int src=group_->index(j);
543 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
544 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
550 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
551 Request::startall(size, reqs);
552 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
554 Request::unref(&reqs[i]);
557 int finished = finish_comms();
558 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
560 Group::unref(group_);
561 opened_--; //we're opened for business !
565 int Win::lock(int lock_type, int rank, int assert){
566 MPI_Win target_win = connected_wins_[rank];
568 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
569 xbt_mutex_acquire(target_win->lock_mut_);
570 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
571 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
572 xbt_mutex_release(target_win->lock_mut_);
574 } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
575 target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
577 target_win->lockers_.push_back(comm_->rank());
579 int finished = finish_comms(rank);
580 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
581 finished = target_win->finish_comms(rank_);
582 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
586 int Win::lock_all(int assert){
588 int retval = MPI_SUCCESS;
589 for (i=0; i<comm_->size();i++){
590 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
591 if(ret != MPI_SUCCESS)
597 int Win::unlock(int rank){
598 MPI_Win target_win = connected_wins_[rank];
599 int target_mode = target_win->mode_;
600 target_win->mode_= 0;
601 target_win->lockers_.remove(comm_->rank());
602 if (target_mode==MPI_LOCK_EXCLUSIVE){
603 xbt_mutex_release(target_win->lock_mut_);
606 int finished = finish_comms(rank);
607 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
608 finished = target_win->finish_comms(rank_);
609 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
613 int Win::unlock_all(){
615 int retval = MPI_SUCCESS;
616 for (i=0; i<comm_->size();i++){
617 int ret = this->unlock(i);
618 if(ret != MPI_SUCCESS)
624 int Win::flush(int rank){
625 MPI_Win target_win = connected_wins_[rank];
626 int finished = finish_comms(rank);
627 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
628 finished = target_win->finish_comms(rank_);
629 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
633 int Win::flush_local(int rank){
634 int finished = finish_comms(rank);
635 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
639 int Win::flush_all(){
642 finished = finish_comms();
643 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
644 for (i=0; i<comm_->size();i++){
645 finished = connected_wins_[i]->finish_comms(rank_);
646 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
651 int Win::flush_local_all(){
652 int finished = finish_comms();
653 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
657 Win* Win::f2c(int id){
658 return static_cast<Win*>(F2C::f2c(id));
662 int Win::finish_comms(){
663 xbt_mutex_acquire(mut_);
664 //Finish own requests
665 std::vector<MPI_Request> *reqqs = requests_;
666 int size = static_cast<int>(reqqs->size());
668 MPI_Request* treqs = &(*reqqs)[0];
669 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
672 xbt_mutex_release(mut_);
676 int Win::finish_comms(int rank){
677 xbt_mutex_acquire(mut_);
678 //Finish own requests
679 std::vector<MPI_Request> *reqqs = requests_;
680 int size = static_cast<int>(reqqs->size());
683 std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
684 std::vector<MPI_Request>::iterator iter = reqqs->begin();
685 while (iter != reqqs->end()){
686 if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
687 myreqqs->push_back(*iter);
688 iter = reqqs->erase(iter);
695 MPI_Request* treqs = &(*myreqqs)[0];
696 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
701 xbt_mutex_release(mut_);