Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Updated to use a branching-tree broadcast instead of a for-loop.
[simgrid.git] / src / smpi / smpi_receiver.c
index 23d9cca..eb5c887 100644 (file)
@@ -3,13 +3,12 @@
 int smpi_receiver(int argc, char **argv)
 {
        smx_process_t self;
-       int rank;
+       int index;
 
        xbt_fifo_t request_queue;
        smx_mutex_t request_queue_mutex;
        xbt_fifo_t message_queue;
        smx_mutex_t message_queue_mutex;
-       int size;
 
        int running_hosts_count;
 
@@ -28,20 +27,19 @@ int smpi_receiver(int argc, char **argv)
        }
        SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
 
-       rank = smpi_mpi_comm_rank_self(smpi_mpi_global->mpi_comm_world);
-       size = smpi_mpi_comm_size(smpi_mpi_global->mpi_comm_world);
+       index = smpi_host_index();
 
-       request_queue       = smpi_global->pending_recv_request_queues[rank];
-       request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[rank];
-       message_queue       = smpi_global->received_message_queues[rank];
-       message_queue_mutex = smpi_global->received_message_queues_mutexes[rank];
+       request_queue       = smpi_global->pending_recv_request_queues[index];
+       request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[index];
+       message_queue       = smpi_global->received_message_queues[index];
+       message_queue_mutex = smpi_global->received_message_queues_mutexes[index];
 
-       smpi_global->receiver_processes[rank] = self;
+       smpi_global->receiver_processes[index] = self;
 
        // wait for all nodes to signal initialization complete
        SIMIX_mutex_lock(smpi_global->start_stop_mutex);
        smpi_global->ready_process_count++;
-       if (smpi_global->ready_process_count < 3 * size) {
+       if (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
                SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
        } else {
                SIMIX_cond_broadcast(smpi_global->start_stop_cond);
@@ -85,8 +83,18 @@ stopsearch:
                        SIMIX_mutex_lock(request->mutex);
                        memcpy(request->buf, message->buf, request->datatype->size * request->count);
                        request->src = message->src;
-                       request->completed = 1;
-                       SIMIX_cond_broadcast(request->cond);
+                       request->data = message->data;
+                       request->forward = message->forward;
+
+                       if (0 == request->forward) {
+                               request->completed = 1;
+                               SIMIX_cond_broadcast(request->cond);
+                       } else {
+                               request->src = smpi_mpi_comm_rank(request->comm);
+                               request->dst = (request->src + 1) % request->comm->size;
+                               smpi_mpi_isend(request);
+                       }
+
                        SIMIX_mutex_unlock(request->mutex);
 
                        xbt_free(message->buf);