1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
10 #include "simix/simix.h" /* semaphores for the barrier */
11 #include "xbt.h" /* calloc, printf */
12 #include "simgrid_config.h" /* getline */
/* Declares the log category "actions"; the XBT_LOG_ISENABLED(actions, ...)
 * checks throughout this file test whether its debug level is enabled. */
14 XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
15 "Messages specific for this msg example");
/* Number of processes taking part in collective operations.
 * Stays 0 until the "comm_size" action sets it; reduce(), bcast() and
 * allReduce() assert that it is non-zero before doing anything. */
16 int communicator_size=0;
/* Per-process counters used to build a unique name for each collective
 * operation ("reduce_<n>", "bcast_<n>", "allReduce_<n>"). An instance is
 * attached to the process through MSG_process_set_data().
 * NOTE(review): reduce() and bcast() also use reduce_counter and
 * bcast_counter; their declarations and the end of this typedef are not
 * visible in this excerpt. */
18 typedef struct coll_ctr_t{
/* Incremented each time allReduce() builds an operation name. */
21 int allReduce_counter;
/* Parse `string` as a double using strtod().
 * The THROW1 below raises unknown_error with the message
 * "<string> is not a double"; the condition guarding it, the local
 * declarations and the return statement are on lines that are not visible
 * in this excerpt. */
25 static double parse_double(const char *string) {
29 value=strtod(string, &endptr);
31 THROW1(unknown_error, 0, "%s is not a double", string);
/* Handler registered for the "send" action in main().
 * Sends one task to the mailbox "<current process name>_<action field 2>".
 * The task is created with 0 as second argument and the parsed value of
 * action field 3 as third argument (presumably computation amount and
 * message size -- confirm against the MSG_task_create documentation). */
37 static void action_send(xbt_dynar_t action)
41 char *size = xbt_dynar_get_as(action, 3, char *);
42 double clock = MSG_get_clock();
/* NOTE(review): unbounded sprintf; the declaration (and capacity) of `to`
 * is not visible in this excerpt. */
43 sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
44 xbt_dynar_get_as(action, 2, char *));
45 // char *to = xbt_dynar_get_as(action, 2, char *);
/* `name` is built from the space-joined action fields only when debug
 * logging is enabled; its initial value is declared on a line not shown. */
47 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
48 name = xbt_str_join(action, " ");
50 DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
51 MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
/* Logs the clock difference between entry and completion of the send. */
52 DEBUG2("%s %f", name, MSG_get_clock()-clock);
/* The statement guarded by this check is not visible in this excerpt. */
54 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Entry point of the helper processes created by Isend(), bcast() and
 * allReduce().
 * argv[0] is used as the task name and argv[1] is parsed as a double for
 * the third argument of MSG_task_create().
 * The destination argument of MSG_task_send() and the remaining DEBUG3
 * arguments are on continuation lines not visible in this excerpt. */
59 static int spawned_send(int argc, char *argv[])
61 DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()),
63 MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL),
/* Handler registered for the "Isend" action in main().
 * Instead of sending itself, it creates a helper process running
 * spawned_send() on the current host, so this function does not call
 * MSG_task_send() directly. */
68 static void Isend(xbt_dynar_t action)
72 // char *to = xbt_dynar_get_as(action, 2, char *);
73 char *size = xbt_dynar_get_as(action, 3, char *);
/* NOTE(review): no assignment to comm_helper is visible in this excerpt. */
75 m_process_t comm_helper;
76 double clock = MSG_get_clock();
77 DEBUG1("Isend on %s: spawn process ",
78 MSG_process_get_name(MSG_process_self()));
/* Mailbox name: "<current process name>_<action field 2>".
 * NOTE(review): unbounded sprintf; the declaration of `to` is not visible. */
80 sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
81 xbt_dynar_get_as(action, 2, char *));
/* Three zero-initialised slots: two arguments are filled in below, the
 * third stays NULL. */
