Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
some of the action checking code didn't work right, so I removed it.
[simgrid.git] / src / smpi / smpi_receiver.c
index 23d9cca..43990a5 100644 (file)
@@ -1,15 +1,16 @@
 #include "private.h"
 
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_receiver, smpi, "Logging specific to SMPI (receiver)");
+
 int smpi_receiver(int argc, char **argv)
 {
        smx_process_t self;
-       int rank;
+       int index;
 
        xbt_fifo_t request_queue;
        smx_mutex_t request_queue_mutex;
        xbt_fifo_t message_queue;
        smx_mutex_t message_queue_mutex;
-       int size;
 
        int running_hosts_count;
 
@@ -23,29 +24,29 @@ int smpi_receiver(int argc, char **argv)
 
        // make sure root is done before own initialization
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
-       if (!smpi_global->root_ready) {
+       while (!smpi_global->root_ready) {
                SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
        }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
-       rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
-       size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
+       index = smpi_host_index();
 
-       request_queue       = smpi_global->pending_recv_request_queues[rank];
-       request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
-       message_queue       = smpi_global->received_message_queues[rank];
-       message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
+       request_queue       = smpi_global->pending_recv_request_queues[index];
+       request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[index];
+       message_queue       = smpi_global->received_message_queues[index];
+       message_queue_mutex = smpi_global->received_message_queues_mutexes[index];
 
-       smpi_global->receiver_processes[rank] = self;
+       smpi_global->receiver_processes[index] = self;
 
        // wait for all nodes to signal initialization complete
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
        smpi_global->ready_process_count++;
-       if (smpi_global->ready_process_count < 3 * size) {
-               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
-       } else {
+       if (smpi_global->ready_process_count >= 3 * smpi_global->host_count) {
                SIMIX_cond_broadcast(smpi_global->start_stop_cond);
        }
+       while (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
+               SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
+       }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
        do {
@@ -65,9 +66,10 @@ int smpi_receiver(int argc, char **argv)
                                NULL != message_item;
                                message_item = xbt_fifo_get_next_item(message_item)) {
                                message = xbt_fifo_get_item_content(message_item);
-                               if (request->comm == message->comm &&
-                                  (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
-                                  (MPI_ANY_TAG == request->tag || request->tag == message->tag)) {
+                               if (
+                                       request->comm == message->comm &&
+                                       (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
+                                       (MPI_ANY_TAG == request->tag || request->tag == message->tag)) {
                                        xbt_fifo_remove_item(request_queue, request_item);
                                        xbt_fifo_remove_item(message_queue, message_item);
                                        goto stopsearch;
@@ -85,8 +87,18 @@ stopsearch:
                        SIMIX_mutex_lock(request->mutex);
                        memcpy(request->buf, message->buf, request->datatype->size * request->count);
                        request->src = message->src;
-                       request->completed = 1;
-                       SIMIX_cond_broadcast(request->cond);
+                       request->data = message->data;
+                       request->forward = message->forward;
+
+                       if (0 == request->forward) {
+                               request->completed = 1;
+                               SIMIX_cond_broadcast(request->cond);
+                       } else {
+                               request->src = request->comm->index_to_rank_map[index];
+                               request->dst = (request->src + 1) % request->comm->size;
+                               smpi_mpi_isend(request);
+                       }
+
                        SIMIX_mutex_unlock(request->mutex);
 
                        xbt_free(message->buf);