AND — Algorithmique Numérique Distribuée

Public GIT Repository
* added support for optimized collectives:
author: genaud <genaud@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 2 Jul 2009 16:22:39 +0000 (16:22 +0000)
committer: genaud <genaud@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 2 Jul 2009 16:22:39 +0000 (16:22 +0000)
  smpi_coll.c has tree structures and tree implementations

* Bcast has its original implementation renamed as flat_tree_bcast()
  and the new default one is binomial_tree_bcast()

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6443 48e7efb5-ca39-0410-a469-dd3cf9ba447f

ChangeLog
src/Makefile.am
src/smpi/smpi_base.c
src/smpi/smpi_coll.c [new file with mode: 0644]
src/smpi/smpi_mpi.c
src/smpi/smpi_receiver.c
src/smpi/smpi_sender.c

index b78d7de..4454991 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,7 +2,8 @@ SimGrid (3.3.2-svn) unstable; urgency=low
 
  SMPI:
   * Implement some more MPI primitives: 
-    MPI_Waitany, MPI_Waitall, MPI_Allreduce
+    MPI_Waitany, MPI_Waitall, MPI_Reduce, MPI_Allreduce
+  * Add support for optimized collectives (Bcast is now binomial by default)
 
  SURF: 
   * Extract the routing logic into its own object.
index a184411..a9ea7dc 100644 (file)
@@ -215,7 +215,8 @@ SMPI_SRC= \
   smpi/smpi_mpi.c \
   smpi/smpi_sender.c \
   smpi/smpi_receiver.c \
-  smpi/smpi_util.c
+  smpi/smpi_util.c \
+  smpi/smpi_coll.c
 
 MSG_SRC=  msg/msg_config.c \
   msg/task.c msg/host.c msg/m_process.c msg/gos.c \
index 2a79614..e5af68d 100644 (file)
@@ -262,6 +262,15 @@ int smpi_mpi_irecv(smpi_mpi_request_t request)
   return retval;
 }
 
+void  print_req( smpi_mpi_request_t r ); 
+void  print_req( smpi_mpi_request_t r ) {
+        printf("***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed);
+}
+
+
+/**
+ * wait and friends ...
+ **/
 int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
 {
   int retval = MPI_SUCCESS;
@@ -270,6 +279,10 @@ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
     retval = MPI_ERR_INTERN;
   } else {
     SIMIX_mutex_lock(request->mutex);
+
+#ifdef DEBUG_STEPH
+    print_req( request );  //@@
+#endif
     while (!request->completed) {
       SIMIX_cond_wait(request->cond, request->mutex);
     }
@@ -284,23 +297,30 @@ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
   return retval;
 }
 
+/**
+ * waitall
+ **/
 int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[],
-                     smpi_mpi_status_t status[])
+                smpi_mpi_status_t status[])
 {
-  int cpt;
-  int index;
-  int retval;
-  smpi_mpi_status_t stat;
-
-  for (cpt = 0; cpt < count; cpt++) {
-    retval = smpi_mpi_waitany(count, requests, &index, &stat);
-    if (retval != MPI_SUCCESS)
-      return retval;
-    memcpy(&(status[index]), &stat, sizeof(stat));
-  }
-  return MPI_SUCCESS;
+        int cpt;
+        int index;
+        int retval;
+        smpi_mpi_status_t stat;
+
+        for (cpt = 0; cpt < count; cpt++) {
+                retval = smpi_mpi_waitany(count, requests, &index, &stat);
+                if (retval != MPI_SUCCESS)
+                        return retval;
+                if (MPI_STATUS_IGNORE != status)
+                        memcpy(&(status[index]), &stat, sizeof(stat));
+        }
+        return MPI_SUCCESS;
 }
 
+/**
+ * waitany
+ **/
 int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
                      smpi_mpi_status_t * status)
 {
@@ -312,7 +332,9 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
   }
   /* First check if one of them is already done */
   for (cpt = 0; cpt < count; cpt++) {
+          printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
     if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */
+          printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
       *index = cpt;
       goto found_request;
     }
