From c7abe4a06a869040289677126a8c3e1b9a92216f Mon Sep 17 00:00:00 2001 From: genaud Date: Thu, 30 Jul 2009 19:51:17 +0000 Subject: [PATCH] alltoall implemented (almost opmpi algorithms) git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6532 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/smpi/BUGS | 10 +++++++--- src/smpi/smpi_base.c | 32 +++++++++++++++----------------- src/smpi/smpi_coll.c | 34 +++++++++++++++++++++++----------- src/smpi/smpi_mpi.c | 15 ++++++++++----- src/smpi/smpi_receiver.c | 28 ++++++++++++---------------- src/smpi/smpi_sender.c | 10 ++++++---- 6 files changed, 73 insertions(+), 56 deletions(-) diff --git a/src/smpi/BUGS b/src/smpi/BUGS index a70b0ff3fc..b6b24e8a12 100644 --- a/src/smpi/BUGS +++ b/src/smpi/BUGS @@ -1,10 +1,14 @@ -* Vérifier que la condition et le mutex de la requete sont clean lors du mallocator_reset - + + +* if the program has a different main() prototype than + int main( int argc, char *argv[] ) + then the compilation smpicc fails : conflicting types for ‘smpi_simulated_main’ + since smpicc substitutes smpi_simulated_main for the user main() declaration. + * MPI_Barrier() Tue Jun 30 11:49:07 UTC 2009 - genaud - several calls to MPI_Barrier() result in: genaud@boa:~/Documents/svn/simgrid/simgrid/trunk/src/smpi$ ./smpirun -np 2 -platform g1.xml -hostfile hostfile ./barrier diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index e3b944fd8a..f3bfa38085 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -198,7 +198,7 @@ void smpi_process_init(int *argc, char ***argv) /* get rank from command line, and remove it from argv */ pdata->index = atoi((*argv)[1]); - DEBUG1("I'm rank %d", pdata->index); + DEBUG1("I'm rank <%d>", pdata->index); if (*argc > 2) { memmove((*argv)[1], (*argv)[2], sizeof(char *) * (*argc - 2)); (*argv)[(*argc) - 1] = NULL; @@ -303,7 +303,7 @@ int smpi_mpi_irecv(smpi_mpi_request_t request) void print_req( smpi_mpi_request_t r ); void print_req( smpi_mpi_request_t r ) { - printf("***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed); + fprintf(stderr,"***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed); } @@ -317,6 +317,9 @@ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status) if (NULL == request) { retval = MPI_ERR_INTERN; } else { + + DEBUG3("entered smpi_mpi_wait() for req_src=%d,req_dst=%d,req_tag=%d", + request->src,request->dst,request->tag); SIMIX_mutex_lock(request->mutex); //#define DEBUG_STEPH #ifdef DEBUG_STEPH @@ -365,19 +368,16 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, { int cpt; + DEBUG0("entering smpi_wait_any() ..."); *index = MPI_UNDEFINED; if (NULL == requests) { return MPI_ERR_INTERN; } /* First check if one of them is already done */ for (cpt = 0; cpt < count; cpt++) { -#ifdef DEBUG_STEPH - printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src); -#endif + DEBUG2(" exam req[%d] of msg from <%d>",cpt,requests[cpt]->src); if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */ -#ifdef DEBUG_STEPH - printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src); -#endif + DEBUG2("smpi_wait_any() found match req[%d] of msg from <%d>",cpt,requests[cpt]->src); *index = cpt; goto found_request; } @@ -391,9 +391,7 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, print_req( requests[cpt] ); #endif if (!requests[cpt]->completed) { /* this one is not done, wait on it */ -#ifdef DEBUG_STEPH - printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag); -#endif + DEBUG3("smpi_waitany() blocked waiting a msg <%d> -> <%d>, tag=%d",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag); while (!requests[cpt]->completed) SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex); @@ -407,17 +405,17 @@ int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index, found_request: #ifdef DEBUG_STEPH - print_req( requests[cpt] ); + print_req( requests[cpt] ); #endif requests[*index]->consumed = 1; #ifdef DEBUG_STEPH - print_req( requests[cpt] ); - printf("...accessing *req[%d]->consumed\n",cpt); + print_req( requests[cpt] ); #endif + DEBUG2("smpi_waitany() request %p unblocked ... mark *req[%d]->consumed",requests[*index],cpt); if (NULL != status) { - status->MPI_SOURCE = requests[*index]->src; - status->MPI_TAG = requests[*index]->tag; - status->MPI_ERROR = MPI_SUCCESS; + status->MPI_SOURCE = requests[*index]->src; + status->MPI_TAG = requests[*index]->tag; + status->MPI_ERROR = MPI_SUCCESS; } return MPI_SUCCESS; diff --git a/src/smpi/smpi_coll.c b/src/smpi/smpi_coll.c index 729645386b..68815921e5 100644 --- a/src/smpi/smpi_coll.c +++ b/src/smpi/smpi_coll.c @@ -12,6 +12,7 @@ #include #include #include +#include #include "private.h" #include "smpi_coll_private.h" @@ -151,7 +152,7 @@ int tree_bcast( void *buf, int count, MPI_Datatype datatype, int root, /* wait for data from my parent in the tree */ if (!tree->isRoot) { - DEBUG4("[%d] recv(%d from %d, tag=%d)\n",rank,rank, tree->parent, system_tag+rank); + DEBUG3("<%d> tree_bcast(): i am not root: recv from %d, tag=%d)",rank,tree->parent,system_tag+rank); retval = smpi_create_request(buf, count, datatype, tree->parent, rank, system_tag + rank, @@ -161,23 +162,22 @@ int tree_bcast( void *buf, int count, MPI_Datatype datatype, int root, rank,retval,__FILE__,__LINE__); } smpi_mpi_irecv(request); - DEBUG2("[%d] waiting on irecv from %d\n",rank , tree->parent); + DEBUG2("<%d> tree_bcast(): waiting on irecv from %d",rank, tree->parent); smpi_mpi_wait(request, MPI_STATUS_IGNORE); xbt_mallocator_release(smpi_global->request_mallocator, request); } requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t)); - DEBUG2("[%d] creates %d requests\n",rank,tree->numChildren); + DEBUG2("<%d> creates %d requests (1 per child)\n",rank,tree->numChildren); /* iniates sends to ranks lower in the tree */ for (i=0; i < tree->numChildren; i++) { if (tree->child[i] != -1) { - DEBUG4("[%d] send(%d->%d, tag=%d)\n",rank,rank, tree->child[i], system_tag+tree->child[i]); + DEBUG3("<%d> send to <%d>,tag=%d",rank,tree->child[i], system_tag+tree->child[i]); retval = smpi_create_request(buf, count, datatype, rank, tree->child[i], system_tag + tree->child[i], comm, &(requests[i])); - DEBUG5("[%d] after create req[%d]=%p req->(src=%d,dst=%d)\n",rank,i,requests[i],requests[i]->src,requests[i]->dst ); if (MPI_SUCCESS != retval) { printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n", rank,retval,__FILE__,__LINE__); @@ -271,6 +271,7 @@ int rank; int retval; rank = smpi_mpi_comm_rank( comm ); + DEBUG2("<%d> entered nary_tree_bcast(), arity=%d",rank,arity); // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI)) proc_tree_t tree = alloc_tree( arity ); build_tree( rank, comm->size, &tree ); @@ -335,7 +336,7 @@ int smpi_coll_tuned_alltoall_pairwise (void *sendbuf, int sendcount, MPI_Datatyp void * tmpsend, *tmprecv; rank = smpi_mpi_comm_rank(comm); - INFO1("[%d] algorithm alltoall_pairwise() called.\n",rank); + INFO1("<%d> algorithm alltoall_pairwise() called.\n",rank); /* Perform pairwise exchange - starting from 1 so the local copy is last */ @@ -408,10 +409,12 @@ int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype s /* Initialize. */ rank = smpi_mpi_comm_rank(comm); - INFO1("[%d] algorithm alltoall_basic_linear() called.\n",rank); + DEBUG1("<%d> algorithm alltoall_basic_linear() called.",rank); err = smpi_mpi_type_get_extent(sdtype, &lb, &sndinc); err = smpi_mpi_type_get_extent(rdtype, &lb, &rcvinc); + sndinc *= scount; + rcvinc *= rcount; /* simple optimization */ psnd = ((char *) sbuf) + (rank * sndinc); prcv = ((char *) rbuf) + (rank * rcvinc); @@ -439,7 +442,7 @@ int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype s system_alltoall_tag, comm, &(reqs[nreq])); if (MPI_SUCCESS != err) { - DEBUG2("[%d] failed to create request for rank %d\n",rank,i); + DEBUG2("<%d> failed to create request for rank %d",rank,i); for (i=0;i< nreq;i++) xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]); return err; @@ -456,7 +459,7 @@ int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype s system_alltoall_tag, comm, &(reqs[nreq])); if (MPI_SUCCESS != err) { - DEBUG2("[%d] failed to create request for rank %d\n",rank,i); + DEBUG2("<%d> failed to create request for rank %d\n",rank,i); for (i=0;i< nreq;i++) xbt_mallocator_release(smpi_global->request_mallocator, reqs[i]); return err; @@ -466,9 +469,11 @@ int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype s /* Start your engines. This will never return an error. */ for ( i=0; i< nreq/2; i++ ) { + DEBUG3("<%d> issued irecv request reqs[%d]=%p",rank,i,reqs[i]); smpi_mpi_irecv( reqs[i] ); } for ( i= nreq/2; i issued isend request reqs[%d]=%p",rank,i,reqs[i]); smpi_mpi_isend( reqs[i] ); } @@ -480,12 +485,19 @@ int smpi_coll_tuned_alltoall_basic_linear(void *sbuf, int scount, MPI_Datatype s * So free them anyway -- even if there was an error, and return * the error after we free everything. */ - err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE); + DEBUG2("<%d> wait for %d requests",rank,nreq); + // waitall is buggy: use a loop instead for the moment + // err = smpi_mpi_waitall(nreq, reqs, MPI_STATUS_IGNORE); + for (i=0;irequest_mallocator, reqs[i]); } + xbt_free( reqs ); return err; } diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c index 2b9caebbb1..fdd8a4f2a2 100644 --- a/src/smpi/smpi_mpi.c +++ b/src/smpi/smpi_mpi.c @@ -296,6 +296,9 @@ int smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) { int retval = MPI_SUCCESS; + int rank = smpi_mpi_comm_rank(comm); + + DEBUG1("<%d> entered smpi_mpi_bcast(). Calls nary_tree_bcast()",rank); //retval = flat_tree_bcast(buf, count, datatype, root, comm); retval = nary_tree_bcast(buf, count, datatype, root, comm, 2 ); return retval; @@ -358,7 +361,7 @@ int smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, int rank; int size; int i; - int tag = 0; + int system_tag = 666; smpi_mpi_request_t *requests; smpi_mpi_request_t request; @@ -366,13 +369,14 @@ int smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, rank = smpi_mpi_comm_rank(comm); size = comm->size; + DEBUG1("<%d> entered smpi_mpi_reduce()",rank); if (rank != root) { // if i am not ROOT, simply send my buffer to root #ifdef DEBUG_REDUCE print_buffer_int(sendbuf, count, xbt_strdup("sndbuf"), rank); #endif - retval = smpi_create_request(sendbuf, count, datatype, rank, root, tag, comm, + retval = smpi_create_request(sendbuf, count, datatype, rank, root, system_tag, comm, &request); smpi_mpi_isend(request); smpi_mpi_wait(request, MPI_STATUS_IGNORE); @@ -397,7 +401,7 @@ int smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, // reminder: for smpi_create_request() the src is always the process sending. src = i < root ? i : i + 1; retval = smpi_create_request(tmpbufs[i], count, datatype, - src, root, tag, comm, &(requests[i])); + src, root, system_tag, comm, &(requests[i])); if (NULL != requests[i] && MPI_SUCCESS == retval) { if (MPI_SUCCESS == retval) { smpi_mpi_irecv(requests[i]); @@ -408,8 +412,9 @@ int smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, for (i = 0; i < size-1; i++) { int index = MPI_UNDEFINED; smpi_mpi_waitany( size-1, requests, &index, MPI_STATUS_IGNORE); + DEBUG3("<%d> waitany() unblocked by reception (completes request[%d]) (%d reqs remaining)", + rank,index,size-i-2); #ifdef DEBUG_REDUCE - printf ("MPI_Waitany() unblocked: root received (completes req[index=%d])\n",index); print_buffer_int(tmpbufs[index], count, bprintf("tmpbufs[index=%d] (value received)", index), rank); #endif @@ -561,7 +566,7 @@ int SMPI_MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype datatype, rank = smpi_mpi_comm_rank(comm); block_dsize = datatype->size * sendcount; - INFO2("[%d] optimized alltoall() called. Block size sent to each rank=%d.\n",rank,block_dsize); + DEBUG2("<%d> optimized alltoall() called. Block size sent to each rank: %d bytes.",rank,block_dsize); if ((block_dsize < 200) && (comm->size > 12)) { retval = smpi_coll_tuned_alltoall_bruck(sendbuf, sendcount, datatype, diff --git a/src/smpi/smpi_receiver.c b/src/smpi/smpi_receiver.c index b090ab11ac..999f5612fe 100644 --- a/src/smpi/smpi_receiver.c +++ b/src/smpi/smpi_receiver.c @@ -40,22 +40,18 @@ int smpi_receiver(int argc, char *argv[]) __FILE__,request->src,message->src,request->tag, message->tag); #endif if (request->comm == message->comm && - (MPI_ANY_SOURCE == request->src || request->src == message->src) - && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) { - xbt_fifo_remove_item(request_queue, request_item); - xbt_fifo_free_item(request_item); - xbt_fifo_remove_item(message_queue, message_item); - xbt_fifo_free_item(message_item); -#ifdef DEBUG_MATCH - printf("[%s] found match: req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n", - __FILE__,request->src,message->src,request->tag, message->tag); -#endif - DEBUG4("found match: (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)", - request->src,message->src,request->tag, message->tag); - goto stopsearch; + (MPI_ANY_SOURCE == request->src || request->src == message->src) + && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) { + xbt_fifo_remove_item(request_queue, request_item); + xbt_fifo_free_item(request_item); + xbt_fifo_remove_item(message_queue, message_item); + xbt_fifo_free_item(message_item); + DEBUG5("found matching request %p: (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)", + request,request->src,message->src,request->tag, message->tag); + goto stopsearch; } else { - DEBUG4("Matching fails: (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)", - request->src,message->src,request->tag, message->tag); + DEBUG5("fail to match request %p: (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)", + request,request->src,message->src,request->tag, message->tag); } } } @@ -98,7 +94,7 @@ int smpi_receiver(int argc, char *argv[]) } else { DEBUG0("Nothing to do. Let's get a nap"); SIMIX_process_suspend(self); - DEBUG0("Uh? Someone called me?"); + DEBUG0("=== Uh? Someone called me? ==="); } } diff --git a/src/smpi/smpi_sender.c b/src/smpi/smpi_sender.c index 9ec5273b07..eb5e727d58 100644 --- a/src/smpi/smpi_sender.c +++ b/src/smpi/smpi_sender.c @@ -55,7 +55,8 @@ int smpi_sender(int argc, char *argv[]) SIMIX_process_get_data(smpi_global->main_processes[dindex]); dhost = SIMIX_process_get_host(smpi_global->main_processes[dindex]); - DEBUG3("Handle send request %p to %s (tag:%d)",request,SIMIX_host_get_name(dhost),message->tag); + DEBUG4("handle send request %p to %s (req_dst=%d,req_tag=%d)", + request,SIMIX_host_get_name(dhost),request->dst,message->tag); message->forward = (request->forward - 1) / 2; request->forward = request->forward / 2; @@ -64,7 +65,8 @@ int smpi_sender(int argc, char *argv[]) (request->dst + message->forward + 1) % request->comm->size; xbt_fifo_push(request_queue, request); } else { - DEBUG3("DONE Handling send request %p to %s (tag:%d)",request, SIMIX_host_get_name(dhost),message->tag); + DEBUG4("DONE Handling send request %p to %s (req_dst=%d,req_tag=%d)", + request, SIMIX_host_get_name(dhost),request->dst,message->tag); request->completed = 1; } @@ -94,14 +96,14 @@ int smpi_sender(int argc, char *argv[]) SIMIX_process_resume(remote_process->receiver); } else if (mydata->finalize > 0) { /* main wants me to die and nothing to do */ - DEBUG0("Main wants me to die and I'm done. Bye, guys."); + DEBUG0("===Main wants me to die and I'm done. Bye, guys.==="); mydata->finalize--; SIMIX_cond_signal(mydata->cond); return 0; } else { DEBUG0("Nothing to do. Let's get a nap"); SIMIX_process_suspend(self); - DEBUG0("Uh? Someone called me?"); + DEBUG0("===Uh? Someone called me?==="); } } return 0; -- 2.20.1