Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Some of the action-checking code didn't work correctly, so I removed it.
[simgrid.git] / src / smpi / smpi_sender.c
index c1c8050..26f5125 100644 (file)
@@ -1,9 +1,12 @@
 #include "private.h"
 
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi, "Logging specific to SMPI (sender)");
+
 int smpi_sender(int argc, char **argv)
 {
        smx_process_t self;
        smx_host_t shost;
+
        int index;
 
        xbt_fifo_t request_queue;
@@ -17,6 +20,8 @@ int smpi_sender(int argc, char **argv)
 
        smx_action_t action;
 
+       e_surf_action_state_t state;
+
        smpi_received_message_t message;
 
        int dindex;
@@ -28,7 +33,7 @@ int smpi_sender(int argc, char **argv)
 
        // make sure root is done before own initialization
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
-       if (!smpi_global->root_ready) {
+       while (!smpi_global->root_ready) {
                SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
        }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
@@ -43,11 +48,12 @@ int smpi_sender(int argc, char **argv)
        // wait for all nodes to signal initializatin complete
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
        smpi_global->ready_process_count++;
-       if (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
-               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
-       } else {
+       if (smpi_global->ready_process_count >= 3 * smpi_global->host_count) {
                SIMIX_cond_broadcast(smpi_global->start_stop_cond);
        }
+       while (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
+               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+       }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
        do {
@@ -77,10 +83,6 @@ int smpi_sender(int argc, char **argv)
                        message->forward = (request->forward - 1) / 2;
                        request->forward = request->forward / 2;
 
-                       SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[dindex]);
-                       xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
-                       SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[dindex]);
-
                        if (0 < request->forward) {
                                request->dst = (request->dst + message->forward + 1) % request->comm->size;
                                SIMIX_mutex_lock(request_queue_mutex);
@@ -93,7 +95,20 @@ int smpi_sender(int argc, char **argv)
                        action = SIMIX_action_communicate(shost, dhost, "communication", request->datatype->size * request->count, -1.0);
 
                        SIMIX_register_action_to_condition(action, request->cond);
-                       SIMIX_cond_wait(request->cond, request->mutex);
+
+                       for (
+                               state  = SIMIX_action_get_state(action);
+                               state == SURF_ACTION_READY ||
+                               state == SURF_ACTION_RUNNING;
+                               state  = SIMIX_action_get_state(action)
+                       ) {
+                               SIMIX_cond_wait(request->cond, request->mutex);
+                       }
+
+                       SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[dindex]);
+                       xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
+                       SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes[dindex]);
+
                        SIMIX_unregister_action_to_condition(action, request->cond);
                        SIMIX_action_destroy(action);