Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
356ab585e069a6d0fbfb30d1aa5b714bcca38feb
[simgrid.git] / src / smpi / smpi_rma.c
1
2 /* Copyright (c) 2007-2014. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 #define RMA_TAG -1234
13
14 xbt_bar_t creation_bar = NULL;
15
16 typedef struct s_smpi_mpi_win{
17   void* base;
18   MPI_Aint size;
19   int disp_unit;
20   MPI_Comm comm;
21   MPI_Info info;
22   int assert;
23   xbt_dynar_t requests;
24   xbt_bar_t bar;
25   MPI_Win* connected_wins;
26   char* name;
27 } s_smpi_mpi_win_t;
28
29
30 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
31
32   MPI_Win win;
33   
34   int comm_size = smpi_comm_size(comm);
35   int rank=smpi_comm_rank(comm);
36   XBT_DEBUG("Creating window");
37
38   win = xbt_new(s_smpi_mpi_win_t, 1);
39   win->base = base;
40   win->size = size;
41   win->disp_unit = disp_unit;
42   win->assert = 0;
43   win->info = info;
44   if(info!=MPI_INFO_NULL)
45     info->refcount++;
46   win->comm = comm;
47   win->name = NULL;
48   win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
49   win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win));
50   win->connected_wins[rank] = win;
51   
52   if(rank==0){
53     win->bar=xbt_barrier_init(comm_size);
54   }
55   
56   mpi_coll_allgather_fun(&(win->connected_wins[rank]),
57                      sizeof(MPI_Win),
58                      MPI_BYTE,
59                      win->connected_wins,
60                      sizeof(MPI_Win),
61                      MPI_BYTE,
62                      comm);
63                      
64   mpi_coll_bcast_fun( &(win->bar),
65                      sizeof(xbt_bar_t),
66                      MPI_BYTE,
67                      0,
68                      comm);
69                      
70   mpi_coll_barrier_fun(comm);
71   
72   return win;
73 }
74
75 int smpi_mpi_win_free( MPI_Win* win){
76
77   //As per the standard, perform a barrier to ensure every async comm is finished
78   xbt_barrier_wait((*win)->bar);
79   xbt_dynar_free(&(*win)->requests);
80   xbt_free((*win)->connected_wins);
81   if ((*win)->name != NULL){
82     xbt_free((*win)->name);
83   }
84   if((*win)->info!=MPI_INFO_NULL){
85     MPI_Info_free(&(*win)->info);
86   }
87   xbt_free(*win);
88   win = MPI_WIN_NULL;
89   return MPI_SUCCESS;
90 }
91
92 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
93   if(win->name==NULL){
94     *length=0;
95     name=NULL;
96     return;
97   }
98   *length = strlen(win->name);
99   strcpy(name, win->name);
100 }
101
102 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
103   if(win->comm != MPI_COMM_NULL)
104     *group = smpi_comm_group(win->comm);
105 }
106
107 void smpi_mpi_win_set_name(MPI_Win win, char* name){
108   win->name = strdup(name);;
109 }
110
111
112 int smpi_mpi_win_fence( int assert,  MPI_Win win){
113
114   XBT_DEBUG("Entering fence");
115
116   if(assert != MPI_MODE_NOPRECEDE){
117     xbt_barrier_wait(win->bar);
118
119     xbt_dynar_t reqs = win->requests;
120     int size = xbt_dynar_length(reqs);
121     unsigned int cpt=0;
122     MPI_Request req;
123     // start all requests that have been prepared by another process
124     xbt_dynar_foreach(reqs, cpt, req){
125       if (req->flags & PREPARED) smpi_mpi_start(req);
126     }
127
128     MPI_Request* treqs = xbt_dynar_to_array(reqs);
129     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
130     xbt_free(treqs);
131     win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
132
133   }
134   win->assert = assert;
135   
136   xbt_barrier_wait(win->bar);
137   XBT_DEBUG("Leaving fence ");
138
139   return MPI_SUCCESS;
140 }
141
142 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
143               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
144 {
145   //get receiver pointer
146   MPI_Win recv_win = win->connected_wins[target_rank];
147
148   void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
149   smpi_datatype_use(origin_datatype);
150   smpi_datatype_use(target_datatype);
151   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
152
153   if(target_rank != smpi_comm_rank(win->comm)){
154     //prepare send_request
155     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
156         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
157
158     //prepare receiver request
159     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
160         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
161
162     //push request to receiver's win
163     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
164
165     //start send
166     smpi_mpi_start(sreq);
167
168     //push request to sender's win
169     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
170   }else{
171     smpi_datatype_copy(origin_addr, origin_count, origin_datatype,
172                        recv_addr, target_count, target_datatype);
173   }
174
175   return MPI_SUCCESS;
176 }
177
178 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
179               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
180 {
181   //get sender pointer
182   MPI_Win send_win = win->connected_wins[target_rank];
183
184   void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
185   smpi_datatype_use(origin_datatype);
186   smpi_datatype_use(target_datatype);
187   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
188
189   if(target_rank != smpi_comm_rank(win->comm)){
190     //prepare send_request
191     MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
192         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm, MPI_OP_NULL);
193
194     //prepare receiver request
195     MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
196         smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm, MPI_OP_NULL);
197         
198     //start the send, with another process than us as sender. 
199     smpi_mpi_start(sreq);
200     
201     //push request to receiver's win
202     xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
203
204     //start recv
205     smpi_mpi_start(rreq);
206
207     //push request to sender's win
208     xbt_dynar_push_as(win->requests, MPI_Request, rreq);
209   }else{
210     smpi_datatype_copy(send_addr, target_count, target_datatype,
211                        origin_addr, origin_count, origin_datatype);
212   }
213
214   return MPI_SUCCESS;
215 }
216
217
218 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
219               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
220 {
221   //FIXME: local version 
222   //get receiver pointer
223   MPI_Win recv_win = win->connected_wins[target_rank];
224
225   void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
226   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
227
228   smpi_datatype_use(origin_datatype);
229   smpi_datatype_use(target_datatype);
230
231
232     //prepare send_request
233     MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
234         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
235
236     //prepare receiver request
237     MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
238         smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
239     //push request to receiver's win
240     xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
241     //start send
242     smpi_mpi_start(sreq);
243     
244     //push request to sender's win
245     xbt_dynar_push_as(win->requests, MPI_Request, sreq);
246   
247
248
249   return MPI_SUCCESS;
250 }
251