Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
And now, a version that compiles, bummer (load the right includes, even if they are...
[simgrid.git] / examples / msg / actions / actions.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2009. The SimGrid team. All rights reserved.               */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
11 #include "simix/simix.h"        /* semaphores for the barrier */
12 #include "xbt.h"                /* calloc, printf */
13 #include "simgrid_config.h"     /* getline */
14
15 XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
16                  "Messages specific for this msg example");
17 int communicator_size=0;
18
19 typedef struct coll_ctr_t{
20   int bcast_counter;
21   int reduce_counter;
22   int allReduce_counter;
23 } *coll_ctr;
24
25 /* Helper function */
26 static double parse_double(const char *string) {
27   double value;
28   char *endptr;
29
30   value=strtod(string, &endptr);
31   if (*endptr != '\0')
32           THROW1(unknown_error, 0, "%s is not a double", string);
33   return value;
34 }
35
36
37 /* My actions */
38 static void action_send(xbt_dynar_t action)
39 {
40   char *name = NULL;
41   char to[250];
42   char *size = xbt_dynar_get_as(action, 3, char *);
43   double clock = MSG_get_clock();
44   sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
45            xbt_dynar_get_as(action, 2, char *));
46   //  char *to =  xbt_dynar_get_as(action, 2, char *);
47
48   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
49     name = xbt_str_join(action, " ");
50
51   DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
52   MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
53   DEBUG2("%s %f", name, MSG_get_clock()-clock);
54
55   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
56     free(name);
57 }
58
59
60 static int spawned_send(int argc, char *argv[])
61 {
62   DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()), 
63         argv[1],argv[0]);
64   MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL), 
65                 argv[0]);
66   return 0;
67 }
68
69 static void Isend(xbt_dynar_t action)
70 {
71   char spawn_name[80];
72   char to[250];
73   //  char *to = xbt_dynar_get_as(action, 2, char *);
74   char *size = xbt_dynar_get_as(action, 3, char *);
75   char **myargv;
76   m_process_t comm_helper;
77   double clock = MSG_get_clock();
78   DEBUG1("Isend on %s: spawn process ", 
79          MSG_process_get_name(MSG_process_self()));
80
81   sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
82            xbt_dynar_get_as(action, 2, char *));
83   myargv = (char**) calloc (3, sizeof (char*));
84   
85   myargv[0] = xbt_strdup(to);
86   myargv[1] = xbt_strdup(size);
87   myargv[2] = NULL;
88
89   //    sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
90   sprintf(spawn_name,"%s_wait",to);
91   comm_helper =
92     MSG_process_create_with_arguments(spawn_name, spawned_send, 
93                                       NULL, MSG_host_self(), 2, myargv);
94   DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock);
95 }
96
97
98 static void action_recv(xbt_dynar_t action)
99 {
100   char *name = NULL;
101     char mailbox_name[250];
102   m_task_t task = NULL;
103   double clock = MSG_get_clock();
104   //FIXME: argument of action ignored so far; semantic not clear
105   //char *from=xbt_dynar_get_as(action,2,char*);
106   sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
107            MSG_process_get_name(MSG_process_self()));
108
109   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
110     name = xbt_str_join(action, " ");
111
112   DEBUG1("Receiving: %s", name);
113   MSG_task_receive(&task, mailbox_name);
114   //  MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
115   DEBUG2("%s %f", name, MSG_get_clock()-clock);
116   MSG_task_destroy(task);
117
118   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
119     free(name);
120 }
121
122 static int spawned_recv(int argc, char *argv[])
123 {
124   m_task_t task = NULL;
125   DEBUG1("Receiving on %s", argv[0]);
126   MSG_task_receive(&task, argv[0]);
127   DEBUG1("Received %s", MSG_task_get_name(task));
128   DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
129   MSG_task_send(MSG_task_create("waiter",0,0,NULL),
130                 MSG_process_get_name(MSG_process_self())); 
131   
132   MSG_task_destroy(task);
133   return 0;
134 }
135
136
137 static void Irecv(xbt_dynar_t action)
138 {
139   char *name;
140   m_process_t comm_helper;
141   char mailbox_name[250];
142   char **myargv;
143   double clock = MSG_get_clock();
144   DEBUG1("Irecv on %s: spawn process ", 
145          MSG_process_get_name(MSG_process_self()));
146
147   sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
148            MSG_process_get_name(MSG_process_self()));
149   name = bprintf("%s_wait",MSG_process_get_name(MSG_process_self()));
150   myargv = (char**) calloc (2, sizeof (char*));
151   
152   myargv[0] = xbt_strdup(mailbox_name);
153   myargv[1] = NULL;
154   comm_helper = MSG_process_create_with_arguments(name,spawned_recv,
155                                                   NULL, MSG_host_self(),
156                                                   1, myargv);
157
158   DEBUG2("%s %f",  xbt_str_join(action, " "), 
159       MSG_get_clock()-clock);
160  
161   free(name);
162 }
163
164
165 static void action_wait(xbt_dynar_t action)
166 {
167   char *name = NULL;
168   char task_name[80];
169   m_task_t task = NULL;
170   double clock = MSG_get_clock();
171   
172   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
173     name = xbt_str_join(action, " ");
174
175   DEBUG1("Entering %s", name);
176   sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
177   DEBUG1("wait: %s", task_name);
178   MSG_task_receive(&task,task_name);
179   MSG_task_destroy(task);
180   DEBUG2("%s %f", name, MSG_get_clock()-clock);
181   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
182     free(name);
183 }
184
185 /* FIXME: that's a poor man's implementation: we should take the message exchanges into account */
186 smx_sem_t barrier_semaphore=NULL;
187 static void barrier (xbt_dynar_t action)
188 {
189   char *name = NULL;
190
191   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
192     name = xbt_str_join(action, " ");
193
194   DEBUG1("Entering barrier: %s", name);
195   if (barrier_semaphore == NULL)  // first arriving on the barrier
196     barrier_semaphore = SIMIX_sem_init(0);
197
198   if (SIMIX_sem_get_capacity(barrier_semaphore)==-communicator_size +1) { // last arriving
199     SIMIX_sem_release_forever(barrier_semaphore);
200     SIMIX_sem_destroy(barrier_semaphore);
201     barrier_semaphore = NULL;
202   } else { // not last
203     SIMIX_sem_acquire(barrier_semaphore);
204   }
205
206   DEBUG1("Exiting barrier: %s", name);
207
208   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
209     free(name);
210
211 }
212
213 static void reduce(xbt_dynar_t action)
214 {
215   int i;
216   char *name;
217   char task_name[80];
218   char spawn_name[80];
219   char **myargv;
220   char *comm_size = xbt_dynar_get_as(action, 2, char *);
221   char *comp_size = xbt_dynar_get_as(action, 3, char *);
222   m_process_t comm_helper=NULL;
223   m_task_t task=NULL, comp_task=NULL;
224   const char* process_name;
225   double clock = MSG_get_clock();
226   
227   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
228
229   xbt_assert0(communicator_size, "Size of Communicator is not defined"
230               ", can't use collective operations");
231
232   process_name = MSG_process_get_name(MSG_process_self());
233
234   if (!counters){
235     DEBUG0("Initialize the counters");
236     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
237   }
238
239   name = bprintf("reduce_%d", counters->reduce_counter++);
240
241   if (!strcmp(process_name, "p0")){
242     DEBUG2("%s: %s is the Root",name, process_name);
243     for(i=1;i<communicator_size;i++){
244       sprintf(spawn_name,"%s_p%d_%s", name, i,
245            MSG_process_get_name(MSG_process_self()));
246       sprintf(task_name,"%s_wait", spawn_name);
247       myargv = (char**) calloc (2, sizeof (char*));
248       
249       myargv[0] = xbt_strdup(spawn_name);
250       myargv[1] = NULL;
251
252       comm_helper = 
253         MSG_process_create_with_arguments(task_name, spawned_recv, 
254                                           NULL, MSG_host_self(),
255                                           1, myargv);
256     }
257
258     for(i=1;i<communicator_size;i++){
259       sprintf(task_name,"%s_p%d_p0_wait", name, i);
260       MSG_task_receive(&task,task_name);
261       MSG_task_destroy(task);
262       task=NULL;
263     }
264
265     comp_task = 
266       MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
267     DEBUG1("%s: computing 'reduce_comp'", name);
268     MSG_task_execute(comp_task);
269     MSG_task_destroy(comp_task);
270     DEBUG1("%s: computed", name);
271   } else {
272     DEBUG2("%s: %s sends", name, process_name);
273     sprintf(task_name,"%s_%s_p0", name, process_name);
274     DEBUG1("put on %s", task_name);
275     MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
276                   task_name);
277   }
278
279   MSG_process_set_data(MSG_process_self(), (void*)counters);
280   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
281   free(name);
282 }
283
284 static void bcast (xbt_dynar_t action)
285 {
286   int i;
287   char *name;
288   const char* process_name;
289   char task_name[80];
290   char spawn_name[80];
291   char **myargv;
292   m_process_t comm_helper=NULL;
293   m_task_t task=NULL;
294   char *size = xbt_dynar_get_as(action, 2, char *);
295   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
296   double clock = MSG_get_clock();
297   
298   xbt_assert0(communicator_size, "Size of Communicator is not defined"
299               ", can't use collective operations");
300
301
302   process_name = MSG_process_get_name(MSG_process_self());
303   if (!counters){
304     DEBUG0("Initialize the counters");
305     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
306   }
307
308   name = bprintf("bcast_%d", counters->bcast_counter++);
309   if (!strcmp(process_name, "p0")){
310     DEBUG2("%s: %s is the Root",name, process_name);
311
312     for(i=1;i<communicator_size;i++){
313       myargv = (char**) calloc (3, sizeof (char*));
314       myargv[0] = xbt_strdup(name);
315       myargv[1] = xbt_strdup(size);
316       myargv[2] = NULL;
317
318       sprintf(spawn_name,"%s_%d", myargv[0], i);
319       comm_helper = 
320         MSG_process_create_with_arguments(spawn_name, spawned_send, 
321                                           NULL, MSG_host_self(), 2, myargv);
322     }
323     
324     for(i=1;i<communicator_size;i++){
325       sprintf(task_name,"p%d_wait", i);
326       DEBUG1("get on %s", task_name);
327       MSG_task_receive(&task,task_name);      
328       MSG_task_destroy(task);
329       task=NULL;
330     }
331     DEBUG2("%s: all messages sent by %s have been received",
332           name, process_name);
333   } else {
334     DEBUG2("%s: %s receives", name, process_name);
335     MSG_task_receive(&task, name);
336     MSG_task_destroy(task);
337     DEBUG2("%s: %s has received", name,process_name);
338     sprintf(task_name,"%s_wait", process_name);
339     DEBUG1("put on %s", task_name);
340     MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
341   }
342
343   MSG_process_set_data(MSG_process_self(), (void*)counters);
344   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
345   free(name);
346 }
347
348
349 static void action_sleep(xbt_dynar_t action)
350 {
351   char *name = NULL;
352   char *duration = xbt_dynar_get_as(action, 2, char *);
353   double clock = MSG_get_clock();
354
355   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
356     name = xbt_str_join(action, " ");
357
358   DEBUG1("Entering %s", name);
359   MSG_process_sleep(parse_double(duration));
360   DEBUG2("%s %f ", name, MSG_get_clock()-clock);
361
362   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
363     free(name);
364 }
365
366 static void allReduce(xbt_dynar_t action)
367 {
368   int i;
369   char *name;
370   char task_name[80];
371   char spawn_name[80];
372   char **myargv;
373   char *comm_size = xbt_dynar_get_as(action, 2, char *);
374   char *comp_size = xbt_dynar_get_as(action, 3, char *);
375   m_process_t comm_helper=NULL;
376   m_task_t task=NULL, comp_task=NULL;
377   const char* process_name;
378   double clock = MSG_get_clock();
379  
380   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
381
382   xbt_assert0(communicator_size, "Size of Communicator is not defined"
383               ", can't use collective operations");
384
385   process_name = MSG_process_get_name(MSG_process_self());
386
387   if (!counters){
388     DEBUG0("Initialize the counters");
389     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
390   }
391
392   name = bprintf("allReduce_%d", counters->allReduce_counter++);
393
394   if (!strcmp(process_name, "p0")){
395     DEBUG2("%s: %s is the Root",name, process_name);
396     for(i=1;i<communicator_size;i++){
397       sprintf(spawn_name,"%s_p%d_%s", name, i,
398            MSG_process_get_name(MSG_process_self()));
399       sprintf(task_name,"%s_wait", spawn_name);
400       myargv = (char**) calloc (2, sizeof (char*));
401       
402       myargv[0] = xbt_strdup(spawn_name);
403       myargv[1] = NULL;
404
405       comm_helper = 
406         MSG_process_create_with_arguments(task_name, spawned_recv, 
407                                           NULL, MSG_host_self(),
408                                           1, myargv);
409     }
410
411     for(i=1;i<communicator_size;i++){
412       sprintf(task_name,"%s_p%d_p0_wait", name, i);
413       MSG_task_receive(&task,task_name);
414       MSG_task_destroy(task);
415       task=NULL;
416     }
417
418     comp_task = 
419       MSG_task_create("allReduce_comp", parse_double(comp_size), 0, NULL);
420     DEBUG1("%s: computing 'reduce_comp'", name);
421     MSG_task_execute(comp_task);
422     MSG_task_destroy(comp_task);
423     DEBUG1("%s: computed", name);
424
425     for(i=1;i<communicator_size;i++){
426       myargv = (char**) calloc (3, sizeof (char*));
427       myargv[0] = xbt_strdup(name);
428       myargv[1] = xbt_strdup(comm_size);
429       myargv[2] = NULL;
430
431       sprintf(spawn_name,"%s_%d", myargv[0], i);
432       comm_helper = 
433         MSG_process_create_with_arguments(spawn_name, spawned_send, 
434                                           NULL, MSG_host_self(), 2, myargv);
435     }
436     
437     for(i=1;i<communicator_size;i++){
438       sprintf(task_name,"p%d_wait", i);
439       DEBUG1("get on %s", task_name);
440       MSG_task_receive(&task,task_name);      
441       MSG_task_destroy(task);
442       task=NULL;
443     }
444     DEBUG2("%s: all messages sent by %s have been received",
445           name, process_name);
446
447   } else {
448     DEBUG2("%s: %s sends", name, process_name);
449     sprintf(task_name,"%s_%s_p0", name, process_name);
450     DEBUG1("put on %s", task_name);
451     MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
452                   task_name);
453
454     MSG_task_receive(&task, name);
455     MSG_task_destroy(task);
456     DEBUG2("%s: %s has received", name,process_name);
457     sprintf(task_name,"%s_wait", process_name);
458     DEBUG1("put on %s", task_name);
459     MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
460
461   }
462
463   MSG_process_set_data(MSG_process_self(), (void*)counters);
464   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
465   free(name);
466 }
467
468 static void comm_size(xbt_dynar_t action)
469 {
470   char *size = xbt_dynar_get_as(action, 2, char *);
471   communicator_size = parse_double(size);
472 }
473
474 static void compute(xbt_dynar_t action)
475 {
476   char *name=NULL;
477   char *amout = xbt_dynar_get_as(action, 2, char *);
478   m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
479   double clock = MSG_get_clock();
480
481   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
482     name = xbt_str_join(action, " ");
483   DEBUG1("Entering %s", name);
484   MSG_task_execute(task);
485   MSG_task_destroy(task);
486   DEBUG2("%s %f", name, MSG_get_clock()-clock);
487   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
488     free(name);
489 }
490
491 /** Main function */
492 int main(int argc, char *argv[])
493 {
494   MSG_error_t res = MSG_OK;
495   
496   /* Check the given arguments */
497   MSG_global_init(&argc, argv);
498   if (argc < 3) {
499     printf("Usage: %s platform_file deployment_file [action_files]\n", argv[0]);
500     printf("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
501            argv[0]);
502     printf("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
503            argv[0]);
504     exit(1);
505   }
506
507   /*  Simulation setting */
508   MSG_create_environment(argv[1]);
509
510   /* No need to register functions as in classical MSG programs: the actions get started anyway */
511   MSG_launch_application(argv[2]);
512
513   /*   Action registration */
514   MSG_action_register("comm_size", comm_size);
515   MSG_action_register("send", action_send);
516   MSG_action_register("Isend", Isend);
517   MSG_action_register("recv", action_recv);
518   MSG_action_register("Irecv", Irecv);
519   MSG_action_register("wait", action_wait);
520   MSG_action_register("barrier", barrier);
521   MSG_action_register("bcast", bcast);
522   MSG_action_register("reduce", reduce);
523   MSG_action_register("allReduce", allReduce);
524   MSG_action_register("sleep", action_sleep);
525   MSG_action_register("compute", compute);
526
527
528   /* Actually do the simulation using MSG_action_trace_run */
529   res = MSG_action_trace_run(argv[3]); // it's ok to pass a NULL argument here
530
531   INFO1("Simulation time %g", MSG_get_clock());
532   MSG_clean();
533
534   if (res == MSG_OK)
535     return 0;
536   else
537     return 1;
538 }                               /* end_of_main */