Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
SMPI: cleanup the initialization process by moving stuff done by first process in...
[simgrid.git] / src / smpi / smpi_sender.c
1 #include "private.h"
2
3 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_sender, smpi,
4                                 "Logging specific to SMPI (sender)");
5
6 int smpi_sender(int argc, char **argv)
7 {
8   smx_process_t self;
9   smx_host_t shost;
10
11   int index;
12
13   xbt_fifo_t request_queue;
14   smx_mutex_t request_queue_mutex;
15
16   int running_hosts_count;
17
18   smpi_mpi_request_t request;
19
20   smx_host_t dhost;
21
22   smx_action_t action;
23
24   e_surf_action_state_t state;
25
26   smpi_received_message_t message;
27
28   int dindex;
29
30   smx_process_t receiver_process;
31
32   self = SIMIX_process_self();
33   shost = SIMIX_host_self();
34
35   // make sure root is done before own initialization
36   SIMIX_mutex_lock(smpi_global->start_stop_mutex);
37   while (!smpi_global->root_ready) {
38     SIMIX_cond_wait(smpi_global->start_stop_cond,
39                     smpi_global->start_stop_mutex);
40   }
41   SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
42
43   index = smpi_host_index();
44
45   request_queue = smpi_global->pending_send_request_queues[index];
46   request_queue_mutex =
47     smpi_global->pending_send_request_queues_mutexes[index];
48
49   smpi_global->sender_processes[index] = self;
50
51   // wait for all nodes to signal initializatin complete
52   SIMIX_mutex_lock(smpi_global->start_stop_mutex);
53   smpi_global->ready_process_count++;
54   if (smpi_global->ready_process_count >= 3 * smpi_global->host_count) {
55     SIMIX_cond_broadcast(smpi_global->start_stop_cond);
56   }
57   while (smpi_global->ready_process_count < 3 * smpi_global->host_count) {
58     SIMIX_cond_wait(smpi_global->start_stop_cond,
59                     smpi_global->start_stop_mutex);
60   }
61   SIMIX_mutex_unlock(smpi_global->start_stop_mutex);
62
63   do {
64
65     SIMIX_mutex_lock(request_queue_mutex);
66     request = xbt_fifo_shift(request_queue);
67     SIMIX_mutex_unlock(request_queue_mutex);
68
69     if (NULL == request) {
70       SIMIX_process_suspend(self);
71     } else {
72
73       message = xbt_mallocator_get(smpi_global->message_mallocator);
74
75       SIMIX_mutex_lock(request->mutex);
76
77       message->comm = request->comm;
78       message->src = request->comm->index_to_rank_map[index];
79       message->tag = request->tag;
80       message->data = request->data;
81       message->buf = xbt_malloc(request->datatype->size * request->count);
82       memcpy(message->buf, request->buf,
83              request->datatype->size * request->count);
84
85       dindex = request->comm->rank_to_index_map[request->dst];
86       dhost = smpi_global->hosts[dindex];
87
88       message->forward = (request->forward - 1) / 2;
89       request->forward = request->forward / 2;
90
91       if (0 < request->forward) {
92         request->dst =
93           (request->dst + message->forward + 1) % request->comm->size;
94         SIMIX_mutex_lock(request_queue_mutex);
95         xbt_fifo_push(request_queue, request);
96         SIMIX_mutex_unlock(request_queue_mutex);
97       } else {
98         request->completed = 1;
99       }
100
101       action =
102         SIMIX_action_communicate(shost, dhost, "communication",
103                                  request->datatype->size * request->count,
104                                  -1.0);
105
106       SIMIX_register_action_to_condition(action, request->cond);
107
108       for (state = SIMIX_action_get_state(action);
109            state == SURF_ACTION_READY ||
110            state == SURF_ACTION_RUNNING;
111            state = SIMIX_action_get_state(action)
112         ) {
113         SIMIX_cond_wait(request->cond, request->mutex);
114       }
115
116       SIMIX_mutex_lock(smpi_global->received_message_queues_mutexes[dindex]);
117       xbt_fifo_push(smpi_global->received_message_queues[dindex], message);
118       SIMIX_mutex_unlock(smpi_global->received_message_queues_mutexes
119                          [dindex]);
120
121       SIMIX_unregister_action_to_condition(action, request->cond);
122       SIMIX_action_destroy(action);
123
124       SIMIX_mutex_unlock(request->mutex);
125
126       // wake up receiver if necessary
127       receiver_process = smpi_global->receiver_processes[dindex];
128       if (SIMIX_process_is_suspended(receiver_process)) {
129         SIMIX_process_resume(receiver_process);
130       }
131
132     }
133
134     SIMIX_mutex_lock(smpi_global->running_hosts_count_mutex);
135     running_hosts_count = smpi_global->running_hosts_count;
136     SIMIX_mutex_unlock(smpi_global->running_hosts_count_mutex);
137
138   } while (0 < running_hosts_count);
139
140   return 0;
141 }