Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
more work on smpi receiver. basically everything is in place except matching
[simgrid.git] / src / smpi / src / smpi_base.c
1 #include <stdio.h>
2
3 #include <signal.h>
4 #include <sys/time.h>
5 #include "xbt/xbt_portability.h"
6 #include "simix/simix.h"
7 #include "simix/private.h"
8 #include "smpi.h"
9
10 // FIXME: move globals into structure...
11
// Request pool and per-rank queues. The root host allocates all of them in
// smpi_mpi_init() (one FIFO per host) and smpi_mpi_finalize() releases them.
12 xbt_mallocator_t smpi_request_mallocator    = NULL;
13 xbt_fifo_t *smpi_pending_send_requests      = NULL;
14 xbt_fifo_t *smpi_pending_recv_requests      = NULL;
15 xbt_fifo_t *smpi_received_messages          = NULL;
16
// Per-rank handles of the helper processes. smpi_sender()/smpi_receiver()
// store themselves here; smpi_isend()/smpi_irecv() use them to wake the helper.
17 smx_process_t *smpi_sender_processes        = NULL;
18 smx_process_t *smpi_receiver_processes      = NULL;
19
// Set to the host count in smpi_mpi_init(), decremented by
// smpi_mpi_finalize() and smpi_exit(); helper loops run while it is positive.
20 int smpi_running_hosts = 0;
21
// World communicator; its fields are filled in by the root host in smpi_mpi_init().
22 smpi_mpi_communicator_t smpi_mpi_comm_world;
23
// Not referenced by name in this file; presumably backs MPI_STATUS_IGNORE -- TODO confirm.
24 smpi_mpi_status_t smpi_mpi_status_ignore;
25
// Built-in datatypes; only their .size is set (1, sizeof(int), sizeof(double)).
26 smpi_mpi_datatype_t smpi_mpi_byte;
27 smpi_mpi_datatype_t smpi_mpi_int;
28 smpi_mpi_datatype_t smpi_mpi_double;
29
// Built-in reduction operations; .func points at smpi_mpi_land_func / smpi_mpi_sum_func.
30 smpi_mpi_op_t smpi_mpi_land;
31 smpi_mpi_op_t smpi_mpi_sum;
32
// Benchmarking state driven by smpi_bench_begin()/smpi_bench_end().
// smpi_reference_speed is set to SMPI_DEFAULT_SPEED in smpi_mpi_init() but is
// not read anywhere in this file.
33 static xbt_os_timer_t smpi_timer;
34 static int smpi_benchmarking;
35 static double smpi_reference_speed;
36
37 // mutexes
// smpi_running_hosts_mutex guards smpi_running_hosts.
// smpi_benchmarking_mutex is created in smpi_mpi_init() and not otherwise used here.
// init_mutex/init_cond are created in smpi_run_simulation() and implement the
// start-up handshake below.
38 smx_mutex_t smpi_running_hosts_mutex = NULL;
39 smx_mutex_t smpi_benchmarking_mutex  = NULL;
40 smx_mutex_t init_mutex = NULL;
41 smx_cond_t init_cond  = NULL;
42
// smpi_root_ready: set to 1 by the root host once the globals above exist.
// smpi_ready_count: incremented once by every sender, receiver and
// smpi_mpi_init() caller; the start-up barrier opens at 3 * size.
43 int smpi_root_ready = 0;
44 int smpi_ready_count = 0;
45
46 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
47
48 int inline smpi_mpi_comm_size(smpi_mpi_communicator_t *comm) 
49 {
50         return comm->size;
51 }
52
53 // FIXME: smarter algorithm?
54 int smpi_mpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
55 {
56         int i;
57
58         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
59
60         return i;
61 }
62
63 int inline smpi_mpi_comm_rank_self(smpi_mpi_communicator_t *comm)
64 {
65         return smpi_mpi_comm_rank(comm, SIMIX_host_self());
66 }
67
68 int smpi_sender(int argc, char **argv)
69 {
70         smx_process_t self;
71         smx_host_t shost;
72         int rank;
73         xbt_fifo_t request_queue;
74         int size;
75         int running_hosts = 0;
76         smpi_mpi_request_t *request;
77         smx_host_t dhost;
78         smx_action_t communicate_action;
79         smpi_mpi_request_t *scratch;
80         int drank;
81
82         self  = SIMIX_process_self();
83         shost = SIMIX_host_self();
84         rank  = smpi_mpi_comm_rank(&smpi_mpi_comm_world, shost);
85
86         // make sure root is done before own initialization
87         SIMIX_mutex_lock(init_mutex);
88         if (!smpi_root_ready) {
89                 SIMIX_cond_wait(init_cond, init_mutex);
90         }
91         SIMIX_mutex_unlock(init_mutex);
92
93         request_queue = smpi_pending_send_requests[rank];
94         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
95         smpi_sender_processes[rank] = self;
96
97         // wait for all nodes to signal initializatin complete
98         SIMIX_mutex_lock(init_mutex);
99         smpi_ready_count++;
100         if (smpi_ready_count < 3 * size) {
101                 SIMIX_cond_wait(init_cond, init_mutex);
102         } else {
103                 SIMIX_cond_broadcast(init_cond);
104         }
105         SIMIX_mutex_unlock(init_mutex);
106
107         SIMIX_mutex_lock(smpi_running_hosts_mutex);
108         running_hosts = smpi_running_hosts;
109         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
110
111         while (0 < running_hosts) {
112
113                 request = xbt_fifo_shift(request_queue);
114
115                 if (NULL == request) {
116                         SIMIX_process_suspend(self);
117                 } else {
118                         SIMIX_mutex_lock(request->mutex);
119
120                         dhost = request->comm->hosts[request->dst];
121
122                         // FIXME: not at all sure I can assume magic just happens here....
123                         communicate_action = SIMIX_action_communicate(shost, dhost,
124                                 "communication", request->datatype->size * request->count * 1.0, -1.0);
125
126                         SIMIX_register_condition_to_action(communicate_action, request->cond);
127                         SIMIX_register_action_to_condition(communicate_action, request->cond);
128
129                         SIMIX_cond_wait(request->cond, request->mutex);
130
131                         // copy request to appropriate received queue
132                         scratch = xbt_mallocator_get(smpi_request_mallocator);
133                         memcpy(scratch, request, sizeof smpi_mpi_request_t);
134                         drank = smpi_mpi_comm_rank(MPI_COMM_WORLD, dhost);
135                         xbt_fifo_push(smpi_received_messages[drank], scratch);
136
137                         request->completed = 1;
138
139                         SIMIX_mutex_unlock(request->mutex);
140                 }
141
142                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
143                 running_hosts = smpi_running_hosts;
144                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
145         }
146
147         return 0;
148 }
149
150 int smpi_receiver(int argc, char **argv)
151 {
152         smx_process_t self;
153         int rank;
154         xbt_fifo_t request_queue;
155         xbt_fifo_t message_queue;
156         int size;
157         int running_hosts;
158         smpi_mpi_request_t *message;
159         smpi_mpi_request_t *request;
160         smx_process_t dproc;
161
162         self  = SIMIX_process_self();
163         rank  = smpi_mpi_comm_rank(&smpi_mpi_comm_world, dhost);
164
165         // make sure root is done before own initialization
166         SIMIX_mutex_lock(init_mutex);
167         if (!smpi_root_ready) {
168                 SIMIX_cond_wait(init_cond, init_mutex);
169         }
170         SIMIX_mutex_unlock(init_mutex);
171
172         request_queue = smpi_pending_receive_requests[rank];
173         message_queue = smpi_received_messages[rank];
174         size = smpi_mpi_comm_size(&smpi_mpi_comm_world);
175         smpi_receiver_processes[rank] = self;
176
177         // wait for all nodes to signal initializatin complete
178         SIMIX_mutex_lock(init_mutex);
179         smpi_ready_count++;
180         if (smpi_ready_count < 3 * size) {
181                 SIMIX_cond_wait(init_cond, init_mutex);
182         } else {
183                 SIMIX_cond_broadcast(init_cond);
184         }
185         SIMIX_mutex_unlock(init_mutex);
186
187         SIMIX_mutex_lock(smpi_running_hosts_mutex);
188         running_hosts = smpi_running_hosts;
189         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
190
191         while (0 < running_hosts) {
192
193                 // FIXME: search for received messages and requests
194
195                 if (NULL == request) {
196                         SIMIX_process_suspend(self);
197                 } else {
198                         SIMIX_mutex_lock(request->mutex);
199                         memcpy(request->buf, message->buf, request->count * request->type->size);
200                         request->src = message->src;
201                         reqeust->completed = 1;
202
203                         while (dproc = xbt_fifo_shift(request->waitlist)) {
204                                 if (SIMIX_process_is_suspended(dproc)) {
205                                         SIMIX_process_resume(dproc);
206                                 }
207                         }
208
209                         SIMIX_mutex_unlock(request->mutex);
210                         xbt_mallocator_release(smpi_request_mallocator, message);
211                 }
212
213                 SIMIX_mutex_lock(smpi_running_hosts_mutex);
214                 running_hosts = smpi_running_hosts;
215                 SIMIX_mutex_unlock(smpi_running_hosts_mutex);
216         }
217
218         return 0;
219 }
220
221 int smpi_run_simulation(int argc, char **argv)
222 {
223         smx_cond_t   cond           = NULL;
224         smx_action_t action         = NULL;
225
226         xbt_fifo_t   actions_failed = xbt_fifo_new();
227         xbt_fifo_t   actions_done   = xbt_fifo_new();
228
229         srand(SMPI_RAND_SEED);
230
231         SIMIX_global_init(&argc, argv);
232
233         init_mutex = SIMIX_mutex_init();
234         init_cond  = SIMIX_cond_init();
235
236         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
237         SIMIX_function_register("smpi_sender", smpi_sender);
238         SIMIX_function_register("smpi_receiver", smpi_receiver);
239         SIMIX_create_environment(argv[1]);
240         SIMIX_launch_application(argv[2]);
241
242         /* Prepare to display some more info when dying on Ctrl-C pressing */
243         //signal(SIGINT, inthandler);
244
245         /* Clean IO before the run */
246         fflush(stdout);
247         fflush(stderr);
248
249         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
250                 while (action = xbt_fifo_pop(actions_failed)) {
251                         DEBUG1("** %s failed **", action->name);
252                         while (cond = xbt_fifo_pop(action->cond_list)) {
253                                 SIMIX_cond_broadcast(cond);
254                         }
255                         SIMIX_action_destroy(action);
256                 }
257                 while (action = xbt_fifo_pop(actions_done)) {
258                         DEBUG1("** %s done **",action->name);
259                         while (cond = xbt_fifo_pop(action->cond_list)) {
260                                 SIMIX_cond_broadcast(cond);
261                         }
262                         SIMIX_action_destroy(action);
263                 }
264         }
265         xbt_fifo_free(actions_failed);
266         xbt_fifo_free(actions_done);
267         INFO1("simulation time %g", SIMIX_get_clock());
268         SIMIX_clean();
269         return 0;
270 }
271
// Logical-AND reduction callback: treats x, y and z as pointers to int and
// stores (*x && *y), i.e. 0 or 1, into *z. *y is not read when *x is zero.
void smpi_mpi_land_func(void *x, void *y, void *z)
{
        int *lhs = x;
        int *rhs = y;
        int *out = z;

        *out = (*lhs && *rhs);
}
276
// Sum reduction callback: treats x, y and z as pointers to int and stores
// *x + *y into *z. Both operands are read before *z is written, so z may
// alias either input.
void smpi_mpi_sum_func(void *x, void *y, void *z)
{
        int total = *(int *)x + *(int *)y;

        *(int *)z = total;
}
281
282 smpi_mpi_request_t *smpi_new_request()
283 {
284         return xbt_new(smpi_mpi_request_t, 1);
285 }
286
287 void smpi_mpi_init()
288 {
289         int i;
290         int size;
291         smx_process_t process;
292         smx_host_t *hosts;
293         smx_host_t host;
294         double duration;
295
296         // initialize some local variables
297         host  = SIMIX_host_self();
298         hosts = SIMIX_host_get_table();
299         size  = SIMIX_host_get_number();
300
301         // node 0 sets the globals
302         if (host == hosts[0]) {
303
304                 // processes
305                 smpi_sender_processes             = xbt_new(smx_process_t, size);
306                 smpi_receiver_processes           = xbt_new(smx_process_t, size);
307
308                 // running hosts
309                 smpi_running_hosts_mutex          = SIMIX_mutex_init();
310                 smpi_running_hosts                = size;
311
312                 // global communicator
313                 smpi_mpi_comm_world.size          = size;
314                 smpi_mpi_comm_world.barrier       = 0;
315                 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
316                 smpi_mpi_comm_world.barrier_cond  = SIMIX_cond_init();
317                 smpi_mpi_comm_world.hosts         = hosts;
318                 smpi_mpi_comm_world.processes     = xbt_new(smx_process_t, size);
319                 smpi_mpi_comm_world.processes[0]  = SIMIX_process_self();
320
321                 // mpi datatypes
322                 smpi_mpi_byte.size                = (size_t)1;
323                 smpi_mpi_int.size                 = sizeof(int);
324                 smpi_mpi_double.size              = sizeof(double);
325
326                 // mpi operations
327                 smpi_mpi_land.func                = &smpi_mpi_land_func;
328                 smpi_mpi_sum.func                 = &smpi_mpi_sum_func;
329
330                 // smpi globals
331                 smpi_request_mallocator           = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, NULL);
332                 smpi_pending_send_requests        = xbt_new(xbt_fifo_t, size);
333                 smpi_pending_recv_requests        = xbt_new(xbt_fifo_t, size);
334                 smpi_received_messages            = xbt_new(xbt_fifo_t, size);
335
336                 for(i = 0; i < size; i++) {
337                         smpi_pending_send_requests[i] = xbt_fifo_new();
338                         smpi_pending_recv_requests[i] = xbt_fifo_new();
339                         smpi_received_messages[i]     = xbt_fifo_new();
340                 }
341
342                 smpi_timer                      = xbt_os_timer_new();
343                 smpi_reference_speed            = SMPI_DEFAULT_SPEED;
344                 smpi_benchmarking               = 0;
345                 smpi_benchmarking_mutex         = SIMIX_mutex_init();
346
347                 // signal all nodes to perform initialization
348                 SIMIX_mutex_lock(init_mutex);
349                 smpi_root_ready = 1;
350                 SIMIX_cond_broadcast(init_cond);
351                 SIMIX_mutex_unlock(init_mutex);
352
353         } else {
354
355                 // make sure root is done before own initialization
356                 SIMIX_mutex_lock(init_mutex);
357                 if (!smpi_root_ready) {
358                         SIMIX_cond_wait(init_cond, init_mutex);
359                 }
360                 SIMIX_mutex_unlock(init_mutex);
361
362                 smpi_mpi_comm_world.processes[smpi_mpi_comm_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
363
364         }
365
366         // wait for all nodes to signal initializatin complete
367         SIMIX_mutex_lock(init_mutex);
368         smpi_ready_count++;
369         if (smpi_ready_count < 3 * size) {
370                 SIMIX_cond_wait(init_cond, init_mutex);
371         } else {
372                 SIMIX_cond_broadcast(init_cond);
373         }
374         SIMIX_mutex_unlock(init_mutex);
375
376 }
377
378 void smpi_mpi_finalize()
379 {
380         int i;
381
382         SIMIX_mutex_lock(smpi_running_hosts_mutex);
383         i = --smpi_running_hosts;
384         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
385
386         if (0 >= i) {
387
388                 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
389
390                 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
391                         xbt_fifo_free(smpi_pending_send_requests[i]);
392                         xbt_fifo_free(smpi_pending_recv_requests[i]);
393                         xbt_fifo_free(smpi_received_messages[i]);
394                 }
395
396                 xbt_mallocator_free(smpi_request_mallocator);
397                 xbt_free(smpi_pending_send_requests);
398                 xbt_free(smpi_pending_recv_requests);
399                 xbt_free(smpi_received_messages);
400
401                 SIMIX_mutex_destroy(smpi_mpi_comm_world.barrier_mutex);
402                 SIMIX_cond_destroy(smpi_mpi_comm_world.barrier_cond);
403                 xbt_free(smpi_mpi_comm_world.processes);
404
405                 xbt_os_timer_free(smpi_timer);
406         }
407
408 }
409
410 void smpi_bench_begin()
411 {
412         xbt_assert0(!smpi_benchmarking, "Already benchmarking");
413         smpi_benchmarking = 1;
414         xbt_os_timer_start(smpi_timer);
415         return;
416 }
417
418 void smpi_bench_end()
419 {
420         double duration;
421         smx_host_t host;
422         smx_action_t compute_action;
423         smx_mutex_t mutex;
424         smx_cond_t cond;
425
426         xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
427         smpi_benchmarking = 0;
428         xbt_os_timer_stop(smpi_timer);
429         duration = xbt_os_timer_elapsed(smpi_timer);
430         host           = SIMIX_host_self();
431         compute_action = SIMIX_action_execute(host, "computation", duration * SMPI_DEFAULT_SPEED);
432         mutex          = SIMIX_mutex_init();
433         cond           = SIMIX_cond_init();
434         SIMIX_mutex_lock(mutex);
435         SIMIX_register_condition_to_action(compute_action, cond);
436         SIMIX_register_action_to_condition(compute_action, cond);
437         SIMIX_cond_wait(cond, mutex);
438         SIMIX_mutex_unlock(mutex);
439         SIMIX_mutex_destroy(mutex);
440         SIMIX_cond_destroy(cond);
441         // FIXME: check for success/failure?
442         return;
443 }
444
445 void smpi_barrier(smpi_mpi_communicator_t *comm) {
446         int i;
447         SIMIX_mutex_lock(comm->barrier_mutex);
448         comm->barrier++;
449         if(i < comm->size) {
450                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
451         } else {
452                 comm->barrier = 0;
453                 SIMIX_cond_broadcast(comm->barrier_cond);
454         }
455         SIMIX_mutex_unlock(comm->barrier_mutex);
456 }
457
458 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
459 {
460         int i;
461         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
462         if (i >= comm->size) i = -1;
463         return i;
464 }
465
466 int smpi_create_request(void *buf, int count, smpi_mpi_datatype_t *datatype,
467         int src, int dst, int tag, smpi_mpi_communicator_t *comm, smpi_mpi_request_t **request)
468 {
469         int retval = MPI_SUCCESS;
470
471         *request = NULL;
472
473         if (0 > count) {
474                 retval = MPI_ERR_COUNT;
475         } else if (NULL == buf) {
476                 retval = MPI_ERR_INTERN;
477         } else if (NULL == datatype) {
478                 retval = MPI_ERR_TYPE;
479         } else if (NULL == comm) {
480                 retval = MPI_ERR_COMM;
481         } else if (MPI_ANY_SOURCE != src && (0 > src || comm->size <= src)) {
482                 retval = MPI_ERR_RANK;
483         } else if (0 > dst || comm->size <= dst) {
484                 retval = MPI_ERR_RANK;
485         } else if (0 > tag) {
486                 retval = MPI_ERR_TAG;
487         } else {
488                 *request = xbt_mallocator_get(smpi_request_mallocator);
489                 (*request)->buf        = buf;
490                 (*request)->count      = count;
491                 (*request)->datatype   = datatype;
492                 (*request)->src        = src;
493                 (*request)->dst        = dst;
494                 (*request)->tag        = tag;
495                 (*request)->comm       = comm;
496                 (*request)->completed  = 0;
497                 (*request)->waitlist   = NULL;
498         }
499         return retval;
500 }
501
502 int smpi_isend(smpi_mpi_request_t *request)
503 {
504         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
505
506         xbt_fifo_push(smpi_pending_send_requests[rank], request);
507
508         if (MSG_process_is_suspended(smpi_sender_processes[rank])) {
509                 MSG_process_resume(smpi_sender_processes[rank]);
510         }
511 }
512
513 int smpi_irecv(smpi_mpi_request_t *request)
514 {
515         int rank = smpi_mpi_comm_rank_self(&smpi_mpi_comm_world);
516
517         xbt_fifo_push(smpi_pending_recv_requests[rank], request);
518
519         if (MSG_process_is_suspended(smpi_receiver_processes[rank])) {
520                 MSG_process_resume(smpi_receiver_processes[rank]);
521         }
522 }
523
524 void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status)
525 {
526         smx_process_t self;
527
528         if (NULL != request) {
529                 if (!request->completed) {
530                         self = SIMIX_process_self();
531                         xbt_fifo_push(request->waitlist, self);
532                 }       SIMIX_suspend(self);
533                 if (NULL != status && MPI_STATUS_IGNORE != status) {
534                         status->MPI_SOURCE = request->src;
535                 }
536         }
537 }
538
// FIXME: move into own file
// gettimeofday() replacement that reports the simulated clock.
// Benchmarking is paused for the duration of the call (smpi_bench_end() on
// entry, smpi_bench_begin() on exit, on every path). tv receives the whole
// seconds of SIMIX_get_clock() in tv_sec and the remainder, scaled by 1e6,
// in tv_usec. tz is ignored.
// Returns 0 on success, -1 when tv is NULL.
int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
{
        double clock_now;
        int rc = -1;

        smpi_bench_end();

        if (NULL != tv) {
                clock_now   = SIMIX_get_clock();
                tv->tv_sec  = clock_now;
                tv->tv_usec = ((clock_now - (double)tv->tv_sec) * 1000000.0);
                rc = 0;
        }

        smpi_bench_begin();

        return rc;
}
555
556 unsigned int smpi_sleep(unsigned int seconds)
557 {
558         smx_mutex_t mutex;
559         smx_cond_t cond;
560         smx_host_t host;
561         smx_action_t sleep_action;
562
563         smpi_bench_end();
564         host         = SIMIX_host_self();
565         sleep_action = SIMIX_action_sleep(host, seconds);
566         mutex        = SIMIX_mutex_init();
567         cond         = SIMIX_cond_init();
568         SIMIX_mutex_lock(mutex);
569         SIMIX_register_condition_to_action(sleep_action, cond);
570         SIMIX_register_action_to_condition(sleep_action, cond);
571         SIMIX_cond_wait(cond, mutex);
572         SIMIX_mutex_unlock(mutex);
573         SIMIX_mutex_destroy(mutex);
574         SIMIX_cond_destroy(cond);
575         // FIXME: check for success/failure?
576         smpi_bench_begin();
577         return 0;
578 }
579
580 void smpi_exit(int status)
581 {
582         smpi_bench_end();
583         SIMIX_mutex_lock(smpi_running_hosts_mutex);
584         smpi_running_hosts--;
585         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
586         SIMIX_process_kill(SIMIX_process_self());
587         return;
588 }