3 /* Copyright (c) 2009. The SimGrid team. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
11 #include "xbt.h" /* calloc, printf */
12 #include "simgrid_config.h" /* getline */
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15 "Messages specific for this msg example");
16 int communicator_size=0;
18 typedef struct coll_ctr_t{
21 int allReduce_counter;
25 static double parse_double(const char *string) {
29 value=strtod(string, &endptr);
31 THROW1(unknown_error, 0, "%s is not a double", string);
37 static void send(xbt_dynar_t action)
39 char *name = xbt_str_join(action, " ");
41 char *size = xbt_dynar_get_as(action, 3, char *);
42 double clock = MSG_get_clock();
43 sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
44 xbt_dynar_get_as(action, 2, char *));
45 // char *to = xbt_dynar_get_as(action, 2, char *);
46 DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
47 MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
48 INFO2("%s %f", name, MSG_get_clock()-clock);
53 static int spawned_send(int argc, char *argv[])
55 DEBUG3("%s: Sending %s on %s", MSG_process_get_name(MSG_process_self()),
57 MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL),
62 static void Isend(xbt_dynar_t action)
66 // char *to = xbt_dynar_get_as(action, 2, char *);
67 char *size = xbt_dynar_get_as(action, 3, char *);
69 m_process_t comm_helper;
70 double clock = MSG_get_clock();
71 DEBUG1("Isend on %s: spawn process ",
72 MSG_process_get_name(MSG_process_self()));
74 sprintf (to,"%s_%s", MSG_process_get_name(MSG_process_self()),
75 xbt_dynar_get_as(action, 2, char *));
76 myargv = (char**) calloc (3, sizeof (char*));
78 myargv[0] = xbt_strdup(to);
79 myargv[1] = xbt_strdup(size);
82 // sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
83 sprintf(spawn_name,"%s_wait",to);
85 MSG_process_create_with_arguments(spawn_name, spawned_send,
86 NULL, MSG_host_self(), 2, myargv);
87 INFO2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock);
91 static void recv(xbt_dynar_t action)
93 char *name = xbt_str_join(action, " ");
94 char mailbox_name[250];
96 double clock = MSG_get_clock();
97 //FIXME: argument of action ignored so far; semantic not clear
98 //char *from=xbt_dynar_get_as(action,2,char*);
99 sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
100 MSG_process_get_name(MSG_process_self()));
101 DEBUG1("Receiving: %s", name);
102 MSG_task_receive(&task, mailbox_name);
103 // MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
104 INFO2("%s %f", name, MSG_get_clock()-clock);
105 MSG_task_destroy(task);
109 static int spawned_recv(int argc, char *argv[])
111 m_task_t task = NULL;
112 DEBUG1("Receiving on %s", argv[0]);
113 MSG_task_receive(&task, argv[0]);
114 DEBUG1("Received %s", MSG_task_get_name(task));
115 DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
116 MSG_task_send(MSG_task_create("waiter",0,0,NULL),
117 MSG_process_get_name(MSG_process_self()));
119 MSG_task_destroy(task);
124 static void Irecv(xbt_dynar_t action)
126 char *name = xbt_str_join(action, " ");
127 m_process_t comm_helper;
128 char mailbox_name[250];
130 double clock = MSG_get_clock();
131 DEBUG1("Irecv on %s: spawn process ",
132 MSG_process_get_name(MSG_process_self()));
134 sprintf (mailbox_name,"%s_%s", xbt_dynar_get_as(action, 2, char *),
135 MSG_process_get_name(MSG_process_self()));
136 sprintf(name,"%s_wait",MSG_process_get_name(MSG_process_self()));
137 myargv = (char**) calloc (2, sizeof (char*));
139 myargv[0] = xbt_strdup(mailbox_name);
141 comm_helper = MSG_process_create_with_arguments(name,spawned_recv,
142 NULL, MSG_host_self(),
145 INFO2("%s %f", xbt_str_join(action, " "),
146 MSG_get_clock()-clock);
152 static void wait_action(xbt_dynar_t action)
154 char *name = xbt_str_join(action, " ");
156 m_task_t task = NULL;
157 double clock = MSG_get_clock();
159 DEBUG1("Entering %s", name);
160 sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
161 INFO1("wait: %s", task_name);
162 MSG_task_receive(&task,task_name);
163 MSG_task_destroy(task);
164 INFO2("%s %f", name, MSG_get_clock()-clock);
168 static void barrier (xbt_dynar_t action)
170 char *name = xbt_str_join(action, " ");
171 INFO1("barrier: %s", name);
178 static void reduce(xbt_dynar_t action)
184 char *comm_size = xbt_dynar_get_as(action, 2, char *);
185 char *comp_size = xbt_dynar_get_as(action, 3, char *);
186 m_process_t comm_helper=NULL;
187 m_task_t task=NULL, comp_task=NULL;
188 const char* process_name;
189 double clock = MSG_get_clock();
191 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
193 xbt_assert0(communicator_size, "Size of Communicator is not defined"
194 ", can't use collective operations");
196 process_name = MSG_process_get_name(MSG_process_self());
199 DEBUG0("Initialize the counters");
200 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
203 name = bprintf("reduce_%d", counters->reduce_counter++);
205 if (!strcmp(process_name, "process0")){
206 DEBUG2("%s: %s is the Root",name, process_name);
207 for(i=1;i<communicator_size;i++){
208 sprintf(spawn_name,"%s_process%d", name, i);
209 sprintf(task_name,"%s_wait", spawn_name);
211 MSG_process_create(task_name, spawned_recv,
212 (void *) xbt_strdup(spawn_name),
216 for(i=1;i<communicator_size;i++){
217 sprintf(task_name,"%s_process%d_wait", name, i);
218 MSG_task_receive(&task,task_name);
219 MSG_task_destroy(task);
224 MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
225 DEBUG1("%s: computing 'reduce_comp'", name);
226 MSG_task_execute(comp_task);
227 MSG_task_destroy(comp_task);
228 DEBUG1("%s: computed", name);
230 DEBUG2("%s: %s sends", name, process_name);
231 sprintf(task_name,"%s_%s", name, process_name);
232 DEBUG1("put on %s", task_name);
233 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
237 MSG_process_set_data(MSG_process_self(), (void*)counters);
238 INFO2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
242 static void bcast (xbt_dynar_t action)
246 const char* process_name;
250 m_process_t comm_helper=NULL;
252 char *size = xbt_dynar_get_as(action, 2, char *);
253 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
254 double clock = MSG_get_clock();
256 xbt_assert0(communicator_size, "Size of Communicator is not defined"
257 ", can't use collective operations");
260 process_name = MSG_process_get_name(MSG_process_self());
262 DEBUG0("Initialize the counters");
263 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
266 name = bprintf("bcast_%d", counters->bcast_counter++);
267 if (!strcmp(process_name, "process0")){
268 DEBUG2("%s: %s is the Root",name, process_name);
270 for(i=1;i<communicator_size;i++){
271 myargv = (char**) calloc (3, sizeof (char*));
272 myargv[0] = xbt_strdup(name);
273 myargv[1] = xbt_strdup(size);
276 sprintf(spawn_name,"%s_%d", myargv[0], i);
278 MSG_process_create_with_arguments(spawn_name, spawned_send,
279 NULL, MSG_host_self(), 2, myargv);
282 for(i=1;i<communicator_size;i++){
283 sprintf(task_name,"process%d_wait", i);
284 DEBUG1("get on %s", task_name);
285 MSG_task_receive(&task,task_name);
286 MSG_task_destroy(task);
289 DEBUG2("%s: all messages sent by %s have been received",
292 DEBUG2("%s: %s receives", name, process_name);
293 MSG_task_receive(&task, name);
294 MSG_task_destroy(task);
295 DEBUG2("%s: %s has received", name,process_name);
296 sprintf(task_name,"%s_wait", process_name);
297 DEBUG1("put on %s", task_name);
298 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
301 MSG_process_set_data(MSG_process_self(), (void*)counters);
302 INFO2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
307 static void sleep(xbt_dynar_t action)
309 char *name = xbt_str_join(action, " ");
310 char *duration = xbt_dynar_get_as(action, 2, char *);
311 double clock = MSG_get_clock();
312 DEBUG1("Entering %s", name);
313 MSG_process_sleep(parse_double(duration));
314 INFO2("%s %f ", name, MSG_get_clock()-clock);
318 static void allReduce(xbt_dynar_t action)
325 char *comm_size = xbt_dynar_get_as(action, 2, char *);
326 char *comp_size = xbt_dynar_get_as(action, 3, char *);
327 m_process_t comm_helper=NULL;
328 m_task_t task=NULL, comp_task=NULL;
329 const char* process_name;
330 double clock = MSG_get_clock();
332 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
334 xbt_assert0(communicator_size, "Size of Communicator is not defined"
335 ", can't use collective operations");
337 process_name = MSG_process_get_name(MSG_process_self());
340 DEBUG0("Initialize the counters");
341 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
344 name = bprintf("allReduce_%d", counters->allReduce_counter++);
346 if (!strcmp(process_name, "process0")){
347 DEBUG2("%s: %s is the Root",name, process_name);
348 for(i=1;i<communicator_size;i++){
349 sprintf(spawn_name,"%s_process%d", name, i);
350 sprintf(task_name,"%s_wait", spawn_name);
352 MSG_process_create(task_name, spawned_recv,
353 (void *) xbt_strdup(spawn_name),
357 for(i=1;i<communicator_size;i++){
358 sprintf(task_name,"%s_process%d_wait", name, i);
359 MSG_task_receive(&task,task_name);
360 MSG_task_destroy(task);
365 MSG_task_create("allReduce_comp", parse_double(comp_size), 0, NULL);
366 DEBUG1("%s: computing 'reduce_comp'", name);
367 MSG_task_execute(comp_task);
368 MSG_task_destroy(comp_task);
369 DEBUG1("%s: computed", name);
371 for(i=1;i<communicator_size;i++){
372 myargv = (char**) calloc (3, sizeof (char*));
373 myargv[0] = xbt_strdup(name);
374 myargv[1] = xbt_strdup(comm_size);
377 sprintf(spawn_name,"%s_%d", myargv[0], i);
379 MSG_process_create_with_arguments(spawn_name, spawned_send,
380 NULL, MSG_host_self(), 2, myargv);
383 for(i=1;i<communicator_size;i++){
384 sprintf(task_name,"process%d_wait", i);
385 DEBUG1("get on %s", task_name);
386 MSG_task_receive(&task,task_name);
387 MSG_task_destroy(task);
390 DEBUG2("%s: all messages sent by %s have been received",
394 DEBUG2("%s: %s sends", name, process_name);
395 sprintf(task_name,"%s_%s", name, process_name);
396 DEBUG1("put on %s", task_name);
397 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
400 MSG_task_receive(&task, name);
401 MSG_task_destroy(task);
402 DEBUG2("%s: %s has received", name,process_name);
403 sprintf(task_name,"%s_wait", process_name);
404 DEBUG1("put on %s", task_name);
405 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
409 MSG_process_set_data(MSG_process_self(), (void*)counters);
410 INFO2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
414 static void comm_size(xbt_dynar_t action)
416 char *size = xbt_dynar_get_as(action, 2, char *);
417 communicator_size = parse_double(size);
420 static void compute(xbt_dynar_t action)
422 char *name = xbt_str_join(action, " ");
423 char *amout = xbt_dynar_get_as(action, 2, char *);
424 m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
425 double clock = MSG_get_clock();
427 DEBUG1("Entering %s", name);
428 MSG_task_execute(task);
429 MSG_task_destroy(task);
430 INFO2("%s %f", name, MSG_get_clock()-clock);
435 int main(int argc, char *argv[])
437 MSG_error_t res = MSG_OK;
439 /* Check the given arguments */
440 MSG_global_init(&argc, argv);
442 printf("Usage: %s platform_file deployment_file action_files\n", argv[0]);
443 printf("example: %s msg_platform.xml msg_deployment.xml actions\n",
448 /* Simulation setting */
449 MSG_create_environment(argv[1]);
451 /* No need to register functions as in classical MSG programs: the actions get started anyway */
452 MSG_launch_application(argv[2]);
454 /* Action registration */
455 MSG_action_register("comm_size", comm_size);
456 MSG_action_register("send", send);
457 MSG_action_register("Isend", Isend);
458 MSG_action_register("recv", recv);
459 MSG_action_register("Irecv", Irecv);
460 MSG_action_register("wait", wait_action);
461 MSG_action_register("barrier", barrier);
462 MSG_action_register("bcast", bcast);
463 MSG_action_register("reduce", reduce);
464 MSG_action_register("allReduce", allReduce);
465 MSG_action_register("sleep", sleep);
466 MSG_action_register("compute", compute);
469 /* Actually do the simulation using MSG_action_trace_run */
470 res = MSG_action_trace_run(argv[3]);
472 INFO1("Simulation time %g", MSG_get_clock());