Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
First try at instrumenting the action replay tool.
[simgrid.git] / examples / msg / actions / actions.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
10 #include "simix/simix.h"        /* semaphores for the barrier */
11 #include "xbt.h"                /* calloc, printf */
12 #include "simgrid_config.h"     /* getline */
13 #include "instr/private.h"
14
15 XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
16                              "Messages specific for this msg example");
17 int communicator_size = 0;
18
19 typedef struct coll_ctr_t {
20   int last_Irecv_sender_id;
21   int bcast_counter;
22   int reduce_counter;
23   int allReduce_counter;
24 } *coll_ctr;
25
26 /* Helper function */
27 static double parse_double(const char *string)
28 {
29   double value;
30   char *endptr;
31
32   value = strtod(string, &endptr);
33   if (*endptr != '\0')
34     THROW1(unknown_error, 0, "%s is not a double", string);
35   return value;
36 }
37
38 static int get_rank (const char *process_name)
39 {
40   return atoi(&(process_name[1]));
41
42
43 /* My actions */
44 static void action_send(xbt_dynar_t action)
45 {
46   char *name = NULL;
47   char to[250];
48   char *size = xbt_dynar_get_as(action, 3, char *);
49   double clock = MSG_get_clock();
50  
51   sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),
52           xbt_dynar_get_as(action, 2, char *));
53   //  char *to =  xbt_dynar_get_as(action, 2, char *);
54
55   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
56     name = xbt_str_join(action, " ");
57
58 #ifdef HAVE_TRACING
59   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
60   int dst_traced = get_rank(xbt_dynar_get_as(action, 2, char *));
61   TRACE_smpi_ptp_in(rank, rank, dst_traced, "send");
62   TRACE_smpi_send(rank, rank, dst_traced);
63 #endif
64
65   DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
66   MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
67   VERB2("%s %f", name, MSG_get_clock() - clock);
68
69   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
70     free(name);
71
72 #ifdef HAVE_TRACING
73   TRACE_smpi_ptp_out(rank, rank, dst_traced, "send");
74 #endif  
75 }
76
77
78 static int spawned_send(int argc, char *argv[])
79 {
80   DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()),
81          argv[1], argv[0]);
82   MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL),
83                 argv[0]);
84   return 0;
85 }
86
87 static void Isend(xbt_dynar_t action)
88 {
89   char spawn_name[80];
90   char to[250];
91   //  char *to = xbt_dynar_get_as(action, 2, char *);
92   char *size = xbt_dynar_get_as(action, 3, char *);
93   char **myargv;
94   m_process_t comm_helper;
95   double clock = MSG_get_clock();
96   DEBUG1("Isend on %s: spawn process ",
97          MSG_process_get_name(MSG_process_self()));
98
99   sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),
100           xbt_dynar_get_as(action, 2, char *));
101   myargv = (char **) calloc(3, sizeof(char *));
102
103   myargv[0] = xbt_strdup(to);
104   myargv[1] = xbt_strdup(size);
105   myargv[2] = NULL;
106
107   //    sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
108   sprintf(spawn_name, "%s_wait", to);
109   comm_helper =
110       MSG_process_create_with_arguments(spawn_name, spawned_send,
111                                         NULL, MSG_host_self(), 2, myargv);
112   VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
113 }
114
115
116 static void action_recv(xbt_dynar_t action)
117 {
118   char *name = NULL;
119   char mailbox_name[250];
120   m_task_t task = NULL;
121   double clock = MSG_get_clock();
122   //FIXME: argument of action ignored so far; semantic not clear
123   //char *from=xbt_dynar_get_as(action,2,char*);
124   sprintf(mailbox_name, "%s_%s", xbt_dynar_get_as(action, 2, char *),
125           MSG_process_get_name(MSG_process_self()));
126
127   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
128     name = xbt_str_join(action, " ");
129
130 #ifdef HAVE_TRACING
131   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
132   int src_traced = get_rank(xbt_dynar_get_as(action, 2, char *));
133   TRACE_smpi_ptp_in(rank, src_traced, rank, "recv");
134 #endif
135
136   DEBUG1("Receiving: %s", name);
137   MSG_task_receive(&task, mailbox_name);
138   //  MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
139   VERB2("%s %f", name, MSG_get_clock() - clock);
140   MSG_task_destroy(task);
141
142   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
143     free(name);
144 #ifdef HAVE_TRACING
145   TRACE_smpi_ptp_out(rank, src_traced, rank, "recv");
146   TRACE_smpi_recv(rank, src_traced, rank);
147 #endif
148
149 }
150
151 static int spawned_recv(int argc, char *argv[])
152 {
153   m_task_t task = NULL;
154   DEBUG1("Receiving on %s", argv[0]);
155   MSG_task_receive(&task, argv[0]);
156   DEBUG1("Received %s", MSG_task_get_name(task));
157   DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
158   MSG_task_send(MSG_task_create("waiter", 0, 0, NULL),
159                 MSG_process_get_name(MSG_process_self()));
160
161   MSG_task_destroy(task);
162   return 0;
163 }
164
165
166 static void Irecv(xbt_dynar_t action)
167 {
168   char *name;
169   m_process_t comm_helper;
170   char mailbox_name[250];
171   char **myargv;
172   double clock = MSG_get_clock();
173
174   DEBUG1("Irecv on %s: spawn process ",
175          MSG_process_get_name(MSG_process_self()));
176 #ifdef HAVE_TRACING
177   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
178   int src_traced = get_rank(xbt_dynar_get_as(action, 2, char *));
179   coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
180   if (!counters) {
181       DEBUG0("Initialize the counters");
182       counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
183     }
184   counters->last_Irecv_sender_id = src_traced;
185   MSG_process_set_data(MSG_process_self(), (void *) counters);
186
187   TRACE_smpi_ptp_in(rank, src_traced, rank, "Irecv");
188 #endif
189
190   sprintf(mailbox_name, "%s_%s", xbt_dynar_get_as(action, 2, char *),
191           MSG_process_get_name(MSG_process_self()));
192   name = bprintf("%s_wait", MSG_process_get_name(MSG_process_self()));
193   myargv = (char **) calloc(2, sizeof(char *));
194
195   myargv[0] = xbt_strdup(mailbox_name);
196   myargv[1] = NULL;
197   comm_helper = MSG_process_create_with_arguments(name, spawned_recv,
198                                                   NULL, MSG_host_self(),
199                                                   1, myargv);
200
201   VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
202
203   free(name);
204 #ifdef HAVE_TRACING
205   TRACE_smpi_ptp_out(rank, src_traced, rank, "Irecv");
206 #endif
207
208 }
209
210
211 static void action_wait(xbt_dynar_t action)
212 {
213   char *name = NULL;
214   char task_name[80];
215   m_task_t task = NULL;
216   double clock = MSG_get_clock();
217
218   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
219     name = xbt_str_join(action, " ");
220 #ifdef HAVE_TRACING
221   coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
222   int src_traced = counters->last_Irecv_sender_id;
223   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
224   TRACE_smpi_ptp_in(rank, src_traced, rank, "wait");
225 #endif
226
227   DEBUG1("Entering %s", name);
228   sprintf(task_name, "%s_wait", MSG_process_get_name(MSG_process_self()));
229   DEBUG1("wait: %s", task_name);
230   MSG_task_receive(&task, task_name);
231   MSG_task_destroy(task);
232   VERB2("%s %f", name, MSG_get_clock() - clock);
233   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
234     free(name);
235 #ifdef HAVE_TRACING
236   TRACE_smpi_ptp_out(rank, src_traced, rank, "wait");
237   TRACE_smpi_recv(rank, src_traced, rank);
238 #endif
239
240 }
241
242 /* FIXME: that's a poor man's implementation: we should take the message exchanges into account */
243 static void barrier(xbt_dynar_t action)
244 {
245   char *name = NULL;
246   static smx_mutex_t mutex = NULL;
247   static smx_cond_t cond = NULL;
248   static int processes_arrived_sofar=0;
249
250   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
251     name = xbt_str_join(action, " ");
252
253   if (mutex == NULL) {       // first arriving on the barrier
254     mutex = SIMIX_mutex_init();
255     cond = SIMIX_cond_init();
256     processes_arrived_sofar=0;
257   }
258   DEBUG2("Entering barrier: %s (%d already there)", name,processes_arrived_sofar);
259
260   SIMIX_mutex_lock(mutex);
261   if (++processes_arrived_sofar == communicator_size) {
262     SIMIX_cond_broadcast(cond);
263     SIMIX_mutex_unlock(mutex);
264   } else {
265     SIMIX_cond_wait(cond,mutex);
266     SIMIX_mutex_unlock(mutex);
267   }
268
269   DEBUG1("Exiting barrier: %s", name);
270
271   processes_arrived_sofar--;
272   if (!processes_arrived_sofar) {
273     SIMIX_cond_destroy(cond);
274     SIMIX_mutex_destroy(mutex);
275     mutex=NULL;
276   }
277
278   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
279     free(name);
280
281 }
282
283 static void reduce(xbt_dynar_t action)
284 {
285   int i;
286   char *name;
287   char task_name[80];
288   char spawn_name[80];
289   char **myargv;
290   char *comm_size = xbt_dynar_get_as(action, 2, char *);
291   char *comp_size = xbt_dynar_get_as(action, 3, char *);
292   m_process_t comm_helper = NULL;
293   m_task_t task = NULL, comp_task = NULL;
294   const char *process_name;
295   double clock = MSG_get_clock();
296
297   coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
298
299   xbt_assert0(communicator_size, "Size of Communicator is not defined"
300               ", can't use collective operations");
301
302   process_name = MSG_process_get_name(MSG_process_self());
303
304   if (!counters) {
305     DEBUG0("Initialize the counters");
306     counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
307   }
308
309   name = bprintf("reduce_%d", counters->reduce_counter++);
310
311   if (!strcmp(process_name, "p0")) {
312     DEBUG2("%s: %s is the Root", name, process_name);
313     for (i = 1; i < communicator_size; i++) {
314       sprintf(spawn_name, "%s_p%d_%s", name, i,
315               MSG_process_get_name(MSG_process_self()));
316       sprintf(task_name, "%s_wait", spawn_name);
317       myargv = (char **) calloc(2, sizeof(char *));
318
319       myargv[0] = xbt_strdup(spawn_name);
320       myargv[1] = NULL;
321
322       comm_helper =
323           MSG_process_create_with_arguments(task_name, spawned_recv,
324                                             NULL, MSG_host_self(),
325                                             1, myargv);
326     }
327
328     for (i = 1; i < communicator_size; i++) {
329       sprintf(task_name, "%s_p%d_p0_wait", name, i);
330       MSG_task_receive(&task, task_name);
331       MSG_task_destroy(task);
332       task = NULL;
333     }
334
335     comp_task =
336         MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
337     DEBUG1("%s: computing 'reduce_comp'", name);
338     MSG_task_execute(comp_task);
339     MSG_task_destroy(comp_task);
340     DEBUG1("%s: computed", name);
341   } else {
342     DEBUG2("%s: %s sends", name, process_name);
343     sprintf(task_name, "%s_%s_p0", name, process_name);
344     DEBUG1("put on %s", task_name);
345     MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
346                   task_name);
347   }
348
349   MSG_process_set_data(MSG_process_self(), (void *) counters);
350   VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
351   free(name);
352 }
353
354 static void bcast(xbt_dynar_t action)
355 {
356   int i;
357   char *name;
358   const char *process_name;
359   char task_name[80];
360   char spawn_name[80];
361   char **myargv;
362   m_process_t comm_helper = NULL;
363   m_task_t task = NULL;
364   char *size = xbt_dynar_get_as(action, 2, char *);
365   coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
366   double clock = MSG_get_clock();
367
368   xbt_assert0(communicator_size, "Size of Communicator is not defined"
369               ", can't use collective operations");
370
371
372   process_name = MSG_process_get_name(MSG_process_self());
373   if (!counters) {
374     DEBUG0("Initialize the counters");
375     counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
376   }
377
378   name = bprintf("bcast_%d", counters->bcast_counter++);
379   if (!strcmp(process_name, "p0")) {
380     DEBUG2("%s: %s is the Root", name, process_name);
381
382     for (i = 1; i < communicator_size; i++) {
383       myargv = (char **) calloc(3, sizeof(char *));
384       myargv[0] = xbt_strdup(name);
385       myargv[1] = xbt_strdup(size);
386       myargv[2] = NULL;
387
388       sprintf(spawn_name, "%s_%d", myargv[0], i);
389       comm_helper =
390           MSG_process_create_with_arguments(spawn_name, spawned_send,
391                                             NULL, MSG_host_self(), 2,
392                                             myargv);
393     }
394
395     for (i = 1; i < communicator_size; i++) {
396       sprintf(task_name, "p%d_wait", i);
397       DEBUG1("get on %s", task_name);
398       MSG_task_receive(&task, task_name);
399       MSG_task_destroy(task);
400       task = NULL;
401     }
402     DEBUG2("%s: all messages sent by %s have been received",
403            name, process_name);
404   } else {
405     DEBUG2("%s: %s receives", name, process_name);
406     MSG_task_receive(&task, name);
407     MSG_task_destroy(task);
408     DEBUG2("%s: %s has received", name, process_name);
409     sprintf(task_name, "%s_wait", process_name);
410     DEBUG1("put on %s", task_name);
411     MSG_task_send(MSG_task_create("waiter", 0, 0, NULL), task_name);
412   }
413
414   MSG_process_set_data(MSG_process_self(), (void *) counters);
415   VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
416   free(name);
417 }
418
419
420 static void action_sleep(xbt_dynar_t action)
421 {
422   char *name = NULL;
423   char *duration = xbt_dynar_get_as(action, 2, char *);
424   double clock = MSG_get_clock();
425
426   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
427     name = xbt_str_join(action, " ");
428
429   DEBUG1("Entering %s", name);
430   MSG_process_sleep(parse_double(duration));
431   VERB2("%s %f ", name, MSG_get_clock() - clock);
432
433   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
434     free(name);
435 }
436
437 static void allReduce(xbt_dynar_t action)
438 {
439   int i;
440   char *name;
441   char task_name[80];
442   char spawn_name[80];
443   char **myargv;
444   char *comm_size = xbt_dynar_get_as(action, 2, char *);
445   char *comp_size = xbt_dynar_get_as(action, 3, char *);
446   m_process_t comm_helper = NULL;
447   m_task_t task = NULL, comp_task = NULL;
448   const char *process_name;
449   double clock = MSG_get_clock();
450
451   coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
452
453   xbt_assert0(communicator_size, "Size of Communicator is not defined"
454               ", can't use collective operations");
455
456   process_name = MSG_process_get_name(MSG_process_self());
457
458   if (!counters) {
459     DEBUG0("Initialize the counters");
460     counters = (coll_ctr) calloc(1, sizeof(struct coll_ctr_t));
461   }
462
463   name = bprintf("allReduce_%d", counters->allReduce_counter++);
464
465   if (!strcmp(process_name, "p0")) {
466     DEBUG2("%s: %s is the Root", name, process_name);
467     for (i = 1; i < communicator_size; i++) {
468       sprintf(spawn_name, "%s_p%d_%s", name, i,
469               MSG_process_get_name(MSG_process_self()));
470       sprintf(task_name, "%s_wait", spawn_name);
471       myargv = (char **) calloc(2, sizeof(char *));
472
473       myargv[0] = xbt_strdup(spawn_name);
474       myargv[1] = NULL;
475
476       comm_helper =
477           MSG_process_create_with_arguments(task_name, spawned_recv,
478                                             NULL, MSG_host_self(),
479                                             1, myargv);
480     }
481
482     for (i = 1; i < communicator_size; i++) {
483       sprintf(task_name, "%s_p%d_p0_wait", name, i);
484       MSG_task_receive(&task, task_name);
485       MSG_task_destroy(task);
486       task = NULL;
487     }
488
489     comp_task =
490         MSG_task_create("allReduce_comp", parse_double(comp_size), 0,
491                         NULL);
492     DEBUG1("%s: computing 'reduce_comp'", name);
493     MSG_task_execute(comp_task);
494     MSG_task_destroy(comp_task);
495     DEBUG1("%s: computed", name);
496
497     for (i = 1; i < communicator_size; i++) {
498       myargv = (char **) calloc(3, sizeof(char *));
499       myargv[0] = xbt_strdup(name);
500       myargv[1] = xbt_strdup(comm_size);
501       myargv[2] = NULL;
502
503       sprintf(spawn_name, "%s_%d", myargv[0], i);
504       comm_helper =
505           MSG_process_create_with_arguments(spawn_name, spawned_send,
506                                             NULL, MSG_host_self(), 2,
507                                             myargv);
508     }
509
510     for (i = 1; i < communicator_size; i++) {
511       sprintf(task_name, "p%d_wait", i);
512       DEBUG1("get on %s", task_name);
513       MSG_task_receive(&task, task_name);
514       MSG_task_destroy(task);
515       task = NULL;
516     }
517     DEBUG2("%s: all messages sent by %s have been received",
518            name, process_name);
519
520   } else {
521     DEBUG2("%s: %s sends", name, process_name);
522     sprintf(task_name, "%s_%s_p0", name, process_name);
523     DEBUG1("put on %s", task_name);
524     MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
525                   task_name);
526
527     MSG_task_receive(&task, name);
528     MSG_task_destroy(task);
529     DEBUG2("%s: %s has received", name, process_name);
530     sprintf(task_name, "%s_wait", process_name);
531     DEBUG1("put on %s", task_name);
532     MSG_task_send(MSG_task_create("waiter", 0, 0, NULL), task_name);
533
534   }
535
536   MSG_process_set_data(MSG_process_self(), (void *) counters);
537   VERB2("%s %f", xbt_str_join(action, " "), MSG_get_clock() - clock);
538   free(name);
539 }
540
541 static void comm_size(xbt_dynar_t action)
542 {
543   char *name = NULL;
544   char *size = xbt_dynar_get_as(action, 2, char *);
545   double clock = MSG_get_clock();
546
547   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
548     name = xbt_str_join(action, " ");
549   communicator_size = parse_double(size);
550   VERB2("%s %f", name, MSG_get_clock() - clock);
551   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
552     free(name);
553 }
554
555 static void compute(xbt_dynar_t action)
556 {
557   char *name = NULL;
558   char *amout = xbt_dynar_get_as(action, 2, char *);
559   m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
560   double clock = MSG_get_clock();
561
562   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
563     name = xbt_str_join(action, " ");
564   DEBUG1("Entering %s", name);
565   MSG_task_execute(task);
566   MSG_task_destroy(task);
567   VERB2("%s %f", name, MSG_get_clock() - clock);
568   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
569     free(name);
570 }
571
572 static void init(xbt_dynar_t action)
573
574 #ifdef HAVE_TRACING
575   TRACE_smpi_init(get_rank(MSG_process_get_name(MSG_process_self())));
576 #endif
577 }
578
579 static void finalize(xbt_dynar_t action)
580 {
581 #ifdef HAVE_TRACING
582   TRACE_smpi_finalize(get_rank(MSG_process_get_name(MSG_process_self())));
583 #endif
584   coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
585   if (counters)
586           free(counters);
587 }
588
589 /** Main function */
590 int main(int argc, char *argv[])
591 {
592   MSG_error_t res = MSG_OK;
593
594   /* Check the given arguments */
595   MSG_global_init(&argc, argv);
596   if (argc < 3) {
597     printf("Usage: %s platform_file deployment_file [action_files]\n",
598            argv[0]);
599     printf
600         ("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
601          argv[0]);
602     printf
603         ("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
604          argv[0]);
605     exit(1);
606   }
607
608   /*  Simulation setting */
609   MSG_create_environment(argv[1]);
610
611   /* No need to register functions as in classical MSG programs: the actions get started anyway */
612   MSG_launch_application(argv[2]);
613
614   /*   Action registration */
615   MSG_action_register("init", init);
616   MSG_action_register("finalize", finalize);
617   MSG_action_register("comm_size", comm_size);
618   MSG_action_register("send", action_send);
619   MSG_action_register("Isend", Isend);
620   MSG_action_register("recv", action_recv);
621   MSG_action_register("Irecv", Irecv);
622   MSG_action_register("wait", action_wait);
623   MSG_action_register("barrier", barrier);
624   MSG_action_register("bcast", bcast);
625   MSG_action_register("reduce", reduce);
626   MSG_action_register("allReduce", allReduce);
627   MSG_action_register("sleep", action_sleep);
628   MSG_action_register("compute", compute);
629
630   
631   /* Actually do the simulation using MSG_action_trace_run */
632   res = MSG_action_trace_run(argv[3]);  // it's ok to pass a NULL argument here
633
634   INFO1("Simulation time %g", MSG_get_clock());
635   MSG_clean();
636
637   if (res == MSG_OK)
638     return 0;
639   else
640     return 1;
641 }                               /* end_of_main */