1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Dedicated logging channel for SMPI one-sided (RMA) operations.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Collective constructor for an MPI RMA window exposing `size` bytes at
// `base` (with displacement unit `disp_unit`) to the processes of `comm`.
// Each process publishes its Win* to every peer via an allgather so that
// one-sided operations can push requests directly into the target's window.
// NOTE(review): several original source lines are elided in this view (e.g.
// the body of the info branch and the tail of the allgather call); comments
// below describe only the visible code.
15 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
16   int comm_size = comm->size();
17   int rank = comm->rank();
18   XBT_DEBUG("Creating window");
  // Keep the user-provided info object alive for the window's lifetime
  // (its refcount bump is on an elided line — TODO confirm).
19   if(info!=MPI_INFO_NULL)
23   group_ = MPI_GROUP_NULL;
  // Queue of pending RMA requests targeting this window; guarded by mut_.
24   requests_ = new std::vector<MPI_Request>();
25   mut_=xbt_mutex_init();
  // One slot per rank; peers' Win pointers are filled in by the allgather.
26   connected_wins_ = new MPI_Win[comm_size];
27   connected_wins_[rank] = this;
  // Simulated barrier shared by all processes of the window; the handle is
  // broadcast from rank 0 below so everyone uses the same instance.
30   bar_ = MSG_barrier_init(comm_size);
  // Exchange window pointers so each process can reach every peer's Win.
