From 6b05eea7a3de47b43cca531fd00c8c0f21fc477f Mon Sep 17 00:00:00 2001 From: markls Date: Mon, 30 Jul 2007 06:26:12 +0000 Subject: [PATCH] wrapping global variables in structs. not finished so smpi probably won't compile for a few days... git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3902 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/smpi/include/private.h | 55 +++++++++++++++++ src/smpi/include/smpi.h | 79 ++++++++++++------------ src/smpi/src/smpi_base.c | 123 +++++++++++++------------------------ 3 files changed, 136 insertions(+), 121 deletions(-) create mode 100644 src/smpi/include/private.h diff --git a/src/smpi/include/private.h b/src/smpi/include/private.h new file mode 100644 index 0000000000..952715b5f0 --- /dev/null +++ b/src/smpi/include/private.h @@ -0,0 +1,55 @@ +#include "xbt/mallocator.h" +#include "xbt/xbt_os_time.h" + +#define SMPI_DEFAULT_SPEED 100 +#define SMPI_REQUEST_MALLOCATOR_SIZE 100 +#define SMPI_MESSAGE_MALLOCATOR_SIZE 100 + +typedef struct SMPI_Global { + + // config vars + double reference_speed; + + // state vars + int root_ready:1; + int ready_process_count; + smx_mutex_t start_stop_mutex; + smx_cond_t start_stop_cond; + + xbt_mallocator_t request_mallocator; + xbt_mallocator_t message_mallocator; + + xbt_fifo_t *pending_send_request_queues; + smx_mutex_t *pending_send_request_queues_mutexes; + + xbt_fifo_t *pending_recv_request_queues; + smx_mutex_t *pending_recv_request_queues_mutexes; + + xbt_fifo_t *received_message_queues; + smx_mutex_t *received_message_queues_mutexes; + + smx_process_t *sender_processes; + smx_process_t *receiver_processes; + + int running_hosts_count; + smx_mutex_t running_hosts_count_mutex; + + xbt_os_timer_t *timers; + smx_mutex_t *timers_mutexes; + + int *benchmarking_flags; + smx_mutex_t *benchmarking_flags_mutexes; + +} s_SMPI_Global_t, *SMPI_Global_t; + +extern SMPI_Global_t smpi_global; + +// smpi_received_message_t +struct smpi_received_message_t { + smpi_mpi_communicator_t *comm; + int src; + int dst; + int tag; + void *buf; +}; +typedef struct smpi_received_message_t smpi_received_message_t; diff --git a/src/smpi/include/smpi.h b/src/smpi/include/smpi.h index a007c9c659..2674928710 100644 --- a/src/smpi/include/smpi.h +++ b/src/smpi/include/smpi.h @@ -1,6 +1,5 @@ -#define SMPI_DEFAULT_SPEED 100 -#define SMPI_REQUEST_MALLOCATOR_SIZE 100 -#define SMPI_MESSAGE_MALLOCATOR_SIZE 100 +#include +#include #define SMPI_RAND_SEED 5 @@ -17,22 +16,18 @@ #define MPI_ERR_RANK 7 #define MPI_ERR_TAG 8 -#include -#include - // MPI_Comm struct smpi_mpi_communicator_t { - int size; - int barrier; - smx_mutex_t barrier_mutex; - smx_cond_t barrier_cond; - smx_host_t *hosts; + int size; + smx_host_t *hosts; + smx_process_t *processes; + int barrier_count; + smx_mutex_t barrier_mutex; + smx_cond_t barrier_cond; }; typedef struct smpi_mpi_communicator_t smpi_mpi_communicator_t; typedef smpi_mpi_communicator_t *MPI_Comm; -extern smpi_mpi_communicator_t smpi_mpi_comm_world; -#define MPI_COMM_WORLD (&smpi_mpi_comm_world) // MPI_Status struct smpi_mpi_status_t { @@ -40,8 +35,6 @@ struct smpi_mpi_status_t { }; typedef struct smpi_mpi_status_t smpi_mpi_status_t; typedef smpi_mpi_status_t MPI_Status; -extern smpi_mpi_status_t smpi_mpi_status_ignore; -#define MPI_STATUS_IGNORE (&smpi_mpi_status_ignore) // MPI_Datatype struct smpi_mpi_datatype_t { @@ -49,13 +42,6 @@ struct smpi_mpi_datatype_t { }; typedef struct smpi_mpi_datatype_t smpi_mpi_datatype_t; typedef smpi_mpi_datatype_t *MPI_Datatype; -// FIXME: add missing datatypes -extern smpi_mpi_datatype_t smpi_mpi_byte; -#define MPI_BYTE (&smpi_mpi_byte) -extern smpi_mpi_datatype_t smpi_mpi_int; -#define MPI_INT (&smpi_mpi_int) -extern smpi_mpi_datatype_t smpi_mpi_double; -#define MPI_DOUBLE (&smpi_mpi_double) // MPI_Request struct smpi_mpi_request_t { @@ -63,38 +49,53 @@ struct smpi_mpi_request_t { int src; int dst; int tag; + void *buf; - int count; smpi_mpi_datatype_t *datatype; - smx_mutex_t mutex; - smx_cond_t cond; + int count; + short int completed :1; + smx_mutex_t mutex; + smx_cond_t cond; xbt_fifo_t waitlist; }; typedef struct smpi_mpi_request_t smpi_mpi_request_t; typedef smpi_mpi_request_t *MPI_Request; -// smpi_received_message_t -struct smpi_received_message_t { - smpi_mpi_communicator_t *comm; - int src; - int dst; - int tag; - void *buf; -}; -typedef struct smpi_received_message_t smpi_received_message_t; - // MPI_Op struct smpi_mpi_op_t { void (*func)(void *x, void *y, void *z); }; typedef struct smpi_mpi_op_t smpi_mpi_op_t; typedef smpi_mpi_op_t *MPI_Op; -extern smpi_mpi_op_t smpi_mpi_land; -#define MPI_LAND (&smpi_mpi_land) -extern smpi_mpi_op_t smpi_mpi_sum; -#define MPI_SUM (&smpi_mpi_sum) +// global SMPI data structure +typedef struct SMPI_MPI_Global { + + smpi_mpi_communicator_t mpi_comm_world; + + smpi_mpi_status_t mpi_status_ignore; + + smpi_mpi_datatype_t mpi_byte; + smpi_mpi_datatype_t mpi_int; + smpi_mpi_datatype_t mpi_double; + + smpi_mpi_op_t mpi_land; + smpi_mpi_op_t mpi_sum; + +} s_SMPI_MPI_Global_t, *SMPI_MPI_Global_t; +exterm SMPI_MPI_Global_t smpi_mpi_global; + +#define MPI_COMM_WORLD (smpi_mpi_global->mpi_comm_world) + +#define MPI_STATUS_IGNORE (smpi_mpi_global->mpi_status_ignore) + +#define MPI_BYTE (smpi_mpi_global->mpi_byte) +#define MPI_DOUBLE (smpi_mpi_global->mpi_double) +#define MPI_INT (smpi_mpi_global->mpi_int) + +#define MPI_LAND (smpi_mpi_global->mpi_land) +#define MPI_SUM (smpi_mpi_glboal->mpi_sum) // smpi functions extern int smpi_simulated_main(int argc, char **argv); diff --git a/src/smpi/src/smpi_base.c b/src/smpi/src/smpi_base.c index f817786d95..25e082d2cc 100644 --- a/src/smpi/src/smpi_base.c +++ b/src/smpi/src/smpi_base.c @@ -1,57 +1,17 @@ #include - #include #include -#include "xbt/xbt_os_time.h" -#include "xbt/mallocator.h" -#include "smpi.h" - -// FIXME: move globals into structure... - -xbt_mallocator_t smpi_request_mallocator = NULL; -xbt_mallocator_t smpi_message_mallocator = NULL; - -xbt_fifo_t *smpi_pending_send_requests = NULL; -smx_mutex_t *smpi_pending_send_requests_mutex = NULL; - -xbt_fifo_t *smpi_pending_recv_requests = NULL; -smx_mutex_t *smpi_pending_recv_requests_mutex = NULL; - -xbt_fifo_t *smpi_received_messages = NULL; -smx_mutex_t *smpi_received_messages_mutex = NULL; - -smx_process_t *smpi_sender_processes = NULL; -smx_process_t *smpi_receiver_processes = NULL; - -int smpi_running_hosts = 0; - -smpi_mpi_communicator_t smpi_mpi_comm_world; -smpi_mpi_status_t smpi_mpi_status_ignore; - -smpi_mpi_datatype_t smpi_mpi_byte; -smpi_mpi_datatype_t smpi_mpi_int; -smpi_mpi_datatype_t smpi_mpi_double; - -smpi_mpi_op_t smpi_mpi_land; -smpi_mpi_op_t smpi_mpi_sum; - -static xbt_os_timer_t smpi_timer; -static int smpi_benchmarking; -static double smpi_reference_speed; +#include "private.h" +#include "smpi.h" -// mutexes -smx_mutex_t smpi_running_hosts_mutex = NULL; -smx_mutex_t smpi_benchmarking_mutex = NULL; -smx_mutex_t init_mutex = NULL; -smx_cond_t init_cond = NULL; +SMPI_Global_t smpi_global = NULL; -int smpi_root_ready = 0; -int smpi_ready_count = 0; +SMPI_MPI_Global_t smpi_mpi_global = NULL; XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI"); -int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) +int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) { return comm->size; } @@ -73,7 +33,7 @@ int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm) int inline smpi_mpi_comm_world_rank_self() { - return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self()); + return smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, SIMIX_host_self()); } int smpi_sender(int argc, char **argv) @@ -86,7 +46,7 @@ int smpi_sender(int argc, char **argv) smx_mutex_t request_queue_mutex; int size; - int running_hosts = 0; + int running_hosts_count; smpi_mpi_request_t *request; @@ -102,36 +62,32 @@ int smpi_sender(int argc, char **argv) self = SIMIX_process_self(); shost = SIMIX_host_self(); - rank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost); + rank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, shost); // make sure root is done before own initialization - SIMIX_mutex_lock(init_mutex); - if (!smpi_root_ready) { - SIMIX_cond_wait(init_cond, init_mutex); + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + if (!smpi_global->root_ready) { + SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); } - SIMIX_mutex_unlock(init_mutex); + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - request_queue = smpi_pending_send_requests[rank]; - request_queue_mutex = smpi_pending_send_requests_mutex[rank]; - size = smpi_mpi_comm_size(&smpi_mpi_comm_world); + request_queue = smpi_global->pending_send_request_queues[rank]; + request_queue_mutex = smpi_global->pending_send_request_queues_mutexes[rank]; + size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world); - smpi_sender_processes[rank] = self; + smpi_global->sender_processes[rank] = self; // wait for all nodes to signal initializatin complete - SIMIX_mutex_lock(init_mutex); - smpi_ready_count++; - if (smpi_ready_count < 3 * size) { - SIMIX_cond_wait(init_cond, init_mutex); + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + smpi_global->ready_process_count++; + if (smpi_global->ready_process_count < 3 * size) { + SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex); } else { - SIMIX_cond_broadcast(init_cond); + SIMIX_cond_broadcast(smpi_global->start_stop_cond); } - SIMIX_mutex_unlock(init_mutex); + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); - SIMIX_mutex_lock(smpi_running_hosts_mutex); - running_hosts = smpi_running_hosts; - SIMIX_mutex_unlock(smpi_running_hosts_mutex); - - while (0 < running_hosts) { + do { SIMIX_mutex_lock(request_queue_mutex); request = xbt_fifo_shift(request_queue); @@ -144,7 +100,6 @@ int smpi_sender(int argc, char **argv) dhost = request->comm->hosts[request->dst]; - // FIXME: not at all sure I can assume magic just happens here.... communicate_action = SIMIX_action_communicate(shost, dhost, "communication", request->datatype->size * request->count * 1.0, -1.0); @@ -154,7 +109,7 @@ int smpi_sender(int argc, char **argv) SIMIX_cond_wait(request->cond, request->mutex); // copy request to appropriate received queue - message = xbt_mallocator_get(smpi_message_mallocator); + message = xbt_mallocator_get(smpi_global->message_mallocator); message->comm = request->comm; message->src = request->src; message->dst = request->dst; @@ -162,11 +117,11 @@ int smpi_sender(int argc, char **argv) message->buf = xbt_malloc(request->datatype->size * request->count); memcpy(message->buf, request->buf, request->datatype->size * request->count); - drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost); + drank = smpi_mpi_comm_rank(smpi_mpi_global->mpi_comm_world, dhost); - SIMIX_mutex_lock(smpi_received_messages_mutex[drank]); - xbt_fifo_push(smpi_received_messages[drank], message); - SIMIX_mutex_unlock(smpi_received_messages_mutex[drank]); + SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[drank]); + xbt_fifo_push(smpi_global->received_message_queues[drank], message); + SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[drank]); request->completed = 1; @@ -182,17 +137,18 @@ int smpi_sender(int argc, char **argv) SIMIX_mutex_unlock(request->mutex); } - SIMIX_mutex_lock(smpi_running_hosts_mutex); - running_hosts = smpi_running_hosts; - SIMIX_mutex_unlock(smpi_running_hosts_mutex); - } + SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex); + running_hosts_count = smpi_global->running_hosts_count; + SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex); - SIMIX_mutex_lock(init_mutex); - smpi_ready_count--; - if (smpi_ready_count <= 0) { - SIMIX_cond_broadcast(init_cond); + } while (0 < running_hosts_count); + + SIMIX_mutex_lock(smpi_global->start_stop_mutex); + smpi_global->ready_process_count--; + if (smpi_global->ready_process_count <= 0) { + SIMIX_cond_broadcast(smpi_global->start_stop_cond); } - SIMIX_mutex_unlock(init_mutex); + SIMIX_mutex_unlock(smpi_global->start_stop_mutex); return 0; } @@ -386,6 +342,7 @@ void smpi_mpi_sum_func(void *x, void *y, void *z) *(int *)z = *(int *)x + *(int *)y; } +// FIXME: init conds, etc... void *smpi_new_request() { return xbt_new(smpi_mpi_request_t, 1); @@ -404,6 +361,8 @@ void smpi_free_request(void *pointer) { return; } +// FIXME: don't keep creating new mutexes... +// flush waitlist instead of new? void smpi_reset_request(void *pointer) { smpi_mpi_request_t *request = pointer; -- 2.20.1