1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include "src/smpi/smpi_group.hpp"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Internal representation of an MPI RMA window.
// NOTE(review): several fields (base, disp_unit, info, comm, name, opened, assert, bar, mut, group, ...)
// are used below but their declarations are elided in this view.
13 typedef struct s_smpi_mpi_win{
// Pending RMA requests targeting/originating from this window; accessed under the window mutex.
20 std::vector<MPI_Request> *requests;
// One entry per rank of the communicator: every process's window handle, exchanged in smpi_mpi_win_create.
23 MPI_Win* connected_wins;
27 int count; //for ordering the accs
// Create an RMA window over 'comm': allocate the window structure, exchange every
// rank's window handle (allgather), share rank 0's barrier (bcast), then synchronize.
// NOTE(review): several initialization lines are elided in this view (e.g. base/size/comm
// assignment and the MPI_Info handling body).
31 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
32 int comm_size = smpi_comm_size(comm);
33 int rank = smpi_comm_rank(comm);
34 XBT_DEBUG("Creating window");
36 MPI_Win win = xbt_new(s_smpi_mpi_win_t, 1);
39 win->disp_unit = disp_unit;
// Body of this branch elided in this view — presumably takes a reference on 'info'.
42 if(info!=MPI_INFO_NULL)
47 win->group = MPI_GROUP_NULL;
48 win->requests = new std::vector<MPI_Request>();
49 win->mut=xbt_mutex_init();
50 win->connected_wins = xbt_new0(MPI_Win, comm_size);
// Record our own handle so the allgather below distributes it to every peer.
51 win->connected_wins[rank] = win;
54 win->bar = MSG_barrier_init(comm_size);
// Each rank contributes its own window pointer; afterwards connected_wins[i] is rank i's window.
// (Call continuation — recv type/comm arguments — elided in this view.)
56 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
// All ranks use rank 0's barrier object so MSG_barrier_wait synchronizes the whole group.
59 mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
61 mpi_coll_barrier_fun(comm);
// Destroy an RMA window: wait for outstanding asynchronous communications,
// then release the request vector, peer table, name, info, barrier and mutex.
// NOTE(review): some cleanup lines are elided in this view (closing braces,
// freeing of the window struct itself, return value).
67 int smpi_mpi_win_free( MPI_Win* win){
68 //As per the standard, perform a barrier to ensure every async comm is finished
69 MSG_barrier_wait((*win)->bar);
// Take the mutex so no peer is pushing into 'requests' while we delete it.
70 xbt_mutex_acquire((*win)->mut);
71 delete (*win)->requests;
72 xbt_mutex_release((*win)->mut);
73 xbt_free((*win)->connected_wins);
74 if ((*win)->name != nullptr){
75 xbt_free((*win)->name);
76 if((*win)->info!=MPI_INFO_NULL){
77 MPI_Info_free(&(*win)->info);
// Collective: all ranks must reach this point before the shared barrier is destroyed.
80 mpi_coll_barrier_fun((*win)->comm);
81 int rank=smpi_comm_rank((*win)->comm);
// NOTE(review): presumably only one rank destroys the shared barrier (guard elided) — verify.
83 MSG_barrier_destroy((*win)->bar);
84 xbt_mutex_destroy((*win)->mut);
// Copy the window's name (and its length, excluding the terminator) into the
// caller-provided buffer. NOTE(review): the body of the name==nullptr branch is
// elided in this view — presumably sets *length to 0 and returns.
90 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
91 if(win->name==nullptr){
96 *length = strlen(win->name);
// *length+1 includes the '\0'; caller's buffer is assumed large enough (MPI_MAX_OBJECT_NAME contract).
97 strncpy(name, win->name, *length+1);
// Return the group of the communicator the window was created over, or
// MPI_GROUP_NULL if the window has no communicator.
// NOTE(review): MPI_Win_get_group is specified to return a *duplicate* the caller
// must free; no reference/duplication is visible here — confirm against callers.
100 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
101 if(win->comm != MPI_COMM_NULL){
102 *group = smpi_comm_group(win->comm);
104 *group = MPI_GROUP_NULL;
// Set the window's name (duplicated; freed in smpi_mpi_win_free).
// NOTE(review): an already-set name is overwritten without being freed — potential
// leak if set_name is called twice; verify intended usage.
108 void smpi_mpi_win_set_name(MPI_Win win, char* name){
109 win->name = xbt_strdup(name);
// MPI_Win_fence: collective synchronization that separates RMA epochs.
// Unless MPI_MODE_NOPRECEDE is asserted, finish all requests queued on this
// window since the previous fence, then synchronize everyone on the barrier.
// NOTE(review): several lines are elided in this view (opening brace, the
// opened==0 handling, per-request start call, requests->clear(), return value).
112 int smpi_mpi_win_fence(int assert, MPI_Win win)
114 XBT_DEBUG("Entering fence");
// First fence of an access epoch: mark the window as opened (body elided in this view).
115 if (win->opened == 0)
117 if (assert != MPI_MODE_NOPRECEDE) {
118 // This is not the first fence => finalize what came before
119 MSG_barrier_wait(win->bar);
120 xbt_mutex_acquire(win->mut);
121 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
122 // Without this, the vector could get redimensionned when another process pushes.
123 // This would result in the array used by smpi_mpi_waitall() to be invalidated.
124 // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
125 std::vector<MPI_Request> *reqs = win->requests;
126 int size = static_cast<int>(reqs->size());
127 // start all requests that have been prepared by another process
129 for (const auto& req : *reqs) {
// Requests in PREPARED state were queued by a peer but not yet started; start them here
// (the smpi_mpi_start call itself is elided in this view).
130 if (req && (req->flags & PREPARED))
// &(*reqs)[0] exposes the vector's contiguous storage as the C array waitall expects.
134 MPI_Request* treqs = &(*reqs)[0];
136 smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
139 xbt_mutex_release(win->mut);
// Remember the assertion for the epoch that starts now.
141 win->assert = assert;
143 MSG_barrier_wait(win->bar);
144 XBT_DEBUG("Leaving fence");
// MPI_Put: transfer 'origin_count' elements from local origin_addr into the
// target rank's window at target_disp (scaled by the target's disp_unit).
// Remote case: queue a prepared recv on the target's window and start the send
// locally; both are completed at the next synchronization (fence/complete/wait).
// Local case: plain datatype copy.
// NOTE(review): error-return lines and closing braces are elided in this view.
149 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
150 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
152 if(win->opened==0)//check that post/start has been done
154 //get receiver pointer
155 MPI_Win recv_win = win->connected_wins[target_rank];
// Compute the remote destination address using the *target's* displacement unit.
157 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
158 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
160 if(target_rank != smpi_comm_rank(win->comm)){
161 //prepare send_request
162 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
163 smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
165 //prepare receiver request
166 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
167 smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
169 //push request to receiver's win
// The target will start/complete this prepared recv at its next synchronization point.
170 xbt_mutex_acquire(recv_win->mut);
171 recv_win->requests->push_back(rreq);
172 xbt_mutex_release(recv_win->mut);
174 smpi_mpi_start(sreq);
176 //push request to sender's win
177 xbt_mutex_acquire(win->mut);
178 win->requests->push_back(sreq);
179 xbt_mutex_release(win->mut);
// Local put: no messaging needed, copy directly (else-branch brace elided in this view).
181 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get: transfer 'target_count' elements from the target rank's window into
// local origin_addr. Remote case: both requests name the *target* as sender and
// us as receiver; the send is queued on the target's window while we start the
// recv locally. Local case: plain datatype copy.
// NOTE(review): error-return lines, trailing call arguments (MPI_OP_NULL) and
// closing braces are elided in this view.
187 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
188 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
190 if(win->opened==0)//check that post/start has been done
193 MPI_Win send_win = win->connected_wins[target_rank];
// Remote source address, scaled by the target's displacement unit.
195 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
196 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
198 if(target_rank != smpi_comm_rank(win->comm)){
199 //prepare send_request
200 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
201 smpi_comm_group(win->comm)->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
204 //prepare receiver request
205 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
206 smpi_comm_group(win->comm)->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
209 //start the send, with another process than us as sender.
210 smpi_mpi_start(sreq)
211 //push request to receiver's win
212 xbt_mutex_acquire(send_win->mut);
213 send_win->requests->push_back(sreq);
214 xbt_mutex_release(send_win->mut);
// Start our local receive of the remote data.
217 smpi_mpi_start(rreq);
218 //push request to sender's win
219 xbt_mutex_acquire(win->mut);
220 win->requests->push_back(rreq);
221 xbt_mutex_release(win->mut);
// Local get: direct copy from our own window (else-branch brace elided in this view).
223 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate: like MPI_Put, but the data is combined into the target buffer
// with reduction operation 'op'. The per-window 'count' is folded into the tag
// so successive accumulates to the same target stay ordered.
// NOTE(review): error-return lines, the win->count increment, and closing braces
// are elided in this view; the local (self-target) shortcut is marked FIXME below.
230 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
231 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
233 if(win->opened==0)//check that post/start has been done
235 //FIXME: local version
236 //get receiver pointer
237 MPI_Win recv_win = win->connected_wins[target_rank];
// Remote destination address, scaled by the target's displacement unit.
239 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
240 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
241 //As the tag will be used for ordering of the operations, add count to it
242 //prepare send_request
243 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
244 smpi_process_index(), smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
246 //prepare receiver request
247 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
248 smpi_process_index(), smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
251 //push request to receiver's win
// The target completes this prepared recv (applying 'op') at its next synchronization.
252 xbt_mutex_acquire(recv_win->mut);
253 recv_win->requests->push_back(rreq);
254 xbt_mutex_release(recv_win->mut);
256 smpi_mpi_start(sreq);
258 //push request to sender's win
259 xbt_mutex_acquire(win->mut);
260 win->requests->push_back(sreq);
261 xbt_mutex_release(win->mut);
// MPI_Win_start: open an access epoch towards the processes in 'group'.
// Naive blocking implementation: receive one zero-byte sync message (tag
// SMPI_RMA_TAG+4) from each member of the group, i.e. block until every target
// has called MPI_Win_post.
// NOTE(review): loop headers, index bookkeeping, the group reference/storage,
// xbt_free(reqs) and the return are elided in this view.
266 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
267 /* From MPI forum advices
268 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
269 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
270 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
271 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
272 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
273 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
274 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
275 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
276 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
277 must complete, without further dependencies. */
279 //naive, blocking implementation.
282 int size = group->getsize();
283 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One recv per group member (loop header elided in this view); skip ourselves.
286 int src = group->index(j);
287 if (src != smpi_process_index() && src != MPI_UNDEFINED) {
288 reqs[i] = smpi_irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
// Block until every target has posted its exposure epoch.
294 smpi_mpi_startall(size, reqs);
295 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
// Per-request cleanup (surrounding loop elided in this view).
297 smpi_mpi_request_free(&reqs[i]);
300 win->opened++; //we're open for business !
// MPI_Win_post: open an exposure epoch for the processes in 'group'.
// Sends one zero-byte sync message (tag SMPI_RMA_TAG+4) to each group member,
// matching the receives posted in smpi_mpi_win_start.
// NOTE(review): loop headers, index bookkeeping, the group reference/storage,
// xbt_free(reqs) and the return are elided in this view.
306 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
307 //let's make a synchronous send here
310 int size = group->getsize();
311 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One send per group member (loop header elided in this view); skip ourselves.
314 int dst=group->index(j);
315 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
316 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
323 smpi_mpi_startall(size, reqs);
324 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
// Per-request cleanup (surrounding loop elided in this view).
326 smpi_mpi_request_free(&reqs[i]);
329 win->opened++; //we're open for business !
// MPI_Win_complete: close the access epoch opened by smpi_mpi_win_start.
// Notifies every target (tag SMPI_RMA_TAG+5) that accesses are done, then
// finishes every RMA request queued on this window.
// NOTE(review): the opened==0 guard condition, loop headers, xbt_free(reqs),
// requests->clear(), per-request start calls and the return are elided in this view.
335 int smpi_mpi_win_complete(MPI_Win win){
// Guard condition elided in this view — dies if no epoch is open.
337 xbt_die("Complete called on already opened MPI_Win");
339 XBT_DEBUG("Entering MPI_Win_Complete");
342 int size = win->group->getsize();
343 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One completion message per group member (loop header elided in this view); skip ourselves.
346 int dst=win->group->index(j);
347 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
348 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
354 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
355 smpi_mpi_startall(size, reqs);
356 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
// Per-request cleanup (surrounding loop elided in this view).
359 smpi_mpi_request_free(&reqs[i]);
363 //now we can finish RMA calls
// Mutex prevents peers from pushing into 'requests' while we drain it (see fence).
364 xbt_mutex_acquire(win->mut);
365 std::vector<MPI_Request> *reqqs = win->requests;
366 size = static_cast<int>(reqqs->size());
368 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
370 // start all requests that have been prepared by another process
371 for (const auto& req : *reqqs) {
372 if (req && (req->flags & PREPARED))
// Contiguous vector storage passed as the C array waitall expects.
376 MPI_Request* treqs = &(*reqqs)[0];
377 smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
380 xbt_mutex_release(win->mut);
383 win->opened--; //we're closed for business !
// MPI_Win_wait: close the exposure epoch opened by smpi_mpi_win_post.
// Blocks until every origin has sent its completion message (tag SMPI_RMA_TAG+5,
// from smpi_mpi_win_complete), then finishes every RMA request queued on this window.
// NOTE(review): loop headers, xbt_free(reqs), requests->clear(), per-request
// start calls and the return are elided in this view.
387 int smpi_mpi_win_wait(MPI_Win win){
388 //naive, blocking implementation.
389 XBT_DEBUG("Entering MPI_Win_Wait");
391 int size = win->group->getsize();
392 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One recv per group member (loop header elided in this view); skip ourselves.
395 int src=win->group->index(j);
396 if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
397 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
403 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
404 smpi_mpi_startall(size, reqs);
405 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
// Per-request cleanup (surrounding loop elided in this view).
407 smpi_mpi_request_free(&reqs[i]);
// Mutex prevents peers from pushing into 'requests' while we drain it (see fence).
410 xbt_mutex_acquire(win->mut);
411 std::vector<MPI_Request> *reqqs = win->requests;
412 size = static_cast<int>(reqqs->size());
414 XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
416 // start all requests that have been prepared by another process
417 for (const auto& req : *reqqs) {
418 if (req && (req->flags & PREPARED))
// Contiguous vector storage passed as the C array waitall expects.
422 MPI_Request* treqs = &(*reqqs)[0];
423 smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
426 xbt_mutex_release(win->mut);
429 win->opened--; //we're opened for business !