Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
807d775d6d6156cdcab074401481b62e6e93e11b
[simgrid.git] / examples / msg / alias / masterslave_forwarder_with_alias.c
1 /* Copyright (c) 2008, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15                              "Messages specific for this msg example");
16
17
18 typedef enum {
19   PORT_22 = 0,
20   MAX_CHANNEL
21 } channel_t;
22
23 int master(int argc, char *argv[]);
24 int slave(int argc, char *argv[]);
25 int forwarder(int argc, char *argv[]);
26 MSG_error_t test_all(const char *platform_file,
27                      const char *application_file);
28
29
30 #define FINALIZE ((void*)221297)        /* a magic number to tell people to stop working */
31
32 /** Emitter function  */
33 int master(int argc, char *argv[])
34 {
35   int alias_count = 0;
36   char **aliases = NULL;
37   m_task_t *todo = NULL;
38   int number_of_tasks = 0;
39   double task_comp_size = 0;
40   double task_comm_size = 0;
41
42
43   int i;
44
45   xbt_assert1(sscanf(argv[1], "%d", &number_of_tasks),
46               "Invalid argument %s\n", argv[1]);
47   xbt_assert1(sscanf(argv[2], "%lg", &task_comp_size),
48               "Invalid argument %s\n", argv[2]);
49   xbt_assert1(sscanf(argv[3], "%lg", &task_comm_size),
50               "Invalid argument %s\n", argv[3]);
51
52   {
53     /*  Task creation */
54     char sprintf_buffer[64];
55
56     todo = xbt_new0(m_task_t, number_of_tasks);
57
58     for (i = 0; i < number_of_tasks; i++) {
59       sprintf(sprintf_buffer, "Task_%d", i);
60       todo[i] =
61           MSG_task_create(sprintf_buffer, task_comp_size, task_comm_size,
62                           NULL);
63     }
64   }
65
66   {
67     /* Process organisation */
68     alias_count = argc - 4;
69     aliases = xbt_new0(char *, alias_count);
70
71     for (i = 4; i < argc; i++) {
72       aliases[i - 4] = strdup(argv[i]);
73     }
74   }
75
76   XBT_INFO("Got %d aliases and %d tasks to process", alias_count,
77         number_of_tasks);
78
79   for (i = 0; i < alias_count; i++)
80     XBT_DEBUG("%s", aliases[i]);
81
82   for (i = 0; i < number_of_tasks; i++) {
83     XBT_INFO("Sending \"%s\" to \"%s\"", todo[i]->name,
84           aliases[i % alias_count]);
85
86     if (!strcmp
87         (MSG_host_get_name(MSG_host_self()), aliases[i % alias_count])) {
88       XBT_INFO("Hey ! It's me ! :)");
89     }
90
91     MSG_task_send(todo[i], aliases[i % alias_count]);
92     XBT_INFO("Sent");
93   }
94
95   XBT_INFO
96       ("All tasks have been dispatched. Let's tell everybody the computation is over.");
97
98   for (i = 0; i < alias_count; i++)
99     MSG_task_send(MSG_task_create("finalize", 0, 0, FINALIZE), aliases[i]);
100
101   XBT_INFO("Goodbye now!");
102
103   for (i = 0; i < alias_count; i++)
104     free(aliases[i]);
105
106   free(aliases);
107   free(todo);
108   return 0;
109 }                               /* end_of_master */
110
111 /** Receiver function  */
112 int slave(int argc, char *argv[])
113 {
114   m_task_t task = NULL;
115   int res;
116
117   while (1) {
118     res = MSG_task_receive(&(task), MSG_host_get_name(MSG_host_self()));
119     xbt_assert0(res == MSG_OK, "MSG_task_receive failed");
120
121     XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
122
123     if (!strcmp(MSG_task_get_name(task), "finalize")) {
124       MSG_task_destroy(task);
125       break;
126     }
127
128     XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
129     MSG_task_execute(task);
130     XBT_INFO("\"%s\" done", MSG_task_get_name(task));
131     MSG_task_destroy(task);
132     task = NULL;
133   }
134
135   XBT_INFO("I'm done. See you!");
136   return 0;
137 }                               /* end_of_slave */
138
139 /** Forwarder function */
140 int forwarder(int argc, char *argv[])
141 {
142   int i;
143   int alias_count;
144   char **aliases;
145
146   {                             /* Process organisation */
147     alias_count = argc - 1;
148     aliases = xbt_new0(char *, alias_count);
149
150     for (i = 1; i < argc; i++)
151       aliases[i - 1] = strdup(argv[i]);
152   }
153
154   i = 0;
155
156   while (1) {
157     m_task_t task = NULL;
158     int a;
159
160     a = MSG_task_receive(&(task), MSG_host_get_name(MSG_host_self()));
161
162     if (a == MSG_OK) {
163       XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
164
165       if (MSG_task_get_data(task) == FINALIZE) {
166         XBT_INFO
167             ("All tasks have been dispatched. Let's tell everybody the computation is over.");
168
169         for (i = 0; i < alias_count; i++)
170           MSG_task_send(MSG_task_create("finalize", 0, 0, FINALIZE),
171                         aliases[i]);
172
173         MSG_task_destroy(task);
174         break;
175       }
176
177       XBT_INFO("Sending \"%s\" to \"%s\"", MSG_task_get_name(task),
178             aliases[i % alias_count]);
179       MSG_task_send(task, aliases[i % alias_count]);
180       i++;
181     } else {
182       XBT_INFO("Hey ?! What's up ? ");
183       xbt_assert0(0, "Unexpected behavior");
184     }
185   }
186
187   for (i = 0; i < alias_count; i++)
188     free(aliases[i]);
189
190   XBT_INFO("I'm done. See you!");
191   return 0;
192
193 }                               /* end_of_forwarder */
194
195 /** Test function */
196 MSG_error_t test_all(const char *platform_file,
197                      const char *application_file)
198 {
199   MSG_error_t res = MSG_OK;
200
201   {                             /*  Simulation setting */
202     MSG_set_channel_number(MAX_CHANNEL);
203     MSG_paje_output("msg_test.trace");
204     MSG_create_environment(platform_file);
205   }
206
207   {
208     /*   Application deployment */
209     MSG_function_register("master", master);
210     MSG_function_register("slave", slave);
211     MSG_function_register("forwarder", forwarder);
212     MSG_launch_application(application_file);
213   }
214
215   res = MSG_main();
216
217   XBT_INFO("Simulation time %g", MSG_get_clock());
218   return res;
219 }                               /* end_of_test_all */
220
221
222 /** Main function */
223 int main(int argc, char *argv[])
224 {
225   MSG_error_t res = MSG_OK;
226
227   MSG_global_init(&argc, argv);
228
229   if (argc < 3) {
230     printf("Usage: %s platform_file deployment_file\n", argv[0]);
231     printf("example: %s msg_platform.xml msg_deployment.xml\n", argv[0]);
232     exit(1);
233   }
234
235   res = test_all(argv[1], argv[2]);
236   MSG_clean();
237
238   if (res == MSG_OK)
239     return 0;
240   else
241     return 1;
242 }                               /* end_of_main */