3 /* Copyright (c) 2009. The SimGrid team. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
11 #include "xbt.h" /* calloc, printf */
12 #include "simgrid_config.h" /* getline */
/* Declares the logging category "actions" as this file's default category; it is
 * the category tested by the XBT_LOG_ISENABLED(actions, ...) checks below. */
14 XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
15 "Messages specific for this msg example");
/* Communicator size shared by all processes: 0 until the "comm_size" action sets
 * it; the collective actions (reduce, bcast, allReduce) assert it is non-zero. */
16 int communicator_size=0;
/* Counters of collective operations, attached to a process through
 * MSG_process_set_data(). Each counter is post-incremented to number successive
 * collectives (e.g. "allReduce_%d").
 * NOTE(review): this listing shows only allReduce_counter; reduce_counter and
 * bcast_counter are used elsewhere in the file, so they are presumably declared
 * on lines not shown here, as is the `coll_ctr` pointer typedef. */
18 typedef struct coll_ctr_t{
21 int allReduce_counter;
/* Converts `string` to a double with strtod().
 * Throws unknown_error with the message "<string> is not a double".
 * NOTE(review): the declarations of `value`/`endptr`, the condition guarding the
 * throw (presumably a check on endptr) and the return statement are not visible
 * in this listing. */
25 static double parse_double(const char *string) {
29 value=strtod(string, &endptr);
31 THROW1(unknown_error, 0, "%s is not a double", string);
/* Handler registered for the "send" action.
 * Field 2 of `action` names the destination and field 3 gives the size. The task
 * is sent to the mailbox "<this process name>_<field 2>", which is the name the
 * recv handler builds on the other side.
 * NOTE(review): the declarations of `to` and `name` are not visible here. */
37 static void send(xbt_dynar_t action)
41 char *size = xbt_dynar_get_as(action, 3, char *);
42 double clock = MSG_get_clock();
/* Mailbox name: "<sender>_<receiver>". */
43 sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
44 xbt_dynar_get_as(action, 2, char *));
45 // char *to = xbt_dynar_get_as(action, 2, char *);
/* The joined action text is only built when debug logging is enabled. */
47 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
48 name = xbt_str_join(action, " ");
50 DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
/* The parsed size is passed as the third MSG_task_create argument (the second is 0);
 * `name` doubles as the task name. */
51 MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
/* Logs the simulated time spent in this action. */
52 DEBUG2("%s %f", name, MSG_get_clock()-clock);
54 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Entry point of the helper processes created by Isend, bcast and allReduce.
 * Sends a task named argv[0] whose size is parsed from argv[1].
 * NOTE(review): the destination mailbox argument of MSG_task_send and the
 * remaining DEBUG3 arguments are on lines not visible here; the callers pass the
 * mailbox name as argv[0], so that is presumably the destination - confirm. */
59 static int spawned_send(int argc, char *argv[])
61 DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()),
63 MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL),
/* Handler registered for the "Isend" action.
 * Instead of sending directly, it creates a helper process running spawned_send
 * on the current host. The helper receives two arguments: the mailbox name
 * "<this process name>_<field 2>" and the size string (field 3).
 * NOTE(review): the declarations of `to`, `spawn_name` and `myargv` are not
 * visible in this listing. */
68 static void Isend(xbt_dynar_t action)
72 // char *to = xbt_dynar_get_as(action, 2, char *);
73 char *size = xbt_dynar_get_as(action, 3, char *);
75 m_process_t comm_helper;
76 double clock = MSG_get_clock();
77 DEBUG1("Isend on %s: spawn process ",
78 MSG_process_get_name(MSG_process_self()));
/* Mailbox name: "<sender>_<receiver>", same convention as in send. */
80 sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
81 xbt_dynar_get_as(action, 2, char *));
/* Three slots are allocated; slots 0 and 1 are filled with private copies. */
82 myargv = (char**) calloc (3, sizeof (char*));
84 myargv[0] = xbt_strdup(to);
85 myargv[1] = xbt_strdup(size);
88 // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
/* The helper process is named "<mailbox>_wait". */
89 sprintf(spawn_name,"%s_wait",to);
91 MSG_process_create_with_arguments(spawn_name, spawned_send,
92 NULL, MSG_host_self(), 2, myargv);
/* NOTE(review): the string returned by xbt_str_join is not stored here, so it
 * cannot be released afterwards - presumably a leak; confirm. */
93 DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock);
/* Handler registered for the "recv" action.
 * Receives one task on the mailbox "<field 2>_<this process name>" (the name the
 * send handler builds as "<sender>_<receiver>") and destroys it.
 * NOTE(review): the declaration of `name` is not visible in this listing. */
