2 /* Copyright (c) 2007-2015. The SimGrid Team.
3 * All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Barrier shared across processes while windows are being created.
// NOTE(review): not referenced in the visible lines of this excerpt;
// presumably used by elided creation code — confirm before removing.
14 xbt_bar_t creation_bar = NULL;
// Window descriptor. Only one field is visible in this excerpt; the
// functions below also access base, disp_unit, info, comm, requests, bar,
// name, opened, group and assert members declared in elided lines.
16 typedef struct s_smpi_mpi_win{
// Per-rank window handles (indexed by rank in the window's communicator),
// filled by the allgather in smpi_mpi_win_create.
25 MPI_Win* connected_wins;
// Collectively creates an RMA window over `base` on communicator `comm`.
// Each rank builds a local descriptor, then exchanges its MPI_Win handle
// with all peers (allgather) so any rank can reach any other rank's window
// directly, and finally shares rank 0's barrier object for epoch sync.
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
35 int comm_size = smpi_comm_size(comm);
36 int rank=smpi_comm_rank(comm);
37 XBT_DEBUG("Creating window");
// NOTE(review): `win` is assigned without a visible declaration in this
// excerpt; presumably declared in an elided line.
39 win = xbt_new(s_smpi_mpi_win_t, 1);
42 win->disp_unit = disp_unit;
// Only retain/duplicate the info object when the caller passed one.
45 if(info!=MPI_INFO_NULL)
50 win->group = MPI_GROUP_NULL;
// Pending RMA requests targeting this window; drained at fence/complete/wait.
51 win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
// Zero-initialized so un-exchanged slots are NULL until the allgather runs.
52 win->connected_wins = xbt_new0(MPI_Win, comm_size);
53 win->connected_wins[rank] = win;
56 win->bar=xbt_barrier_init(comm_size);
// Exchange window handles: afterwards every rank can dereference any peer's
// descriptor (base pointer, requests dynar, ...) in put/get/accumulate.
59 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
// All ranks adopt rank 0's barrier so xbt_barrier_wait synchronizes them.
62 mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
64 mpi_coll_barrier_fun(comm);
// Collectively frees a window. The entry barrier guarantees every
// asynchronous RMA communication has finished before resources go away.
69 int smpi_mpi_win_free( MPI_Win* win){
70 //As per the standard, perform a barrier to ensure every async comm is finished
71 xbt_barrier_wait((*win)->bar);
72 xbt_dynar_free(&(*win)->requests);
73 xbt_free((*win)->connected_wins);
// `name` is only set by smpi_mpi_win_set_name, so it may still be NULL.
74 if ((*win)->name != NULL){
75 xbt_free((*win)->name);
// Release the info object if one was attached at creation.
77 if((*win)->info!=MPI_INFO_NULL){
78 MPI_Info_free(&(*win)->info);
// Copies the window's name into `name` and stores its length in `*length`.
// NOTE(review): the visible lines assume win->name is non-NULL; the
// unset-name case is presumably handled by elided lines — confirm.
85 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
91 *length = strlen(win->name);
92 strcpy(name, win->name);
// Returns the group of the communicator the window was created on,
// taking a reference on it for the caller (smpi_group_use).
95 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
96 if(win->comm != MPI_COMM_NULL){
97 *group = smpi_comm_group(win->comm);
98 smpi_group_use(*group);
// Attaches a copy of `name` to the window (read back by
// smpi_mpi_win_get_name). Fix: drop the stray duplicate semicolon that
// left an empty statement after the assignment.
// NOTE(review): a previously-set name is overwritten without being freed
// (win_free only frees the last one) — potential leak if callers rename;
// confirm intended before changing behavior.
102 void smpi_mpi_win_set_name(MPI_Win win, char* name){
103 win->name = xbt_strdup(name);
// MPI_Win_fence: closes the current RMA epoch and opens the next one.
// Starts any requests peers pushed onto this window, waits for all of
// them, then synchronizes every rank on the shared barrier.
106 int smpi_mpi_win_fence( int assert, MPI_Win win){
107 XBT_DEBUG("Entering fence");
// MPI_MODE_NOPRECEDE asserts no RMA calls preceded this fence, so the
// entry barrier can be skipped in that case.
110 if(assert != MPI_MODE_NOPRECEDE){
111 xbt_barrier_wait(win->bar);
113 xbt_dynar_t reqs = win->requests;
114 int size = xbt_dynar_length(reqs);
117 // start all requests that have been prepared by another process
118 xbt_dynar_foreach(reqs, cpt, req){
119 if (req->flags & PREPARED) smpi_mpi_start(req);
// xbt_dynar_to_array consumes the dynar, hence the fresh dynar below.
122 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
123 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
125 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
// Remember the assertion for the epoch being opened.
128 win->assert = assert;
// Exit barrier: no rank leaves the fence before all have drained requests.
130 xbt_barrier_wait(win->bar);
131 XBT_DEBUG("Leaving fence ");
// MPI_Put: writes origin data into the target rank's window at
// target_disp, scaled by the *target's* displacement unit.
136 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
137 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
139 if(!win->opened)//check that post/start has been done
141 //get receiver pointer
142 MPI_Win recv_win = win->connected_wins[target_rank];
// Destination address inside the remote window's memory.
144 void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
145 smpi_datatype_use(origin_datatype);
146 smpi_datatype_use(target_datatype);
147 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
// Remote target: pair a send (started here) with a receive pushed onto the
// target's request list, to be started/completed on the target's side.
149 if(target_rank != smpi_comm_rank(win->comm)){
150 //prepare send_request
151 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
152 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
154 //prepare receiver request
155 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
156 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
158 //push request to receiver's win
159 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
162 smpi_mpi_start(sreq);
164 //push request to sender's win
165 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
// Local target: a plain typed memory copy is enough.
167 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// MPI_Get: reads data from the target rank's window at target_disp
// (scaled by the target's displacement unit) into origin_addr.
173 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
174 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
176 if(!win->opened)//check that post/start has been done
// The remote window we will read from.
179 MPI_Win send_win = win->connected_wins[target_rank];
// Source address inside the remote window's memory.
181 void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
182 smpi_datatype_use(origin_datatype);
183 smpi_datatype_use(target_datatype);
184 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
// Remote target: the send originates on the *target* process, so the send
// request is pushed onto the target's window while the receive runs here.
186 if(target_rank != smpi_comm_rank(win->comm)){
187 //prepare send_request
188 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
189 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
192 //prepare receiver request
193 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
194 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
197 //start the send, with another process than us as sender.
198 smpi_mpi_start(sreq);
200 //push request to receiver's win
201 xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
204 smpi_mpi_start(rreq);
206 //push request to sender's win
207 xbt_dynar_push_as(win->requests, MPI_Request, rreq);
// Local target: copy directly from our own window memory.
209 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// MPI_Accumulate: like MPI_Put, but the receive request carries the
// reduction operation `op`, applied on the target when data arrives.
216 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
217 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
219 if(!win->opened)//check that post/start has been done
// Unlike put/get, there is no local-target shortcut here: the send/recv
// pair below is used even when target_rank is the calling rank.
221 //FIXME: local version
222 //get receiver pointer
223 MPI_Win recv_win = win->connected_wins[target_rank];
// Destination address inside the remote window's memory.
225 void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
226 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
228 smpi_datatype_use(origin_datatype);
229 smpi_datatype_use(target_datatype);
231 //prepare send_request
232 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
233 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
235 //prepare receiver request
236 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
237 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
238 //push request to receiver's win
239 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
241 smpi_mpi_start(sreq);
243 //push request to sender's win
244 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
// MPI_Win_start: opens an access epoch toward the processes in `group`.
// This implementation blocks until every target has called MPI_Win_post,
// by waiting for one zero-byte sync message (RMA_TAG+4) from each target.
249 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
250 /* From MPI forum advices
251 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
252 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
253 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
254 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
255 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
256 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
257 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
258 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
259 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
260 must complete, without further dependencies. */
262 //naive, blocking implementation.
264 int size = smpi_group_size(group);
265 MPI_Request* reqs = xbt_new0(MPI_Request, size);
267 // for(i=0;i<size;i++){
// NOTE(review): the loop header and declarations of i/j are elided in this
// excerpt; one receive is prepared per group member other than ourselves.
269 int src=smpi_group_index(group,j);
270 if(src!=smpi_process_index()){
271 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
// Wait for all the "post" notifications, then release the requests.
277 smpi_mpi_startall(size, reqs);
278 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
280 smpi_mpi_request_free(&reqs[i]);
283 win->opened++; //we're open for business !
// Keep a reference on the access group for the matching MPI_Win_complete.
285 smpi_group_use(group);
// MPI_Win_post: opens an exposure epoch for the processes in `group` by
// sending each of them the zero-byte sync message (RMA_TAG+4) that their
// MPI_Win_start is blocked on.
289 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
290 //let's make a synchronous send here
292 int size = smpi_group_size(group);
293 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// NOTE(review): loop header and i/j declarations are elided in this
// excerpt; one send is prepared per group member other than ourselves.
296 int dst=smpi_group_index(group,j);
297 if(dst!=smpi_process_index()){
298 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
// Deliver all notifications, then release the requests.
305 smpi_mpi_startall(size, reqs);
306 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
308 smpi_mpi_request_free(&reqs[i]);
311 win->opened++; //we're open for business !
// Keep a reference on the exposure group for the matching MPI_Win_wait.
313 smpi_group_use(group);
// MPI_Win_complete: ends the access epoch opened by MPI_Win_start.
// Notifies every target (RMA_TAG+5) that access is over, then finishes
// every pending RMA request attached to this window.
317 int smpi_mpi_win_complete(MPI_Win win){
// NOTE(review): the guard condition for this xbt_die is elided; the
// message reads "already opened" but it presumably fires when the window
// was NOT opened — confirm against the full file.
319 xbt_die("Complete called on already opened MPI_Win");
320 // xbt_barrier_wait(win->bar);
321 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
322 //mpi_coll_barrier_fun(comm);
323 //smpi_comm_destroy(comm);
325 XBT_DEBUG("Entering MPI_Win_Complete");
327 int size = smpi_group_size(win->group);
328 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// NOTE(review): loop header and i/j declarations are elided; one sync send
// is prepared per member of the access group other than ourselves.
331 int dst=smpi_group_index(win->group,j);
332 if(dst!=smpi_process_index()){
333 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
339 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
340 smpi_mpi_startall(size, reqs);
341 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
344 smpi_mpi_request_free(&reqs[i]);
348 //now we can finish RMA calls
350 xbt_dynar_t reqqs = win->requests;
351 size = xbt_dynar_length(reqqs);
353 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
356 // start all requests that have been prepared by another process
357 xbt_dynar_foreach(reqqs, cpt, req){
358 if (req->flags & PREPARED) smpi_mpi_start(req);
// xbt_dynar_to_array consumes the dynar, hence the fresh dynar below.
361 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
362 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
364 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
365 win->opened--; //we're closed for business !
// MPI_Win_wait: ends the exposure epoch opened by MPI_Win_post. Blocks
// until every origin has sent its "complete" notification (RMA_TAG+5),
// then finishes every pending RMA request attached to this window.
369 int smpi_mpi_win_wait(MPI_Win win){
370 // xbt_barrier_wait(win->bar);
371 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
372 //mpi_coll_barrier_fun(comm);
373 //smpi_comm_destroy(comm);
374 //naive, blocking implementation.
375 XBT_DEBUG("Entering MPI_Win_Wait");
377 int size = smpi_group_size(win->group);
378 MPI_Request* reqs = xbt_new0(MPI_Request, size);
380 // for(i=0;i<size;i++){
// NOTE(review): loop header and i/j declarations are elided; one receive
// is prepared per member of the exposure group other than ourselves.
382 int src=smpi_group_index(win->group,j);
383 if(src!=smpi_process_index()){
384 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
390 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
391 smpi_mpi_startall(size, reqs);
392 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
394 smpi_mpi_request_free(&reqs[i]);
398 xbt_dynar_t reqqs = win->requests;
399 size = xbt_dynar_length(reqqs);
// NOTE(review): this debug message says "Win_complete" but we are in
// Win_wait — looks copy-pasted; fixing it changes a runtime string, so it
// is only flagged here.
401 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
405 // start all requests that have been prepared by another process
406 xbt_dynar_foreach(reqqs, cpt, req){
407 if (req->flags & PREPARED) smpi_mpi_start(req);
// xbt_dynar_to_array consumes the dynar, hence the fresh dynar below.
410 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
411 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
413 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
414 win->opened--; //we're closed for business again