82 myargv = (char**) calloc (3, sizeof (char*));
84 myargv[0] = xbt_strdup(to);
85 myargv[1] = xbt_strdup(size);
88 // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
/* The helper process is named "<to>_wait". */
89 sprintf(spawn_name,"%s_wait",to);
91 MSG_process_create_with_arguments(spawn_name, spawned_send,
92 NULL, MSG_host_self(), 2, myargv);
/* NOTE(review): the string returned by xbt_str_join() is not stored here,
 * so it cannot be released afterwards -- confirm ownership rules. */
93 DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock);
/* Handler registered for the "recv" action in main().
 * Receives one task on the mailbox "<action field 2>_<current process name>"
 * and destroys it. */
97 static void action_recv(xbt_dynar_t action)
100 char mailbox_name[250];
101 m_task_t task = NULL;
102 double clock = MSG_get_clock();
103 //FIXME: argument of action ignored so far; semantics unclear
104 //char *from=xbt_dynar_get_as(action,2,char*);
/* NOTE(review): sprintf is not bounded by the 250-byte buffer. */
105 sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
106 MSG_process_get_name(MSG_process_self()));
/* `name` is built from the joined action fields only when debug logging
 * is enabled; its declaration is on a line not shown. */
108 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
109 name = xbt_str_join(action, " ");
111 DEBUG1("Receiving: %s", name);
112 MSG_task_receive(&task, mailbox_name);
113 // MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
114 DEBUG2("%s %f", name, MSG_get_clock()-clock);
115 MSG_task_destroy(task);
/* The statement guarded by this check is not visible in this excerpt. */
117 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Entry point of the helper processes created by Irecv(), reduce() and
 * allReduce().
 * Receives one task on the mailbox argv[0], then sends an empty task
 * called "waiter" to the mailbox named after this helper process, and
 * finally destroys the received task. */
121 static int spawned_recv(int argc, char *argv[])
123 m_task_t task = NULL;
124 DEBUG1("Receiving on %s", argv[0]);
125 MSG_task_receive(&task, argv[0]);
126 DEBUG1("Received %s", MSG_task_get_name(task));
127 DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
/* The helper's own process name doubles as the notification mailbox. */
128 MSG_task_send(MSG_task_create("waiter",0,0,NULL),
129 MSG_process_get_name(MSG_process_self()));
131 MSG_task_destroy(task);
/* Handler registered for the "Irecv" action in main().
 * Creates a helper process running spawned_recv() on the current host; the
 * helper is named "<current process name>_wait" and receives on the mailbox
 * "<action field 2>_<current process name>". */
136 static void Irecv(xbt_dynar_t action)
139 m_process_t comm_helper;
140 char mailbox_name[250];
142 double clock = MSG_get_clock();
143 DEBUG1("Irecv on %s: spawn process ",
144 MSG_process_get_name(MSG_process_self()));
/* NOTE(review): sprintf is not bounded by the 250-byte buffer. */
146 sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
147 MSG_process_get_name(MSG_process_self()));
/* NOTE(review): no release of the bprintf() result is visible in this
 * excerpt -- confirm whether it has to be freed. */
