Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make sure upper layer can retrieve the properties of workstations (and that's the...
[simgrid.git] / src / smpi / smpi_receiver.c
1 #include "private.h"
2
3 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_receiver, smpi,
4                                 "Logging specific to SMPI (receiver)");
5
6 int smpi_receiver(int argc, char *argv[])
7 {
8   smpi_process_data_t mydata = SIMIX_process_get_data(SIMIX_process_self());
9   smx_process_t self;
10   int index = mydata->index;
11
12   xbt_fifo_t request_queue;
13   xbt_fifo_t message_queue;
14
15   smpi_mpi_request_t request;
16   smpi_received_message_t message;
17
18   xbt_fifo_item_t request_item;
19   xbt_fifo_item_t message_item;
20
21   self = SIMIX_process_self();
22
23   request_queue = mydata->pending_recv_request_queue;
24   message_queue = mydata->received_message_queue;
25
26   while (1) {
27     // FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
28
29     xbt_fifo_foreach(request_queue, request_item, request, smpi_mpi_request_t) {
30       xbt_fifo_foreach(message_queue, message_item, message,
31                        smpi_received_message_t) {
32
33         if (request->comm == message->comm &&
34             (MPI_ANY_SOURCE == request->src || request->src == message->src)
35             && (MPI_ANY_TAG == request->tag || request->tag == message->tag)) {
36           xbt_fifo_remove_item(request_queue, request_item);
37           xbt_fifo_free_item(request_item);
38           xbt_fifo_remove_item(message_queue, message_item);
39           xbt_fifo_free_item(message_item);
40           goto stopsearch;
41         }
42       }
43     }
44
45     request = NULL;
46     message = NULL;
47
48   stopsearch:
49     if (NULL != request) {
50       if (NULL == message)
51         DIE_IMPOSSIBLE;
52
53       SIMIX_mutex_lock(request->mutex);
54       memcpy(request->buf, message->buf,
55              request->datatype->size * request->count);
56       request->src = message->src;
57       request->data = message->data;
58       request->forward = message->forward;
59
60       if (0 == request->forward) {
61         request->completed = 1;
62         SIMIX_cond_broadcast(request->cond);
63       } else {
64         request->src = request->comm->index_to_rank_map[index];
65         request->dst = (request->src + 1) % request->comm->size;
66         smpi_mpi_isend(request);
67       }
68
69       SIMIX_mutex_unlock(request->mutex);
70
71       xbt_free(message->buf);
72       xbt_mallocator_release(smpi_global->message_mallocator, message);
73
74     } else if (mydata->finalize > 0) {  /* main wants me to die and nothing to do */
75       // FIXME: display the list of remaining requests and messages (user code synchronization faulty?)
76       mydata->finalize--;
77       SIMIX_cond_signal(mydata->cond);
78       return 0;
79     } else {
80       SIMIX_process_suspend(self);
81     }
82   }
83
84   return 0;
85 }