@@ -321,7 +343,12 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
   /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */
   while (1) {
     for (cpt = 0; cpt < count; cpt++) {
+
+#ifdef DEBUG_STEPH
+      print_req( requests[cpt] );
+#endif
       if (!requests[cpt]->completed) {  /* this one is not done, wait on it */
+              printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag);
         while (!requests[cpt]->completed)
           SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex);
 
@@ -334,8 +361,14 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
   }
 
 found_request:
+#ifdef DEBUG_STEPH
+      print_req( requests[cpt] );
+#endif
   requests[*index]->consumed = 1;
-
+#ifdef DEBUG_STEPH
+      print_req( requests[cpt] );
+#endif
+          printf("...accessing *req[%d]->consumed\n",cpt);
   if (NULL != status) {
     status->MPI_SOURCE = requests[*index]->src;
     status->MPI_TAG = requests[*index]->tag;
diff --git a/src/smpi/smpi_coll.c b/src/smpi/smpi_coll.c
new file mode 100644 (file)
index 0000000..23dee9b
--- /dev/null
@@ -0,0 +1,244 @@
+/* $Id$tag */
+
+/* smpi_coll.c -- various optimized routing for collectives                   */
+
+/* Copyright (c) 2009 Stephane Genaud.                                        */
+/* All rights reserved.                                                       */
+
+/* This program is free software; you can redistribute it and/or modify it
+ *  * under the terms of the license (GNU LGPL) which comes with this package. */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "private.h"
+
+
+/* proctree taken and translated from P2P-MPI */
+
+struct proc_tree {
+        int PROCTREE_A;
+        int numChildren;
+        int * child;
+        int parent;
+        int me;
+        int root;
+        int isRoot;
+};
+typedef struct proc_tree * proc_tree_t;
+
+
+
+/* prototypes */
+void build_tree( int index, int extent, proc_tree_t *tree);
+proc_tree_t alloc_tree( int arity );
+void free_tree( proc_tree_t tree);
+void print_tree(proc_tree_t tree);
+int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
+
+
+
+/**
+ * alloc and init
+ **/
+proc_tree_t alloc_tree( int arity ) {
+        proc_tree_t tree = malloc(1*sizeof(struct proc_tree));
+        int i;
+
+        tree->PROCTREE_A = arity;
+        tree->isRoot = 0; 
+        tree->numChildren = 0;
+        tree->child = malloc(arity*sizeof(int));
+        for (i=0; i < arity; i++) {
+                tree->child[i] = -1;
+        }
+        tree->root = -1;
+        tree->parent = -1;
+        return( tree );
+}
+
+/**
+ * free
+ **/
+void free_tree( proc_tree_t tree) {
+        free (tree->child );
+        free(tree);
+}
+
+
+
+/**
+ * Build the tree depending on a process rank (index) and the group size (extent)
+ * @param index the rank of the calling process
+ * @param extent the total number of processes
+ **/
+void build_tree( int index, int extent, proc_tree_t *tree) {
+        int places = (*tree)->PROCTREE_A * index;
+        int i;
+        int ch;
+        int pr;
+
+        (*tree)->me = index;
+        (*tree)->root = 0 ;
+
+        for (i = 1; i <= (*tree)->PROCTREE_A; i++) {
+                ++places;
+                ch = (*tree)->PROCTREE_A * index + i + (*tree)->root;
+                //printf("places %d\n",places);
+                ch %= extent;
+                if (places < extent) {
+                         //printf("ch <%d> = <%d>\n",i,ch);
+                         //printf("adding to the tree at index <%d>\n\n",i-1);
+                        (*tree)->child[i - 1] = ch;
+                        (*tree)->numChildren++;
+                }
+                else {
+                       //fprintf(stderr,"not adding to the tree\n");
+                }
+        }
+        //fprintf(stderr,"procTree.numChildren <%d>\n",(*tree)->numChildren);
+
+        if (index == (*tree)->root) {
+                (*tree)->isRoot = 1;
+        }
+        else {
+                (*tree)->isRoot = 0;
+                pr = (index - 1) / (*tree)->PROCTREE_A;
+                (*tree)->parent = pr;
+        }
+}
+
+
+void print_tree(proc_tree_t tree) {
+      int i;
+      char *spacer;
+      if (-1 != tree->parent ) {
+            printf("[%d]\n   +---",tree->parent);
+            spacer= strdup("     ");
+      }
+      else {
+            spacer=strdup("");
+      }
+      printf("<%d>\n",tree->me);
+      for (i=0;i < tree->numChildren; i++) {
+              printf("%s   +--- %d\n", spacer,tree->child[i]);
+      }
+      free(spacer);
+}
+      
+/**
+ * bcast
+ **/
+int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root,
+                MPI_Comm comm)
+{
+        int system_tag = 999;  // used negative int but smpi_create_request() declares this illegal (to be checked)
+        int rank;
+        int retval = MPI_SUCCESS;
+        int i;
+        smpi_mpi_request_t request;
+        smpi_mpi_request_t * requests;
+        void **tmpbufs; 
+
+        rank = smpi_mpi_comm_rank(comm);
+        proc_tree_t tree = alloc_tree( 2 ); // arity=2: a binomial tree
+
+        build_tree( rank, comm->size, &tree );
+        /* wait for data from my parent in the tree */
+        if (!tree->isRoot) {
+#ifdef DEBUG_STEPH
+                printf("[%d] recv(%d  from %d, tag=%d)\n",rank,rank, tree->parent, system_tag+rank);
+#endif
+                retval = smpi_create_request(buf, count, datatype, 
+                                tree->parent, rank, 
+                                system_tag + rank, 
+                                comm, &request);
+                if (MPI_SUCCESS != retval) {
+                        printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
+                                        rank,retval,__FILE__,__LINE__);
+                }
+                smpi_mpi_irecv(request);
+#ifdef DEBUG_STEPH
+                printf("[%d] waiting on irecv from %d\n",rank , tree->parent);
+#endif
+                smpi_mpi_wait(request, MPI_STATUS_IGNORE);
+        }
+
+        tmpbufs  = xbt_malloc( tree->numChildren * sizeof(void *)); 
+        requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t));
+#ifdef DEBUG_STEPH
+        printf("[%d] creates %d requests\n",rank,tree->numChildren);
+#endif
+
+        /* iniates sends to ranks lower in the tree */
+        for (i=0; i < tree->numChildren; i++) {
+                if (tree->child[i] != -1) {
+#ifdef DEBUG_STEPH
+                        printf("[%d] send(%d->%d, tag=%d)\n",rank,rank, tree->child[i], system_tag+tree->child[i]);
+#endif
+                        tmpbufs[i] =  xbt_malloc( count * datatype->size);
+                        memcpy( tmpbufs[i], buf, count * datatype->size * sizeof(char));
+                        retval = smpi_create_request(tmpbufs[i], count, datatype, 
+                                        rank, tree->child[i], 
+                                        system_tag + tree->child[i], 
+                                        comm, &(requests[i]));
+#ifdef DEBUG_STEPH
+                        printf("[%d] after create req[%d]=%p req->(src=%d,dst=%d)\n",rank , i, requests[i],requests[i]->src,requests[i]->dst );
+#endif
+                        if (MPI_SUCCESS != retval) {
+                              printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
+                                              rank,retval,__FILE__,__LINE__);
+                        }
+                        smpi_mpi_isend(requests[i]);
+                        /* FIXME : we should not wait immediately here. See next FIXME. */
+                        smpi_mpi_wait( requests[i], MPI_STATUS_IGNORE);
+                        xbt_free(tmpbufs[i]);
+                        xbt_free(requests[i]);
+                }
+        }
+        /* FIXME : normally, we sould wait only once all isend have been issued:
+         * this is the following commented code. It deadlocks, probably because
+         * of a bug in the sender process */
+        
+        /* wait for completion of sends */
+        /* printf("[%d] wait for %d send completions\n",rank,tree->numChildren);
+          smpi_mpi_waitall( tree->numChildren, requests, MPI_STATUS_IGNORE);
+         printf("[%d] reqs completed\n)",rank);
+           */
+        
+        xbt_free(tmpbufs);
+        xbt_free(requests);
+        return(retval);
+}
+
+/**
+ * example usage
+ **/
+/*
+ * int main() {
+
+      int rank; 
+      int size=12;
+
+      proc_tree_t tree;
+      for (rank=0;rank<size;rank++) {
+            printf("--------------tree for rank %d ----------\n",rank);
+            tree = alloc_tree( 2 );
+            build_tree( rank, size, &tree );
+            print_tree( tree );
+            free_tree( tree );
+   
+      }
+      printf("-------------- bcast ----------\n");
+      for (rank=0;rank<size;rank++) {
+              bcast( rank, size );
+      }
+
+
+}
+*/
+
+                
+
index fb65892..9267288 100644 (file)
@@ -1,4 +1,7 @@
+
+
 #include "private.h"
