Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Continuing work on datatypes
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14
15 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
16   int comm_size = comm->size();
17   int rank      = comm->rank();
18   XBT_DEBUG("Creating window");
19   if(info!=MPI_INFO_NULL)
20     info->refcount++;
21   name_ = nullptr;
22   opened_ = 0;
23   group_ = MPI_GROUP_NULL;
24   requests_ = new std::vector<MPI_Request>();
25   mut_=xbt_mutex_init();
26   connected_wins_ = new MPI_Win[comm_size];
27   connected_wins_[rank] = this;
28   count_ = 0;
29   if(rank==0){
30     bar_ = MSG_barrier_init(comm_size);
31   }
32   mpi_coll_allgather_fun(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
33                          MPI_BYTE, comm);
34
35   mpi_coll_bcast_fun(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
36
37   mpi_coll_barrier_fun(comm);
38 }
39
40 Win::~Win(){
41   //As per the standard, perform a barrier to ensure every async comm is finished
42   MSG_barrier_wait(bar_);
43   xbt_mutex_acquire(mut_);
44   delete requests_;
45   xbt_mutex_release(mut_);
46   delete[] connected_wins_;
47   if (name_ != nullptr){
48     xbt_free(name_);
49   }
50   if(info_!=MPI_INFO_NULL){
51     MPI_Info_free(&info_);
52   }
53
54   mpi_coll_barrier_fun(comm_);
55   int rank=comm_->rank();
56   if(rank == 0)
57     MSG_barrier_destroy(bar_);
58   xbt_mutex_destroy(mut_);
59 }
60
61 void Win::get_name(char* name, int* length){
62   if(name_==nullptr){
63     *length=0;
64     name=nullptr;
65     return;
66   }
67   *length = strlen(name_);
68   strncpy(name, name_, *length+1);
69 }
70
71 void Win::get_group(MPI_Group* group){
72   if(comm_ != MPI_COMM_NULL){
73     *group = comm_->group();
74   } else {
75     *group = MPI_GROUP_NULL;
76   }
77 }
78
79 void Win::set_name(char* name){
80   name_ = xbt_strdup(name);
81 }
82
83 int Win::fence(int assert)
84 {
85   XBT_DEBUG("Entering fence");
86   if (opened_ == 0)
87     opened_=1;
88   if (assert != MPI_MODE_NOPRECEDE) {
89     // This is not the first fence => finalize what came before
90     MSG_barrier_wait(bar_);
91     xbt_mutex_acquire(mut_);
92     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
93     // Without this, the vector could get redimensionned when another process pushes.
94     // This would result in the array used by Request::waitall() to be invalidated.
95     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
96     std::vector<MPI_Request> *reqs = requests_;
97     int size = static_cast<int>(reqs->size());
98     // start all requests that have been prepared by another process
99     if (size > 0) {
100       for (const auto& req : *reqs) {
101         if (req && (req->flags() & PREPARED))
102           req->start();
103       }
104
105       MPI_Request* treqs = &(*reqs)[0];
106
107       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
108     }
109     count_=0;
110     xbt_mutex_release(mut_);
111   }
112   assert_ = assert;
113
114   MSG_barrier_wait(bar_);
115   XBT_DEBUG("Leaving fence");
116
117   return MPI_SUCCESS;
118 }
119
120 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
121               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
122 {
123   if(opened_==0)//check that post/start has been done
124     return MPI_ERR_WIN;
125   //get receiver pointer
126   MPI_Win recv_win = connected_wins_[target_rank];
127
128   if(target_count*target_datatype->get_extent()>recv_win->size_)
129     return MPI_ERR_ARG;
130
131   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
132   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
133
134   if(target_rank != comm_->rank()){
135     //prepare send_request
136     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
137         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
138
139     //prepare receiver request
140     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
141         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
142
143     //push request to receiver's win
144     xbt_mutex_acquire(recv_win->mut_);
145     recv_win->requests_->push_back(rreq);
146     xbt_mutex_release(recv_win->mut_);
147     //start send
148     sreq->start();
149
150     //push request to sender's win
151     xbt_mutex_acquire(mut_);
152     requests_->push_back(sreq);
153     xbt_mutex_release(mut_);
154   }else{
155     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
156   }
157
158   return MPI_SUCCESS;
159 }
160
161 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
162               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
163 {
164   if(opened_==0)//check that post/start has been done
165     return MPI_ERR_WIN;
166   //get sender pointer
167   MPI_Win send_win = connected_wins_[target_rank];
168
169   if(target_count*target_datatype->get_extent()>send_win->size_)
170     return MPI_ERR_ARG;
171
172   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
173   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
174
175   if(target_rank != comm_->rank()){
176     //prepare send_request
177     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
178         comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm_,
179         MPI_OP_NULL);
180
181     //prepare receiver request
182     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
183         comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, comm_,
184         MPI_OP_NULL);
185
186     //start the send, with another process than us as sender. 
187     sreq->start();
188     //push request to receiver's win
189     xbt_mutex_acquire(send_win->mut_);
190     send_win->requests_->push_back(sreq);
191     xbt_mutex_release(send_win->mut_);
192
193     //start recv
194     rreq->start();
195     //push request to sender's win
196     xbt_mutex_acquire(mut_);
197     requests_->push_back(rreq);
198     xbt_mutex_release(mut_);
199   }else{
200     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
201   }
202
203   return MPI_SUCCESS;
204 }
205
206
207 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
208               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
209 {
210   if(opened_==0)//check that post/start has been done
211     return MPI_ERR_WIN;
212   //FIXME: local version 
213   //get receiver pointer
214   MPI_Win recv_win = connected_wins_[target_rank];
215
216   if(target_count*target_datatype->get_extent()>recv_win->size_)
217     return MPI_ERR_ARG;
218
219   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
220   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
221     //As the tag will be used for ordering of the operations, add count to it
222     //prepare send_request
223     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
224         smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
225
226     //prepare receiver request
227     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
228         smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
229
230     count_++;
231     //push request to receiver's win
232     xbt_mutex_acquire(recv_win->mut_);
233     recv_win->requests_->push_back(rreq);
234     xbt_mutex_release(recv_win->mut_);
235     //start send
236     sreq->start();
237
238     //push request to sender's win
239     xbt_mutex_acquire(mut_);
240     requests_->push_back(sreq);
241     xbt_mutex_release(mut_);
242
243   return MPI_SUCCESS;
244 }
245
246 int Win::start(MPI_Group group, int assert){
247     /* From MPI forum advices
248     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
249     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
250     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
251     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
252     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
253     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
254     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
255     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
256     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
257     must complete, without further dependencies.  */
258
259   //naive, blocking implementation.
260     int i             = 0;
261     int j             = 0;
262     int size          = group->size();
263     MPI_Request* reqs = xbt_new0(MPI_Request, size);
264
265     while (j != size) {
266       int src = group->index(j);
267       if (src != smpi_process_index() && src != MPI_UNDEFINED) {
268         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
269         i++;
270       }
271       j++;
272   }
273   size=i;
274   Request::startall(size, reqs);
275   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
276   for(i=0;i<size;i++){
277     Request::unuse(&reqs[i]);
278   }
279   xbt_free(reqs);
280   opened_++; //we're open for business !
281   group_=group;
282   group->use();
283   return MPI_SUCCESS;
284 }
285
286 int Win::post(MPI_Group group, int assert){
287   //let's make a synchronous send here
288   int i             = 0;
289   int j             = 0;
290   int size = group->size();
291   MPI_Request* reqs = xbt_new0(MPI_Request, size);
292
293   while(j!=size){
294     int dst=group->index(j);
295     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
296       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
297       i++;
298     }
299     j++;
300   }
301   size=i;
302
303   Request::startall(size, reqs);
304   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
305   for(i=0;i<size;i++){
306     Request::unuse(&reqs[i]);
307   }
308   xbt_free(reqs);
309   opened_++; //we're open for business !
310   group_=group;
311   group->use();
312   return MPI_SUCCESS;
313 }
314
315 int Win::complete(){
316   if(opened_==0)
317     xbt_die("Complete called on already opened MPI_Win");
318
319   XBT_DEBUG("Entering MPI_Win_Complete");
320   int i             = 0;
321   int j             = 0;
322   int size = group_->size();
323   MPI_Request* reqs = xbt_new0(MPI_Request, size);
324
325   while(j!=size){
326     int dst=group_->index(j);
327     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
328       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
329       i++;
330     }
331     j++;
332   }
333   size=i;
334   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
335   Request::startall(size, reqs);
336   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
337
338   for(i=0;i<size;i++){
339     Request::unuse(&reqs[i]);
340   }
341   xbt_free(reqs);
342
343   //now we can finish RMA calls
344   xbt_mutex_acquire(mut_);
345   std::vector<MPI_Request> *reqqs = requests_;
346   size = static_cast<int>(reqqs->size());
347
348   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
349   if (size > 0) {
350     // start all requests that have been prepared by another process
351     for (const auto& req : *reqqs) {
352       if (req && (req->flags() & PREPARED))
353         req->start();
354     }
355
356     MPI_Request* treqs = &(*reqqs)[0];
357     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
358     reqqs->clear();
359   }
360   xbt_mutex_release(mut_);
361
362   group_->unuse();
363   opened_--; //we're closed for business !
364   return MPI_SUCCESS;
365 }
366
367 int Win::wait(){
368   //naive, blocking implementation.
369   XBT_DEBUG("Entering MPI_Win_Wait");
370   int i=0,j=0;
371   int size = group_->size();
372   MPI_Request* reqs = xbt_new0(MPI_Request, size);
373
374   while(j!=size){
375     int src=group_->index(j);
376     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
377       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
378       i++;
379     }
380     j++;
381   }
382   size=i;
383   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
384   Request::startall(size, reqs);
385   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
386   for(i=0;i<size;i++){
387     Request::unuse(&reqs[i]);
388   }
389   xbt_free(reqs);
390   xbt_mutex_acquire(mut_);
391   std::vector<MPI_Request> *reqqs = requests_;
392   size = static_cast<int>(reqqs->size());
393
394   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
395   if (size > 0) {
396     // start all requests that have been prepared by another process
397     for (const auto& req : *reqqs) {
398       if (req && (req->flags() & PREPARED))
399         req->start();
400     }
401
402     MPI_Request* treqs = &(*reqqs)[0];
403     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
404     reqqs->clear();
405   }
406   xbt_mutex_release(mut_);
407
408   group_->unuse();
409   opened_--; //we're opened for business !
410   return MPI_SUCCESS;
411 }
412
413 }
414 }