Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Added flag for printing more debug info
[simgrid.git] / src / smpi / smpi_sender.c
1 #include "private.h"
2
/* Log category for this file: "smpi_sender", declared as a subcategory of
 * "smpi". Going by the macro name it also becomes the default category for
 * the DEBUGn() calls below -- confirm against the XBT logging documentation. */
3 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi,
4                                 "Logging specific to SMPI (sender)");
5
6 int smpi_sender(int argc, char *argv[])
7 {
8   smpi_process_data_t mydata = SIMIX_process_get_data(SIMIX_process_self());
9   smx_process_t self;
10   smx_host_t shost;
11
12   int index;
13
14   xbt_fifo_t request_queue;
15
16   smpi_mpi_request_t request;
17
18   smx_host_t dhost;
19
20   smx_action_t action;
21
22   e_surf_action_state_t state;
23
24   smpi_received_message_t message;
25
26   int dindex;
27
28   self = SIMIX_process_self();
29   shost = SIMIX_host_self();
30
31   index = mydata->index;
32
33   DEBUG0("Up and running");
34
35   request_queue = mydata->pending_send_request_queue;
36
37   while (1) {
38     request = xbt_fifo_shift(request_queue);
39
40     if (NULL != request) {
41       message = xbt_mallocator_get(smpi_global->message_mallocator);
42
43       SIMIX_mutex_lock(request->mutex);
44
45       message->comm = request->comm;
46       message->src = request->comm->index_to_rank_map[index];
47       message->tag = request->tag;
48       message->data = request->data;
49       message->buf = xbt_malloc(request->datatype->size * request->count);
50       memcpy(message->buf, request->buf,
51              request->datatype->size * request->count);
52
53       dindex = request->comm->rank_to_index_map[request->dst];
54       smpi_process_data_t remote_process =
55         SIMIX_process_get_data(smpi_global->main_processes[dindex]);
56       dhost = SIMIX_process_get_host(smpi_global->main_processes[dindex]);
57
58       DEBUG4("handle send request %p to %s (req_dst=%d,req_tag=%d)",
59                       request,SIMIX_host_get_name(dhost),request->dst,message->tag);
60       message->forward = (request->forward - 1) / 2;
61       request->forward = request->forward / 2;
62
63       if (0 < request->forward) {
64         request->dst =
65           (request->dst + message->forward + 1) % request->comm->size;
66         xbt_fifo_push(request_queue, request);
67       } else {
68         DEBUG4("DONE Handling send request %p to %s (req_dst=%d,req_tag=%d)",
69                         request, SIMIX_host_get_name(dhost),request->dst,message->tag);
70         request->completed = 1;
71       }
72
73       action =
74         SIMIX_action_communicate(shost, dhost, "communication",
75                                  request->datatype->size * request->count,
76                                  -1.0);
77
78       SIMIX_register_action_to_condition(action, request->cond);
79
80       for (state = SIMIX_action_get_state(action);
81            state == SURF_ACTION_READY ||
82            state == SURF_ACTION_RUNNING;
83            state = SIMIX_action_get_state(action)
84         ) {
85         SIMIX_cond_wait(request->cond, request->mutex);
86       }
87
88       xbt_fifo_push(remote_process->received_message_queue, message);
89
90       SIMIX_unregister_action_to_condition(action, request->cond);
91       SIMIX_action_destroy(action);
92
93       SIMIX_mutex_unlock(request->mutex);
94
95       // wake up receiver if necessary
96       SIMIX_process_resume(remote_process->receiver);
97
98     } else if (mydata->finalize > 0) {  /* main wants me to die and nothing to do */
99       DEBUG0("===Main wants me to die and I'm done. Bye, guys.===");
100       mydata->finalize--;
101       SIMIX_cond_signal(mydata->cond);
102       return 0;
103     } else {
104       DEBUG0("Nothing to do. Let's get a nap");
105       SIMIX_process_suspend(self);
106       DEBUG0("===Uh? Someone called me?===");
107     }
108   }
109   return 0;
110 }