+#include "smpi_coll_private.h"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi, smpi,
                                 "Logging specific to SMPI (mpi)");
@@ -203,31 +206,50 @@ int SMPI_MPI_Waitany(int count, MPI_Request requests[], int *index,
 /**
  * MPI_Bcast
  **/
+
+/**
+ * flat bcast 
+ **/
+int flat_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
+int flat_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root,
+                MPI_Comm comm)
+{
+        int rank;
+        int retval = MPI_SUCCESS;
+        smpi_mpi_request_t request;
+
+        rank = smpi_mpi_comm_rank(comm);
+        if (rank == root) {
+                retval = smpi_create_request(buf, count, datatype, root,
+                                (root + 1) % comm->size, 0, comm, &request);
+                request->forward = comm->size - 1;
+                smpi_mpi_isend(request);
+        } else {
+                retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank,
+                                0, comm, &request);
+                smpi_mpi_irecv(request);
+        }
+
+        smpi_mpi_wait(request, MPI_STATUS_IGNORE);
+        xbt_mallocator_release(smpi_global->request_mallocator, request);
+
+        return(retval);
+
+}
+
+/**
+ * Bcast user entry point
+ **/
 int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
                    MPI_Comm comm)
 {
 
   int retval = MPI_SUCCESS;
-  int rank;
-  smpi_mpi_request_t request;
 
   smpi_bench_end();
 
-  rank = smpi_mpi_comm_rank(comm);
-
-  if (rank == root) {
-    retval = smpi_create_request(buf, count, datatype, root,
-                                 (root + 1) % comm->size, 0, comm, &request);
-    request->forward = comm->size - 1;
-    smpi_mpi_isend(request);
-  } else {
-    retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank,
-                                 0, comm, &request);
-    smpi_mpi_irecv(request);
-  }
-
-  smpi_mpi_wait(request, MPI_STATUS_IGNORE);
-  xbt_mallocator_release(smpi_global->request_mallocator, request);
+  //retval = flat_tree_bcast(buf, count, datatype, root, comm);
+  retval = binomial_tree_bcast(buf, count, datatype, root, comm);
 
   smpi_bench_begin();
 
