Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
merge conflict resolved
[simgrid.git] / examples / msg / actions / actions.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
10 #include "simix/simix.h"        /* semaphores for the barrier */
11 #include "xbt.h"                /* calloc, printf */
12 #include "instr/instr_private.h"
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(actions,
15                              "Messages specific for this msg example");
16 int communicator_size = 0;
17
18 static void action_Isend(const char *const *action);
19
20 typedef struct  {
21   int last_Irecv_sender_id;
22   int bcast_counter;
23   int reduce_counter;
24   int allReduce_counter;
25   xbt_dynar_t isends; /* of msg_comm_t */
26   /* Used to implement irecv+wait */
27   xbt_dynar_t irecvs; /* of msg_comm_t */
28   xbt_dynar_t tasks; /* of m_task_t */
29 } s_process_globals_t, *process_globals_t;
30
31 /* Helper function */
32 static double parse_double(const char *string)
33 {
34   double value;
35   char *endptr;
36
37   value = strtod(string, &endptr);
38   if (*endptr != '\0')
39     THROWF(unknown_error, 0, "%s is not a double", string);
40   return value;
41 }
42
43 static int get_rank (const char *process_name)
44 {
45   return atoi(&(process_name[1]));
46
47
48 static void asynchronous_cleanup(void) {
49   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
50
51   /* Destroy any isend which correspond to completed communications */
52   int found;
53   msg_comm_t comm;
54   while ((found = MSG_comm_testany(globals->isends)) != -1) {
55     xbt_dynar_remove_at(globals->isends,found,&comm);
56     MSG_comm_destroy(comm);
57   }
58 }
59
60 /* My actions */
61 static void action_send(const char *const *action)
62 {
63   char *name = NULL;
64   char to[250];
65   const char *size_str = action[3];
66   double size=parse_double(size_str);
67   double clock = MSG_get_clock(); /* this "call" is free thanks to inlining */
68
69   sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),action[2]);
70
71   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
72     name = xbt_str_join_array(action, " ");
73
74 #ifdef HAVE_TRACING
75   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
76   int dst_traced = get_rank(action[2]);
77   TRACE_smpi_ptp_in(rank, rank, dst_traced, "send");
78   TRACE_smpi_send(rank, rank, dst_traced);
79 #endif
80
81   XBT_DEBUG("Entering Send: %s (size: %lg)", name, size);
82    if (size<65536) {
83      action_Isend(action);
84    } else {
85      MSG_task_send(MSG_task_create(name, 0, size, NULL), to);
86    }
87    
88    XBT_VERB("%s %f", name, MSG_get_clock() - clock);
89
90   free(name);
91
92 #ifdef HAVE_TRACING
93   TRACE_smpi_ptp_out(rank, rank, dst_traced, "send");
94 #endif
95
96   asynchronous_cleanup();
97 }
98
99 static void action_Isend(const char *const *action)
100 {
101   char to[250];
102   const char *size = action[3];
103   double clock = MSG_get_clock();
104   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
105
106
107   sprintf(to, "%s_%s", MSG_process_get_name(MSG_process_self()),action[2]);
108   msg_comm_t comm =
109       MSG_task_isend( MSG_task_create(to,0,parse_double(size),NULL), to);
110   xbt_dynar_push(globals->isends,&comm);
111
112   XBT_DEBUG("Isend on %s", MSG_process_get_name(MSG_process_self()));
113   XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
114
115   asynchronous_cleanup();
116 }
117
118
119 static void action_recv(const char *const *action)
120 {
121   char *name = NULL;
122   char mailbox_name[250];
123   m_task_t task = NULL;
124   double clock = MSG_get_clock();
125
126   sprintf(mailbox_name, "%s_%s", action[2],
127           MSG_process_get_name(MSG_process_self()));
128
129   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
130     name = xbt_str_join_array(action, " ");
131
132 #ifdef HAVE_TRACING
133   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
134   int src_traced = get_rank(action[2]);
135   TRACE_smpi_ptp_in(rank, src_traced, rank, "recv");
136 #endif
137
138   XBT_DEBUG("Receiving: %s", name);
139   MSG_error_t res = MSG_task_receive(&task, mailbox_name);
140   //  MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
141   XBT_VERB("%s %f", name, MSG_get_clock() - clock);
142
143   if (res == MSG_OK) {
144     MSG_task_destroy(task);
145   }
146
147   free(name);
148 #ifdef HAVE_TRACING
149   TRACE_smpi_ptp_out(rank, src_traced, rank, "recv");
150   TRACE_smpi_recv(rank, src_traced, rank);
151 #endif
152
153   asynchronous_cleanup();
154 }
155
156 static void action_Irecv(const char *const *action)
157 {
158   char mailbox[250];
159   double clock = MSG_get_clock();
160   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
161
162   XBT_DEBUG("Irecv on %s", MSG_process_get_name(MSG_process_self()));
163 #ifdef HAVE_TRACING
164   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
165   int src_traced = get_rank(action[2]);
166   globals->last_Irecv_sender_id = src_traced;
167   MSG_process_set_data(MSG_process_self(), (void *) globals);
168
169   TRACE_smpi_ptp_in(rank, src_traced, rank, "Irecv");
170 #endif
171
172   sprintf(mailbox, "%s_%s", action[2],
173           MSG_process_get_name(MSG_process_self()));
174   m_task_t t=NULL;
175   xbt_dynar_push(globals->tasks,&t);
176   msg_comm_t c =
177       MSG_task_irecv(
178           xbt_dynar_get_ptr(globals->tasks, xbt_dynar_length(globals->tasks)-1),
179           mailbox);
180   xbt_dynar_push(globals->irecvs,&c);
181
182   XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
183
184 #ifdef HAVE_TRACING
185   TRACE_smpi_ptp_out(rank, src_traced, rank, "Irecv");
186 #endif
187
188   asynchronous_cleanup();
189 }
190
191
192 static void action_wait(const char *const *action)
193 {
194   char *name = NULL;
195   m_task_t task = NULL;
196   msg_comm_t comm;
197   double clock = MSG_get_clock();
198   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
199
200   xbt_assert(xbt_dynar_length(globals->irecvs),
201       "action wait not preceded by any irecv: %s", xbt_str_join_array(action," "));
202
203   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
204     name = xbt_str_join_array(action, " ");
205 #ifdef HAVE_TRACING
206   process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
207   int src_traced = counters->last_Irecv_sender_id;
208   int rank = get_rank(MSG_process_get_name(MSG_process_self()));
209   TRACE_smpi_ptp_in(rank, src_traced, rank, "wait");
210 #endif
211
212   XBT_DEBUG("Entering %s", name);
213   comm = xbt_dynar_pop_as(globals->irecvs,msg_comm_t);
214   MSG_comm_wait(comm,-1);
215   task = xbt_dynar_pop_as(globals->tasks,m_task_t);
216   MSG_comm_destroy(comm);
217   MSG_task_destroy(task);
218
219   XBT_VERB("%s %f", name, MSG_get_clock() - clock);
220   free(name);
221 #ifdef HAVE_TRACING
222   TRACE_smpi_ptp_out(rank, src_traced, rank, "wait");
223   TRACE_smpi_recv(rank, src_traced, rank);
224 #endif
225
226 }
227
228 /* FIXME: that's a poor man's implementation: we should take the message exchanges into account */
229 static void action_barrier(const char *const *action)
230 {
231   char *name = NULL;
232   static smx_mutex_t mutex = NULL;
233   static smx_cond_t cond = NULL;
234   static int processes_arrived_sofar=0;
235
236   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
237     name = xbt_str_join_array(action, " ");
238
239   if (mutex == NULL) {       // first arriving on the barrier
240     mutex = simcall_mutex_init();
241     cond = simcall_cond_init();
242     processes_arrived_sofar=0;
243   }
244   XBT_DEBUG("Entering barrier: %s (%d already there)", name,processes_arrived_sofar);
245
246   simcall_mutex_lock(mutex);
247   if (++processes_arrived_sofar == communicator_size) {
248     simcall_cond_broadcast(cond);
249     simcall_mutex_unlock(mutex);
250   } else {
251     simcall_cond_wait(cond,mutex);
252     simcall_mutex_unlock(mutex);
253   }
254
255   XBT_DEBUG("Exiting barrier: %s", name);
256
257   processes_arrived_sofar--;
258   if (!processes_arrived_sofar) {
259     simcall_cond_destroy(cond);
260     simcall_mutex_destroy(mutex);
261     mutex=NULL;
262   }
263
264   free(name);
265
266 }
267
268 static void action_reduce(const char *const *action)
269 {
270         int i;
271         char *reduce_identifier;
272         char mailbox[80];
273         double comm_size = parse_double(action[2]);
274         double comp_size = parse_double(action[3]);
275         m_task_t comp_task = NULL;
276         const char *process_name;
277         double clock = MSG_get_clock();
278
279         process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
280
281         xbt_assert(communicator_size, "Size of Communicator is not defined, "
282                         "can't use collective operations");
283
284         process_name = MSG_process_get_name(MSG_process_self());
285
286         reduce_identifier = bprintf("reduce_%d", counters->reduce_counter++);
287
288         if (!strcmp(process_name, "p0")) {
289                 XBT_DEBUG("%s: %s is the Root", reduce_identifier, process_name);
290
291                 msg_comm_t *comms = xbt_new0(msg_comm_t,communicator_size-1);
292             m_task_t *tasks = xbt_new0(m_task_t,communicator_size-1);
293             for (i = 1; i < communicator_size; i++) {
294               sprintf(mailbox, "%s_p%d_p0", reduce_identifier, i);
295               comms[i-1] = MSG_task_irecv(&(tasks[i-1]),mailbox);
296             }
297             MSG_comm_waitall(comms,communicator_size-1,-1);
298             for (i = 1; i < communicator_size; i++) {
299                 MSG_comm_destroy(comms[i-1]);
300                 MSG_task_destroy(tasks[i-1]);
301             }
302             free(tasks);
303
304             comp_task = MSG_task_create("reduce_comp", comp_size, 0, NULL);
305             XBT_DEBUG("%s: computing 'reduce_comp'", reduce_identifier);
306             MSG_task_execute(comp_task);
307             MSG_task_destroy(comp_task);
308             XBT_DEBUG("%s: computed", reduce_identifier);
309
310         } else {
311                 XBT_DEBUG("%s: %s sends", reduce_identifier, process_name);
312                 sprintf(mailbox, "%s_%s_p0", reduce_identifier, process_name);
313             XBT_DEBUG("put on %s", mailbox);
314             MSG_task_send(MSG_task_create(reduce_identifier, 0, comm_size, NULL),
315                           mailbox);
316         }
317
318         XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
319         free(reduce_identifier);
320 }
321
322 static void action_bcast(const char *const *action)
323 {
324         int i;
325         char *bcast_identifier;
326         char mailbox[80];
327         double comm_size = parse_double(action[2]);
328         m_task_t task = NULL;
329         const char *process_name;
330         double clock = MSG_get_clock();
331
332         process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
333
334         xbt_assert(communicator_size, "Size of Communicator is not defined, "
335                         "can't use collective operations");
336
337         process_name = MSG_process_get_name(MSG_process_self());
338
339         bcast_identifier = bprintf("bcast_%d", counters->bcast_counter++);
340
341         if (!strcmp(process_name, "p0")) {
342                 XBT_DEBUG("%s: %s is the Root", bcast_identifier, process_name);
343
344             msg_comm_t *comms = xbt_new0(msg_comm_t,communicator_size-1);
345
346             for (i = 1; i < communicator_size; i++) {
347               sprintf(mailbox, "%s_p0_p%d", bcast_identifier, i);
348               comms[i-1] =
349                   MSG_task_isend(MSG_task_create(mailbox,0,comm_size,NULL),
350                       mailbox);
351             }
352             MSG_comm_waitall(comms,communicator_size-1,-1);
353                 for (i = 1; i < communicator_size; i++)
354                MSG_comm_destroy(comms[i-1]);
355             free(comms);
356
357             XBT_DEBUG("%s: all messages sent by %s have been received",
358                    bcast_identifier, process_name);
359
360         } else {
361             sprintf(mailbox, "%s_p0_%s", bcast_identifier, process_name);
362             MSG_task_receive(&task, mailbox);
363             MSG_task_destroy(task);
364             XBT_DEBUG("%s: %s has received", bcast_identifier, process_name);
365         }
366
367         XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
368         free(bcast_identifier);
369 }
370
371
372 static void action_sleep(const char *const *action)
373 {
374   char *name = NULL;
375   const char *duration = action[2];
376   double clock = MSG_get_clock();
377
378   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
379     name = xbt_str_join_array(action, " ");
380
381   XBT_DEBUG("Entering %s", name);
382   MSG_process_sleep(parse_double(duration));
383   XBT_VERB("%s %f ", name, MSG_get_clock() - clock);
384
385   free(name);
386 }
387
388 static void action_allReduce(const char *const *action) {
389   int i;
390   char *allreduce_identifier;
391   char mailbox[80];
392   double comm_size = parse_double(action[2]);
393   double comp_size = parse_double(action[3]);
394   m_task_t task = NULL, comp_task = NULL;
395   const char *process_name;
396   double clock = MSG_get_clock();
397
398   process_globals_t counters = (process_globals_t) MSG_process_get_data(MSG_process_self());
399
400   xbt_assert(communicator_size, "Size of Communicator is not defined, "
401               "can't use collective operations");
402
403   process_name = MSG_process_get_name(MSG_process_self());
404
405   allreduce_identifier = bprintf("allReduce_%d", counters->allReduce_counter++);
406
407   if (!strcmp(process_name, "p0")) {
408     XBT_DEBUG("%s: %s is the Root", allreduce_identifier, process_name);
409
410     msg_comm_t *comms = xbt_new0(msg_comm_t,communicator_size-1);
411     m_task_t *tasks = xbt_new0(m_task_t,communicator_size-1);
412     for (i = 1; i < communicator_size; i++) {
413       sprintf(mailbox, "%s_p%d_p0", allreduce_identifier, i);
414       comms[i-1] = MSG_task_irecv(&(tasks[i-1]),mailbox);
415     }
416     MSG_comm_waitall(comms,communicator_size-1,-1);
417     for (i = 1; i < communicator_size; i++) {
418       MSG_comm_destroy(comms[i-1]);
419       MSG_task_destroy(tasks[i-1]);
420     }
421     free(tasks);
422
423     comp_task = MSG_task_create("allReduce_comp", comp_size, 0, NULL);
424     XBT_DEBUG("%s: computing 'reduce_comp'", allreduce_identifier);
425     MSG_task_execute(comp_task);
426     MSG_task_destroy(comp_task);
427     XBT_DEBUG("%s: computed", allreduce_identifier);
428
429     for (i = 1; i < communicator_size; i++) {
430       sprintf(mailbox, "%s_p0_p%d", allreduce_identifier, i);
431       comms[i-1] =
432           MSG_task_isend(MSG_task_create(mailbox,0,comm_size,NULL),
433               mailbox);
434     }
435     MSG_comm_waitall(comms,communicator_size-1,-1);
436     for (i = 1; i < communicator_size; i++)
437        MSG_comm_destroy(comms[i-1]);
438     free(comms);
439
440     XBT_DEBUG("%s: all messages sent by %s have been received",
441            allreduce_identifier, process_name);
442
443   } else {
444     XBT_DEBUG("%s: %s sends", allreduce_identifier, process_name);
445     sprintf(mailbox, "%s_%s_p0", allreduce_identifier, process_name);
446     XBT_DEBUG("put on %s", mailbox);
447     MSG_task_send(MSG_task_create(allreduce_identifier, 0, comm_size, NULL),
448                   mailbox);
449
450     sprintf(mailbox, "%s_p0_%s", allreduce_identifier, process_name);
451     MSG_task_receive(&task, mailbox);
452     MSG_task_destroy(task);
453     XBT_DEBUG("%s: %s has received", allreduce_identifier, process_name);
454   }
455
456   XBT_VERB("%s %f", xbt_str_join_array(action, " "), MSG_get_clock() - clock);
457   free(allreduce_identifier);
458 }
459
460 static void action_comm_size(const char *const *action)
461 {
462   char *name = NULL;
463   const char *size = action[2];
464   double clock = MSG_get_clock();
465
466   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
467     name = xbt_str_join_array(action, " ");
468   communicator_size = parse_double(size);
469   XBT_VERB("%s %f", name, MSG_get_clock() - clock);
470   free(name);
471 }
472
473 static void action_compute(const char *const *action)
474 {
475   char *name = NULL;
476   const char *amout = action[2];
477   m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
478   double clock = MSG_get_clock();
479
480   if (XBT_LOG_ISENABLED(actions, xbt_log_priority_verbose))
481     name = xbt_str_join_array(action, " ");
482   XBT_DEBUG("Entering %s", name);
483   MSG_task_execute(task);
484   MSG_task_destroy(task);
485   XBT_VERB("%s %f", name, MSG_get_clock() - clock);
486   free(name);
487 }
488
489 static void action_init(const char *const *action)
490
491 #ifdef HAVE_TRACING
492   TRACE_smpi_init(get_rank(MSG_process_get_name(MSG_process_self())));
493 #endif
494   XBT_DEBUG("Initialize the counters");
495   process_globals_t globals = (process_globals_t) calloc(1, sizeof(s_process_globals_t));
496   globals->isends = xbt_dynar_new(sizeof(msg_comm_t),NULL);
497   globals->irecvs = xbt_dynar_new(sizeof(msg_comm_t),NULL);
498   globals->tasks  = xbt_dynar_new(sizeof(m_task_t),NULL);
499   MSG_process_set_data(MSG_process_self(),globals);
500
501 }
502
503 static void action_finalize(const char *const *action)
504 {
505 #ifdef HAVE_TRACING
506   TRACE_smpi_finalize(get_rank(MSG_process_get_name(MSG_process_self())));
507 #endif
508   process_globals_t globals = (process_globals_t) MSG_process_get_data(MSG_process_self());
509   if (globals){
510     xbt_dynar_free_container(&(globals->isends));
511     xbt_dynar_free_container(&(globals->irecvs));
512     xbt_dynar_free_container(&(globals->tasks));
513     free(globals);
514   }
515 }
516
517 /** Main function */
518 int main(int argc, char *argv[])
519 {
520   MSG_error_t res = MSG_OK;
521
522   /* Check the given arguments */
523   MSG_global_init(&argc, argv);
524   if (argc < 3) {
525     printf("Usage: %s platform_file deployment_file [action_files]\n",
526            argv[0]);
527     printf
528         ("example: %s msg_platform.xml msg_deployment.xml actions # if all actions are in the same file\n",
529          argv[0]);
530     printf
531         ("example: %s msg_platform.xml msg_deployment.xml # if actions are in separate files, specified in deployment\n",
532          argv[0]);
533     exit(1);
534   }
535
536   /*  Simulation setting */
537   MSG_create_environment(argv[1]);
538
539   /* No need to register functions as in classical MSG programs: the actions get started anyway */
540   MSG_launch_application(argv[2]);
541
542   /*   Action registration */
543   MSG_action_register("init",     action_init);
544   MSG_action_register("finalize", action_finalize);
545   MSG_action_register("comm_size",action_comm_size);
546   MSG_action_register("send",     action_send);
547   MSG_action_register("Isend",    action_Isend);
548   MSG_action_register("recv",     action_recv);
549   MSG_action_register("Irecv",    action_Irecv);
550   MSG_action_register("wait",     action_wait);
551   MSG_action_register("barrier",  action_barrier);
552   MSG_action_register("bcast",    action_bcast);
553   MSG_action_register("reduce",   action_reduce);
554   MSG_action_register("allReduce",action_allReduce);
555   MSG_action_register("sleep",    action_sleep);
556   MSG_action_register("compute",  action_compute);
557
558
559   /* Actually do the simulation using MSG_action_trace_run */
560   res = MSG_action_trace_run(argv[3]);  // it's ok to pass a NULL argument here
561
562   XBT_INFO("Simulation time %g", MSG_get_clock());
563   MSG_clean();
564
565   if (res == MSG_OK)
566     return 0;
567   else
568     return 1;
569 }                               /* end_of_main */