Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
please sonar by removing useless asignments and return statements
[simgrid.git] / src / smpi / smpi_rma.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 msg_bar_t creation_bar = nullptr;
13
14 typedef struct s_smpi_mpi_win{
15   void* base;
16   MPI_Aint size;
17   int disp_unit;
18   MPI_Comm comm;
19   MPI_Info info;
20   int assert;
21   std::vector<MPI_Request> *requests;
22   msg_bar_t bar;
23   MPI_Win* connected_wins;
24   char* name;
25   int opened;
26   MPI_Group group;
27 } s_smpi_mpi_win_t;
28
29
30 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
31   MPI_Win win;
32
33   int comm_size = smpi_comm_size(comm);
34   int rank=smpi_comm_rank(comm);
35   XBT_DEBUG("Creating window");
36
37   win = xbt_new(s_smpi_mpi_win_t, 1);
38   win->base = base;
39   win->size = size;
40   win->disp_unit = disp_unit;
41   win->assert = 0;
42   win->info = info;
43   if(info!=MPI_INFO_NULL)
44     info->refcount++;
45   win->comm = comm;
46   win->name = nullptr;
47   win->opened = 0;
48   win->group = MPI_GROUP_NULL;
49   win->requests = new std::vector<MPI_Request>();
50   win->connected_wins = xbt_new0(MPI_Win, comm_size);
51   win->connected_wins[rank] = win;
52
53   if(rank==0){
54     win->bar = MSG_barrier_init(comm_size);
55   }
56   mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
57                          MPI_BYTE, comm);
58
59   mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
60
61   mpi_coll_barrier_fun(comm);
62
63   return win;
64 }
65
66 int smpi_mpi_win_free( MPI_Win* win){
67   //As per the standard, perform a barrier to ensure every async comm is finished
68   MSG_barrier_wait((*win)->bar);
69   delete (*win)->requests;
70   xbt_free((*win)->connected_wins);
71   if ((*win)->name != nullptr){
72     xbt_free((*win)->name);
73   }
74   if((*win)->info!=MPI_INFO_NULL){
75     MPI_Info_free(&(*win)->info);
76   }
77
78   mpi_coll_barrier_fun((*win)->comm);
79   int rank=smpi_comm_rank((*win)->comm);
80   if(rank == 0)
81     MSG_barrier_destroy((*win)->bar);
82   xbt_free(*win);
83   *win = MPI_WIN_NULL;
84   return MPI_SUCCESS;
85 }
86
87 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
88   if(win->name==nullptr){
89     *length=0;
90     name=nullptr;
91     return;
92   }
93   *length = strlen(win->name);
94   strncpy(name, win->name, *length+1);
95 }
96
97 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
98   if(win->comm != MPI_COMM_NULL){
99     *group = smpi_comm_group(win->comm);
100   }
101 }
102
103 void smpi_mpi_win_set_name(MPI_Win win, char* name){
104   win->name = xbt_strdup(name);
105 }
106
107 int smpi_mpi_win_fence( int assert,  MPI_Win win){
108   XBT_DEBUG("Entering fence");
109   if(win->opened==0)
110     win->opened=1;
111   if(assert != MPI_MODE_NOPRECEDE){
112     MSG_barrier_wait(win->bar);
113
114     std::vector<MPI_Request> *reqs = win->requests;
115     int size = static_cast<int>(reqs->size());
116     // start all requests that have been prepared by another process
117     for(auto req: *reqs){
118       if (req && (req->flags & PREPARED))
119         smpi_mpi_start(req);
120     }
121
122     MPI_Request* treqs = &(*reqs)[0];
123     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
124   }
125   win->assert = assert;
126
127   MSG_barrier_wait(win->bar);
128   XBT_DEBUG("Leaving fence ");
129
130   return MPI_SUCCESS;
131 }
132
133 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
134               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
135 {
136   if(win->opened==0)//check that post/start has been done
137     return MPI_ERR_WIN;
138   //get receiver pointer
139   MPI_Win recv_win = win->connected_wins[target_rank];
140
141   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
142   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
143
144   if(target_rank != smpi_comm_rank(win->comm)){
145     //prepare send_request
146     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
147         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
148
149     //prepare receiver request
150     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
151         smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
152
153     //push request to receiver's win
154     recv_win->requests->push_back(rreq);
155
156     //start send
157     smpi_mpi_start(sreq);
158
159     //push request to sender's win
160     win->requests->push_back(sreq);
161   }else{
162     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
163   }
164
165   return MPI_SUCCESS;
166 }
167
168 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
169               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
170 {
171   if(win->opened==0)//check that post/start has been done
172     return MPI_ERR_WIN;
173   //get sender pointer
174   MPI_Win send_win = win->connected_wins[target_rank];
175
176   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
177   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
178
179   if(target_rank != smpi_comm_rank(win->comm)){
180     //prepare send_request
181     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
182         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
183         MPI_OP_NULL);
184
185     //prepare receiver request
186     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
187         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
188         MPI_OP_NULL);
189
190     //start the send, with another process than us as sender. 
191     smpi_mpi_start(sreq);
192
193     //push request to receiver's win
194     send_win->requests->push_back(sreq);
195
196     //start recv
197     smpi_mpi_start(rreq);
198
199     //push request to sender's win
200     win->requests->push_back(rreq);
201   }else{
202     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
203   }
204
205   return MPI_SUCCESS;
206 }
207
208
209 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
210               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
211 {
212   if(win->opened==0)//check that post/start has been done
213     return MPI_ERR_WIN;
214   //FIXME: local version 
215   //get receiver pointer
216   MPI_Win recv_win = win->connected_wins[target_rank];
217
218   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
219   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
220
221     //prepare send_request
222     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
223         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, win->comm, op);
224
225     //prepare receiver request
226     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
227         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, recv_win->comm, op);
228     //push request to receiver's win
229     recv_win->requests->push_back(rreq);
230     //start send
231     smpi_mpi_start(sreq);
232
233     //push request to sender's win
234     win->requests->push_back(sreq);
235
236   return MPI_SUCCESS;
237 }
238
239 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
240     /* From MPI forum advices
241     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
242     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
243     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
244     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
245     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
246     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
247     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
248     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
249     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
250     must complete, without further dependencies.  */
251
252   //naive, blocking implementation.
253   int i=0,j=0;
254   int size = smpi_group_size(group);
255   MPI_Request* reqs = xbt_new0(MPI_Request, size);
256
257   while(j!=size){
258     int src=smpi_group_index(group,j);
259     if(src!=smpi_process_index()){
260       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
261       i++;
262     }
263     j++;
264   }
265   size=i;
266   smpi_mpi_startall(size, reqs);
267   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
268   for(i=0;i<size;i++){
269     smpi_mpi_request_free(&reqs[i]);
270   }
271   xbt_free(reqs);
272   win->opened++; //we're open for business !
273   win->group=group;
274   smpi_group_use(group);
275   return MPI_SUCCESS;
276 }
277
278 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
279   //let's make a synchronous send here
280   int i=0,j=0;
281   int size = smpi_group_size(group);
282   MPI_Request* reqs = xbt_new0(MPI_Request, size);
283
284   while(j!=size){
285     int dst=smpi_group_index(group,j);
286     if(dst!=smpi_process_index()){
287       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
288       i++;
289     }
290     j++;
291   }
292   size=i;
293
294   smpi_mpi_startall(size, reqs);
295   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
296   for(i=0;i<size;i++){
297     smpi_mpi_request_free(&reqs[i]);
298   }
299   xbt_free(reqs);
300   win->opened++; //we're open for business !
301   win->group=group;
302   smpi_group_use(group);
303   return MPI_SUCCESS;
304 }
305
306 int smpi_mpi_win_complete(MPI_Win win){
307   if(win->opened==0)
308     xbt_die("Complete called on already opened MPI_Win");
309
310   XBT_DEBUG("Entering MPI_Win_Complete");
311   int i=0,j=0;
312   int size = smpi_group_size(win->group);
313   MPI_Request* reqs = xbt_new0(MPI_Request, size);
314
315   while(j!=size){
316     int dst=smpi_group_index(win->group,j);
317     if(dst!=smpi_process_index()){
318       reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
319       i++;
320     }
321     j++;
322   }
323   size=i;
324   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
325   smpi_mpi_startall(size, reqs);
326   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
327
328   for(i=0;i<size;i++){
329     smpi_mpi_request_free(&reqs[i]);
330   }
331   xbt_free(reqs);
332
333   //now we can finish RMA calls
334
335   std::vector<MPI_Request> *reqqs = win->requests;
336   size = static_cast<int>(reqqs->size());
337
338   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
339   // start all requests that have been prepared by another process
340   for (auto req: *reqqs){
341     if (req && (req->flags & PREPARED))
342       smpi_mpi_start(req);
343   }
344
345   MPI_Request* treqs = &(*reqqs)[0];
346   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
347   reqqs->clear();
348   smpi_group_unuse(win->group);
349   win->opened--; //we're closed for business !
350   return MPI_SUCCESS;
351 }
352
353 int smpi_mpi_win_wait(MPI_Win win){
354   //naive, blocking implementation.
355   XBT_DEBUG("Entering MPI_Win_Wait");
356   int i=0,j=0;
357   int size = smpi_group_size(win->group);
358   MPI_Request* reqs = xbt_new0(MPI_Request, size);
359
360   while(j!=size){
361     int src=smpi_group_index(win->group,j);
362     if(src!=smpi_process_index()){
363       reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
364       i++;
365     }
366     j++;
367   }
368   size=i;
369   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
370   smpi_mpi_startall(size, reqs);
371   smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
372   for(i=0;i<size;i++){
373     smpi_mpi_request_free(&reqs[i]);
374   }
375   xbt_free(reqs);
376
377   std::vector<MPI_Request> *reqqs = win->requests;
378   size = static_cast<int>(reqqs->size());
379
380   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
381
382   // start all requests that have been prepared by another process
383   for(auto req: *reqqs){
384     if (req && (req->flags & PREPARED))
385       smpi_mpi_start(req);
386   }
387
388   MPI_Request* treqs = &(*reqqs)[0];
389   smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
390   reqqs->clear();
391   smpi_group_unuse(win->group);
392   win->opened--; //we're opened for business !
393   return MPI_SUCCESS;
394 }