2 /* Copyright (c) 2007-2015. The SimGrid Team.
3 * All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
// Declares the "smpi_rma" logging sub-category (child of "smpi") and makes it the
// default category for the XBT_DEBUG calls in this file.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// File-level barrier handle, initially null.
// NOTE(review): no visible line in this file reads or writes it -- confirm it is still needed.
14 xbt_bar_t creation_bar = nullptr;
// Record describing one RMA window (the type behind the MPI_Win handle used below).
16 typedef struct s_smpi_mpi_win{
// Table of window handles, one slot per rank of the window's communicator:
// smpi_mpi_win_create allocates it with comm_size entries and stores the local
// window at index `rank`; put/get/accumulate index it by target_rank.
25 MPI_Win* connected_wins;
// Creates an RMA window over communicator `comm`.
//  base, size : memory region exposed by the window (not used on the lines shown here)
//  disp_unit  : scaling factor applied to target displacements (stored in the window)
//  info       : optional info object; handled specially when it is not MPI_INFO_NULL
//  comm       : communicator whose ranks share the window
// Visible steps: allocate the window record, initialise its fields, then exchange
// handles with the other ranks and synchronise on `comm`.
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
35 int comm_size = smpi_comm_size(comm);
36 int rank=smpi_comm_rank(comm);
37 XBT_DEBUG("Creating window");
// Allocate one window record.
39 win = xbt_new(s_smpi_mpi_win_t, 1);
42 win->disp_unit = disp_unit;
45 if(info!=MPI_INFO_NULL)
50 win->group = MPI_GROUP_NULL;
// Empty list of pending RMA requests attached to this window.
51 win->requests = xbt_dynar_new(sizeof(MPI_Request), nullptr);
// Zero-initialised table with one window handle per rank; our own slot is filled right away.
52 win->connected_wins = xbt_new0(MPI_Win, comm_size);
53 win->connected_wins[rank] = win;
// Barrier sized for all ranks of the communicator.
// NOTE(review): the handle is broadcast below, so presumably only one rank's barrier
// ends up shared by everybody -- confirm which rank actually creates it.
56 win->bar=xbt_barrier_init(comm_size);
// The collectives below ship handles as raw bytes (sizeof(handle) x MPI_BYTE): first every
// rank's window handle is gathered into connected_wins, then the barrier handle is
// broadcast (presumably from root 0), and finally all ranks synchronise on `comm`.
58 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
61 mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
63 mpi_coll_barrier_fun(comm);
// Releases the resources attached to the window pointed to by `win`.
// Visible steps: wait on the window barrier, free the request list and the table of
// connected windows, free the name and info object when present, run a collective
// barrier on the window's communicator, then destroy the window barrier.
68 int smpi_mpi_win_free( MPI_Win* win){
69 //As per the standard, perform a barrier to ensure every async comm is finished
70 xbt_barrier_wait((*win)->bar);
71 xbt_dynar_free(&(*win)->requests);
72 xbt_free((*win)->connected_wins);
// The name is optional: only free it when one was set.
73 if ((*win)->name != nullptr){
74 xbt_free((*win)->name);
// Same for the info object.
76 if((*win)->info!=MPI_INFO_NULL){
77 MPI_Info_free(&(*win)->info);
// Make sure every rank has reached this point before the shared barrier goes away.
80 mpi_coll_barrier_fun((*win)->comm);
81 int rank=smpi_comm_rank((*win)->comm);
// NOTE(review): the barrier handle is shared between ranks (it is broadcast at creation),
// so presumably only one rank should destroy it -- confirm how `rank` guards this call.
83 xbt_barrier_destroy((*win)->bar);
// Copies the window's name into the caller-provided buffer `name` and stores its
// length (terminating NUL excluded) in `*length`.
// The case where no name was set (win->name == nullptr) is handled separately.
// NOTE(review): there is no destination-size parameter; the caller's buffer must hold
// at least strlen(win->name)+1 bytes.
89 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
90 if(win->name==nullptr){
95 *length = strlen(win->name);
// length+1 so that the terminating NUL is copied too.
96 strncpy(name, win->name, *length+1);
// Stores in `*group` the group of the window's communicator.
// Only done when the window has a communicator (win->comm != MPI_COMM_NULL).
99 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
100 if(win->comm != MPI_COMM_NULL){
101 *group = smpi_comm_group(win->comm);
// Sets the window's name to a private copy of `name` (duplicated with xbt_strdup).
// NOTE(review): the previous win->name is overwritten without being freed on the visible
// lines -- possible leak when a name is set more than once; confirm.
105 void smpi_mpi_win_set_name(MPI_Win win, char* name){
106 win->name = xbt_strdup(name);
// Fence synchronisation on window `win`.
//  assert : assertion value supplied by the caller; it is stored in the window.
// Visible steps: optional entry barrier, start the requests flagged PREPARED, wait for
// all requests currently attached to the window, record `assert`, exit barrier.
109 int smpi_mpi_win_fence( int assert, MPI_Win win){
110 XBT_DEBUG("Entering fence");
// Entry synchronisation is skipped when the caller passes exactly MPI_MODE_NOPRECEDE.
// NOTE(review): MPI assertions are normally OR-ed bit flags; this equality test only
// matches when MPI_MODE_NOPRECEDE is the sole flag -- confirm this is intended.
113 if(assert != MPI_MODE_NOPRECEDE){
114 xbt_barrier_wait(win->bar);
// Work on the request list as it is right now.
116 xbt_dynar_t reqs = win->requests;
117 int size = xbt_dynar_length(reqs);
120 // start all requests that have been prepared by another process
121 xbt_dynar_foreach(reqs, cpt, req){
122 if (req->flags & PREPARED)
// Convert the request list into a plain array, give the window a fresh empty list,
// then block until all `size` requests have completed.
126 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
127 win->requests=xbt_dynar_new(sizeof(MPI_Request), nullptr);
128 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
// Remember the assertion passed to this fence.
132 win->assert = assert;
// Exit synchronisation on the window barrier.
134 xbt_barrier_wait(win->bar);
135 XBT_DEBUG("Leaving fence ");
// One-sided put: transfers origin_count elements of origin_datatype from origin_addr
// into the window of rank `target_rank`.
//  target_disp : displacement inside the target window, scaled by the target's disp_unit
//  target_count / target_datatype : layout of the data on the target side
// For a remote target, a send request (kept on this window) and a matching receive
// request (queued on the target's window) are created with tag RMA_TAG+1.
140 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
141 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
143 if(win->opened==0)//check that post/start has been done
145 //get receiver pointer
146 MPI_Win recv_win = win->connected_wins[target_rank];
// Destination address = target window base + target_disp * target disp_unit (byte arithmetic).
148 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
149 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
// Remote target: go through a send/receive request pair.
151 if(target_rank != smpi_comm_rank(win->comm)){
152 //prepare send_request
153 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
154 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
156 //prepare receiver request
157 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
158 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
160 //push request to receiver's win
161 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
// Only the send side is started here; the receive request is left on the target's list
// (the synchronisation calls of this file start requests flagged PREPARED).
164 smpi_mpi_start(sreq);
166 //push request to sender's win
167 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
// Direct datatype copy from the origin buffer to the target address.
// NOTE(review): presumably the local (target == self) path -- confirm the branch structure.
169 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// One-sided get: transfers target_count elements of target_datatype from the window of
// rank `target_rank` into origin_addr (origin_count elements of origin_datatype).
//  target_disp : displacement inside the target window, scaled by the target's disp_unit
// For a remote target, a send request (on behalf of the target) and a local receive
// request are created with tag RMA_TAG+2 and both are started from here.
175 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
176 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
178 if(win->opened==0)//check that post/start has been done
// Window of the rank that owns the data to fetch.
181 MPI_Win send_win = win->connected_wins[target_rank];
// Source address = target window base + target_disp * target disp_unit (byte arithmetic).
183 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
184 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
// Remote target: the send is issued in the name of the target rank, the receive in ours.
186 if(target_rank != smpi_comm_rank(win->comm)){
187 //prepare send_request
188 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
189 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
192 //prepare receiver request
193 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
194 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
197 //start the send, with another process than us as sender.
198 smpi_mpi_start(sreq);
200 //push request to receiver's win
201 xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
// Start our own receive and remember it on the local window.
204 smpi_mpi_start(rreq);
206 //push request to sender's win
207 xbt_dynar_push_as(win->requests, MPI_Request, rreq);
// Direct datatype copy from the target address to the origin buffer.
// NOTE(review): presumably the local (target == self) path -- confirm the branch structure.
209 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// One-sided accumulate: sends origin data to the window of rank `target_rank`, with
// operation `op` attached to both the send and the receive request (tag RMA_TAG+3).
//  target_disp : displacement inside the target window, scaled by the target's disp_unit
// Unlike put/get, no local shortcut is visible here (see the FIXME below): a request
// pair is always built.
216 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
217 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
219 if(win->opened==0)//check that post/start has been done
221 //FIXME: local version
222 //get receiver pointer
223 MPI_Win recv_win = win->connected_wins[target_rank];
// Destination address = target window base + target_disp * target disp_unit (byte arithmetic).
225 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
226 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
228 //prepare send_request
229 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
230 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
232 //prepare receiver request
233 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
234 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
235 //push request to receiver's win
236 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
// Only the send side is started here; the receive request stays on the target's list.
238 smpi_mpi_start(sreq);
240 //push request to sender's win
241 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
// Opens an access epoch on `win` towards the processes of `group`.
//  assert : not used on the visible lines.
// Visible steps: prepare a zero-length receive (tag RMA_TAG+4, MPI_COMM_WORLD) from each
// group member other than ourselves, start and wait for them, free the requests, then
// mark the window opened and take a reference on `group`.
246 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
247 /* From MPI forum advices
248 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
249 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
250 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
251 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
252 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
253 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
254 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
255 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
256 origin ahead of its completion at the target. However, once the call to MPI_WIN_START is issued, the sequence above
257 must complete, without further dependencies. */
259 //naive, blocking implementation.
// One request slot per group member (zero-initialised).
261 int size = smpi_group_size(group);
262 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// For group member j: translate to a process index and skip ourselves.
265 int src=smpi_group_index(group,j);
266 if(src!=smpi_process_index()){
// Zero-length message: pure synchronisation with the matching send in smpi_mpi_win_post.
267 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
// Block until every expected notification has arrived.
273 smpi_mpi_startall(size, reqs);
274 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
276 smpi_mpi_request_free(&reqs[i]);
279 win->opened++; //we're open for business !
// Keep the group alive for the duration of the epoch (smpi_mpi_win_complete calls
// smpi_group_unuse on win->group).
281 smpi_group_use(group);
// Opens an exposure epoch on `win` for the processes of `group`.
//  assert : not used on the visible lines.
// Visible steps: prepare a zero-length send (tag RMA_TAG+4, MPI_COMM_WORLD) to each group
// member other than ourselves, start and wait for them, free the requests, then mark the
// window opened and take a reference on `group`.
285 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
286 //let's make a synchronous send here
// One request slot per group member (zero-initialised).
288 int size = smpi_group_size(group);
289 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// For group member j: translate to a process index and skip ourselves.
292 int dst=smpi_group_index(group,j);
293 if(dst!=smpi_process_index()){
// Zero-length message matching the receive prepared in smpi_mpi_win_start (same tag).
294 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
// Block until all notifications have been sent.
301 smpi_mpi_startall(size, reqs);
302 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
304 smpi_mpi_request_free(&reqs[i]);
307 win->opened++; //we're open for business !
// Keep the group alive for the duration of the epoch (smpi_mpi_win_wait calls
// smpi_group_unuse on win->group).
309 smpi_group_use(group);
// Closes the access epoch on `win`.
// Visible steps: send a zero-length message (tag RMA_TAG+5, MPI_COMM_WORLD) to each
// member of win->group other than ourselves, then start the PREPARED requests attached
// to the window, wait for all attached requests, release the group reference and
// decrement the `opened` counter.
313 int smpi_mpi_win_complete(MPI_Win win){
// Aborts with the message below.
// NOTE(review): confirm the wording matches the condition that guards this call.
315 xbt_die("Complete called on already opened MPI_Win");
317 XBT_DEBUG("Entering MPI_Win_Complete");
// One request slot per group member (zero-initialised).
319 int size = smpi_group_size(win->group);
320 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// For group member j: translate to a process index and skip ourselves.
323 int dst=smpi_group_index(win->group,j);
324 if(dst!=smpi_process_index()){
// Zero-length message matching the receive prepared in smpi_mpi_win_wait (same tag).
325 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
331 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
332 smpi_mpi_startall(size, reqs);
333 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
336 smpi_mpi_request_free(&reqs[i]);
340 //now we can finish RMA calls
// `size` is reused: from here on it is the number of requests attached to the window.
342 xbt_dynar_t reqqs = win->requests;
343 size = xbt_dynar_length(reqqs);
345 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
348 // start all requests that have been prepared by another process
349 xbt_dynar_foreach(reqqs, cpt, req){
350 if (req->flags & PREPARED)
// Convert the request list into a plain array, give the window a fresh empty list,
// then block until all `size` requests have completed.
354 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
355 win->requests=xbt_dynar_new(sizeof(MPI_Request), nullptr);
356 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
// Drop the group reference and close the epoch.
358 smpi_group_unuse(win->group);
359 win->opened--; //we're closed for business !
// Closes the exposure epoch on `win`.
// Visible steps: receive a zero-length message (tag RMA_TAG+5, MPI_COMM_WORLD) from each
// member of win->group other than ourselves, then start the PREPARED requests attached
// to the window, wait for all attached requests, release the group reference and
// decrement the `opened` counter.
363 int smpi_mpi_win_wait(MPI_Win win){
364 //naive, blocking implementation.
365 XBT_DEBUG("Entering MPI_Win_Wait");
// One request slot per group member (zero-initialised).
367 int size = smpi_group_size(win->group);
368 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// For group member j: translate to a process index and skip ourselves.
371 int src=smpi_group_index(win->group,j);
372 if(src!=smpi_process_index()){
// Zero-length message matching the send issued by smpi_mpi_win_complete (same tag).
373 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
379 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
380 smpi_mpi_startall(size, reqs);
381 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
383 smpi_mpi_request_free(&reqs[i]);
// `size` is reused: from here on it is the number of requests attached to the window.
387 xbt_dynar_t reqqs = win->requests;
388 size = xbt_dynar_length(reqqs);
// NOTE(review): the message below says "Win_complete" although this is win_wait --
// looks like a copy/paste leftover; confirm before changing the log text.
390 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
394 // start all requests that have been prepared by another process
395 xbt_dynar_foreach(reqqs, cpt, req){
396 if (req->flags & PREPARED)
// Convert the request list into a plain array, give the window a fresh empty list,
// then block until all `size` requests have completed.
400 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
401 win->requests=xbt_dynar_new(sizeof(MPI_Request), nullptr);
402 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
// Drop the group reference and close the epoch.
404 smpi_group_unuse(win->group);
405 win->opened--; //we're closed for business !