1 /* Copyright (c) 2007-2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)"); // dedicated log channel for one-sided ops
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_; // attribute keyvals shared by all windows
15 int Win::keyval_id_=0; // next keyval id to hand out
// Window constructor: records the window geometry (base/size/disp_unit) and
// builds the per-window machinery — a request vector guarded by mut_, a mutex
// for passive-target locking, the table of every rank's Win* handle (filled by
// the allgather below), and a simulated barrier shared by all ranks.
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
18 int comm_size = comm->size();
20 XBT_DEBUG("Creating window");
21 if(info!=MPI_INFO_NULL)
25 group_ = MPI_GROUP_NULL;
26 requests_ = new std::vector<MPI_Request>();
27 mut_=xbt_mutex_init();
28 lock_mut_=xbt_mutex_init();
29 connected_wins_ = new MPI_Win[comm_size]; // one slot per rank of comm
30 connected_wins_[rank_] = this; // fill our own slot before the exchange
33 bar_ = MSG_barrier_init(comm_size);
// Register this window with the communicator (undone in the destructor).
37 comm->add_rma_win(this);
// Every rank publishes its Win* so peers can push requests to it directly...
39 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// ...and all ranks adopt rank 0's barrier object.
42 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
48 //As per the standard, perform a barrier to ensure every async comm is finished
49 MSG_barrier_wait(bar_);
// Drain any RMA requests still queued on this window before tearing it down.
51 int finished = finish_comms();
52 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
55 delete[] connected_wins_;
56 if (name_ != nullptr){
59 if(info_!=MPI_INFO_NULL){
60 MPI_Info_free(&info_);
// Unregister from the communicator, then synchronize all ranks before
// destroying the shared barrier and the window's mutexes.
63 comm_->remove_rma_win(this);
65 Colls::barrier(comm_);
66 int rank=comm_->rank();
68 MSG_barrier_destroy(bar_);
69 xbt_mutex_destroy(mut_);
70 xbt_mutex_destroy(lock_mut_);
// Attach a memory region to a dynamic window (MPI_Win_attach).
78 int Win::attach (void *base, MPI_Aint size){
// Only legal while the window base is still unset (MPI_BOTTOM or 0).
79 if (!(base_ == MPI_BOTTOM || base_ == 0))
81 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach a previously attached region from a dynamic window (MPI_Win_detach).
86 int Win::detach (void *base){
// Copy this window's name into `name` and store its length in `length`.
92 void Win::get_name(char* name, int* length){
// NOTE(review): name_ is dereferenced below; presumably the lines not shown
// here handle the unnamed (name_ == nullptr) case — confirm against the file.
98 *length = strlen(name_);
99 strncpy(name, name_, *length+1); // +1 so the terminating NUL is copied too
// Return the group of this window's communicator, or MPI_GROUP_NULL if the
// communicator is no longer valid.
102 void Win::get_group(MPI_Group* group){
103 if(comm_ != MPI_COMM_NULL){
104 *group = comm_->group();
106 *group = MPI_GROUP_NULL;
// Accessor for the info object attached to this window (created lazily when
// none was provided, per the check below).
110 MPI_Info Win::info(){
111 if(info_== MPI_INFO_NULL)
// Accessor: size in bytes of the window's memory region.
121 MPI_Aint Win::size(){
// Accessor: displacement unit used to scale target offsets in RMA calls.
129 int Win::disp_unit(){
// Replace the window's info object, releasing the previous one if any.
137 void Win::set_info(MPI_Info info){
138 if(info_!= MPI_INFO_NULL)
// Give the window a printable name (duplicated with xbt_strdup).
// NOTE(review): a previously set name_ is not freed before being overwritten
// here — looks like a potential leak; confirm ownership elsewhere in the file.
143 void Win::set_name(char* name){
144 name_ = xbt_strdup(name);
// Active-target epoch synchronization (MPI_Win_fence). On every fence except
// the first of an epoch (assert != MPI_MODE_NOPRECEDE), all ranks meet at the
// barrier, then each rank starts and completes every RMA request queued on its
// window. A closing barrier separates this epoch from the next.
147 int Win::fence(int assert)
149 XBT_DEBUG("Entering fence");
152 if (assert != MPI_MODE_NOPRECEDE) {
153 // This is not the first fence => finalize what came before
154 MSG_barrier_wait(bar_);
155 xbt_mutex_acquire(mut_);
156 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
157 // Without this, the vector could get resized when another process pushes.
158 // This would result in the array used by Request::waitall() to be invalidated.
159 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
160 std::vector<MPI_Request> *reqs = requests_;
161 int size = static_cast<int>(reqs->size());
162 // start all requests that have been prepared by another process
164 for (const auto& req : *reqs) {
165 if (req && (req->flags() & PREPARED))
169 MPI_Request* treqs = &(*reqs)[0];
// Complete every queued RMA operation of this epoch.
171 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
174 xbt_mutex_release(mut_);
178 MSG_barrier_wait(bar_);
179 XBT_DEBUG("Leaving fence");
// One-sided put: write `origin_count` elements from origin_addr into the
// target rank's window at target_disp. Remote case: a matched pair of RMA
// send/recv requests is created and queued on both windows (completed later by
// fence/complete/unlock). Local case: a direct datatype copy.
184 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
185 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
187 //get receiver pointer
188 MPI_Win recv_win = connected_wins_[target_rank];
190 if(opened_==0){//check that post/start has been done
191 // no fence or start .. lock ok ?
193 for(auto it : recv_win->lockers_)
194 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window.
200 if(target_count*target_datatype->get_extent()>recv_win->size_)
// Effective target address = base + disp * disp_unit of the *target* window.
203 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
204 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
206 if(target_rank != comm_->rank()){
207 //prepare send_request
208 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
209 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
211 //prepare receiver request
212 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
213 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
215 //push request to receiver's win
216 xbt_mutex_acquire(recv_win->mut_);
217 recv_win->requests_->push_back(rreq);
218 xbt_mutex_release(recv_win->mut_);
222 //push request to sender's win
223 xbt_mutex_acquire(mut_);
224 requests_->push_back(sreq);
225 xbt_mutex_release(mut_);
// Local put: no messaging needed, just copy through the datatype engine.
227 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// One-sided get: read `target_count` elements from the target rank's window
// into origin_addr. Mirror image of put(): the *send* request is queued on the
// target's window (the data flows from target to us) and the recv request on
// ours. Local case: a direct datatype copy.
233 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
234 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
237 MPI_Win send_win = connected_wins_[target_rank];
239 if(opened_==0){//check that post/start has been done
240 // no fence or start .. lock ok ?
242 for(auto it : send_win->lockers_)
243 if (it == comm_->rank())
// Bounds check against the target window's size.
249 if(target_count*target_datatype->get_extent()>send_win->size_)
252 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
253 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
255 if(target_rank != comm_->rank()){
256 //prepare send_request
257 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
258 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
261 //prepare receiver request
262 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
263 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
266 //start the send, with another process than us as sender.
268 //push request to receiver's win
269 xbt_mutex_acquire(send_win->mut_);
270 send_win->requests_->push_back(sreq);
271 xbt_mutex_release(send_win->mut_);
275 //push request to sender's win
276 xbt_mutex_acquire(mut_);
277 requests_->push_back(rreq);
278 xbt_mutex_release(mut_);
// Local get: copy straight out of our own window.
280 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// One-sided accumulate: like put(), but the data is combined into the target
// buffer with reduction operator `op` instead of overwriting it. Unlike put(),
// there is no local fast path here (see the FIXME below).
287 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
288 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
291 //get receiver pointer
292 MPI_Win recv_win = connected_wins_[target_rank];
294 if(opened_==0){//check that post/start has been done
295 // no fence or start .. lock ok ?
297 for(auto it : recv_win->lockers_)
298 if (it == comm_->rank())
303 //FIXME: local version
305 if(target_count*target_datatype->get_extent()>recv_win->size_)
308 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
309 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
310 //As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
311 //prepare send_request
313 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
314 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
316 //prepare receiver request
317 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
318 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
321 //push request to receiver's win
322 xbt_mutex_acquire(recv_win->mut_);
323 recv_win->requests_->push_back(rreq);
324 xbt_mutex_release(recv_win->mut_);
328 //push request to sender's win
329 xbt_mutex_acquire(mut_);
330 requests_->push_back(sreq);
331 xbt_mutex_release(mut_);
// Fetch-and-op: implemented as a get() of the current target contents into
// result_addr, followed by an accumulate() of origin data with `op`.
// NOTE(review): atomicity of the get+accumulate pair relies on lines not
// visible here — confirm how the two calls are serialized.
336 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
337 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
338 MPI_Datatype target_datatype, MPI_Op op){
341 MPI_Win send_win = connected_wins_[target_rank];
343 if(opened_==0){//check that post/start has been done
344 // no fence or start .. lock ok ?
346 for(auto it : send_win->lockers_)
347 if (it == comm_->rank())
353 if(target_count*target_datatype->get_extent()>send_win->size_)
356 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
// First fetch the old value...
358 get(result_addr, result_count, result_datatype, target_rank,
359 target_disp, target_count, target_datatype);
// ...then apply the reduction with the origin data.
360 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
361 target_disp, target_count, target_datatype, op);
// General active-target sync, origin side (MPI_Win_start): blocks until every
// target in `group` has called post(), by waiting for one zero-byte sync
// message (tag SMPI_RMA_TAG+4) from each of them.
367 int Win::start(MPI_Group group, int assert){
368 /* From MPI forum advices
369 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
370 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
371 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
372 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
373 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
374 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
375 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
376 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
377 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
378 must complete, without further dependencies. */
380 //naive, blocking implementation.
383 int size = group->size();
384 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per group member, skipping ourselves and undefined ranks.
387 int src = group->index(j);
388 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
389 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
395 Request::startall(size, reqs);
396 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
398 Request::unref(&reqs[i]);
401 opened_++; //we're open for business !
// General active-target sync, target side (MPI_Win_post): notify every origin
// in `group` that this window is exposed, by sending each one a zero-byte sync
// message (tag SMPI_RMA_TAG+4, matched by start()).
407 int Win::post(MPI_Group group, int assert){
408 //let's make a synchronous send here
411 int size = group->size();
412 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One send per group member, skipping ourselves and undefined ranks.
415 int dst=group->index(j);
416 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
417 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
424 Request::startall(size, reqs);
425 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
427 Request::unref(&reqs[i]);
430 opened_++; //we're open for business !
// (Body of MPI_Win_complete — the function signature sits above this view.)
// Origin side of start/complete: tell every target in group_ that our access
// epoch is over (tag SMPI_RMA_TAG+5, matched by wait()), then finish our own
// queued RMA requests and close the epoch.
438 xbt_die("Complete called on already opened MPI_Win");
440 XBT_DEBUG("Entering MPI_Win_Complete");
443 int size = group_->size();
444 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One completion message per group member, skipping self/undefined ranks.
447 int dst=group_->index(j);
448 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
449 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
455 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
456 Request::startall(size, reqs);
457 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
460 Request::unref(&reqs[i]);
// Complete the RMA operations issued during this epoch.
464 int finished = finish_comms();
465 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
467 Group::unref(group_);
468 opened_--; //we're closed for business !
// (Body of MPI_Win_wait — the function signature sits above this view.)
// Target side of post/wait: block until every origin in group_ has sent its
// completion message (tag SMPI_RMA_TAG+5, from complete()), then finish the
// RMA requests peers queued on this window and close the exposure epoch.
473 //naive, blocking implementation.
474 XBT_DEBUG("Entering MPI_Win_Wait");
477 int size = group_->size();
478 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per group member, skipping self/undefined ranks.
481 int src=group_->index(j);
482 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
483 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
489 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
490 Request::startall(size, reqs);
491 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
493 Request::unref(&reqs[i]);
496 int finished = finish_comms();
497 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
499 Group::unref(group_);
500 opened_--; //we're closed for business !
// Passive-target lock (MPI_Win_lock) on `rank`'s window. The target's mode_
// tracks the sum of held lock types; lock_mut_ is the actual mutual exclusion.
// A shared lock on an already-shared window skips the mutex entirely.
504 int Win::lock(int lock_type, int rank, int assert){
508 MPI_Win target_win = connected_wins_[rank];
// Block on the target's mutex whenever exclusivity is involved (we want
// exclusive, or the window is currently held exclusively).
510 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
511 xbt_mutex_acquire(target_win->lock_mut_);
512 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
513 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
514 xbt_mutex_release(target_win->lock_mut_);
516 } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
517 target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a locker so put/get/accumulate pass their epoch check.
519 target_win->lockers_.push_back(comm_->rank());
// Flush our pending RMA calls so the lock starts from a clean state.
521 int finished = finish_comms();
522 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
// Passive-target unlock (MPI_Win_unlock): clear the target's lock state,
// remove ourselves from its lockers list, release the mutex only if the lock
// was exclusive, and flush our pending RMA calls to complete the epoch.
527 int Win::unlock(int rank){
531 MPI_Win target_win = connected_wins_[rank];
532 int target_mode = target_win->mode_;
533 target_win->mode_= 0;
534 target_win->lockers_.remove(comm_->rank());
535 if (target_mode==MPI_LOCK_EXCLUSIVE){
536 xbt_mutex_release(target_win->lock_mut_);
539 int finished = finish_comms();
540 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
// Translate a Fortran integer handle back to its C++ Win object.
545 Win* Win::f2c(int id){
546 return static_cast<Win*>(F2C::f2c(id));
// Start and complete every RMA request currently queued on this window
// (including requests prepared by peers), under mut_ so no peer can push while
// waitall is using the vector's backing array. Returns the number finished.
550 int Win::finish_comms(){
551 xbt_mutex_acquire(mut_);
552 //Finish own requests
553 std::vector<MPI_Request> *reqqs = requests_;
554 int size = static_cast<int>(reqqs->size());
556 // start all requests that have been prepared by another process
557 for (const auto& req : *reqqs) {
558 if (req && (req->flags() & PREPARED))
562 MPI_Request* treqs = &(*reqqs)[0];
563 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
566 xbt_mutex_release(mut_);