97 static void recv(xbt_dynar_t action)
100 char mailbox_name[250];
101 m_task_t task = NULL;
102 double clock = MSG_get_clock();
103 //FIXME: argument of action ignored so far; semantic not clear
104 //char *from=xbt_dynar_get_as(action,2,char*);
/* NOTE(review): the FIXME above looks stale - field 2 of the action is used
 * right here to build the mailbox name. */
105 sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
106 MSG_process_get_name(MSG_process_self()));
/* The joined action text is only built when debug logging is enabled. */
108 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
109 name = xbt_str_join(action, " ");
111 DEBUG1("Receiving: %s", name);
112 MSG_task_receive(&task, mailbox_name);
113 // MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
/* Logs the simulated time spent in this action. */
114 DEBUG2("%s %f", name, MSG_get_clock()-clock);
115 MSG_task_destroy(task);
117 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Entry point of the helper processes created by Irecv, reduce and allReduce.
 * Receives one task on the mailbox named argv[0]. It then notifies completion by
 * sending a task named "waiter" (created with 0 for both numeric arguments) to
 * the mailbox bearing this helper's own process name. Finally it destroys the
 * received task. */
121 static int spawned_recv(int argc, char *argv[])
123 m_task_t task = NULL;
124 DEBUG1("Receiving on %s", argv[0]);
125 MSG_task_receive(&task, argv[0]);
126 DEBUG1("Received %s", MSG_task_get_name(task));
127 DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
/* Completion notice: the mailbox is this helper's process name. */
128 MSG_task_send(MSG_task_create("waiter",0,0,NULL),
129 MSG_process_get_name(MSG_process_self()));
131 MSG_task_destroy(task);
/* Handler registered for the "Irecv" action.
 * Creates a helper process named "<this process name>_wait" that runs
 * spawned_recv on the mailbox "<field 2>_<this process name>". The helper posts
 * its "waiter" task on a mailbox bearing its own name, which is the mailbox
 * wait_action listens on.
 * NOTE(review): the declarations of `name` and `myargv`, and the last arguments
 * of MSG_process_create_with_arguments, are not visible in this listing. */
