Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
accidentally tried to compare action to state. now fixed.
[simgrid.git] / src / smpi / smpi_receiver.c
index eb5c887..9f4771e 100644 (file)
@@ -1,5 +1,7 @@
 #include "private.h"
 
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_receiver, smpi, "Logging specific to SMPI (receiver)");
+
 int smpi_receiver(int argc, char **argv)
 {
        smx_process_t self;
@@ -18,11 +20,13 @@ int smpi_receiver(int argc, char **argv)
        xbt_fifo_item_t request_item;
        xbt_fifo_item_t message_item;
 
+       e_surf_action_state_t state;
+
        self = SIMIX_process_self();
 
        // make sure root is done before own initialization
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
-       if (!smpi_global->root_ready) {
+       while (!smpi_global->root_ready) {
                SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
        }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
@@ -39,11 +43,12 @@ int smpi_receiver(int argc, char **argv)
        // wait for all nodes to signal initialization complete
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
        smpi_global->ready_process_count++;
-       if (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
-               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
-       } else {
+       if (smpi_global->ready_process_count >= 3 * smpi_global->host_count) {
                SIMIX_cond_broadcast(smpi_global->start_stop_cond);
        }
+       while (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
+               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+       }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
        do {
@@ -63,9 +68,13 @@ int smpi_receiver(int argc, char **argv)
                                NULL != message_item;
                                message_item = xbt_fifo_get_next_item(message_item)) {
                                message = xbt_fifo_get_item_content(message_item);
-                               if (request->comm == message->comm &&
-                                  (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
-                                  (MPI_ANY_TAG == request->tag || request->tag == message->tag)) {
+                               state   = SIMIX_action_get_state(message->action);
+                               if (
+                                       request->comm == message->comm &&
+                                       (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
+                                       (MPI_ANY_TAG == request->tag || request->tag == message->tag) &&
+                                       (state != SURF_ACTION_READY && state != SURF_ACTION_RUNNING)
+                               ) {
                                        xbt_fifo_remove_item(request_queue, request_item);
                                        xbt_fifo_remove_item(message_queue, message_item);
                                        goto stopsearch;
@@ -80,6 +89,8 @@ stopsearch:
                        SIMIX_process_suspend(self);
                } else {
 
+                       // FIXME: check action status for bad messages
+
                        SIMIX_mutex_lock(request->mutex);
                        memcpy(request->buf, message->buf, request->datatype->size * request->count);
                        request->src = message->src;
@@ -90,7 +101,7 @@ stopsearch:
                                request->completed = 1;
                                SIMIX_cond_broadcast(request->cond);
                        } else {
-                               request->src = smpi_mpi_comm_rank(request->comm);
+                               request->src = request->comm->index_to_rank_map[index];
                                request->dst = (request->src + 1) % request->comm->size;
                                smpi_mpi_isend(request);
                        }