Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
added in code to add computation action to simulation...
[simgrid.git] / src / smpi / src / smpi_base.c
1 #include <stdio.h>
2
3 #include <signal.h>
4 #include <sys/time.h>
5 #include "xbt/xbt_portability.h"
6 #include "simix/simix.h"
7 #include "simix/private.h"
8 #include "smpi.h"
9
10 xbt_fifo_t *smpi_pending_send_requests      = NULL;
11 xbt_fifo_t *smpi_pending_recv_requests      = NULL;
12 xbt_fifo_t *smpi_received_messages          = NULL;
13
14 int smpi_running_hosts = 0;
15
16 smpi_mpi_communicator_t smpi_mpi_comm_world;
17
18 smpi_mpi_status_t smpi_mpi_status_ignore;
19
20 smpi_mpi_datatype_t smpi_mpi_byte;
21 smpi_mpi_datatype_t smpi_mpi_int;
22 smpi_mpi_datatype_t smpi_mpi_double;
23
24 smpi_mpi_op_t smpi_mpi_land;
25 smpi_mpi_op_t smpi_mpi_sum;
26
27 static xbt_os_timer_t smpi_timer;
28 static int smpi_benchmarking;
29 static double smpi_reference_speed;
30
31 // mutexes
32 smx_mutex_t smpi_running_hosts_mutex = NULL;
33 smx_mutex_t smpi_benchmarking_mutex  = NULL;
34 smx_mutex_t init_mutex = NULL;
35 smx_cond_t init_cond  = NULL;
36
37 int rootready = 0;
38 int readycount = 0;
39
40 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
41
/*
 * Placeholder sender process entry point.
 * Ignores its arguments and always returns 0.
 */
int smpi_sender(int argc, char **argv)
{
        (void)argc;
        (void)argv;

        return 0;
}
46
/*
 * Placeholder receiver process entry point.
 * Ignores its arguments and always returns 0.
 */
