Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Introduce smpi::Process
[simgrid.git] / src / smpi / smpi_win.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <vector>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
11
12 namespace simgrid{
13 namespace smpi{
14 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
15 int Win::keyval_id_=0;
16
17 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){
18   int comm_size = comm->size();
19   int rank      = comm->rank();
20   XBT_DEBUG("Creating window");
21   if(info!=MPI_INFO_NULL)
22     info->ref();
23   name_ = nullptr;
24   opened_ = 0;
25   group_ = MPI_GROUP_NULL;
26   requests_ = new std::vector<MPI_Request>();
27   mut_=xbt_mutex_init();
28   connected_wins_ = new MPI_Win[comm_size];
29   connected_wins_[rank] = this;
30   count_ = 0;
31   if(rank==0){
32     bar_ = MSG_barrier_init(comm_size);
33   }
34   Colls::allgather(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
35                          MPI_BYTE, comm);
36
37   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
38
39   Colls::barrier(comm);
40 }
41
42 Win::~Win(){
43   //As per the standard, perform a barrier to ensure every async comm is finished
44   MSG_barrier_wait(bar_);
45   xbt_mutex_acquire(mut_);
46   delete requests_;
47   xbt_mutex_release(mut_);
48   delete[] connected_wins_;
49   if (name_ != nullptr){
50     xbt_free(name_);
51   }
52   if(info_!=MPI_INFO_NULL){
53     MPI_Info_free(&info_);
54   }
55
56   Colls::barrier(comm_);
57   int rank=comm_->rank();
58   if(rank == 0)
59     MSG_barrier_destroy(bar_);
60   xbt_mutex_destroy(mut_);
61
62   cleanup_attr<Win>();
63 }
64
65 void Win::get_name(char* name, int* length){
66   if(name_==nullptr){
67     *length=0;
68     name=nullptr;
69     return;
70   }
71   *length = strlen(name_);
72   strncpy(name, name_, *length+1);
73 }
74
75 void Win::get_group(MPI_Group* group){
76   if(comm_ != MPI_COMM_NULL){
77     *group = comm_->group();
78   } else {
79     *group = MPI_GROUP_NULL;
80   }
81 }
82
83 MPI_Aint Win::size(){
84   return size_;
85 }
86
87 void* Win::base(){
88   return base_;
89 }
90
91 int Win::disp_unit(){
92   return disp_unit_;
93 }
94
95
96 void Win::set_name(char* name){
97   name_ = xbt_strdup(name);
98 }
99
100 int Win::fence(int assert)
101 {
102   XBT_DEBUG("Entering fence");
103   if (opened_ == 0)
104     opened_=1;
105   if (assert != MPI_MODE_NOPRECEDE) {
106     // This is not the first fence => finalize what came before
107     MSG_barrier_wait(bar_);
108     xbt_mutex_acquire(mut_);
109     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
110     // Without this, the vector could get redimensionned when another process pushes.
111     // This would result in the array used by Request::waitall() to be invalidated.
112     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
113     std::vector<MPI_Request> *reqs = requests_;
114     int size = static_cast<int>(reqs->size());
115     // start all requests that have been prepared by another process
116     if (size > 0) {
117       for (const auto& req : *reqs) {
118         if (req && (req->flags() & PREPARED))
119           req->start();
120       }
121
122       MPI_Request* treqs = &(*reqs)[0];
123
124       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
125     }
126     count_=0;
127     xbt_mutex_release(mut_);
128   }
129   assert_ = assert;
130
131   MSG_barrier_wait(bar_);
132   XBT_DEBUG("Leaving fence");
133
134   return MPI_SUCCESS;
135 }
136
137 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
138               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
139 {
140   if(opened_==0)//check that post/start has been done
141     return MPI_ERR_WIN;
142   //get receiver pointer
143   MPI_Win recv_win = connected_wins_[target_rank];
144
145   if(target_count*target_datatype->get_extent()>recv_win->size_)
146     return MPI_ERR_ARG;
147
148   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
149   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
150
151   if(target_rank != comm_->rank()){
152     //prepare send_request
153     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
154         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
155
156     //prepare receiver request
157     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
158         comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
159
160     //push request to receiver's win
161     xbt_mutex_acquire(recv_win->mut_);
162     recv_win->requests_->push_back(rreq);
163     xbt_mutex_release(recv_win->mut_);
164     //start send
165     sreq->start();
166
167     //push request to sender's win
168     xbt_mutex_acquire(mut_);
169     requests_->push_back(sreq);
170     xbt_mutex_release(mut_);
171   }else{
172     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
173   }
174
175   return MPI_SUCCESS;
176 }
177
178 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
179               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype)
180 {
181   if(opened_==0)//check that post/start has been done
182     return MPI_ERR_WIN;
183   //get sender pointer
184   MPI_Win send_win = connected_wins_[target_rank];
185
186   if(target_count*target_datatype->get_extent()>send_win->size_)
187     return MPI_ERR_ARG;
188
189   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
190   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
191
192   if(target_rank != comm_->rank()){
193     //prepare send_request
194     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
195         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
196         MPI_OP_NULL);
197
198     //prepare receiver request
199     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
200         comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
201         MPI_OP_NULL);
202
203     //start the send, with another process than us as sender. 
204     sreq->start();
205     //push request to receiver's win
206     xbt_mutex_acquire(send_win->mut_);
207     send_win->requests_->push_back(sreq);
208     xbt_mutex_release(send_win->mut_);
209
210     //start recv
211     rreq->start();
212     //push request to sender's win
213     xbt_mutex_acquire(mut_);
214     requests_->push_back(rreq);
215     xbt_mutex_release(mut_);
216   }else{
217     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
218   }
219
220   return MPI_SUCCESS;
221 }
222
223
224 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
225               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op)
226 {
227   if(opened_==0)//check that post/start has been done
228     return MPI_ERR_WIN;
229   //FIXME: local version 
230   //get receiver pointer
231   MPI_Win recv_win = connected_wins_[target_rank];
232
233   if(target_count*target_datatype->get_extent()>recv_win->size_)
234     return MPI_ERR_ARG;
235
236   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
237   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
238     //As the tag will be used for ordering of the operations, add count to it
239     //prepare send_request
240     MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
241         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op);
242
243     //prepare receiver request
244     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
245         smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op);
246
247     count_++;
248     //push request to receiver's win
249     xbt_mutex_acquire(recv_win->mut_);
250     recv_win->requests_->push_back(rreq);
251     xbt_mutex_release(recv_win->mut_);
252     //start send
253     sreq->start();
254
255     //push request to sender's win
256     xbt_mutex_acquire(mut_);
257     requests_->push_back(sreq);
258     xbt_mutex_release(mut_);
259
260   return MPI_SUCCESS;
261 }
262
263 int Win::start(MPI_Group group, int assert){
264     /* From MPI forum advices
265     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
266     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
267     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
268     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
269     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
270     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
271     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
272     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
273     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
274     must complete, without further dependencies.  */
275
276   //naive, blocking implementation.
277     int i             = 0;
278     int j             = 0;
279     int size          = group->size();
280     MPI_Request* reqs = xbt_new0(MPI_Request, size);
281
282     while (j != size) {
283       int src = group->index(j);
284       if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
285         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
286         i++;
287       }
288       j++;
289   }
290   size=i;
291   Request::startall(size, reqs);
292   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
293   for(i=0;i<size;i++){
294     Request::unref(&reqs[i]);
295   }
296   xbt_free(reqs);
297   opened_++; //we're open for business !
298   group_=group;
299   group->ref();
300   return MPI_SUCCESS;
301 }
302
303 int Win::post(MPI_Group group, int assert){
304   //let's make a synchronous send here
305   int i             = 0;
306   int j             = 0;
307   int size = group->size();
308   MPI_Request* reqs = xbt_new0(MPI_Request, size);
309
310   while(j!=size){
311     int dst=group->index(j);
312     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
313       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
314       i++;
315     }
316     j++;
317   }
318   size=i;
319
320   Request::startall(size, reqs);
321   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
322   for(i=0;i<size;i++){
323     Request::unref(&reqs[i]);
324   }
325   xbt_free(reqs);
326   opened_++; //we're open for business !
327   group_=group;
328   group->ref();
329   return MPI_SUCCESS;
330 }
331
332 int Win::complete(){
333   if(opened_==0)
334     xbt_die("Complete called on already opened MPI_Win");
335
336   XBT_DEBUG("Entering MPI_Win_Complete");
337   int i             = 0;
338   int j             = 0;
339   int size = group_->size();
340   MPI_Request* reqs = xbt_new0(MPI_Request, size);
341
342   while(j!=size){
343     int dst=group_->index(j);
344     if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
345       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
346       i++;
347     }
348     j++;
349   }
350   size=i;
351   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
352   Request::startall(size, reqs);
353   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
354
355   for(i=0;i<size;i++){
356     Request::unref(&reqs[i]);
357   }
358   xbt_free(reqs);
359
360   //now we can finish RMA calls
361   xbt_mutex_acquire(mut_);
362   std::vector<MPI_Request> *reqqs = requests_;
363   size = static_cast<int>(reqqs->size());
364
365   XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
366   if (size > 0) {
367     // start all requests that have been prepared by another process
368     for (const auto& req : *reqqs) {
369       if (req && (req->flags() & PREPARED))
370         req->start();
371     }
372
373     MPI_Request* treqs = &(*reqqs)[0];
374     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
375     reqqs->clear();
376   }
377   xbt_mutex_release(mut_);
378
379   Group::unref(group_);
380   opened_--; //we're closed for business !
381   return MPI_SUCCESS;
382 }
383
384 int Win::wait(){
385   //naive, blocking implementation.
386   XBT_DEBUG("Entering MPI_Win_Wait");
387   int i=0,j=0;
388   int size = group_->size();
389   MPI_Request* reqs = xbt_new0(MPI_Request, size);
390
391   while(j!=size){
392     int src=group_->index(j);
393     if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
394       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
395       i++;
396     }
397     j++;
398   }
399   size=i;
400   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
401   Request::startall(size, reqs);
402   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
403   for(i=0;i<size;i++){
404     Request::unref(&reqs[i]);
405   }
406   xbt_free(reqs);
407   xbt_mutex_acquire(mut_);
408   std::vector<MPI_Request> *reqqs = requests_;
409   size = static_cast<int>(reqqs->size());
410
411   XBT_DEBUG("Win_wait - Finishing %d RMA calls", size);
412   if (size > 0) {
413     // start all requests that have been prepared by another process
414     for (const auto& req : *reqqs) {
415       if (req && (req->flags() & PREPARED))
416         req->start();
417     }
418
419     MPI_Request* treqs = &(*reqqs)[0];
420     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
421     reqqs->clear();
422   }
423   xbt_mutex_release(mut_);
424
425   Group::unref(group_);
426   opened_--; //we're opened for business !
427   return MPI_SUCCESS;
428 }
429
430 Win* Win::f2c(int id){
431   return static_cast<Win*>(F2C::f2c(id));
432 }
433
434 }
435 }