Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
91733c2f1f64c044724ff81dd637bea59bbc765f
[simgrid.git] / src / smpi / src / smpi_base.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <signal.h>
#include <sys/time.h>
#include "xbt/xbt_portability.h"
#include "simix/simix.h"
#include "simix/private.h"
#include "smpi.h"
9
10 // FIXME: move globals into structure...
11
12 xbt_mallocator_t smpi_request_mallocator    = NULL;
13 xbt_fifo_t *smpi_pending_send_requests      = NULL;
14 xbt_fifo_t *smpi_pending_recv_requests      = NULL;
15 xbt_fifo_t *smpi_received_messages          = NULL;
16
17 smx_process_t *smpi_sender_processes        = NULL;
18 smx_process_t *smpi_receiver_processes      = NULL;
19
20 int smpi_running_hosts = 0;
21
22 smpi_mpi_communicator_t smpi_mpi_comm_world;
23
24 smpi_mpi_status_t smpi_mpi_status_ignore;
25
26 smpi_mpi_datatype_t smpi_mpi_byte;
27 smpi_mpi_datatype_t smpi_mpi_int;
28 smpi_mpi_datatype_t smpi_mpi_double;
29
30 smpi_mpi_op_t smpi_mpi_land;
31 smpi_mpi_op_t smpi_mpi_sum;
32
33 static xbt_os_timer_t smpi_timer;
34 static int smpi_benchmarking;
35 static double smpi_reference_speed;
36
37 // mutexes
38 smx_mutex_t smpi_running_hosts_mutex = NULL;
39 smx_mutex_t smpi_benchmarking_mutex  = NULL;
40 smx_mutex_t init_mutex = NULL;
41 smx_cond_t init_cond  = NULL;
42
43 int smpi_root_ready = 0;
44 int smpi_ready_count = 0;
45
46 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
47
48 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) 
49 {
50         return comm->size;
51 }
52
53 // FIXME: smarter algorithm?
54 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
55 {
56         int i;
57
58         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
59
60         return i;
61 }
62
63 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
64 {
65         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
66 }
67
68 int inline smpi_mpi_comm_world_rank_self()
69 {
70         return smpi_mpi_comm_rank(&smpi_mpi_comm_world, SIMIX_host_self())
71 }
72
73 // FIXME: messages are actually smaller than requests, use them instead?
74 int smpi_sender(int argc, char **argv)
75 {
76         smx_process_t self;
77         smx_host_t shost;
78         int rank;
79         xbt_fifo_t request_queue;
80         int size;
81         int running_hosts = 0;
82         smpi_mpi_request_t *request;
83         smx_host_t dhost;
84         smx_action_t communicate_action;
85         smpi_mpi_request_t *scratch;
86         int drank;
87         smx_process_t waitproc;
88
89         self  = SIMIX_process_self();
90         shost = SIMIX_host_self();
91         rank  = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
92
93         // make sure root is done before own initialization
94         SIMIX_mutex_lock(init_mutex);
95         if (!smpi_root_ready) {
96                 SIMIX_cond_wait(init_cond, init_mutex);
97         }
98         SIMIX_mutex_unlock(init_mutex);
99
100         request_queue = smpi_pending_send_requests[rank];
101         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
102         smpi_sender_processes[rank] = self;
103
104         // wait for all nodes to signal initializatin complete
105         SIMIX_mutex_lock(init_mutex);
106         smpi_ready_count++;
107         if (smpi_ready_count < 3 * size) {
108                 SIMIX_cond_wait(init_cond, init_mutex);
109         } else {
110                 SIMIX_cond_broadcast(init_cond);
111         }
112         SIMIX_mutex_unlock(init_mutex);
113
114         SIMIX_mutex_lock(smpi_running_hosts_mutex);
115         running_hosts = smpi_running_hosts;
116         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
117
118         while (0 < running_hosts) {
119
120                 // FIXME: mutex?
121                 request = xbt_fifo_shift(request_queue);
122
123                 if (NULL == request) {
124                         SIMIX_process_suspend(self);
125                 } else {
126                         SIMIX_mutex_lock(request->mutex);
127
128                         dhost = request->comm->hosts[request->dst];
129
130                         // FIXME: not at all sure I can assume magic just happens here....
131                         communicate_action = SIMIX_action_communicate(shost, dhost,
132                                 "communication", request->datatype->size * request->count * 1.0, -1.0);
133
134                         SIMIX_register_condition_to_action(communicate_action, request->cond);
135                         SIMIX_register_action_to_condition(communicate_action, request->cond);
136
137                         SIMIX_cond_wait(request->cond, request->mutex);
138
139                         // copy request to appropriate received queue
140                         scratch = xbt_mallocator_get(smpi_request_mallocator);
141                         memcpy(scratch, request, sizeof smpi_mpi_request_t);
142                         drank = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
143                         xbt_fifo_push(smpi_received_messages[drank], scratch);
144
145                         request->completed = 1;
146
147                         while(waitproc = xbt_fifo_shift(request->waitlist)) {
148                                 if (SIMIX_process_is_suspended(waitproc)) {
149                                         SIMIX_process_resume(waitproc);
150                                 }
151                         }
152
153                         SIMIX_mutex_unlock(request->mutex);
154                 }
155
156                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
157                 running_hosts = smpi_running_hosts;
158                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
159         }
160
161         return 0;
162 }
163
164 int smpi_receiver(int argc, char **argv)
165 {
166         smx_process_t self;
167         int rank;
168         xbt_fifo_t request_queue;
169         xbt_fifo_t message_queue;
170         int size;
171         int running_hosts;
172         smpi_mpi_request_t *message;
173         smpi_mpi_request_t *request;
174         smx_process_t waitproc;
175
176         self  = SIMIX_process_self();
177         rank  = smpi_mpi_comm_world_rank_self();
178
179         // make sure root is done before own initialization
180         SIMIX_mutex_lock(init_mutex);
181         if (!smpi_root_ready) {
182                 SIMIX_cond_wait(init_cond, init_mutex);
183         }
184         SIMIX_mutex_unlock(init_mutex);
185
186         request_queue = smpi_pending_receive_requests[rank];
187         message_queue = smpi_received_messages[rank];
188         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
189         smpi_receiver_processes[rank] = self;
190
191         // wait for all nodes to signal initializatin complete
192         SIMIX_mutex_lock(init_mutex);
193         smpi_ready_count++;
194         if (smpi_ready_count < 3 * size) {
195                 SIMIX_cond_wait(init_cond, init_mutex);
196         } else {
197                 SIMIX_cond_broadcast(init_cond);
198         }
199         SIMIX_mutex_unlock(init_mutex);
200
201         SIMIX_mutex_lock(smpi_running_hosts_mutex);
202         running_hosts = smpi_running_hosts;
203         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
204
205         while (0 < running_hosts) {
206
207                 // FIXME: search for received messages and requests
208
209                 if (NULL == request) {
210                         SIMIX_process_suspend(self);
211                 } else {
212                         SIMIX_mutex_lock(request->mutex);
213                         memcpy(request->buf, message->buf, request->count * request->type->size);
214                         request->src = message->src;
215                         reqeust->completed = 1;
216
217                         while (waitproc = xbt_fifo_shift(request->waitlist)) {
218                                 if (SIMIX_process_is_suspended(waitproc)) {
219                                         SIMIX_process_resume(waitproc);
220                                 }
221                         }
222
223                         SIMIX_mutex_unlock(request->mutex);
224                         xbt_mallocator_release(smpi_request_mallocator, message);
225                 }
226
227                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
228                 running_hosts = smpi_running_hosts;
229                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
230         }
231
232         return 0;
233 }
234
235 int smpi_run_simulation(int argc, char **argv)
236 {
237         smx_cond_t   cond           = NULL;
238         smx_action_t action         = NULL;
239
240         xbt_fifo_t   actions_failed = xbt_fifo_new();
241         xbt_fifo_t   actions_done   = xbt_fifo_new();
242
243         srand(SMPI_RAND_SEED);
244
245         SIMIX_global_init(&argc, argv);
246
247         init_mutex = SIMIX_mutex_init();
248         init_cond  = SIMIX_cond_init();
249
250         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
251         SIMIX_function_register("smpi_sender", smpi_sender);
252         SIMIX_function_register("smpi_receiver", smpi_receiver);
253         SIMIX_create_environment(argv[1]);
254         SIMIX_launch_application(argv[2]);
255
256         /* Prepare to display some more info when dying on Ctrl-C pressing */
257         //signal(SIGINT, inthandler);
258
259         /* Clean IO before the run */
260         fflush(stdout);
261         fflush(stderr);
262
263         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
264                 while (action = xbt_fifo_pop(actions_failed)) {
265                         DEBUG1("** %s failed **", action->name);
266                         while (cond = xbt_fifo_pop(action->cond_list)) {
267                                 SIMIX_cond_broadcast(cond);
268                         }
269                         SIMIX_action_destroy(action);
270                 }
271                 while (action = xbt_fifo_pop(actions_done)) {
272                         DEBUG1("** %s done **",action->name);
273                         while (cond = xbt_fifo_pop(action->cond_list)) {
274                                 SIMIX_cond_broadcast(cond);
275                         }
276                         SIMIX_action_destroy(action);
277                 }
278         }
279         xbt_fifo_free(actions_failed);
280         xbt_fifo_free(actions_done);
281         INFO1("simulation time %g", SIMIX_get_clock());
282         SIMIX_clean();
283         return 0;
284 }
285
/*
 * smpi_mpi_land_func: MPI_LAND reduction on ints.
 * Stores 1 in *z when both *x and *y are non-zero, 0 otherwise; *y is only
 * read when *x is non-zero. z may alias x or y.
 */
