Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e3a44dbb242156d5ded427d0c31fb47ce61e927e
[simgrid.git] / examples / msg / masterslave / masterslave_forwarder.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
10 #include "xbt/sysdep.h" /* calloc, printf */
11
12 /* Create a log channel to have nice outputs. */
13 #include "xbt/log.h"
14 #include "xbt/asserts.h"
15 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,"Messages specific for this msg example");
16
17 int master(int argc, char *argv[]);
18 int slave(int argc, char *argv[]);
19 int forwarder(int argc, char *argv[]);
20 MSG_error_t test_all(const char *platform_file, const char *application_file);
21
22 typedef enum {
23   PORT_22 = 0,
24   MAX_CHANNEL
25 } channel_t;
26
27 #define FINALIZE ((void*)221297) /* a magic number to tell people to stop working */
28
29 /** Emitter function  */
30 int master(int argc, char *argv[])
31 {
32   int slaves_count = 0;
33   m_host_t *slaves = NULL;
34   m_task_t *todo = NULL;
35   int number_of_tasks = 0;
36   double task_comp_size = 0;
37   double task_comm_size = 0;
38
39
40   int i;
41
42   xbt_assert1(sscanf(argv[1],"%d", &number_of_tasks),
43          "Invalid argument %s\n",argv[1]);
44   xbt_assert1(sscanf(argv[2],"%lg", &task_comp_size),
45          "Invalid argument %s\n",argv[2]);
46   xbt_assert1(sscanf(argv[3],"%lg", &task_comm_size),
47          "Invalid argument %s\n",argv[3]);
48
49   {                  /*  Task creation */
50     char sprintf_buffer[64];
51
52     todo = calloc(number_of_tasks, sizeof(m_task_t));
53
54     for (i = 0; i < number_of_tasks; i++) {
55       sprintf(sprintf_buffer, "Task_%d", i);
56       todo[i] = MSG_task_create(sprintf_buffer, task_comp_size, task_comm_size, NULL);
57     }
58   }
59
60   {                  /* Process organisation */
61     slaves_count = argc - 4;
62     slaves = calloc(slaves_count, sizeof(m_host_t));
63     
64     for (i = 4; i < argc; i++) {
65       slaves[i-4] = MSG_get_host_by_name(argv[i]);
66       xbt_assert1(slaves[i-4]!=NULL, "Unknown host %s. Stopping Now! ", argv[i]);
67     }
68   }
69
70   INFO2("Got %d slaves and %d tasks to process", slaves_count,number_of_tasks);
71   for (i = 0; i < slaves_count; i++)
72     DEBUG1("%s", slaves[i]->name);
73
74   for (i = 0; i < number_of_tasks; i++) {
75     INFO2("Sending \"%s\" to \"%s\"",
76                   todo[i]->name,
77                   slaves[i % slaves_count]->name);
78     if(MSG_host_self()==slaves[i % slaves_count]) {
79       INFO0("Hey ! It's me ! :)");
80     }
81
82     MSG_task_put(todo[i], slaves[i % slaves_count],
83                  PORT_22);
84     INFO0("Sent");
85   }
86   
87   INFO0("All tasks have been dispatched. Let's tell everybody the computation is over.");
88   for (i = 0; i < slaves_count; i++) 
89     MSG_task_put(MSG_task_create("finalize", 0, 0, FINALIZE),
90                  slaves[i], PORT_22);
91   
92   INFO0("Goodbye now!");
93   free(slaves);
94   free(todo);
95   return 0;
96 } /* end_of_master */
97
98 /** Receiver function  */
99 int slave(int argc, char *argv[])
100 {
101   m_task_t task = NULL;
102   int res;
103   while(1) {
104     res = MSG_task_get(&(task), PORT_22);
105     xbt_assert0(res == MSG_OK, "MSG_task_get failed");
106
107     INFO1("Received \"%s\"", MSG_task_get_name(task));
108     if (!strcmp(MSG_task_get_name(task),"finalize")) {
109         MSG_task_destroy(task);
110         break;
111     }
112      
113     INFO1("Processing \"%s\"", MSG_task_get_name(task));
114     MSG_task_execute(task);
115     INFO1("\"%s\" done", MSG_task_get_name(task));
116     MSG_task_destroy(task);
117     task = NULL;
118   }
119   INFO0("I'm done. See you!");
120   return 0;
121 } /* end_of_slave */
122
123 /** Forwarder function */
124 int forwarder(int argc, char *argv[])
125 {
126   int i;
127   int slaves_count;
128   m_host_t *slaves;
129
130   {                  /* Process organisation */
131     slaves_count = argc - 1;
132     slaves = calloc(slaves_count, sizeof(m_host_t));
133     
134     for (i = 1; i < argc; i++) {
135       slaves[i-1] = MSG_get_host_by_name(argv[i]);
136       if(slaves[i-1]==NULL) {
137         INFO1("Unknown host %s. Stopping Now! ", argv[i]);
138         abort();
139       }
140     }
141   }
142
143   i=0;
144   while(1) {
145     m_task_t task = NULL;
146     int a;
147     a = MSG_task_get(&(task), PORT_22);
148     if (a == MSG_OK) {
149       INFO1("Received \"%s\"", MSG_task_get_name(task));
150       if(MSG_task_get_data(task)==FINALIZE) {
151         INFO0("All tasks have been dispatched. Let's tell everybody the computation is over.");
152         for (i = 0; i < slaves_count; i++) 
153           MSG_task_put(MSG_task_create("finalize", 0, 0, FINALIZE),
154                        slaves[i], PORT_22);
155         MSG_task_destroy(task);
156         break;
157       }
158       INFO2("Sending \"%s\" to \"%s\"",
159                     MSG_task_get_name(task),
160                     slaves[i% slaves_count]->name);
161       MSG_task_put(task, slaves[i % slaves_count],
162                    PORT_22);
163       i++;
164     } else {
165       INFO0("Hey ?! What's up ? ");
166       xbt_assert0(0,"Unexpected behavior");
167     }
168   }
169
170   INFO0("I'm done. See you!");
171   return 0;
172 } /* end_of_forwarder */
173
174 /** Test function */
175 MSG_error_t test_all(const char *platform_file,
176                             const char *application_file)
177 {
178   MSG_error_t res = MSG_OK;
179
180   /* MSG_config("surf_workstation_model","KCCFLN05"); */
181   {                             /*  Simulation setting */
182     MSG_set_channel_number(MAX_CHANNEL);
183     MSG_paje_output("msg_test.trace");
184     MSG_create_environment(platform_file);
185   }
186   {                            /*   Application deployment */
187     MSG_function_register("master", master);
188     MSG_function_register("slave", slave);
189     MSG_function_register("forwarder", forwarder);
190     MSG_launch_application(application_file);
191   }
192   res = MSG_main();
193   
194   INFO1("Simulation time %g",MSG_get_clock());
195   return res;
196 } /* end_of_test_all */
197
198
199 /** Main function */
200 int main(int argc, char *argv[])
201 {
202   MSG_error_t res = MSG_OK;
203
204   MSG_global_init(&argc,argv);
205   if (argc < 3) {
206      printf ("Usage: %s platform_file deployment_file\n",argv[0]);
207      printf ("example: %s msg_platform.xml msg_deployment.xml\n",argv[0]);
208      exit(1);
209   }
210   res = test_all(argv[1],argv[2]);
211   MSG_clean();
212
213   if(res==MSG_OK)
214     return 0;
215   else
216     return 1;
217 } /* end_of_main */