2 /* Copyright (c) 2007-2015. The SimGrid Team.
3 * All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Dedicated log channel for SMPI one-sided (RMA) operations. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
/* Shared barrier used while windows are being created.
 * NOTE(review): no use of this global is visible in this excerpt — confirm
 * against the full file whether it is still needed. */
14 xbt_bar_t creation_bar = NULL;
/* Internal representation of an MPI_Win (one instance per rank).
 * NOTE(review): this excerpt only shows part of the struct; other fields
 * (base, size, disp_unit, info, comm, requests, bar, name, opened, assert,
 * group — all referenced by the functions below) live on lines not shown. */
16 typedef struct s_smpi_mpi_win{
  /* Table of every rank's window handle for this window object, indexed by
   * rank in the window's communicator; filled by an allgather at creation. */
25 MPI_Win* connected_wins;
/* Create an RMA window exposing `size` bytes at `base`, with displacements
 * scaled by `disp_unit`. Collective over `comm`.
 * Each rank builds its own window struct, then the per-rank handles are
 * exchanged so any rank can reach any peer's window, and one shared xbt
 * barrier (rank 0's) is agreed on for synchronization.
 * NOTE(review): sampled excerpt — the declaration of `win`, several field
 * initializations, and the return statement are on lines not shown here. */
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
35 int comm_size = smpi_comm_size(comm);
36 int rank=smpi_comm_rank(comm);
37 XBT_DEBUG("Creating window");
39 win = xbt_new(s_smpi_mpi_win_t, 1);
42 win->disp_unit = disp_unit;
  /* Only dup/refcount the info object when the caller actually passed one.
   * (The action taken in the then-branch is on a line not shown.) */
45 if(info!=MPI_INFO_NULL)
50 win->group = MPI_GROUP_NULL;
  /* Requests pushed here by peers (puts/gets targeting us) are completed at
   * the next synchronization (fence/complete/wait). No free function: request
   * cleanup is handled by waitall, not by the dynar. */
51 win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
52 win->connected_wins = xbt_new0(MPI_Win, comm_size);
53 win->connected_wins[rank] = win;
56 win->bar=xbt_barrier_init(comm_size);
  /* Every rank contributes its own handle at slot `rank`; after the
   * allgather each rank holds the full rank->window table. */
59 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins,
sizeof(MPI_Win),
  /* All ranks adopt rank 0's barrier object so xbt_barrier_wait on any rank
   * targets the same barrier. Each non-root rank's own barrier allocated
   * above is then unused — presumably leaked; verify against full file. */
62 mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
  /* Ensure no rank starts RMA before every peer's window is published. */
64 mpi_coll_barrier_fun(comm);
/* Free an RMA window (MPI_Win_free). Collective: the leading barrier
 * guarantees all outstanding asynchronous communications on the window have
 * completed on every rank before any rank tears its state down.
 * NOTE(review): sampled excerpt — the final free of *win itself and the
 * return statement are on lines not shown. */
69 int smpi_mpi_win_free( MPI_Win* win){
70 //As per the standard, perform a barrier to ensure every async comm is finished
71 xbt_barrier_wait((*win)->bar);
72 xbt_dynar_free(&(*win)->requests);
73 xbt_free((*win)->connected_wins);
  /* name is only set by smpi_mpi_win_set_name; NULL when never named. */
74 if ((*win)->name != NULL){
75 xbt_free((*win)->name);
  /* Drop our reference on the duplicated info object, if any. */
77 if((*win)->info!=MPI_INFO_NULL){
78 MPI_Info_free(&(*win)->info);
/* MPI_Win_get_name: copy the window's name into `name` and its length into
 * `*length`. Caller must supply a buffer of at least MPI_MAX_OBJECT_NAME.
 * NOTE(review): the guard for an unnamed window (win->name == NULL) is
 * presumably on the lines not shown between the signature and the copy —
 * confirm; as visible, a NULL name would make strlen/strcpy UB. */
85 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
91 *length = strlen(win->name);
92 strcpy(name, win->name);
/* MPI_Win_get_group: hand back the group of the window's communicator,
 * taking a reference on it (caller releases via group free).
 * NOTE(review): the behavior when win->comm is MPI_COMM_NULL (else branch /
 * error value for *group) is on lines not shown. */
95 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
96 if(win->comm != MPI_COMM_NULL){
97 *group = smpi_comm_group(win->comm);
  /* Bump the group's use count so it outlives the caller's usage. */
98 smpi_group_use(*group);
/* MPI_Win_set_name: attach a user-visible name (duplicated; freed in
 * smpi_mpi_win_free). NOTE(review): a previously set name is overwritten
 * without being freed — leak if called twice; also stray double semicolon.
 * Both left untouched here since the function's tail is outside this view. */
102 void smpi_mpi_win_set_name(MPI_Win win, char* name){
103 win->name = xbt_strdup(name);;
/* MPI_Win_fence: collective synchronization closing (and opening) an RMA
 * access epoch. Starts every request that peers pushed onto our window in
 * PREPARED state, waits for all of them, then barriers so no rank leaves the
 * fence while a peer still has traffic in flight.
 * NOTE(review): sampled excerpt — declarations of `cpt`/`req`, the free of
 * `treqs`, and the return statement are on lines not shown. */
106 int smpi_mpi_win_fence( int assert, MPI_Win win){
107 XBT_DEBUG("Entering fence");
  /* MPI_MODE_NOPRECEDE promises there are no prior RMA calls to drain, so
   * the entry barrier can be skipped. */
110 if(assert != MPI_MODE_NOPRECEDE){
111 xbt_barrier_wait(win->bar);
113 xbt_dynar_t reqs = win->requests;
114 int size = xbt_dynar_length(reqs);
117 // start all requests that have been prepared by another process
118 xbt_dynar_foreach(reqs, cpt, req){
119 if (req->flags & PREPARED) smpi_mpi_start(req);
  /* Swap in a fresh dynar before waiting: completions may push new requests
   * for the next epoch, which must not be mixed with this epoch's set. */
122 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
123 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
124 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
  /* Remember the assertion for this epoch (checked on the next fence). */
128 win->assert = assert;
  /* Exit barrier: nobody proceeds until every rank finished its epoch. */
130 xbt_barrier_wait(win->bar);
131 XBT_DEBUG("Leaving fence ");
/* MPI_Put: write `origin_count` elements from origin_addr into target_rank's
 * window at byte offset target_disp * disp_unit.
 * Remote case: a send is started immediately on our side, and a matching
 * receive is pushed (PREPARED, not started) onto the TARGET's request list —
 * the target starts it at its next synchronization (fence/complete/wait).
 * Local case: plain datatype copy, no messaging.
 * NOTE(review): sampled excerpt — the body of the !opened check, the else
 * keyword before the local copy, and the return are on lines not shown. */
136 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
137 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
139 if(!win->opened)//check that post/start has been done
141 //get receiver pointer
142 MPI_Win recv_win = win->connected_wins[target_rank];
  /* Displacement is scaled by the TARGET's disp_unit, per the standard. */
144 void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
145 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
147 if(target_rank != smpi_comm_rank(win->comm)){
148 //prepare send_request
149 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
150 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
152 //prepare receiver request
153 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
154 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
156 //push request to receiver's win
157 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
  /* Send side starts now; it will be completed at our next sync point. */
160 smpi_mpi_start(sreq);
162 //push request to sender's win
163 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
  /* Self-targeted put: just copy through the local window memory. */
165 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
/* MPI_Get: read `target_count` elements from target_rank's window (at byte
 * offset target_disp * disp_unit) into origin_addr.
 * Mirror of smpi_mpi_put with the roles reversed: the SEND is pushed onto the
 * target's window (started there at its next sync), while the receive is
 * started immediately on our side and completed at our next sync.
 * NOTE(review): sampled excerpt — the !opened branch body, the trailing op
 * arguments of the two *_init calls, the else, and the return are on lines
 * not shown. */
171 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
172 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
174 if(!win->opened)//check that post/start has been done
177 MPI_Win send_win = win->connected_wins[target_rank];
  /* Source address inside the target's exposed memory. */
179 void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
180 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
182 if(target_rank != smpi_comm_rank(win->comm)){
183 //prepare send_request
184 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
185 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
188 //prepare receiver request
189 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
190 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
193 //start the send, with another process than us as sender.
194 smpi_mpi_start(sreq);
196 //push request to receiver's win
197 xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
  /* Our receive starts now; completed at our next sync point. */
200 smpi_mpi_start(rreq);
202 //push request to sender's win
203 xbt_dynar_push_as(win->requests, MPI_Request, rreq);
  /* Self-targeted get: plain local copy. */
205 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
/* MPI_Accumulate: like smpi_mpi_put but combining the incoming data into the
 * target buffer with reduction operator `op` (applied on the receive side via
 * the op passed to smpi_rma_recv_init).
 * Unlike put/get there is no local fast path — the FIXME below notes that a
 * self-targeted accumulate still goes through the messaging path.
 * NOTE(review): sampled excerpt — the !opened branch body and the return are
 * on lines not shown. */
212 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
213 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
215 if(!win->opened)//check that post/start has been done
217 //FIXME: local version
218 //get receiver pointer
219 MPI_Win recv_win = win->connected_wins[target_rank];
  /* Destination inside the target's exposed memory, scaled by ITS disp_unit. */
221 void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
222 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
224 //prepare send_request
225 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
226 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
228 //prepare receiver request
229 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
230 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
231 //push request to receiver's win
232 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
  /* Send starts immediately; completed at our next synchronization. */
234 smpi_mpi_start(sreq);
236 //push request to sender's win
237 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
/* MPI_Win_start: open an access epoch toward the ranks in `group`.
 * Naive blocking handshake: receive one zero-byte message (tag RMA_TAG+4)
 * from every group member's matching MPI_Win_post before returning.
 * NOTE(review): sampled excerpt — the loop headers, the index bookkeeping
 * around `i`/`j`, storing the group into the window, and the return are on
 * lines not shown. */
242 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
243 /* From MPI forum advices
244 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
245 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
246 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
247 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
248 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
249 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
250 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
251 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
252 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
253 must complete, without further dependencies. */
255 //naive, blocking implementation.
257 int size = smpi_group_size(group);
258 MPI_Request* reqs = xbt_new0(MPI_Request, size);
260 // for(i=0;i<size;i++){
  /* One receive per remote group member; self is skipped. */
262 int src=smpi_group_index(group,j);
263 if(src!=smpi_process_index()){
264 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
  /* Block until every post notification arrived. */
270 smpi_mpi_startall(size, reqs);
271 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
273 smpi_mpi_request_free(&reqs[i]);
  /* Mark the epoch open; put/get/accumulate check this counter. */
276 win->opened++; //we're open for business !
  /* Keep the group alive for the duration of the epoch (released at
   * complete/wait). */
278 smpi_group_use(group);
/* MPI_Win_post: open an exposure epoch for the ranks in `group`.
 * Counterpart of smpi_mpi_win_start: sends one zero-byte message
 * (tag RMA_TAG+4) to every group member, unblocking their win_start.
 * NOTE(review): sampled excerpt — loop headers, index bookkeeping, storing
 * the group into the window, and the return are on lines not shown. */
282 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
283 //let's make a synchronous send here
285 int size = smpi_group_size(group);
286 MPI_Request* reqs = xbt_new0(MPI_Request, size);
  /* One notification per remote group member; self is skipped. */
289 int dst=smpi_group_index(group,j);
290 if(dst!=smpi_process_index()){
291 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
298 smpi_mpi_startall(size, reqs);
299 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
301 smpi_mpi_request_free(&reqs[i]);
  /* Mark the epoch open; put/get/accumulate check this counter. */
304 win->opened++; //we're open for business !
  /* Keep the group alive until the matching win_wait releases it. */
306 smpi_group_use(group);
/* MPI_Win_complete: close the access epoch opened by smpi_mpi_win_start.
 * Sends a zero-byte "done" message (tag RMA_TAG+5) to every member of the
 * epoch's group, then drains all RMA requests queued on this window.
 * NOTE(review): sampled excerpt — the condition guarding the xbt_die below,
 * loop headers, `cpt`/`req` declarations, frees, and the return are on lines
 * not shown. */
310 int smpi_mpi_win_complete(MPI_Win win){
312 xbt_die("Complete called on already opened MPI_Win");
313 // xbt_barrier_wait(win->bar);
314 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
315 //mpi_coll_barrier_fun(comm);
316 //smpi_comm_destroy(comm);
318 XBT_DEBUG("Entering MPI_Win_Complete");
320 int size = smpi_group_size(win->group);
321 MPI_Request* reqs = xbt_new0(MPI_Request, size);
  /* Tell each remote group member our access epoch is finished. */
324 int dst=smpi_group_index(win->group,j);
325 if(dst!=smpi_process_index()){
326 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
332 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
333 smpi_mpi_startall(size, reqs);
334 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
337 smpi_mpi_request_free(&reqs[i]);
341 //now we can finish RMA calls
343 xbt_dynar_t reqqs = win->requests;
344 size = xbt_dynar_length(reqqs);
346 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
349 // start all requests that have been prepared by another process
350 xbt_dynar_foreach(reqqs, cpt, req){
351 if (req->flags & PREPARED) smpi_mpi_start(req);
  /* Same drain pattern as smpi_mpi_win_fence: swap in a fresh dynar before
   * waiting so late pushes land in the next epoch's list. */
354 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
355 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
356 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
  /* Release the group reference taken in win_start and close the epoch. */
358 smpi_group_unuse(win->group);
359 win->opened--; //we're closed for business !
/* MPI_Win_wait: close the exposure epoch opened by smpi_mpi_win_post.
 * Mirror of smpi_mpi_win_complete: block until a "done" message
 * (tag RMA_TAG+5) arrives from every member of the epoch's group, then drain
 * all RMA requests queued on this window.
 * NOTE(review): sampled excerpt — loop headers, `cpt`/`req` declarations,
 * frees, and the function tail past the last visible line are not shown. */
363 int smpi_mpi_win_wait(MPI_Win win){
364 // xbt_barrier_wait(win->bar);
365 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
366 //mpi_coll_barrier_fun(comm);
367 //smpi_comm_destroy(comm);
368 //naive, blocking implementation.
369 XBT_DEBUG("Entering MPI_Win_Wait");
371 int size = smpi_group_size(win->group);
372 MPI_Request* reqs = xbt_new0(MPI_Request, size);
374 // for(i=0;i<size;i++){
  /* One completion notification expected per remote group member. */
376 int src=smpi_group_index(win->group,j);
377 if(src!=smpi_process_index()){
378 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
384 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
385 smpi_mpi_startall(size, reqs);
386 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
388 smpi_mpi_request_free(&reqs[i]);
392 xbt_dynar_t reqqs = win->requests;
393 size = xbt_dynar_length(reqqs);
  /* Log message says "Win_complete" — copy/paste artifact; this is win_wait. */
395 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
399 // start all requests that have been prepared by another process
400 xbt_dynar_foreach(reqqs, cpt, req){
401 if (req->flags & PREPARED) smpi_mpi_start(req);
  /* Same drain pattern as fence/complete: fresh dynar, then waitall. */
404 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
405 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
406 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
  /* Release the group reference taken in win_post and close the epoch. */
408 smpi_group_unuse(win->group);
409 win->opened--; //we're opened for business !