void smpi_mpi_land_func(void *x, void *y, void *z)
{
	int *out = z;

	*out = (*(int *)x != 0) ? (*(int *)y != 0) : 0;
}
290
/*
 * smpi_mpi_sum_func: MPI_SUM reduction on ints; stores *x + *y in *z.
 *
 * The addition is done in unsigned arithmetic so that overflow wraps instead
 * of being undefined behavior (signed int overflow is UB in C). Converting
 * the result back to int is implementation-defined and yields the usual
 * two's-complement wrap on GCC/Clang targets. z may alias x or y.
 */
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
	unsigned int a = (unsigned int)*(int *)x;
	unsigned int b = (unsigned int)*(int *)y;

	*(int *)z = (int)(a + b);
}
295
296 smpi_mpi_request_t *smpi_new_request()
297 {
298         return xbt_new(smpi_mpi_request_t, 1);
299 }
300
301 void smpi_mpi_init()
302 {
303         int i;
304         int size;
305         smx_process_t process;
306         smx_host_t *hosts;
307         smx_host_t host;
308         double duration;
309
310         // initialize some local variables
311         host  = SIMIX_host_self();
312         hosts = SIMIX_host_get_table();
313         size  = SIMIX_host_get_number();
314
315         // node 0 sets the globals
316         if (host == hosts[0]) {
317
318                 // processes
319                 smpi_sender_processes             = xbt_new(smx_process_t, size);
320                 smpi_receiver_processes           = xbt_new(smx_process_t, size);
321
322                 // running hosts
323                 smpi_running_hosts_mutex          = SIMIX_mutex_init();
324                 smpi_running_hosts                = size;
325
326                 // global communicator
327                 smpi_mpi_comm_world.size          = size;
328                 smpi_mpi_comm_world.barrier       = 0;
329                 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
330                 smpi_mpi_comm_world.barrier_cond  = SIMIX_cond_init();
331                 smpi_mpi_comm_world.hosts         = hosts;
332                 smpi_mpi_comm_world.processes     = xbt_new(smx_process_t, size);
333                 smpi_mpi_comm_world.processes[0]  = SIMIX_process_self();
334
335                 // mpi datatypes
336                 smpi_mpi_byte.size                = (size_t)1;
337                 smpi_mpi_int.size                 = sizeof(int);
338                 smpi_mpi_double.size              = sizeof(double);
339
340                 // mpi operations
341                 smpi_mpi_land.func                = &smpi_mpi_land_func;
342                 smpi_mpi_sum.func                 = &smpi_mpi_sum_func;
343
344                 // smpi globals
345                 smpi_request_mallocator           = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL);
346                 smpi_pending_send_requests        = xbt_new(xbt_fifo_t, size);
347                 smpi_pending_recv_requests        = xbt_new(xbt_fifo_t, size);
348                 smpi_received_messages            = xbt_new(xbt_fifo_t, size);
349
350                 for(i = 0; i < size; i++) {
351                         smpi_pending_send_requests[i] = xbt_fifo_new();
352                         smpi_pending_recv_requests[i] = xbt_fifo_new();
353                         smpi_received_messages[i]     = xbt_fifo_new();
354                 }
355
356                 smpi_timer                      = xbt_os_timer_new();
357                 smpi_reference_speed            = SMPI_DEFAULT_SPEED;
358                 smpi_benchmarking               = 0;
359                 smpi_benchmarking_mutex         = SIMIX_mutex_init();
360
361                 // signal all nodes to perform initialization
362                 SIMIX_mutex_lock(init_mutex);
363                 smpi_root_ready = 1;
364                 SIMIX_cond_broadcast(init_cond);
365                 SIMIX_mutex_unlock(init_mutex);
366
367         } else {
368
369                 // make sure root is done before own initialization
370                 SIMIX_mutex_lock(init_mutex);
371                 if (!smpi_root_ready) {
372                         SIMIX_cond_wait(init_cond, init_mutex);
373                 }
374                 SIMIX_mutex_unlock(init_mutex);
375
376                 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
377
378         }
379
380         // wait for all nodes to signal initializatin complete
381         SIMIX_mutex_lock(init_mutex);
382         smpi_ready_count++;
383         if (smpi_ready_count < 3 * size) {
384                 SIMIX_cond_wait(init_cond, init_mutex);
385         } else {
386                 SIMIX_cond_broadcast(init_cond);
387         }
388         SIMIX_mutex_unlock(init_mutex);
389
390 }
391
392 void smpi_mpi_finalize()
393 {
394         int i;
395
396         SIMIX_mutex_lock(smpi_running_hosts_mutex);
397         i = --smpi_running_hosts;
398         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
399
400         if (0 >= i) {
401
402                 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
403
404                 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
405                         xbt_fifo_free(smpi_pending_send_requests[i]);
406                         xbt_fifo_free(smpi_pending_recv_requests[i]);
407                         xbt_fifo_free(smpi_received_messages[i]);
408                 }
409
410                 xbt_mallocator_free(smpi_request_mallocator);
411                 xbt_free(smpi_pending_send_requests);
412                 xbt_free(smpi_pending_recv_requests);
413                 xbt_free(smpi_received_messages);
414
415                 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
416                 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
417                 xbt_free(smpi_mpi_comm_world.processes);
418
419                 xbt_os_timer_free(smpi_timer);
420         }
421
422 }
423
424 void smpi_bench_begin()
425 {
426         xbt_assert0(!smpi_benchmarking, "Already benchmarking");
427         smpi_benchmarking = 1;
428         xbt_os_timer_start(smpi_timer);
429         return;
430 }
431
432 void smpi_bench_end()
433 {
434         double duration;
435         smx_host_t host;
436         smx_action_t compute_action;
437         smx_mutex_t mutex;
438         smx_cond_t cond;
439
440         xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
441         smpi_benchmarking = 0;
442         xbt_os_timer_stop(smpi_timer);
443         duration = xbt_os_timer_elapsed(smpi_timer);
444         host           = SIMIX_host_self();
445         compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
446         mutex          = SIMIX_mutex_init();
447         cond           = SIMIX_cond_init();
448         SIMIX_mutex_lock(mutex);
449         SIMIX_register_condition_to_action(compute_action, cond);
450         SIMIX_register_action_to_condition(compute_action, cond);
451         SIMIX_cond_wait(cond, mutex);
452         SIMIX_mutex_unlock(mutex);
453         SIMIX_mutex_destroy(mutex);
454         SIMIX_cond_destroy(cond);
455         // FIXME: check for success/failure?
456         return;
457 }
458
459 void smpi_barrier(smpi_mpi_communicator_t *comm) {
460         int i;
461         SIMIX_mutex_lock(comm->barrier_mutex);
462         comm->barrier++;
463         if(i < comm->size) {
464                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
465         } else {
466                 comm->barrier = 0;
467                 SIMIX_cond_broadcast(comm->barrier_cond);
468         }
469         SIMIX_mutex_unlock(comm->barrier_mutex);
470 }
471
472 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
473 {
474         int i;
475         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
476         if (i >= comm->size) i = -1;
477         return i;
478 }
479
480 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
481         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
482 {
483         int retval = MPI_SUCCESS;
484
485         *request = NULL;
486
487         if (0 > count) {
488                 retval = MPI_ERR_COUNT;
489         } else if (NULL == buf) {
490                 retval = MPI_ERR_INTERN;
491         } else if (NULL == datatype) {
492                 retval = MPI_ERR_TYPE;
493         } else if (NULL == comm) {
494                 retval = MPI_ERR_COMM;
495         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
496                 retval = MPI_ERR_RANK;
497         } else if (0 > dst || comm->size <= dst) {
498                 retval = MPI_ERR_RANK;
499         } else if (0 > tag) {
500                 retval = MPI_ERR_TAG;
501         } else {
502                 *request = xbt_mallocator_get(smpi_request_mallocator);
503                 (*request)->buf        = buf;
504                 (*request)->count      = count;
505                 (*request)->datatype   = datatype;
506                 (*request)->src        = src;
507                 (*request)->dst        = dst;
508                 (*request)->tag        = tag;
509                 (*request)->comm       = comm;
510                 (*request)->completed  = 0;
511                 (*request)->waitlist   = NULL;
512         }
513         return retval;
514 }
515
516 int smpi_isend(smpi_mpi_request_t *request)
517 {
518         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
519
520         xbt_fifo_push(smpi_pending_send_requests[rank], request);
521
522         if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
523                 MSG_process_resume(smpi_sender_processes[rank]);
524         }
525 }
526
527 int smpi_irecv(smpi_mpi_request_t *request)
528 {
529         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
530
531         xbt_fifo_push(smpi_pending_recv_requests[rank], request);
532
533         if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
534                 MSG_process_resume(smpi_receiver_processes[rank]);
535         }
536 }
537
538 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
539 {
540         smx_process_t self;
541         int suspend = 0;
542         self = SIMIX_process_self();
543
544         if (NULL != request) {
545                 SIMIX_mutex_lock(request->mutex);
546                 if (!request->completed) {
547                         xbt_fifo_push(request->waitlist, self);
548                         suspend = 1;
549                 }
550                 SIMIX_mutex_unlock(request->mutex);
551                 if (suspend) {
552                         SIMIX_suspend(self);
553                 }
554                 if (NULL != status && MPI_STATUS_IGNORE != status) {
555                         SIMIX_mutex_lock(request->mutex);
556                         status->MPI_SOURCE = request->src;
557                         SIMIX_mutex_unlock(request->mutex);
558                 }
559         }
560 }
561
562 // FIXME: move into own file
563 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
564 {
565         double now;
566         int retval = 0;
567         smpi_bench_end();
568         if (NULL == tv) {
569                 retval = -1;
570         } else {
571                 now = SIMIX_get_clock();
572                 tv->tv_sec  = now;
573                 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
574         }
575         smpi_bench_begin();
576         return retval;
577 }
578
579 unsigned int smpi_sleep(unsigned int seconds)
580 {
581         smx_mutex_t mutex;
582         smx_cond_t cond;
583         smx_host_t host;
584         smx_action_t sleep_action;
585
586         smpi_bench_end();
587         host         = SIMIX_host_self();
588         sleep_action = SIMIX_action_sleep(host, seconds);
589         mutex        = SIMIX_mutex_init();
590         cond         = SIMIX_cond_init();
591         SIMIX_mutex_lock(mutex);
592         SIMIX_register_condition_to_action(sleep_action, cond);
593         SIMIX_register_action_to_condition(sleep_action, cond);
594         SIMIX_cond_wait(cond, mutex);
595         SIMIX_mutex_unlock(mutex);
596         SIMIX_mutex_destroy(mutex);
597         SIMIX_cond_destroy(cond);
598         // FIXME: check for success/failure?
599         smpi_bench_begin();
600         return 0;
601 }
602
603 void smpi_exit(int status)
604 {
605         smpi_bench_end();
606         SIMIX_mutex_lock(smpi_running_hosts_mutex);
607         smpi_running_hosts--;
608         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
609         SIMIX_process_kill(SIMIX_process_self());
610         return;
611 }