From: markls Date: Wed, 10 Oct 2007 00:34:58 +0000 (+0000) Subject: updated to use branching-tree broadcast instead of for-loop. X-Git-Tag: v3.3~999 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/5f7e3ee3f051a31e1568c69456e83f20ff69e69f?hp=7b47e04f1a6749e163d6dc3f76f8e1cf09fc4984 updated to use branching-tree broadcast instead of for-loop. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@4792 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/smpi/private.h b/src/smpi/private.h index c69ddb41fa..1a8c584550 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -46,6 +46,7 @@ typedef struct smpi_mpi_request_t { smx_cond_t cond; void *data; + int forward; } s_smpi_mpi_request_t; @@ -63,6 +64,7 @@ typedef struct smpi_received_message_t { void *buf; void *data; + int forward; } s_smpi_received_message_t; typedef struct smpi_received_message_t *smpi_received_message_t; diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index 10f8303b68..62834b624b 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -227,7 +227,7 @@ int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t *status) retval = MPI_ERR_INTERN; } else { SIMIX_mutex_lock(request->mutex); - if (!request->completed) { + while (!request->completed) { SIMIX_cond_wait(request->cond, request->mutex); } if (NULL != status) { diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index dfac6a6124..89771f5068 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -12,9 +12,12 @@ void *smpi_request_new() { smpi_mpi_request_t request = xbt_new(s_smpi_mpi_request_t, 1); + request->buf = NULL; request->completed = 0; request->mutex = SIMIX_mutex_init(); request->cond = SIMIX_cond_init(); + request->data = NULL; + request->forward = 0; return request; } @@ -26,11 +29,9 @@ void smpi_request_free(void *pointer) smpi_mpi_request_t request = pointer; - if (NULL != request) { - SIMIX_cond_destroy(request->cond); - SIMIX_mutex_destroy(request->mutex); - xbt_free(request); - } + SIMIX_cond_destroy(request->cond); + SIMIX_mutex_destroy(request->mutex); + xbt_free(request); return; } @@ -41,7 +42,10 @@ void smpi_request_reset(void *pointer) { smpi_mpi_request_t request = pointer; + request->buf = NULL; request->completed = 0; + request->data = NULL; + request->forward = 0; return; } @@ -51,17 +55,16 @@ void *smpi_message_new(void); void *smpi_message_new() { - return xbt_new(s_smpi_received_message_t, 1); + smpi_received_message_t message = xbt_new(s_smpi_received_message_t, 1); + message->buf = NULL; + return message; } void smpi_message_free(void *pointer); void smpi_message_free(void *pointer) { - if (NULL != pointer) { - xbt_free(pointer); - } - + xbt_free(pointer); return; } @@ -69,6 +72,8 @@ void smpi_message_reset(void *pointer); void smpi_message_reset(void *pointer) { + smpi_received_message_t message = pointer; + message->buf = NULL; return; } diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c index 8fdb8a12e2..1a2c3f3858 100644 --- a/src/smpi/smpi_mpi.c +++ b/src/smpi/smpi_mpi.c @@ -189,26 +189,20 @@ int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm co int retval = MPI_SUCCESS; int rank; + smpi_mpi_request_t request; smpi_bench_end(); rank = smpi_mpi_comm_rank(comm); if (rank == root) { - int i; - smpi_mpi_request_t *requests = xbt_new(smpi_mpi_request_t, comm->size - 1); - for (i = 1; i < comm->size; i++) { - retval = smpi_create_request(buf, count, datatype, root, (root + i) % comm->size, 0, comm, requests + i - 1); - smpi_mpi_isend(requests[i - 1]); - } - for (i = 0; i < comm->size - 1; i++) { - smpi_mpi_wait(requests[i], MPI_STATUS_IGNORE); - xbt_mallocator_release(smpi_global->request_mallocator, requests[i]); - } - xbt_free(requests); + retval = smpi_create_request(buf, count, datatype, root, (root + 1) % comm->size, 0, comm, &request); + request->forward = comm->size - 1; + smpi_mpi_isend(request); + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); } else { - smpi_mpi_request_t request; - retval = smpi_create_request(buf, count, datatype, root, rank, 0, comm, &request); + retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank, 0, comm, &request); smpi_mpi_irecv(request); smpi_mpi_wait(request, MPI_STATUS_IGNORE); xbt_mallocator_release(smpi_global->request_mallocator, request); diff --git a/src/smpi/smpi_receiver.c b/src/smpi/smpi_receiver.c index 315810df63..eb5c887cbf 100644 --- a/src/smpi/smpi_receiver.c +++ b/src/smpi/smpi_receiver.c @@ -83,9 +83,18 @@ stopsearch: SIMIX_mutex_lock(request->mutex); memcpy(request->buf, message->buf, request->datatype->size * request->count); request->src = message->src; - request->completed = 1; request->data = message->data; - SIMIX_cond_broadcast(request->cond); + request->forward = message->forward; + + if (0 == request->forward) { + request->completed = 1; + SIMIX_cond_broadcast(request->cond); + } else { + request->src = smpi_mpi_comm_rank(request->comm); + request->dst = (request->src + 1) % request->comm->size; + smpi_mpi_isend(request); + } + SIMIX_mutex_unlock(request->mutex); xbt_free(message->buf); diff --git a/src/smpi/smpi_sender.c b/src/smpi/smpi_sender.c index c3cee349bf..983ded806f 100644 --- a/src/smpi/smpi_sender.c +++ b/src/smpi/smpi_sender.c @@ -61,25 +61,35 @@ int smpi_sender(int argc, char **argv) SIMIX_process_suspend(self); } else { - message = xbt_mallocator_get(smpi_global->message_mallocator); + message = xbt_mallocator_get(smpi_global->message_mallocator); SIMIX_mutex_lock(request->mutex); - message->comm = request->comm; - message->src = smpi_mpi_comm_rank(request->comm); - message->tag = request->tag; - message->buf = xbt_malloc(request->datatype->size * request->count); + message->comm = request->comm; + message->src = smpi_mpi_comm_rank(request->comm); + message->tag = request->tag; + message->data = request->data; + message->buf = xbt_malloc(request->datatype->size * request->count); memcpy(message->buf, request->buf, request->datatype->size * request->count); - message->data = request->data; dindex = request->comm->rank_to_index_map[request->dst]; dhost = smpi_global->hosts[dindex]; + message->forward = (request->forward - 1) / 2; + request->forward = request->forward / 2; + SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[dindex]); xbt_fifo_push(smpi_global->received_message_queues[dindex], message); SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[dindex]); - request->completed = 1; + if (0 < request->forward) { + request->dst = (request->dst + message->forward + 1) % request->comm->size; + SIMIX_mutex_lock(request_queue_mutex); + xbt_fifo_push(request_queue, request); + SIMIX_mutex_unlock(request_queue_mutex); + } else { + request->completed = 1; + } action = SIMIX_action_communicate(shost, dhost, communication, request->datatype->size * request->count, -1.0); @@ -89,8 +99,6 @@ int smpi_sender(int argc, char **argv) SIMIX_mutex_unlock(request->mutex); - //SIMIX_action_destroy(action); - // wake up receiver if necessary receiver_process = smpi_global->receiver_processes[dindex]; if (SIMIX_process_is_suspended(receiver_process)) {