Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid
[simgrid.git] / src / smpi / smpi_rma.cpp
1
2 /* Copyright (c) 2007-2015. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 #define RMA_TAG -1234
13
14 xbt_bar_t creation_bar = NULL;
15
16 typedef struct s_smpi_mpi_win{
17   void* base;
18   MPI_Aint size;
19   int disp_unit;
20   MPI_Comm comm;
21   MPI_Info info;
22   int assert;
23   xbt_dynar_t requests;
24   xbt_bar_t bar;
25   MPI_Win* connected_wins;
26   char* name;
27   int opened;
28   MPI_Group group;
29 } s_smpi_mpi_win_t;
30
31
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33   MPI_Win win;
34
35   int comm_size = smpi_comm_size(comm);
36   int rank=smpi_comm_rank(comm);
37   XBT_DEBUG("Creating window");
38
39   win = xbt_new(s_smpi_mpi_win_t, 1);
40   win->base = base;
41   win->size = size;
42   win->disp_unit = disp_unit;
43   win->assert = 0;
44   win->info = info;
45   if(info!=MPI_INFO_NULL)
46     info->refcount++;
47   win->comm = comm;
48   win->name = NULL;
49   win->opened = 0;
50   win->group = MPI_GROUP_NULL;
51   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
52   win->connected_wins = xbt_new0(MPI_Win, comm_size);
53   win->connected_wins[rank] = win;
54
55   if(rank==0){
56     win->bar=xbt_barrier_init(comm_size);
57   }
58
59   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
60                          MPI_BYTE, comm);
61
62   mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
63
64   mpi_coll_barrier_fun(comm);
65
66   return win;
67 }
68
69 int smpi_mpi_win_free( MPI_Win* win){
70   //As per the standard, perform a barrier to ensure every async comm is finished
71   xbt_barrier_wait((*win)->bar);
72   xbt_dynar_free(&(*win)->requests);
73   xbt_free((*win)->connected_wins);
74   if ((*win)->name != NULL){
75     xbt_free((*win)->name);
76   }
77   if((*win)->info!=MPI_INFO_NULL){
78     MPI_Info_free(&(*win)->info);
79   }
80   xbt_free(*win);
81   *win = MPI_WIN_NULL;
82   return MPI_SUCCESS;
83 }
84
85 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
86   if(win->name==NULL){
87     *length=0;
88     name=NULL;
89     return;
90   }
91   *length = strlen(win->name);
92   strcpy(name, win->name);
93 }
94
95 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
96   if(win->comm != MPI_COMM_NULL){
97     *group = smpi_comm_group(win->comm);
98     smpi_group_use(*group);
99   }
100 }
101
102 void smpi_mpi_win_set_name(MPI_Win win, char* name){
103   win->name = xbt_strdup(name);;
104 }
105
106 int smpi_mpi_win_fence( int assert,  MPI_Win win){
107   XBT_DEBUG("Entering fence");
108   if(!win->opened)
109     win->opened=1;
110   if(assert != MPI_MODE_NOPRECEDE){
111     xbt_barrier_wait(win->bar);
112
113     xbt_dynar_t reqs = win->requests;
114     int size = xbt_dynar_length(reqs);
115     unsigned int cpt=0;
116     MPI_Request req;
117     // start all requests that have been prepared by another process
118     xbt_dynar_foreach(reqs, cpt, req){
119       if (req->flags & PREPARED) smpi_mpi_start(req);
120     }
121
122     MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
123     win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
124     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
125     xbt_free(treqs);
126
127   }
128   win->assert = assert;
129
130   xbt_barrier_wait(win->bar);
131   XBT_DEBUG("Leaving fence ");
132
133   return MPI_SUCCESS;
134 }
135
136 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
137               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
138 {
139   if(!win->opened)//check that post/start has been done
140     return MPI_ERR_WIN;
141   //get receiver pointer
142   MPI_Win recv_win = win->connected_wins[target_rank];
143
144   void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
145   smpi_datatype_use(origin_datatype);
146   smpi_datatype_use(target_datatype);
147   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
148
149   if(target_rank != smpi_comm_rank(win->comm)){
150     //prepare send_request
151     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
152         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
153
154     //prepare receiver request
155     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
156         smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
157
158     //push request to receiver's win
159     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
160
161     //start send
162     smpi_mpi_start(sreq);
163
164     //push request to sender's win
165     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
166   }else{
167     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
168   }
169
170   return MPI_SUCCESS;
171 }
172
173 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
174               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
175 {
176   if(!win->opened)//check that post/start has been done
177     return MPI_ERR_WIN;
178   //get sender pointer
179   MPI_Win send_win = win->connected_wins[target_rank];
180
181   void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
182   smpi_datatype_use(origin_datatype);
183   smpi_datatype_use(target_datatype);
184   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
185
186   if(target_rank != smpi_comm_rank(win->comm)){
187     //prepare send_request
188     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
189         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
190         MPI_OP_NULL);
191
192     //prepare receiver request
193     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
194         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
195         MPI_OP_NULL);
196
197     //start the send, with another process than us as sender. 
198     smpi_mpi_start(sreq);
199
200     //push request to receiver's win
201     xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
202
203     //start recv
204     smpi_mpi_start(rreq);
205
206     //push request to sender's win
207     xbt_dynar_push_as(win->requests, MPI_Request, rreq);
208   }else{
209     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
210   }
211
212   return MPI_SUCCESS;
213 }
214
215
216 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
217               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
218 {
219   if(!win->opened)//check that post/start has been done
220     return MPI_ERR_WIN;
221   //FIXME: local version 
222   //get receiver pointer
223   MPI_Win recv_win = win->connected_wins[target_rank];
224
225   void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
226   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
227
228   smpi_datatype_use(origin_datatype);
229   smpi_datatype_use(target_datatype);
230
231     //prepare send_request
232     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
233         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
234
235     //prepare receiver request
236     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
237         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
238     //push request to receiver's win
239     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
240     //start send
241     smpi_mpi_start(sreq);
242
243     //push request to sender's win
244     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
245
246   return MPI_SUCCESS;
247 }
248
249 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
250     /* From MPI forum advices
251     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
252     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
253     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
254     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
255     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
256     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
257     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
258     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
259     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
260     must complete, without further dependencies.  */
261
262   //naive, blocking implementation.
263   int i=0,j=0;
264   int size = smpi_group_size(group);
265   MPI_Request* reqs = xbt_new0(MPI_Request, size);
266
267 //  for(i=0;i<size;i++){
268   while(j!=size){
269     int src=smpi_group_index(group,j);
270     if(src!=smpi_process_index()){
271       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
272       i++;
273     }
274     j++;
275   }
276   size=i;
277   smpi_mpi_startall(size, reqs);
278   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
279   for(i=0;i<size;i++){
280     smpi_mpi_request_free(&reqs[i]);
281   }
282   xbt_free(reqs);
283   win->opened++; //we're open for business !
284   win->group=group;
285   smpi_group_use(group);
286   return MPI_SUCCESS;
287 }
288
289 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
290   //let's make a synchronous send here
291   int i=0,j=0;
292   int size = smpi_group_size(group);
293   MPI_Request* reqs = xbt_new0(MPI_Request, size);
294
295   while(j!=size){
296     int dst=smpi_group_index(group,j);
297     if(dst!=smpi_process_index()){
298       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
299       i++;
300     }
301     j++;
302   }
303   size=i;
304
305   smpi_mpi_startall(size, reqs);
306   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
307   for(i=0;i<size;i++){
308     smpi_mpi_request_free(&reqs[i]);
309   }
310   xbt_free(reqs);
311   win->opened++; //we're open for business !
312   win->group=group;
313   smpi_group_use(group);
314   return MPI_SUCCESS;
315 }
316
317 int smpi_mpi_win_complete(MPI_Win win){
318   if(win->opened==0)
319     xbt_die("Complete called on already opened MPI_Win");
320 //  xbt_barrier_wait(win->bar);
321   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
322   //mpi_coll_barrier_fun(comm);
323   //smpi_comm_destroy(comm);
324
325   XBT_DEBUG("Entering MPI_Win_Complete");
326   int i=0,j=0;
327   int size = smpi_group_size(win->group);
328   MPI_Request* reqs = xbt_new0(MPI_Request, size);
329
330   while(j!=size){
331     int dst=smpi_group_index(win->group,j);
332     if(dst!=smpi_process_index()){
333       reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
334       i++;
335     }
336     j++;
337   }
338   size=i;
339   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
340   smpi_mpi_startall(size, reqs);
341   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
342
343   for(i=0;i<size;i++){
344     smpi_mpi_request_free(&reqs[i]);
345   }
346   xbt_free(reqs);
347
348   //now we can finish RMA calls
349
350   xbt_dynar_t reqqs = win->requests;
351   size = xbt_dynar_length(reqqs);
352
353   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
354   unsigned int cpt=0;
355   MPI_Request req;
356   // start all requests that have been prepared by another process
357   xbt_dynar_foreach(reqqs, cpt, req){
358     if (req->flags & PREPARED) smpi_mpi_start(req);
359   }
360
361   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
362   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
363   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
364   xbt_free(treqs);
365   win->opened--; //we're closed for business !
366   return MPI_SUCCESS;
367 }
368
369 int smpi_mpi_win_wait(MPI_Win win){
370 //  xbt_barrier_wait(win->bar);
371   //MPI_Comm comm = smpi_comm_new(win->group, NULL);
372   //mpi_coll_barrier_fun(comm);
373   //smpi_comm_destroy(comm);
374   //naive, blocking implementation.
375   XBT_DEBUG("Entering MPI_Win_Wait");
376   int i=0,j=0;
377   int size = smpi_group_size(win->group);
378   MPI_Request* reqs = xbt_new0(MPI_Request, size);
379
380 //  for(i=0;i<size;i++){
381   while(j!=size){
382     int src=smpi_group_index(win->group,j);
383     if(src!=smpi_process_index()){
384       reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
385       i++;
386     }
387     j++;
388   }
389   size=i;
390   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
391   smpi_mpi_startall(size, reqs);
392   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
393   for(i=0;i<size;i++){
394     smpi_mpi_request_free(&reqs[i]);
395   }
396   xbt_free(reqs);
397
398   xbt_dynar_t reqqs = win->requests;
399   size = xbt_dynar_length(reqqs);
400
401   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
402
403   unsigned int cpt=0;
404   MPI_Request req;
405   // start all requests that have been prepared by another process
406   xbt_dynar_foreach(reqqs, cpt, req){
407     if (req->flags & PREPARED) smpi_mpi_start(req);
408   }
409
410   MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
411   win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
412   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
413   xbt_free(treqs);
414   win->opened--; //we're opened for business !
415   return MPI_SUCCESS;
416 }