1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 /* This example replays a trace as produced by examples/simdag/dax, or by other means.
8 * This is mainly interesting when run on real platforms, to validate the results
9 * given by the simulator when running SimDag.
15 #include "xbt/dynar.h"
16 #include "xbt/synchro.h"
19 #include "amok/peermanagement.h"
22 #include "simix/simix.h"
/* Entry points of the two kinds of processes; both are defined further down
 * in this file. */
24 int master(int argc, char *argv[]);
25 int worker(int argc, char *argv[]);
/* Declares `replay` as the default log category of this file. */
27 XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example");
/* Declare the data descriptions and the three message types used in this
 * file: "go", "commands" and "chunk". */
29 static void declare_msg()
32 xbt_workload_declare_datadesc();
/* "go" is declared with NULL as its payload description */
33 gras_msgtype_declare("go", NULL);
/* "commands" is described as a dynar of "xbt_workload_elm_t";
 * xbt_workload_elm_free_voidp is handed to xbt_datadesc_dynar as second
 * argument (presumably the per-element free function -- TODO confirm) */
34 gras_msgtype_declare("commands",
35 xbt_datadesc_dynar(xbt_datadesc_by_name
36 ("xbt_workload_elm_t"),
37 xbt_workload_elm_free_voidp));
/* "chunk" is described by the "xbt_workload_data_chunk_t" description */
38 gras_msgtype_declare("chunk",
39 xbt_datadesc_by_name("xbt_workload_data_chunk_t"));
/* Master process. Usage, per the assertion below: replay_master tracefile port.
 *
 * Flow of the statements below: parse the trace named by argv[1], wait until
 * the "replay" group holds as many peers as there are distinct `who` names in
 * the trace, open a client socket to each peer, send every peer the whole
 * command list followed by a "go" message, then wait for one "go" message
 * per peer. */
42 int master(int argc, char *argv[])
45 gras_init(&argc, argv);
49 xbt_assert(argc == 3, "usage: replay_master tracefile port");
/* NOTE(review): atoi() cannot report a malformed port argument */
50 gras_socket_server(atoi(argv[2])); /* open my master socket, even if I don't use it */
51 xbt_dynar_t peers = amok_pm_group_new("replay"); /* group of slaves */
/* load the trace from the file named on the command line */
53 xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]);
54 xbt_workload_sort_who_date(cmds);
56 xbt_workload_elm_t cmd;
57 xbt_dict_cursor_t dict_cursor;
/* pals_int is keyed by cmd->who; below, only its length and its keys are
 * used, so it acts as the set of process names found in the trace */