136 static void Irecv(xbt_dynar_t action)
139 m_process_t comm_helper;
140 char mailbox_name[250];
142 double clock = MSG_get_clock();
143 DEBUG1("Irecv on %s: spawn process ",
144 MSG_process_get_name(MSG_process_self()));
/* Mailbox name: "<sender>_<receiver>", same convention as in recv. */
146 sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
147 MSG_process_get_name(MSG_process_self()));
/* Name of the helper process. */
148 name = bprintf("%s_wait",MSG_process_get_name(MSG_process_self()));
149 myargv = (char**) calloc (2, sizeof (char*));
151 myargv[0] = xbt_strdup(mailbox_name);
153 comm_helper = MSG_process_create_with_arguments(name,spawned_recv,
154 NULL, MSG_host_self(),
157 DEBUG2("%s %f", xbt_str_join(action, " "),
158 MSG_get_clock()-clock);
/* Handler registered for the "wait" action.
 * Blocks until a task arrives on the mailbox "<this process name>_wait", then
 * destroys it. That mailbox is where a spawned_recv helper created by Irecv
 * posts its "waiter" task.
 * NOTE(review): the declarations of `name` and `task_name` are not visible in
 * this listing. */
164 static void wait_action(xbt_dynar_t action)
168 m_task_t task = NULL;
169 double clock = MSG_get_clock();
/* The joined action text is only built when debug logging is enabled. */
171 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
172 name = xbt_str_join(action, " ");
174 DEBUG1("Entering %s", name);
175 sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
176 DEBUG1("wait: %s", task_name);
177 MSG_task_receive(&task,task_name);
178 MSG_task_destroy(task);
/* Logs the simulated time spent in this action. */
179 DEBUG2("%s %f", name, MSG_get_clock()-clock);
180 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Handler registered for the "barrier" action.
 * In the lines visible here it only logs the action text at debug level; no
 * synchronization call is visible.
 * NOTE(review): some lines of this function are not shown in this listing. */
184 static void barrier (xbt_dynar_t action)
188 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
189 name = xbt_str_join(action, " ");
191 DEBUG1("barrier: %s", name);
195 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Handler registered for the "reduce" action.
 * Field 2 of `action` is the communication size, field 3 the computation amount.
 * The operation is named "reduce_<n>", n being this process's reduce counter.
 *
 * Process "p0" is the root. For each i in 1..communicator_size-1 it creates a
 * spawned_recv helper named "<name>_p<i>_p0_wait" that listens on the mailbox
 * "<name>_p<i>_p0". It then receives one task per helper on
 * "<name>_p<i>_p0_wait", and a computation task "reduce_comp" is executed.
 * Other processes build the mailbox name "<name>_<process name>_p0" and send a
 * task of the communication size.
 *
 * NOTE(review): several lines are missing from this listing: local
 * declarations, the condition guarding the counters allocation, the else
 * branch, the assignment to comp_task and the last arguments of some calls. */
200 static void reduce(xbt_dynar_t action)
207 char *comm_size = xbt_dynar_get_as(action, 2, char *);
208 char *comp_size = xbt_dynar_get_as(action, 3, char *);
209 m_process_t comm_helper=NULL;
210 m_task_t task=NULL, comp_task=NULL;
211 const char* process_name;
212 double clock = MSG_get_clock();
/* The counters live in the process's user data. */
214 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
216 xbt_assert0(communicator_size, "Size of Communicator is not defined"
217 ", can't use collective operations");
219 process_name = MSG_process_get_name(MSG_process_self());
/* Zero-initialized counters; presumably only allocated when none were attached
 * yet (the guarding condition is not visible). */
222 DEBUG0("Initialize the counters");
223 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
/* Name of this reduce operation; the counter is incremented for the next one. */
226 name = bprintf("reduce_%d", counters->reduce_counter++);
228 if (!strcmp(process_name, "p0")){
229 DEBUG2("%s: %s is the Root",name, process_name);
230 for(i=1;i<communicator_size;i++){
231 sprintf(spawn_name,"%s_p%d_%s", name, i,
232 MSG_process_get_name(MSG_process_self()));
233 sprintf(task_name,"%s_wait", spawn_name);
234 myargv = (char**) calloc (2, sizeof (char*));
236 myargv[0] = xbt_strdup(spawn_name);
240 MSG_process_create_with_arguments(task_name, spawned_recv,
241 NULL, MSG_host_self(),
245 for(i=1;i<communicator_size;i++){
246 sprintf(task_name,"%s_p%d_p0_wait", name, i);
247 MSG_task_receive(&task,task_name);
248 MSG_task_destroy(task);
253 MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
254 DEBUG1("%s: computing 'reduce_comp'", name);
255 MSG_task_execute(comp_task);
256 MSG_task_destroy(comp_task);
257 DEBUG1("%s: computed", name);
259 DEBUG2("%s: %s sends", name, process_name);
260 sprintf(task_name,"%s_%s_p0", name, process_name);
261 DEBUG1("put on %s", task_name);
262 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
266 MSG_process_set_data(MSG_process_self(), (void*)counters);
267 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
/* Handler registered for the "bcast" action.
 * Field 2 of `action` is the size. The operation is named "bcast_<n>", n being
 * this process's bcast counter.
 *
 * Process "p0" is the root. For each i in 1..communicator_size-1 it creates a
 * spawned_send helper named "<name>_<i>" with arguments (name, size). It then
 * receives one acknowledgement per peer on the mailbox "p<i>_wait".
 * Other processes receive one task on the mailbox <name>, then send a "waiter"
 * task to "<process name>_wait".
 *
 * NOTE(review): several lines are missing from this listing: local
 * declarations, the condition guarding the counters allocation, the else
 * branch and the trailing arguments of one DEBUG2 call. */
271 static void bcast (xbt_dynar_t action)
275 const char* process_name;
279 m_process_t comm_helper=NULL;
281 char *size = xbt_dynar_get_as(action, 2, char *);
/* The counters live in the process's user data. */
282 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
283 double clock = MSG_get_clock();
285 xbt_assert0(communicator_size, "Size of Communicator is not defined"
286 ", can't use collective operations");
289 process_name = MSG_process_get_name(MSG_process_self());
/* Zero-initialized counters; presumably only allocated when none were attached
 * yet (the guarding condition is not visible). */
291 DEBUG0("Initialize the counters");
292 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
/* Name of this broadcast; the counter is incremented for the next one. */
295 name = bprintf("bcast_%d", counters->bcast_counter++);
296 if (!strcmp(process_name, "p0")){
297 DEBUG2("%s: %s is the Root",name, process_name);
299 for(i=1;i<communicator_size;i++){
300 myargv = (char**) calloc (3, sizeof (char*));
301 myargv[0] = xbt_strdup(name);
302 myargv[1] = xbt_strdup(size);
305 sprintf(spawn_name,"%s_%d", myargv[0], i);
307 MSG_process_create_with_arguments(spawn_name, spawned_send,
308 NULL, MSG_host_self(), 2, myargv);
/* One acknowledgement is collected per non-root process. */
311 for(i=1;i<communicator_size;i++){
312 sprintf(task_name,"p%d_wait", i);
313 DEBUG1("get on %s", task_name);
314 MSG_task_receive(&task,task_name);
315 MSG_task_destroy(task);
318 DEBUG2("%s: all messages sent by %s have been received",
321 DEBUG2("%s: %s receives", name, process_name);
322 MSG_task_receive(&task, name);
323 MSG_task_destroy(task);
324 DEBUG2("%s: %s has received", name,process_name);
325 sprintf(task_name,"%s_wait", process_name);
326 DEBUG1("put on %s", task_name);
327 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
/* Store the (possibly newly allocated) counters back on the process. */
330 MSG_process_set_data(MSG_process_self(), (void*)counters);
331 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
/* Handler registered for the "sleep" action.
 * Makes the calling process sleep for the duration parsed from field 2 of
 * `action`.
 * NOTE(review): this file-local function shares its name with POSIX sleep();
 * that may conflict if <unistd.h> is pulled in - confirm. The declaration of
 * `name` is not visible in this listing. */
336 static void sleep(xbt_dynar_t action)
339 char *duration = xbt_dynar_get_as(action, 2, char *);
340 double clock = MSG_get_clock();
/* The joined action text is only built when debug logging is enabled. */
342 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
343 name = xbt_str_join(action, " ");
345 DEBUG1("Entering %s", name);
346 MSG_process_sleep(parse_double(duration));
/* Logs the simulated time spent in this action. */
347 DEBUG2("%s %f ", name, MSG_get_clock()-clock);
349 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Handler registered for the "allReduce" action.
 * Field 2 of `action` is the communication size, field 3 the computation amount.
 * The operation is named "allReduce_<n>", n being this process's allReduce
 * counter.
 *
 * The visible code combines the reduce and bcast patterns.
 * Process "p0" is the root:
 *  - it creates one spawned_recv helper per peer, named "<name>_p<i>_p0_wait"
 *    and listening on "<name>_p<i>_p0", then waits for each helper's notice on
 *    "<name>_p<i>_p0_wait";
 *  - a computation task "allReduce_comp" is executed;
 *  - it creates one spawned_send helper per peer, named "<name>_<i>" with
 *    arguments (name, comm_size), then receives one acknowledgement per peer
 *    on "p<i>_wait".
 * Other processes send to "<name>_<process name>_p0", receive on the mailbox
 * <name>, then send a "waiter" task to "<process name>_wait".
 *
 * NOTE(review): several lines are missing from this listing: local
 * declarations, the condition guarding the counters allocation, the else
 * branch, the assignment to comp_task and the last arguments of some calls. */
353 static void allReduce(xbt_dynar_t action)
360 char *comm_size = xbt_dynar_get_as(action, 2, char *);
361 char *comp_size = xbt_dynar_get_as(action, 3, char *);
362 m_process_t comm_helper=NULL;
363 m_task_t task=NULL, comp_task=NULL;
364 const char* process_name;
365 double clock = MSG_get_clock();
/* The counters live in the process's user data. */
367 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
369 xbt_assert0(communicator_size, "Size of Communicator is not defined"
370 ", can't use collective operations");
372 process_name = MSG_process_get_name(MSG_process_self());
/* Zero-initialized counters; presumably only allocated when none were attached
 * yet (the guarding condition is not visible). */
375 DEBUG0("Initialize the counters");
376 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
/* Name of this operation; the counter is incremented for the next one. */
379 name = bprintf("allReduce_%d", counters->allReduce_counter++);
381 if (!strcmp(process_name, "p0")){
382 DEBUG2("%s: %s is the Root",name, process_name);
383 for(i=1;i<communicator_size;i++){
384 sprintf(spawn_name,"%s_p%d_%s", name, i,
385 MSG_process_get_name(MSG_process_self()));
386 sprintf(task_name,"%s_wait", spawn_name);
387 myargv = (char**) calloc (2, sizeof (char*));
389 myargv[0] = xbt_strdup(spawn_name);
393 MSG_process_create_with_arguments(task_name, spawned_recv,
394 NULL, MSG_host_self(),
398 for(i=1;i<communicator_size;i++){
399 sprintf(task_name,"%s_p%d_p0_wait", name, i);
400 MSG_task_receive(&task,task_name);
401 MSG_task_destroy(task);
406 MSG_task_create("allReduce_comp", parse_double(comp_size), 0, NULL);
/* NOTE(review): the log text below says 'reduce_comp' although the task created
 * above is named "allReduce_comp". */
407 DEBUG1("%s: computing 'reduce_comp'", name);
408 MSG_task_execute(comp_task);
409 MSG_task_destroy(comp_task);
410 DEBUG1("%s: computed", name);
/* Broadcast phase: one spawned_send helper per non-root process. */
412 for(i=1;i<communicator_size;i++){
413 myargv = (char**) calloc (3, sizeof (char*));
414 myargv[0] = xbt_strdup(name);
415 myargv[1] = xbt_strdup(comm_size);
418 sprintf(spawn_name,"%s_%d", myargv[0], i);
420 MSG_process_create_with_arguments(spawn_name, spawned_send,
421 NULL, MSG_host_self(), 2, myargv);
/* One acknowledgement is collected per non-root process. */
424 for(i=1;i<communicator_size;i++){
425 sprintf(task_name,"p%d_wait", i);
426 DEBUG1("get on %s", task_name);
427 MSG_task_receive(&task,task_name);
428 MSG_task_destroy(task);
431 DEBUG2("%s: all messages sent by %s have been received",
435 DEBUG2("%s: %s sends", name, process_name);
436 sprintf(task_name,"%s_%s_p0", name, process_name);
437 DEBUG1("put on %s", task_name);
438 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
441 MSG_task_receive(&task, name);
442 MSG_task_destroy(task);
443 DEBUG2("%s: %s has received", name,process_name);
444 sprintf(task_name,"%s_wait", process_name);
445 DEBUG1("put on %s", task_name);
446 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
/* Store the (possibly newly allocated) counters back on the process. */
450 MSG_process_set_data(MSG_process_self(), (void*)counters);
451 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
/* Handler registered for the "comm_size" action.
 * Sets the global communicator_size from field 2 of `action`. The value is
 * parsed as a double and implicitly converted to int on assignment. */
455 static void comm_size(xbt_dynar_t action)
457 char *size = xbt_dynar_get_as(action, 2, char *);
458 communicator_size = parse_double(size);
/* Handler registered for the "compute" action.
 * Creates a task whose amount is parsed from field 2 of `action` (passed as the
 * second MSG_task_create argument), executes it and destroys it.
 * NOTE(review): `name` is handed to MSG_task_create before the xbt_str_join
 * assignment below; its declaration and initial value are on a line not
 * visible here, so the task name is whatever `name` was initialized to. */
461 static void compute(xbt_dynar_t action)
464 char *amout = xbt_dynar_get_as(action, 2, char *);
465 m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
466 double clock = MSG_get_clock();
/* The joined action text is only built when debug logging is enabled. */
468 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
469 name = xbt_str_join(action, " ");
470 DEBUG1("Entering %s", name);
471 MSG_task_execute(task);
472 MSG_task_destroy(task);
/* Logs the simulated time spent in this action. */
473 DEBUG2("%s %f", name, MSG_get_clock()-clock);
474 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Program entry point.
 * Initializes MSG, loads the platform description (argv[1]) and the deployment
 * (argv[2]), registers one handler per action name, then replays the trace
 * given as argv[3] and logs the final simulated time.
 * NOTE(review): the argument-count check guarding the usage messages, the rest
 * of the printf arguments and the return statement are not visible in this
 * listing. */
479 int main(int argc, char *argv[])
481 MSG_error_t res = MSG_OK;
483 /* Check the given arguments */
484 MSG_global_init(&argc, argv);
486 printf("Usage: %s platform_file deployment_file [action_files]\n", argv[0]);
487 printf("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
489 printf("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
494 /* Simulation setting */
495 MSG_create_environment(argv[1]);
497 /* No need to register functions as in classical MSG programs: the actions get started anyway */
498 MSG_launch_application(argv[2]);
500 /* Action registration */
/* Each trace action name is bound to the handler defined above in this file. */
501 MSG_action_register("comm_size", comm_size);
502 MSG_action_register("send", send);
503 MSG_action_register("Isend", Isend);
504 MSG_action_register("recv", recv);
505 MSG_action_register("Irecv", Irecv);
506 MSG_action_register("wait", wait_action);
507 MSG_action_register("barrier", barrier);
508 MSG_action_register("bcast", bcast);
509 MSG_action_register("reduce", reduce);
510 MSG_action_register("allReduce", allReduce);
511 MSG_action_register("sleep", sleep);
512 MSG_action_register("compute", compute);
515 /* Actually do the simulation using MSG_action_trace_run */
516 res = MSG_action_trace_run(argv[3]); // it's ok to pass a NULL argument here
518 INFO1("Simulation time %g", MSG_get_clock());