int smpi_receiver(int argc, char **argv)
{
        (void)argc;
        (void)argv;

        return 0;
}
51
52 int smpi_run_simulation(int argc, char **argv)
53 {
54         smx_cond_t   cond           = NULL;
55         smx_action_t action         = NULL;
56
57         xbt_fifo_t   actions_failed = xbt_fifo_new();
58         xbt_fifo_t   actions_done   = xbt_fifo_new();
59
60         srand(SMPI_RAND_SEED);
61
62         SIMIX_global_init(&argc, argv);
63
64         init_mutex = SIMIX_mutex_init();
65         init_cond  = SIMIX_cond_init();
66
67         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
68         SIMIX_create_environment(argv[1]);
69         SIMIX_launch_application(argv[2]);
70
71         /* Prepare to display some more info when dying on Ctrl-C pressing */
72         //signal(SIGINT, inthandler);
73
74         /* Clean IO before the run */
75         fflush(stdout);
76         fflush(stderr);
77
78         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
79                 while (action = xbt_fifo_pop(actions_failed)) {
80                         DEBUG1("** %s failed **", action->name);
81                         while (cond = xbt_fifo_pop(action->cond_list)) {
82                                 SIMIX_cond_broadcast(cond);
83                         }
84                         SIMIX_action_destroy(action);
85                 }
86                 while (action = xbt_fifo_pop(actions_done)) {
87                         DEBUG1("** %s done **",action->name);
88                         while (cond = xbt_fifo_pop(action->cond_list)) {
89                                 SIMIX_cond_broadcast(cond);
90                         }
91                         SIMIX_action_destroy(action);
92                 }
93         }
94         xbt_fifo_free(actions_failed);
95         xbt_fifo_free(actions_done);
96         INFO1("simulation time %g", SIMIX_get_clock());
97         SIMIX_clean();
98         return 0;
99 }
100
/*
 * Logical-AND reduction callback.
 * Reads *x and *y as int and stores 1 in *z when both are non-zero,
 * 0 otherwise. z may alias x or y.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
        const int *lhs = x;
        const int *rhs = y;
        int *out = z;

        *out = (*lhs != 0) && (*rhs != 0);
}
105
/*
 * Integer-sum reduction callback.
 * Reads *x and *y as int and stores their sum in *z. z may alias x or y.
 *
 * The addition is carried out on unsigned int so that an out-of-range sum
 * wraps around instead of triggering signed-overflow undefined behaviour;
 * in-range results are unchanged.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
        const int lhs = *(const int *)x;
        const int rhs = *(const int *)y;

        *(int *)z = (int)((unsigned int)lhs + (unsigned int)rhs);
}
110
111 int smpi_mpi_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
112 {
113         int i;
114
115         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
116
117         return i;
118 }
119
120 int inline smpi_mpi_rank_self(smpi_mpi_communicator_t *comm)
121 {
122         return smpi_mpi_rank(comm, SIMIX_host_self());
123 }
124
125 void smpi_mpi_init()
126 {
127         int i;
128         int size;
129         smx_process_t process;
130         smx_host_t *hosts;
131         smx_host_t host;
132         double duration;
133
134         // initialize some local variables
135         host  = SIMIX_host_self();
136         hosts = SIMIX_host_get_table();
137         size  = SIMIX_host_get_number();
138
139         // node 0 sets the globals
140         if (host == hosts[0]) {
141
142                 // running hosts
143                 smpi_running_hosts_mutex          = SIMIX_mutex_init();
144                 smpi_running_hosts                = size;
145
146                 // global communicator
147                 smpi_mpi_comm_world.size          = size;
148                 smpi_mpi_comm_world.barrier       = 0;
149                 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
150                 smpi_mpi_comm_world.barrier_cond  = SIMIX_cond_init();
151                 smpi_mpi_comm_world.hosts         = hosts;
152                 smpi_mpi_comm_world.processes     = xbt_new0(smx_process_t, size);
153                 smpi_mpi_comm_world.processes[0]  = SIMIX_process_self();
154
155                 // mpi datatypes
156                 smpi_mpi_byte.size                = (size_t)1;
157                 smpi_mpi_int.size                 = sizeof(int);
158                 smpi_mpi_double.size              = sizeof(double);
159
160                 // mpi operations
161                 smpi_mpi_land.func                = &smpi_mpi_land_func;
162                 smpi_mpi_sum.func                 = &smpi_mpi_sum_func;
163
164                 // smpi globals
165                 smpi_pending_send_requests        = xbt_new0(xbt_fifo_t, size);
166                 smpi_pending_recv_requests        = xbt_new0(xbt_fifo_t, size);
167                 smpi_received_messages            = xbt_new0(xbt_fifo_t, size);
168
169                 for(i = 0; i < size; i++) {
170                         smpi_pending_send_requests[i] = xbt_fifo_new();
171                         smpi_pending_recv_requests[i] = xbt_fifo_new();
172                         smpi_received_messages[i]     = xbt_fifo_new();
173                 }
174
175                 smpi_timer                      = xbt_os_timer_new();
176                 smpi_reference_speed            = SMPI_DEFAULT_SPEED;
177                 smpi_benchmarking               = 0;
178                 smpi_benchmarking_mutex         = SIMIX_mutex_init();
179
180                 // signal all nodes to perform initialization
181                 SIMIX_mutex_lock(init_mutex);
182                 rootready = 1;
183                 SIMIX_cond_broadcast(init_cond);
184                 SIMIX_mutex_unlock(init_mutex);
185
186         } else {
187
188                 // make sure root is done before own initialization
189                 SIMIX_mutex_lock(init_mutex);
190                 if (!rootready) {
191                         SIMIX_cond_wait(init_cond, init_mutex);
192                 }
193                 SIMIX_mutex_unlock(init_mutex);
194
195                 smpi_mpi_comm_world.processes[smpi_mpi_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
196
197         }
198
199         // wait for all nodes to signal initializatin complete
200         SIMIX_mutex_lock(init_mutex);
201         readycount++;
202         if (readycount < size) {
203                 SIMIX_cond_wait(init_cond, init_mutex);
204         } else {
205                 SIMIX_cond_broadcast(init_cond);
206         }
207         SIMIX_mutex_unlock(init_mutex);
208
209 }
210
211 void smpi_mpi_finalize()
212 {
213         int i;
214
215         SIMIX_mutex_lock(smpi_running_hosts_mutex);
216         i = --smpi_running_hosts;
217         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
218
219         if (0 >= i) {
220
221                 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
222
223                 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
224                         xbt_fifo_free(smpi_pending_send_requests[i]);
225                         xbt_fifo_free(smpi_pending_recv_requests[i]);
226                         xbt_fifo_free(smpi_received_messages[i]);
227                 }
228
229                 xbt_free(smpi_pending_send_requests);
230                 xbt_free(smpi_pending_recv_requests);
231                 xbt_free(smpi_received_messages);
232
233                 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
234                 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
235                 xbt_free(smpi_mpi_comm_world.processes);
236
237                 xbt_os_timer_free(smpi_timer);
238         }
239
240 }
241
242 void smpi_bench_begin()
243 {
244         xbt_assert0(!smpi_benchmarking, "Already benchmarking");
245         smpi_benchmarking = 1;
246         xbt_os_timer_start(smpi_timer);
247         return;
248 }
249
250 void smpi_bench_end()
251 {
252         double duration;
253         smx_host_t host;
254         smx_action_t compute_action;
255         smx_mutex_t mutex;
256         smx_cond_t cond;
257
258         xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
259         smpi_benchmarking = 0;
260         xbt_os_timer_stop(smpi_timer);
261         duration = xbt_os_timer_elapsed(smpi_timer);
262         host           = SIMIX_host_self();
263         compute_action = SIMIX_action_sleep(host, "computation", duration * SMPI_DEFAULT_SPEED);
264         mutex          = SIMIX_mutex_init();
265         cond           = SIMIX_cond_init();
266         SIMIX_mutex_lock(mutex);
267         SIMIX_register_condition_to_action(compute_action, cond);
268         SIMIX_register_action_to_condition(compute_action, cond);
269         SIMIX_cond_wait(cond, mutex);
270         SIMIX_mutex_unlock(mutex);
271         SIMIX_mutex_destroy(mutex);
272         SIMIX_cond_destroy(cond);
273         // FIXME: check for success/failure?
274         return;
275 }
276
277 void smpi_barrier(smpi_mpi_communicator_t *comm) {
278         int i;
279         SIMIX_mutex_lock(comm->barrier_mutex);
280         comm->barrier++;
281         if(i < comm->size) {
282                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
283         } else {
284                 comm->barrier = 0;
285                 SIMIX_cond_broadcast(comm->barrier_cond);
286         }
287         SIMIX_mutex_unlock(comm->barrier_mutex);
288 }
289
290 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
291 {
292         int i;
293         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
294         if (i >= comm->size) i = -1;
295         return i;
296 }
297
// FIXME: move into own file
/*
 * gettimeofday() replacement that reports the simulated clock.
 *
 * Benchmarking is suspended for the duration of the call. When `tv` is
 * non-NULL it receives SIMIX_get_clock() split into whole seconds and
 * microseconds and 0 is returned; when `tv` is NULL, -1 is returned.
 * `tz` is not used.
 */
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
{
        int status = -1;

        smpi_bench_end();
        if (tv != NULL) {
                double now = SIMIX_get_clock();

                tv->tv_sec  = now;
                tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
                status = 0;
        }
        smpi_bench_begin();

        return status;
}
314
315 unsigned int smpi_sleep(unsigned int seconds)
316 {
317         smx_mutex_t mutex;
318         smx_cond_t cond;
319         smx_host_t host;
320         smx_action_t sleep_action;
321
322         smpi_bench_end();
323         host         = SIMIX_host_self();
324         sleep_action = SIMIX_action_sleep(host, seconds);
325         mutex        = SIMIX_mutex_init();
326         cond         = SIMIX_cond_init();
327         SIMIX_mutex_lock(mutex);
328         SIMIX_register_condition_to_action(sleep_action, cond);
329         SIMIX_register_action_to_condition(sleep_action, cond);
330         SIMIX_cond_wait(cond, mutex);
331         SIMIX_mutex_unlock(mutex);
332         SIMIX_mutex_destroy(mutex);
333         SIMIX_cond_destroy(cond);
334         // FIXME: check for success/failure?
335         smpi_bench_begin();
336         return 0;
337 }
338
339 void smpi_exit(int status)
340 {
341         smpi_bench_end();
342         SIMIX_mutex_lock(smpi_running_hosts_mutex);
343         smpi_running_hosts--;
344         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
345         SIMIX_process_kill(SIMIX_process_self());
346         return;
347 }