From: markls Date: Fri, 13 Jul 2007 08:28:10 +0000 (+0000) Subject: more work on smpi receiver. basically everything is in place except matching X-Git-Tag: v3.3~1607 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/ddb9cdba7de52ce188e1ed7bbe313243271a27b9 more work on smpi receiver. basically everything is in place except matching pending recv requests to received messages. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3753 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/smpi/src/smpi_base.c b/src/smpi/src/smpi_base.c index 0150470929..1937372a38 100644 --- a/src/smpi/src/smpi_base.c +++ b/src/smpi/src/smpi_base.c @@ -7,6 +7,8 @@ #include "simix/private.h" #include "smpi.h" +// FIXME: move globals into structure... + xbt_mallocator_t smpi_request_mallocator = NULL; xbt_fifo_t *smpi_pending_send_requests = NULL; xbt_fifo_t *smpi_pending_recv_requests = NULL; @@ -48,6 +50,7 @@ int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) return comm->size; } +// FIXME: smarter algorithm? int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host) { int i; @@ -64,20 +67,21 @@ int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm) int smpi_sender(int argc, char **argv) { + smx_process_t self; + smx_host_t shost; int rank; + xbt_fifo_t request_queue; int size; int running_hosts = 0; smpi_mpi_request_t *request; - smx_process_t self; - smx_host_t shost, dhost; + smx_host_t dhost; smx_action_t communicate_action; smpi_mpi_request_t *scratch; int drank; - self = SIMIX_process_self(); + self = SIMIX_process_self(); shost = SIMIX_host_self(); - - rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost); + rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost); // make sure root is done before own initialization SIMIX_mutex_lock(init_mutex); @@ -86,6 +90,7 @@ int smpi_sender(int argc, char **argv) } SIMIX_mutex_unlock(init_mutex); + request_queue = smpi_pending_send_requests[rank]; size = smpi_mpi_comm_size(&smpi_mpi_comm_world); smpi_sender_processes[rank] = self; @@ -105,7 +110,7 @@ int smpi_sender(int argc, char **argv) while (0 < running_hosts) { - request = xbt_fifo_shift(smpi_pending_send_requests[rank]); + request = xbt_fifo_shift(request_queue); if (NULL == request) { SIMIX_process_suspend(self); @@ -144,15 +149,18 @@ int smpi_sender(int argc, char **argv) int smpi_receiver(int argc, char **argv) { - smx_host_t dhost; - int rank; smx_process_t self; + int rank; + xbt_fifo_t request_queue; + xbt_fifo_t message_queue; int size; int running_hosts; + smpi_mpi_request_t *message; + smpi_mpi_request_t *request; + smx_process_t dproc; - dhost = SIMIX_host_self(); - rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost); - self = SIMIX_process_self(); + self = SIMIX_process_self(); + rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost); // make sure root is done before own initialization SIMIX_mutex_lock(init_mutex); @@ -161,8 +169,10 @@ int smpi_receiver(int argc, char **argv) } SIMIX_mutex_unlock(init_mutex); + request_queue = smpi_pending_receive_requests[rank]; + message_queue = smpi_received_messages[rank]; size = smpi_mpi_comm_size(&smpi_mpi_comm_world); - smpi_receiver_processes[rank] = SIMIX_process_self(); + smpi_receiver_processes[rank] = self; // wait for all nodes to signal initializatin complete SIMIX_mutex_lock(init_mutex); @@ -180,12 +190,24 @@ int smpi_receiver(int argc, char **argv) while (0 < running_hosts) { - request = xbt_fifo_shift(smpi_pending_send_requests[rank]); + // FIXME: search for received messages and requests if (NULL == request) { SIMIX_process_suspend(self); } else { SIMIX_mutex_lock(request->mutex); + memcpy(request->buf, message->buf, request->count * request->type->size); + request->src = message->src; + reqeust->completed = 1; + + while (dproc = xbt_fifo_shift(request->waitlist)) { + if (SIMIX_process_is_suspended(dproc)) { + SIMIX_process_resume(dproc); + } + } + + SIMIX_mutex_unlock(request->mutex); + xbt_mallocator_release(smpi_request_mallocator, message); } SIMIX_mutex_lock(smpi_running_hosts_mutex);