Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e47c1baac48d8a8cff5270d443d98de7e2283cdb
[simgrid.git] / examples / gras / replay / replay.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /* This example replays a trace as produced by examples/simdag/dax, or other means
8  * This is mainly interesting when run on real platforms, to validate the results
9  * given in the simulator when running SimDag.
10  */
11
12 #include "xbt/ex.h"
13 #include "xbt/log.h"
14 #include "xbt/str.h"
15 #include "xbt/dynar.h"
16 #include "xbt/synchro.h"
17 #include "workload.h"
18 #include "gras.h"
19 #include "amok/peermanagement.h"
20 #include <stdio.h>
21
22 #include "simix/simix.h"
23
24 int master(int argc, char *argv[]);
25 int worker(int argc, char *argv[]);
26
27 XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example");
28
29 static void declare_msg()
30 {
31   amok_pm_init();
32   xbt_workload_declare_datadesc();
33   gras_msgtype_declare("go", NULL);
34   gras_msgtype_declare("commands",
35                        gras_datadesc_dynar(gras_datadesc_by_name
36                                            ("xbt_workload_elm_t"),
37                                            xbt_workload_elm_free_voidp));
38   gras_msgtype_declare("chunk",
39                        gras_datadesc_by_name("xbt_workload_data_chunk_t"));
40 }
41
42 int master(int argc, char *argv[])
43 {
44
45   gras_init(&argc, argv);
46   declare_msg();
47
48
49   xbt_assert0(argc == 3, "usage: replay_master tracefile port");
50   gras_socket_server(atoi(argv[2]));    /* open my master socket, even if I don't use it */
51   xbt_dynar_t peers = amok_pm_group_new("replay");      /* group of slaves */
52   xbt_peer_t peer;
53   xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]);
54   xbt_workload_sort_who_date(cmds);
55   unsigned int cursor;
56   xbt_workload_elm_t cmd;
57
58   xbt_ex_t e;
59   xbt_dict_cursor_t dict_cursor;
60
61   xbt_dict_t pals_int = xbt_dict_new();
62   xbt_dynar_foreach(cmds, cursor, cmd) {
63     int *p = xbt_dict_get_or_null(pals_int, cmd->who);
64     if (!p) {
65       p = (int *) 0xBEAF;
66       xbt_dict_set(pals_int, cmd->who, &p, NULL);
67     }
68   }
69
70   /* friends, we're ready. Come and play */
71   XBT_INFO("Wait for peers for a while. I need %d peers",
72         xbt_dict_size(pals_int));
73   while (xbt_dynar_length(peers) < xbt_dict_size(pals_int)) {
74     TRY {
75       gras_msg_handle(20);
76     }
77     CATCH(e) {
78       xbt_dynar_foreach(peers, cursor, peer) {
79         xbt_dict_remove(pals_int, peer->name);
80       }
81       char *peer_name;
82       void *data;
83       xbt_dict_foreach(pals_int, dict_cursor, peer_name, data) {
84         XBT_INFO("Peer %s didn't showed up", peer_name);
85       }
86       RETHROW;
87     }
88   }
89   XBT_INFO("Got my %ld peers", xbt_dynar_length(peers));
90   xbt_dict_free(&pals_int);
91
92   /* Check who came */
93   xbt_dict_t pals = xbt_dict_new();
94   gras_socket_t pal;
95   xbt_dynar_foreach(peers, cursor, peer) {
96     //XBT_INFO("%s is here",peer->name);
97     gras_socket_t sock = gras_socket_client(peer->name, peer->port);
98     xbt_dict_set(pals, peer->name, sock, NULL);
99   }
100   /* check that we have a dude for every element of the trace */
101   xbt_dynar_foreach(cmds, cursor, cmd) {
102     pal = xbt_dict_get_or_null(pals, cmd->who);
103     if (!pal) {
104       XBT_CRITICAL("Process %s didn't came! Abording!", cmd->who);
105       amok_pm_group_shutdown("replay");
106       xbt_dynar_free(&cmds);
107       gras_exit();
108       abort();
109     }
110   }
111   /* Send the commands to every pal */
112   char *pal_name;
113   xbt_dict_foreach(pals, dict_cursor, pal_name, pal) {
114     gras_msg_send(pal, "commands", &cmds);
115   }
116   XBT_INFO("Sent commands to every processes. Let them start now");
117   xbt_dict_cursor_t dict_it;
118   xbt_dict_foreach(pals, dict_it, pal_name, pal) {
119     gras_msg_send(pal, "go", NULL);
120   }
121   XBT_INFO("They should be started by now. Wait for their completion.");
122
123   for (cursor = 0; cursor < xbt_dynar_length(peers); cursor++) {
124     gras_msg_wait(-1, "go", NULL, NULL);
125   }
126
127   /* Done, exiting */
128   //amok_pm_group_shutdown("replay");
129   xbt_dynar_free(&cmds);
130   gras_exit();
131   return 0;
132 }
133
134 typedef struct {
135   xbt_dynar_t commands;
136   xbt_dict_t peers;
137   gras_socket_t mysock;
138 } s_worker_data_t, *worker_data_t;
139
140
141 static gras_socket_t get_peer_sock(char *peer)
142 {
143   worker_data_t g = gras_userdata_get();
144   gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers, peer);
145   if (!peer_sock) {
146     XBT_INFO("Create a socket to %s", peer);
147     peer_sock = gras_socket_client(peer, 4000);
148     xbt_dict_set(g->peers, peer, peer_sock, NULL);      //gras_socket_close_voidp);
149   }
150   return peer_sock;
151 }
152
153 static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload)
154 {
155   worker_data_t g = gras_userdata_get();
156   g->commands = *(xbt_dynar_t *) payload;
157   return 0;
158 }
159
160 static void do_command(int rank, void *c)
161 {
162   xbt_ex_t e;
163   xbt_workload_elm_t cmd = *(xbt_workload_elm_t *) c;
164   xbt_workload_data_chunk_t chunk;
165
166   if (cmd->action == XBT_WORKLOAD_SEND) {
167     gras_socket_t sock = get_peer_sock(cmd->str_arg);
168     chunk = xbt_workload_data_chunk_new((int) (cmd->d_arg));
169     XBT_INFO("Send %.f bytes to %s %s:%d", cmd->d_arg, cmd->str_arg,
170           gras_socket_peer_name(sock), gras_socket_peer_port(sock));
171     gras_msg_send_(sock, gras_msgtype_by_name("chunk"), &chunk);
172     XBT_INFO("Done sending %.f bytes to %s", cmd->d_arg, cmd->str_arg);
173
174   } else if (cmd->action == XBT_WORKLOAD_RECV) {
175     XBT_INFO("Recv %.f bytes from %s", cmd->d_arg, cmd->str_arg);
176     TRY {
177       gras_msg_wait(1000000, "chunk", NULL, &chunk);
178     }
179     CATCH(e) {
180       SIMIX_display_process_status();
181       RETHROW2("Exception while waiting for %f bytes from %s: %s",
182                cmd->d_arg, cmd->str_arg);
183     }
184     xbt_workload_data_chunk_free(chunk);
185     XBT_INFO("Done receiving %.f bytes from %s", cmd->d_arg, cmd->str_arg);
186
187   } else {
188     xbt_die(bprintf
189             ("unknown command: %s", xbt_workload_elm_to_string(cmd)));
190   }
191 }
192
193 int worker(int argc, char *argv[])
194 {
195   xbt_ex_t e;
196   worker_data_t globals;
197   gras_init(&argc, argv);
198   declare_msg();
199   globals = gras_userdata_new(s_worker_data_t);
200   /* Create the connexions */
201   globals->mysock = gras_socket_server(4000);   /* FIXME: shouldn't be hardcoded */
202   gras_socket_t master = NULL;
203   int connected = 0;
204
205   gras_cb_register("commands", worker_commands_cb);
206   globals->peers = xbt_dict_new();
207
208   if (gras_if_RL())
209     XBT_INFO("Sensor %s starting. Connecting to master on %s",
210           gras_os_myname(), argv[1]);
211   while (!connected) {
212     xbt_ex_t e;
213     TRY {
214       master = gras_socket_client_from_string(argv[1]);
215       connected = 1;
216     }
217     CATCH(e) {
218       if (e.category != system_error)
219         RETHROW;
220       xbt_ex_free(e);
221       XBT_INFO("Failed to connect. Retry in 0.5 second");
222       gras_os_sleep(0.5);
223     }
224   }
225   /* Join and run the group */
226   amok_pm_group_join(master, "replay", -1);
227   gras_msg_handle(60);          // command message
228   gras_msg_wait(60, "go", NULL, NULL);
229   {
230     worker_data_t g = gras_userdata_get();
231     unsigned int cursor;
232     xbt_workload_elm_t cmd;
233     const char *myname = gras_os_myname();
234     xbt_dynar_t cmd_to_go =
235         xbt_dynar_new(sizeof(xbt_workload_elm_t), NULL);
236
237     xbt_dynar_foreach(g->commands, cursor, cmd) {
238       if (!strcmp(cmd->who, myname)) {
239         char *c = xbt_workload_elm_to_string(cmd);
240         //      XBT_INFO("TODO: %s",c);
241         free(c);
242
243         switch (cmd->action) {
244         case XBT_WORKLOAD_COMPUTE:
245           /* If any communication were queued, do them in parallel */
246           if (xbt_dynar_length(cmd_to_go)) {
247             TRY {
248               xbt_dynar_dopar(cmd_to_go, do_command);
249               xbt_dynar_reset(cmd_to_go);
250             }
251             CATCH(e) {
252               SIMIX_display_process_status();
253             }
254             XBT_INFO("Communications all done");
255             xbt_dynar_reset(cmd_to_go);
256           }
257           XBT_INFO("Compute %.f flops", cmd->d_arg);
258           gras_cpu_burn(cmd->d_arg);
259           XBT_INFO("Done computing %.f flops", cmd->d_arg);
260           break;
261         case XBT_WORKLOAD_SEND:
262           /* Create the socket from main thread since it seems to fails when done from dopar thread */
263           get_peer_sock(cmd->str_arg);
264         case XBT_WORKLOAD_RECV:
265           /* queue communications for later realization in parallel */
266           xbt_dynar_push(cmd_to_go, &cmd);
267           break;
268         }
269       }
270     }
271     /* do in parallel any communication still queued */
272     XBT_INFO("Do %ld pending communications after end of TODO list",
273           xbt_dynar_length(cmd_to_go));
274     if (xbt_dynar_length(cmd_to_go)) {
275       xbt_dynar_dopar(cmd_to_go, do_command);
276       xbt_dynar_reset(cmd_to_go);
277     }
278   }
279
280   gras_msg_send(master, "go", NULL);
281 //  amok_pm_group_leave(master, "replay");
282
283   gras_socket_close(globals->mysock);
284   xbt_dynar_free(&(globals->commands));
285   xbt_dict_free(&(globals->peers));
286   free(globals);
287
288   gras_exit();
289   return 0;
290 }