Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
02fcb18eacc5046e6198b64a74f324197f4f4b27
[simgrid.git] / examples / msg / actions / actions.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
10 #include "simix/simix.h"        /* semaphores for the barrier */
11 #include "xbt.h"                /* calloc, printf */
12 #include "simgrid_config.h"     /* getline */
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
15                  "Messages specific for this msg example");
16 int communicator_size=0;
17
18 typedef struct coll_ctr_t{
19   int bcast_counter;
20   int reduce_counter;
21   int allReduce_counter;
22 } *coll_ctr;
23
24 /* Helper function */
25 static double parse_double(const char *string) {
26   double value;
27   char *endptr;
28
29   value=strtod(string, &endptr);
30   if (*endptr != '\0')
31           THROW1(unknown_error, 0, "%s is not a double", string);
32   return value;
33 }
34
35
36 /* My actions */
37 static void action_send(xbt_dynar_t action)
38 {
39   char *name = NULL;
40   char to[250];
41   char *size = xbt_dynar_get_as(action, 3, char *);
42   double clock = MSG_get_clock();
43   sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
44            xbt_dynar_get_as(action, 2, char *));
45   //  char *to =  xbt_dynar_get_as(action, 2, char *);
46
47   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
48     name = xbt_str_join(action, " ");
49
50   DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
51   MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
52   DEBUG2("%s %f", name, MSG_get_clock()-clock);
53
54   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
55     free(name);
56 }
57
58
59 static int spawned_send(int argc, char *argv[])
60 {
61   DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()), 
62         argv[1],argv[0]);
63   MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL), 
64                 argv[0]);
65   return 0;
66 }
67
68 static void Isend(xbt_dynar_t action)
69 {
70   char spawn_name[80];
71   char to[250];
72   //  char *to = xbt_dynar_get_as(action, 2, char *);
73   char *size = xbt_dynar_get_as(action, 3, char *);
74   char **myargv;
75   m_process_t comm_helper;
76   double clock = MSG_get_clock();
77   DEBUG1("Isend on %s: spawn process ", 
78          MSG_process_get_name(MSG_process_self()));
79
80   sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
81            xbt_dynar_get_as(action, 2, char *));
82   myargv = (char**) calloc (3, sizeof (char*));
83   
84   myargv[0] = xbt_strdup(to);
85   myargv[1] = xbt_strdup(size);
86   myargv[2] = NULL;
87
88   //    sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
89   sprintf(spawn_name,"%s_wait",to);
90   comm_helper =
91     MSG_process_create_with_arguments(spawn_name, spawned_send, 
92                                       NULL, MSG_host_self(), 2, myargv);
93   DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock);
94 }
95
96
97 static void action_recv(xbt_dynar_t action)
98 {
99   char *name = NULL;
100     char mailbox_name[250];
101   m_task_t task = NULL;
102   double clock = MSG_get_clock();
103   //FIXME: argument of action ignored so far; semantic not clear
104   //char *from=xbt_dynar_get_as(action,2,char*);
105   sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
106            MSG_process_get_name(MSG_process_self()));
107
108   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
109     name = xbt_str_join(action, " ");
110
111   DEBUG1("Receiving: %s", name);
112   MSG_task_receive(&task, mailbox_name);
113   //  MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
114   DEBUG2("%s %f", name, MSG_get_clock()-clock);
115   MSG_task_destroy(task);
116
117   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
118     free(name);
119 }
120
121 static int spawned_recv(int argc, char *argv[])
122 {
123   m_task_t task = NULL;
124   DEBUG1("Receiving on %s", argv[0]);
125   MSG_task_receive(&task, argv[0]);
126   DEBUG1("Received %s", MSG_task_get_name(task));
127   DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
128   MSG_task_send(MSG_task_create("waiter",0,0,NULL),
129                 MSG_process_get_name(MSG_process_self())); 
130   
131   MSG_task_destroy(task);
132   return 0;
133 }
134
135
136 static void Irecv(xbt_dynar_t action)
137 {
138   char *name;
139   m_process_t comm_helper;
140   char mailbox_name[250];
141   char **myargv;
142   double clock = MSG_get_clock();
143   DEBUG1("Irecv on %s: spawn process ", 
144          MSG_process_get_name(MSG_process_self()));
145
146   sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
147            MSG_process_get_name(MSG_process_self()));
148   name = bprintf("%s_wait",MSG_process_get_name(MSG_process_self()));
149   myargv = (char**) calloc (2, sizeof (char*));
150   
151   myargv[0] = xbt_strdup(mailbox_name);
152   myargv[1] = NULL;
153   comm_helper = MSG_process_create_with_arguments(name,spawned_recv,
154                                                   NULL, MSG_host_self(),
155                                                   1, myargv);
156
157   DEBUG2("%s %f",  xbt_str_join(action, " "), 
158       MSG_get_clock()-clock);
159  
160   free(name);
161 }
162
163
164 static void action_wait(xbt_dynar_t action)
165 {
166   char *name = NULL;
167   char task_name[80];
168   m_task_t task = NULL;
169   double clock = MSG_get_clock();
170   
171   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
172     name = xbt_str_join(action, " ");
173
174   DEBUG1("Entering %s", name);
175   sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
176   DEBUG1("wait: %s", task_name);
177   MSG_task_receive(&task,task_name);
178   MSG_task_destroy(task);
179   DEBUG2("%s %f", name, MSG_get_clock()-clock);
180   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
181     free(name);
182 }
183
184 /* FIXME: that's a poor man's implementation: we should take the message exchanges into account */
185 smx_sem_t barrier_semaphore=NULL;
186 static void barrier (xbt_dynar_t action)
187 {
188   char *name = NULL;
189
190   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
191     name = xbt_str_join(action, " ");
192
193   DEBUG1("Entering barrier: %s", name);
194   if (barrier_semaphore == NULL)  // first arriving on the barrier
195     barrier_semaphore = SIMIX_sem_init(0);
196
197   if (SIMIX_sem_get_capacity(barrier_semaphore)==-communicator_size +1) { // last arriving
198     SIMIX_sem_release_forever(barrier_semaphore);
199     SIMIX_sem_destroy(barrier_semaphore);
200     barrier_semaphore = NULL;
201   } else { // not last
202     SIMIX_sem_acquire(barrier_semaphore);
203   }
204
205   DEBUG1("Exiting barrier: %s", name);
206
207   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
208     free(name);
209
210 }
211
212 static void reduce(xbt_dynar_t action)
213 {
214   int i;
215   char *name;
216   char task_name[80];
217   char spawn_name[80];
218   char **myargv;
219   char *comm_size = xbt_dynar_get_as(action, 2, char *);
220   char *comp_size = xbt_dynar_get_as(action, 3, char *);
221   m_process_t comm_helper=NULL;
222   m_task_t task=NULL, comp_task=NULL;
223   const char* process_name;
224   double clock = MSG_get_clock();
225   
226   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
227
228   xbt_assert0(communicator_size, "Size of Communicator is not defined"
229               ", can't use collective operations");
230
231   process_name = MSG_process_get_name(MSG_process_self());
232
233   if (!counters){
234     DEBUG0("Initialize the counters");
235     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
236   }
237
238   name = bprintf("reduce_%d", counters->reduce_counter++);
239
240   if (!strcmp(process_name, "p0")){
241     DEBUG2("%s: %s is the Root",name, process_name);
242     for(i=1;i<communicator_size;i++){
243       sprintf(spawn_name,"%s_p%d_%s", name, i,
244            MSG_process_get_name(MSG_process_self()));
245       sprintf(task_name,"%s_wait", spawn_name);
246       myargv = (char**) calloc (2, sizeof (char*));
247       
248       myargv[0] = xbt_strdup(spawn_name);
249       myargv[1] = NULL;
250
251       comm_helper = 
252         MSG_process_create_with_arguments(task_name, spawned_recv, 
253                                           NULL, MSG_host_self(),
254                                           1, myargv);
255     }
256
257     for(i=1;i<communicator_size;i++){
258       sprintf(task_name,"%s_p%d_p0_wait", name, i);
259       MSG_task_receive(&task,task_name);
260       MSG_task_destroy(task);
261       task=NULL;
262     }
263
264     comp_task = 
265       MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
266     DEBUG1("%s: computing 'reduce_comp'", name);
267     MSG_task_execute(comp_task);
268     MSG_task_destroy(comp_task);
269     DEBUG1("%s: computed", name);
270   } else {
271     DEBUG2("%s: %s sends", name, process_name);
272     sprintf(task_name,"%s_%s_p0", name, process_name);
273     DEBUG1("put on %s", task_name);
274     MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
275                   task_name);
276   }
277
278   MSG_process_set_data(MSG_process_self(), (void*)counters);
279   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
280   free(name);
281 }
282
283 static void bcast (xbt_dynar_t action)
284 {
285   int i;
286   char *name;
287   const char* process_name;
288   char task_name[80];
289   char spawn_name[80];
290   char **myargv;
291   m_process_t comm_helper=NULL;
292   m_task_t task=NULL;
293   char *size = xbt_dynar_get_as(action, 2, char *);
294   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
295   double clock = MSG_get_clock();
296   
297   xbt_assert0(communicator_size, "Size of Communicator is not defined"
298               ", can't use collective operations");
299
300
301   process_name = MSG_process_get_name(MSG_process_self());
302   if (!counters){
303     DEBUG0("Initialize the counters");
304     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
305   }
306
307   name = bprintf("bcast_%d", counters->bcast_counter++);
308   if (!strcmp(process_name, "p0")){
309     DEBUG2("%s: %s is the Root",name, process_name);
310
311     for(i=1;i<communicator_size;i++){
312       myargv = (char**) calloc (3, sizeof (char*));
313       myargv[0] = xbt_strdup(name);
314       myargv[1] = xbt_strdup(size);
315       myargv[2] = NULL;
316
317       sprintf(spawn_name,"%s_%d", myargv[0], i);
318       comm_helper = 
319         MSG_process_create_with_arguments(spawn_name, spawned_send, 
320                                           NULL, MSG_host_self(), 2, myargv);
321     }
322     
323     for(i=1;i<communicator_size;i++){
324       sprintf(task_name,"p%d_wait", i);
325       DEBUG1("get on %s", task_name);
326       MSG_task_receive(&task,task_name);      
327       MSG_task_destroy(task);
328       task=NULL;
329     }
330     DEBUG2("%s: all messages sent by %s have been received",
331           name, process_name);
332   } else {
333     DEBUG2("%s: %s receives", name, process_name);
334     MSG_task_receive(&task, name);
335     MSG_task_destroy(task);
336     DEBUG2("%s: %s has received", name,process_name);
337     sprintf(task_name,"%s_wait", process_name);
338     DEBUG1("put on %s", task_name);
339     MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
340   }
341
342   MSG_process_set_data(MSG_process_self(), (void*)counters);
343   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
344   free(name);
345 }
346
347
348 static void action_sleep(xbt_dynar_t action)
349 {
350   char *name = NULL;
351   char *duration = xbt_dynar_get_as(action, 2, char *);
352   double clock = MSG_get_clock();
353
354   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
355     name = xbt_str_join(action, " ");
356
357   DEBUG1("Entering %s", name);
358   MSG_process_sleep(parse_double(duration));
359   DEBUG2("%s %f ", name, MSG_get_clock()-clock);
360
361   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
362     free(name);
363 }
364
365 static void allReduce(xbt_dynar_t action)
366 {
367   int i;
368   char *name;
369   char task_name[80];
370   char spawn_name[80];
371   char **myargv;
372   char *comm_size = xbt_dynar_get_as(action, 2, char *);
373   char *comp_size = xbt_dynar_get_as(action, 3, char *);
374   m_process_t comm_helper=NULL;
375   m_task_t task=NULL, comp_task=NULL;
376   const char* process_name;
377   double clock = MSG_get_clock();
378  
379   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
380
381   xbt_assert0(communicator_size, "Size of Communicator is not defined"
382               ", can't use collective operations");
383
384   process_name = MSG_process_get_name(MSG_process_self());
385
386   if (!counters){
387     DEBUG0("Initialize the counters");
388     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
389   }
390
391   name = bprintf("allReduce_%d", counters->allReduce_counter++);
392
393   if (!strcmp(process_name, "p0")){
394     DEBUG2("%s: %s is the Root",name, process_name);
395     for(i=1;i<communicator_size;i++){
396       sprintf(spawn_name,"%s_p%d_%s", name, i,
397            MSG_process_get_name(MSG_process_self()));
398       sprintf(task_name,"%s_wait", spawn_name);
399       myargv = (char**) calloc (2, sizeof (char*));
400       
401       myargv[0] = xbt_strdup(spawn_name);
402       myargv[1] = NULL;
403
404       comm_helper = 
405         MSG_process_create_with_arguments(task_name, spawned_recv, 
406                                           NULL, MSG_host_self(),
407                                           1, myargv);
408     }
409
410     for(i=1;i<communicator_size;i++){
411       sprintf(task_name,"%s_p%d_p0_wait", name, i);
412       MSG_task_receive(&task,task_name);
413       MSG_task_destroy(task);
414       task=NULL;
415     }
416
417     comp_task = 
418       MSG_task_create("allReduce_comp", parse_double(comp_size), 0, NULL);
419     DEBUG1("%s: computing 'reduce_comp'", name);
420     MSG_task_execute(comp_task);
421     MSG_task_destroy(comp_task);
422     DEBUG1("%s: computed", name);
423
424     for(i=1;i<communicator_size;i++){
425       myargv = (char**) calloc (3, sizeof (char*));
426       myargv[0] = xbt_strdup(name);
427       myargv[1] = xbt_strdup(comm_size);
428       myargv[2] = NULL;
429
430       sprintf(spawn_name,"%s_%d", myargv[0], i);
431       comm_helper = 
432         MSG_process_create_with_arguments(spawn_name, spawned_send, 
433                                           NULL, MSG_host_self(), 2, myargv);
434     }
435     
436     for(i=1;i<communicator_size;i++){
437       sprintf(task_name,"p%d_wait", i);
438       DEBUG1("get on %s", task_name);
439       MSG_task_receive(&task,task_name);      
440       MSG_task_destroy(task);
441       task=NULL;
442     }
443     DEBUG2("%s: all messages sent by %s have been received",
444           name, process_name);
445
446   } else {
447     DEBUG2("%s: %s sends", name, process_name);
448     sprintf(task_name,"%s_%s_p0", name, process_name);
449     DEBUG1("put on %s", task_name);
450     MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
451                   task_name);
452
453     MSG_task_receive(&task, name);
454     MSG_task_destroy(task);
455     DEBUG2("%s: %s has received", name,process_name);
456     sprintf(task_name,"%s_wait", process_name);
457     DEBUG1("put on %s", task_name);
458     MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
459
460   }
461
462   MSG_process_set_data(MSG_process_self(), (void*)counters);
463   DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
464   free(name);
465 }
466
467 static void comm_size(xbt_dynar_t action)
468 {
469   char *size = xbt_dynar_get_as(action, 2, char *);
470   communicator_size = parse_double(size);
471 }
472
473 static void compute(xbt_dynar_t action)
474 {
475   char *name=NULL;
476   char *amout = xbt_dynar_get_as(action, 2, char *);
477   m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
478   double clock = MSG_get_clock();
479
480   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
481     name = xbt_str_join(action, " ");
482   DEBUG1("Entering %s", name);
483   MSG_task_execute(task);
484   MSG_task_destroy(task);
485   DEBUG2("%s %f", name, MSG_get_clock()-clock);
486   if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
487     free(name);
488 }
489
490 /** Main function */
491 int main(int argc, char *argv[])
492 {
493   MSG_error_t res = MSG_OK;
494   
495   /* Check the given arguments */
496   MSG_global_init(&argc, argv);
497   if (argc < 3) {
498     printf("Usage: %s platform_file deployment_file [action_files]\n", argv[0]);
499     printf("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
500            argv[0]);
501     printf("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
502            argv[0]);
503     exit(1);
504   }
505
506   /*  Simulation setting */
507   MSG_create_environment(argv[1]);
508
509   /* No need to register functions as in classical MSG programs: the actions get started anyway */
510   MSG_launch_application(argv[2]);
511
512   /*   Action registration */
513   MSG_action_register("comm_size", comm_size);
514   MSG_action_register("send", action_send);
515   MSG_action_register("Isend", Isend);
516   MSG_action_register("recv", action_recv);
517   MSG_action_register("Irecv", Irecv);
518   MSG_action_register("wait", action_wait);
519   MSG_action_register("barrier", barrier);
520   MSG_action_register("bcast", bcast);
521   MSG_action_register("reduce", reduce);
522   MSG_action_register("allReduce", allReduce);
523   MSG_action_register("sleep", action_sleep);
524   MSG_action_register("compute", compute);
525
526
527   /* Actually do the simulation using MSG_action_trace_run */
528   res = MSG_action_trace_run(argv[3]); // it's ok to pass a NULL argument here
529
530   INFO1("Simulation time %g", MSG_get_clock());
531   MSG_clean();
532
533   if (res == MSG_OK)
534     return 0;
535   else
536     return 1;
537 }                               /* end_of_main */