index cd4766a..90d6ea8 100644 (file)
@@ -23,6 +23,8 @@ int smpi_receiver(int argc, char *argv[])
   request_queue = mydata->pending_recv_request_queue;
   message_queue = mydata->received_message_queue;
 
+
+
   while (1) {
     // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
 
@@ -30,6 +32,11 @@ int smpi_receiver(int argc, char *argv[])
       xbt_fifo_foreach(message_queue, message_item, message,
                        smpi_received_message_t) {
 
+//#define DEBUG_MATCH
+#ifdef DEBUG_MATCH
+        printf("[%s] try match (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n",
+                        __FILE__,request->src,message->src,request->tag, message->tag);
+#endif
         if (request->comm == message->comm &&
             (MPI_ANY_SOURCE == request->src || request->src == message->src)
             && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) {
@@ -37,6 +44,10 @@ int smpi_receiver(int argc, char *argv[])
           xbt_fifo_free_item(request_item);
           xbt_fifo_remove_item(message_queue, message_item);
           xbt_fifo_free_item(message_item);
+#ifdef DEBUG_MATCH
+        printf("[%s] found match: req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n",
+                        __FILE__,request->src,message->src,request->tag, message->tag);
+#endif
           goto stopsearch;
         }
       }
index 7700a16..129caab 100644 (file)
@@ -61,6 +61,10 @@ int smpi_sender(int argc, char *argv[])
           (request->dst + message->forward + 1) % request->comm->size;
         xbt_fifo_push(request_queue, request);
       } else {
+//#define DEBUG_MATCH
+#ifdef DEBUG_MATCH
+ printf("**SENDER: request %p completed :=1\n",request);
+#endif
         request->completed = 1;
       }