3 /* Copyright (c) 2009. The SimGrid team. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
11 #include "xbt.h" /* calloc, printf */
12 #include "simgrid_config.h" /* getline */
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15 "Messages specific for this msg example");
16 int communicator_size=0;
18 typedef struct coll_ctr_t{
21 int allReduce_counter;
25 static double parse_double(const char *string) {
29 value=strtod(string, &endptr);
31 THROW1(unknown_error, 0, "%s is not a double", string);
37 static void send(xbt_dynar_t action)
39 char *name = xbt_str_join(action, " ");
40 char *to = xbt_dynar_get_as(action, 2, char *);
41 char *size = xbt_dynar_get_as(action, 3, char *);
42 double clock = MSG_get_clock();
43 DEBUG2("Entering Send: %s (size: %lg)", name, parse_double(size));
44 MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
45 INFO2("%s %f", name, MSG_get_clock()-clock);
50 static int spawned_send(int argc, char *argv[])
52 DEBUG3("%s: Sending %s on %s", MSG_process_self()->name,
54 MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL),
59 static void Isend(xbt_dynar_t action)
62 char *to = xbt_dynar_get_as(action, 2, char *);
63 char *size = xbt_dynar_get_as(action, 3, char *);
65 m_process_t comm_helper;
66 double clock = MSG_get_clock();
67 DEBUG1("Isend on %s: spawn process ",
68 MSG_process_get_name(MSG_process_self()));
70 myargv = (char**) calloc (3, sizeof (char*));
72 myargv[0] = xbt_strdup(to);
73 myargv[1] = xbt_strdup(size);
76 sprintf(spawn_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
78 MSG_process_create_with_arguments(spawn_name, spawned_send,
79 NULL, MSG_host_self(), 2, myargv);
80 INFO2("%s %f",xbt_str_join(action, " "), MSG_get_clock()-clock);
84 static void recv(xbt_dynar_t action)
86 char *name = xbt_str_join(action, " ");
88 double clock = MSG_get_clock();
89 //FIXME: argument of action ignored so far; semantic not clear
90 //char *from=xbt_dynar_get_as(action,2,char*);
92 DEBUG1("Receiving: %s", name);
93 MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
94 INFO2("%s %f", name, MSG_get_clock()-clock);
95 MSG_task_destroy(task);
99 static int spawned_recv(int argc, char *argv[])
101 m_task_t task = NULL;
102 char* name = (char *) MSG_process_get_data(MSG_process_self());
103 DEBUG1("Receiving on %s", name);
104 MSG_task_receive(&task, name);
105 DEBUG1("Received %s", MSG_task_get_name(task));
106 DEBUG1("waiter on %s", MSG_process_get_name(MSG_process_self()));
107 MSG_task_send(MSG_task_create("waiter",0,0,NULL),MSG_process_get_name(MSG_process_self()));
109 MSG_task_destroy(task);
114 static void Irecv(xbt_dynar_t action)
116 char *name = xbt_str_join(action, " ");
117 m_process_t comm_helper;
118 double clock = MSG_get_clock();
119 DEBUG1("Irecv on %s: spawn process ",
120 MSG_process_get_name(MSG_process_self()));
122 sprintf(name,"%s_wait",MSG_process_get_name(MSG_process_self()));
123 comm_helper = MSG_process_create(name,spawned_recv,
124 (void *) MSG_process_get_name(MSG_process_self()),
127 INFO2("%s %f", xbt_str_join(action, " "),
128 MSG_get_clock()-clock);
134 static void wait(xbt_dynar_t action)
136 char *name = xbt_str_join(action, " ");
138 m_task_t task = NULL;
139 double clock = MSG_get_clock();
141 DEBUG1("Entering %s", name);
142 sprintf(task_name,"%s_wait",MSG_process_get_name(MSG_process_self()));
143 MSG_task_receive(&task,task_name);
144 MSG_task_destroy(task);
145 INFO2("%s %f", name, MSG_get_clock()-clock);
149 static void barrier (xbt_dynar_t action)
151 char *name = xbt_str_join(action, " ");
152 INFO1("barrier: %s", name);
159 static void reduce(xbt_dynar_t action)
165 char *comm_size = xbt_dynar_get_as(action, 2, char *);
166 char *comp_size = xbt_dynar_get_as(action, 3, char *);
167 m_process_t comm_helper=NULL;
168 m_task_t task=NULL, comp_task=NULL;
169 const char* process_name;
170 double clock = MSG_get_clock();
172 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
174 xbt_assert0(communicator_size, "Size of Communicator is not defined"
175 ", can't use collective operations");
177 process_name = MSG_process_get_name(MSG_process_self());
180 DEBUG0("Initialize the counters");
181 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
184 name = bprintf("reduce_%d", counters->reduce_counter++);
186 if (!strcmp(process_name, "process0")){
187 DEBUG2("%s: %s is the Root",name, process_name);
188 for(i=1;i<communicator_size;i++){
189 sprintf(spawn_name,"%s_process%d", name, i);
190 sprintf(task_name,"%s_wait", spawn_name);
192 MSG_process_create(task_name, spawned_recv,
193 (void *) xbt_strdup(spawn_name),
197 for(i=1;i<communicator_size;i++){
198 sprintf(task_name,"%s_process%d_wait", name, i);
199 MSG_task_receive(&task,task_name);
200 MSG_task_destroy(task);
205 MSG_task_create("reduce_comp", parse_double(comp_size), 0, NULL);
206 DEBUG1("%s: computing 'reduce_comp'", name);
207 MSG_task_execute(comp_task);
208 MSG_task_destroy(comp_task);
209 DEBUG1("%s: computed", name);
211 DEBUG2("%s: %s sends", name, process_name);
212 sprintf(task_name,"%s_%s", name, process_name);
213 DEBUG1("put on %s", task_name);
214 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
218 MSG_process_set_data(MSG_process_self(), (void*)counters);
219 INFO2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
223 static void bcast (xbt_dynar_t action)
227 const char* process_name;
231 m_process_t comm_helper=NULL;
233 char *size = xbt_dynar_get_as(action, 2, char *);
234 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
235 double clock = MSG_get_clock();
237 xbt_assert0(communicator_size, "Size of Communicator is not defined"
238 ", can't use collective operations");
241 process_name = MSG_process_get_name(MSG_process_self());
243 DEBUG0("Initialize the counters");
244 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
247 name = bprintf("bcast_%d", counters->bcast_counter++);
248 if (!strcmp(process_name, "process0")){
249 DEBUG2("%s: %s is the Root",name, process_name);
251 for(i=1;i<communicator_size;i++){
252 myargv = (char**) calloc (3, sizeof (char*));
253 myargv[0] = xbt_strdup(name);
254 myargv[1] = xbt_strdup(size);
257 sprintf(spawn_name,"%s_%d", myargv[0], i);
259 MSG_process_create_with_arguments(spawn_name, spawned_send,
260 NULL, MSG_host_self(), 2, myargv);
263 for(i=1;i<communicator_size;i++){
264 sprintf(task_name,"process%d_wait", i);
265 DEBUG1("get on %s", task_name);
266 MSG_task_receive(&task,task_name);
267 MSG_task_destroy(task);
270 DEBUG2("%s: all messages sent by %s have been received",
273 DEBUG2("%s: %s receives", name, process_name);
274 MSG_task_receive(&task, name);
275 MSG_task_destroy(task);
276 DEBUG2("%s: %s has received", name,process_name);
277 sprintf(task_name,"%s_wait", process_name);
278 DEBUG1("put on %s", task_name);
279 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
282 MSG_process_set_data(MSG_process_self(), (void*)counters);
283 INFO2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
288 static void sleep(xbt_dynar_t action)
290 char *name = xbt_str_join(action, " ");
291 char *duration = xbt_dynar_get_as(action, 2, char *);
292 double clock = MSG_get_clock();
293 DEBUG1("Entering %s", name);
294 MSG_process_sleep(parse_double(duration));
295 INFO2("%s %f ", name, MSG_get_clock()-clock);
299 static void allReduce(xbt_dynar_t action)
306 char *comm_size = xbt_dynar_get_as(action, 2, char *);
307 char *comp_size = xbt_dynar_get_as(action, 3, char *);
308 m_process_t comm_helper=NULL;
309 m_task_t task=NULL, comp_task=NULL;
310 const char* process_name;
311 double clock = MSG_get_clock();
313 coll_ctr counters = (coll_ctr) MSG_process_get_data(MSG_process_self());
315 xbt_assert0(communicator_size, "Size of Communicator is not defined"
316 ", can't use collective operations");
318 process_name = MSG_process_get_name(MSG_process_self());
321 DEBUG0("Initialize the counters");
322 counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
325 name = bprintf("allReduce_%d", counters->allReduce_counter++);
327 if (!strcmp(process_name, "process0")){
328 DEBUG2("%s: %s is the Root",name, process_name);
329 for(i=1;i<communicator_size;i++){
330 sprintf(spawn_name,"%s_process%d", name, i);
331 sprintf(task_name,"%s_wait", spawn_name);
333 MSG_process_create(task_name, spawned_recv,
334 (void *) xbt_strdup(spawn_name),
338 for(i=1;i<communicator_size;i++){
339 sprintf(task_name,"%s_process%d_wait", name, i);
340 MSG_task_receive(&task,task_name);
341 MSG_task_destroy(task);
346 MSG_task_create("allReduce_comp", parse_double(comp_size), 0, NULL);
347 DEBUG1("%s: computing 'reduce_comp'", name);
348 MSG_task_execute(comp_task);
349 MSG_task_destroy(comp_task);
350 DEBUG1("%s: computed", name);
352 for(i=1;i<communicator_size;i++){
353 myargv = (char**) calloc (3, sizeof (char*));
354 myargv[0] = xbt_strdup(name);
355 myargv[1] = xbt_strdup(comm_size);
358 sprintf(spawn_name,"%s_%d", myargv[0], i);
360 MSG_process_create_with_arguments(spawn_name, spawned_send,
361 NULL, MSG_host_self(), 2, myargv);
364 for(i=1;i<communicator_size;i++){
365 sprintf(task_name,"process%d_wait", i);
366 DEBUG1("get on %s", task_name);
367 MSG_task_receive(&task,task_name);
368 MSG_task_destroy(task);
371 DEBUG2("%s: all messages sent by %s have been received",
375 DEBUG2("%s: %s sends", name, process_name);
376 sprintf(task_name,"%s_%s", name, process_name);
377 DEBUG1("put on %s", task_name);
378 MSG_task_send(MSG_task_create(name, 0, parse_double(comm_size), NULL),
381 MSG_task_receive(&task, name);
382 MSG_task_destroy(task);
383 DEBUG2("%s: %s has received", name,process_name);
384 sprintf(task_name,"%s_wait", process_name);
385 DEBUG1("put on %s", task_name);
386 MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
390 MSG_process_set_data(MSG_process_self(), (void*)counters);
391 INFO2("%s %f", xbt_str_join(action, " "), MSG_get_clock()-clock);
395 static void comm_size(xbt_dynar_t action)
397 char *size = xbt_dynar_get_as(action, 2, char *);
398 communicator_size = parse_double(size);
401 static void compute(xbt_dynar_t action)
403 char *name = xbt_str_join(action, " ");
404 char *amout = xbt_dynar_get_as(action, 2, char *);
405 m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
406 double clock = MSG_get_clock();
408 DEBUG1("Entering %s", name);
409 MSG_task_execute(task);
410 MSG_task_destroy(task);
411 INFO2("%s %f", name, MSG_get_clock()-clock);
416 int main(int argc, char *argv[])
418 MSG_error_t res = MSG_OK;
420 /* Check the given arguments */
421 MSG_global_init(&argc, argv);
423 printf("Usage: %s platform_file deployment_file action_files\n", argv[0]);
424 printf("example: %s msg_platform.xml msg_deployment.xml actions\n",
429 /* Simulation setting */
430 MSG_create_environment(argv[1]);
432 /* No need to register functions as in classical MSG programs: the actions get started anyway */
433 MSG_launch_application(argv[2]);
435 /* Action registration */
436 MSG_action_register("comm_size", comm_size);
437 MSG_action_register("send", send);
438 MSG_action_register("Isend", Isend);
439 MSG_action_register("recv", recv);
440 MSG_action_register("Irecv", Irecv);
441 MSG_action_register("wait", wait);
442 MSG_action_register("barrier", barrier);
443 MSG_action_register("bcast", bcast);
444 MSG_action_register("reduce", reduce);
445 MSG_action_register("allReduce", allReduce);
446 MSG_action_register("sleep", sleep);
447 MSG_action_register("compute", compute);
450 /* Actually do the simulation using MSG_action_trace_run */
451 res = MSG_action_trace_run(argv[3]);
453 INFO1("Simulation time %g", MSG_get_clock());