Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use simgrid function instead of MPI in collectives
[simgrid.git] / src / smpi / smpi_coll.c
1 /* smpi_coll.c -- various optimized routing for collectives                   */
2
3 /* Copyright (c) 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include <stdio.h>
10 #include <string.h>
11 #include <assert.h>
12
13 #include "private.h"
14 #include "colls/colls.h"
15
16 s_mpi_coll_description_t mpi_coll_allgather_description[] = {
17   {"default",
18    "allgather default collective",
19    smpi_mpi_allgather},
20 COLL_ALLGATHERS(COLL_DESCRIPTION, COLL_COMMA),
21   {NULL, NULL, NULL}      /* this array must be NULL terminated */
22 };
23
24 s_mpi_coll_description_t mpi_coll_allreduce_description[] = {
25   {"default",
26    "allreduce default collective",
27    smpi_mpi_allreduce},
28 COLL_ALLREDUCES(COLL_DESCRIPTION, COLL_COMMA),
29   {NULL, NULL, NULL}      /* this array must be NULL terminated */
30 };
31
32 s_mpi_coll_description_t mpi_coll_alltoall_description[] = {
33   {"ompi",
34    "Ompi alltoall default collective",
35    smpi_coll_tuned_alltoall_ompi},
36 COLL_ALLTOALLS(COLL_DESCRIPTION, COLL_COMMA),
37   {"bruck",
38    "Alltoall Bruck (SG) collective",
39    smpi_coll_tuned_alltoall_bruck},
40   {"basic_linear",
41    "Alltoall basic linear (SG) collective",
42    smpi_coll_tuned_alltoall_basic_linear},
43   {"pairwise",
44    "Alltoall pairwise (SG) collective",
45    smpi_coll_tuned_alltoall_pairwise},
46   {NULL, NULL, NULL}      /* this array must be NULL terminated */
47 };
48
49 s_mpi_coll_description_t mpi_coll_bcast_description[] = {
50   {"default",
51    "allgather default collective",
52    smpi_mpi_bcast},
53 COLL_BCASTS(COLL_DESCRIPTION, COLL_COMMA),
54   {NULL, NULL, NULL}      /* this array must be NULL terminated */
55 };
56
57 s_mpi_coll_description_t mpi_coll_reduce_description[] = {
58   {"default",
59    "allgather default collective",
60    smpi_mpi_reduce},
61 COLL_REDUCES(COLL_DESCRIPTION, COLL_COMMA),
62   {NULL, NULL, NULL}      /* this array must be NULL terminated */
63 };
64
65
66
67 /** Displays the long description of all registered models, and quit */
68 void coll_help(const char *category, s_mpi_coll_description_t * table)
69 {
70   int i;
71   printf("Long description of the %s models accepted by this simulator:\n",
72          category);
73   for (i = 0; table[i].name; i++)
74     printf("  %s: %s\n", table[i].name, table[i].description);
75 }
76
77 int find_coll_description(s_mpi_coll_description_t * table,
78                            const char *name)
79 {
80   int i;
81   char *name_list = NULL;
82
83   for (i = 0; table[i].name; i++)
84     if (!strcmp(name, table[i].name)) {
85       return i;
86     }
87   name_list = strdup(table[0].name);
88   for (i = 1; table[i].name; i++) {
89     name_list =
90         xbt_realloc(name_list,
91                     strlen(name_list) + strlen(table[i].name) + 3);
92     strcat(name_list, ", ");
93     strcat(name_list, table[i].name);
94   }
95   xbt_die("Model '%s' is invalid! Valid models are: %s.", name, name_list);
96   return -1;
97 }
98
99
100
101 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_coll, smpi,
102                                 "Logging specific to SMPI (coll)");
103
104 int (*mpi_coll_allgather_fun)(void *, int, MPI_Datatype, void*, int, MPI_Datatype, MPI_Comm);
105 int (*mpi_coll_allreduce_fun)(void *sbuf, void *rbuf, int rcount, MPI_Datatype dtype, MPI_Op op, MPI_Comm comm);
106 int (*mpi_coll_alltoall_fun)(void *, int, MPI_Datatype, void*, int, MPI_Datatype, MPI_Comm);
107 int (*mpi_coll_bcast_fun)(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm com);
108 int (*mpi_coll_reduce_fun)(void *buf, void *rbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
109
110 struct s_proc_tree {
111   int PROCTREE_A;
112   int numChildren;
113   int *child;
114   int parent;
115   int me;
116   int root;
117   int isRoot;
118 };
119 typedef struct s_proc_tree *proc_tree_t;
120
121 /**
122  * alloc and init
123  **/
124 static proc_tree_t alloc_tree(int arity)
125 {
126   proc_tree_t tree;
127   int i;
128
129   tree = xbt_new(struct s_proc_tree, 1);
130   tree->PROCTREE_A = arity;
131   tree->isRoot = 0;
132   tree->numChildren = 0;
133   tree->child = xbt_new(int, arity);
134   for (i = 0; i < arity; i++) {
135     tree->child[i] = -1;
136   }
137   tree->root = -1;
138   tree->parent = -1;
139   return tree;
140 }
141
142 /**
143  * free
144  **/
145 static void free_tree(proc_tree_t tree)
146 {
147   xbt_free(tree->child);
148   xbt_free(tree);
149 }
150
151 /**
152  * Build the tree depending on a process rank (index) and the group size (extent)
153  * @param root the rank of the tree root
154  * @param rank the rank of the calling process
155  * @param size the total number of processes
156  **/
157 static void build_tree(int root, int rank, int size, proc_tree_t * tree)
158 {
159   int index = (rank - root + size) % size;
160   int firstChildIdx = index * (*tree)->PROCTREE_A + 1;
161   int i;
162
163   (*tree)->me = rank;
164   (*tree)->root = root;
165
166   for (i = 0; i < (*tree)->PROCTREE_A && firstChildIdx + i < size; i++) {
167     (*tree)->child[i] = (firstChildIdx + i + root) % size;
168     (*tree)->numChildren++;
169   }
170   if (rank == root) {
171     (*tree)->isRoot = 1;
172   } else {
173     (*tree)->isRoot = 0;
174     (*tree)->parent = (((index - 1) / (*tree)->PROCTREE_A) + root) % size;
175   }
176 }
177
178 /**
179  * bcast
180  **/
181 static void tree_bcast(void *buf, int count, MPI_Datatype datatype,
182                        MPI_Comm comm, proc_tree_t tree)
183 {
184   int system_tag = 999;         // used negative int but smpi_create_request() declares this illegal (to be checked)
185   int rank, i;
186   MPI_Request *requests;
187
188   rank = smpi_comm_rank(comm);
189   /* wait for data from my parent in the tree */
190   if (!tree->isRoot) {
191     XBT_DEBUG("<%d> tree_bcast(): i am not root: recv from %d, tag=%d)",
192            rank, tree->parent, system_tag + rank);
193     smpi_mpi_recv(buf, count, datatype, tree->parent, system_tag + rank,
194                   comm, MPI_STATUS_IGNORE);
195   }
196   requests = xbt_new(MPI_Request, tree->numChildren);
197   XBT_DEBUG("<%d> creates %d requests (1 per child)", rank,
198          tree->numChildren);
199   /* iniates sends to ranks lower in the tree */
200   for (i = 0; i < tree->numChildren; i++) {
201     if (tree->child[i] == -1) {
202       requests[i] = MPI_REQUEST_NULL;
203     } else {
204       XBT_DEBUG("<%d> send to <%d>, tag=%d", rank, tree->child[i],
205              system_tag + tree->child[i]);
206       requests[i] =
207           smpi_isend_init(buf, count, datatype, tree->child[i],
208                           system_tag + tree->child[i], comm);
209     }
210   }
211   smpi_mpi_startall(tree->numChildren, requests);
212   smpi_mpi_waitall(tree->numChildren, requests, MPI_STATUS_IGNORE);
213   xbt_free(requests);
214 }
215
216 /**
217  * anti-bcast
218  **/
219 static void tree_antibcast(void *buf, int count, MPI_Datatype datatype,
220                            MPI_Comm comm, proc_tree_t tree)
221 {
222   int system_tag = 999;         // used negative int but smpi_create_request() declares this illegal (to be checked)
223   int rank, i;
224   MPI_Request *requests;
225
226   rank = smpi_comm_rank(comm);
227   // everyone sends to its parent, except root.
228   if (!tree->isRoot) {
229     XBT_DEBUG("<%d> tree_antibcast(): i am not root: send to %d, tag=%d)",
230            rank, tree->parent, system_tag + rank);
231     smpi_mpi_send(buf, count, datatype, tree->parent, system_tag + rank,
232                   comm);
233   }
234   //every one receives as many messages as it has children
235   requests = xbt_new(MPI_Request, tree->numChildren);
236   XBT_DEBUG("<%d> creates %d requests (1 per child)", rank,
237          tree->numChildren);
238   for (i = 0; i < tree->numChildren; i++) {
239     if (tree->child[i] == -1) {
240       requests[i] = MPI_REQUEST_NULL;
241     } else {
242       XBT_DEBUG("<%d> recv from <%d>, tag=%d", rank, tree->child[i],
243              system_tag + tree->child[i]);
244       requests[i] =
245           smpi_irecv_init(buf, count, datatype, tree->child[i],
246                           system_tag + tree->child[i], comm);
247     }
248   }
249   smpi_mpi_startall(tree->numChildren, requests);
250   smpi_mpi_waitall(tree->numChildren, requests, MPI_STATUS_IGNORE);
251   xbt_free(requests);
252 }
253
254 /**
255  * bcast with a binary, ternary, or whatever tree ..
256  **/
257 void nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root,
258                      MPI_Comm comm, int arity)
259 {
260   proc_tree_t tree = alloc_tree(arity);
261   int rank, size;
262
263   rank = smpi_comm_rank(comm);
264   size = smpi_comm_size(comm);
265   build_tree(root, rank, size, &tree);
266   tree_bcast(buf, count, datatype, comm, tree);
267   free_tree(tree);
268 }
269
270 /**
271  * barrier with a binary, ternary, or whatever tree ..
272  **/
273 void nary_tree_barrier(MPI_Comm comm, int arity)
274 {
275   proc_tree_t tree = alloc_tree(arity);
276   int rank, size;
277   char dummy = '$';
278
279   rank = smpi_comm_rank(comm);
280   size = smpi_comm_size(comm);
281   build_tree(0, rank, size, &tree);
282   tree_antibcast(&dummy, 1, MPI_CHAR, comm, tree);
283   tree_bcast(&dummy, 1, MPI_CHAR, comm, tree);
284   free_tree(tree);
285 }
286
287 int smpi_coll_tuned_alltoall_ompi(void *sendbuf, int sendcount,
288                                    MPI_Datatype sendtype, void *recvbuf,
289                                    int recvcount, MPI_Datatype recvtype,
290                                    MPI_Comm comm)
291 {
292   int size, sendsize;   
293   size = smpi_comm_size(comm);  
294   sendsize = smpi_datatype_size(sendtype) * sendcount;  
295   if (sendsize < 200 && size > 12) {
296     return
297         smpi_coll_tuned_alltoall_bruck(sendbuf, sendcount, sendtype,
298                                        recvbuf, recvcount, recvtype,
299                                        comm);
300   } else if (sendsize < 3000) {
301     return
302         smpi_coll_tuned_alltoall_basic_linear(sendbuf, sendcount,
303                                               sendtype, recvbuf,
304                                               recvcount, recvtype, comm);
305   } else {
306     return
307         smpi_coll_tuned_alltoall_pairwise(sendbuf, sendcount, sendtype,
308                                           recvbuf, recvcount, recvtype,
309                                           comm);
310   }
311 }
312
313 /**
314  * Alltoall Bruck
315  *
316  * Openmpi calls this routine when the message size sent to each rank < 2000 bytes and size < 12
317  * FIXME: uh, check smpi_pmpi again, but this routine is called for > 12, not
318  * less...
319  **/
320 int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount,
321                                    MPI_Datatype sendtype, void *recvbuf,
322                                    int recvcount, MPI_Datatype recvtype,
323                                    MPI_Comm comm)
324 {
325   int system_tag = 777;
326   int i, rank, size, err, count;
327   MPI_Aint lb;
328   MPI_Aint sendext = 0;
329   MPI_Aint recvext = 0;
330   MPI_Request *requests;
331
332   // FIXME: check implementation
333   rank = smpi_comm_rank(comm);
334   size = smpi_comm_size(comm);
335   XBT_DEBUG("<%d> algorithm alltoall_bruck() called.", rank);
336   err = smpi_datatype_extent(sendtype, &lb, &sendext);
337   err = smpi_datatype_extent(recvtype, &lb, &recvext);
338   /* Local copy from self */
339   err =
340       smpi_datatype_copy((char *)sendbuf + rank * sendcount * sendext, 
341                          sendcount, sendtype, 
342                          (char *)recvbuf + rank * recvcount * recvext,
343                          recvcount, recvtype);
344   if (err == MPI_SUCCESS && size > 1) {
345     /* Initiate all send/recv to/from others. */
346     requests = xbt_new(MPI_Request, 2 * (size - 1));
347     count = 0;
348     /* Create all receives that will be posted first */
349     for (i = 0; i < size; ++i) {
350       if (i == rank) {
351         XBT_DEBUG("<%d> skip request creation [src = %d, recvcount = %d]",
352                rank, i, recvcount);
353         continue;
354       }
355       requests[count] =
356           smpi_irecv_init((char *)recvbuf + i * recvcount * recvext, recvcount,
357                           recvtype, i, system_tag, comm);
358       count++;
359     }
360     /* Now create all sends  */
361     for (i = 0; i < size; ++i) {
362       if (i == rank) {
363         XBT_DEBUG("<%d> skip request creation [dst = %d, sendcount = %d]",
364                rank, i, sendcount);
365         continue;
366       }
367       requests[count] =
368           smpi_isend_init((char *)sendbuf + i * sendcount * sendext, sendcount,
369                           sendtype, i, system_tag, comm);
370       count++;
371     }
372     /* Wait for them all. */
373     smpi_mpi_startall(count, requests);
374     XBT_DEBUG("<%d> wait for %d requests", rank, count);
375     smpi_mpi_waitall(count, requests, MPI_STATUS_IGNORE);
376     xbt_free(requests);
377   }
378   return MPI_SUCCESS;
379 }
380
381 /**
382  * Alltoall basic_linear (STARMPI:alltoall-simple)
383  **/
384 int smpi_coll_tuned_alltoall_basic_linear(void *sendbuf, int sendcount,
385                                           MPI_Datatype sendtype,
386                                           void *recvbuf, int recvcount,
387                                           MPI_Datatype recvtype,
388                                           MPI_Comm comm)
389 {
390   int system_tag = 888;
391   int i, rank, size, err, count;
392   MPI_Aint lb = 0, sendext = 0, recvext = 0;
393   MPI_Request *requests;
394
395   /* Initialize. */
396   rank = smpi_comm_rank(comm);
397   size = smpi_comm_size(comm);
398   XBT_DEBUG("<%d> algorithm alltoall_basic_linear() called.", rank);
399   err = smpi_datatype_extent(sendtype, &lb, &sendext);
400   err = smpi_datatype_extent(recvtype, &lb, &recvext);
401   /* simple optimization */
402   err = smpi_datatype_copy((char *)sendbuf + rank * sendcount * sendext, 
403                            sendcount, sendtype, 
404                            (char *)recvbuf + rank * recvcount * recvext, 
405                            recvcount, recvtype);
406   if (err == MPI_SUCCESS && size > 1) {
407     /* Initiate all send/recv to/from others. */
408     requests = xbt_new(MPI_Request, 2 * (size - 1));
409     /* Post all receives first -- a simple optimization */
410     count = 0;
411     for (i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
412       requests[count] =
413           smpi_irecv_init((char *)recvbuf + i * recvcount * recvext, recvcount, 
414                           recvtype, i, system_tag, comm);
415       count++;
416     }
417     /* Now post all sends in reverse order
418      *   - We would like to minimize the search time through message queue
419      *     when messages actually arrive in the order in which they were posted.
420      * TODO: check the previous assertion
421      */
422     for (i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size) {
423       requests[count] =
424           smpi_isend_init((char *)sendbuf + i * sendcount * sendext, sendcount,
425                           sendtype, i, system_tag, comm);
426       count++;
427     }
428     /* Wait for them all. */
429     smpi_mpi_startall(count, requests);
430     XBT_DEBUG("<%d> wait for %d requests", rank, count);
431     smpi_mpi_waitall(count, requests, MPI_STATUS_IGNORE);
432     xbt_free(requests);
433   }
434   return err;
435 }
436
437 /**
438  * Alltoall pairwise
439  *
440  * this algorithm performs size steps (1<=s<=size) and
441  * at each step s, a process p sends iand receive to.from a unique distinct remote process
442  * size=5 : s=1:  4->0->1, 0->1->2, 1->2->3, ...
443  *          s=2:  3->0->2, 4->1->3, 0->2->4, 1->3->0 , 2->4->1
444  *          ....
445  * Openmpi calls this routine when the message size sent to each rank is greater than 3000 bytes
446  **/
447 int smpi_coll_tuned_alltoall_pairwise(void *sendbuf, int sendcount,
448                                       MPI_Datatype sendtype, void *recvbuf,
449                                       int recvcount, MPI_Datatype recvtype,
450                                       MPI_Comm comm)
451 {
452   int system_tag = 999;
453   int rank, size, step, sendto, recvfrom, sendsize, recvsize;
454
455   rank = smpi_comm_rank(comm);
456   size = smpi_comm_size(comm);
457   XBT_DEBUG("<%d> algorithm alltoall_pairwise() called.", rank);
458   sendsize = smpi_datatype_size(sendtype);
459   recvsize = smpi_datatype_size(recvtype);
460   /* Perform pairwise exchange - starting from 1 so the local copy is last */
461   for (step = 1; step < size + 1; step++) {
462     /* who do we talk to in this step? */
463     sendto = (rank + step) % size;
464     recvfrom = (rank + size - step) % size;
465     /* send and receive */
466     smpi_mpi_sendrecv(&((char *) sendbuf)[sendto * sendsize * sendcount],
467                       sendcount, sendtype, sendto, system_tag,
468                       &((char *) recvbuf)[recvfrom * recvsize * recvcount],
469                       recvcount, recvtype, recvfrom, system_tag, comm,
470                       MPI_STATUS_IGNORE);
471   }
472   return MPI_SUCCESS;
473 }
474
475 int smpi_coll_basic_alltoallv(void *sendbuf, int *sendcounts,
476                               int *senddisps, MPI_Datatype sendtype,
477                               void *recvbuf, int *recvcounts,
478                               int *recvdisps, MPI_Datatype recvtype,
479                               MPI_Comm comm)
480 {
481   int system_tag = 889;
482   int i, rank, size, err, count;
483   MPI_Aint lb = 0, sendext = 0, recvext = 0;
484   MPI_Request *requests;
485
486   /* Initialize. */
487   rank = smpi_comm_rank(comm);
488   size = smpi_comm_size(comm);
489   XBT_DEBUG("<%d> algorithm basic_alltoallv() called.", rank);
490   err = smpi_datatype_extent(sendtype, &lb, &sendext);
491   err = smpi_datatype_extent(recvtype, &lb, &recvext);
492   /* Local copy from self */
493   err =
494       smpi_datatype_copy((char *)sendbuf + senddisps[rank] * sendext, 
495                          sendcounts[rank], sendtype,
496                          (char *)recvbuf + recvdisps[rank] * recvext, 
497                          recvcounts[rank], recvtype);
498   if (err == MPI_SUCCESS && size > 1) {
499     /* Initiate all send/recv to/from others. */
500     requests = xbt_new(MPI_Request, 2 * (size - 1));
501     count = 0;
502     /* Create all receives that will be posted first */
503     for (i = 0; i < size; ++i) {
504       if (i == rank || recvcounts[i] == 0) {
505         XBT_DEBUG
506             ("<%d> skip request creation [src = %d, recvcounts[src] = %d]",
507              rank, i, recvcounts[i]);
508         continue;
509       }
510       requests[count] =
511           smpi_irecv_init((char *)recvbuf + recvdisps[i] * recvext, 
512                           recvcounts[i], recvtype, i, system_tag, comm);
513       count++;
514     }
515     /* Now create all sends  */
516     for (i = 0; i < size; ++i) {
517       if (i == rank || sendcounts[i] == 0) {
518         XBT_DEBUG
519             ("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]",
520              rank, i, sendcounts[i]);
521         continue;
522       }
523       requests[count] =
524           smpi_isend_init((char *)sendbuf + senddisps[i] * sendext, 
525                           sendcounts[i], sendtype, i, system_tag, comm);
526       count++;
527     }
528     /* Wait for them all. */
529     smpi_mpi_startall(count, requests);
530     XBT_DEBUG("<%d> wait for %d requests", rank, count);
531     smpi_mpi_waitall(count, requests, MPI_STATUS_IGNORE);
532     xbt_free(requests);
533   }
534   return err;
535 }