Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
edaed915e61277ca3ad2a62d7aa37ae6b1650790
[simgrid.git] / examples / msg / actions / actions.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
10 #include "simix/simix.h"        /* semaphores for the barrier */
11 #include "xbt.h"                /* calloc, printf */
12 #include "instr/instr_private.h"
13
14 /** @addtogroup MSG_examples
15  *
16  *  @section MSG_ex_actions Trace driven simulations
17  * 
18  *  The <b>actions/actions.c</b> example demonstrates how to run trace-driven simulations. It
19  *  is very handy when you want to test an algorithm or protocol that
20  *  does nothing unless it receives some events from outside. For
21  *  example, a P2P protocol reacts to requests from the user, but
22  *  does nothing if there is no such event. 
23  * 
24  *  In such situations, SimGrid allows to write your protocol in your
25  *  C file, and the events to react to in a separate text file.
26  *  Declare a function handling each of the events that you want to
27  *  accept in your trace files, register them using @ref
28  *  MSG_action_register in your main, and then use @ref
29  *  MSG_action_trace_run to launch the simulation. You can either
30  *  have one trace file containing all your events, or a file per
31  *  simulated process. Check the tesh files in the example directory
32  *  for details on how to do it.
33  *
34  *  This example uses this approach to replay MPI-like traces. It
35  *  comes with a set of event handlers reproducing MPI events. This
36  *  is somehow similar to SMPI, yet differently implemented. This
37  *  code should probably be changed to use SMPI internals instead,
38  *  but wasn't, so far.
39  * 
40  */
41
42 XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
43                              "Messages specific for this msg example");
44 int communicator_size = 0;
45
46 static void action_Isend(const char *const *action);
47
48 typedef struct  {
49   int last_Irecv_sender_id;
50   int bcast_counter;
51   int reduce_counter;
52   int allReduce_counter;
53   xbt_dynar_t isends; /* of msg_comm_t */
54   /* Used to implement irecv+wait */
55   xbt_dynar_t irecvs; /* of msg_comm_t */
56   xbt_dynar_t tasks; /* of m_task_t */
57 } s_process_globals_t, *process_globals_t;
58
59 /* Helper function */
60 static double parse_double(const char *string)
61 {
62   double value;
63   char *endptr;
64
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static int get_rank (const char *process_name)
72 {
73   return atoi(&(process_name[1]));
74
75
76 static void asynchronous_cleanup(void) {
77   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
78
79   /* Destroy any isend which correspond to completed communications */
80   int found;
81   msg_comm_t comm;
82   while ((found = MSG_comm_testany(globals->isends)) != -1) {
83     xbt_dynar_remove_at(globals->isends,found,&comm);
84     MSG_comm_destroy(comm);
85   }
86 }
87
88 /* My actions */
89 static void action_send(const char *const *action)
90 {
91   char *name = NULL;
92   char to[250];
93   const char *size_str = action[3];
94   double size=parse_double(size_str);
95   double clock = MSG_get_clock(); /* this "call" is free thanks to inlining */
96
97   sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),action[2]);
98
99   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
100     name = xbt_str_join_array(action, " ");
101
102 #ifdef HAVE_TRACING
103   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
104   int dst_traced = get_rank(action[2]);
105   TRACE_smpi_ptp_in(rank, rank, dst_traced, "send");
106   TRACE_smpi_send(rank, rank, dst_traced);
107 #endif
108
109   XBT_DEBUG("Entering Send: %s (size: %lg)", name, size);
110    if (size<65536) {
111      action_Isend(action);
112    } else {
113      MSG_task_send(MSG_task_create(name, 0, size, NULL), to);
114    }
115    
116    XBT_VERB("%s %f", name, MSG_get_clock() - clock);
117
118   free(name);
119
120 #ifdef HAVE_TRACING
121   TRACE_smpi_ptp_out(rank, rank, dst_traced, "send");
122 #endif
123
124   asynchronous_cleanup();
125 }
126
127 static void action_Isend(const char *const *action)
128 {
129   char to[250];
130   const char *size = action[3];
131   double clock = MSG_get_clock();
132   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
133
134
135   sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),action[2]);
136   msg_comm_t comm =
137       MSG_task_isend( MSG_task_create(to,0,parse_double(size),NULL), to);
138   xbt_dynar_push(globals->isends,&comm);
139
140   XBT_DEBUG("Isend on %s", MSG_process_get_name(MSG_process_self()));
141   XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
142
143   asynchronous_cleanup();
144 }
145
146
147 static void action_recv(const char *const *action)
148 {
149   char *name = NULL;
150   char mailbox_name[250];
151   m_task_t task = NULL;
152   double clock = MSG_get_clock();
153
154   sprintf(mailbox_name, "%s_%s", action[2],
155           MSG_process_get_name(MSG_process_self()));
156
157   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
158     name = xbt_str_join_array(action, " ");
159
160 #ifdef HAVE_TRACING
161   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
162   int src_traced = get_rank(action[2]);
163   TRACE_smpi_ptp_in(rank, src_traced, rank, "recv");
164 #endif
165
166   XBT_DEBUG("Receiving: %s", name);
167   MSG_error_t res = MSG_task_receive(&task, mailbox_name);
168   //  MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
169   XBT_VERB("%s %f", name, MSG_get_clock() - clock);
170
171   if (res == MSG_OK) {
172     MSG_task_destroy(task);
173   }
174
175   free(name);
176 #ifdef HAVE_TRACING
177   TRACE_smpi_ptp_out(rank, src_traced, rank, "recv");
178   TRACE_smpi_recv(rank, src_traced, rank);
179 #endif
180
181   asynchronous_cleanup();
182 }
183
184 static void action_Irecv(const char *const *action)
185 {
186   char mailbox[250];
187   double clock = MSG_get_clock();
188   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
189
190   XBT_DEBUG("Irecv on %s", MSG_process_get_name(MSG_process_self()));
191 #ifdef HAVE_TRACING
192   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
193   int src_traced = get_rank(action[2]);
194   globals->last_Irecv_sender_id = src_traced;
195   MSG_process_set_data(MSG_process_self(), (void *) globals);
196
197   TRACE_smpi_ptp_in(rank, src_traced, rank, "Irecv");
198 #endif
199
200   sprintf(mailbox, "%s_%s", action[2],
201           MSG_process_get_name(MSG_process_self()));
202   m_task_t t=NULL;
203   xbt_dynar_push(globals->tasks,&t);
204   msg_comm_t c =
205       MSG_task_irecv(
206           xbt_dynar_get_ptr(globals->tasks, xbt_dynar_length(globals->tasks)-1),
207           mailbox);
208   xbt_dynar_push(globals->irecvs,&c);
209
210   XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
211
212 #ifdef HAVE_TRACING
213   TRACE_smpi_ptp_out(rank, src_traced, rank, "Irecv");
214 #endif
215
216   asynchronous_cleanup();
217 }
218
219
220 static void action_wait(const char *const *action)
221 {
222   char *name = NULL;
223   m_task_t task = NULL;
224   msg_comm_t comm;
225   double clock = MSG_get_clock();
226   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
227
228   xbt_assert(xbt_dynar_length(globals->irecvs),
229       "action wait not preceded by any irecv: %s", xbt_str_join_array(action," "));
230
231   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
232     name = xbt_str_join_array(action, " ");
233 #ifdef HAVE_TRACING
234   process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
235   int src_traced = counters->last_Irecv_sender_id;
236   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
237   TRACE_smpi_ptp_in(rank, src_traced, rank, "wait");
238 #endif
239
240   XBT_DEBUG("Entering %s", name);
241   comm = xbt_dynar_pop_as(globals->irecvs,msg_comm_t);
242   MSG_comm_wait(comm,-1);
243   task = xbt_dynar_pop_as(globals->tasks,m_task_t);
244   MSG_comm_destroy(comm);
245   MSG_task_destroy(task);
246
247   XBT_VERB("%s %f", name, MSG_get_clock() - clock);
248   free(name);
249 #ifdef HAVE_TRACING
250   TRACE_smpi_ptp_out(rank, src_traced, rank, "wait");
251   TRACE_smpi_recv(rank, src_traced, rank);
252 #endif
253
254 }
255
256 /* FIXME: that's a poor man's implementation: we should take the message exchanges into account */
257 static void action_barrier(const char *const *action)
258 {
259   char *name = NULL;
260   static smx_mutex_t mutex = NULL;
261   static smx_cond_t cond = NULL;
262   static int processes_arrived_sofar=0;
263
264   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
265     name = xbt_str_join_array(action, " ");
266
267   if (mutex == NULL) {       // first arriving on the barrier
268     mutex = simcall_mutex_init();
269     cond = simcall_cond_init();
270     processes_arrived_sofar=0;
271   }
272   XBT_DEBUG("Entering barrier: %s (%d already there)", name,processes_arrived_sofar);
273
274   simcall_mutex_lock(mutex);
275   if (++processes_arrived_sofar == communicator_size) {
276     simcall_cond_broadcast(cond);
277     simcall_mutex_unlock(mutex);
278   } else {
279     simcall_cond_wait(cond,mutex);
280     simcall_mutex_unlock(mutex);
281   }
282
283   XBT_DEBUG("Exiting barrier: %s", name);
284
285   processes_arrived_sofar--;
286   if (!processes_arrived_sofar) {
287     simcall_cond_destroy(cond);
288     simcall_mutex_destroy(mutex);
289     mutex=NULL;
290   }
291
292   free(name);
293
294 }
295
296 static void action_reduce(const char *const *action)
297 {
298         int i;
299         char *reduce_identifier;
300         char mailbox[80];
301         double comm_size = parse_double(action[2]);
302         double comp_size = parse_double(action[3]);
303         m_task_t comp_task = NULL;
304         const char *process_name;
305         double clock = MSG_get_clock();
306
307         process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
308
309         xbt_assert(communicator_size, "Size of Communicator is not defined, "
310                         "can't use collective operations");
311
312         process_name = MSG_process_get_name(MSG_process_self());
313
314         reduce_identifier = bprintf("reduce_%d", counters->reduce_counter++);
315
316         if (!strcmp(process_name, "p0")) {
317                 XBT_DEBUG("%s: %s is the Root", reduce_identifier, process_name);
318
319                 msg_comm_t *comms = xbt_new0(msg_comm_t,communicator_size-1);
320             m_task_t *tasks = xbt_new0(m_task_t,communicator_size-1);
321             for (i = 1; i < communicator_size; i++) {
322               sprintf(mailbox, "%s_p%d_p0", reduce_identifier, i);
323               comms[i-1] = MSG_task_irecv(&(tasks[i-1]),mailbox);
324             }
325             MSG_comm_waitall(comms,communicator_size-1,-1);
326             for (i = 1; i < communicator_size; i++) {
327                 MSG_comm_destroy(comms[i-1]);
328                 MSG_task_destroy(tasks[i-1]);
329             }
330             free(tasks);
331
332             comp_task = MSG_task_create("reduce_comp", comp_size, 0, NULL);
333             XBT_DEBUG("%s: computing 'reduce_comp'", reduce_identifier);
334             MSG_task_execute(comp_task);
335             MSG_task_destroy(comp_task);
336             XBT_DEBUG("%s: computed", reduce_identifier);
337
338         } else {
339                 XBT_DEBUG("%s: %s sends", reduce_identifier, process_name);
340                 sprintf(mailbox, "%s_%s_p0", reduce_identifier, process_name);
341             XBT_DEBUG("put on %s", mailbox);
342             MSG_task_send(MSG_task_create(reduce_identifier, 0, comm_size, NULL),
343                           mailbox);
344         }
345
346         XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
347         free(reduce_identifier);
348 }
349
350 static void action_bcast(const char *const *action)
351 {
352         int i;
353         char *bcast_identifier;
354         char mailbox[80];
355         double comm_size = parse_double(action[2]);
356         m_task_t task = NULL;
357         const char *process_name;
358         double clock = MSG_get_clock();
359
360         process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
361
362         xbt_assert(communicator_size, "Size of Communicator is not defined, "
363                         "can't use collective operations");
364
365         process_name = MSG_process_get_name(MSG_process_self());
366
367         bcast_identifier = bprintf("bcast_%d", counters->bcast_counter++);
368
369         if (!strcmp(process_name, "p0")) {
370                 XBT_DEBUG("%s: %s is the Root", bcast_identifier, process_name);
371
372             msg_comm_t *comms = xbt_new0(msg_comm_t,communicator_size-1);
373
374             for (i = 1; i < communicator_size; i++) {
375               sprintf(mailbox, "%s_p0_p%d", bcast_identifier, i);
376               comms[i-1] =
377                   MSG_task_isend(MSG_task_create(mailbox,0,comm_size,NULL),
378                       mailbox);
379             }
380             MSG_comm_waitall(comms,communicator_size-1,-1);
381                 for (i = 1; i < communicator_size; i++)
382                MSG_comm_destroy(comms[i-1]);
383             free(comms);
384
385             XBT_DEBUG("%s: all messages sent by %s have been received",
386                    bcast_identifier, process_name);
387
388         } else {
389             sprintf(mailbox, "%s_p0_%s", bcast_identifier, process_name);
390             MSG_task_receive(&task, mailbox);
391             MSG_task_destroy(task);
392             XBT_DEBUG("%s: %s has received", bcast_identifier, process_name);
393         }
394
395         XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
396         free(bcast_identifier);
397 }
398
399
400 static void action_sleep(const char *const *action)
401 {
402   char *name = NULL;
403   const char *duration = action[2];
404   double clock = MSG_get_clock();
405
406   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
407     name = xbt_str_join_array(action, " ");
408
409   XBT_DEBUG("Entering %s", name);
410   MSG_process_sleep(parse_double(duration));
411   XBT_VERB("%s %f ", name, MSG_get_clock() - clock);
412
413   free(name);
414 }
415
416 static void action_allReduce(const char *const *action) {
417   int i;
418   char *allreduce_identifier;
419   char mailbox[80];
420   double comm_size = parse_double(action[2]);
421   double comp_size = parse_double(action[3]);
422   m_task_t task = NULL, comp_task = NULL;
423   const char *process_name;
424   double clock = MSG_get_clock();
425
426   process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
427
428   xbt_assert(communicator_size, "Size of Communicator is not defined, "
429               "can't use collective operations");
430
431   process_name = MSG_process_get_name(MSG_process_self());
432
433   allreduce_identifier = bprintf("allReduce_%d", counters->allReduce_counter++);
434
435   if (!strcmp(process_name, "p0")) {
436     XBT_DEBUG("%s: %s is the Root", allreduce_identifier, process_name);
437
438     msg_comm_t *comms = xbt_new0(msg_comm_t,communicator_size-1);
439     m_task_t *tasks = xbt_new0(m_task_t,communicator_size-1);
440     for (i = 1; i < communicator_size; i++) {
441       sprintf(mailbox, "%s_p%d_p0", allreduce_identifier, i);
442       comms[i-1] = MSG_task_irecv(&(tasks[i-1]),mailbox);
443     }
444     MSG_comm_waitall(comms,communicator_size-1,-1);
445     for (i = 1; i < communicator_size; i++) {
446       MSG_comm_destroy(comms[i-1]);
447       MSG_task_destroy(tasks[i-1]);
448     }
449     free(tasks);
450
451     comp_task = MSG_task_create("allReduce_comp", comp_size, 0, NULL);
452     XBT_DEBUG("%s: computing 'reduce_comp'", allreduce_identifier);
453     MSG_task_execute(comp_task);
454     MSG_task_destroy(comp_task);
455     XBT_DEBUG("%s: computed", allreduce_identifier);
456
457     for (i = 1; i < communicator_size; i++) {
458       sprintf(mailbox, "%s_p0_p%d", allreduce_identifier, i);
459       comms[i-1] =
460           MSG_task_isend(MSG_task_create(mailbox,0,comm_size,NULL),
461               mailbox);
462     }
463     MSG_comm_waitall(comms,communicator_size-1,-1);
464     for (i = 1; i < communicator_size; i++)
465        MSG_comm_destroy(comms[i-1]);
466     free(comms);
467
468     XBT_DEBUG("%s: all messages sent by %s have been received",
469            allreduce_identifier, process_name);
470
471   } else {
472     XBT_DEBUG("%s: %s sends", allreduce_identifier, process_name);
473     sprintf(mailbox, "%s_%s_p0", allreduce_identifier, process_name);
474     XBT_DEBUG("put on %s", mailbox);
475     MSG_task_send(MSG_task_create(allreduce_identifier, 0, comm_size, NULL),
476                   mailbox);
477
478     sprintf(mailbox, "%s_p0_%s", allreduce_identifier, process_name);
479     MSG_task_receive(&task, mailbox);
480     MSG_task_destroy(task);
481     XBT_DEBUG("%s: %s has received", allreduce_identifier, process_name);
482   }
483
484   XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
485   free(allreduce_identifier);
486 }
487
488 static void action_comm_size(const char *const *action)
489 {
490   char *name = NULL;
491   const char *size = action[2];
492   double clock = MSG_get_clock();
493
494   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
495     name = xbt_str_join_array(action, " ");
496   communicator_size = parse_double(size);
497   XBT_VERB("%s %f", name, MSG_get_clock() - clock);
498   free(name);
499 }
500
501 static void action_compute(const char *const *action)
502 {
503   char *name = NULL;
504   const char *amout = action[2];
505   m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
506   double clock = MSG_get_clock();
507
508   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
509     name = xbt_str_join_array(action, " ");
510   XBT_DEBUG("Entering %s", name);
511   MSG_task_execute(task);
512   MSG_task_destroy(task);
513   XBT_VERB("%s %f", name, MSG_get_clock() - clock);
514   free(name);
515 }
516
517 static void action_init(const char *const *action)
518
519 #ifdef HAVE_TRACING
520   TRACE_smpi_init(get_rank(MSG_process_get_name(MSG_process_self())));
521 #endif
522   XBT_DEBUG("Initialize the counters");
523   process_globals_t globals = (process_globals_t) calloc(1, sizeof(s_process_globals_t));
524   globals->isends = xbt_dynar_new(sizeof(msg_comm_t),NULL);
525   globals->irecvs = xbt_dynar_new(sizeof(msg_comm_t),NULL);
526   globals->tasks  = xbt_dynar_new(sizeof(m_task_t),NULL);
527   MSG_process_set_data(MSG_process_self(),globals);
528
529 }
530
531 static void action_finalize(const char *const *action)
532 {
533 #ifdef HAVE_TRACING
534   TRACE_smpi_finalize(get_rank(MSG_process_get_name(MSG_process_self())));
535 #endif
536   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
537   if (globals){
538     xbt_dynar_free_container(&(globals->isends));
539     xbt_dynar_free_container(&(globals->irecvs));
540     xbt_dynar_free_container(&(globals->tasks));
541     free(globals);
542   }
543 }
544
545 /** Main function */
546 int main(int argc, char *argv[])
547 {
548   MSG_error_t res = MSG_OK;
549
550   /* Check the given arguments */
551   MSG_global_init(&argc, argv);
552   if (argc < 3) {
553     printf("Usage: %s platform_file deployment_file [action_files]\n",
554            argv[0]);
555     printf
556         ("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
557          argv[0]);
558     printf
559         ("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
560          argv[0]);
561     exit(1);
562   }
563
564   /*  Simulation setting */
565   MSG_create_environment(argv[1]);
566
567   /* No need to register functions as in classical MSG programs: the actions get started anyway */
568   MSG_launch_application(argv[2]);
569
570   /*   Action registration */
571   MSG_action_register("init",     action_init);
572   MSG_action_register("finalize", action_finalize);
573   MSG_action_register("comm_size",action_comm_size);
574   MSG_action_register("send",     action_send);
575   MSG_action_register("Isend",    action_Isend);
576   MSG_action_register("recv",     action_recv);
577   MSG_action_register("Irecv",    action_Irecv);
578   MSG_action_register("wait",     action_wait);
579   MSG_action_register("barrier",  action_barrier);
580   MSG_action_register("bcast",    action_bcast);
581   MSG_action_register("reduce",   action_reduce);
582   MSG_action_register("allReduce",action_allReduce);
583   MSG_action_register("sleep",    action_sleep);
584   MSG_action_register("compute",  action_compute);
585
586
587   /* Actually do the simulation using MSG_action_trace_run */
588   res = MSG_action_trace_run(argv[3]);  // it's ok to pass a NULL argument here
589
590   XBT_INFO("Simulation time %g", MSG_get_clock());
591   MSG_clean();
592
593   if (res == MSG_OK)
594     return 0;
595   else
596     return 1;
597 }                               /* end_of_main */