Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
optimize: strings that are only used on debug should only get computed when debug...
[simgrid.git] / examples / msg / actions / actions.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2009. The SimGrid team. All rights reserved.               */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
11 #include "xbt.h"                /* calloc, printf */
12 #include "simgrid_config.h"     /* getline */
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
15                  "Messages specific for this msg example");
16 int communicator_size=0;
17
18 typedef struct coll_ctr_t{
19   int bcast_counter;
20   int reduce_counter;
21   int allReduce_counter;
22 } *coll_ctr;
23
24 /* Helper function */
25 static double parse_double(const char *string) {
26   double value;
27   char *endptr;
28
29   value=strtod(string, &endptr);
30   if (*endptr != '\0')
31           THROW1(unknown_error, 0, "%s is not a double", string);
32   return value;
33 }
34
35
36 /* My actions */
37 static void send(xbt_dynar_t action)
38 {
39   char *name = NULL;
40   char to[250];
41   char *size = xbt_dynar_get_as(action, 3, char *);
42   double clock = MSG_get_clock();
43   sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
44            xbt_dynar_get_as(action, 2, char *));
45   //  char *to =  xbt_dynar_get_as(action, 2, char *);
46
47   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
48     name = xbt_str_join(action, " ");
49
50   DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
51   MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
52   DEBUG2("%s %f", name, MSG_get_clock()-clock);
53
54   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
55     free(name);
56 }
57
58
59 static int spawned_send(int argc, char *argv[])
60 {
61   DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()), 
62         argv[1],argv[0]);
63   MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL), 
64                 argv[0]);
65   return 0;
66 }
67
68 static void Isend(xbt_dynar_t action)
69 {
70   char spawn_name[80];
71   char to[250];
72   //  char *to = xbt_dynar_get_as(action, 2, char *);
73   char *size = xbt_dynar_get_as(action, 3, char *);
74   char **myargv;
75   m_process_t comm_helper;
76   double clock = MSG_get_clock();
77   DEBUG1("Isend on %s: spawn process ", 
78          MSG_process_get_name(MSG_process_self()));
79
80   sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
81            xbt_dynar_get_as(action, 2, char *));
82   myargv = (char**) calloc (3, sizeof (char*));
83   
84   myargv[0] = xbt_strdup(to);
85   myargv[1] = xbt_strdup(size);
86   myargv[2] = NULL;
87
88   //    sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
89   sprintf(spawn_name,"%s_wait",to);
90   comm_helper =
91     MSG_process_create_with_arguments(spawn_name, spawned_send, 
92                                       NULL, MSG_host_self(), 2, myargv);
93   DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock);
94 }
95
96
97 static void recv(xbt_dynar_t action)
98 {
99   char *name = NULL;
100     char mailbox_name[250];
101   m_task_t task = NULL;
102   double clock = MSG_get_clock();
103   //FIXME: argument of action ignored so far; semantic not clear
104   //char *from=xbt_dynar_get_as(action,2,char*);
105   sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
106            MSG_process_get_name(MSG_process_self()));
107
108   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
109     name = xbt_str_join(action, " ");
110
111   DEBUG1("Receiving: %s", name);
112   MSG_task_receive(&task, mailbox_name);
113   //  MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
114   DEBUG2("%s %f", name, MSG_get_clock()-clock);
115   MSG_task_destroy(task);
116
117   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
118     free(name);
119 }
120
121 static int spawned_recv(int argc, char *argv[])
122 {
123   m_task_t task = NULL;
124   DEBUG1("Receiving on %s", argv[0]);
125   MSG_task_receive(&task, argv[0]);
126   DEBUG1("Received %s", MSG_task_get_name(task));
127   DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
128   MSG_task_send(MSG_task_create("waiter",0,0,NULL),
129                 MSG_process_get_name(MSG_process_self())); 
130   
131   MSG_task_destroy(task);
132   return 0;
133 }
134
135
136 static void Irecv(xbt_dynar_t action)
137 {
138   char *name;
139   m_process_t comm_helper;
140   char mailbox_name[250];
141   char **myargv;
142   double clock = MSG_get_clock();
143   DEBUG1("Irecv on %s: spawn process ", 
144          MSG_process_get_name(MSG_process_self()));
145
146   sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
147            MSG_process_get_name(MSG_process_self()));
148   name = bprintf("%s_wait",MSG_process_get_name(MSG_process_self()));
149   myargv = (char**) calloc (2, sizeof (char*));
150   
151   myargv[0] = xbt_strdup(mailbox_name);
152   myargv[1] = NULL;
153   comm_helper = MSG_process_create_with_arguments(name,spawned_recv,
154                                                   NULL, MSG_host_self(),
155                                                   1, myargv);
156
157   DEBUG2("%s %f",  xbt_str_join(action, " "), 
158       MSG_get_clock()-clock);
159  
160   free(name);
161 }
162
163
164 static void wait_action(xbt_dynar_t action)
165 {
166   char *name = NULL;
167   char task_name[80];
168   m_task_t task = NULL;
169   double clock = MSG_get_clock();
170   
171   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
172     name = xbt_str_join(action, " ");
173
174   DEBUG1("Entering %s", name);
175   sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
176   DEBUG1("wait: %s", task_name);
177   MSG_task_receive(&task,task_name);
178   MSG_task_destroy(task);
179   DEBUG2("%s %f", name, MSG_get_clock()-clock);
180   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
181     free(name);
182 }
183
184 static void barrier (xbt_dynar_t action)
185 {
186   char *name = NULL;
187
188   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
189     name = xbt_str_join(action, " ");
190
191   DEBUG1("barrier: %s", name);
192
193   THROW_UNIMPLEMENTED;
194
195   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
196     free(name);
197
198 }
199
200 static void reduce(xbt_dynar_t action)
201 {
202   int i;
203   char *name;
204   char task_name[80];
205   char spawn_name[80];
206   char **myargv;
207   char *comm_size = xbt_dynar_get_as(action, 2, char *);
208   char *comp_size = xbt_dynar_get_as(action, 3, char *);
209   m_process_t comm_helper=NULL;
210   m_task_t task=NULL, comp_task=NULL;
211   const char* process_name;
212   double clock = MSG_get_clock();
213   
214   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
215
216   xbt_assert0(communicator_size, "Size of Communicator is not defined"
217               ", can't use collective operations");
218
219   process_name = MSG_process_get_name(MSG_process_self());
220
221   if (!counters){
222     DEBUG0("Initialize the counters");
223     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
224   }
225
226   name = bprintf("reduce_%d", counters->reduce_counter++);
227
228   if (!strcmp(process_name, "p0")){
229     DEBUG2("%s: %s is the Root",name, process_name);
230     for(i=1;i<communicator_size;i++){
231       sprintf(spawn_name,"%s_p%d_%s", name, i,
232            MSG_process_get_name(MSG_process_self()));
233       sprintf(task_name,"%s_wait", spawn_name);
234       myargv = (char**) calloc (2, sizeof (char*));
235       
236       myargv[0] = xbt_strdup(spawn_name);
237       myargv[1] = NULL;
238
239       comm_helper = 
240         MSG_process_create_with_arguments(task_name, spawned_recv, 
241                                           NULL, MSG_host_self(),
242                                           1, myargv);
243     }
244
245     for(i=1;i<communicator_size;i++){
246       sprintf(task_name,"%s_p%d_p0_wait", name, i);
247       MSG_task_receive(&task,task_name);
248       MSG_task_destroy(task);
249       task=NULL;
250     }
251
252     comp_task = 
253       MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
254     DEBUG1("%s: computing 'reduce_comp'", name);
255     MSG_task_execute(comp_task);
256     MSG_task_destroy(comp_task);
257     DEBUG1("%s: computed", name);
258   } else {
259     DEBUG2("%s: %s sends", name, process_name);
260     sprintf(task_name,"%s_%s_p0", name, process_name);
261     DEBUG1("put on %s", task_name);
262     MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
263                   task_name);
264   }
265
266   MSG_process_set_data(MSG_process_self(), (void*)counters);
267   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
268   free(name);
269 }
270
271 static void bcast (xbt_dynar_t action)
272 {
273   int i;
274   char *name;
275   const char* process_name;
276   char task_name[80];
277   char spawn_name[80];
278   char **myargv;
279   m_process_t comm_helper=NULL;
280   m_task_t task=NULL;
281   char *size = xbt_dynar_get_as(action, 2, char *);
282   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
283   double clock = MSG_get_clock();
284   
285   xbt_assert0(communicator_size, "Size of Communicator is not defined"
286               ", can't use collective operations");
287
288
289   process_name = MSG_process_get_name(MSG_process_self());
290   if (!counters){
291     DEBUG0("Initialize the counters");
292     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
293   }
294
295   name = bprintf("bcast_%d", counters->bcast_counter++);
296   if (!strcmp(process_name, "p0")){
297     DEBUG2("%s: %s is the Root",name, process_name);
298
299     for(i=1;i<communicator_size;i++){
300       myargv = (char**) calloc (3, sizeof (char*));
301       myargv[0] = xbt_strdup(name);
302       myargv[1] = xbt_strdup(size);
303       myargv[2] = NULL;
304
305       sprintf(spawn_name,"%s_%d", myargv[0], i);
306       comm_helper = 
307         MSG_process_create_with_arguments(spawn_name, spawned_send, 
308                                           NULL, MSG_host_self(), 2, myargv);
309     }
310     
311     for(i=1;i<communicator_size;i++){
312       sprintf(task_name,"p%d_wait", i);
313       DEBUG1("get on %s", task_name);
314       MSG_task_receive(&task,task_name);      
315       MSG_task_destroy(task);
316       task=NULL;
317     }
318     DEBUG2("%s: all messages sent by %s have been received",
319           name, process_name);
320   } else {
321     DEBUG2("%s: %s receives", name, process_name);
322     MSG_task_receive(&task, name);
323     MSG_task_destroy(task);
324     DEBUG2("%s: %s has received", name,process_name);
325     sprintf(task_name,"%s_wait", process_name);
326     DEBUG1("put on %s", task_name);
327     MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
328   }
329
330   MSG_process_set_data(MSG_process_self(), (void*)counters);
331   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
332   free(name);
333 }
334
335
336 static void sleep(xbt_dynar_t action)
337 {
338   char *name = NULL;
339   char *duration = xbt_dynar_get_as(action, 2, char *);
340   double clock = MSG_get_clock();
341
342   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
343     name = xbt_str_join(action, " ");
344
345   DEBUG1("Entering %s", name);
346   MSG_process_sleep(parse_double(duration));
347   DEBUG2("%s %f ", name, MSG_get_clock()-clock);
348
349   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
350     free(name);
351 }
352
353 static void allReduce(xbt_dynar_t action)
354 {
355   int i;
356   char *name;
357   char task_name[80];
358   char spawn_name[80];
359   char **myargv;
360   char *comm_size = xbt_dynar_get_as(action, 2, char *);
361   char *comp_size = xbt_dynar_get_as(action, 3, char *);
362   m_process_t comm_helper=NULL;
363   m_task_t task=NULL, comp_task=NULL;
364   const char* process_name;
365   double clock = MSG_get_clock();
366  
367   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
368
369   xbt_assert0(communicator_size, "Size of Communicator is not defined"
370               ", can't use collective operations");
371
372   process_name = MSG_process_get_name(MSG_process_self());
373
374   if (!counters){
375     DEBUG0("Initialize the counters");
376     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
377   }
378
379   name = bprintf("allReduce_%d", counters->allReduce_counter++);
380
381   if (!strcmp(process_name, "p0")){
382     DEBUG2("%s: %s is the Root",name, process_name);
383     for(i=1;i<communicator_size;i++){
384       sprintf(spawn_name,"%s_p%d_%s", name, i,
385            MSG_process_get_name(MSG_process_self()));
386       sprintf(task_name,"%s_wait", spawn_name);
387       myargv = (char**) calloc (2, sizeof (char*));
388       
389       myargv[0] = xbt_strdup(spawn_name);
390       myargv[1] = NULL;
391
392       comm_helper = 
393         MSG_process_create_with_arguments(task_name, spawned_recv, 
394                                           NULL, MSG_host_self(),
395                                           1, myargv);
396     }
397
398     for(i=1;i<communicator_size;i++){
399       sprintf(task_name,"%s_p%d_p0_wait", name, i);
400       MSG_task_receive(&task,task_name);
401       MSG_task_destroy(task);
402       task=NULL;
403     }
404
405     comp_task = 
406       MSG_task_create("allReduce_comp", parse_double(comp_size), 0, NULL);
407     DEBUG1("%s: computing 'reduce_comp'", name);
408     MSG_task_execute(comp_task);
409     MSG_task_destroy(comp_task);
410     DEBUG1("%s: computed", name);
411
412     for(i=1;i<communicator_size;i++){
413       myargv = (char**) calloc (3, sizeof (char*));
414       myargv[0] = xbt_strdup(name);
415       myargv[1] = xbt_strdup(comm_size);
416       myargv[2] = NULL;
417
418       sprintf(spawn_name,"%s_%d", myargv[0], i);
419       comm_helper = 
420         MSG_process_create_with_arguments(spawn_name, spawned_send, 
421                                           NULL, MSG_host_self(), 2, myargv);
422     }
423     
424     for(i=1;i<communicator_size;i++){
425       sprintf(task_name,"p%d_wait", i);
426       DEBUG1("get on %s", task_name);
427       MSG_task_receive(&task,task_name);      
428       MSG_task_destroy(task);
429       task=NULL;
430     }
431     DEBUG2("%s: all messages sent by %s have been received",
432           name, process_name);
433
434   } else {
435     DEBUG2("%s: %s sends", name, process_name);
436     sprintf(task_name,"%s_%s_p0", name, process_name);
437     DEBUG1("put on %s", task_name);
438     MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
439                   task_name);
440
441     MSG_task_receive(&task, name);
442     MSG_task_destroy(task);
443     DEBUG2("%s: %s has received", name,process_name);
444     sprintf(task_name,"%s_wait", process_name);
445     DEBUG1("put on %s", task_name);
446     MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
447
448   }
449
450   MSG_process_set_data(MSG_process_self(), (void*)counters);
451   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
452   free(name);
453 }
454
455 static void comm_size(xbt_dynar_t action)
456 {
457   char *size = xbt_dynar_get_as(action, 2, char *);
458   communicator_size = parse_double(size);
459 }
460
461 static void compute(xbt_dynar_t action)
462 {
463   char *name=NULL;
464   char *amout = xbt_dynar_get_as(action, 2, char *);
465   m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
466   double clock = MSG_get_clock();
467
468   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
469     name = xbt_str_join(action, " ");
470   DEBUG1("Entering %s", name);
471   MSG_task_execute(task);
472   MSG_task_destroy(task);
473   DEBUG2("%s %f", name, MSG_get_clock()-clock);
474   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
475     free(name);
476 }
477
478 /** Main function */
479 int main(int argc, char *argv[])
480 {
481   MSG_error_t res = MSG_OK;
482   
483   /* Check the given arguments */
484   MSG_global_init(&argc, argv);
485   if (argc < 3) {
486     printf("Usage: %s platform_file deployment_file [action_files]\n", argv[0]);
487     printf("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
488            argv[0]);
489     printf("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
490            argv[0]);
491     exit(1);
492   }
493
494   /*  Simulation setting */
495   MSG_create_environment(argv[1]);
496
497   /* No need to register functions as in classical MSG programs: the actions get started anyway */
498   MSG_launch_application(argv[2]);
499
500   /*   Action registration */
501   MSG_action_register("comm_size", comm_size);
502   MSG_action_register("send", send);
503   MSG_action_register("Isend", Isend);
504   MSG_action_register("recv", recv);
505   MSG_action_register("Irecv", Irecv);
506   MSG_action_register("wait", wait_action);
507   MSG_action_register("barrier", barrier);
508   MSG_action_register("bcast", bcast);
509   MSG_action_register("reduce", reduce);
510   MSG_action_register("allReduce", allReduce);
511   MSG_action_register("sleep", sleep);
512   MSG_action_register("compute", compute);
513
514
515   /* Actually do the simulation using MSG_action_trace_run */
516   res = MSG_action_trace_run(argv[3]); // it's ok to pass a NULL argument here
517
518   INFO1("Simulation time %g", MSG_get_clock());
519   MSG_clean();
520
521   if (res == MSG_OK)
522     return 0;
523   else
524     return 1;
525 }                               /* end_of_main */