1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Logging channel dedicated to SMPI one-sided (RMA) operations. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
/* Constructor: create an RMA window over communicator `comm`.
 * Stores the local window parameters, then makes every rank's window object
 * visible to every peer (allgather of the `this` pointers) so one-sided
 * operations can directly reach a remote window's base_/mut_/requests_.
 * NOTE(review): this listing is a sampled excerpt — lines are missing between
 * some statements (e.g. the body of the info!=MPI_INFO_NULL branch and the
 * tail of the allgather call); do not assume the visible lines are contiguous. */
15 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
16 int comm_size = comm->size();
17 int rank = comm->rank();
18 XBT_DEBUG("Creating window");
// Presumably retains the user-supplied info object — branch body not visible here (TODO confirm).
19 if(info!=MPI_INFO_NULL)
23 group_ = MPI_GROUP_NULL;
// Per-window request list: peers push RMA requests into it (see put/get/accumulate).
24 requests_ = new std::vector<MPI_Request>();
// Simulated mutex protecting requests_ against concurrent pushes.
25 mut_=xbt_mutex_init();
// One window pointer slot per rank; filled by the allgather below.
26 connected_wins_ = new MPI_Win[comm_size];
27 connected_wins_[rank] = this;
// Simulated barrier used by fence() and the destructor for synchronization.
30 bar_ = MSG_barrier_init(comm_size);
// Exchange every rank's window pointer with all peers.
32 mpi_coll_allgather_fun(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// Share rank 0's barrier handle with all ranks.
35 mpi_coll_bcast_fun(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
37 mpi_coll_barrier_fun(comm);
/* --- Destructor body fragment: the ~Win() signature is outside this excerpt,
 * and several cleanup lines between the visible ones are missing. --- */
41 //As per the standard, perform a barrier to ensure every async comm is finished
42 MSG_barrier_wait(bar_);
43 xbt_mutex_acquire(mut_);
// (request-list cleanup between these two lines is not visible in this excerpt)
45 xbt_mutex_release(mut_);
46 delete[] connected_wins_;
// Free the optional window name set via set_name().
47 if (name_ != nullptr){
48 if(info_!=MPI_INFO_NULL){
/* Copy the window's name into `name` and its length into `length`.
 * NOTE(review): the unnamed-window early-return path is not visible in this excerpt. */
61 void Win::get_name(char* name, int* length){
67 *length = strlen(name_);
// +1 so the terminating NUL byte is copied as well.
68 strncpy(name, name_, *length+1);
/* Return the group of the communicator the window was created over,
 * or MPI_GROUP_NULL when the window has no valid communicator.
 * (Closing braces of the if/else are outside this excerpt.) */
71 void Win::get_group(MPI_Group* group){
72 if(comm_ != MPI_COMM_NULL){
73 *group = comm_->group();
75 *group = MPI_GROUP_NULL;
/* Attach a (duplicated) name to the window; the copy is released in the destructor. */
79 void Win::set_name(char* name){
80 name_ = xbt_strdup(name);
/* Collective fence synchronization.
 * Unless MPI_MODE_NOPRECEDE is asserted (first fence of an epoch), waits at the
 * shared barrier, then starts and completes every RMA request queued on this
 * window — including requests pushed here by peers — before the trailing barrier.
 * NOTE(review): sampled excerpt — the request-start statement inside the loop and
 * some closing braces are not visible; assume missing lines between jumps. */
83 int Win::fence(int assert)
85 XBT_DEBUG("Entering fence");
88 if (assert != MPI_MODE_NOPRECEDE) {
89 // This is not the first fence => finalize what came before
90 MSG_barrier_wait(bar_);
91 xbt_mutex_acquire(mut_);
92 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
93 // Without this, the vector could get redimensionned when another process pushes.
94 // This would result in the array used by Request::waitall() to be invalidated.
95 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
96 std::vector<MPI_Request> *reqs = requests_;
97 int size = static_cast<int>(reqs->size());
98 // start all requests that have been prepared by another process
100 for (const auto& req : *reqs) {
101 if (req && (req->flags() & PREPARED))
// Wait on the vector's backing array directly — safe while mut_ is held.
105 MPI_Request* treqs = &(*reqs)[0];
107 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
110 xbt_mutex_release(mut_);
// Trailing barrier: no process leaves the fence before all RMA is finished.
114 MSG_barrier_wait(bar_);
115 XBT_DEBUG("Leaving fence");
/* One-sided put: copy `origin_count` elements of `origin_datatype` from
 * origin_addr into the target rank's window at target_disp (scaled by the
 * target's disp_unit_). Remote case: builds a matched RMA send/recv request
 * pair (tag SMPI_RMA_TAG+1) and queues each request on the corresponding
 * window's list under its mutex. Local case: plain datatype copy.
 * NOTE(review): sampled excerpt — the error return for opened_==0 and the
 * Request::start calls between the visible lines are missing. */
120 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
121 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
123 if(opened_==0)//check that post/start has been done
125 //get receiver pointer
126 MPI_Win recv_win = connected_wins_[target_rank];
// Displacement is expressed in units of the *target's* disp_unit_.
128 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
129 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
131 if(target_rank != comm_->rank()){
132 //prepare send_request
133 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
134 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
136 //prepare receiver request
137 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
138 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
140 //push request to receiver's win
141 xbt_mutex_acquire(recv_win->mut_);
142 recv_win->requests_->push_back(rreq);
143 xbt_mutex_release(recv_win->mut_);
147 //push request to sender's win
148 xbt_mutex_acquire(mut_);
149 requests_->push_back(sreq);
150 xbt_mutex_release(mut_);
// Local target: no messaging needed, just copy through the datatype engine.
152 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
/* One-sided get: fetch `target_count` elements from the target rank's window
 * into origin_addr. Mirrors put() with the data flowing the other way: the
 * *send* request is queued on the remote window (the target acts as sender,
 * tag SMPI_RMA_TAG+2) and the receive request on our own window.
 * NOTE(review): sampled excerpt — the error return for opened_==0, the
 * trailing MPI_OP_NULL arguments of the two inits, and the Request::start
 * calls are not visible here. */
158 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
159 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
161 if(opened_==0)//check that post/start has been done
164 MPI_Win send_win = connected_wins_[target_rank];
// Address inside the remote window, scaled by the target's disp_unit_.
166 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
167 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
169 if(target_rank != comm_->rank()){
170 //prepare send_request
171 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
172 comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm_,
175 //prepare receiver request
176 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
177 comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, comm_,
180 //start the send, with another process than us as sender.
182 //push request to receiver's win
183 xbt_mutex_acquire(send_win->mut_);
184 send_win->requests_->push_back(sreq);
185 xbt_mutex_release(send_win->mut_);
189 //push request to sender's win
190 xbt_mutex_acquire(mut_);
191 requests_->push_back(rreq);
192 xbt_mutex_release(mut_);
// Local target: copy directly, remote-to-local direction.
194 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
/* One-sided accumulate: like put(), but the received data is combined into the
 * target buffer with reduction operator `op` instead of overwriting it.
 * The tag includes count_ so successive accumulates on the same window are
 * ordered (see comment at the original line 212).
 * NOTE(review): sampled excerpt — the opened_==0 error return, the count_
 * increment, and the Request::start calls are not visible; there is no local
 * fast path yet (see the FIXME below). */
201 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
202 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
204 if(opened_==0)//check that post/start has been done
206 //FIXME: local version
207 //get receiver pointer
208 MPI_Win recv_win = connected_wins_[target_rank];
210 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
211 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
212 //As the tag will be used for ordering of the operations, add count to it
213 //prepare send_request
214 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
215 smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
217 //prepare receiver request
218 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
219 smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
222 //push request to receiver's win
223 xbt_mutex_acquire(recv_win->mut_);
224 recv_win->requests_->push_back(rreq);
225 xbt_mutex_release(recv_win->mut_);
229 //push request to sender's win
230 xbt_mutex_acquire(mut_);
231 requests_->push_back(sreq);
232 xbt_mutex_release(mut_);
/* MPI_Win_start: open an access epoch towards the ranks of `group`.
 * Naive blocking implementation: post a zero-byte irecv (tag SMPI_RMA_TAG+4)
 * from every member of `group` and wait until the matching sends issued by
 * post() on the target side arrive.
 * NOTE(review): sampled excerpt — the for-loop headers, index bookkeeping and
 * the storing of `group` into group_ are not visible between the line jumps. */
237 int Win::start(MPI_Group group, int assert){
238 /* From MPI forum advices
239 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
240 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
241 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
242 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
243 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
244 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
245 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
246 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
247 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
248 must complete, without further dependencies. */
250 //naive, blocking implementation.
253 int size = group->size();
254 MPI_Request* reqs = xbt_new0(MPI_Request, size);
257 int src = group->index(j);
// Skip ourselves and ranks absent from the communicator.
258 if (src != smpi_process_index() && src != MPI_UNDEFINED) {
259 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
265 Request::startall(size, reqs);
266 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
268 Request::unuse(&reqs[i]);
271 opened_++; //we're open for business !
/* MPI_Win_post: open an exposure epoch for the ranks of `group`.
 * Sends a zero-byte sync message (tag SMPI_RMA_TAG+4) to every group member,
 * matching the irecvs posted by start() on the origin side.
 * NOTE(review): sampled excerpt — loop headers, index bookkeeping and the
 * storing of `group` into group_ are not visible between the line jumps. */
277 int Win::post(MPI_Group group, int assert){
278 //let's make a synchronous send here
281 int size = group->size();
282 MPI_Request* reqs = xbt_new0(MPI_Request, size);
285 int dst=group->index(j);
// Skip ourselves and ranks absent from the communicator.
286 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
287 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
294 Request::startall(size, reqs);
295 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
297 Request::unuse(&reqs[i]);
300 opened_++; //we're open for business !
/* --- Body fragment of Win::complete() (the signature and the opened_ guard
 * condition are outside this excerpt). Closes the access epoch opened by
 * start(): notifies every target (tag SMPI_RMA_TAG+5) that RMA operations
 * are done, then finishes every request queued on this window. --- */
308 xbt_die("Complete called on already opened MPI_Win");
310 XBT_DEBUG("Entering MPI_Win_Complete");
313 int size = group_->size();
314 MPI_Request* reqs = xbt_new0(MPI_Request, size);
317 int dst=group_->index(j);
318 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
319 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
325 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
326 Request::startall(size, reqs);
327 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
330 Request::unuse(&reqs[i]);
334 //now we can finish RMA calls
// Hold mut_ so no peer pushes into requests_ while we waitall (see fence()).
335 xbt_mutex_acquire(mut_);
336 std::vector<MPI_Request> *reqqs = requests_;
337 size = static_cast<int>(reqqs->size());
339 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
341 // start all requests that have been prepared by another process
342 for (const auto& req : *reqqs) {
343 if (req && (req->flags() & PREPARED))
347 MPI_Request* treqs = &(*reqqs)[0];
348 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
351 xbt_mutex_release(mut_);
354 opened_--; //we're closed for business !
/* --- Body fragment of Win::wait() (the signature is outside this excerpt).
 * Closes the exposure epoch opened by post(): blocks until every origin has
 * sent its completion notification (tag SMPI_RMA_TAG+5, matching the sends
 * in complete()), then finishes every request queued on this window. --- */
359 //naive, blocking implementation.
360 XBT_DEBUG("Entering MPI_Win_Wait");
362 int size = group_->size();
363 MPI_Request* reqs = xbt_new0(MPI_Request, size);
366 int src=group_->index(j);
367 if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
368 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
374 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
375 Request::startall(size, reqs);
376 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
378 Request::unuse(&reqs[i]);
// Hold mut_ so no peer pushes into requests_ while we waitall (see fence()).
381 xbt_mutex_acquire(mut_);
382 std::vector<MPI_Request> *reqqs = requests_;
383 size = static_cast<int>(reqqs->size());
385 XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
387 // start all requests that have been prepared by another process
388 for (const auto& req : *reqqs) {
389 if (req && (req->flags() & PREPARED))
393 MPI_Request* treqs = &(*reqqs)[0];
394 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
397 xbt_mutex_release(mut_);
400 opened_--; //we're closed for business !