Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
00978758100ad0f9de9534d76acb0e02993e12ac
[simgrid.git] / src / smpi / smpi_receiver.c
1 #include "private.h"
2
3 int smpi_receiver(int argc, char **argv)
4 {
5         smx_process_t self;
6         int index;
7
8         xbt_fifo_t request_queue;
9         smx_mutex_t request_queue_mutex;
10         xbt_fifo_t message_queue;
11         smx_mutex_t message_queue_mutex;
12
13         int running_hosts_count;
14
15         smpi_mpi_request_t request;
16         smpi_received_message_t message;
17
18         xbt_fifo_item_t request_item;
19         xbt_fifo_item_t message_item;
20
21         self = SIMIX_process_self();
22
23         // make sure root is done before own initialization
24         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
25         if (!smpi_global->root_ready) {
26                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
27         }
28         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
29
30         index = smpi_host_index();
31
32         request_queue       = smpi_global->pending_recv_request_queues[index];
33         request_queue_mutex = smpi_global->pending_recv_request_queues_mutexes[index];
34         message_queue       = smpi_global->received_message_queues[index];
35         message_queue_mutex = smpi_global->received_message_queues_mutexes[index];
36
37         smpi_global->receiver_processes[index] = self;
38
39         // wait for all nodes to signal initializatin complete
40         SIMIX_mutex_lock(smpi_global->start_stop_mutex);
41         smpi_global->ready_process_count++;
42         if (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
43                 SIMIX_cond_wait(smpi_global->start_stop_cond, smpi_global->start_stop_mutex);
44         } else {
45                 SIMIX_cond_broadcast(smpi_global->start_stop_cond);
46         }
47         SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
48
49         do {
50                 request = NULL;
51                 message = NULL;
52
53                 // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
54
55                 // FIXME: not the best way to request multiple locks...
56                 SIMIX_mutex_lock(request_queue_mutex);
57                 SIMIX_mutex_lock(message_queue_mutex);
58                 for (request_item = xbt_fifo_get_first_item(request_queue);
59                         NULL != request_item;
60                         request_item = xbt_fifo_get_next_item(request_item)) {
61                         request = xbt_fifo_get_item_content(request_item);
62                         for (message_item = xbt_fifo_get_first_item(message_queue);
63                                 NULL != message_item;
64                                 message_item = xbt_fifo_get_next_item(message_item)) {
65                                 message = xbt_fifo_get_item_content(message_item);
66                                 if (request->comm == message->comm &&
67                                    (MPI_ANY_SOURCE == request->src || request->src == message->src) &&
68                                    (MPI_ANY_TAG == request->tag || request->tag == message->tag)) {
69                                         xbt_fifo_remove_item(request_queue, request_item);
70                                         xbt_fifo_remove_item(message_queue, message_item);
71                                         goto stopsearch;
72                                 }
73                         }
74                 }
75 stopsearch:
76                 SIMIX_mutex_unlock(message_queue_mutex);
77                 SIMIX_mutex_unlock(request_queue_mutex);
78
79                 if (NULL == request || NULL == message) {
80                         SIMIX_process_suspend(self);
81                 } else {
82
83                         SIMIX_mutex_lock(request->mutex);
84                         memcpy(request->buf, message->buf, request->datatype->size * request->count);
85                         request->src = message->src;
86                         request->data = message->data;
87                         request->forward = message->forward;
88
89                         if (0 == request->forward) {
90                                 request->completed = 1;
91                                 SIMIX_cond_broadcast(request->cond);
92                         } else {
93                                 request->src = request->comm->index_to_rank_map[index];
94                                 request->dst = (request->src + 1) % request->comm->size;
95                                 smpi_mpi_isend(request);
96                         }
97
98                         SIMIX_mutex_unlock(request->mutex);
99
100                         xbt_free(message->buf);
101                         xbt_mallocator_release(smpi_global->message_mallocator, message);
102
103                 }
104
105                 SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
106                 running_hosts_count = smpi_global->running_hosts_count;
107                 SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
108
109         } while (0 < running_hosts_count);
110
111         return 0;
112 }