Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Adapt masterslave_bypass to the brand new AS tag.
[simgrid.git] / examples / msg / masterslave / masterslave_forwarder.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15                              "Messages specific for this msg example");
16
17 int master(int argc, char *argv[]);
18 int slave(int argc, char *argv[]);
19 int forwarder(int argc, char *argv[]);
20 MSG_error_t test_all(const char *platform_file, const char *application_file);
21
22 typedef enum {
23   PORT_22 = 0,
24   MAX_CHANNEL
25 } channel_t;
26
27 #define FINALIZE ((void*)221297)        /* a magic number to tell people to stop working */
28
29 /** Emitter function  */
30 int master(int argc, char *argv[])
31 {
32   int slaves_count = 0;
33   m_host_t *slaves = NULL;
34   m_task_t *todo = NULL;
35   int number_of_tasks = 0;
36   double task_comp_size = 0;
37   double task_comm_size = 0;
38
39   int i;
40
41   xbt_assert1(sscanf(argv[1], "%d", &number_of_tasks),
42               "Invalid argument %s\n", argv[1]);
43   xbt_assert1(sscanf(argv[2], "%lg", &task_comp_size),
44               "Invalid argument %s\n", argv[2]);
45   xbt_assert1(sscanf(argv[3], "%lg", &task_comm_size),
46               "Invalid argument %s\n", argv[3]);
47
48   {                             /*  Task creation */
49     char sprintf_buffer[64];
50
51     todo = xbt_new0(m_task_t, number_of_tasks);
52
53     for (i = 0; i < number_of_tasks; i++) {
54       sprintf(sprintf_buffer, "Task_%d", i);
55       todo[i] =
56         MSG_task_create(sprintf_buffer, task_comp_size, task_comm_size, NULL);
57     }
58   }
59
60   {                             /* Process organisation */
61     slaves_count = argc - 4;
62     slaves = xbt_new0(m_host_t, slaves_count);
63
64     for (i = 4; i < argc; i++) {
65       slaves[i - 4] = MSG_get_host_by_name(argv[i]);
66       xbt_assert1(slaves[i - 4] != NULL, "Unknown host %s. Stopping Now! ",
67                   argv[i]);
68     }
69   }
70
71   INFO2("Got %d slaves and %d tasks to process", slaves_count,
72         number_of_tasks);
73   for (i = 0; i < slaves_count; i++)
74     DEBUG1("%s", slaves[i]->name);
75
76   for (i = 0; i < number_of_tasks; i++) {
77     INFO2("Sending \"%s\" to \"%s\"",
78           todo[i]->name, slaves[i % slaves_count]->name);
79     if (MSG_host_self() == slaves[i % slaves_count]) {
80       INFO0("Hey ! It's me ! :)");
81     }
82
83     MSG_task_put(todo[i], slaves[i % slaves_count], PORT_22);
84     INFO0("Sent");
85   }
86
87   INFO0
88     ("All tasks have been dispatched. Let's tell everybody the computation is over.");
89   for (i = 0; i < slaves_count; i++){
90     m_task_t finalize=MSG_task_create("finalize", 0, 0, FINALIZE);
91     MSG_task_put(finalize, slaves[i], PORT_22);
92   }
93
94   INFO0("Goodbye now!");
95   free(slaves);
96   free(todo);
97   return 0;
98 }                               /* end_of_master */
99
100 /** Receiver function  */
101 int slave(int argc, char *argv[])
102 {
103   m_task_t task = NULL;
104   int res;
105   while (1) {
106     res = MSG_task_get(&(task), PORT_22);
107     xbt_assert0(res == MSG_OK, "MSG_task_get failed");
108
109     INFO1("Received \"%s\"", MSG_task_get_name(task));
110     if (!strcmp(MSG_task_get_name(task), "finalize")) {
111       MSG_task_destroy(task);
112       break;
113     }
114
115     INFO1("Processing \"%s\"", MSG_task_get_name(task));
116     MSG_task_execute(task);
117     INFO1("\"%s\" done", MSG_task_get_name(task));
118     MSG_task_destroy(task);
119     task = NULL;
120   }
121   INFO0("I'm done. See you!");
122   return 0;
123 }                               /* end_of_slave */
124
125 /** Forwarder function */
126 int forwarder(int argc, char *argv[])
127 {
128   int i;
129   int slaves_count;
130   m_host_t *slaves;
131
132   {                             /* Process organisation */
133     slaves_count = argc - 1;
134     slaves = xbt_new0(m_host_t, slaves_count);
135
136     for (i = 1; i < argc; i++) {
137       slaves[i - 1] = MSG_get_host_by_name(argv[i]);
138       if (slaves[i - 1] == NULL) {
139         INFO1("Unknown host %s. Stopping Now! ", argv[i]);
140         abort();
141       }
142     }
143   }
144
145   i = 0;
146   while (1) {
147     m_task_t task = NULL;
148     int a;
149     a = MSG_task_get(&(task), PORT_22);
150     if (a == MSG_OK) {
151       INFO1("Received \"%s\"", MSG_task_get_name(task));
152       if (MSG_task_get_data(task) == FINALIZE) {
153         INFO0
154           ("All tasks have been dispatched. Let's tell everybody the computation is over.");
155         for (i = 0; i < slaves_count; i++)
156           MSG_task_put(MSG_task_create("finalize", 0, 0, FINALIZE),
157                        slaves[i], PORT_22);
158         MSG_task_destroy(task);
159         break;
160       }
161       INFO2("Sending \"%s\" to \"%s\"",
162             MSG_task_get_name(task), slaves[i % slaves_count]->name);
163       MSG_task_put(task, slaves[i % slaves_count], PORT_22);
164       i++;
165     } else {
166       INFO0("Hey ?! What's up ? ");
167       xbt_assert0(0, "Unexpected behavior");
168     }
169   }
170   xbt_free(slaves);
171
172   INFO0("I'm done. See you!");
173   return 0;
174 }                               /* end_of_forwarder */
175
176 /** Test function */
177 MSG_error_t test_all(const char *platform_file, const char *application_file)
178 {
179   MSG_error_t res = MSG_OK;
180
181   /* MSG_config("workstation/model","KCCFLN05"); */
182   {                             /*  Simulation setting */
183     MSG_set_channel_number(MAX_CHANNEL);
184     MSG_create_environment(platform_file);
185   }
186   {                             /*   Application deployment */
187     MSG_function_register("master", master);
188     MSG_function_register("slave", slave);
189     MSG_function_register("forwarder", forwarder);
190     MSG_launch_application(application_file);
191   }
192   res = MSG_main();
193
194   INFO1("Simulation time %g", MSG_get_clock());
195   return res;
196 }                               /* end_of_test_all */
197
198
199 /** Main function */
200 int main(int argc, char *argv[])
201 {
202   MSG_error_t res = MSG_OK;
203
204   MSG_global_init(&argc, argv);
205   if (argc < 3) {
206     printf("Usage: %s platform_file deployment_file\n", argv[0]);
207     printf("example: %s msg_platform.xml msg_deployment.xml\n", argv[0]);
208     exit(1);
209   }
210   res = test_all(argv[1], argv[2]);
211   MSG_clean();
212
213   if (res == MSG_OK)
214     return 0;
215   else
216     return 1;
217 }                               /* end_of_main */