1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Static storage backing the window attribute/keyval API
// (MPI_Win_create_keyval and friends): the keyval table and the next id.
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
// Build the local window object, then exchange peer pointers so every rank
// can reach every other rank's Win directly (SMPI folds all MPI ranks into
// one OS process, so raw pointers are shareable across ranks).
18 int comm_size = comm->size();
// NOTE(review): rank_ is read below but its initialization (presumably
// rank_ = comm->rank()) falls in a gap of this listing — confirm.
20 XBT_DEBUG("Creating window");
21 if(info!=MPI_INFO_NULL)
// (listing gap: the statement governed by the MPI_INFO_NULL check above —
// presumably an info refcount bump — is not visible here)
25 group_ = MPI_GROUP_NULL;
26 requests_ = new std::vector<MPI_Request>();
// Two mutexes: mut_ protects requests_ (and lockers_), lock_mut_ implements
// MPI_LOCK_EXCLUSIVE passive-target locking.
27 mut_=xbt_mutex_init();
28 lock_mut_=xbt_mutex_init();
29 connected_wins_ = new MPI_Win[comm_size];
30 connected_wins_[rank_] = this;
// Simulated barrier used by fence(); presumably created on one rank only
// (the guard is in a listing gap) and broadcast to the others below.
33 bar_ = MSG_barrier_init(comm_size);
36 comm->add_rma_win(this);
// Publish each rank's Win* to all peers, then agree on rank 0's barrier.
38 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
41 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Destructor body of Win (the ~Win signature line is not visible in this
// listing). Tears the window down in roughly reverse construction order:
// drain pending RMA requests, free per-window storage, detach from the
// communicator, then destroy the simulated barrier and mutexes.
47 //As per the standard, perform a barrier to ensure every async comm is finished
48 MSG_barrier_wait(bar_);
50 int finished = finish_comms();
51 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
// (listing gap: requests_ is presumably deleted around here — confirm)
54 delete[] connected_wins_;
55 if (name_ != nullptr){
// (listing gap: the body of the name_ branch above — presumably
// xbt_free(name_) — is not visible; confirm against full source)
58 if(info_!=MPI_INFO_NULL){
59 MPI_Info_free(&info_);
62 comm_->remove_rma_win(this);
// Barrier so no peer can still be pushing into this window's requests_ or
// waiting on bar_ while we destroy them.
64 Colls::barrier(comm_);
65 int rank=comm_->rank();
// NOTE(review): `rank` suggests barrier destruction is guarded to a single
// rank; the guard itself falls in a listing gap — confirm.
67 MSG_barrier_destroy(bar_);
68 xbt_mutex_destroy(mut_);
69 xbt_mutex_destroy(lock_mut_);
74 void Win::get_name(char* name, int* length){
// MPI_Win_get_name: copy the stored name into the caller's buffer and
// report its length. The early-return handling for a still-unset name_
// (lines 75-79) is not visible in this listing — TODO confirm, since
// strlen(nullptr) below would otherwise be undefined behavior.
80 *length = strlen(name_);
// *length+1 so the terminating NUL is copied too; assumes the caller's
// buffer is large enough (MPI convention: MPI_MAX_OBJECT_NAME) — not
// checked here.
81 strncpy(name, name_, *length+1);
84 void Win::get_group(MPI_Group* group){
// MPI_Win_get_group: return the group of the communicator the window was
// created on, or MPI_GROUP_NULL when there is no communicator.
85 if(comm_ != MPI_COMM_NULL){
86 *group = comm_->group();
// (listing gap: the `} else {` introducing the fallback below is not
// visible here)
88 *group = MPI_GROUP_NULL;
// Accessor for the displacement unit supplied at window creation
// (function body not visible in this listing — presumably returns
// disp_unit_; confirm).
104 int Win::disp_unit(){
109 void Win::set_name(char* name){
// MPI_Win_set_name: keep a private duplicate of the caller's string.
// NOTE(review): a previously set name_ is not visibly freed before being
// overwritten — possible leak on repeated set_name; confirm against the
// full source.
110 name_ = xbt_strdup(name);
// MPI_Win_fence: collective synchronization separating RMA access epochs.
// Unless MPI_MODE_NOPRECEDE is asserted there is a preceding epoch to
// close, so every request queued on this window — by us or pushed here by
// peers via put/get/accumulate — is completed before anyone proceeds.
113 int Win::fence(int assert)
115 XBT_DEBUG("Entering fence");
// (listing gap: opened_ bookkeeping around original lines 116-117 is not
// visible here)
118 if (assert != MPI_MODE_NOPRECEDE) {
119 // This is not the first fence => finalize what came before
// Entry barrier: make sure every peer has pushed all the requests of the
// closing epoch before we wait on them.
120 MSG_barrier_wait(bar_);
121 xbt_mutex_acquire(mut_);
122 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
123 // Without this, the vector could get redimensionned when another process pushes.
124 // This would result in the array used by Request::waitall() to be invalidated.
125 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
126 std::vector<MPI_Request> *reqs = requests_;
127 int size = static_cast<int>(reqs->size());
128 // start all requests that have been prepared by another process
130 for (const auto& req : *reqs) {
131 if (req && (req->flags() & PREPARED))
// (listing gap: the start call governed by the PREPARED check above is not
// visible here)
135 MPI_Request* treqs = &(*reqs)[0];
137 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
// (listing gap: cleanup/clear of the request vector around lines 138-139
// is not visible here)
140 xbt_mutex_release(mut_);
// Exit barrier: nobody leaves the fence until all ranks completed theirs.
144 MSG_barrier_wait(bar_);
145 XBT_DEBUG("Leaving fence");
// MPI_Put: write origin data into the target rank's window.
// Implemented as a matched rma_send/rma_recv request pair: the receive
// request is pushed onto the *target's* window so it gets completed at the
// target's next synchronization; the self-targeted case degrades to a
// plain datatype copy (all ranks share one address space in SMPI).
150 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
151 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
153 //get receiver pointer
154 MPI_Win recv_win = connected_wins_[target_rank];
// Access-legality check: either an epoch is open (post/start/fence) or the
// caller holds a passive-target lock on the target window. The error path
// taken when neither holds is in a listing gap.
156 if(opened_==0){//check that post/start has been done
157 // no fence or start .. lock ok ?
159 for(auto it : recv_win->lockers_)
160 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window (error return
// in a listing gap).
166 if(target_count*target_datatype->get_extent()>recv_win->size_)
169 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
170 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
172 if(target_rank != comm_->rank()){
173 //prepare send_request
174 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
175 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
177 //prepare receiver request
178 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
179 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
181 //push request to receiver's win
182 xbt_mutex_acquire(recv_win->mut_);
183 recv_win->requests_->push_back(rreq);
184 xbt_mutex_release(recv_win->mut_);
// (listing gap: the local start of sreq around lines 185-187 is not
// visible here)
188 //push request to sender's win
189 xbt_mutex_acquire(mut_);
190 requests_->push_back(sreq);
191 xbt_mutex_release(mut_);
// Local put: no communication needed, copy straight into the window.
193 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get: read data from the target rank's window into origin_addr.
// Mirror image of put(): here the *send* request is pushed onto the target
// window (the target owns the source data) and the receive request onto
// ours; the self-targeted case is a plain datatype copy.
199 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
200 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
203 MPI_Win send_win = connected_wins_[target_rank];
// Access-legality check, same scheme as put(): open epoch or a lock we
// hold on the target window (error path in a listing gap).
205 if(opened_==0){//check that post/start has been done
206 // no fence or start .. lock ok ?
208 for(auto it : send_win->lockers_)
209 if (it == comm_->rank())
// Bounds check against the target window size (error return in a gap).
215 if(target_count*target_datatype->get_extent()>send_win->size_)
218 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
219 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
221 if(target_rank != comm_->rank()){
222 //prepare send_request
// Note the swapped src/dst arguments vs put(): the target rank is the
// sender of the data here.
223 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
224 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
227 //prepare receiver request
228 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
229 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
232 //start the send, with another process than us as sender.
234 //push request to receiver's win
235 xbt_mutex_acquire(send_win->mut_);
236 send_win->requests_->push_back(sreq);
237 xbt_mutex_release(send_win->mut_);
// (listing gap: the local start of rreq around lines 238-240 is not
// visible here)
241 //push request to sender's win
242 xbt_mutex_acquire(mut_);
243 requests_->push_back(rreq);
244 xbt_mutex_release(mut_);
// Local get: copy straight out of the window.
246 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate: like put(), but the target combines incoming data with
// the window contents using `op` (carried by the rma request pair). Unlike
// put()/get() there is no local fast path yet (see FIXME below).
253 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
254 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
257 //get receiver pointer
258 MPI_Win recv_win = connected_wins_[target_rank];
// Access-legality check, same scheme as put(): open epoch or a lock we
// hold on the target window (error path in a listing gap).
260 if(opened_==0){//check that post/start has been done
261 // no fence or start .. lock ok ?
263 for(auto it : recv_win->lockers_)
264 if (it == comm_->rank())
269 //FIXME: local version
271 if(target_count*target_datatype->get_extent()>recv_win->size_)
274 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
275 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
276 //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
277 //prepare send_request
// count_ is folded into the tag so successive accumulates keep distinct,
// ordered tags; its increment is presumably nearby in a listing gap —
// confirm.
279 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
280 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
282 //prepare receiver request
283 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
284 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
287 //push request to receiver's win
288 xbt_mutex_acquire(recv_win->mut_);
289 recv_win->requests_->push_back(rreq);
290 xbt_mutex_release(recv_win->mut_);
// (listing gap: the local start of sreq around lines 291-293 is not
// visible here)
294 //push request to sender's win
295 xbt_mutex_acquire(mut_);
296 requests_->push_back(sreq);
297 xbt_mutex_release(mut_);
// MPI_Win_start: open an access epoch toward the ranks in `group`.
// Naive blocking scheme: post a zero-byte receive from every group member
// and wait for all of them, i.e. block until each target has called post().
302 int Win::start(MPI_Group group, int assert){
303 /* From MPI forum advices
304 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
305 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
306 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
307 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
308 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
309 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
310 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
311 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
312 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
313 must complete, without further dependencies. */
315 //naive, blocking implementation.
318 int size = group->size();
319 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// (listing gap: the loop headers introducing counters i and j around lines
// 320-321 are not visible here)
322 int src = group->index(j);
// Skip ourselves and ranks not present in the group.
323 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
324 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
330 Request::startall(size, reqs);
331 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
// (listing gap: the loop around the unref below is not visible here)
333 Request::unref(&reqs[i]);
// NOTE(review): group_ is presumably assigned from `group` near here (it is
// unref'd in complete()); the line is in a listing gap — confirm.
336 opened_++; //we're open for business !
// MPI_Win_post: open an exposure epoch toward the ranks in `group`.
// Symmetric to start(): send a zero-byte message to every group member so
// each origin's start() can return.
342 int Win::post(MPI_Group group, int assert){
343 //let's make a synchronous send here
346 int size = group->size();
347 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// (listing gap: the loop headers introducing counters i and j around lines
// 348-349 are not visible here)
350 int dst=group->index(j);
// Skip ourselves and ranks not present in the group.
351 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
352 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
359 Request::startall(size, reqs);
360 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
// (listing gap: the loop around the unref below is not visible here)
362 Request::unref(&reqs[i]);
// NOTE(review): group_ is presumably assigned from `group` near here (it is
// unref'd in wait()); the line is in a listing gap — confirm.
365 opened_++; //we're open for business !
// Body of Win::complete() (signature not visible in this listing): close
// the access epoch opened by start(). Sends a completion token to every
// member of group_, then drains the RMA requests queued on this window.
// NOTE(review): the xbt_die message wording looks inverted — it presumably
// fires when the window was never opened (opened_==0), not "already
// opened"; the guarding condition is in a listing gap. Confirm before
// touching the message.
373 xbt_die("Complete called on already opened MPI_Win");
375 XBT_DEBUG("Entering MPI_Win_Complete");
378 int size = group_->size();
379 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// (listing gap: the loop headers introducing counters i and j around lines
// 380-381 are not visible here)
382 int dst=group_->index(j);
383 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
// Tag +5 pairs with the irecv posted by the target's wait().
384 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
390 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
391 Request::startall(size, reqs);
392 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
// (listing gap: the loop around the unref below is not visible here)
395 Request::unref(&reqs[i]);
399 int finished = finish_comms();
400 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
// Drop the reference on the access group taken by start().
402 Group::unref(group_);
403 opened_--; //we're closed for business !
// Body of Win::wait() (signature not visible in this listing): close the
// exposure epoch opened by post(). Blocks for a completion token from
// every member of group_, then drains the RMA requests queued here.
408 //naive, blocking implementation.
409 XBT_DEBUG("Entering MPI_Win_Wait");
411 int size = group_->size();
412 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// (listing gap: the loop headers introducing counters i and j around lines
// 413-414 are not visible here)
415 int src=group_->index(j);
416 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
// Tag +5 pairs with the send issued by each origin's complete().
417 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
423 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
424 Request::startall(size, reqs);
425 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
// (listing gap: the loop around the unref below is not visible here)
427 Request::unref(&reqs[i]);
430 int finished = finish_comms();
431 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
// Drop the reference on the exposure group taken by post().
433 Group::unref(group_);
434 opened_--; //we're opened for business !
// MPI_Win_lock (passive target). Exclusive locks serialize on the target
// window's lock_mut_; lockers_ records which ranks currently hold a lock so
// put/get/accumulate can validate access outside an epoch.
438 int Win::lock(int lock_type, int rank, int assert){
439 MPI_Win target_win = connected_wins_[rank];
441 //window already locked, we have to wait
442 if (lock_type == MPI_LOCK_EXCLUSIVE){
443 XBT_DEBUG("Win_lock - Entering lock %d", rank);
444 xbt_mutex_acquire(target_win->lock_mut_);
445 XBT_DEBUG("Win_lock - Released from lock %d", rank);
// (listing gap: handling of MPI_LOCK_SHARED around lines 446-447 is not
// visible here)
// Register ourselves as a holder of the lock, under the target's mutex.
448 xbt_mutex_acquire(target_win->mut_);
449 target_win->lockers_.push_back(comm_->rank());
450 xbt_mutex_release(target_win->mut_);
// Flush RMA requests already queued on our window before entering the
// locked epoch.
452 int finished = finish_comms();
453 XBT_DEBUG("Win_lock - Finished %d RMA calls", finished);
// MPI_Win_unlock: remove ourselves from the target's lockers list; if we
// were the only holder (so the lock may have been exclusive), release the
// target's lock_mut_. Finally flush our pending RMA requests.
458 int Win::unlock(int rank){
459 MPI_Win target_win = connected_wins_[rank];
461 xbt_mutex_acquire(target_win->mut_);
462 int size=target_win->lockers_.size();
463 target_win->lockers_.remove(comm_->rank());
466 if (size<=1){//0 or 1 lockers -> exclusive assumed
// try_acquire first so the release below is legal even when the mutex was
// not actually held (shared-lock case).
467 xbt_mutex_try_acquire(target_win->lock_mut_);
468 xbt_mutex_release(target_win->lock_mut_);
470 xbt_mutex_release(target_win->mut_);
471 int finished = finish_comms();
472 XBT_DEBUG("Win_unlock - Finished %d RMA calls", finished);
// Translate a Fortran integer handle into its C++ Win object via the
// shared F2C registry.
477 Win* Win::f2c(int id){
478 return static_cast<Win*>(F2C::f2c(id));
// Complete every RMA request currently queued on this window — both those
// we created and those pushed here by peers — holding mut_ for the same
// vector-invalidation reasons documented in fence(). Returns the number of
// finished requests (the return statement is in a listing gap).
482 int Win::finish_comms(){
483 xbt_mutex_acquire(mut_);
484 //Finish own requests
485 std::vector<MPI_Request> *reqqs = requests_;
486 int size = static_cast<int>(reqqs->size());
488 // start all requests that have been prepared by another process
489 for (const auto& req : *reqqs) {
490 if (req && (req->flags() & PREPARED))
// (listing gap: the start call governed by the PREPARED check above, and
// the vector cleanup after waitall, are not visible here)
494 MPI_Request* treqs = &(*reqqs)[0];
495 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
498 xbt_mutex_release(mut_);