3 /* Copyright (c) 2009. The SimGrid team. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
11 #include "xbt.h" /* calloc, printf */
12 #include "simgrid_config.h" /* getline */
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15 "Messages specific for this msg example");
16 int communicator_size=0;
18 typedef struct coll_ctr_t{
21 int allReduce_counter;
25 static double parse_double(const char *string) {
29 value=strtod(string, &endptr);
31 THROW1(unknown_error, 0, "%s is not a double", string);
37 static void send(xbt_dynar_t action)
39 char *name = xbt_str_join(action, " ");
41 char *size = xbt_dynar_get_as(action, 3, char *);
42 double clock = MSG_get_clock();
43 sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
44 xbt_dynar_get_as(action, 2, char *));
45 // char *to = xbt_dynar_get_as(action, 2, char *);
46 DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
47 MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
48 DEBUG2("%s %f", name, MSG_get_clock()-clock);
53 static int spawned_send(int argc, char *argv[])
55 DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()),
57 MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL),
62 static void Isend(xbt_dynar_t action)
66 // char *to = xbt_dynar_get_as(action, 2, char *);
67 char *size = xbt_dynar_get_as(action, 3, char *);
69 m_process_t comm_helper;
70 double clock = MSG_get_clock();
71 DEBUG1("Isend on %s: spawn process ",
72 MSG_process_get_name(MSG_process_self()));
74 sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
75 xbt_dynar_get_as(action, 2, char *));
76 myargv = (char**) calloc (3, sizeof (char*));
78 myargv[0] = xbt_strdup(to);
79 myargv[1] = xbt_strdup(size);
82 // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
83 sprintf(spawn_name,"%s_wait",to);
85 MSG_process_create_with_arguments(spawn_name, spawned_send,
86 NULL, MSG_host_self(), 2, myargv);
87 DEBUG2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock);
91 static void recv(xbt_dynar_t action)
93 char *name = xbt_str_join(action, " ");
94 char mailbox_name[250];
96 double clock = MSG_get_clock();
97 //FIXME: argument of action ignored so far; semantic not clear
98 //char *from=xbt_dynar_get_as(action,2,char*);
99 sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
100 MSG_process_get_name(MSG_process_self()));
101 DEBUG1("Receiving: %s", name);
102 MSG_task_receive(&task, mailbox_name);
103 // MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
104 DEBUG2("%s %f", name, MSG_get_clock()-clock);
105 MSG_task_destroy(task);
109 static int spawned_recv(int argc, char *argv[])
111 m_task_t task = NULL;
112 DEBUG1("Receiving on %s", argv[0]);
113 MSG_task_receive(&task, argv[0]);
114 DEBUG1("Received %s", MSG_task_get_name(task));
115 DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
116 MSG_task_send(MSG_task_create("waiter",0,0,NULL),
117 MSG_process_get_name(MSG_process_self()));
119 MSG_task_destroy(task);
124 static void Irecv(xbt_dynar_t action)
126 char *name = xbt_str_join(action, " ");
127 m_process_t comm_helper;
128 char mailbox_name[250];
130 double clock = MSG_get_clock();
131 DEBUG1("Irecv on %s: spawn process ",
132 MSG_process_get_name(MSG_process_self()));
134 sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
135 MSG_process_get_name(MSG_process_self()));
136 sprintf(name,"%s_wait",MSG_process_get_name(MSG_process_self()));
137 myargv = (char**) calloc (2, sizeof (char*));
139 myargv[0] = xbt_strdup(mailbox_name);
141 comm_helper = MSG_process_create_with_arguments(name,spawned_recv,
142 NULL, MSG_host_self(),
145 DEBUG2("%s %f", xbt_str_join(action, " "),
146 MSG_get_clock()-clock);
152 static void wait_action(xbt_dynar_t action)
154 char *name = xbt_str_join(action, " ");
156 m_task_t task = NULL;
157 double clock = MSG_get_clock();
159 DEBUG1("Entering %s", name);
160 sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
161 DEBUG1("wait: %s", task_name);
162 MSG_task_receive(&task,task_name);
163 MSG_task_destroy(task);
164 DEBUG2("%s %f", name, MSG_get_clock()-clock);
168 static void barrier (xbt_dynar_t action)
170 char *name = xbt_str_join(action, " ");
171 DEBUG1("barrier: %s", name);
178 static void reduce(xbt_dynar_t action)
185 char *comm_size = xbt_dynar_get_as(action, 2, char *);
186 char *comp_size = xbt_dynar_get_as(action, 3, char *);
187 m_process_t comm_helper=NULL;
188 m_task_t task=NULL, comp_task=NULL;
189 const char* process_name;
190 double clock = MSG_get_clock();
192 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
194 xbt_assert0(communicator_size, "Size of Communicator is not defined"
195 ", can't use collective operations");
197 process_name = MSG_process_get_name(MSG_process_self());
200 DEBUG0("Initialize the counters");
201 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
204 name = bprintf("reduce_%d", counters->reduce_counter++);
206 if (!strcmp(process_name, "p0")){
207 DEBUG2("%s: %s is the Root",name, process_name);
208 for(i=1;i<communicator_size;i++){
209 sprintf(spawn_name,"%s_p%d_%s", name, i,
210 MSG_process_get_name(MSG_process_self()));
211 sprintf(task_name,"%s_wait", spawn_name);
212 myargv = (char**) calloc (2, sizeof (char*));
214 myargv[0] = xbt_strdup(spawn_name);
218 MSG_process_create_with_arguments(task_name, spawned_recv,
219 NULL, MSG_host_self(),
223 for(i=1;i<communicator_size;i++){
224 sprintf(task_name,"%s_p%d_p0_wait", name, i);
225 MSG_task_receive(&task,task_name);
226 MSG_task_destroy(task);
231 MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
232 DEBUG1("%s: computing 'reduce_comp'", name);
233 MSG_task_execute(comp_task);
234 MSG_task_destroy(comp_task);
235 DEBUG1("%s: computed", name);
237 DEBUG2("%s: %s sends", name, process_name);
238 sprintf(task_name,"%s_%s_p0", name, process_name);
239 DEBUG1("put on %s", task_name);
240 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
244 MSG_process_set_data(MSG_process_self(), (void*)counters);
245 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
249 static void bcast (xbt_dynar_t action)
253 const char* process_name;
257 m_process_t comm_helper=NULL;
259 char *size = xbt_dynar_get_as(action, 2, char *);
260 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
261 double clock = MSG_get_clock();
263 xbt_assert0(communicator_size, "Size of Communicator is not defined"
264 ", can't use collective operations");
267 process_name = MSG_process_get_name(MSG_process_self());
269 DEBUG0("Initialize the counters");
270 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
273 name = bprintf("bcast_%d", counters->bcast_counter++);
274 if (!strcmp(process_name, "p0")){
275 DEBUG2("%s: %s is the Root",name, process_name);
277 for(i=1;i<communicator_size;i++){
278 myargv = (char**) calloc (3, sizeof (char*));
279 myargv[0] = xbt_strdup(name);
280 myargv[1] = xbt_strdup(size);
283 sprintf(spawn_name,"%s_%d", myargv[0], i);
285 MSG_process_create_with_arguments(spawn_name, spawned_send,
286 NULL, MSG_host_self(), 2, myargv);
289 for(i=1;i<communicator_size;i++){
290 sprintf(task_name,"p%d_wait", i);
291 DEBUG1("get on %s", task_name);
292 MSG_task_receive(&task,task_name);
293 MSG_task_destroy(task);
296 DEBUG2("%s: all messages sent by %s have been received",
299 DEBUG2("%s: %s receives", name, process_name);
300 MSG_task_receive(&task, name);
301 MSG_task_destroy(task);
302 DEBUG2("%s: %s has received", name,process_name);
303 sprintf(task_name,"%s_wait", process_name);
304 DEBUG1("put on %s", task_name);
305 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
308 MSG_process_set_data(MSG_process_self(), (void*)counters);
309 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
314 static void sleep(xbt_dynar_t action)
316 char *name = xbt_str_join(action, " ");
317 char *duration = xbt_dynar_get_as(action, 2, char *);
318 double clock = MSG_get_clock();
319 DEBUG1("Entering %s", name);
320 MSG_process_sleep(parse_double(duration));
321 DEBUG2("%s %f ", name, MSG_get_clock()-clock);
325 static void allReduce(xbt_dynar_t action)
332 char *comm_size = xbt_dynar_get_as(action, 2, char *);
333 char *comp_size = xbt_dynar_get_as(action, 3, char *);
334 m_process_t comm_helper=NULL;
335 m_task_t task=NULL, comp_task=NULL;
336 const char* process_name;
337 double clock = MSG_get_clock();
339 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
341 xbt_assert0(communicator_size, "Size of Communicator is not defined"
342 ", can't use collective operations");
344 process_name = MSG_process_get_name(MSG_process_self());
347 DEBUG0("Initialize the counters");
348 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
351 name = bprintf("allReduce_%d", counters->allReduce_counter++);
353 if (!strcmp(process_name, "p0")){
354 DEBUG2("%s: %s is the Root",name, process_name);
355 for(i=1;i<communicator_size;i++){
356 sprintf(spawn_name,"%s_p%d_%s", name, i,
357 MSG_process_get_name(MSG_process_self()));
358 sprintf(task_name,"%s_wait", spawn_name);
359 myargv = (char**) calloc (2, sizeof (char*));
361 myargv[0] = xbt_strdup(spawn_name);
365 MSG_process_create_with_arguments(task_name, spawned_recv,
366 NULL, MSG_host_self(),
370 for(i=1;i<communicator_size;i++){
371 sprintf(task_name,"%s_p%d_p0_wait", name, i);
372 MSG_task_receive(&task,task_name);
373 MSG_task_destroy(task);
378 MSG_task_create("allReduce_comp", parse_double(comp_size), 0, NULL);
379 DEBUG1("%s: computing 'reduce_comp'", name);
380 MSG_task_execute(comp_task);
381 MSG_task_destroy(comp_task);
382 DEBUG1("%s: computed", name);
384 for(i=1;i<communicator_size;i++){
385 myargv = (char**) calloc (3, sizeof (char*));
386 myargv[0] = xbt_strdup(name);
387 myargv[1] = xbt_strdup(comm_size);
390 sprintf(spawn_name,"%s_%d", myargv[0], i);
392 MSG_process_create_with_arguments(spawn_name, spawned_send,
393 NULL, MSG_host_self(), 2, myargv);
396 for(i=1;i<communicator_size;i++){
397 sprintf(task_name,"p%d_wait", i);
398 DEBUG1("get on %s", task_name);
399 MSG_task_receive(&task,task_name);
400 MSG_task_destroy(task);
403 DEBUG2("%s: all messages sent by %s have been received",
407 DEBUG2("%s: %s sends", name, process_name);
408 sprintf(task_name,"%s_%s_p0", name, process_name);
409 DEBUG1("put on %s", task_name);
410 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
413 MSG_task_receive(&task, name);
414 MSG_task_destroy(task);
415 DEBUG2("%s: %s has received", name,process_name);
416 sprintf(task_name,"%s_wait", process_name);
417 DEBUG1("put on %s", task_name);
418 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
422 MSG_process_set_data(MSG_process_self(), (void*)counters);
423 DEBUG2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
427 static void comm_size(xbt_dynar_t action)
429 char *size = xbt_dynar_get_as(action, 2, char *);
430 communicator_size = parse_double(size);
433 static void compute(xbt_dynar_t action)
435 char *name = xbt_str_join(action, " ");
436 char *amout = xbt_dynar_get_as(action, 2, char *);
437 m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
438 double clock = MSG_get_clock();
440 DEBUG1("Entering %s", name);
441 MSG_task_execute(task);
442 MSG_task_destroy(task);
443 DEBUG2("%s %f", name, MSG_get_clock()-clock);
448 int main(int argc, char *argv[])
450 MSG_error_t res = MSG_OK;
452 /* Check the given arguments */
453 MSG_global_init(&argc, argv);
455 printf("Usage: %s platform_file deployment_file [action_files]\n", argv[0]);
456 printf("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
458 printf("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
463 /* Simulation setting */
464 MSG_create_environment(argv[1]);
466 /* No need to register functions as in classical MSG programs: the actions get started anyway */
467 MSG_launch_application(argv[2]);
469 /* Action registration */
470 MSG_action_register("comm_size", comm_size);
471 MSG_action_register("send", send);
472 MSG_action_register("Isend", Isend);
473 MSG_action_register("recv", recv);
474 MSG_action_register("Irecv", Irecv);
475 MSG_action_register("wait", wait_action);
476 MSG_action_register("barrier", barrier);
477 MSG_action_register("bcast", bcast);
478 MSG_action_register("reduce", reduce);
479 MSG_action_register("allReduce", allReduce);
480 MSG_action_register("sleep", sleep);
481 MSG_action_register("compute", compute);
484 /* Actually do the simulation using MSG_action_trace_run */
485 res = MSG_action_trace_run(argv[3]); // it's ok to pass a NULL argument here
487 INFO1("Simulation time %g", MSG_get_clock());