1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Log category for SMPI one-sided (RMA) operations; child of the "smpi" category.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Static storage shared by all windows: registered attribute keyvals and the next
// keyval id to hand out (presumably backing MPI_Win_create_keyval — confirm in the
// keyval machinery, which is not visible in this view).
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
// Constructor: publish a local memory region (base/size/disp_unit) as an RMA window
// over communicator `comm`, and wire up the synchronization state shared with peers.
// NOTE(review): several lines are elided from this view (e.g. rank_/opened_ init,
// the MPI_INFO_NULL refcount branch body, the allgather tail) — do not assume them.
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18 int comm_size = comm->size();
20 XBT_DEBUG("Creating window");
21 if(info!=MPI_INFO_NULL)
// (body of the info!=MPI_INFO_NULL branch is not visible here)
25 group_ = MPI_GROUP_NULL;
// Pending RMA requests targeting this window; peers push into it under mut_.
26 requests_ = new std::vector<MPI_Request>();
// mut_ guards requests_ against concurrent push/waitall (see fence/finish_comms);
// lock_mut_ serializes MPI_Win_lock holders (see lock/unlock).
27 mut_=xbt_mutex_init();
28 lock_mut_=xbt_mutex_init();
// One window pointer per rank of comm; our own slot is filled locally, the rest
// are exchanged below so every rank can reach every peer's window directly.
29 connected_wins_ = new MPI_Win[comm_size];
30 connected_wins_[rank_] = this;
// Simulated barrier used by fence() and the destructor to close epochs.
33 bar_ = MSG_barrier_init(comm_size);
36 comm->add_rma_win(this);
38 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// Every rank adopts rank 0's barrier object so they all wait on the same one.
41 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// --- Win destructor body (the `Win::~Win()` signature line is not visible in this
// view). Drains outstanding RMA traffic, then tears down per-window resources. ---
47 //As per the standard, perform a barrier to ensure every async comm is finished
48 MSG_barrier_wait(bar_);
// Complete every request still queued on this window before freeing anything.
50 int finished = finish_comms();
51 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
54 delete[] connected_wins_;
55 if (name_ != nullptr){
// (free of name_ on the elided line — not visible here)
58 if(info_!=MPI_INFO_NULL){
59 MPI_Info_free(&info_);
62 comm_->remove_rma_win(this);
// Collective barrier: no peer may still be using our window state past this point.
64 Colls::barrier(comm_);
65 int rank=comm_->rank();
// bar_ is shared (bcast from rank 0 in the ctor); this destroy is presumably
// guarded by a rank check on the elided line 66 — confirm against the full file.
67 MSG_barrier_destroy(bar_);
68 xbt_mutex_destroy(mut_);
69 xbt_mutex_destroy(lock_mut_);
// Copy this window's name into the caller's buffer and report its length.
// NOTE(review): the unset-name case (lines 75-79) is elided from this view;
// as shown, name_ must be non-null when these lines run.
74 void Win::get_name(char* name, int* length){
80 *length = strlen(name_);
// +1 so the terminating NUL is copied as well.
81 strncpy(name, name_, *length+1);
// Return the group of the communicator this window was created on, or
// MPI_GROUP_NULL when the window's communicator is no longer valid.
84 void Win::get_group(MPI_Group* group){
85 if(comm_ != MPI_COMM_NULL){
86 *group = comm_->group();
// (else branch — line 87 is elided from this view)
88 *group = MPI_GROUP_NULL;
// Accessor for the window's displacement unit (body elided from this view;
// presumably returns disp_unit_ — confirm against the full file).
104 int Win::disp_unit(){
// Store a private heap copy of the window name (queried back via get_name();
// the copy is released in the destructor's name_ branch).
109 void Win::set_name(char* name){
110 name_ = xbt_strdup(name);
// MPI_Win_fence: collective epoch boundary. Unless the caller asserted
// MPI_MODE_NOPRECEDE, finish every RMA call issued since the previous fence,
// then re-synchronize all ranks before returning.
// NOTE(review): the loop body starting PREPARED requests (lines 132-134) and the
// epilogue/return (lines 141-148) are elided from this view.
113 int Win::fence(int assert)
115 XBT_DEBUG("Entering fence");
118 if (assert != MPI_MODE_NOPRECEDE) {
119 // This is not the first fence => finalize what came before
120 MSG_barrier_wait(bar_);
121 xbt_mutex_acquire(mut_);
122 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
123 // Without this, the vector could get redimensionned when another process pushes.
124 // This would result in the array used by Request::waitall() to be invalidated.
125 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
126 std::vector<MPI_Request> *reqs = requests_;
127 int size = static_cast<int>(reqs->size());
128 // start all requests that have been prepared by another process
130 for (const auto& req : *reqs) {
131 if (req && (req->flags() & PREPARED))
// (request start on the elided line 132 — not visible here)
135 MPI_Request* treqs = &(*reqs)[0];
137 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
140 xbt_mutex_release(mut_);
// Second barrier: no rank leaves the fence until every rank's epoch is closed.
144 MSG_barrier_wait(bar_);
145 XBT_DEBUG("Leaving fence");
// MPI_Put: write origin data into target_rank's window at target_disp.
// Remote target: build a matched rma_send/rma_recv request pair on tag
// SMPI_RMA_TAG+1 and queue each on its owner's window (completed later by
// fence/complete/unlock via finish_comms). Local target: plain memory copy.
// NOTE(review): the error returns (epoch/lock violation, out-of-bounds) and the
// request starts (lines 161-168, 185-192, 194-197) are elided from this view.
150 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
151 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
153 //get receiver pointer
154 MPI_Win recv_win = connected_wins_[target_rank];
156 if(opened_==0){//check that post/start has been done
157 // no fence or start .. lock ok ?
// Outside an epoch, the access is only legal if we hold a lock on the target.
159 for(auto it : recv_win->lockers_)
160 if (it == comm_->rank())
// Bounds check: the write must fit inside the target window.
166 if(target_count*target_datatype->get_extent()>recv_win->size_)
169 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
170 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
172 if(target_rank != comm_->rank()){
173 //prepare send_request
174 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
175 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
177 //prepare receiver request
178 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
179 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
181 //push request to receiver's win
182 xbt_mutex_acquire(recv_win->mut_);
183 recv_win->requests_->push_back(rreq);
184 xbt_mutex_release(recv_win->mut_);
188 //push request to sender's win
189 xbt_mutex_acquire(mut_);
190 requests_->push_back(sreq);
191 xbt_mutex_release(mut_);
// Self-targeted put: no messaging needed, copy directly into our own window.
193 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get: mirror of put() — read from target_rank's window into origin_addr.
// Remote target: the *send* request is queued on the target's window (the target
// is the data source) and our matching recv on our own, using tag SMPI_RMA_TAG+2.
// Local target: plain memory copy.
// NOTE(review): the error returns, the MPI_OP_NULL arguments (lines 225, 230) and
// the sreq start (line 233) are elided from this view.
199 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
200 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
203 MPI_Win send_win = connected_wins_[target_rank];
205 if(opened_==0){//check that post/start has been done
206 // no fence or start .. lock ok ?
// Outside an epoch, the access is only legal if we hold a lock on the source.
208 for(auto it : send_win->lockers_)
209 if (it == comm_->rank())
// Bounds check: the read must fit inside the source window.
215 if(target_count*target_datatype->get_extent()>send_win->size_)
218 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
219 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
221 if(target_rank != comm_->rank()){
222 //prepare send_request
223 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
224 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
227 //prepare receiver request
228 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
229 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
232 //start the send, with another process than us as sender.
234 //push request to receiver's win
235 xbt_mutex_acquire(send_win->mut_);
236 send_win->requests_->push_back(sreq);
237 xbt_mutex_release(send_win->mut_);
241 //push request to sender's win
242 xbt_mutex_acquire(mut_);
243 requests_->push_back(rreq);
244 xbt_mutex_release(mut_);
// Self-targeted get: copy directly out of our own window.
246 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate: like put(), but the target combines incoming data with `op`.
// Unlike put()/get() there is no local-copy shortcut (see the FIXME below): the
// request pair is created unconditionally in the lines shown. The tag includes a
// per-window operation counter so concurrent accumulates stay ordered.
// NOTE(review): error returns and the count_ increment are elided from this view.
253 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
254 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
257 //get receiver pointer
258 MPI_Win recv_win = connected_wins_[target_rank];
260 if(opened_==0){//check that post/start has been done
261 // no fence or start .. lock ok ?
// Outside an epoch, the access is only legal if we hold a lock on the target.
263 for(auto it : recv_win->lockers_)
264 if (it == comm_->rank())
269 //FIXME: local version
271 if(target_count*target_datatype->get_extent()>recv_win->size_)
274 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
275 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
276 //As the tag will be used for ordering of the operations, add count to it
277 //prepare send_request
278 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
279 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
281 //prepare receiver request
282 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
283 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
286 //push request to receiver's win
287 xbt_mutex_acquire(recv_win->mut_);
288 recv_win->requests_->push_back(rreq);
289 xbt_mutex_release(recv_win->mut_);
293 //push request to sender's win
294 xbt_mutex_acquire(mut_);
295 requests_->push_back(sreq);
296 xbt_mutex_release(mut_);
// MPI_Win_start: open an access epoch toward `group`. Blocking implementation —
// wait for a zero-byte sync message (tag SMPI_RMA_TAG+4) from each matching
// MPI_Win_post before returning.
// NOTE(review): the loop headers over i/j, the group_ bookkeeping and the reqs
// cleanup/return (elided lines) are not visible in this view.
301 int Win::start(MPI_Group group, int assert){
302 /* From MPI forum advices
303 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
304 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
305 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
306 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
307 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
308 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
309 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
310 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
311 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
312 must complete, without further dependencies. */
314 //naive, blocking implementation.
317 int size = group->size();
318 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One receive per member of `group`, skipping ourselves and undefined ranks.
321 int src = group->index(j);
322 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
323 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
// Block until every post peer has checked in.
329 Request::startall(size, reqs);
330 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
332 Request::unref(&reqs[i]);
335 opened_++; //we're open for business !
// MPI_Win_post: open an exposure epoch toward `group`. Sends a zero-byte sync
// message (tag SMPI_RMA_TAG+4) to each member so their MPI_Win_start can proceed.
// NOTE(review): the loop headers, group_ bookkeeping and cleanup/return lines are
// elided from this view.
341 int Win::post(MPI_Group group, int assert){
342 //let's make a synchronous send here
345 int size = group->size();
346 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One send per member of `group`, skipping ourselves and undefined ranks.
349 int dst=group->index(j);
350 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
351 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
358 Request::startall(size, reqs);
359 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
361 Request::unref(&reqs[i]);
364 opened_++; //we're open for business !
// --- Win::complete body (the signature and its opened_ guard condition are on
// elided lines; the xbt_die below is the guard's failure branch). Closes the
// access epoch opened by start(): notify each target (tag SMPI_RMA_TAG+5) that
// our RMA calls are issued, then finish our own pending requests. ---
372 xbt_die("Complete called on already opened MPI_Win");
374 XBT_DEBUG("Entering MPI_Win_Complete");
377 int size = group_->size();
378 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One sync send per member of the start() group, skipping self/undefined.
381 int dst=group_->index(j);
382 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
383 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
389 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
390 Request::startall(size, reqs);
391 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
394 Request::unref(&reqs[i]);
// Drain every RMA request queued on this window before declaring the epoch done.
398 int finished = finish_comms();
399 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
401 Group::unref(group_);
402 opened_--; //we're closed for business !
// --- Win::wait body (signature line elided from this view). Closes the exposure
// epoch opened by post(): block until every member of the post() group has sent
// its complete() notification (tag SMPI_RMA_TAG+5), then finish pending RMA. ---
407 //naive, blocking implementation.
408 XBT_DEBUG("Entering MPI_Win_Wait");
410 int size = group_->size();
411 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One sync receive per member of the post() group, skipping self/undefined.
414 int src=group_->index(j);
415 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
416 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
422 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
423 Request::startall(size, reqs);
424 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
426 Request::unref(&reqs[i]);
429 int finished = finish_comms();
430 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
432 Group::unref(group_);
433 opened_--; //we're opened for business !
// MPI_Win_lock: lock the window owned by `rank`. Exclusive locks block on the
// target window's lock_mut_; every locker (shared or exclusive, as shown here)
// is recorded in the target's lockers_ list, which put/get/accumulate consult
// to validate passive-target accesses.
// NOTE(review): the shared-lock path and the return (elided lines) are not
// visible in this view.
437 int Win::lock(int lock_type, int rank, int assert){
438 MPI_Win target_win = connected_wins_[rank];
440 //window already locked, we have to wait
441 if (lock_type == MPI_LOCK_EXCLUSIVE){
442 XBT_DEBUG("Win_lock - Entering lock %d", rank);
443 xbt_mutex_acquire(target_win->lock_mut_);
444 XBT_DEBUG("Win_lock - Released from lock %d", rank);
// Register ourselves as a locker of the target window (under its mutex).
447 xbt_mutex_acquire(target_win->mut_);
448 target_win->lockers_.push_back(comm_->rank());
449 xbt_mutex_release(target_win->mut_);
// Finish our own pending RMA so prior operations don't leak into this epoch.
451 int finished = finish_comms();
452 XBT_DEBUG("Win_lock - Finished %d RMA calls", finished);
// MPI_Win_unlock: drop our lock on `rank`'s window. We remove ourselves from the
// target's lockers_ list and, when at most one locker was registered, release the
// exclusive mutex (try_acquire first so the release is legal even if a shared
// locker never acquired lock_mut_). Pending RMA is then completed.
457 int Win::unlock(int rank){
458 MPI_Win target_win = connected_wins_[rank];
460 xbt_mutex_acquire(target_win->mut_);
461 int size=target_win->lockers_.size();
462 target_win->lockers_.remove(comm_->rank());
465 if (size<=1){//0 or 1 lockers -> exclusive assumed
466 xbt_mutex_try_acquire(target_win->lock_mut_);
467 xbt_mutex_release(target_win->lock_mut_);
469 xbt_mutex_release(target_win->mut_);
// Ensure all RMA issued during the lock epoch is complete before returning.
470 int finished = finish_comms();
471 XBT_DEBUG("Win_unlock - Finished %d RMA calls", finished);
// Fortran-to-C handle translation: resolve an integer handle to its Win* via the
// shared F2C lookup (closing brace on an elided line).
476 Win* Win::f2c(int id){
477 return static_cast<Win*>(F2C::f2c(id));
// Complete every RMA request queued on this window: start those still in PREPARED
// state (pushed by peers), then waitall under mut_ so no peer can grow the vector
// and invalidate the request array mid-wait (same scheme as fence()).
// NOTE(review): the PREPARED start, the vector cleanup and the return of the
// completed count (elided lines) are not visible in this view.
481 int Win::finish_comms(){
482 xbt_mutex_acquire(mut_);
483 //Finish own requests
484 std::vector<MPI_Request> *reqqs = requests_;
485 int size = static_cast<int>(reqqs->size());
487 // start all requests that have been prepared by another process
488 for (const auto& req : *reqqs) {
489 if (req && (req->flags() & PREPARED))
493 MPI_Request* treqs = &(*reqqs)[0];
494 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
497 xbt_mutex_release(mut_);