1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 /* This example replays a trace as produced by examples/simdag/dax, or by other means.
8 * This is mainly interesting when run on real platforms, to validate the results
9 * given in the simulator when running SimDag.
15 #include "xbt/dynar.h"
16 #include "xbt/synchro.h"
19 #include "amok/peermanagement.h"
22 #include "simix/simix.h"
// Entry points of the two kinds of processes of this example (both defined below).
24 int master(int argc,char *argv[]);
25 int worker(int argc,char *argv[]);
// Declares the default logging category of this file, named replay;
// presumably the one the INFO/CRITICAL macros below log to -- confirm in xbt/log.h.
27 XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example");
// Declare the workload data descriptions and the three message types
// exchanged by this example: "go", "commands" and "chunk".
// NOTE(review): some lines of this function (including its closing brace)
// are not visible in this listing.
29 static void declare_msg() {
31 xbt_workload_declare_datadesc();
// "go" is declared with a NULL payload description
32 gras_msgtype_declare("go",NULL);
// "commands": the payload description is built from the xbt_workload_elm_t
// description; the call wrapping it is on a line not visible here. Sender and
// receiver below both treat the payload as an xbt_dynar_t.
33 gras_msgtype_declare("commands",
35 gras_datadesc_by_name("xbt_workload_elm_t"),xbt_workload_elm_free_voidp));
// "chunk": payload described by xbt_workload_data_chunk_t
36 gras_msgtype_declare("chunk",gras_datadesc_by_name("xbt_workload_data_chunk_t"));
// Master process of the replay example.
// Usage (enforced by the assertion below): replay_master tracefile port
// Visible flow: parse the trace, wait for the peers named in it to join the
// "replay" group, open a client socket to each of them, send everybody the
// whole command list, send "go" to everybody, then wait for one "go" message
// per peer before releasing the trace.
// NOTE(review): several lines of this function are not visible in this listing
// (some declarations, loop bodies, conditions, closing braces); the comments
// only describe the statements that are shown.
39 int master(int argc,char *argv[]) {
41 gras_init(&argc,argv);
45 xbt_assert0(argc==3,"usage: replay_master tracefile port");
46 gras_socket_server(atoi(argv[2])); /* open my master socket, even if I don't use it */
47 xbt_dynar_t peers = amok_pm_group_new("replay"); /* group of slaves */
// Load the trace file given as first argument
49 xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]);
// presumably orders the commands by peer name, then by date -- confirm in workload.h
50 xbt_workload_sort_who_date(cmds);
52 xbt_workload_elm_t cmd;
55 xbt_dict_cursor_t dict_cursor;
// pals_int gets one entry per distinct cmd->who found in the trace,
// i.e. it is the set of peer names the master expects to see
57 xbt_dict_t pals_int=xbt_dict_new();
58 xbt_dynar_foreach(cmds,cursor,cmd) {
59 int *p = xbt_dict_get_or_null(pals_int,cmd->who);
// NOTE(review): the stored value is the address of the loop-local p; in the
// visible code the values of this dict are never dereferenced, only its keys
// and its size are used
62 xbt_dict_set(pals_int,cmd->who,&p,NULL);
66 /* friends, we're ready. Come and play */
67 INFO1("Wait for peers for a while. I need %d peers",xbt_dict_size(pals_int));
// Loop until the group has at least as many members as there are names in
// the trace (the loop body is not visible here)
68 while (xbt_dynar_length(peers)<xbt_dict_size(pals_int)) {
// Remove from pals_int every peer that joined the group ...
72 xbt_dynar_foreach(peers,cursor,peer){
73 xbt_dict_remove(pals_int,peer->name);
// ... so that the remaining keys are the names that never showed up
77 xbt_dict_foreach(pals_int,dict_cursor,peer_name,data) {
78 INFO1("Peer %s didn't showed up",peer_name);
83 INFO1("Got my %ld peers", xbt_dynar_length(peers));
84 xbt_dict_free(&pals_int);
// pals maps each peer name to a client socket connected to that peer
87 xbt_dict_t pals = xbt_dict_new();
89 xbt_dynar_foreach(peers,cursor, peer) {
90 //INFO1("%s is here",peer->name);
91 gras_socket_t sock = gras_socket_client(peer->name,peer->port);
92 xbt_dict_set(pals,peer->name,sock,NULL);
94 /* check that we have a dude for every element of the trace */
95 xbt_dynar_foreach(cmds,cursor,cmd) {
96 pal=xbt_dict_get_or_null(pals,cmd->who);
// Failure path: log, shut the group down and release the trace.
// Its guarding condition is not visible here (presumably pal being NULL).
98 CRITICAL1("Process %s didn't came! Abording!",cmd->who);
99 amok_pm_group_shutdown("replay");
100 xbt_dynar_free(&cmds);
105 /* Send the commands to every pal */
// Every pal receives the complete command list, not only its own commands
107 xbt_dict_foreach(pals,dict_cursor,pal_name,pal) {
108 gras_msg_send(pal,"commands",&cmds);
110 INFO0("Sent commands to every processes. Let them start now");
// Second pass over the pals: the "go" message is sent without payload
111 xbt_dict_cursor_t dict_it;
112 xbt_dict_foreach(pals,dict_it,pal_name,pal) {
113 gras_msg_send(pal,"go",NULL);
115 INFO0("They should be started by now. Wait for their completion.");
// cursor is reused as a plain counter: wait for one "go" message per peer.
// presumably a timeout of -1 means waiting without limit -- confirm in the GRAS documentation
117 for (cursor=0;cursor<xbt_dynar_length(peers);cursor++) {
118 gras_msg_wait(-1,"go",NULL,NULL);
122 //amok_pm_group_shutdown("replay");
123 xbt_dynar_free(&cmds);
// Per-process state of a worker; worker allocates it with gras_userdata_new
// and the helpers below fetch it with gras_userdata_get.
// NOTE(review): the opening of this typedef and at least one member (peers,
// used by get_peer_sock and worker) are on lines not visible in this listing.
// Command list stored by worker_commands_cb when the "commands" message arrives
129 xbt_dynar_t commands;
// Server socket opened (and later closed) by worker
131 gras_socket_t mysock;
132 } s_worker_data_t,*worker_data_t;
// Look up the socket associated with the given peer name in the peers dict of
// the worker userdata. The visible statements also create a client socket to
// that peer on port 4000 (the port workers listen on, see worker) and cache it
// in the dict under the peer name.
// NOTE(review): the condition guarding the creation (presumably a failed
// lookup) and the return statement are on lines not visible in this listing.
135 static gras_socket_t get_peer_sock(char*peer) {
136 worker_data_t g = gras_userdata_get();
137 gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
139 INFO1("Create a socket to %s",peer);
140 peer_sock = gras_socket_client(peer,4000);
// No free function is registered for the cached socket (last argument is NULL)
141 xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp);
// Callback registered by worker for the "commands" message: it stores the
// received payload, read as an xbt_dynar_t, in the commands field of the
// worker userdata. ctx is not used by the visible statements.
// NOTE(review): the return statement is on a line not visible in this listing.
145 static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) {
146 worker_data_t g = gras_userdata_get();
147 g->commands = *(xbt_dynar_t*)payload;
// Execute one communication command; worker passes this function to
// xbt_dynar_dopar for the queued commands.
//   rank: not used by the visible statements
//   c:    points to an xbt_workload_elm_t (a dynar cell)
// SEND builds a chunk sized from d_arg and sends it as a "chunk" message to
// the peer named by str_arg; RECV waits for a "chunk" message and frees it;
// the remaining visible branch dies on an unknown command.
// NOTE(review): several lines (exception handling around the wait, closing
// braces) are not visible in this listing.
150 static void do_command(int rank, void*c) {
152 xbt_workload_elm_t cmd = *(xbt_workload_elm_t*)c;
153 xbt_workload_data_chunk_t chunk;
155 if (cmd->action == XBT_WORKLOAD_SEND) {
156 gras_socket_t sock = get_peer_sock(cmd->str_arg);
// d_arg is truncated to int to size the chunk
157 chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
158 INFO4("Send %.f bytes to %s %s:%d",cmd->d_arg,cmd->str_arg,
159 gras_socket_peer_name(sock),gras_socket_peer_port(sock));
160 gras_msg_send_(sock,gras_msgtype_by_name("chunk"),&chunk);
161 INFO2("Done sending %.f bytes to %s",cmd->d_arg,cmd->str_arg);
163 } else if (cmd->action == XBT_WORKLOAD_RECV) {
164 INFO2("Recv %.f bytes from %s",cmd->d_arg,cmd->str_arg);
// The sender is not filtered (third argument is NULL): any "chunk" message matches.
// The timeout is 1000000, presumably in seconds -- confirm in the GRAS documentation
166 gras_msg_wait(1000000,"chunk",NULL,&chunk);
// Error path, presumably inside a CATCH block on lines not visible here:
// dump the process status, then rethrow.
// NOTE(review): the format has three conversions for two explicit arguments;
// presumably RETHROW2 supplies the original exception message as the last one -- confirm
168 SIMIX_display_process_status();
169 RETHROW2("Exception while waiting for %f bytes from %s: %s",
170 cmd->d_arg,cmd->str_arg);
172 xbt_workload_data_chunk_free(chunk);
173 INFO2("Done receiving %.f bytes from %s",cmd->d_arg,cmd->str_arg);
// Any other action is a fatal error
176 xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(cmd)));
// Worker process of the replay example; argv[1] names the master to connect to.
// Visible flow: open the server socket, register the "commands" callback,
// connect to the master and join the "replay" group, receive the command list,
// wait for "go", then walk the commands addressed to this process: the
// communications are queued and run in parallel, the computations are burnt
// in place. Finally report completion to the master with a "go" message and
// release the resources.
// NOTE(review): several lines of this function are not visible in this listing
// (connection retry loop, exception handling, break statements, closing
// braces); the comments only describe the statements that are shown.
179 int worker(int argc,char *argv[]) {
181 worker_data_t globals;
182 gras_init(&argc,argv);
184 globals = gras_userdata_new(s_worker_data_t);
185 /* Create the connections */
186 globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */
187 gras_socket_t master=NULL;
190 gras_cb_register("commands", worker_commands_cb);
191 globals->peers=xbt_dict_new();
194 INFO2("Sensor %s starting. Connecting to master on %s", gras_os_myname(), argv[1]);
// Connection attempt; the two statements after it belong to a failure
// handling path whose surrounding lines are not visible here
198 master = gras_socket_client_from_string(argv[1]);
202 if (e.category != system_error)
205 INFO0("Failed to connect. Retry in 0.5 second");
209 /* Join and run the group */
210 amok_pm_group_join(master, "replay", -1);
211 gras_msg_handle(60); // command message
212 gras_msg_wait(60,"go",NULL,NULL);
214 worker_data_t g = gras_userdata_get();
216 xbt_workload_elm_t cmd;
217 const char *myname=gras_os_myname();
// Queue of the communications waiting to be run in parallel
218 xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
// The list holds the commands of every process: only handle the ones whose
// who field is the name of this process
220 xbt_dynar_foreach(g->commands,cursor,cmd) {
221 if (!strcmp(cmd->who,myname)) {
222 char *c = xbt_workload_elm_to_string(cmd);
223 // INFO1("TODO: %s",c);
226 switch (cmd->action) {
227 case XBT_WORKLOAD_COMPUTE:
228 /* If any communications were queued, do them in parallel */
229 if (xbt_dynar_length(cmd_to_go)){
231 xbt_dynar_dopar(cmd_to_go,do_command);
232 xbt_dynar_reset(cmd_to_go);
234 SIMIX_display_process_status();
236 INFO0("Communications all done");
237 xbt_dynar_reset(cmd_to_go);
239 INFO1("Compute %.f flops",cmd->d_arg);
240 gras_cpu_burn(cmd->d_arg);
241 INFO1("Done computing %.f flops",cmd->d_arg);
243 case XBT_WORKLOAD_SEND:
244 /* Create the socket from main thread since it seems to fail when done from dopar thread */
245 get_peer_sock(cmd->str_arg);
// No break between these two cases: a SEND is queued exactly like a RECV
246 case XBT_WORKLOAD_RECV:
247 /* queue communications for later realization in parallel */
248 xbt_dynar_push(cmd_to_go,&cmd);
253 /* do in parallel any communication still queued */
254 INFO1("Do %ld pending communications after end of TODO list",xbt_dynar_length(cmd_to_go));
255 if (xbt_dynar_length(cmd_to_go)){
256 xbt_dynar_dopar(cmd_to_go,do_command);
257 xbt_dynar_reset(cmd_to_go);
// Tell the master that this worker is done (the master waits for one "go" per peer)
261 gras_msg_send(master,"go",NULL);
262 // amok_pm_group_leave(master, "replay");
264 gras_socket_close(globals->mysock);
265 xbt_dynar_free(&(globals->commands));
266 xbt_dict_free(&(globals->peers));