1 /* Copyright (c) 2007-2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Logging category for SMPI one-sided (RMA) operations.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Storage for window attribute keyvals (MPI_Win_create_keyval) and the
// counter used to hand out fresh keyval ids.
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
// Construct an MPI window over `comm`. Every rank stores its own Win* into a
// comm_size-wide table which is then exchanged via allgather, so any rank can
// reach the Win object of any peer directly (used by put/get/accumulate).
// NOTE(review): this excerpt is sampled — statements between the numbered
// lines (e.g. the body of the info!=MPI_INFO_NULL branch) are not visible here.
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated){
18 int comm_size = comm->size();
20 XBT_DEBUG("Creating window");
21 if(info!=MPI_INFO_NULL)
25 group_ = MPI_GROUP_NULL;
// Pending RMA requests targeting this window; guarded by mut_.
26 requests_ = new std::vector<MPI_Request>();
27 mut_=xbt_mutex_init();
28 lock_mut_=xbt_mutex_init();
// One slot per rank in the communicator; our own slot points at this object.
29 connected_wins_ = new MPI_Win[comm_size];
30 connected_wins_[rank_] = this;
// Simulated barrier shared by all processes of the window (used by fence).
33 bar_ = MSG_barrier_init(comm_size);
37 comm->add_rma_win(this);
// Exchange the Win* of every rank so connected_wins_ is fully populated,
// then make everyone agree on rank 0's barrier object.
39 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
42 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Destructor body (the Win::~Win signature line is outside this excerpt).
// Synchronizes all processes, drains pending RMA requests, then releases
// every resource the constructor allocated.
//As per the standard, perform a barrier to ensure every async comm is finished
49 MSG_barrier_wait(bar_);
// Complete (waitall) whatever requests are still queued on this window.
51 int finished = finish_comms();
52 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
55 delete[] connected_wins_;
56 if (name_ != nullptr){
59 if(info_!=MPI_INFO_NULL){
60 MPI_Info_free(&info_);
63 comm_->remove_rma_win(this);
// Second synchronization before tearing down the shared barrier object.
65 Colls::barrier(comm_);
66 int rank=comm_->rank();
68 MSG_barrier_destroy(bar_);
69 xbt_mutex_destroy(mut_);
70 xbt_mutex_destroy(lock_mut_);
// Attach memory to a dynamic window (MPI_Win_attach). Only legal when the
// window base is MPI_BOTTOM or still unset; the real address is supplied
// later as the displacement of each RMA call.
78 int Win::attach (void *base, MPI_Aint size){
79 if (!(base_ == MPI_BOTTOM || base_ == 0))
81 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach memory from a dynamic window (MPI_Win_detach); body not visible in
// this excerpt.
86 int Win::detach (void *base){
// Copy the window's name into the caller's buffer (MPI_Win_get_name).
92 void Win::get_name(char* name, int* length){
98 *length = strlen(name_);
// +1 so the terminating NUL is copied as well.
99 strncpy(name, name_, *length+1);
// Return the group of the communicator the window was created over, or
// MPI_GROUP_NULL if the communicator is gone (MPI_Win_get_group).
102 void Win::get_group(MPI_Group* group){
103 if(comm_ != MPI_COMM_NULL){
104 *group = comm_->group();
106 *group = MPI_GROUP_NULL;
// Accessor for the window's info object; presumably lazily creates one when
// none was set — the branch body is outside this excerpt, so confirm.
110 MPI_Info Win::info(){
111 if(info_== MPI_INFO_NULL)
// Size in bytes of the window's local memory region.
121 MPI_Aint Win::size(){
// Displacement unit used to scale target displacements in RMA calls.
129 int Win::disp_unit(){
// Replace the window's info object (MPI_Win_set_info); the previous one is
// handled first when present — the branch body is outside this excerpt.
133 void Win::set_info(MPI_Info info){
134 if(info_!= MPI_INFO_NULL)
// Set the window's name (MPI_Win_set_name); takes a private copy of the
// string via xbt_strdup.
139 void Win::set_name(char* name){
140 name_ = xbt_strdup(name);
// MPI_Win_fence: collective synchronization. Unless MPI_MODE_NOPRECEDE is
// asserted, first synchronize and complete all RMA requests queued on this
// window, then barrier again so every process leaves the fence together.
143 int Win::fence(int assert)
145 XBT_DEBUG("Entering fence");
148 if (assert != MPI_MODE_NOPRECEDE) {
149 // This is not the first fence => finalize what came before
150 MSG_barrier_wait(bar_);
151 xbt_mutex_acquire(mut_);
152 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
153 // Without this, the vector could get redimensionned when another process pushes.
154 // This would result in the array used by Request::waitall() to be invalidated.
155 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
156 std::vector<MPI_Request> *reqs = requests_;
157 int size = static_cast<int>(reqs->size());
158 // start all requests that have been prepared by another process
160 for (const auto& req : *reqs) {
161 if (req && (req->flags() & PREPARED))
// waitall needs a raw contiguous array, hence the address of element 0.
165 MPI_Request* treqs = &(*reqs)[0];
167 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
170 xbt_mutex_release(mut_);
// Closing barrier: no process exits the fence before all have completed.
174 MSG_barrier_wait(bar_);
175 XBT_DEBUG("Leaving fence");
// MPI_Put: one-sided write of local data into the target rank's window.
// Implemented as a prepared send/recv pair: the receive request is pushed
// onto the *target's* window so it is completed by the target's next
// synchronization, the send onto our own. A local (self-targeted) put is a
// plain datatype copy.
180 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
181 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
183 //get receiver pointer
184 MPI_Win recv_win = connected_wins_[target_rank];
186 if(opened_==0){//check that post/start has been done
187 // no fence or start .. lock ok ?
// Allowed anyway if we currently hold a lock on the target window.
189 for(auto it : recv_win->lockers_)
190 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window.
196 if(target_count*target_datatype->get_extent()>recv_win->size_)
199 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
200 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
202 if(target_rank != comm_->rank()){
203 //prepare send_request
204 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
205 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
207 //prepare receiver request
208 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
209 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
211 //push request to receiver's win
212 xbt_mutex_acquire(recv_win->mut_);
213 recv_win->requests_->push_back(rreq);
214 xbt_mutex_release(recv_win->mut_);
218 //push request to sender's win
219 xbt_mutex_acquire(mut_);
220 requests_->push_back(sreq);
221 xbt_mutex_release(mut_);
// Self-targeted put: just copy locally, no requests needed.
223 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get: one-sided read from the target rank's window into local memory.
// Mirror image of put(): the send request (sourced at the target) is queued
// on the target's window, the receive request on ours. A self-targeted get
// degenerates to a local datatype copy.
229 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
230 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
233 MPI_Win send_win = connected_wins_[target_rank];
235 if(opened_==0){//check that post/start has been done
236 // no fence or start .. lock ok ?
// Allowed anyway if we currently hold a lock on the target window.
238 for(auto it : send_win->lockers_)
239 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window.
245 if(target_count*target_datatype->get_extent()>send_win->size_)
248 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
249 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
251 if(target_rank != comm_->rank()){
252 //prepare send_request
253 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
254 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
257 //prepare receiver request
258 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
259 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
262 //start the send, with another process than us as sender.
264 //push request to receiver's win
265 xbt_mutex_acquire(send_win->mut_);
266 send_win->requests_->push_back(sreq);
267 xbt_mutex_release(send_win->mut_);
271 //push request to sender's win
272 xbt_mutex_acquire(mut_);
273 requests_->push_back(rreq);
274 xbt_mutex_release(mut_);
// Self-targeted get: just copy locally, no requests needed.
276 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate: like put(), but the data is combined into the target
// buffer with reduction operator `op` instead of overwriting it. Requests
// carry decreasing tags (SMPI_RMA_TAG-3-count_) so successive accumulates
// to the same window keep their issue order.
283 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
284 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
287 //get receiver pointer
288 MPI_Win recv_win = connected_wins_[target_rank];
290 if(opened_==0){//check that post/start has been done
291 // no fence or start .. lock ok ?
// Allowed anyway if we currently hold a lock on the target window.
293 for(auto it : recv_win->lockers_)
294 if (it == comm_->rank())
299 //FIXME: local version
// Bounds check: the access must fit inside the target window.
301 if(target_count*target_datatype->get_extent()>recv_win->size_)
304 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
305 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
306 //As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
307 //prepare send_request
309 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
310 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
312 //prepare receiver request
313 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
314 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
317 //push request to receiver's win
318 xbt_mutex_acquire(recv_win->mut_);
319 recv_win->requests_->push_back(rreq);
320 xbt_mutex_release(recv_win->mut_);
324 //push request to sender's win
325 xbt_mutex_acquire(mut_);
326 requests_->push_back(sreq);
327 xbt_mutex_release(mut_);
// MPI_Get_accumulate: fetch the current target buffer into result_addr, then
// combine origin data into the target with `op`. Implemented by chaining the
// existing get() and accumulate() primitives.
// NOTE(review): this is not atomic as the MPI standard requires for
// get_accumulate — the two halves are separate RMA operations; confirm
// whether surrounding (not visible) code provides the needed ordering.
332 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
333 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
334 MPI_Datatype target_datatype, MPI_Op op){
337 MPI_Win send_win = connected_wins_[target_rank];
339 if(opened_==0){//check that post/start has been done
340 // no fence or start .. lock ok ?
// Allowed anyway if we currently hold a lock on the target window.
342 for(auto it : send_win->lockers_)
343 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window.
349 if(target_count*target_datatype->get_extent()>send_win->size_)
352 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
354 get(result_addr, result_count, result_datatype, target_rank,
355 target_disp, target_count, target_datatype);
356 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
357 target_disp, target_count, target_datatype, op);
// MPI_Win_start (PSCW access epoch): blocks until every target process in
// `group` has issued the matching MPI_Win_post, by receiving one zero-byte
// sync message (tag SMPI_RMA_TAG+4) from each of them.
363 int Win::start(MPI_Group group, int assert){
364 /* From MPI forum advices
365 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
366 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
367 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
368 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
369 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
370 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
371 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
372 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
373 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
374 must complete, without further dependencies. */
376 //naive, blocking implementation.
379 int size = group->size();
380 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per group member, skipping ourselves and undefined ranks.
383 int src = group->index(j);
384 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
385 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
391 Request::startall(size, reqs);
392 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
394 Request::unref(&reqs[i]);
397 opened_++; //we're open for business !
// MPI_Win_post (PSCW exposure epoch): notify every origin process in `group`
// that this window is exposed, by sending one zero-byte sync message
// (tag SMPI_RMA_TAG+4) to each — the counterpart of the receives in start().
403 int Win::post(MPI_Group group, int assert){
404 //let's make a synchronous send here
407 int size = group->size();
408 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One send per group member, skipping ourselves and undefined ranks.
411 int dst=group->index(j);
412 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
413 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
420 Request::startall(size, reqs);
421 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
423 Request::unref(&reqs[i]);
426 opened_++; //we're open for business !
// Body of MPI_Win_complete (the Win::complete signature line is outside this
// excerpt). Ends the access epoch opened by start(): tells each target
// (tag SMPI_RMA_TAG+5) that our RMA calls are issued, then completes the
// locally queued requests.
434 xbt_die("Complete called on already opened MPI_Win");
436 XBT_DEBUG("Entering MPI_Win_Complete");
439 int size = group_->size();
440 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One completion message per member of the start() group.
443 int dst=group_->index(j);
444 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
445 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
451 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
452 Request::startall(size, reqs);
453 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
456 Request::unref(&reqs[i]);
// Drain every RMA request still queued on this window.
460 int finished = finish_comms();
461 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
463 Group::unref(group_);
464 opened_--; //we're closed for business !
// Body of MPI_Win_wait (the Win::wait signature line is outside this
// excerpt). Ends the exposure epoch opened by post(): waits for the
// SMPI_RMA_TAG+5 completion message from every origin process, then
// completes the RMA requests queued on this window.
469 //naive, blocking implementation.
470 XBT_DEBUG("Entering MPI_Win_Wait");
473 int size = group_->size();
474 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per member of the post() group.
477 int src=group_->index(j);
478 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
479 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
485 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
486 Request::startall(size, reqs);
487 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
489 Request::unref(&reqs[i]);
// Drain every RMA request still queued on this window.
492 int finished = finish_comms();
493 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
495 Group::unref(group_);
496 opened_--; //we're opened for business !
// MPI_Win_lock (passive target): acquire the target window's lock_mut_ when
// exclusivity is involved, record the lock type in mode_, and register this
// rank in the target's lockers_ list. For a SHARED lock the mutex is released
// immediately so other sharers may enter; mode_ accumulates lock types to
// tell an exclusive→shared transition apart at unlock time.
500 int Win::lock(int lock_type, int rank, int assert){
504 MPI_Win target_win = connected_wins_[rank];
// Block if we want exclusivity, or if someone already holds it exclusively.
506 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
507 xbt_mutex_acquire(target_win->lock_mut_);
508 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
509 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
510 xbt_mutex_release(target_win->lock_mut_);
512 } else if(!(target_win->mode_==MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
513 target_win->mode_+= lock_type; // don't set to exclusive if it's already shared
515 target_win->lockers_.push_back(comm_->rank());
// Per MPI semantics, lock also flushes our pending operations on this window.
517 int finished = finish_comms();
518 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock (passive target): clear the target's lock state, remove this
// rank from its lockers_ list, and release the mutex only when the epoch was
// purely exclusive (a shared epoch released it already in lock()).
523 int Win::unlock(int rank){
527 MPI_Win target_win = connected_wins_[rank];
528 int target_mode = target_win->mode_;
529 target_win->mode_= 0;
530 target_win->lockers_.remove(comm_->rank());
531 if (target_mode==MPI_LOCK_EXCLUSIVE){
532 xbt_mutex_release(target_win->lock_mut_);
// Per MPI semantics, unlock completes our pending operations on this window.
535 int finished = finish_comms();
536 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
// Fortran-to-C handle conversion: look up the Win object for a Fortran
// integer handle via the shared F2C registry.
541 Win* Win::f2c(int id){
542 return static_cast<Win*>(F2C::f2c(id));
// Complete every RMA request currently queued on this window and return how
// many were finished. mut_ is held across the waitall so no peer can push
// into (and possibly reallocate) the vector while its backing array is in
// use — same rationale as in fence().
546 int Win::finish_comms(){
547 xbt_mutex_acquire(mut_);
548 //Finish own requests
549 std::vector<MPI_Request> *reqqs = requests_;
550 int size = static_cast<int>(reqqs->size());
552 // start all requests that have been prepared by another process
553 for (const auto& req : *reqqs) {
554 if (req && (req->flags() & PREPARED))
// waitall needs a raw contiguous array, hence the address of element 0.
558 MPI_Request* treqs = &(*reqqs)[0];
559 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
562 xbt_mutex_release(mut_);