From d30fcaa41c0aabda5afef028a793b9e295a4aadb Mon Sep 17 00:00:00 2001 From: genaud Date: Thu, 2 Jul 2009 16:22:39 +0000 Subject: [PATCH 1/1] * added support for optimized collectives: smpi_coll.c has tree structures and tree implementations * Bcast has its original implementation renamed as flat_tree_bcast() and the new default one is binomial_tree_bcast() git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6443 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- ChangeLog | 3 +- src/Makefile.am | 3 +- src/smpi/smpi_base.c | 61 +++++++--- src/smpi/smpi_coll.c | 244 +++++++++++++++++++++++++++++++++++++++ src/smpi/smpi_mpi.c | 56 ++++++--- src/smpi/smpi_receiver.c | 11 ++ src/smpi/smpi_sender.c | 4 + 7 files changed, 349 insertions(+), 33 deletions(-) create mode 100644 src/smpi/smpi_coll.c diff --git a/ChangeLog b/ChangeLog index b78d7de45a..4454991316 100644 --- a/ChangeLog +++ b/ChangeLog @@ -2,7 +2,8 @@ SimGrid (3.3.2-svn) unstable; urgency=low SMPI: * Implement some more MPI primitives: - MPI_Waitany, MPI_Waitall, MPI_Allreduce + MPI_Waitany, MPI_Waitall, MPI_Reduce, MPI_Allreduce + * Add support for optimized collectives (Bcast is now binomial by default) SURF: * Extract the routing logic into its own object. diff --git a/src/Makefile.am b/src/Makefile.am index a184411e5d..a9ea7dcaee 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -215,7 +215,8 @@ SMPI_SRC= \ smpi/smpi_mpi.c \ smpi/smpi_sender.c \ smpi/smpi_receiver.c \ - smpi/smpi_util.c + smpi/smpi_util.c \ + smpi/smpi_coll.c MSG_SRC= msg/msg_config.c \ msg/task.c msg/host.c msg/m_process.c msg/gos.c \ diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index 2a7961452c..e5af68d34c 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -262,6 +262,15 @@ int smpi_mpi_irecv(smpi_mpi_request_t request) return retval; } +void print_req( smpi_mpi_request_t r ); +void print_req( smpi_mpi_request_t r ) { + printf("***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed); +} + + +/** + * wait and friends ... + **/ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status) { int retval = MPI_SUCCESS; @@ -270,6 +279,10 @@ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status) retval = MPI_ERR_INTERN; } else { SIMIX_mutex_lock(request->mutex); + +#ifdef DEBUG_STEPH + print_req( request ); //@@ +#endif while (!request->completed) { SIMIX_cond_wait(request->cond, request->mutex); } @@ -284,23 +297,30 @@ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status) return retval; } +/** + * waitall + **/ int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[], - smpi_mpi_status_t status[]) + smpi_mpi_status_t status[]) { - int cpt; - int index; - int retval; - smpi_mpi_status_t stat; - - for (cpt = 0; cpt < count; cpt++) { - retval = smpi_mpi_waitany(count, requests, &index, &stat); - if (retval != MPI_SUCCESS) - return retval; - memcpy(&(status[index]), &stat, sizeof(stat)); - } - return MPI_SUCCESS; + int cpt; + int index; + int retval; + smpi_mpi_status_t stat; + + for (cpt = 0; cpt < count; cpt++) { + retval = smpi_mpi_waitany(count, requests, &index, &stat); + if (retval != MPI_SUCCESS) + return retval; + if (MPI_STATUS_IGNORE != status) + memcpy(&(status[index]), &stat, sizeof(stat)); + } + return MPI_SUCCESS; } +/** + * waitany + **/ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, smpi_mpi_status_t * status) { @@ -312,7 +332,9 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, } /* First check if one of them is already done */ for (cpt = 0; cpt < count; cpt++) { + printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src); if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */ + printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src); *index = cpt; goto found_request; } @@ -321,7 +343,12 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, /* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */ while (1) { for (cpt = 0; cpt < count; cpt++) { + +#ifdef DEBUG_STEPH + print_req( requests[cpt] ); +#endif if (!requests[cpt]->completed) { /* this one is not done, wait on it */ + printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag); while (!requests[cpt]->completed) SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex); @@ -334,8 +361,14 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, } found_request: +#ifdef DEBUG_STEPH + print_req( requests[cpt] ); +#endif requests[*index]->consumed = 1; - +#ifdef DEBUG_STEPH + print_req( requests[cpt] ); +#endif + printf("...accessing *req[%d]->consumed\n",cpt); if (NULL != status) { status->MPI_SOURCE = requests[*index]->src; status->MPI_TAG = requests[*index]->tag; diff --git a/src/smpi/smpi_coll.c b/src/smpi/smpi_coll.c new file mode 100644 index 0000000000..23dee9bf86 --- /dev/null +++ b/src/smpi/smpi_coll.c @@ -0,0 +1,244 @@ +/* $Id$tag */ + +/* smpi_coll.c -- various optimized routing for collectives */ + +/* Copyright (c) 2009 Stephane Genaud. */ +/* All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * * under the terms of the license (GNU LGPL) which comes with this package. */ + + +#include +#include +#include + +#include "private.h" + + +/* proctree taken and translated from P2P-MPI */ + +struct proc_tree { + int PROCTREE_A; + int numChildren; + int * child; + int parent; + int me; + int root; + int isRoot; +}; +typedef struct proc_tree * proc_tree_t; + + + +/* prototypes */ +void build_tree( int index, int extent, proc_tree_t *tree); +proc_tree_t alloc_tree( int arity ); +void free_tree( proc_tree_t tree); +void print_tree(proc_tree_t tree); +int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm); + + + +/** + * alloc and init + **/ +proc_tree_t alloc_tree( int arity ) { + proc_tree_t tree = malloc(1*sizeof(struct proc_tree)); + int i; + + tree->PROCTREE_A = arity; + tree->isRoot = 0; + tree->numChildren = 0; + tree->child = malloc(arity*sizeof(int)); + for (i=0; i < arity; i++) { + tree->child[i] = -1; + } + tree->root = -1; + tree->parent = -1; + return( tree ); +} + +/** + * free + **/ +void free_tree( proc_tree_t tree) { + free (tree->child ); + free(tree); +} + + + +/** + * Build the tree depending on a process rank (index) and the group size (extent) + * @param index the rank of the calling process + * @param extent the total number of processes + **/ +void build_tree( int index, int extent, proc_tree_t *tree) { + int places = (*tree)->PROCTREE_A * index; + int i; + int ch; + int pr; + + (*tree)->me = index; + (*tree)->root = 0 ; + + for (i = 1; i <= (*tree)->PROCTREE_A; i++) { + ++places; + ch = (*tree)->PROCTREE_A * index + i + (*tree)->root; + //printf("places %d\n",places); + ch %= extent; + if (places < extent) { + //printf("ch <%d> = <%d>\n",i,ch); + //printf("adding to the tree at index <%d>\n\n",i-1); + (*tree)->child[i - 1] = ch; + (*tree)->numChildren++; + } + else { + //fprintf(stderr,"not adding to the tree\n"); + } + } + //fprintf(stderr,"procTree.numChildren <%d>\n",(*tree)->numChildren); + + if (index == (*tree)->root) { + (*tree)->isRoot = 1; + } + else { + (*tree)->isRoot = 0; + pr = (index - 1) / (*tree)->PROCTREE_A; + (*tree)->parent = pr; + } +} + + +void print_tree(proc_tree_t tree) { + int i; + char *spacer; + if (-1 != tree->parent ) { + printf("[%d]\n +---",tree->parent); + spacer= strdup(" "); + } + else { + spacer=strdup(""); + } + printf("<%d>\n",tree->me); + for (i=0;i < tree->numChildren; i++) { + printf("%s +--- %d\n", spacer,tree->child[i]); + } + free(spacer); +} + +/** + * bcast + **/ +int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, + MPI_Comm comm) +{ + int system_tag = 999; // used negative int but smpi_create_request() declares this illegal (to be checked) + int rank; + int retval = MPI_SUCCESS; + int i; + smpi_mpi_request_t request; + smpi_mpi_request_t * requests; + void **tmpbufs; + + rank = smpi_mpi_comm_rank(comm); + proc_tree_t tree = alloc_tree( 2 ); // arity=2: a binomial tree + + build_tree( rank, comm->size, &tree ); + /* wait for data from my parent in the tree */ + if (!tree->isRoot) { +#ifdef DEBUG_STEPH + printf("[%d] recv(%d from %d, tag=%d)\n",rank,rank, tree->parent, system_tag+rank); +#endif + retval = smpi_create_request(buf, count, datatype, + tree->parent, rank, + system_tag + rank, + comm, &request); + if (MPI_SUCCESS != retval) { + printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", + rank,retval,__FILE__,__LINE__); + } + smpi_mpi_irecv(request); +#ifdef DEBUG_STEPH + printf("[%d] waiting on irecv from %d\n",rank , tree->parent); +#endif + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + } + + tmpbufs = xbt_malloc( tree->numChildren * sizeof(void *)); + requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t)); +#ifdef DEBUG_STEPH + printf("[%d] creates %d requests\n",rank,tree->numChildren); +#endif + + /* iniates sends to ranks lower in the tree */ + for (i=0; i < tree->numChildren; i++) { + if (tree->child[i] != -1) { +#ifdef DEBUG_STEPH + printf("[%d] send(%d->%d, tag=%d)\n",rank,rank, tree->child[i], system_tag+tree->child[i]); +#endif + tmpbufs[i] = xbt_malloc( count * datatype->size); + memcpy( tmpbufs[i], buf, count * datatype->size * sizeof(char)); + retval = smpi_create_request(tmpbufs[i], count, datatype, + rank, tree->child[i], + system_tag + tree->child[i], + comm, &(requests[i])); +#ifdef DEBUG_STEPH + printf("[%d] after create req[%d]=%p req->(src=%d,dst=%d)\n",rank , i, requests[i],requests[i]->src,requests[i]->dst ); +#endif + if (MPI_SUCCESS != retval) { + printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", + rank,retval,__FILE__,__LINE__); + } + smpi_mpi_isend(requests[i]); + /* FIXME : we should not wait immediately here. See next FIXME. */ + smpi_mpi_wait( requests[i], MPI_STATUS_IGNORE); + xbt_free(tmpbufs[i]); + xbt_free(requests[i]); + } + } + /* FIXME : normally, we sould wait only once all isend have been issued: + * this is the following commented code. It deadlocks, probably because + * of a bug in the sender process */ + + /* wait for completion of sends */ + /* printf("[%d] wait for %d send completions\n",rank,tree->numChildren); + smpi_mpi_waitall( tree->numChildren, requests, MPI_STATUS_IGNORE); + printf("[%d] reqs completed\n)",rank); + */ + + xbt_free(tmpbufs); + xbt_free(requests); + return(retval); +} + +/** + * example usage + **/ +/* + * int main() { + + int rank; + int size=12; + + proc_tree_t tree; + for (rank=0;ranksize, 0, comm, &request); + request->forward = comm->size - 1; + smpi_mpi_isend(request); + } else { + retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank, + 0, comm, &request); + smpi_mpi_irecv(request); + } + + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); + + return(retval); + +} + +/** + * Bcast user entry point + **/ int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) { int retval = MPI_SUCCESS; - int rank; - smpi_mpi_request_t request; smpi_bench_end(); - rank = smpi_mpi_comm_rank(comm); - - if (rank == root) { - retval = smpi_create_request(buf, count, datatype, root, - (root + 1) % comm->size, 0, comm, &request); - request->forward = comm->size - 1; - smpi_mpi_isend(request); - } else { - retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank, - 0, comm, &request); - smpi_mpi_irecv(request); - } - - smpi_mpi_wait(request, MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, request); + //retval = flat_tree_bcast(buf, count, datatype, root, comm); + retval = binomial_tree_bcast(buf, count, datatype, root, comm); smpi_bench_begin(); diff --git a/src/smpi/smpi_receiver.c b/src/smpi/smpi_receiver.c index cd4766adaa..90d6ea8632 100644 --- a/src/smpi/smpi_receiver.c +++ b/src/smpi/smpi_receiver.c @@ -23,6 +23,8 @@ int smpi_receiver(int argc, char *argv[]) request_queue = mydata->pending_recv_request_queue; message_queue = mydata->received_message_queue; + + while (1) { // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? @@ -30,6 +32,11 @@ int smpi_receiver(int argc, char *argv[]) xbt_fifo_foreach(message_queue, message_item, message, smpi_received_message_t) { +//#define DEBUG_MATCH +#ifdef DEBUG_MATCH + printf("[%s] try match (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n", + __FILE__,request->src,message->src,request->tag, message->tag); +#endif if (request->comm == message->comm && (MPI_ANY_SOURCE == request->src || request->src == message->src) && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) { @@ -37,6 +44,10 @@ int smpi_receiver(int argc, char *argv[]) xbt_fifo_free_item(request_item); xbt_fifo_remove_item(message_queue, message_item); xbt_fifo_free_item(message_item); +#ifdef DEBUG_MATCH + printf("[%s] found match: req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n", + __FILE__,request->src,message->src,request->tag, message->tag); +#endif goto stopsearch; } } diff --git a/src/smpi/smpi_sender.c b/src/smpi/smpi_sender.c index 7700a169f8..129caab533 100644 --- a/src/smpi/smpi_sender.c +++ b/src/smpi/smpi_sender.c @@ -61,6 +61,10 @@ int smpi_sender(int argc, char *argv[]) (request->dst + message->forward + 1) % request->comm->size; xbt_fifo_push(request_queue, request); } else { +//#define DEBUG_MATCH +#ifdef DEBUG_MATCH + printf("**SENDER: request %p completed :=1\n",request); +#endif request->completed = 1; } -- 2.20.1