From 6f529f3589de095d87dea6d451d4c648b4e3147e Mon Sep 17 00:00:00 2001 From: markls Date: Tue, 10 Jul 2007 05:55:06 +0000 Subject: [PATCH] some more work on smpi_sender and smpi_receiver. It'd be nice to be in a position to do unit testing... git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3704 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/smpi/src/smpi_base.c | 65 +++++++++++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git a/src/smpi/src/smpi_base.c b/src/smpi/src/smpi_base.c index 44a1d020f1..0150470929 100644 --- a/src/smpi/src/smpi_base.c +++ b/src/smpi/src/smpi_base.c @@ -7,6 +7,7 @@ #include "simix/private.h" #include "smpi.h" +xbt_mallocator_t smpi_request_mallocator = NULL; xbt_fifo_t *smpi_pending_send_requests = NULL; xbt_fifo_t *smpi_pending_recv_requests = NULL; xbt_fifo_t *smpi_received_messages = NULL; @@ -63,17 +64,21 @@ int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm) int smpi_sender(int argc, char **argv) { - int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world); + int rank; int size; int running_hosts = 0; smpi_mpi_request_t *request; smx_process_t self; smx_host_t shost, dhost; smx_action_t communicate_action; + smpi_mpi_request_t *scratch; + int drank; self = SIMIX_process_self(); shost = SIMIX_host_self(); + rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost); + // make sure root is done before own initialization SIMIX_mutex_lock(init_mutex); if (!smpi_root_ready) { @@ -118,8 +123,13 @@ int smpi_sender(int argc, char **argv) SIMIX_cond_wait(request->cond, request->mutex); - // fixme, create new request, copy over to - // should be malloc and memcpy + // copy request to appropriate received queue + scratch = xbt_mallocator_get(smpi_request_mallocator); + memcpy(scratch, request, sizeof smpi_mpi_request_t); + drank = smpi_mpi_comm_rank(MPI_COMM_WORLD, dhost); + xbt_fifo_push(smpi_received_messages[drank], scratch); + + request->completed = 1; SIMIX_mutex_unlock(request->mutex); } @@ -134,8 +144,15 @@ int smpi_sender(int argc, char **argv) int smpi_receiver(int argc, char **argv) { - int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world); + smx_host_t dhost; + int rank; + smx_process_t self; int size; + int running_hosts; + + dhost = SIMIX_host_self(); + rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost); + self = SIMIX_process_self(); // make sure root is done before own initialization SIMIX_mutex_lock(init_mutex); @@ -157,6 +174,25 @@ int smpi_receiver(int argc, char **argv) } SIMIX_mutex_unlock(init_mutex); + SIMIX_mutex_lock(smpi_running_hosts_mutex); + running_hosts = smpi_running_hosts; + SIMIX_mutex_unlock(smpi_running_hosts_mutex); + + while (0 < running_hosts) { + + request = xbt_fifo_shift(smpi_pending_send_requests[rank]); + + if (NULL == request) { + SIMIX_process_suspend(self); + } else { + SIMIX_mutex_lock(request->mutex); + } + + SIMIX_mutex_lock(smpi_running_hosts_mutex); + running_hosts = smpi_running_hosts; + SIMIX_mutex_unlock(smpi_running_hosts_mutex); + } + return 0; } @@ -221,6 +257,11 @@ void smpi_mpi_sum_func(void *x, void *y, void *z) *(int *)z = *(int *)x + *(int *)y; } +smpi_mpi_request_t *smpi_new_request() +{ + return xbt_new(smpi_mpi_request_t, 1); +} + void smpi_mpi_init() { int i; @@ -239,8 +280,8 @@ void smpi_mpi_init() if (host == hosts[0]) { // processes - smpi_sender_processes = xbt_new0(smx_process_t, size); - smpi_receiver_processes = xbt_new0(smx_process_t, size); + smpi_sender_processes = xbt_new(smx_process_t, size); + smpi_receiver_processes = xbt_new(smx_process_t, size); // running hosts smpi_running_hosts_mutex = SIMIX_mutex_init(); @@ -252,7 +293,7 @@ void smpi_mpi_init() smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init(); smpi_mpi_comm_world.barrier_cond = SIMIX_cond_init(); smpi_mpi_comm_world.hosts = hosts; - smpi_mpi_comm_world.processes = xbt_new0(smx_process_t, size); + smpi_mpi_comm_world.processes = xbt_new(smx_process_t, size); smpi_mpi_comm_world.processes[0] = SIMIX_process_self(); // mpi datatypes @@ -265,9 +306,10 @@ void smpi_mpi_init() smpi_mpi_sum.func = &smpi_mpi_sum_func; // smpi globals - smpi_pending_send_requests = xbt_new0(xbt_fifo_t, size); - smpi_pending_recv_requests = xbt_new0(xbt_fifo_t, size); - smpi_received_messages = xbt_new0(xbt_fifo_t, size); + smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL); + smpi_pending_send_requests = xbt_new(xbt_fifo_t, size); + smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size); + smpi_received_messages = xbt_new(xbt_fifo_t, size); for(i = 0; i < size; i++) { smpi_pending_send_requests[i] = xbt_fifo_new(); @@ -329,6 +371,7 @@ void smpi_mpi_finalize() xbt_fifo_free(smpi_received_messages[i]); } + xbt_mallocator_free(smpi_request_mallocator); xbt_free(smpi_pending_send_requests); xbt_free(smpi_pending_recv_requests); xbt_free(smpi_received_messages); @@ -420,7 +463,7 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype, } else if (0 > tag) { retval = MPI_ERR_TAG; } else { - *request = xbt_new0(smpi_mpi_request_t, 1); + *request = xbt_mallocator_get(smpi_request_mallocator); (*request)->buf = buf; (*request)->count = count; (*request)->datatype = datatype; -- 2.20.1