148 name = bprintf("%s_wait",MSG_process_get_name(MSG_process_self()));
/* Two zero-initialised slots: the mailbox name, then a NULL entry. */
149 myargv = (char**) calloc (2, sizeof (char*));
151 myargv[0] = xbt_strdup(mailbox_name);
/* The trailing arguments of this call are on lines not visible here. */
153 comm_helper = MSG_process_create_with_arguments(name,spawned_recv,
154 NULL, MSG_host_self(),
/* NOTE(review): the string returned by xbt_str_join() is not stored. */
157 DEBUG2("%s %f", xbt_str_join(action, " "),
158 MSG_get_clock()-clock);
/* Handler registered for the "wait" action in main().
 * Receives one task on the mailbox "<current process name>_wait" and
 * destroys it. That name matches the helper process name used by Irecv(),
 * to which spawned_recv() sends its "waiter" task. */
164 static void action_wait(xbt_dynar_t action)
168 m_task_t task = NULL;
169 double clock = MSG_get_clock();
/* `name` is built from the joined action fields only when debug logging
 * is enabled; its declaration is on a line not shown. */
171 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
172 name = xbt_str_join(action, " ");
174 DEBUG1("Entering %s", name);
/* NOTE(review): unbounded sprintf; the declaration of task_name is not
 * visible in this excerpt. */
175 sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
176 DEBUG1("wait: %s", task_name);
177 MSG_task_receive(&task,task_name);
178 MSG_task_destroy(task);
179 DEBUG2("%s %f", name, MSG_get_clock()-clock);
/* The statement guarded by this check is not visible in this excerpt. */
180 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
184 /* FIXME: this is a poor man's implementation: we should take the message exchanges into account */
/* Semaphore shared by all processes for the barrier; NULL whenever no
 * barrier is in progress. */
185 smx_sem_t barrier_semaphore=NULL;
/* Handler registered for the "barrier" action in main().
 * The first process to arrive creates the semaphore. The process that
 * finds its capacity equal to -communicator_size + 1 is treated as the last
 * one: it releases the semaphore forever, destroys it and resets the global
 * to NULL. */
186 static void barrier (xbt_dynar_t action)
190 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
191 name = xbt_str_join(action, " ");
193 DEBUG1("Entering barrier: %s", name);
194 if (barrier_semaphore == NULL) // first arriving on the barrier
195 barrier_semaphore = SIMIX_sem_init(0);
197 if (SIMIX_sem_get_capacity(barrier_semaphore)==-communicator_size +1) { // last arriving
198 SIMIX_sem_release_forever(barrier_semaphore);
199 SIMIX_sem_destroy(barrier_semaphore);
200 barrier_semaphore = NULL;
/* Presumably the else branch (the branch keyword and braces are on lines
 * not visible here): every other process blocks on the semaphore. */
202 SIMIX_sem_acquire(barrier_semaphore);
205 DEBUG1("Exiting barrier: %s", name);
/* The statement guarded by this check is not visible in this excerpt. */
207 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Handler registered for the "reduce" action in main().
 * The process named "p0" acts as root: it spawns one spawned_recv() helper
 * per other rank, waits for each helper's notification, then executes a
 * "reduce_comp" task. The other code path sends one task to the mailbox
 * "<operation name>_<process name>_p0".
 * Action field 2 is parsed for the sent task, field 3 for the executed task.
 * NOTE(review): several lines (local declarations, braces, else keywords,
 * call continuations) are not visible in this excerpt. */
212 static void reduce(xbt_dynar_t action)
219 char *comm_size = xbt_dynar_get_as(action, 2, char *);
220 char *comp_size = xbt_dynar_get_as(action, 3, char *);
221 m_process_t comm_helper=NULL;
222 m_task_t task=NULL, comp_task=NULL;
223 const char* process_name;
224 double clock = MSG_get_clock();
/* Per-process collective counters are kept as the process data. */
226 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
228 xbt_assert0(communicator_size, "Size of Communicator is not defined"
229 ", can't use collective operations");
231 process_name = MSG_process_get_name(MSG_process_self());
/* Presumably executed only when no counters are attached yet (the guarding
 * condition is not visible); calloc zero-initialises every counter. */
234 DEBUG0("Initialize the counters");
235 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
/* Operation name "reduce_<n>"; the counter is post-incremented. */
238 name = bprintf("reduce_%d", counters->reduce_counter++);
240 if (!strcmp(process_name, "p0")){
241 DEBUG2("%s: %s is the Root",name, process_name);
/* One helper per non-root rank: it receives on the mailbox
 * "<name>_p<i>_<root name>" and is itself named "<that mailbox>_wait". */
242 for(i=1;i<communicator_size;i++){
243 sprintf(spawn_name,"%s_p%d_%s", name, i,
244 MSG_process_get_name(MSG_process_self()));
245 sprintf(task_name,"%s_wait", spawn_name);
246 myargv = (char**) calloc (2, sizeof (char*));
248 myargv[0] = xbt_strdup(spawn_name);
252 MSG_process_create_with_arguments(task_name, spawned_recv,
253 NULL, MSG_host_self(),
/* Collect the notification each helper sends on the mailbox named after
 * the helper process. */
257 for(i=1;i<communicator_size;i++){
258 sprintf(task_name,"%s_p%d_p0_wait", name, i);
259 MSG_task_receive(&task,task_name);
260 MSG_task_destroy(task);
/* The assignment target of this call is on a line not visible here;
 * comp_task is executed and destroyed right after. */
265 MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
266 DEBUG1("%s: computing 'reduce_comp'", name);
267 MSG_task_execute(comp_task);
268 MSG_task_destroy(comp_task);
269 DEBUG1("%s: computed", name);
/* Presumably the non-root branch (the else keyword is not visible). */
271 DEBUG2("%s: %s sends", name, process_name);
272 sprintf(task_name,"%s_%s_p0", name, process_name);
273 DEBUG1("put on %s", task_name);
274 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
/* Store the (possibly newly allocated) counters back on the process. */
278 MSG_process_set_data(MSG_process_self(), (void*)counters);
/* NOTE(review): the string returned by xbt_str_join() is not stored. */
279 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
/* Handler registered for the "bcast" action in main().
 * The process named "p0" acts as root: it spawns one spawned_send() helper
 * per other rank and then receives one task on each "p<i>_wait" mailbox.
 * The other code path receives one task on the mailbox named after the
 * operation ("bcast_<n>") and then sends a "waiter" task to
 * "<process name>_wait".
 * NOTE(review): several lines (local declarations, braces, else keywords,
 * call continuations) are not visible in this excerpt. */
283 static void bcast (xbt_dynar_t action)
287 const char* process_name;
291 m_process_t comm_helper=NULL;
293 char *size = xbt_dynar_get_as(action, 2, char *);
/* Per-process collective counters are kept as the process data. */
294 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
295 double clock = MSG_get_clock();
297 xbt_assert0(communicator_size, "Size of Communicator is not defined"
298 ", can't use collective operations");
301 process_name = MSG_process_get_name(MSG_process_self());
/* Presumably executed only when no counters are attached yet (the guarding
 * condition is not visible); calloc zero-initialises every counter. */
303 DEBUG0("Initialize the counters");
304 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
/* Operation name "bcast_<n>"; the counter is post-incremented. */
307 name = bprintf("bcast_%d", counters->bcast_counter++);
308 if (!strcmp(process_name, "p0")){
309 DEBUG2("%s: %s is the Root",name, process_name);
/* One helper per non-root rank, named "<name>_<i>", with its own copies of
 * the operation name and the size string as arguments. */
311 for(i=1;i<communicator_size;i++){
312 myargv = (char**) calloc (3, sizeof (char*));
313 myargv[0] = xbt_strdup(name);
314 myargv[1] = xbt_strdup(size);
317 sprintf(spawn_name,"%s_%d", myargv[0], i);
319 MSG_process_create_with_arguments(spawn_name, spawned_send,
320 NULL, MSG_host_self(), 2, myargv);
/* Wait for the notification each receiver sends on "p<i>_wait". */
323 for(i=1;i<communicator_size;i++){
324 sprintf(task_name,"p%d_wait", i);
325 DEBUG1("get on %s", task_name);
326 MSG_task_receive(&task,task_name);
327 MSG_task_destroy(task);
/* The arguments of this DEBUG2 are on a line not visible here. */
330 DEBUG2("%s: all messages sent by %s have been received",
/* Presumably the non-root branch (the else keyword is not visible). */
333 DEBUG2("%s: %s receives", name, process_name);
334 MSG_task_receive(&task, name);
335 MSG_task_destroy(task);
336 DEBUG2("%s: %s has received", name,process_name);
337 sprintf(task_name,"%s_wait", process_name);
338 DEBUG1("put on %s", task_name);
339 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
/* Store the (possibly newly allocated) counters back on the process. */
342 MSG_process_set_data(MSG_process_self(), (void*)counters);
/* NOTE(review): the string returned by xbt_str_join() is not stored. */
343 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
/* Handler registered for the "sleep" action in main().
 * Parses action field 2 as a double and passes it to MSG_process_sleep(). */
348 static void action_sleep(xbt_dynar_t action)
351 char *duration = xbt_dynar_get_as(action, 2, char *);
352 double clock = MSG_get_clock();
/* `name` is built from the joined action fields only when debug logging
 * is enabled; its declaration is on a line not shown. */
354 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
355 name = xbt_str_join(action, " ");
357 DEBUG1("Entering %s", name);
358 MSG_process_sleep(parse_double(duration));
359 DEBUG2("%s %f ", name, MSG_get_clock()-clock);
/* The statement guarded by this check is not visible in this excerpt. */
361 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Handler registered for the "allReduce" action in main().
 * Combines the patterns of reduce() and bcast(). The process named "p0"
 * spawns spawned_recv() helpers, collects their notifications, executes an
 * "allReduce_comp" task, then spawns spawned_send() helpers and waits on
 * the "p<i>_wait" mailboxes. The other code path sends one task to
 * "<operation name>_<process name>_p0", receives one task on the mailbox
 * named after the operation and finally sends a "waiter" task to
 * "<process name>_wait".
 * NOTE(review): several lines (local declarations, braces, else keywords,
 * call continuations) are not visible in this excerpt. */
365 static void allReduce(xbt_dynar_t action)
372 char *comm_size = xbt_dynar_get_as(action, 2, char *);
373 char *comp_size = xbt_dynar_get_as(action, 3, char *);
374 m_process_t comm_helper=NULL;
375 m_task_t task=NULL, comp_task=NULL;
376 const char* process_name;
377 double clock = MSG_get_clock();
/* Per-process collective counters are kept as the process data. */
379 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
381 xbt_assert0(communicator_size, "Size of Communicator is not defined"
382 ", can't use collective operations");
384 process_name = MSG_process_get_name(MSG_process_self());
/* Presumably executed only when no counters are attached yet (the guarding
 * condition is not visible); calloc zero-initialises every counter. */
387 DEBUG0("Initialize the counters");
388 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
/* Operation name "allReduce_<n>"; the counter is post-incremented. */
391 name = bprintf("allReduce_%d", counters->allReduce_counter++);
393 if (!strcmp(process_name, "p0")){
394 DEBUG2("%s: %s is the Root",name, process_name);
/* Reduce phase: one receiving helper per non-root rank, listening on
 * "<name>_p<i>_<root name>" and named "<that mailbox>_wait". */
395 for(i=1;i<communicator_size;i++){
396 sprintf(spawn_name,"%s_p%d_%s", name, i,
397 MSG_process_get_name(MSG_process_self()));
398 sprintf(task_name,"%s_wait", spawn_name);
399 myargv = (char**) calloc (2, sizeof (char*));
401 myargv[0] = xbt_strdup(spawn_name);
405 MSG_process_create_with_arguments(task_name, spawned_recv,
406 NULL, MSG_host_self(),
/* Collect the notification sent by each receiving helper. */
410 for(i=1;i<communicator_size;i++){
411 sprintf(task_name,"%s_p%d_p0_wait", name, i);
412 MSG_task_receive(&task,task_name);
413 MSG_task_destroy(task);
/* The assignment target of this call is on a line not visible here;
 * comp_task is executed and destroyed right after. */
418 MSG_task_create("allReduce_comp", parse_double(comp_size), 0, NULL);
/* NOTE(review): the message says 'reduce_comp' although the task created
 * above is named "allReduce_comp". */
419 DEBUG1("%s: computing 'reduce_comp'", name);
420 MSG_task_execute(comp_task);
421 MSG_task_destroy(comp_task);
422 DEBUG1("%s: computed", name);
/* Broadcast phase: one sending helper per non-root rank, named
 * "<name>_<i>", with the operation name and comm_size as arguments. */
424 for(i=1;i<communicator_size;i++){
425 myargv = (char**) calloc (3, sizeof (char*));
426 myargv[0] = xbt_strdup(name);
427 myargv[1] = xbt_strdup(comm_size);
430 sprintf(spawn_name,"%s_%d", myargv[0], i);
432 MSG_process_create_with_arguments(spawn_name, spawned_send,
433 NULL, MSG_host_self(), 2, myargv);
/* Wait for the notification each receiver sends on "p<i>_wait". */
436 for(i=1;i<communicator_size;i++){
437 sprintf(task_name,"p%d_wait", i);
438 DEBUG1("get on %s", task_name);
439 MSG_task_receive(&task,task_name);
440 MSG_task_destroy(task);
/* The arguments of this DEBUG2 are on a line not visible here. */
443 DEBUG2("%s: all messages sent by %s have been received",
/* Presumably the non-root branch (the else keyword is not visible). */
447 DEBUG2("%s: %s sends", name, process_name);
448 sprintf(task_name,"%s_%s_p0", name, process_name);
449 DEBUG1("put on %s", task_name);
450 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
453 MSG_task_receive(&task, name);
454 MSG_task_destroy(task);
455 DEBUG2("%s: %s has received", name,process_name);
456 sprintf(task_name,"%s_wait", process_name);
457 DEBUG1("put on %s", task_name);
458 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
/* Store the (possibly newly allocated) counters back on the process. */
462 MSG_process_set_data(MSG_process_self(), (void*)counters);
/* NOTE(review): the string returned by xbt_str_join() is not stored. */
463 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
/* Handler registered for the "comm_size" action in main().
 * Parses action field 2 and stores it in the global communicator_size. */
467 static void comm_size(xbt_dynar_t action)
469 char *size = xbt_dynar_get_as(action, 2, char *);
/* parse_double() returns a double; the assignment converts it to int. */
470 communicator_size = parse_double(size);
/* Handler registered for the "compute" action in main().
 * Creates a task whose second argument is the parsed value of action
 * field 2, executes it and destroys it. */
473 static void compute(xbt_dynar_t action)
476 char *amout = xbt_dynar_get_as(action, 2, char *);
/* NOTE(review): `name` is passed here before the xbt_str_join() assignment
 * below; its declaration and initial value are on a line not visible. */
477 m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
478 double clock = MSG_get_clock();
480 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
481 name = xbt_str_join(action, " ");
482 DEBUG1("Entering %s", name);
483 MSG_task_execute(task);
484 MSG_task_destroy(task);
485 DEBUG2("%s %f", name, MSG_get_clock()-clock);
/* The statement guarded by this check is not visible in this excerpt. */
486 if (XBT_LOG_ISENABLED(actions,xbt_log_priority_debug))
/* Program entry point: initialises MSG, loads the platform file (argv[1])
 * and the deployment file (argv[2]), registers the action handlers, replays
 * the trace and logs the final clock value.
 * NOTE(review): the argument-count check guarding the usage messages and
 * the return statements are on lines not visible in this excerpt. */
491 int main(int argc, char *argv[])
493 MSG_error_t res = MSG_OK;
495 /* Check the given arguments */
496 MSG_global_init(&argc, argv);
498 printf("Usage: %s platform_file deployment_file [action_files]\n", argv[0]);
499 printf("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
501 printf("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
506 /* Simulation setting */
507 MSG_create_environment(argv[1]);
509 /* No need to register functions as in classical MSG programs: the actions get started anyway */
510 MSG_launch_application(argv[2]);
512 /* Action registration */
/* Each call binds an action name from the trace to its handler above. */
513 MSG_action_register("comm_size", comm_size);
514 MSG_action_register("send", action_send);
515 MSG_action_register("Isend", Isend);
516 MSG_action_register("recv", action_recv);
517 MSG_action_register("Irecv", Irecv);
518 MSG_action_register("wait", action_wait);
519 MSG_action_register("barrier", barrier);
520 MSG_action_register("bcast", bcast);
521 MSG_action_register("reduce", reduce);
522 MSG_action_register("allReduce", allReduce);
523 MSG_action_register("sleep", action_sleep);
524 MSG_action_register("compute", compute);
527 /* Actually do the simulation using MSG_action_trace_run */
528 res = MSG_action_trace_run(argv[3]); // it's ok to pass a NULL argument here
530 INFO1("Simulation time %g", MSG_get_clock());