Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2c990641104f2649cb0becab41217be2c8ae00da
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/virtu.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "simix/smx_private.h"
13 #include "surf/surf.h"
14 #include "simgrid/sg_config.h"
15
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
18
19
20 static int match_recv(void* a, void* b, smx_action_t ignored) {
21    MPI_Request ref = (MPI_Request)a;
22    MPI_Request req = (MPI_Request)b;
23    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
24
25   xbt_assert(ref, "Cannot match recv against null reference");
26   xbt_assert(req, "Cannot match recv against null request");
27   if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
28     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag)){
29     //we match, we can transfer some values
30     // FIXME : move this to the copy function ?
31     if(ref->src == MPI_ANY_SOURCE)ref->real_src = req->src;
32     if(ref->tag == MPI_ANY_TAG)ref->real_tag = req->tag;
33     if(ref->real_size < req->real_size) ref->truncated = 1;
34     if(req->detached==1){
35         ref->detached_sender=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
36     }
37     return 1;
38   }else return 0;
39 }
40
41 static int match_send(void* a, void* b,smx_action_t ignored) {
42    MPI_Request ref = (MPI_Request)a;
43    MPI_Request req = (MPI_Request)b;
44    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
45    xbt_assert(ref, "Cannot match send against null reference");
46    xbt_assert(req, "Cannot match send against null request");
47
48    if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
49              && (req->tag == MPI_ANY_TAG || req->tag == ref->tag))
50    {
51      if(req->src == MPI_ANY_SOURCE)req->real_src = ref->src;
52      if(req->tag == MPI_ANY_TAG)req->real_tag = ref->tag;
53      if(req->real_size < ref->real_size) req->truncated = 1;
54      if(ref->detached==1){
55          req->detached_sender=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
56      }
57
58      return 1;
59    } else return 0;
60 }
61
62 static MPI_Request build_request(void *buf, int count,
63                                  MPI_Datatype datatype, int src, int dst,
64                                  int tag, MPI_Comm comm, unsigned flags)
65 {
66   MPI_Request request;
67
68   void *old_buf = NULL;
69
70   request = xbt_new(s_smpi_mpi_request_t, 1);
71
72   s_smpi_subtype_t *subtype = datatype->substruct;
73
74   if(datatype->has_subtype == 1){
75     // This part handles the problem of non-contiguous memory
76     old_buf = buf;
77     buf = xbt_malloc(count*smpi_datatype_size(datatype));
78     if (flags & SEND) {
79       subtype->serialize(old_buf, buf, count, datatype->substruct);
80     }
81   }
82
83   request->buf = buf;
84   // This part handles the problem of non-contiguous memory (for the
85   // unserialisation at the reception)
86   request->old_buf = old_buf;
87   request->old_type = datatype;
88
89   request->size = smpi_datatype_size(datatype) * count;
90   request->src = src;
91   request->dst = dst;
92   request->tag = tag;
93   request->comm = comm;
94   request->action = NULL;
95   request->flags = flags;
96   request->detached = 0;
97   request->detached_sender = NULL;
98
99   request->truncated = 0;
100   request->real_size = 0;
101   request->real_tag = 0;
102
103   request->refcount=1;
104 #ifdef HAVE_TRACING
105   request->send = 0;
106   request->recv = 0;
107 #endif
108   if (flags & SEND) smpi_datatype_unuse(datatype);
109
110   return request;
111 }
112
113
114 void smpi_empty_status(MPI_Status * status) {
115   if(status != MPI_STATUS_IGNORE) {
116       status->MPI_SOURCE=MPI_ANY_SOURCE;
117       status->MPI_TAG=MPI_ANY_TAG;
118       status->count=0;
119   }
120 }
121
122 void smpi_action_trace_run(char *path)
123 {
124   char *name;
125   xbt_dynar_t todo;
126   xbt_dict_cursor_t cursor;
127
128   action_fp=NULL;
129   if (path) {
130     action_fp = fopen(path, "r");
131     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
132                strerror(errno));
133   }
134
135   if (!xbt_dict_is_empty(action_queues)) {
136     XBT_WARN
137       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
138
139
140     xbt_dict_foreach(action_queues, cursor, name, todo) {
141       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
142     }
143   }
144
145   if (path)
146     fclose(action_fp);
147   xbt_dict_free(&action_queues);
148   action_queues = xbt_dict_new_homogeneous(NULL);
149 }
150
151 static void smpi_mpi_request_free_voidp(void* request)
152 {
153   MPI_Request req = request;
154   smpi_mpi_request_free(&req);
155 }
156
157 /* MPI Low level calls */
158 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
159                                int dst, int tag, MPI_Comm comm)
160 {
161   MPI_Request request =
162     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
163                   comm, PERSISTENT | SEND);
164   request->refcount++;
165   return request;
166 }
167
168 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
169                                int src, int tag, MPI_Comm comm)
170 {
171   MPI_Request request =
172     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
173                   comm, PERSISTENT | RECV);
174   request->refcount++;
175   return request;
176 }
177
178 void smpi_mpi_start(MPI_Request request)
179 {
180   smx_rdv_t mailbox;
181
182   xbt_assert(!request->action,
183              "Cannot (re)start a non-finished communication");
184   if(request->flags & RECV) {
185     print_request("New recv", request);
186     if (request->size < sg_cfg_get_int("smpi/async_small_thres"))
187       mailbox = smpi_process_mailbox_small();
188     else
189       mailbox = smpi_process_mailbox();
190     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
191     request->real_size=request->size;
192     smpi_datatype_use(request->old_type);
193     request->action = simcall_comm_irecv(mailbox, request->buf, &request->real_size, &match_recv, request);
194   } else {
195
196     int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
197 /*    if(receiver == MPI_UNDEFINED) {*/
198 /*      XBT_WARN("Trying to send a message to a wrong rank");*/
199 /*      return;*/
200 /*    }*/
201     print_request("New send", request);
202     if (request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
203       mailbox = smpi_process_remote_mailbox_small(receiver);
204     }else{
205       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
206       mailbox = smpi_process_remote_mailbox(receiver);
207     }
208     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
209       void *oldbuf = NULL;
210       request->detached = 1;
211       request->refcount++;
212       if(request->old_type->has_subtype == 0){
213         oldbuf = request->buf;
214         if (oldbuf){
215           request->buf = xbt_malloc(request->size);
216           memcpy(request->buf,oldbuf,request->size);
217         }
218       }
219       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
220     }
221     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
222     request->real_size=request->size;
223     smpi_datatype_use(request->old_type);
224
225     request->action =
226       simcall_comm_isend(mailbox, request->size, -1.0,
227                          request->buf, request->real_size,
228                          &match_send,
229                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
230                          request,
231                          // detach if msg size < eager/rdv switch limit
232                          request->detached);
233
234 #ifdef HAVE_TRACING
235     /* FIXME: detached sends are not traceable (request->action == NULL) */
236     if (request->action)
237       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
238 #endif
239
240   }
241
242 }
243
244 void smpi_mpi_startall(int count, MPI_Request * requests)
245 {
246   int i;
247
248   for(i = 0; i < count; i++) {
249     smpi_mpi_start(requests[i]);
250   }
251 }
252
253 void smpi_mpi_request_free(MPI_Request * request)
254 {
255
256   if((*request) != MPI_REQUEST_NULL){
257     (*request)->refcount--;
258     if((*request)->refcount<0) xbt_die("wrong refcount");
259
260     if((*request)->refcount==0){
261         xbt_free(*request);
262         *request = MPI_REQUEST_NULL;
263     }
264   }else{
265       xbt_die("freeing an already free request");
266   }
267 }
268
269 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
270                             int dst, int tag, MPI_Comm comm)
271 {
272   MPI_Request request =
273     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
274                   comm, NON_PERSISTENT | SEND);
275
276   return request;
277 }
278
279 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
280                            int dst, int tag, MPI_Comm comm)
281 {
282   MPI_Request request =
283       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
284                     comm, NON_PERSISTENT | SEND);
285
286   smpi_mpi_start(request);
287   return request;
288 }
289
290 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
291                             int src, int tag, MPI_Comm comm)
292 {
293   MPI_Request request =
294     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
295                   comm, NON_PERSISTENT | RECV);
296   return request;
297 }
298
299 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
300                            int src, int tag, MPI_Comm comm)
301 {
302   MPI_Request request =
303       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
304                     comm, NON_PERSISTENT | RECV);
305
306   smpi_mpi_start(request);
307   return request;
308 }
309
310 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
311                    int tag, MPI_Comm comm, MPI_Status * status)
312 {
313   MPI_Request request;
314   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
315   smpi_mpi_wait(&request, status);
316 }
317
318
319
320 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
321                    int tag, MPI_Comm comm)
322 {
323   MPI_Request request;
324   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
325   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
326
327 }
328
329 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
330                        int dst, int sendtag, void *recvbuf, int recvcount,
331                        MPI_Datatype recvtype, int src, int recvtag,
332                        MPI_Comm comm, MPI_Status * status)
333 {
334   MPI_Request requests[2];
335   MPI_Status stats[2];
336
337   requests[0] =
338     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
339   requests[1] =
340     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
341   smpi_mpi_startall(2, requests);
342   smpi_mpi_waitall(2, requests, stats);
343   if(status != MPI_STATUS_IGNORE) {
344     // Copy receive status
345     memcpy(status, &stats[1], sizeof(MPI_Status));
346   }
347 }
348
349 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
350 {
351   return status->count / smpi_datatype_size(datatype);
352 }
353
354 static void finish_wait(MPI_Request * request, MPI_Status * status)
355 {
356   MPI_Request req = *request;
357   if(!(req->detached && req->flags & SEND)){
358     if(status != MPI_STATUS_IGNORE) {
359       status->MPI_SOURCE = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
360       status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
361       if(req->truncated)
362       status->MPI_ERROR = MPI_ERR_TRUNCATE;
363       else status->MPI_ERROR = MPI_SUCCESS ;
364       // this handles the case were size in receive differs from size in send
365       // FIXME: really this should just contain the count of receive-type blocks,
366       // right?
367       status->count = req->real_size;
368     }
369
370     print_request("Finishing", req);
371     MPI_Datatype datatype = req->old_type;
372
373     if(datatype->has_subtype == 1){
374         // This part handles the problem of non-contignous memory
375         // the unserialization at the reception
376       s_smpi_subtype_t *subtype = datatype->substruct;
377       if(req->flags & RECV) {
378         subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct);
379       }
380       if(req->detached == 0) free(req->buf);
381     }
382     smpi_datatype_unuse(datatype);
383   }
384
385   if(req->detached_sender!=NULL){
386     smpi_mpi_request_free(&(req->detached_sender));
387   }
388
389   if(req->flags & NON_PERSISTENT) {
390     smpi_mpi_request_free(request);
391   } else {
392     req->action = NULL;
393   }
394 }
395
396 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
397   int flag;
398
399   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
400   if ((*request)->action == NULL)
401     flag = 1;
402   else
403     flag = simcall_comm_test((*request)->action);
404   if(flag) {
405     (*request)->refcount++;
406     finish_wait(request, status);
407   }else{
408     smpi_empty_status(status);
409   }
410   return flag;
411 }
412
413 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
414                      MPI_Status * status)
415 {
416   xbt_dynar_t comms;
417   int i, flag, size;
418   int* map;
419
420   *index = MPI_UNDEFINED;
421   flag = 0;
422   if(count > 0) {
423     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
424     map = xbt_new(int, count);
425     size = 0;
426     for(i = 0; i < count; i++) {
427       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
428          xbt_dynar_push(comms, &requests[i]->action);
429          map[size] = i;
430          size++;
431       }
432     }
433     if(size > 0) {
434       i = simcall_comm_testany(comms);
435       // not MPI_UNDEFINED, as this is a simix return code
436       if(i != -1) {
437         *index = map[i];
438         finish_wait(&requests[*index], status);
439         flag = 1;
440       }
441     }else{
442         //all requests are null or inactive, return true
443         flag=1;
444         smpi_empty_status(status);
445     }
446     xbt_free(map);
447     xbt_dynar_free(&comms);
448   }
449
450   return flag;
451 }
452
453
454 int smpi_mpi_testall(int count, MPI_Request requests[],
455                      MPI_Status status[])
456 {
457   MPI_Status stat;
458   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
459   int flag=1;
460   int i;
461   for(i=0; i<count; i++){
462     if(requests[i]!= MPI_REQUEST_NULL){
463       if (smpi_mpi_test(&requests[i], pstat)!=1){
464         flag=0;
465       }
466     }else{
467       smpi_empty_status(pstat);
468     }
469     if(status != MPI_STATUSES_IGNORE) {
470       memcpy(&status[i], pstat, sizeof(*pstat));
471     }
472   }
473   return flag;
474 }
475
476 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
477   int flag=0;
478   //FIXME find another wait to avoid busy waiting ?
479   // the issue here is that we have to wait on a nonexistent comm
480   while(flag==0){
481     smpi_mpi_iprobe(source, tag, comm, &flag, status);
482     XBT_DEBUG("Busy Waiting on probing : %d", flag);
483     if(!flag) {
484       simcall_process_sleep(0.0001);
485     }
486   }
487 }
488
489 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
490   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
491             comm, NON_PERSISTENT | RECV);
492
493   // behave like a receive, but don't do it
494   smx_rdv_t mailbox;
495
496   print_request("New iprobe", request);
497   // We have to test both mailboxes as we don't know if we will receive one one or another
498     if (sg_cfg_get_int("smpi/async_small_thres")>0){
499         mailbox = smpi_process_mailbox_small();
500         XBT_DEBUG("trying to probe the perm recv mailbox");
501         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
502     }
503     if (request->action==NULL){
504         mailbox = smpi_process_mailbox();
505         XBT_DEBUG("trying to probe the other mailbox");
506         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
507     }
508
509   if(request->action){
510     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
511     *flag = 1;
512     if(status != MPI_STATUS_IGNORE) {
513       status->MPI_SOURCE = req->src;
514       status->MPI_TAG = req->tag;
515       status->MPI_ERROR = MPI_SUCCESS;
516       status->count = req->real_size;
517     }
518   }
519   else *flag = 0;
520   smpi_mpi_request_free(&request);
521
522   return;
523 }
524
525 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
526 {
527   print_request("Waiting", *request);
528   if ((*request)->action != NULL) { // this is not a detached send
529     simcall_comm_wait((*request)->action, -1.0);
530   }
531   finish_wait(request, status);
532
533   // FIXME for a detached send, finish_wait is not called:
534 }
535
536 int smpi_mpi_waitany(int count, MPI_Request requests[],
537                      MPI_Status * status)
538 {
539   xbt_dynar_t comms;
540   int i, size, index;
541   int *map;
542
543   index = MPI_UNDEFINED;
544   if(count > 0) {
545     // Wait for a request to complete
546     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
547     map = xbt_new(int, count);
548     size = 0;
549     XBT_DEBUG("Wait for one of %d", count);
550     for(i = 0; i < count; i++) {
551       if(requests[i] != MPI_REQUEST_NULL) {
552         if (requests[i]->action != NULL) {
553           XBT_DEBUG("Waiting any %p ", requests[i]);
554           xbt_dynar_push(comms, &requests[i]->action);
555           map[size] = i;
556           size++;
557         }else{
558          //This is a finished detached request, let's return this one
559          size=0;//so we free the dynar but don't do the waitany call
560          index=i;
561          finish_wait(&requests[i], status);//cleanup if refcount = 0
562          requests[i]=MPI_REQUEST_NULL;//set to null
563          break;
564          }
565       }
566     }
567     if(size > 0) {
568       i = simcall_comm_waitany(comms);
569
570       // not MPI_UNDEFINED, as this is a simix return code
571       if (i != -1) {
572         index = map[i];
573         finish_wait(&requests[index], status);
574       }
575     }
576     xbt_free(map);
577     xbt_dynar_free(&comms);
578   }
579
580   if (index==MPI_UNDEFINED)
581     smpi_empty_status(status);
582
583   return index;
584 }
585
586 int smpi_mpi_waitall(int count, MPI_Request requests[],
587                       MPI_Status status[])
588 {
589   int  index, c;
590   MPI_Status stat;
591   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
592   int retvalue=MPI_SUCCESS;
593   //tag invalid requests in the set
594   for(c = 0; c < count; c++) {
595     if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
596       if(status != MPI_STATUSES_IGNORE)
597         smpi_empty_status(&status[c]);
598     }else if(requests[c]->src == MPI_PROC_NULL ){
599       if(status != MPI_STATUSES_IGNORE) {
600         smpi_empty_status(&status[c]);
601         status[c].MPI_SOURCE=MPI_PROC_NULL;
602       }
603     }
604   }
605   for(c = 0; c < count; c++) {
606       if(MC_is_active()) {
607         smpi_mpi_wait(&requests[c], pstat);
608         index = c;
609       } else {
610         index = smpi_mpi_waitany(count, requests, pstat);
611         if(index == MPI_UNDEFINED) {
612           break;
613        }
614       if(status != MPI_STATUSES_IGNORE) {
615         memcpy(&status[index], pstat, sizeof(*pstat));
616         if(status[index].MPI_ERROR==MPI_ERR_TRUNCATE)retvalue=MPI_ERR_IN_STATUS;
617
618       }
619     }
620   }
621
622   return retvalue;
623 }
624
625 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
626                       MPI_Status status[])
627 {
628   int i, count, index;
629   MPI_Status stat;
630   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
631
632   count = 0;
633   for(i = 0; i < incount; i++)
634   {
635     index=smpi_mpi_waitany(incount, requests, pstat);
636     if(index!=MPI_UNDEFINED){
637       indices[count] = index;
638       count++;
639       if(status != MPI_STATUSES_IGNORE) {
640         memcpy(&status[index], pstat, sizeof(*pstat));
641       }
642     }else{
643       return MPI_UNDEFINED;
644     }
645   }
646   return count;
647 }
648
649 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
650                       MPI_Status status[])
651 {
652   int i, count, count_dead;
653   MPI_Status stat;
654   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
655
656   count = 0;
657   count_dead = 0;
658   for(i = 0; i < incount; i++) {
659     if((requests[i] != MPI_REQUEST_NULL)) {
660       if(smpi_mpi_test(&requests[i], pstat)) {
661          indices[count] = i;
662          count++;
663          if(status != MPI_STATUSES_IGNORE) {
664             memcpy(&status[i], pstat, sizeof(*pstat));
665          }
666       }
667     }else{
668       count_dead++;
669     }
670   }
671   if(count_dead==incount)return MPI_UNDEFINED;
672   else return count;
673 }
674
675 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
676                     MPI_Comm comm)
677 {
678   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
679   nary_tree_bcast(buf, count, datatype, root, comm, 4);
680 }
681
682 void smpi_mpi_barrier(MPI_Comm comm)
683 {
684   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
685   nary_tree_barrier(comm, 4);
686 }
687
688 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
689                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
690                      int root, MPI_Comm comm)
691 {
692   int system_tag = 666;
693   int rank, size, src, index;
694   MPI_Aint lb = 0, recvext = 0;
695   MPI_Request *requests;
696
697   rank = smpi_comm_rank(comm);
698   size = smpi_comm_size(comm);
699   if(rank != root) {
700     // Send buffer to root
701     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
702   } else {
703     // FIXME: check for errors
704     smpi_datatype_extent(recvtype, &lb, &recvext);
705     // Local copy from root
706     smpi_datatype_copy(sendbuf, sendcount, sendtype,
707                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
708     // Receive buffers from senders
709     requests = xbt_new(MPI_Request, size - 1);
710     index = 0;
711     for(src = 0; src < size; src++) {
712       if(src != root) {
713         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
714                                           recvcount, recvtype,
715                                           src, system_tag, comm);
716         index++;
717       }
718     }
719     // Wait for completion of irecv's.
720     smpi_mpi_startall(size - 1, requests);
721     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
722     xbt_free(requests);
723   }
724 }
725
726 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
727                       void *recvbuf, int *recvcounts, int *displs,
728                       MPI_Datatype recvtype, int root, MPI_Comm comm)
729 {
730   int system_tag = 666;
731   int rank, size, src, index;
732   MPI_Aint lb = 0, recvext = 0;
733   MPI_Request *requests;
734
735   rank = smpi_comm_rank(comm);
736   size = smpi_comm_size(comm);
737   if(rank != root) {
738     // Send buffer to root
739     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
740   } else {
741     // FIXME: check for errors
742     smpi_datatype_extent(recvtype, &lb, &recvext);
743     // Local copy from root
744     smpi_datatype_copy(sendbuf, sendcount, sendtype,
745                        (char *)recvbuf + displs[root] * recvext,
746                        recvcounts[root], recvtype);
747     // Receive buffers from senders
748     requests = xbt_new(MPI_Request, size - 1);
749     index = 0;
750     for(src = 0; src < size; src++) {
751       if(src != root) {
752         requests[index] =
753           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
754                           recvcounts[src], recvtype, src, system_tag, comm);
755         index++;
756       }
757     }
758     // Wait for completion of irecv's.
759     smpi_mpi_startall(size - 1, requests);
760     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
761     xbt_free(requests);
762   }
763 }
764
765 void smpi_mpi_allgather(void *sendbuf, int sendcount,
766                         MPI_Datatype sendtype, void *recvbuf,
767                         int recvcount, MPI_Datatype recvtype,
768                         MPI_Comm comm)
769 {
770   int system_tag = 666;
771   int rank, size, other, index;
772   MPI_Aint lb = 0, recvext = 0;
773   MPI_Request *requests;
774
775   rank = smpi_comm_rank(comm);
776   size = smpi_comm_size(comm);
777   // FIXME: check for errors
778   smpi_datatype_extent(recvtype, &lb, &recvext);
779   // Local copy from self
780   smpi_datatype_copy(sendbuf, sendcount, sendtype,
781                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
782                      recvtype);
783   // Send/Recv buffers to/from others;
784   requests = xbt_new(MPI_Request, 2 * (size - 1));
785   index = 0;
786   for(other = 0; other < size; other++) {
787     if(other != rank) {
788       requests[index] =
789         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
790                         comm);
791       index++;
792       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
793                                         recvcount, recvtype, other,
794                                         system_tag, comm);
795       index++;
796     }
797   }
798   // Wait for completion of all comms.
799   smpi_mpi_startall(2 * (size - 1), requests);
800   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
801   xbt_free(requests);
802 }
803
804 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
805                          MPI_Datatype sendtype, void *recvbuf,
806                          int *recvcounts, int *displs,
807                          MPI_Datatype recvtype, MPI_Comm comm)
808 {
809   int system_tag = 666;
810   int rank, size, other, index;
811   MPI_Aint lb = 0, recvext = 0;
812   MPI_Request *requests;
813
814   rank = smpi_comm_rank(comm);
815   size = smpi_comm_size(comm);
816   // FIXME: check for errors
817   smpi_datatype_extent(recvtype, &lb, &recvext);
818   // Local copy from self
819   smpi_datatype_copy(sendbuf, sendcount, sendtype,
820                      (char *)recvbuf + displs[rank] * recvext,
821                      recvcounts[rank], recvtype);
822   // Send buffers to others;
823   requests = xbt_new(MPI_Request, 2 * (size - 1));
824   index = 0;
825   for(other = 0; other < size; other++) {
826     if(other != rank) {
827       requests[index] =
828         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
829                         comm);
830       index++;
831       requests[index] =
832         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
833                         recvtype, other, system_tag, comm);
834       index++;
835     }
836   }
837   // Wait for completion of all comms.
838   smpi_mpi_startall(2 * (size - 1), requests);
839   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
840   xbt_free(requests);
841 }
842
843 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
844                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
845                       int root, MPI_Comm comm)
846 {
847   int system_tag = 666;
848   int rank, size, dst, index;
849   MPI_Aint lb = 0, sendext = 0;
850   MPI_Request *requests;
851
852   rank = smpi_comm_rank(comm);
853   size = smpi_comm_size(comm);
854   if(rank != root) {
855     // Recv buffer from root
856     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
857                   MPI_STATUS_IGNORE);
858   } else {
859     // FIXME: check for errors
860     smpi_datatype_extent(sendtype, &lb, &sendext);
861     // Local copy from root
862     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
863                        sendcount, sendtype, recvbuf, recvcount, recvtype);
864     // Send buffers to receivers
865     requests = xbt_new(MPI_Request, size - 1);
866     index = 0;
867     for(dst = 0; dst < size; dst++) {
868       if(dst != root) {
869         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
870                                           sendcount, sendtype, dst,
871                                           system_tag, comm);
872         index++;
873       }
874     }
875     // Wait for completion of isend's.
876     smpi_mpi_startall(size - 1, requests);
877     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
878     xbt_free(requests);
879   }
880 }
881
882 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
883                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
884                        MPI_Datatype recvtype, int root, MPI_Comm comm)
885 {
886   int system_tag = 666;
887   int rank, size, dst, index;
888   MPI_Aint lb = 0, sendext = 0;
889   MPI_Request *requests;
890
891   rank = smpi_comm_rank(comm);
892   size = smpi_comm_size(comm);
893   if(rank != root) {
894     // Recv buffer from root
895     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
896                   MPI_STATUS_IGNORE);
897   } else {
898     // FIXME: check for errors
899     smpi_datatype_extent(sendtype, &lb, &sendext);
900     // Local copy from root
901     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
902                        sendtype, recvbuf, recvcount, recvtype);
903     // Send buffers to receivers
904     requests = xbt_new(MPI_Request, size - 1);
905     index = 0;
906     for(dst = 0; dst < size; dst++) {
907       if(dst != root) {
908         requests[index] =
909           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
910                           sendtype, dst, system_tag, comm);
911         index++;
912       }
913     }
914     // Wait for completion of isend's.
915     smpi_mpi_startall(size - 1, requests);
916     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
917     xbt_free(requests);
918   }
919 }
920
921 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
922                      MPI_Datatype datatype, MPI_Op op, int root,
923                      MPI_Comm comm)
924 {
925   int system_tag = 666;
926   int rank, size, src, index;
927   MPI_Aint lb = 0, dataext = 0;
928   MPI_Request *requests;
929   void **tmpbufs;
930
931   rank = smpi_comm_rank(comm);
932   size = smpi_comm_size(comm);
933   if(rank != root) {
934     // Send buffer to root
935     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
936   } else {
937     // FIXME: check for errors
938     smpi_datatype_extent(datatype, &lb, &dataext);
939     // Local copy from root
940     if (sendbuf && recvbuf)
941       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
942     // Receive buffers from senders
943     //TODO: make a MPI_barrier here ?
944     requests = xbt_new(MPI_Request, size - 1);
945     tmpbufs = xbt_new(void *, size - 1);
946     index = 0;
947     for(src = 0; src < size; src++) {
948       if(src != root) {
949         // FIXME: possibly overkill we we have contiguous/noncontiguous data
950         //  mapping...
951         tmpbufs[index] = xbt_malloc(count * dataext);
952         requests[index] =
953           smpi_irecv_init(tmpbufs[index], count, datatype, src,
954                           system_tag, comm);
955         index++;
956       }
957     }
958     // Wait for completion of irecv's.
959     smpi_mpi_startall(size - 1, requests);
960     for(src = 0; src < size - 1; src++) {
961       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
962       XBT_DEBUG("finished waiting any request with index %d", index);
963       if(index == MPI_UNDEFINED) {
964         break;
965       }
966       if(op) /* op can be MPI_OP_NULL that does nothing */
967         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
968     }
969     for(index = 0; index < size - 1; index++) {
970       xbt_free(tmpbufs[index]);
971     }
972     xbt_free(tmpbufs);
973     xbt_free(requests);
974   }
975 }
976
977 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
978                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
979 {
980   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
981   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
982 }
983
984 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
985                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
986 {
987   int system_tag = 666;
988   int rank, size, other, index;
989   MPI_Aint lb = 0, dataext = 0;
990   MPI_Request *requests;
991   void **tmpbufs;
992
993   rank = smpi_comm_rank(comm);
994   size = smpi_comm_size(comm);
995
996   // FIXME: check for errors
997   smpi_datatype_extent(datatype, &lb, &dataext);
998
999   // Local copy from self
1000   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
1001
1002   // Send/Recv buffers to/from others;
1003   requests = xbt_new(MPI_Request, size - 1);
1004   tmpbufs = xbt_new(void *, rank);
1005   index = 0;
1006   for(other = 0; other < rank; other++) {
1007     // FIXME: possibly overkill we we have contiguous/noncontiguous data
1008     // mapping...
1009     tmpbufs[index] = xbt_malloc(count * dataext);
1010     requests[index] =
1011       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
1012                       comm);
1013     index++;
1014   }
1015   for(other = rank + 1; other < size; other++) {
1016     requests[index] =
1017       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
1018     index++;
1019   }
1020   // Wait for completion of all comms.
1021   smpi_mpi_startall(size - 1, requests);
1022   for(other = 0; other < size - 1; other++) {
1023     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1024     if(index == MPI_UNDEFINED) {
1025       break;
1026     }
1027     if(index < rank) {
1028       // #Request is below rank: it's a irecv
1029       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1030     }
1031   }
1032   for(index = 0; index < rank; index++) {
1033     xbt_free(tmpbufs[index]);
1034   }
1035   xbt_free(tmpbufs);
1036   xbt_free(requests);
1037 }