1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Logging category for SMPI one-sided (RMA) operations. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
/* Window descriptor for MPI one-sided communication.
 * NOTE(review): the embedded line numbers jump (13-18, 20-21, 23-25, 27-29),
 * so several field declarations (base, size, disp_unit, info, comm, mut, bar,
 * name, opened, group, assert presumably) are missing from this extract; only
 * the visible members are documented here. */
12 typedef struct s_smpi_mpi_win{
/* Pending RMA requests posted against this window; accesses are guarded by
 * the window's (simulated) mutex elsewhere in this file. */
19 std::vector<MPI_Request> *requests;
/* One window handle per rank of the communicator; filled by an allgather in
 * smpi_mpi_win_create so each process can reach every peer's window. */
22 MPI_Win* connected_wins;
26 int count; //for ordering the accs
/* Collectively creates an RMA window over `comm`.
 * Allocates the window descriptor, initializes its bookkeeping (request
 * vector, mutex, per-rank handle table, shared barrier), then exchanges the
 * handles so every rank knows every peer's window.
 * NOTE(review): lines are missing from this extract (e.g. the declaration of
 * `win`, assignments of base/size/comm, the tail of the allgather call, and
 * the return statement), so this documents only what is visible. */
30 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33 int comm_size = smpi_comm_size(comm);
34 int rank=smpi_comm_rank(comm);
35 XBT_DEBUG("Creating window");
37 win = xbt_new(s_smpi_mpi_win_t, 1);
40 win->disp_unit = disp_unit;
/* Presumably the missing body duplicates/refs `info`; confirm against the
 * full source. */
43 if(info!=MPI_INFO_NULL)
48 win->group = MPI_GROUP_NULL;
49 win->requests = new std::vector<MPI_Request>();
50 win->mut=xbt_mutex_init();
/* Handle table: zero-initialized, then our own slot is set before the
 * allgather below fills in every peer's slot. */
51 win->connected_wins = xbt_new0(MPI_Win, comm_size);
52 win->connected_wins[rank] = win;
/* Barrier shared by all ranks of the window, used by fence/free for
 * synchronization. */
55 win->bar = MSG_barrier_init(comm_size);
/* Exchange window handles: after this, connected_wins[i] is rank i's window
 * on every process (call continuation is missing from this extract). */
57 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
/* All ranks adopt rank 0's barrier handle so they wait on the same object. */
60 mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
/* Ensure window creation is complete everywhere before any rank uses it. */
62 mpi_coll_barrier_fun(comm);
/* Collectively destroys an RMA window.
 * Waits for outstanding asynchronous communication, frees the descriptor's
 * resources, and synchronizes before tearing down the shared barrier.
 * NOTE(review): closing braces, the freeing of the descriptor itself, and the
 * return statement are missing from this extract. */
67 int smpi_mpi_win_free( MPI_Win* win){
68 //As per the standard, perform a barrier to ensure every async comm is finished
69 MSG_barrier_wait((*win)->bar);
/* Take the window mutex while destroying the request vector so no concurrent
 * RMA call can push into it mid-destruction. */
70 xbt_mutex_acquire((*win)->mut);
71 delete (*win)->requests;
72 xbt_mutex_release((*win)->mut);
73 xbt_free((*win)->connected_wins);
74 if ((*win)->name != nullptr){
75 xbt_free((*win)->name);
77 if((*win)->info!=MPI_INFO_NULL){
78 MPI_Info_free(&(*win)->info);
/* Make sure no peer still needs this window before destroying the shared
 * barrier below. */
81 mpi_coll_barrier_fun((*win)->comm);
/* `rank` is presumably used by missing lines to let only one rank destroy the
 * shared barrier — confirm against the full source. */
82 int rank=smpi_comm_rank((*win)->comm);
84 MSG_barrier_destroy((*win)->bar);
85 xbt_mutex_destroy((*win)->mut);
/* Copies the window's name (and its length) into caller-provided storage.
 * NOTE(review): the unnamed-window branch (lines 93-96) is missing from this
 * extract; presumably it sets *length to 0 and returns early — confirm. */
91 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
92 if(win->name==nullptr){
97 *length = strlen(win->name);
/* +1 so the terminating NUL is copied along with the characters. */
98 strncpy(name, win->name, *length+1);
/* Returns the group of the communicator the window was created over, or
 * MPI_GROUP_NULL if the window has no communicator.
 * NOTE(review): the else keyword / closing braces are missing from this
 * extract; the two assignments are the visible branches. */
101 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
102 if(win->comm != MPI_COMM_NULL){
103 *group = smpi_comm_group(win->comm);
105 *group = MPI_GROUP_NULL;
/* Sets the window's name to a private copy of `name`.
 * NOTE(review): the previous win->name is not freed here in the visible code,
 * so renaming a window would leak the old string — confirm against the full
 * source. */
109 void smpi_mpi_win_set_name(MPI_Win win, char* name){
110 win->name = xbt_strdup(name);
/* MPI_Win_fence: collective synchronization separating RMA epochs.
 * Unless MPI_MODE_NOPRECEDE is asserted, completes every RMA request queued
 * on this window before opening the next epoch.
 * NOTE(review): several lines are missing from this extract (the body of the
 * opened==0 check, the start of prepared requests inside the loop, the
 * request-vector cleanup after waitall, and the return), so only the visible
 * flow is documented. */
113 int smpi_mpi_win_fence(int assert, MPI_Win win)
115 XBT_DEBUG("Entering fence");
/* First fence of an access epoch — missing line presumably marks the window
 * as opened; confirm against the full source. */
116 if (win->opened == 0)
118 if (assert != MPI_MODE_NOPRECEDE) {
119 // This is not the first fence => finalize what came before
/* Wait until every peer reaches the fence so all requests targeting this
 * window have been pushed before we drain them. */
120 MSG_barrier_wait(win->bar);
121 xbt_mutex_acquire(win->mut);
122 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
123 // Without this, the vector could get redimensionned when another process pushes.
124 // This would result in the array used by smpi_mpi_waitall() to be invalidated.
125 // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall
126 std::vector<MPI_Request> *reqs = win->requests;
127 int size = static_cast<int>(reqs->size());
128 // start all requests that have been prepared by another process
130 for (const auto& req : *reqs) {
131 if (req && (req->flags & PREPARED))
/* Contiguous view over the vector's storage for the waitall C API. */
135 MPI_Request* treqs = &(*reqs)[0];
137 smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE);
140 xbt_mutex_release(win->mut);
/* Remember the assertion for this epoch. */
142 win->assert = assert;
/* Close the fence collectively: no rank proceeds until all are done. */
144 MSG_barrier_wait(win->bar);
145 XBT_DEBUG("Leaving fence");
/* MPI_Put: writes `origin_count` elements from origin_addr into the target
 * rank's window at target_disp (scaled by the target's displacement unit).
 * Remote case: a send is started locally and a matching prepared receive is
 * queued on the target's window, to be completed at the next synchronization.
 * Local case: a direct datatype-aware memory copy.
 * NOTE(review): lines are missing from this extract (error return when the
 * window is not opened, the else keyword, closing braces, the final return). */
150 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
151 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
153 if(win->opened==0)//check that post/start has been done
155 //get receiver pointer
156 MPI_Win recv_win = win->connected_wins[target_rank];
/* Destination address inside the target's window: base + disp * disp_unit. */
158 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
159 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
161 if(target_rank != smpi_comm_rank(win->comm)){
162 //prepare send_request
163 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
164 smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
166 //prepare receiver request
167 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
168 smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
170 //push request to receiver's win
/* The target's own fence/complete will find this prepared receive and start
 * it; the mutex protects the shared request vector. */
171 xbt_mutex_acquire(recv_win->mut);
172 recv_win->requests->push_back(rreq);
173 xbt_mutex_release(recv_win->mut);
/* We are the origin, so the send starts immediately. */
175 smpi_mpi_start(sreq);
177 //push request to sender's win
178 xbt_mutex_acquire(win->mut);
179 win->requests->push_back(sreq);
180 xbt_mutex_release(win->mut);
/* Local put (target is ourselves): plain datatype-aware copy. */
182 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
/* MPI_Get: reads `target_count` elements from the target rank's window at
 * target_disp into origin_addr.
 * Remote case: a send is started "on behalf of" the target (the target is the
 * sender), the local receive is started, and both requests are queued on the
 * respective windows for completion at the next synchronization.
 * Local case: a direct datatype-aware memory copy.
 * NOTE(review): lines are missing from this extract (error return when the
 * window is not opened, the trailing arguments of the two *_init calls,
 * closing braces, the final return). */
188 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
189 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
191 if(win->opened==0)//check that post/start has been done
/* The target's window: data flows from there to us. */
194 MPI_Win send_win = win->connected_wins[target_rank];
196 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
197 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
199 if(target_rank != smpi_comm_rank(win->comm)){
200 //prepare send_request
/* Note: source index is the TARGET here — the target acts as sender. */
201 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
202 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
205 //prepare receiver request
206 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
207 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
210 //start the send, with another process than us as sender.
211 smpi_mpi_start(sreq);
212 //push request to receiver's win
/* The send request is accounted against the target's window so its own
 * synchronization call completes it. */
213 xbt_mutex_acquire(send_win->mut);
214 send_win->requests->push_back(sreq);
215 xbt_mutex_release(send_win->mut);
/* Start our local receive and record it on our window. */
218 smpi_mpi_start(rreq);
219 //push request to sender's win
220 xbt_mutex_acquire(win->mut);
221 win->requests->push_back(rreq);
222 xbt_mutex_release(win->mut);
/* Local get (target is ourselves): plain datatype-aware copy. */
224 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
/* MPI_Accumulate: like MPI_Put, but the incoming data is combined into the
 * target buffer with reduction operation `op` instead of overwriting it.
 * The per-window `count` is folded into the tag so successive accumulates to
 * the same target keep their issue order.
 * NOTE(review): lines are missing from this extract (error return when the
 * window is not opened, the increment of win->count presumably, closing
 * brace, the final return); the FIXME below notes the local-target case is
 * not specialized. */
231 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
232 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
234 if(win->opened==0)//check that post/start has been done
236 //FIXME: local version
237 //get receiver pointer
238 MPI_Win recv_win = win->connected_wins[target_rank];
240 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
241 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
242 //As the tag will be used for ordering of the operations, add count to it
243 //prepare send_request
244 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
245 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
247 //prepare receiver request
/* The receive carries `op` so the target applies the reduction on arrival. */
248 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
249 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
252 //push request to receiver's win
253 xbt_mutex_acquire(recv_win->mut);
254 recv_win->requests->push_back(rreq);
255 xbt_mutex_release(recv_win->mut);
/* Start the origin-side send immediately; completion happens at the next
 * synchronization. */
257 smpi_mpi_start(sreq);
259 //push request to sender's win
260 xbt_mutex_acquire(win->mut);
261 win->requests->push_back(sreq);
262 xbt_mutex_release(win->mut);
/* MPI_Win_start: opens an access epoch toward the processes in `group`.
 * This naive implementation blocks until every target has issued the
 * matching MPI_Win_post (it waits for one zero-byte sync message per target).
 * NOTE(review): the `for` loop headers, some brace lines, the group
 * assignment to win->group presumably, and the return are missing from this
 * extract; only the visible statements are documented. */
267 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
268 /* From MPI forum advices
269 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
270 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
271 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
272 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
273 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
274 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
275 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
276 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
277 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
278 must complete, without further dependencies. */
280 //naive, blocking implementation.
282 int size = smpi_group_size(group);
283 MPI_Request* reqs = xbt_new0(MPI_Request, size);
/* (Inside a loop over the group, header missing from this extract.)
 * Post one zero-byte receive per target, skipping ourselves. */
286 int src=smpi_group_index(group,j);
287 if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){
288 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
/* Block until every target's MPI_Win_post sync message arrives. */
294 smpi_mpi_startall(size, reqs);
295 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
/* (Inside a cleanup loop, header missing from this extract.) */
297 smpi_mpi_request_free(&reqs[i]);
300 win->opened++; //we're open for business !
/* Pin the group so it outlives the epoch; released in complete/wait. */
302 smpi_group_use(group);
/* MPI_Win_post: opens an exposure epoch toward the processes in `group`.
 * Sends one zero-byte sync message to each origin so their MPI_Win_start can
 * unblock (the mirror of smpi_mpi_win_start above).
 * NOTE(review): the `for` loop headers, the win->group assignment presumably,
 * and the return are missing from this extract. */
306 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
307 //let's make a synchronous send here
309 int size = smpi_group_size(group);
310 MPI_Request* reqs = xbt_new0(MPI_Request, size);
/* (Inside a loop over the group, header missing from this extract.)
 * Prepare one zero-byte send per origin, skipping ourselves. */
313 int dst=smpi_group_index(group,j);
314 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
315 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
322 smpi_mpi_startall(size, reqs);
323 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
/* (Inside a cleanup loop, header missing from this extract.) */
325 smpi_mpi_request_free(&reqs[i]);
328 win->opened++; //we're open for business !
/* Pin the group for the duration of the exposure epoch. */
330 smpi_group_use(group);
/* MPI_Win_complete: closes the access epoch opened by MPI_Win_start.
 * Notifies every target (zero-byte message, SMPI_RMA_TAG+5) and then drains
 * every RMA request recorded on this window.
 * NOTE(review): the opened-state guard around the xbt_die, the `for` loop
 * headers, the request-vector cleanup after waitall, and the return are
 * missing from this extract. */
334 int smpi_mpi_win_complete(MPI_Win win){
336 xbt_die("Complete called on already opened MPI_Win");
338 XBT_DEBUG("Entering MPI_Win_Complete");
340 int size = smpi_group_size(win->group);
341 MPI_Request* reqs = xbt_new0(MPI_Request, size);
/* (Inside a loop over the epoch group, header missing from this extract.)
 * One completion-notification send per target, skipping ourselves. */
344 int dst=smpi_group_index(win->group,j);
345 if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
346 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
352 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
353 smpi_mpi_startall(size, reqs);
354 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
/* (Inside a cleanup loop, header missing from this extract.) */
357 smpi_mpi_request_free(&reqs[i]);
361 //now we can finish RMA calls
/* Hold the window mutex so no peer pushes requests while we wait on them
 * (same invalidation concern as documented in smpi_mpi_win_fence). */
362 xbt_mutex_acquire(win->mut);
363 std::vector<MPI_Request> *reqqs = win->requests;
364 size = static_cast<int>(reqqs->size());
366 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
367 // start all requests that have been prepared by another process
368 for (auto req: *reqqs){
369 if (req && (req->flags & PREPARED))
/* Guard against an empty vector before taking a pointer to its storage. */
373 MPI_Request* treqs = size > 0 ? &(*reqqs)[0] : nullptr;
374 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
376 xbt_mutex_release(win->mut);
/* Release the group pinned in MPI_Win_start and close the epoch. */
378 smpi_group_unuse(win->group);
379 win->opened--; //we're closed for business !
/* MPI_Win_wait: closes the exposure epoch opened by MPI_Win_post.
 * Blocks until every origin has sent its MPI_Win_complete notification
 * (SMPI_RMA_TAG+5), then drains the RMA requests recorded on this window.
 * Structurally the mirror of smpi_mpi_win_complete above.
 * NOTE(review): the `for` loop headers, the request-vector cleanup after
 * waitall, and the function tail/return are missing from this extract. */
383 int smpi_mpi_win_wait(MPI_Win win){
384 //naive, blocking implementation.
385 XBT_DEBUG("Entering MPI_Win_Wait");
387 int size = smpi_group_size(win->group);
388 MPI_Request* reqs = xbt_new0(MPI_Request, size);
/* (Inside a loop over the epoch group, header missing from this extract.)
 * One receive per origin's completion notification, skipping ourselves. */
391 int src=smpi_group_index(win->group,j);
392 if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
393 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
399 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
400 smpi_mpi_startall(size, reqs);
401 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
/* (Inside a cleanup loop, header missing from this extract.) */
403 smpi_mpi_request_free(&reqs[i]);
/* Hold the window mutex while waiting on the recorded requests (same
 * invalidation concern as documented in smpi_mpi_win_fence). */
406 xbt_mutex_acquire(win->mut);
407 std::vector<MPI_Request> *reqqs = win->requests;
408 size = static_cast<int>(reqqs->size());
410 XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
412 // start all requests that have been prepared by another process
413 for(auto req: *reqqs){
414 if (req && (req->flags & PREPARED))
/* Guard against an empty vector before taking a pointer to its storage. */
418 MPI_Request* treqs = size > 0 ? &(*reqqs)[0] : nullptr;
419 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
421 xbt_mutex_release(win->mut);
/* Release the group pinned in MPI_Win_post and close the epoch. */
423 smpi_group_unuse(win->group);
424 win->opened--; //we're opened for business !