Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
changed global execute_mutex and execute_cond to host-specific and wrapped all
author markls <markls@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Fri, 30 Nov 2007 01:23:43 +0000 (01:23 +0000)
committer markls <markls@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Fri, 30 Nov 2007 01:23:43 +0000 (01:23 +0000)
cond_wait statements in while loops with appropriate tests.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@5096 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/smpi/private.h
src/smpi/smpi_base.c
src/smpi/smpi_bench.c
src/smpi/smpi_global.c
src/smpi/smpi_receiver.c
src/smpi/smpi_sender.c
src/smpi/smpi_util.c

index 285c547..69d63f5 100644 (file)
@@ -93,6 +93,7 @@ typedef struct smpi_global_t {
        xbt_mallocator_t  request_mallocator;
        xbt_mallocator_t  message_mallocator;
 
+       // FIXME: request queues should be moved to host data...
        xbt_fifo_t       *pending_send_request_queues;
        smx_mutex_t      *pending_send_request_queues_mutexes;
 
@@ -108,15 +109,10 @@ typedef struct smpi_global_t {
        int               running_hosts_count;
        smx_mutex_t       running_hosts_count_mutex;
 
-       // FIXME: maybe all code needs to lock timer?
        xbt_os_timer_t    timer;
        smx_mutex_t       timer_mutex;
        smx_cond_t        timer_cond;
 
-       smx_mutex_t       execute_mutex;
-       smx_cond_t        execute_cond;
-       int               execute_count;
-
        // keeps track of previous times
        smpi_do_once_duration_node_t do_once_duration_nodes;
        smx_mutex_t do_once_mutex;
@@ -128,6 +124,8 @@ extern smpi_global_t smpi_global;
 
 typedef struct smpi_host_data_t {
        int index;
+       smx_mutex_t mutex;
+       smx_cond_t cond;
 } s_smpi_host_data_t;
 typedef struct smpi_host_data_t *smpi_host_data_t;
 
@@ -151,6 +149,8 @@ void smpi_bench_skip(void);
 void smpi_global_init(void);
 void smpi_global_destroy(void);
 int smpi_host_index(void);
+smx_mutex_t smpi_host_mutex(void);
+smx_cond_t smpi_host_cond(void);
 int smpi_run_simulation(int *argc, char **argv);
 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t datatype,
        int src, int dst, int tag, smpi_mpi_communicator_t comm, smpi_mpi_request_t *request);
index 3fcfc25..797e242 100644 (file)
@@ -57,6 +57,8 @@ void smpi_mpi_init()
        for (i = 0; i < host_count && host != hosts[i]; i ++);
 
        hdata->index = i;
+       hdata->mutex = SIMIX_mutex_init();
+       hdata->cond  = SIMIX_cond_init();
 
        SIMIX_host_set_data(host, hdata);
 
@@ -115,11 +117,12 @@ void smpi_mpi_init()
        // wait for all nodes to signal initializatin complete
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
        smpi_global->ready_process_count++;
-       if (smpi_global->ready_process_count < 3 * host_count) {
-               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
-       } else {
+       if (smpi_global->ready_process_count >= 3 * host_count) {
                SIMIX_cond_broadcast(smpi_global->start_stop_cond);
        }
+       while (smpi_global->ready_process_count < 3 * host_count) {
+               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+       }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
        return;
@@ -137,6 +140,9 @@ void smpi_mpi_finalize()
        smpi_global->ready_process_count--;
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
+       SIMIX_mutex_destroy(smpi_host_mutex());
+       SIMIX_cond_destroy(smpi_host_cond());
+
        if (0 >= i) {
 
                // wake up senders/receivers
@@ -170,12 +176,12 @@ int smpi_mpi_barrier(smpi_mpi_communicator_t comm)
 {
 
        SIMIX_mutex_lock(comm->barrier_mutex);
-       if(++comm->barrier_count < comm->size) {
-               SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
-       } else {
-               comm->barrier_count = 0;
+       if (++comm->barrier_count >= comm->size) {
                SIMIX_cond_broadcast(comm->barrier_cond);
        }
+       while (comm->barrier_count < comm->size) {
+               SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
+       }
        SIMIX_mutex_unlock(comm->barrier_mutex);
 
        return MPI_SUCCESS;
index f879b9a..7fab85e 100644 (file)
@@ -5,18 +5,28 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_bench, smpi, "Logging specific to SMPI (ben
 
 void smpi_execute(double duration) {
         smx_host_t host = SIMIX_host_self();
+       smx_mutex_t mutex = smpi_host_mutex();
+       smx_cond_t cond = smpi_host_cond();
         smx_action_t action;
+       e_surf_action_state_t state;
 
-       SIMIX_mutex_lock(smpi_global->execute_mutex);
+       SIMIX_mutex_lock(mutex);
 
        action = SIMIX_action_execute(host, "execute", duration * SMPI_DEFAULT_SPEED);
 
-        SIMIX_register_action_to_condition(action, smpi_global->execute_cond);
-        SIMIX_cond_wait(smpi_global->execute_cond, smpi_global->execute_mutex);
-        SIMIX_unregister_action_to_condition(action, smpi_global->execute_cond);
+        SIMIX_register_action_to_condition(action, cond);
+       for (
+               state =  SIMIX_action_get_state(action);
+               state == SURF_ACTION_READY ||
+               state == SURF_ACTION_RUNNING;
+               state =  SIMIX_action_get_state(action)
+       ) {
+               SIMIX_cond_wait(cond, mutex);
+       }
+        SIMIX_unregister_action_to_condition(action, cond);
         SIMIX_action_destroy(action);
 
-        SIMIX_mutex_unlock(smpi_global->execute_mutex);
+        SIMIX_mutex_unlock(mutex);
 
        return;
 }
index d285a00..d9da589 100644 (file)
@@ -168,10 +168,6 @@ void smpi_global_init()
        smpi_global->timer_mutex                         = SIMIX_mutex_init();
        smpi_global->timer_cond                          = SIMIX_cond_init();
 
-       smpi_global->execute_mutex                       = SIMIX_mutex_init();
-       smpi_global->execute_cond                        = SIMIX_cond_init();
-       smpi_global->execute_count                       = 0;
-
        smpi_global->do_once_duration_nodes              = NULL;
        smpi_global->do_once_duration                    = NULL;
        smpi_global->do_once_mutex                       = SIMIX_mutex_init();
@@ -213,8 +209,6 @@ void smpi_global_destroy()
        xbt_os_timer_free(smpi_global->timer);
        SIMIX_mutex_destroy(smpi_global->timer_mutex);
        SIMIX_cond_destroy(smpi_global->timer_cond);
-       SIMIX_mutex_destroy(smpi_global->execute_mutex);
-       SIMIX_cond_destroy(smpi_global->execute_cond);
 
        for(curr = smpi_global->do_once_duration_nodes; NULL != curr; curr = next) {
                next = curr->next;
@@ -249,10 +243,23 @@ int smpi_host_index()
 {
        smx_host_t host = SIMIX_host_self();
        smpi_host_data_t hdata = (smpi_host_data_t)SIMIX_host_get_data(host);
-
        return hdata->index;
 }
 
+smx_mutex_t smpi_host_mutex()
+{
+       smx_host_t host = SIMIX_host_self();
+       smpi_host_data_t hdata = (smpi_host_data_t)SIMIX_host_get_data(host);
+       return hdata->mutex;
+}
+
+smx_cond_t smpi_host_cond()
+{
+       smx_host_t host = SIMIX_host_self();
+       smpi_host_data_t hdata = (smpi_host_data_t)SIMIX_host_get_data(host);
+       return hdata->cond;
+}
+
 int smpi_run_simulation(int *argc, char **argv)
 {
        smx_cond_t   cond           = NULL;
index e22363f..edfb213 100644 (file)
@@ -24,7 +24,7 @@ int smpi_receiver(int argc, char **argv)
 
        // make sure root is done before own initialization
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
-       if (!smpi_global->root_ready) {
+       while (!smpi_global->root_ready) {
                SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
        }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
@@ -41,11 +41,12 @@ int smpi_receiver(int argc, char **argv)
        // wait for all nodes to signal initializatin complete
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
        smpi_global->ready_process_count++;
-       if (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
-               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
-       } else {
+       if (smpi_global->ready_process_count >= 3 * smpi_global->host_count) {
                SIMIX_cond_broadcast(smpi_global->start_stop_cond);
        }
+       while (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
+               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+       }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
        do {
index 1df105f..a03b421 100644 (file)
@@ -20,6 +20,8 @@ int smpi_sender(int argc, char **argv)
 
        smx_action_t action;
 
+       e_surf_action_state_t state;
+
        smpi_received_message_t message;
 
        int dindex;
@@ -31,7 +33,7 @@ int smpi_sender(int argc, char **argv)
 
        // make sure root is done before own initialization
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
-       if (!smpi_global->root_ready) {
+       while (!smpi_global->root_ready) {
                SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
        }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
@@ -46,11 +48,12 @@ int smpi_sender(int argc, char **argv)
        // wait for all nodes to signal initializatin complete
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
        smpi_global->ready_process_count++;
-       if (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
-               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
-       } else {
+       if (smpi_global->ready_process_count >= 3 * smpi_global->host_count) {
                SIMIX_cond_broadcast(smpi_global->start_stop_cond);
        }
+       while (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
+               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+       }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
        do {
@@ -92,7 +95,15 @@ int smpi_sender(int argc, char **argv)
                        action = SIMIX_action_communicate(shost, dhost, "communication", request->datatype->size * request->count, -1.0);
 
                        SIMIX_register_action_to_condition(action, request->cond);
-                       SIMIX_cond_wait(request->cond, request->mutex);
+
+                       for (
+                               state  = SIMIX_action_get_state(action);
+                               state == SURF_ACTION_READY ||
+                               state == SURF_ACTION_RUNNING;
+                               state  = SIMIX_action_get_state(action)
+                       ) {
+                               SIMIX_cond_wait(request->cond, request->mutex);
+                       }
 
                        SIMIX_unregister_action_to_condition(action, request->cond);
                        SIMIX_action_destroy(action);
index 71946a8..596c294 100644 (file)
@@ -22,23 +22,35 @@ int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
 unsigned int smpi_sleep(unsigned int seconds)
 {
        smx_host_t host;
+       smx_mutex_t mutex;
+       smx_cond_t cond;
        smx_action_t action;
+       e_surf_action_state_t state;
 
        smpi_bench_end();
 
        host  = SIMIX_host_self();
+       mutex = smpi_host_mutex();
+       cond  = smpi_host_cond();
 
-       SIMIX_mutex_lock(smpi_global->execute_mutex);
+       SIMIX_mutex_lock(mutex);
 
        // FIXME: explicit conversion to double?
        action = SIMIX_action_sleep(host, seconds);
 
-       SIMIX_register_action_to_condition(action, smpi_global->execute_cond);
-       SIMIX_cond_wait(smpi_global->execute_cond, smpi_global->execute_mutex);
-       SIMIX_unregister_action_to_condition(action, smpi_global->execute_cond);
+       SIMIX_register_action_to_condition(action, cond);
+       for (
+               state =  SIMIX_action_get_state(action);
+               state == SURF_ACTION_READY ||
+               state == SURF_ACTION_RUNNING;
+               state =  SIMIX_action_get_state(action)
+       ) {
+               SIMIX_cond_wait(cond, mutex);
+       }
+       SIMIX_unregister_action_to_condition(action, cond);
        SIMIX_action_destroy(action);
 
-       SIMIX_mutex_unlock(smpi_global->execute_mutex);
+       SIMIX_mutex_unlock(mutex);
 
        smpi_bench_begin();
        return 0;