Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix copyright headers
[simgrid.git] / examples / msg / masterslave / masterslave_forwarder.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15                              "Messages specific for this msg example");
16
17 int master(int argc, char *argv[]);
18 int slave(int argc, char *argv[]);
19 int forwarder(int argc, char *argv[]);
20 MSG_error_t test_all(const char *platform_file, const char *application_file);
21
22 typedef enum {
23   PORT_22 = 0,
24   MAX_CHANNEL
25 } channel_t;
26
27 #define FINALIZE ((void*)221297)        /* a magic number to tell people to stop working */
28
29 /** Emitter function  */
30 int master(int argc, char *argv[])
31 {
32   int slaves_count = 0;
33   m_host_t *slaves = NULL;
34   m_task_t *todo = NULL;
35   int number_of_tasks = 0;
36   double task_comp_size = 0;
37   double task_comm_size = 0;
38
39   TRACE_host_variable_set ("is_master", 1);
40
41   int i;
42
43   xbt_assert1(sscanf(argv[1], "%d", &number_of_tasks),
44               "Invalid argument %s\n", argv[1]);
45   xbt_assert1(sscanf(argv[2], "%lg", &task_comp_size),
46               "Invalid argument %s\n", argv[2]);
47   xbt_assert1(sscanf(argv[3], "%lg", &task_comm_size),
48               "Invalid argument %s\n", argv[3]);
49
50   {                             /*  Task creation */
51     char sprintf_buffer[64];
52
53     todo = xbt_new0(m_task_t, number_of_tasks);
54
55     for (i = 0; i < number_of_tasks; i++) {
56       sprintf(sprintf_buffer, "Task_%d", i);
57       todo[i] =
58         MSG_task_create(sprintf_buffer, task_comp_size, task_comm_size, NULL);
59       TRACE_host_variable_set ("task_creation", i);
60       TRACE_msg_set_task_category (todo[i], "compute");
61     }
62   }
63
64   {                             /* Process organisation */
65     slaves_count = argc - 4;
66     slaves = xbt_new0(m_host_t, slaves_count);
67
68     for (i = 4; i < argc; i++) {
69       slaves[i - 4] = MSG_get_host_by_name(argv[i]);
70       xbt_assert1(slaves[i - 4] != NULL, "Unknown host %s. Stopping Now! ",
71                   argv[i]);
72     }
73   }
74
75   INFO2("Got %d slaves and %d tasks to process", slaves_count,
76         number_of_tasks);
77   for (i = 0; i < slaves_count; i++)
78     DEBUG1("%s", slaves[i]->name);
79
80   for (i = 0; i < number_of_tasks; i++) {
81     INFO2("Sending \"%s\" to \"%s\"",
82           todo[i]->name, slaves[i % slaves_count]->name);
83     if (MSG_host_self() == slaves[i % slaves_count]) {
84       INFO0("Hey ! It's me ! :)");
85     }
86
87     MSG_task_put(todo[i], slaves[i % slaves_count], PORT_22);
88     INFO0("Sent");
89   }
90
91   INFO0
92     ("All tasks have been dispatched. Let's tell everybody the computation is over.");
93   for (i = 0; i < slaves_count; i++){
94     m_task_t finalize=MSG_task_create("finalize", 0, 0, FINALIZE);
95     TRACE_msg_set_task_category(finalize,"finalize");
96     MSG_task_put(finalize, slaves[i], PORT_22);
97   }
98
99   INFO0("Goodbye now!");
100   free(slaves);
101   free(todo);
102   return 0;
103 }                               /* end_of_master */
104
105 /** Receiver function  */
106 int slave(int argc, char *argv[])
107 {
108   m_task_t task = NULL;
109   TRACE_host_variable_set ("is_slave", 1);
110   int res;
111   while (1) {
112     res = MSG_task_get(&(task), PORT_22);
113     xbt_assert0(res == MSG_OK, "MSG_task_get failed");
114
115     INFO1("Received \"%s\"", MSG_task_get_name(task));
116     if (!strcmp(MSG_task_get_name(task), "finalize")) {
117       MSG_task_destroy(task);
118       break;
119     }
120
121     INFO1("Processing \"%s\"", MSG_task_get_name(task));
122     TRACE_host_variable_add ("task_computation", MSG_task_get_compute_duration(task));
123     MSG_task_execute(task);
124     INFO1("\"%s\" done", MSG_task_get_name(task));
125     MSG_task_destroy(task);
126     task = NULL;
127   }
128   INFO0("I'm done. See you!");
129   return 0;
130 }                               /* end_of_slave */
131
132 /** Forwarder function */
133 int forwarder(int argc, char *argv[])
134 {
135   int i;
136   int slaves_count;
137   m_host_t *slaves;
138
139   {                             /* Process organisation */
140     slaves_count = argc - 1;
141     slaves = xbt_new0(m_host_t, slaves_count);
142
143     for (i = 1; i < argc; i++) {
144       slaves[i - 1] = MSG_get_host_by_name(argv[i]);
145       if (slaves[i - 1] == NULL) {
146         INFO1("Unknown host %s. Stopping Now! ", argv[i]);
147         abort();
148       }
149     }
150   }
151
152   i = 0;
153   while (1) {
154     m_task_t task = NULL;
155     int a;
156     a = MSG_task_get(&(task), PORT_22);
157     if (a == MSG_OK) {
158       INFO1("Received \"%s\"", MSG_task_get_name(task));
159       if (MSG_task_get_data(task) == FINALIZE) {
160         INFO0
161           ("All tasks have been dispatched. Let's tell everybody the computation is over.");
162         for (i = 0; i < slaves_count; i++)
163           MSG_task_put(MSG_task_create("finalize", 0, 0, FINALIZE),
164                        slaves[i], PORT_22);
165         MSG_task_destroy(task);
166         break;
167       }
168       INFO2("Sending \"%s\" to \"%s\"",
169             MSG_task_get_name(task), slaves[i % slaves_count]->name);
170       MSG_task_put(task, slaves[i % slaves_count], PORT_22);
171       i++;
172     } else {
173       INFO0("Hey ?! What's up ? ");
174       xbt_assert0(0, "Unexpected behavior");
175     }
176   }
177   xbt_free(slaves);
178
179   INFO0("I'm done. See you!");
180   return 0;
181 }                               /* end_of_forwarder */
182
183 /** Test function */
184 MSG_error_t test_all(const char *platform_file, const char *application_file)
185 {
186   MSG_error_t res = MSG_OK;
187
188   /* MSG_config("workstation/model","KCCFLN05"); */
189   {                             /*  Simulation setting */
190     MSG_set_channel_number(MAX_CHANNEL);
191     MSG_create_environment(platform_file);
192   }
193   {                             /*   Application deployment */
194     MSG_function_register("master", master);
195     MSG_function_register("slave", slave);
196     MSG_function_register("forwarder", forwarder);
197     MSG_launch_application(application_file);
198   }
199   res = MSG_main();
200
201   INFO1("Simulation time %g", MSG_get_clock());
202   return res;
203 }                               /* end_of_test_all */
204
205
206 /** Main function */
207 int main(int argc, char *argv[])
208 {
209   MSG_error_t res = MSG_OK;
210   int is_tracing = 0;
211   int i;
212
213   for (i = 0; i < argc; i++){
214     if (!strcmp (argv[i], "--trace")){
215       is_tracing = 1;
216     }
217   }
218
219   if (is_tracing) {
220     //if TRACE_start is not called, all other tracing
221     //functions will be disabled
222     TRACE_start ("simulation.trace");
223   }
224   TRACE_host_variable_declare ("is_slave");
225   TRACE_host_variable_declare ("is_master");
226   TRACE_host_variable_declare ("task_creation");
227   TRACE_host_variable_declare ("task_computation");
228   TRACE_category ("compute");
229   TRACE_category ("finalize");
230
231   MSG_global_init(&argc, argv);
232   if (argc < 3) {
233     printf("Usage: %s platform_file deployment_file\n", argv[0]);
234     printf("example: %s msg_platform.xml msg_deployment.xml\n", argv[0]);
235     exit(1);
236   }
237   res = test_all(argv[1], argv[2]);
238   MSG_clean();
239
240   TRACE_end ();
241
242   if (res == MSG_OK)
243     return 0;
244   else
245     return 1;
246 }                               /* end_of_main */