32   mpi_coll_allgather_fun(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
35   mpi_coll_bcast_fun(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
  // Window creation is collective: synchronize before returning to the user.
37   mpi_coll_barrier_fun(comm);
// Destructor body (the `Win::~Win()` signature line is not visible in this
// view).  Collectively tears the window down: waits for outstanding
// asynchronous communications, releases per-window resources, then destroys
// the shared barrier and mutex.
41   //As per the standard, perform a barrier to ensure every async comm is finished
42   MSG_barrier_wait(bar_);
  // Lock pairs with concurrent pushers into requests_; the cleanup done
  // between acquire and release (original line 44) is elided from this view.
43   xbt_mutex_acquire(mut_);
45   xbt_mutex_release(mut_);
46   delete[] connected_wins_;
  // Free the duplicated window name, if one was set via set_name().
47   if (name_ != nullptr){
  // Release our reference on the user's info object.
50   if(info_!=MPI_INFO_NULL){
51     MPI_Info_free(&info_);
  // Destruction is collective: make sure no peer still needs our state.
54   mpi_coll_barrier_fun(comm_);
55   int rank=comm_->rank();
  // NOTE(review): presumably only one rank (the creator) destroys the shared
  // barrier — the guarding condition is on an elided line; verify upstream.
57     MSG_barrier_destroy(bar_);
58   xbt_mutex_destroy(mut_);
// Copy this window's name into the caller-supplied buffer `name` and store
// its length in `*length` (MPI_Win_get_name semantics).
// NOTE(review): the guard for an unset name_ (original lines 62-66) is not
// visible in this view — name_ is dereferenced unconditionally below.
61 void Win::get_name(char* name, int* length){
67   *length = strlen(name_);
  // *length+1 copies the terminating NUL byte as well.
68   strncpy(name, name_, *length+1);
// Return the group of the communicator this window was created on, or
// MPI_GROUP_NULL if the window's communicator is no longer valid.
// (The `else` keyword and closing braces fall on lines elided from this view.)
71 void Win::get_group(MPI_Group* group){
72   if(comm_ != MPI_COMM_NULL){
73     *group = comm_->group();
75     *group = MPI_GROUP_NULL;
// Set the window's name (MPI_Win_set_name); the string is duplicated, and
// the copy is freed in the destructor.
79 void Win::set_name(char* name){
80   name_ = xbt_strdup(name);
// MPI_Win_fence: collective synchronization separating two RMA access
// epochs.  Unless the caller asserts MPI_MODE_NOPRECEDE (no RMA calls
// precede this fence), all requests queued on this window are started and
// completed here before the closing barrier.
// NOTE(review): some lines (e.g. the request-start call inside the loop and
// the vector cleanup after waitall) are elided from this view.
83 int Win::fence(int assert)
85   XBT_DEBUG("Entering fence");
  // MPI_MODE_NOPRECEDE means this fence opens the first epoch: nothing to
  // finalize, so the whole completion phase can be skipped.
88   if (assert != MPI_MODE_NOPRECEDE) {
89     // This is not the first fence => finalize what came before
90     MSG_barrier_wait(bar_);
91     xbt_mutex_acquire(mut_);
92     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
93     // Without this, the vector could get redimensionned when another process pushes.
94     // This would result in the array used by Request::waitall() to be invalidated.
95     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
96     std::vector<MPI_Request> *reqs = requests_;
97     int size = static_cast<int>(reqs->size());
98     // start all requests that have been prepared by another process
100     for (const auto& req : *reqs) {
101       if (req && (req->flags() & PREPARED))
  // waitall on the raw backing array of the vector (safe under mut_).
105     MPI_Request* treqs = &(*reqs)[0];
107     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
110     xbt_mutex_release(mut_);
  // Closing barrier: no process leaves the fence before all finished.
114   MSG_barrier_wait(bar_);
115   XBT_DEBUG("Leaving fence");
// MPI_Put: transfer `origin_count` elements from origin_addr into the
// target rank's window at displacement `target_disp`.  Remote targets are
// handled by pairing an RMA send (queued on our window) with an RMA receive
// (queued on the target's window); the local case degenerates to a memcpy
// via Datatype::copy.  Requests are completed later, at synchronization.
// NOTE(review): error-return statements and the `else`/start logic fall on
// lines elided from this view.
120 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
121               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
  // An access epoch must be open (post/start or fence) before any RMA call.
123   if(opened_==0)//check that post/start has been done
125   //get receiver pointer
126   MPI_Win recv_win = connected_wins_[target_rank];
  // Bounds check: the written region must fit inside the target window.
128   if(target_count*target_datatype->get_extent()>recv_win->size_)
  // Address arithmetic in the target window: disp is in disp_unit_ units.
131   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
132   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
134   if(target_rank != comm_->rank()){
135     //prepare send_request
136     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
137         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
139     //prepare receiver request
140     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
141         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
143     //push request to receiver's win
  // The target's request vector is shared state: protect with its mutex.
144     xbt_mutex_acquire(recv_win->mut_);
145     recv_win->requests_->push_back(rreq);
146     xbt_mutex_release(recv_win->mut_);
150     //push request to sender's win
151     xbt_mutex_acquire(mut_);
152     requests_->push_back(sreq);
153     xbt_mutex_release(mut_);
  // Local put: plain datatype-aware copy, no requests needed.
155     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get: mirror image of put() — fetch `target_count` elements from the
// target rank's window into origin_addr.  The RMA send is queued on the
// *target's* window (the data flows from there), the matching receive on
// ours; the local case is a direct Datatype::copy.
// NOTE(review): error returns, the trailing rma_*_init arguments, and the
// `else` branch fall on lines elided from this view.
161 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
162               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
  // An access epoch must be open before any RMA call.
164   if(opened_==0)//check that post/start has been done
167   MPI_Win send_win = connected_wins_[target_rank];
  // Bounds check: the read region must fit inside the target window.
169   if(target_count*target_datatype->get_extent()>send_win->size_)
172   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
173   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
175   if(target_rank != comm_->rank()){
176     //prepare send_request
  // Note the swapped roles: the target is the sender here (tag +2).
177     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
178         comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm_,
181     //prepare receiver request
182     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
183         comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, comm_,
186     //start the send, with another process than us as sender.
188     //push request to receiver's win
189     xbt_mutex_acquire(send_win->mut_);
190     send_win->requests_->push_back(sreq);
191     xbt_mutex_release(send_win->mut_);
195     //push request to sender's win
196     xbt_mutex_acquire(mut_);
197     requests_->push_back(rreq);
198     xbt_mutex_release(mut_);
  // Local get: plain datatype-aware copy, no requests needed.
200     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate: like put(), but the incoming data is combined into the
// target buffer with reduction operator `op` (applied on the receive side
// by the RMA request machinery).  The tag is offset by count_ so successive
// accumulates to the same target are matched in order.
// NOTE(review): error returns and the count_ increment fall on lines elided
// from this view; there is no local fast path (see FIXME below).
207 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
208               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
  // An access epoch must be open before any RMA call.
210   if(opened_==0)//check that post/start has been done
212   //FIXME: local version
213   //get receiver pointer
214   MPI_Win recv_win = connected_wins_[target_rank];
  // Bounds check: the accumulated region must fit inside the target window.
216   if(target_count*target_datatype->get_extent()>recv_win->size_)
219   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
220   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
221   //As the tag will be used for ordering of the operations, add count to it
222   //prepare send_request
223   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
224       smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
226   //prepare receiver request
227   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
228       smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
231   //push request to receiver's win
  // The target's request vector is shared state: protect with its mutex.
232   xbt_mutex_acquire(recv_win->mut_);
233   recv_win->requests_->push_back(rreq);
234   xbt_mutex_release(recv_win->mut_);
238   //push request to sender's win
239   xbt_mutex_acquire(mut_);
240   requests_->push_back(sreq);
241   xbt_mutex_release(mut_);
// MPI_Win_start: open an access epoch toward the processes of `group`.
// Naive blocking implementation: wait for a zero-byte sync message
// (tag SMPI_RMA_TAG+4) from every member of the group, i.e. block until
// each target has called post().
// NOTE(review): the loop headers, the group_ assignment, and the reqs free
// fall on lines elided from this view.
246 int Win::start(MPI_Group group, int assert){
247   /* From MPI forum advices
248   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
249   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
250   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
251   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
252   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
253   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
254   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
255   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
256   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
257   must complete, without further dependencies. */
259   //naive, blocking implementation.
262   int size = group->size();
263   MPI_Request* reqs = xbt_new0(MPI_Request, size);
  // One receive per group member, skipping ourselves and invalid ranks.
266     int src = group->index(j);
267     if (src != smpi_process_index() && src != MPI_UNDEFINED) {
268       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
  // Block until every target's post() message has arrived.
274   Request::startall(size, reqs);
275   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
277     Request::unref(&reqs[i]);
  // opened_ counts nested epochs; RMA calls check it (see put/get).
280   opened_++; //we're open for business !
// MPI_Win_post: open an exposure epoch for the processes of `group`.
// Counterpart of start(): send a zero-byte sync message (tag SMPI_RMA_TAG+4)
// to each group member so their start() can unblock.
// NOTE(review): loop headers and the reqs free fall on lines elided from
// this view.
286 int Win::post(MPI_Group group, int assert){
287   //let's make a synchronous send here
290   int size = group->size();
291   MPI_Request* reqs = xbt_new0(MPI_Request, size);
  // One send per group member, skipping ourselves and invalid ranks.
294     int dst=group->index(j);
295     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
296       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
303   Request::startall(size, reqs);
304   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
306     Request::unref(&reqs[i]);
  // opened_ counts nested epochs; RMA calls check it (see put/get).
309   opened_++; //we're open for business !
// MPI_Win_complete body (the signature line and the opened_ guard condition
// are not visible in this view).  Closes the access epoch opened by start():
// notifies every target (tag SMPI_RMA_TAG+5) that our RMA calls are issued,
// then finishes all requests queued on this window.
317     xbt_die("Complete called on already opened MPI_Win");
319   XBT_DEBUG("Entering MPI_Win_Complete");
322   int size = group_->size();
323   MPI_Request* reqs = xbt_new0(MPI_Request, size);
  // One completion message per member of the start() group.
326     int dst=group_->index(j);
327     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
328       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
334   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
335   Request::startall(size, reqs);
336   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
339     Request::unref(&reqs[i]);
343   //now we can finish RMA calls
  // Same locking discipline as fence(): nobody may push while we waitall.
344   xbt_mutex_acquire(mut_);
345   std::vector<MPI_Request> *reqqs = requests_;
346   size = static_cast<int>(reqqs->size());
348   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
350   // start all requests that have been prepared by another process
351   for (const auto& req : *reqqs) {
352     if (req && (req->flags() & PREPARED))
356   MPI_Request* treqs = &(*reqqs)[0];
357   Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
360   xbt_mutex_release(mut_);
  // Drop the reference on the start() group and close the epoch.
362   Group::unref(group_);
363   opened_--; //we're closed for business !
// MPI_Win_wait body (the signature line is not visible in this view).
// Closes the exposure epoch opened by post(): blocks until every origin in
// the group has sent its complete() message (tag SMPI_RMA_TAG+5), then
// finishes all RMA requests queued on this window.
368   //naive, blocking implementation.
369   XBT_DEBUG("Entering MPI_Win_Wait");
371   int size = group_->size();
372   MPI_Request* reqs = xbt_new0(MPI_Request, size);
  // One receive per member of the post() group, skipping ourselves.
375     int src=group_->index(j);
376     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
377       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
383   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
384   Request::startall(size, reqs);
385   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
387     Request::unref(&reqs[i]);
  // Same locking discipline as fence(): nobody may push while we waitall.
390   xbt_mutex_acquire(mut_);
391   std::vector<MPI_Request> *reqqs = requests_;
392   size = static_cast<int>(reqqs->size());
394   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
396   // start all requests that have been prepared by another process
397   for (const auto& req : *reqqs) {
398     if (req && (req->flags() & PREPARED))
402   MPI_Request* treqs = &(*reqqs)[0];
403   Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
406   xbt_mutex_release(mut_);
  // Drop the reference on the post() group and close the epoch.
408   Group::unref(group_);
409   opened_--; //we're opened for business !
// Convert a Fortran window handle (integer id) back to its C++ Win object,
// via the shared F2C handle registry.
413 Win* Win::f2c(int id){
414   return static_cast<Win*>(F2C::f2c(id));