59 xbt_dict_t pals_int = xbt_dict_new_homogeneous(NULL);
60 xbt_dynar_foreach(cmds, cursor, cmd) {
61 int *p = xbt_dict_get_or_null(pals_int, cmd->who);
/* NOTE(review): the value stored is the address of the local `p`; it is
 * never dereferenced in the code of this function */
64 xbt_dict_set(pals_int, cmd->who, &p, NULL);
68 /* friends, we're ready. Come and play */
69 XBT_INFO("Wait for peers for a while. I need %d peers",
70 xbt_dict_length(pals_int));
/* loops while fewer peers have joined the group than names are expected */
71 while (xbt_dynar_length(peers) < xbt_dict_length(pals_int)) {
/* remove from the expected set every peer that did join...
 * NOTE(review): confirm that xbt_dict_remove tolerates a peer whose name
 * does not appear in the trace */
76 xbt_dynar_foreach(peers, cursor, peer) {
77 xbt_dict_remove(pals_int, peer->name);
/* ...so that any name still in the set is reported as missing */
81 xbt_dict_foreach(pals_int, dict_cursor, peer_name, data) {
82 XBT_INFO("Peer %s didn't showed up", peer_name);
/* NOTE(review): confirm that the return type of xbt_dynar_length() matches
 * the %ld conversion */
87 XBT_INFO("Got my %ld peers", xbt_dynar_length(peers));
88 xbt_dict_free(&pals_int);
/* pals maps each peer name to a client socket opened towards that peer */
91 xbt_dict_t pals = xbt_dict_new_homogeneous(NULL);
93 xbt_dynar_foreach(peers, cursor, peer) {
94 //XBT_INFO("%s is here",peer->name);
95 xbt_socket_t sock = gras_socket_client(peer->name, peer->port);
96 xbt_dict_set(pals, peer->name, sock, NULL);
98 /* check that we have a dude for every element of the trace */
99 xbt_dynar_foreach(cmds, cursor, cmd) {
100 pal = xbt_dict_get_or_null(pals, cmd->who);
/* failure path (per the message, a trace entry names a process that has no
 * socket): shut the group down and release the trace */
102 XBT_CRITICAL("Process %s didn't came! Abording!", cmd->who);
103 amok_pm_group_shutdown("replay");
104 xbt_dynar_free(&cmds);
109 /* Send the commands to every pal */
111 xbt_dict_foreach(pals, dict_cursor, pal_name, pal) {
/* the payload is the address of the `cmds` handle: every pal gets the whole
 * trace, not only its own entries */
112 gras_msg_send(pal, "commands", &cmds);
114 XBT_INFO("Sent commands to every processes. Let them start now");
115 xbt_dict_cursor_t dict_it;
116 xbt_dict_foreach(pals, dict_it, pal_name, pal) {
117 gras_msg_send(pal, "go", NULL);
119 XBT_INFO("They should be started by now. Wait for their completion.");
/* worker() sends a "go" to the master as one of its last statements, so wait
 * for one such message per peer; -1 is passed as first argument (presumably
 * "no timeout" -- TODO confirm) */
121 for (cursor = 0; cursor < xbt_dynar_length(peers); cursor++) {
122 gras_msg_wait(-1, "go", NULL, NULL);
126 //amok_pm_group_shutdown("replay");
127 xbt_dynar_free(&cmds);
/* command list received from the master; assigned in worker_commands_cb and
 * released at the end of worker() */
133 xbt_dynar_t commands;
/* s_worker_data_t is the per-worker state created with gras_userdata_new()
 * in worker() and retrieved with gras_userdata_get(); worker_data_t is a
 * pointer to it */
136 } s_worker_data_t, *worker_data_t;
/* Look up the socket towards `peer` in the worker's `peers` dictionary and,
 * for a peer that has none yet, create a client socket and record it there.
 *
 * peer: name of the peer, used both as dictionary key and as host name.
 * The port is hard-coded to 4000, the same value worker() passes to
 * gras_socket_server(). */
139 static xbt_socket_t get_peer_sock(char *peer)
141 worker_data_t g = gras_userdata_get();
142 xbt_socket_t peer_sock = xbt_dict_get_or_null(g->peers, peer);
144 XBT_INFO("Create a socket to %s", peer);
145 peer_sock = gras_socket_client(peer, 4000);
/* stored with NULL as last argument; the commented-out name suggests a
 * closing function was once passed here */
146 xbt_dict_set(g->peers, peer, peer_sock, NULL); //gras_socket_close_voidp);
/* Callback registered for the "commands" message in worker(): copies the
 * dynar handle found in the payload into the worker's user data.
 *
 * ctx:     callback context, not used by the statements below.
 * payload: read as a pointer to an xbt_dynar_t. */
151 static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload)
153 worker_data_t g = gras_userdata_get();
154 g->commands = *(xbt_dynar_t *) payload;
/* Perform one queued communication. worker() passes this function to
 * xbt_dynar_dopar() together with the dynar of pending commands.
 *
 * rank: not used by the statements below.
 * c:    read as a pointer to an xbt_workload_elm_t. */
158 static void do_command(int rank, void *c)
160 xbt_workload_elm_t cmd = *(xbt_workload_elm_t *) c;
161 xbt_workload_data_chunk_t chunk;
163 if (cmd->action == XBT_WORKLOAD_SEND) {
/* send: str_arg names the destination, d_arg (converted to int) is handed
 * to xbt_workload_data_chunk_new() to build the chunk */
164 xbt_socket_t sock = get_peer_sock(cmd->str_arg);
165 chunk = xbt_workload_data_chunk_new((int) (cmd->d_arg));
166 XBT_INFO("Send %.f bytes to %s %s:%d", cmd->d_arg, cmd->str_arg,
167 xbt_socket_peer_name(sock), xbt_socket_peer_port(sock));
168 gras_msg_send_(sock, gras_msgtype_by_name("chunk"), &chunk);
/* NOTE(review): unlike the receive branch, no release of `chunk` appears on
 * this branch -- confirm who owns it after gras_msg_send_() */
169 XBT_INFO("Done sending %.f bytes to %s", cmd->d_arg, cmd->str_arg);
171 } else if (cmd->action == XBT_WORKLOAD_RECV) {
172 XBT_INFO("Recv %.f bytes from %s", cmd->d_arg, cmd->str_arg);
/* wait for a "chunk" message; 1000000 is passed as first argument
 * (presumably a timeout -- TODO confirm) */
174 gras_msg_wait(1000000, "chunk", NULL, &chunk);
/* error path: dump the process status, then rethrow with some context.
 * NOTE(review): the format holds three conversions for two listed
 * arguments; presumably RETHROWF supplies the original exception message
 * for the last %s -- confirm */
177 SIMIX_display_process_status();
178 RETHROWF("Exception while waiting for %f bytes from %s: %s",
179 cmd->d_arg, cmd->str_arg);
/* the received chunk is only needed as a token: release it right away */
181 xbt_workload_data_chunk_free(chunk);
182 XBT_INFO("Done receiving %.f bytes from %s", cmd->d_arg, cmd->str_arg);
/* any action other than send or receive is fatal here */
185 xbt_die("unknown command: %s", xbt_workload_elm_to_string(cmd));
/* Worker process. argv[1] is the master's address, in the form accepted by
 * gras_socket_client_from_string().
 *
 * Flow of the statements below: open the local server socket, connect to the
 * master, join the "replay" group, receive the command list and the "go"
 * message, replay the entries of the trace whose `who` is this process
 * (communications are queued and run in parallel, computations are run
 * inline), send "go" back to the master and release the resources. */
189 int worker(int argc, char *argv[])
192 worker_data_t globals;
193 gras_init(&argc, argv);
195 globals = gras_userdata_new(s_worker_data_t);
196 /* Create the connections */
197 globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */
198 xbt_socket_t master = NULL;
/* worker_commands_cb stores the received command list in the user data */
201 gras_cb_register("commands", worker_commands_cb);
/* cache of sockets towards the other workers, filled by get_peer_sock() */
202 globals->peers = xbt_dict_new_homogeneous(NULL);
205 XBT_INFO("Sensor %s starting. Connecting to master on %s",
206 gras_os_myname(), argv[1]);
/* connection attempt; per the log message below it is retried on failure,
 * and the category test singles out errors other than system_error */
210 master = gras_socket_client_from_string(argv[1]);
214 if (e.category != system_error)
217 XBT_INFO("Failed to connect. Retry in 0.5 second");
221 /* Join and run the group */
222 amok_pm_group_join(master, "replay", -1);
/* handle one incoming message (expected to be "commands"), then wait for
 * the master's "go"; both calls receive 60 as first argument */
223 gras_msg_handle(60); // command message
224 gras_msg_wait(60, "go", NULL, NULL);
226 worker_data_t g = gras_userdata_get();
228 xbt_workload_elm_t cmd;
229 const char *myname = gras_os_myname();
/* cmd_to_go holds the communications queued since the last computation */
230 xbt_dynar_t cmd_to_go =
231 xbt_dynar_new(sizeof(xbt_workload_elm_t), NULL);
/* the whole trace is walked, but only the entries whose `who` equals the
 * name of this process are acted upon */
233 xbt_dynar_foreach(g->commands, cursor, cmd) {
234 if (!strcmp(cmd->who, myname)) {
/* NOTE(review): `c` is only used by the commented-out log below; confirm
 * that it is released */
235 char *c = xbt_workload_elm_to_string(cmd);
236 // XBT_INFO("TODO: %s",c);
239 switch (cmd->action) {
240 case XBT_WORKLOAD_COMPUTE:
241 /* If any communications were queued, do them in parallel */
242 if (!xbt_dynar_is_empty(cmd_to_go)) {
244 xbt_dynar_dopar(cmd_to_go, do_command);
245 xbt_dynar_reset(cmd_to_go);
248 SIMIX_display_process_status();
251 XBT_INFO("Communications all done");
252 xbt_dynar_reset(cmd_to_go);
/* the computation itself: d_arg is the amount handed to gras_cpu_burn() */
254 XBT_INFO("Compute %.f flops", cmd->d_arg);
255 gras_cpu_burn(cmd->d_arg);
256 XBT_INFO("Done computing %.f flops", cmd->d_arg);
258 case XBT_WORKLOAD_SEND:
259 /* Create the socket from main thread since it seems to fail when done from dopar thread */
260 get_peer_sock(cmd->str_arg);
/* fallthrough: a send is queued exactly like a receive */
261 case XBT_WORKLOAD_RECV:
262 /* queue communications for later realization in parallel */
263 xbt_dynar_push(cmd_to_go, &cmd);
268 /* do in parallel any communication still queued */
269 XBT_INFO("Do %ld pending communications after end of TODO list",
270 xbt_dynar_length(cmd_to_go));
271 if (!xbt_dynar_is_empty(cmd_to_go)) {
272 xbt_dynar_dopar(cmd_to_go, do_command);
273 xbt_dynar_reset(cmd_to_go);
/* tell the master that this worker is done: master() waits for one "go"
 * message per peer */
277 gras_msg_send(master, "go", NULL);
278 // amok_pm_group_leave(master, "replay");
/* release the server socket, the received command list and the socket cache */
280 gras_socket_close(globals->mysock);
281 xbt_dynar_free(&(globals->commands));
282 xbt_dict_free(&(globals->peers));