Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
efb88dd4f6fe56c51fdaa63e652fa82988ccfff1
[simgrid.git] / src / smpi / smpi_receiver.c
1 #include "private.h"
2
/* Logging category `smpi_receiver`, declared with `smpi` as its parent
 * category. NOTE(review): per the macro name this is presumably also the
 * default category for log statements in this file -- confirm against the
 * XBT logging documentation. */
3 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_receiver, smpi,
4                                 "Logging specific to SMPI (receiver)");
5
6 int smpi_receiver(int argc, char*argv[])
7 {
8         smpi_host_data_t mydata = SIMIX_process_get_data(SIMIX_process_self());
9   smx_process_t self;
10   int index = mydata->index;
11
12   xbt_fifo_t request_queue;
13   xbt_fifo_t message_queue;
14
15   int running_hosts_count;
16
17   smpi_mpi_request_t request;
18   smpi_received_message_t message;
19
20   xbt_fifo_item_t request_item;
21   xbt_fifo_item_t message_item;
22
23   self = SIMIX_process_self();
24
25   request_queue = smpi_global->pending_recv_request_queues[index];
26   message_queue = smpi_global->received_message_queues[index];
27
28   smpi_global->receiver_processes[index] = self;
29
30   do {
31
32     // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
33
34         xbt_fifo_foreach(request_queue,request_item,request,smpi_mpi_request_t){
35           xbt_fifo_foreach(message_queue,message_item,message, smpi_received_message_t) {
36
37         if (request->comm == message->comm &&
38             (MPI_ANY_SOURCE == request->src || request->src == message->src)
39             && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) {
40           xbt_fifo_remove_item(request_queue, request_item);
41           xbt_fifo_free_item(request_item);
42           xbt_fifo_remove_item(message_queue, message_item);
43           xbt_fifo_free_item(message_item);
44           goto stopsearch;
45         }
46       }
47     }
48
49     request = NULL;
50     message = NULL;
51
52   stopsearch:
53     if (NULL == request || NULL == message) {
54       SIMIX_process_suspend(self);
55     } else {
56
57       SIMIX_mutex_lock(request->mutex);
58       memcpy(request->buf, message->buf,
59              request->datatype->size * request->count);
60       request->src = message->src;
61       request->data = message->data;
62       request->forward = message->forward;
63
64       if (0 == request->forward) {
65         request->completed = 1;
66         SIMIX_cond_broadcast(request->cond);
67       } else {
68         request->src = request->comm->index_to_rank_map[index];
69         request->dst = (request->src + 1) % request->comm->size;
70         smpi_mpi_isend(request);
71       }
72
73       SIMIX_mutex_unlock(request->mutex);
74
75       xbt_free(message->buf);
76       xbt_mallocator_release(smpi_global->message_mallocator, message);
77
78     }
79
80     running_hosts_count = smpi_global->running_hosts_count;
81
82   } while (0 < running_hosts_count);
83
84   return 0;
85 }