Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
welcome simgrid::smpi::Request
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14
15 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
16   int comm_size = comm->size();
17   int rank      = comm->rank();
18   XBT_DEBUG("Creating window");
19   if(info!=MPI_INFO_NULL)
20     info->refcount++;
21   name_ = nullptr;
22   opened_ = 0;
23   group_ = MPI_GROUP_NULL;
24   requests_ = new std::vector<MPI_Request>();
25   mut_=xbt_mutex_init();
26   connected_wins_ = new MPI_Win[comm_size];
27   connected_wins_[rank] = this;
28   count_ = 0;
29   if(rank==0){
30     bar_ = MSG_barrier_init(comm_size);
31   }
32   mpi_coll_allgather_fun(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
33                          MPI_BYTE, comm);
34
35   mpi_coll_bcast_fun(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
36
37   mpi_coll_barrier_fun(comm);
38 }
39
40 Win::~Win(){
41   //As per the standard, perform a barrier to ensure every async comm is finished
42   MSG_barrier_wait(bar_);
43   xbt_mutex_acquire(mut_);
44   delete requests_;
45   xbt_mutex_release(mut_);
46   delete[] connected_wins_;
47   if (name_ != nullptr){
48     xbt_free(name_);
49   }
50   if(info_!=MPI_INFO_NULL){
51     MPI_Info_free(&info_);
52   }
53
54   mpi_coll_barrier_fun(comm_);
55   int rank=comm_->rank();
56   if(rank == 0)
57     MSG_barrier_destroy(bar_);
58   xbt_mutex_destroy(mut_);
59 }
60
61 void Win::get_name(char* name, int* length){
62   if(name_==nullptr){
63     *length=0;
64     name=nullptr;
65     return;
66   }
67   *length = strlen(name_);
68   strncpy(name, name_, *length+1);
69 }
70
71 void Win::get_group(MPI_Group* group){
72   if(comm_ != MPI_COMM_NULL){
73     *group = comm_->group();
74   } else {
75     *group = MPI_GROUP_NULL;
76   }
77 }
78
79 void Win::set_name(char* name){
80   name_ = xbt_strdup(name);
81 }
82
83 int Win::fence(int assert)
84 {
85   XBT_DEBUG("Entering fence");
86   if (opened_ == 0)
87     opened_=1;
88   if (assert != MPI_MODE_NOPRECEDE) {
89     // This is not the first fence => finalize what came before
90     MSG_barrier_wait(bar_);
91     xbt_mutex_acquire(mut_);
92     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
93     // Without this, the vector could get redimensionned when another process pushes.
94     // This would result in the array used by Request::waitall() to be invalidated.
95     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
96     std::vector<MPI_Request> *reqs = requests_;
97     int size = static_cast<int>(reqs->size());
98     // start all requests that have been prepared by another process
99     if (size > 0) {
100       for (const auto& req : *reqs) {
101         if (req && (req->flags() & PREPARED))
102           req->start();
103       }
104
105       MPI_Request* treqs = &(*reqs)[0];
106
107       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
108     }
109     count_=0;
110     xbt_mutex_release(mut_);
111   }
112   assert_ = assert;
113
114   MSG_barrier_wait(bar_);
115   XBT_DEBUG("Leaving fence");
116
117   return MPI_SUCCESS;
118 }
119
120 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
121               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
122 {
123   if(opened_==0)//check that post/start has been done
124     return MPI_ERR_WIN;
125   //get receiver pointer
126   MPI_Win recv_win = connected_wins_[target_rank];
127
128   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
129   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
130
131   if(target_rank != comm_->rank()){
132     //prepare send_request
133     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
134         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
135
136     //prepare receiver request
137     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
138         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
139
140     //push request to receiver's win
141     xbt_mutex_acquire(recv_win->mut_);
142     recv_win->requests_->push_back(rreq);
143     xbt_mutex_release(recv_win->mut_);
144     //start send
145     sreq->start();
146
147     //push request to sender's win
148     xbt_mutex_acquire(mut_);
149     requests_->push_back(sreq);
150     xbt_mutex_release(mut_);
151   }else{
152     smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
153   }
154
155   return MPI_SUCCESS;
156 }
157
158 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
159               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
160 {
161   if(opened_==0)//check that post/start has been done
162     return MPI_ERR_WIN;
163   //get sender pointer
164   MPI_Win send_win = connected_wins_[target_rank];
165
166   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
167   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
168
169   if(target_rank != comm_->rank()){
170     //prepare send_request
171     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
172         comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm_,
173         MPI_OP_NULL);
174
175     //prepare receiver request
176     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
177         comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, comm_,
178         MPI_OP_NULL);
179
180     //start the send, with another process than us as sender. 
181     sreq->start();
182     //push request to receiver's win
183     xbt_mutex_acquire(send_win->mut_);
184     send_win->requests_->push_back(sreq);
185     xbt_mutex_release(send_win->mut_);
186
187     //start recv
188     rreq->start();
189     //push request to sender's win
190     xbt_mutex_acquire(mut_);
191     requests_->push_back(rreq);
192     xbt_mutex_release(mut_);
193   }else{
194     smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
195   }
196
197   return MPI_SUCCESS;
198 }
199
200
201 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
202               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
203 {
204   if(opened_==0)//check that post/start has been done
205     return MPI_ERR_WIN;
206   //FIXME: local version 
207   //get receiver pointer
208   MPI_Win recv_win = connected_wins_[target_rank];
209
210   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
211   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
212     //As the tag will be used for ordering of the operations, add count to it
213     //prepare send_request
214     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
215         smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
216
217     //prepare receiver request
218     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
219         smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
220
221     count_++;
222     //push request to receiver's win
223     xbt_mutex_acquire(recv_win->mut_);
224     recv_win->requests_->push_back(rreq);
225     xbt_mutex_release(recv_win->mut_);
226     //start send
227     sreq->start();
228
229     //push request to sender's win
230     xbt_mutex_acquire(mut_);
231     requests_->push_back(sreq);
232     xbt_mutex_release(mut_);
233
234   return MPI_SUCCESS;
235 }
236
237 int Win::start(MPI_Group group, int assert){
238     /* From MPI forum advices
239     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
240     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
241     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
242     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
243     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
244     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
245     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
246     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
247     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
248     must complete, without further dependencies.  */
249
250   //naive, blocking implementation.
251     int i             = 0;
252     int j             = 0;
253     int size          = group->size();
254     MPI_Request* reqs = xbt_new0(MPI_Request, size);
255
256     while (j != size) {
257       int src = group->index(j);
258       if (src != smpi_process_index() && src != MPI_UNDEFINED) {
259         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
260         i++;
261       }
262       j++;
263   }
264   size=i;
265   Request::startall(size, reqs);
266   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
267   for(i=0;i<size;i++){
268     Request::unuse(&reqs[i]);
269   }
270   xbt_free(reqs);
271   opened_++; //we're open for business !
272   group_=group;
273   group->use();
274   return MPI_SUCCESS;
275 }
276
277 int Win::post(MPI_Group group, int assert){
278   //let's make a synchronous send here
279   int i             = 0;
280   int j             = 0;
281   int size = group->size();
282   MPI_Request* reqs = xbt_new0(MPI_Request, size);
283
284   while(j!=size){
285     int dst=group->index(j);
286     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
287       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
288       i++;
289     }
290     j++;
291   }
292   size=i;
293
294   Request::startall(size, reqs);
295   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
296   for(i=0;i<size;i++){
297     Request::unuse(&reqs[i]);
298   }
299   xbt_free(reqs);
300   opened_++; //we're open for business !
301   group_=group;
302   group->use();
303   return MPI_SUCCESS;
304 }
305
306 int Win::complete(){
307   if(opened_==0)
308     xbt_die("Complete called on already opened MPI_Win");
309
310   XBT_DEBUG("Entering MPI_Win_Complete");
311   int i             = 0;
312   int j             = 0;
313   int size = group_->size();
314   MPI_Request* reqs = xbt_new0(MPI_Request, size);
315
316   while(j!=size){
317     int dst=group_->index(j);
318     if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){
319       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
320       i++;
321     }
322     j++;
323   }
324   size=i;
325   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
326   Request::startall(size, reqs);
327   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
328
329   for(i=0;i<size;i++){
330     Request::unuse(&reqs[i]);
331   }
332   xbt_free(reqs);
333
334   //now we can finish RMA calls
335   xbt_mutex_acquire(mut_);
336   std::vector<MPI_Request> *reqqs = requests_;
337   size = static_cast<int>(reqqs->size());
338
339   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
340   if (size > 0) {
341     // start all requests that have been prepared by another process
342     for (const auto& req : *reqqs) {
343       if (req && (req->flags() & PREPARED))
344         req->start();
345     }
346
347     MPI_Request* treqs = &(*reqqs)[0];
348     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
349     reqqs->clear();
350   }
351   xbt_mutex_release(mut_);
352
353   group_->unuse();
354   opened_--; //we're closed for business !
355   return MPI_SUCCESS;
356 }
357
358 int Win::wait(){
359   //naive, blocking implementation.
360   XBT_DEBUG("Entering MPI_Win_Wait");
361   int i=0,j=0;
362   int size = group_->size();
363   MPI_Request* reqs = xbt_new0(MPI_Request, size);
364
365   while(j!=size){
366     int src=group_->index(j);
367     if(src!=smpi_process_index() && src!=MPI_UNDEFINED){
368       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
369       i++;
370     }
371     j++;
372   }
373   size=i;
374   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
375   Request::startall(size, reqs);
376   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
377   for(i=0;i<size;i++){
378     Request::unuse(&reqs[i]);
379   }
380   xbt_free(reqs);
381   xbt_mutex_acquire(mut_);
382   std::vector<MPI_Request> *reqqs = requests_;
383   size = static_cast<int>(reqqs->size());
384
385   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
386   if (size > 0) {
387     // start all requests that have been prepared by another process
388     for (const auto& req : *reqqs) {
389       if (req && (req->flags() & PREPARED))
390         req->start();
391     }
392
393     MPI_Request* treqs = &(*reqqs)[0];
394     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
395     reqqs->clear();
396   }
397   xbt_mutex_release(mut_);
398
399   group_->unuse();
400   opened_--; //we're opened for business !
401   return MPI_SUCCESS;
402 }
403
404 }
405 }