From f916c3e8c9a80f8fa1ab90897774d98e7ec9942d Mon Sep 17 00:00:00 2001 From: markls Date: Fri, 20 Jul 2007 10:16:54 +0000 Subject: [PATCH] Everything should work at this point, but doesn't. Now for the debugging... git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3876 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/smpi/include/smpi.h | 39 ++++++------ src/smpi/src/smpi_base.c | 124 +++++++++++++++++++++++++++++---------- 2 files changed, 114 insertions(+), 49 deletions(-) diff --git a/src/smpi/include/smpi.h b/src/smpi/include/smpi.h index 374382dc3b..2624e5840a 100644 --- a/src/smpi/include/smpi.h +++ b/src/smpi/include/smpi.h @@ -58,19 +58,31 @@ extern smpi_mpi_datatype_t smpi_mpi_double; // MPI_Request struct smpi_mpi_request_t { - void *buf; - int count; - smpi_mpi_datatype_t *datatype; - int src; - int dst; - int tag; - smpi_mpi_communicator_t *comm; - short int completed; - xbt_fifo_t waitlist; + smpi_mpi_communicator_t *comm; + int src; + int dst; + int tag; + void *buf; + int count; + smpi_mpi_datatype_t *datatype; + smx_mutex_t mutex; + smx_cond_t cond; + short int completed :1; + xbt_fifo_t waitlist; }; typedef struct smpi_mpi_request_t smpi_mpi_request_t; typedef smpi_mpi_request_t *MPI_Request; +// smpi_received_message_t +struct smpi_received_message_t { + smpi_mpi_communicator_t *comm; + int src; + int dst; + int tag; + void *buf; +}; +typedef struct smpi_received_message_t smpi_received_message_t; + // MPI_Op struct smpi_mpi_op_t { void (*func)(void *x, void *y, void *z); @@ -82,15 +94,6 @@ extern smpi_mpi_op_t smpi_mpi_land; extern smpi_mpi_op_t smpi_mpi_sum; #define MPI_SUM (&smpi_mpi_sum) -// smpi_received_message_t -struct smpi_received_message_t { - void *buf; - smpi_mpi_communicator_t *comm; - int src; - int dst; - int tag; -}; -typedef struct smpi_received_message_t smpi_received_message_t; // smpi functions extern int smpi_simulated_main(int argc, char **argv); diff --git a/src/smpi/src/smpi_base.c b/src/smpi/src/smpi_base.c index 49725caab3..f80e9500e4 100644 --- a/src/smpi/src/smpi_base.c +++ b/src/smpi/src/smpi_base.c @@ -2,18 +2,24 @@ #include #include -#include "xbt/xbt_portability.h" +#include "xbt/xbt_os_time.h" #include "simix/simix.h" #include "simix/private.h" #include "smpi.h" // FIXME: move globals into structure... -xbt_mallocator_t smpi_request_mallocator = NULL; -xbt_mallocator_t smpi_message_mallocator = NULL; -xbt_fifo_t *smpi_pending_send_requests = NULL; -xbt_fifo_t *smpi_pending_recv_requests = NULL; -xbt_fifo_t *smpi_received_messages = NULL; +xbt_mallocator_t smpi_request_mallocator = NULL; +xbt_mallocator_t smpi_message_mallocator = NULL; + +xbt_fifo_t *smpi_pending_send_requests = NULL; +smx_mutex_t *smpi_pending_send_requests_mutex = NULL; + +xbt_fifo_t *smpi_pending_recv_requests = NULL; +smx_mutex_t *smpi_pending_recv_requests_mutex = NULL; + +xbt_fifo_t *smpi_received_messages = NULL; +smx_mutex_t *smpi_received_messages_mutex = NULL; smx_process_t *smpi_sender_processes = NULL; smx_process_t *smpi_receiver_processes = NULL; @@ -71,19 +77,19 @@ int inline smpi_mpi_comm_world_rank_self() return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self()) } -// FIXME: messages are actually smaller than requests, use them instead? int smpi_sender(int argc, char **argv) { smx_process_t self; smx_host_t shost; int rank; xbt_fifo_t request_queue; + smx_mutex_t request_queue_mutex; int size; int running_hosts = 0; smpi_mpi_request_t *request; smx_host_t dhost; smx_action_t communicate_action; - smpi_mpi_request_t *scratch; + smpi_received_message_t *scratch; int drank; smx_process_t waitproc; @@ -98,8 +104,11 @@ int smpi_sender(int argc, char **argv) } SIMIX_mutex_unlock(init_mutex); - request_queue = smpi_pending_send_requests[rank]; + request_queue = smpi_pending_send_requests[rank]; + request_queue_mutex = smpi_pending_send_requests_mutex[rank]; + size = smpi_mpi_comm_size(&smpi_mpi_comm_world); + smpi_sender_processes[rank] = self; // wait for all nodes to signal initializatin complete @@ -118,8 +127,9 @@ int smpi_sender(int argc, char **argv) while (0 < running_hosts) { - // FIXME: mutex? + SIMIX_mutex_lock(request_queue_mutex); request = xbt_fifo_shift(request_queue); + SIMIX_mutex_unlock(request_queue_mutex); if (NULL == request) { SIMIX_process_suspend(self); @@ -145,15 +155,20 @@ int smpi_sender(int argc, char **argv) scratch->tag = request->tag; scratch->buf = request->buf; drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost); + SIMIX_mutex_lock(smpi_received_messages_mutex[drank]); xbt_fifo_push(smpi_received_messages[drank], scratch); + SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]); request->completed = 1; - while(waitproc = xbt_fifo_shift(request->waitlist)) { + // wake up receiver, then any waiting sender + waitproc = smpi_receiver_processes[drank]; + + do { if (SIMIX_process_is_suspended(waitproc)) { SIMIX_process_resume(waitproc); } - } + } while(waitproc = xbt_fifo_shift(request->waitlist)); SIMIX_mutex_unlock(request->mutex); } @@ -171,11 +186,14 @@ int smpi_receiver(int argc, char **argv) smx_process_t self; int rank; xbt_fifo_t request_queue; + smx_mutex_t request_queue_mutex; xbt_fifo_t message_queue; + smx_mutex_t message_queue_mutex; int size; int running_hosts; - smpi_mpi_request_t *message; + xbt_fifo_item_t request_item, message_item; smpi_mpi_request_t *request; + smpi_received_message_t *message; smx_process_t waitproc; self = SIMIX_process_self(); @@ -188,8 +206,12 @@ int smpi_receiver(int argc, char **argv) } SIMIX_mutex_unlock(init_mutex); - request_queue = smpi_pending_receive_requests[rank]; - message_queue = smpi_received_messages[rank]; + request_queue = smpi_pending_recv_requests[rank]; + request_queue_mutex = smpi_pending_recv_requests_mutex[rank]; + + message_queue = smpi_received_messages[rank]; + message_queue_mutex = smpi_received_messages_mutex[rank]; + size = smpi_mpi_comm_size(&smpi_mpi_comm_world); smpi_receiver_processes[rank] = self; @@ -209,24 +231,46 @@ int smpi_receiver(int argc, char **argv) while (0 < running_hosts) { - // FIXME: search for received messages and requests - // use stupid algorithm for now + // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? + + // FIXME: not the best way to request multiple locks... + SIMIX_mutex_lock(request_queue_mutex); + SIMIX_mutex_lock(message_queue_mutex); +search: for (request_item = xbt_fifo_get_first_item(request_queue); + NULL != request_item; + request_item = xbt_fifo_get_next_item(request_item)) { + request = xbt_fifo_get_item_content(request_item); + for (message_item = xbt_fifo_get_first_item(message_queue); + NULL != message_item; + message_item = xbt_fifo_get_next_item(message_item)) { + message = xbt_fifo_get_item_content(message_item); + if (request->comm == message->comm && + (MPI_ANY_SOURCE == request->src || request->src == message->src) && + request->tag == message->tag) { + xbt_fifo_remove_item(request_queue, request_item); + xbt_fifo_remove_item(message_queue, message_item); + break search; + } + } + } + SIMIX_mutex_unlock(message_queue_mutex); + SIMIX_mutex_unlock(request_queue_mutex); - if (NULL == request) { + if (NULL == request || NULL == message) { SIMIX_process_suspend(self); } else { SIMIX_mutex_lock(request->mutex); - memcpy(request->buf, message->buf, request->count * request->type->size); + memcpy(request->buf, message->buf, request->count * request->datatype->size); request->src = message->src; - reqeust->completed = 1; + request->completed = 1; while (waitproc = xbt_fifo_shift(request->waitlist)) { if (SIMIX_process_is_suspended(waitproc)) { SIMIX_process_resume(waitproc); } } - SIMIX_mutex_unlock(request->mutex); + xbt_mallocator_release(smpi_message_mallocator, message); } @@ -350,14 +394,20 @@ void smpi_mpi_init() // smpi globals smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL); smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, NULL); - smpi_pending_send_requests = xbt_new(xbt_fifo_t, size); - smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size); - smpi_received_messages = xbt_new(xbt_fifo_t, size); + smpi_pending_send_requests = xbt_new(xbt_fifo_t, size); + smpi_pending_send_requests_mutex = xbt_new(smx_mutex_t, size); + smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size); + smpi_pending_recv_requests_mutex = xbt_new(smx_mutex_t, size); + smpi_received_messages = xbt_new(xbt_fifo_t, size); + smpi_received_messages_mutex = xbt_new(smx_mutex_t, size); for(i = 0; i < size; i++) { - smpi_pending_send_requests[i] = xbt_fifo_new(); - smpi_pending_recv_requests[i] = xbt_fifo_new(); - smpi_received_messages[i] = xbt_fifo_new(); + smpi_pending_send_requests[i] = xbt_fifo_new(); + smpi_pending_send_requests_mutex[i] = SIMIX_mutex_init(); + smpi_pending_recv_requests[i] = xbt_fifo_new(); + smpi_pending_recv_requests_mutex[i] = SIMIX_mutex_init(); + smpi_received_messages[i] = xbt_fifo_new(); + smpi_received_messages_mutex[i] = SIMIX_mutex_init(); } smpi_timer = xbt_os_timer_new(); @@ -410,15 +460,21 @@ void smpi_mpi_finalize() for (i = 0 ; i < smpi_mpi_comm_world.size; i++) { xbt_fifo_free(smpi_pending_send_requests[i]); + SIMIX_mutex_destroy(smpi_pending_send_requests_mutex[i]); xbt_fifo_free(smpi_pending_recv_requests[i]); + SIMIX_mutex_destroy(smpi_pending_recv_requests_mutex[i]); xbt_fifo_free(smpi_received_messages[i]); + SIMIX_mutex_destroy(smpi_received_messages_mutex[i]); } xbt_mallocator_free(smpi_request_mallocator); xbt_mallocator_free(smpi_message_mallocator); xbt_free(smpi_pending_send_requests); + xbt_free(smpi_pending_send_requests_mutex); xbt_free(smpi_pending_recv_requests); + xbt_free(smpi_pending_recv_requests_mutex); xbt_free(smpi_received_messages); + xbt_free(smpi_received_messages_mutex); SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex); SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond); @@ -508,14 +564,16 @@ int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype, retval = MPI_ERR_TAG; } else { *request = xbt_mallocator_get(smpi_request_mallocator); - (*request)->buf = buf; - (*request)->count = count; - (*request)->datatype = datatype; + (*request)->comm = comm; (*request)->src = src; (*request)->dst = dst; (*request)->tag = tag; - (*request)->comm = comm; + (*request)->buf = buf; + (*request)->count = count; + (*request)->datatype = datatype; (*request)->completed = 0; + (*request)->mutex = SIMIX_mutex_init(); + (*request)->cond = SIMIX_cond_init(); (*request)->waitlist = NULL; } return retval; @@ -525,7 +583,9 @@ int smpi_isend(smpi_mpi_request_t *request) { int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world); + SIMIX_mutex_lock(smpi_pending_send_requests_mutex[rank]); xbt_fifo_push(smpi_pending_send_requests[rank], request); + SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]); if (MSG_process_is_suspended(smpi_sender_processes[rank])) { MSG_process_resume(smpi_sender_processes[rank]); @@ -536,7 +596,9 @@ int smpi_irecv(smpi_mpi_request_t *request) { int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world); + SIMIX_mutex_lock(smpi_pending_recv_requests_mutex[rank]); xbt_fifo_push(smpi_pending_recv_requests[rank], request); + SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]); if (MSG_process_is_suspended(smpi_receiver_processes[rank])) { MSG_process_resume(smpi_receiver_processes[rank]); -- 2.20.1