Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
49725caab347df3d8528c2290378efef0a3286aa
[simgrid.git] / src / smpi / src / smpi_base.c
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>

#include "xbt/xbt_portability.h"
#include "simix/simix.h"
#include "simix/private.h"
#include "smpi.h"
9
10 // FIXME: move globals into structure...
11
12 xbt_mallocator_t smpi_request_mallocator    = NULL;
13 xbt_mallocator_t smpi_message_mallocator    = NULL;
14 xbt_fifo_t *smpi_pending_send_requests      = NULL;
15 xbt_fifo_t *smpi_pending_recv_requests      = NULL;
16 xbt_fifo_t *smpi_received_messages          = NULL;
17
18 smx_process_t *smpi_sender_processes        = NULL;
19 smx_process_t *smpi_receiver_processes      = NULL;
20
21 int smpi_running_hosts = 0;
22
23 smpi_mpi_communicator_t smpi_mpi_comm_world;
24
25 smpi_mpi_status_t smpi_mpi_status_ignore;
26
27 smpi_mpi_datatype_t smpi_mpi_byte;
28 smpi_mpi_datatype_t smpi_mpi_int;
29 smpi_mpi_datatype_t smpi_mpi_double;
30
31 smpi_mpi_op_t smpi_mpi_land;
32 smpi_mpi_op_t smpi_mpi_sum;
33
34 static xbt_os_timer_t smpi_timer;
35 static int smpi_benchmarking;
36 static double smpi_reference_speed;
37
38 // mutexes
39 smx_mutex_t smpi_running_hosts_mutex = NULL;
40 smx_mutex_t smpi_benchmarking_mutex  = NULL;
41 smx_mutex_t init_mutex = NULL;
42 smx_cond_t init_cond  = NULL;
43
44 int smpi_root_ready = 0;
45 int smpi_ready_count = 0;
46
47 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
48
49 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) 
50 {
51         return comm->size;
52 }
53
54 // FIXME: smarter algorithm?
55 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
56 {
57         int i;
58
59         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
60
61         return i;
62 }
63
64 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
65 {
66         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
67 }
68
69 int inline smpi_mpi_comm_world_rank_self()
70 {
71         return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self())
72 }
73
74 // FIXME: messages are actually smaller than requests, use them instead?
75 int smpi_sender(int argc, char **argv)
76 {
77         smx_process_t self;
78         smx_host_t shost;
79         int rank;
80         xbt_fifo_t request_queue;
81         int size;
82         int running_hosts = 0;
83         smpi_mpi_request_t *request;
84         smx_host_t dhost;
85         smx_action_t communicate_action;
86         smpi_mpi_request_t *scratch;
87         int drank;
88         smx_process_t waitproc;
89
90         self  = SIMIX_process_self();
91         shost = SIMIX_host_self();
92         rank  = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
93
94         // make sure root is done before own initialization
95         SIMIX_mutex_lock(init_mutex);
96         if (!smpi_root_ready) {
97                 SIMIX_cond_wait(init_cond, init_mutex);
98         }
99         SIMIX_mutex_unlock(init_mutex);
100
101         request_queue = smpi_pending_send_requests[rank];
102         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
103         smpi_sender_processes[rank] = self;
104
105         // wait for all nodes to signal initializatin complete
106         SIMIX_mutex_lock(init_mutex);
107         smpi_ready_count++;
108         if (smpi_ready_count < 3 * size) {
109                 SIMIX_cond_wait(init_cond, init_mutex);
110         } else {
111                 SIMIX_cond_broadcast(init_cond);
112         }
113         SIMIX_mutex_unlock(init_mutex);
114
115         SIMIX_mutex_lock(smpi_running_hosts_mutex);
116         running_hosts = smpi_running_hosts;
117         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
118
119         while (0 < running_hosts) {
120
121                 // FIXME: mutex?
122                 request = xbt_fifo_shift(request_queue);
123
124                 if (NULL == request) {
125                         SIMIX_process_suspend(self);
126                 } else {
127                         SIMIX_mutex_lock(request->mutex);
128
129                         dhost = request->comm->hosts[request->dst];
130
131                         // FIXME: not at all sure I can assume magic just happens here....
132                         communicate_action = SIMIX_action_communicate(shost, dhost,
133                                 "communication", request->datatype->size * request->count * 1.0, -1.0);
134
135                         SIMIX_register_condition_to_action(communicate_action, request->cond);
136                         SIMIX_register_action_to_condition(communicate_action, request->cond);
137
138                         SIMIX_cond_wait(request->cond, request->mutex);
139
140                         // copy request to appropriate received queue
141                         scratch = xbt_mallocator_get(smpi_message_mallocator);
142                         scratch->comm = request->comm;
143                         scratch->src  = request->src;
144                         scratch->dst  = request->dst;
145                         scratch->tag  = request->tag;
146                         scratch->buf  = request->buf;
147                         drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
148                         xbt_fifo_push(smpi_received_messages[drank], scratch);
149
150                         request->completed = 1;
151
152                         while(waitproc = xbt_fifo_shift(request->waitlist)) {
153                                 if (SIMIX_process_is_suspended(waitproc)) {
154                                         SIMIX_process_resume(waitproc);
155                                 }
156                         }
157
158                         SIMIX_mutex_unlock(request->mutex);
159                 }
160
161                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
162                 running_hosts = smpi_running_hosts;
163                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
164         }
165
166         return 0;
167 }
168
169 int smpi_receiver(int argc, char **argv)
170 {
171         smx_process_t self;
172         int rank;
173         xbt_fifo_t request_queue;
174         xbt_fifo_t message_queue;
175         int size;
176         int running_hosts;
177         smpi_mpi_request_t *message;
178         smpi_mpi_request_t *request;
179         smx_process_t waitproc;
180
181         self  = SIMIX_process_self();
182         rank  = smpi_mpi_comm_world_rank_self();
183
184         // make sure root is done before own initialization
185         SIMIX_mutex_lock(init_mutex);
186         if (!smpi_root_ready) {
187                 SIMIX_cond_wait(init_cond, init_mutex);
188         }
189         SIMIX_mutex_unlock(init_mutex);
190
191         request_queue = smpi_pending_receive_requests[rank];
192         message_queue = smpi_received_messages[rank];
193         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
194         smpi_receiver_processes[rank] = self;
195
196         // wait for all nodes to signal initializatin complete
197         SIMIX_mutex_lock(init_mutex);
198         smpi_ready_count++;
199         if (smpi_ready_count < 3 * size) {
200                 SIMIX_cond_wait(init_cond, init_mutex);
201         } else {
202                 SIMIX_cond_broadcast(init_cond);
203         }
204         SIMIX_mutex_unlock(init_mutex);
205
206         SIMIX_mutex_lock(smpi_running_hosts_mutex);
207         running_hosts = smpi_running_hosts;
208         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
209
210         while (0 < running_hosts) {
211
212                 // FIXME: search for received messages and requests
213                 // use stupid algorithm for now
214
215                 if (NULL == request) {
216                         SIMIX_process_suspend(self);
217                 } else {
218                         SIMIX_mutex_lock(request->mutex);
219                         memcpy(request->buf, message->buf, request->count * request->type->size);
220                         request->src = message->src;
221                         reqeust->completed = 1;
222
223                         while (waitproc = xbt_fifo_shift(request->waitlist)) {
224                                 if (SIMIX_process_is_suspended(waitproc)) {
225                                         SIMIX_process_resume(waitproc);
226                                 }
227                         }
228
229                         SIMIX_mutex_unlock(request->mutex);
230                         xbt_mallocator_release(smpi_message_mallocator, message);
231                 }
232
233                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
234                 running_hosts = smpi_running_hosts;
235                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
236         }
237
238         return 0;
239 }
240
241 int smpi_run_simulation(int argc, char **argv)
242 {
243         smx_cond_t   cond           = NULL;
244         smx_action_t action         = NULL;
245
246         xbt_fifo_t   actions_failed = xbt_fifo_new();
247         xbt_fifo_t   actions_done   = xbt_fifo_new();
248
249         srand(SMPI_RAND_SEED);
250
251         SIMIX_global_init(&argc, argv);
252
253         init_mutex = SIMIX_mutex_init();
254         init_cond  = SIMIX_cond_init();
255
256         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
257         SIMIX_function_register("smpi_sender", smpi_sender);
258         SIMIX_function_register("smpi_receiver", smpi_receiver);
259         SIMIX_create_environment(argv[1]);
260         SIMIX_launch_application(argv[2]);
261
262         /* Prepare to display some more info when dying on Ctrl-C pressing */
263         //signal(SIGINT, inthandler);
264
265         /* Clean IO before the run */
266         fflush(stdout);
267         fflush(stderr);
268
269         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
270                 while (action = xbt_fifo_pop(actions_failed)) {
271                         DEBUG1("** %s failed **", action->name);
272                         while (cond = xbt_fifo_pop(action->cond_list)) {
273                                 SIMIX_cond_broadcast(cond);
274                         }
275                         SIMIX_action_destroy(action);
276                 }
277                 while (action = xbt_fifo_pop(actions_done)) {
278                         DEBUG1("** %s done **",action->name);
279                         while (cond = xbt_fifo_pop(action->cond_list)) {
280                                 SIMIX_cond_broadcast(cond);
281                         }
282                         SIMIX_action_destroy(action);
283                 }
284         }
285         xbt_fifo_free(actions_failed);
286         xbt_fifo_free(actions_done);
287         INFO1("simulation time %g", SIMIX_get_clock());
288         SIMIX_clean();
289         return 0;
290 }
291
/*
 * MPI_LAND reduction kernel on ints: *z = (*x && *y), i.e. 1 when both
 * operands are non-zero and 0 otherwise.  *y is not read when *x is 0.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
	const int *lhs = x;
	const int *rhs = y;
	int *result = z;

	*result = (*lhs != 0) && (*rhs != 0);
}
296
/*
 * MPI_SUM reduction kernel on ints: *z = *x + *y.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
	const int *lhs = x;
	const int *rhs = y;
	int *result = z;

	*result = *lhs + *rhs;
}
301
302 smpi_mpi_request_t *smpi_new_request()
303 {
304         return xbt_new(smpi_mpi_request_t, 1);
305 }
306
307 void smpi_mpi_init()
308 {
309         int i;
310         int size;
311         smx_process_t process;
312         smx_host_t *hosts;
313         smx_host_t host;
314         double duration;
315
316         // initialize some local variables
317         host  = SIMIX_host_self();
318         hosts = SIMIX_host_get_table();
319         size  = SIMIX_host_get_number();
320
321         // node 0 sets the globals
322         if (host == hosts[0]) {
323
324                 // processes
325                 smpi_sender_processes             = xbt_new(smx_process_t, size);
326                 smpi_receiver_processes           = xbt_new(smx_process_t, size);
327
328                 // running hosts
329                 smpi_running_hosts_mutex          = SIMIX_mutex_init();
330                 smpi_running_hosts                = size;
331
332                 // global communicator
333                 smpi_mpi_comm_world.size          = size;
334                 smpi_mpi_comm_world.barrier       = 0;
335                 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
336                 smpi_mpi_comm_world.barrier_cond  = SIMIX_cond_init();
337                 smpi_mpi_comm_world.hosts         = hosts;
338                 smpi_mpi_comm_world.processes     = xbt_new(smx_process_t, size);
339                 smpi_mpi_comm_world.processes[0]  = SIMIX_process_self();
340
341                 // mpi datatypes
342                 smpi_mpi_byte.size                = (size_t)1;
343                 smpi_mpi_int.size                 = sizeof(int);
344                 smpi_mpi_double.size              = sizeof(double);
345
346                 // mpi operations
347                 smpi_mpi_land.func                = &smpi_mpi_land_func;
348                 smpi_mpi_sum.func                 = &smpi_mpi_sum_func;
349
350                 // smpi globals
351                 smpi_request_mallocator           = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL);
352                 smpi_message_mallocator           = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, NULL);
353                 smpi_pending_send_requests        = xbt_new(xbt_fifo_t, size);
354                 smpi_pending_recv_requests        = xbt_new(xbt_fifo_t, size);
355                 smpi_received_messages            = xbt_new(xbt_fifo_t, size);
356
357                 for(i = 0; i < size; i++) {
358                         smpi_pending_send_requests[i] = xbt_fifo_new();
359                         smpi_pending_recv_requests[i] = xbt_fifo_new();
360                         smpi_received_messages[i]     = xbt_fifo_new();
361                 }
362
363                 smpi_timer                      = xbt_os_timer_new();
364                 smpi_reference_speed            = SMPI_DEFAULT_SPEED;
365                 smpi_benchmarking               = 0;
366                 smpi_benchmarking_mutex         = SIMIX_mutex_init();
367
368                 // signal all nodes to perform initialization
369                 SIMIX_mutex_lock(init_mutex);
370                 smpi_root_ready = 1;
371                 SIMIX_cond_broadcast(init_cond);
372                 SIMIX_mutex_unlock(init_mutex);
373
374         } else {
375
376                 // make sure root is done before own initialization
377                 SIMIX_mutex_lock(init_mutex);
378                 if (!smpi_root_ready) {
379                         SIMIX_cond_wait(init_cond, init_mutex);
380                 }
381                 SIMIX_mutex_unlock(init_mutex);
382
383                 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
384
385         }
386
387         // wait for all nodes to signal initializatin complete
388         SIMIX_mutex_lock(init_mutex);
389         smpi_ready_count++;
390         if (smpi_ready_count < 3 * size) {
391                 SIMIX_cond_wait(init_cond, init_mutex);
392         } else {
393                 SIMIX_cond_broadcast(init_cond);
394         }
395         SIMIX_mutex_unlock(init_mutex);
396
397 }
398
399 void smpi_mpi_finalize()
400 {
401         int i;
402
403         SIMIX_mutex_lock(smpi_running_hosts_mutex);
404         i = --smpi_running_hosts;
405         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
406
407         if (0 >= i) {
408
409                 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
410
411                 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
412                         xbt_fifo_free(smpi_pending_send_requests[i]);
413                         xbt_fifo_free(smpi_pending_recv_requests[i]);
414                         xbt_fifo_free(smpi_received_messages[i]);
415                 }
416
417                 xbt_mallocator_free(smpi_request_mallocator);
418                 xbt_mallocator_free(smpi_message_mallocator);
419                 xbt_free(smpi_pending_send_requests);
420                 xbt_free(smpi_pending_recv_requests);
421                 xbt_free(smpi_received_messages);
422
423                 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
424                 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
425                 xbt_free(smpi_mpi_comm_world.processes);
426
427                 xbt_os_timer_free(smpi_timer);
428         }
429
430 }
431
432 void smpi_bench_begin()
433 {
434         xbt_assert0(!smpi_benchmarking, "Already benchmarking");
435         smpi_benchmarking = 1;
436         xbt_os_timer_start(smpi_timer);
437         return;
438 }
439
440 void smpi_bench_end()
441 {
442         double duration;
443         smx_host_t host;
444         smx_action_t compute_action;
445         smx_mutex_t mutex;
446         smx_cond_t cond;
447
448         xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
449         smpi_benchmarking = 0;
450         xbt_os_timer_stop(smpi_timer);
451         duration = xbt_os_timer_elapsed(smpi_timer);
452         host           = SIMIX_host_self();
453         compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
454         mutex          = SIMIX_mutex_init();
455         cond           = SIMIX_cond_init();
456         SIMIX_mutex_lock(mutex);
457         SIMIX_register_condition_to_action(compute_action, cond);
458         SIMIX_register_action_to_condition(compute_action, cond);
459         SIMIX_cond_wait(cond, mutex);
460         SIMIX_mutex_unlock(mutex);
461         SIMIX_mutex_destroy(mutex);
462         SIMIX_cond_destroy(cond);
463         // FIXME: check for success/failure?
464         return;
465 }
466
467 void smpi_barrier(smpi_mpi_communicator_t *comm) {
468         int i;
469         SIMIX_mutex_lock(comm->barrier_mutex);
470         comm->barrier++;
471         if(i < comm->size) {
472                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
473         } else {
474                 comm->barrier = 0;
475                 SIMIX_cond_broadcast(comm->barrier_cond);
476         }
477         SIMIX_mutex_unlock(comm->barrier_mutex);
478 }
479
480 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
481 {
482         int i;
483         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
484         if (i >= comm->size) i = -1;
485         return i;
486 }
487
488 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
489         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
490 {
491         int retval = MPI_SUCCESS;
492
493         *request = NULL;
494
495         if (0 > count) {
496                 retval = MPI_ERR_COUNT;
497         } else if (NULL == buf) {
498                 retval = MPI_ERR_INTERN;
499         } else if (NULL == datatype) {
500                 retval = MPI_ERR_TYPE;
501         } else if (NULL == comm) {
502                 retval = MPI_ERR_COMM;
503         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
504                 retval = MPI_ERR_RANK;
505         } else if (0 > dst || comm->size <= dst) {
506                 retval = MPI_ERR_RANK;
507         } else if (0 > tag) {
508                 retval = MPI_ERR_TAG;
509         } else {
510                 *request = xbt_mallocator_get(smpi_request_mallocator);
511                 (*request)->buf        = buf;
512                 (*request)->count      = count;
513                 (*request)->datatype   = datatype;
514                 (*request)->src        = src;
515                 (*request)->dst        = dst;
516                 (*request)->tag        = tag;
517                 (*request)->comm       = comm;
518                 (*request)->completed  = 0;
519                 (*request)->waitlist   = NULL;
520         }
521         return retval;
522 }
523
524 int smpi_isend(smpi_mpi_request_t *request)
525 {
526         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
527
528         xbt_fifo_push(smpi_pending_send_requests[rank], request);
529
530         if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
531                 MSG_process_resume(smpi_sender_processes[rank]);
532         }
533 }
534
535 int smpi_irecv(smpi_mpi_request_t *request)
536 {
537         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
538
539         xbt_fifo_push(smpi_pending_recv_requests[rank], request);
540
541         if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
542                 MSG_process_resume(smpi_receiver_processes[rank]);
543         }
544 }
545
546 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
547 {
548         smx_process_t self;
549         int suspend = 0;
550         self = SIMIX_process_self();
551
552         if (NULL != request) {
553                 SIMIX_mutex_lock(request->mutex);
554                 if (!request->completed) {
555                         xbt_fifo_push(request->waitlist, self);
556                         suspend = 1;
557                 }
558                 SIMIX_mutex_unlock(request->mutex);
559                 if (suspend) {
560                         SIMIX_suspend(self);
561                 }
562                 if (NULL != status && MPI_STATUS_IGNORE != status) {
563                         SIMIX_mutex_lock(request->mutex);
564                         status->MPI_SOURCE = request->src;
565                         SIMIX_mutex_unlock(request->mutex);
566                 }
567         }
568 }
569
570 // FIXME: move into own file
571 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
572 {
573         double now;
574         int retval = 0;
575         smpi_bench_end();
576         if (NULL == tv) {
577                 retval = -1;
578         } else {
579                 now = SIMIX_get_clock();
580                 tv->tv_sec  = now;
581                 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
582         }
583         smpi_bench_begin();
584         return retval;
585 }
586
587 unsigned int smpi_sleep(unsigned int seconds)
588 {
589         smx_mutex_t mutex;
590         smx_cond_t cond;
591         smx_host_t host;
592         smx_action_t sleep_action;
593
594         smpi_bench_end();
595         host         = SIMIX_host_self();
596         sleep_action = SIMIX_action_sleep(host, seconds);
597         mutex        = SIMIX_mutex_init();
598         cond         = SIMIX_cond_init();
599         SIMIX_mutex_lock(mutex);
600         SIMIX_register_condition_to_action(sleep_action, cond);
601         SIMIX_register_action_to_condition(sleep_action, cond);
602         SIMIX_cond_wait(cond, mutex);
603         SIMIX_mutex_unlock(mutex);
604         SIMIX_mutex_destroy(mutex);
605         SIMIX_cond_destroy(cond);
606         // FIXME: check for success/failure?
607         smpi_bench_begin();
608         return 0;
609 }
610
611 void smpi_exit(int status)
612 {
613         smpi_bench_end();
614         SIMIX_mutex_lock(smpi_running_hosts_mutex);
615         smpi_running_hosts--;
616         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
617         SIMIX_process_kill(SIMIX_process_self());
618         return;
619 }