1 /* Copyright (c) 2009 Da SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 /* This example replays a trace as produced by examples/simdag/dax, or by other means.
7 * This is mainly interesting when run on real platforms, to validate the results
8 * given in the simulator when running SimDag. */
14 #include "xbt/dynar.h"
17 #include "amok/peermanagement.h"
// Forward declarations of the two process entry points defined further down.
20 int master(int argc,char *argv[]);
21 int worker(int argc,char *argv[]);
// Declares the logging category `replay` as the default category of this file.
23 XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example");
// Declares the data descriptions and the three message types used by this
// example: "go", "commands" and "chunk".
// NOTE(review): some original lines of this function are not visible in this
// view (part of the "commands" declaration and the closing brace) - confirm
// against the complete file.
25 static void declare_msg() {
27 xbt_workload_declare_datadesc();
// "go" is declared with a NULL payload description.
28 gras_msgtype_declare("go",NULL);
// "commands": the payload description is built from the "xbt_workload_elm_t"
// description together with xbt_workload_elm_free_voidp; the wrapping call that
// takes these two arguments is on a line that is not visible here.
29 gras_msgtype_declare("commands",
31 gras_datadesc_by_name("xbt_workload_elm_t"),xbt_workload_elm_free_voidp));
// "chunk" carries a payload described by "xbt_workload_data_chunk_t".
32 gras_msgtype_declare("chunk",gras_datadesc_by_name("xbt_workload_data_chunk_t"));
// Master process: parses the trace file named by argv[1], waits for the peers
// it mentions to join the "replay" group, sends them the command list and then
// a "go" message.
// NOTE(review): several original lines are not visible in this view (local
// declarations such as cursor, peer and pal, some conditions, loop bodies,
// closing braces and the return) - the comments below describe only the
// statements that are visible.
35 int master(int argc,char *argv[]) {
37 gras_init(&argc,argv);
// Exactly two arguments are expected: the trace file and the port.
41 xbt_assert0(argc==3,"usage: replay_master tracefile port");
42 gras_socket_server(atoi(argv[2])); /* open my master socket, even if I don't use it */
43 xbt_dynar_t peers = amok_pm_group_new("replay"); /* group of slaves */
// Load the commands from the trace file, then sort them
// (presumably by process name, then by date - confirm in xbt_workload).
45 xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]);
46 xbt_workload_sort_who_date(cmds);
48 xbt_workload_elm_t cmd;
51 xbt_dict_cursor_t dict_cursor;
// pals_int gets one key per distinct cmd->who found in the trace.
// NOTE(review): the value stored is the address of the loop-local `p`; the
// visible code only uses the keys and the dict size, never that value.
53 xbt_dict_t pals_int=xbt_dict_new();
54 xbt_dynar_foreach(cmds,cursor,cmd) {
55 int *p = xbt_dict_get_or_null(pals_int,cmd->who);
58 xbt_dict_set(pals_int,cmd->who,&p,NULL);
62 /* friends, we're ready. Come and play */
63 INFO1("Wait for peers for a while. I need %d peers",xbt_dict_size(pals_int));
// Loops while the peers dynar is shorter than the number of names in pals_int.
// NOTE(review): the loop body is not visible here.
64 while (xbt_dynar_length(peers)<xbt_dict_size(pals_int)) {
// Remove every peer present in the group from pals_int; the names that remain
// are the ones logged below as missing.
68 xbt_dynar_foreach(peers,cursor,peer){
69 xbt_dict_remove(pals_int,peer->name);
73 xbt_dict_foreach(pals_int,dict_cursor,peer_name,data) {
74 INFO1("Peer %s didn't showed up",peer_name);
79 INFO1("Got my %ld peers", xbt_dynar_length(peers));
80 xbt_dict_free(&pals_int);
// pals maps each peer name to a client socket opened towards that peer.
83 xbt_dict_t pals = xbt_dict_new();
85 xbt_dynar_foreach(peers,cursor, peer) {
86 //INFO1("%s is here",peer->name);
87 gras_socket_t sock = gras_socket_client(peer->name,peer->port);
88 xbt_dict_set(pals,peer->name,sock,NULL);
90 /* check that we have a dude for every element of the trace */
91 xbt_dynar_foreach(cmds,cursor,cmd) {
// NOTE(review): the use and release of this string are not visible here.
93 char *str = xbt_workload_elm_to_string(cmd);
// Look up the socket stored in pals under the name of the command owner.
98 pal=xbt_dict_get_or_null(pals,cmd->who);
// NOTE(review): the condition guarding the next three lines is not visible;
// presumably they only run when the lookup above found nothing - confirm.
100 CRITICAL1("Process %s didn't came! Abording!",cmd->who);
101 amok_pm_group_shutdown("replay");
102 xbt_dynar_free(&cmds);
// Sends the address of the whole command list as the "commands" payload.
// NOTE(review): this send appears to sit inside the per-command loop, so a
// process owning several commands would receive the list more than once -
// confirm against the complete file.
106 gras_msg_send(pal,"commands",&cmds);
108 INFO0("Sent commands to every processes. Let them start now");
109 xbt_dict_cursor_t dict_it;
// Send "go" (NULL payload) to every socket stored in pals.
111 xbt_dict_foreach(pals,dict_it,pal_name,pal) {
112 gras_msg_send(pal,"go",NULL);
114 INFO0("They should be started by now.");
// Shut the group down and release the command list.
117 amok_pm_group_shutdown("replay");
118 xbt_dynar_free(&cmds);
// Per-worker data, obtained through gras_userdata_get() in the callbacks.
// NOTE(review): the opening line of this type definition and at least one
// member (the `peers` dict used by get_peer_sock and worker) are not visible
// in this view.
// commands: command list received in worker_commands_cb and walked in worker_go_cb.
124 xbt_dynar_t commands;
// mysock: server socket opened and later closed in worker().
126 gras_socket_t mysock;
127 } s_worker_data_t,*worker_data_t;
// Looks up the socket stored for `peer` in the worker's peers dict; the visible
// lines then open a client socket to `peer` on port 4000 (the port on which
// worker() opens its server socket) and store it in the dict, with
// gras_socket_close_voidp registered as the value's free function.
// NOTE(review): the condition guarding the socket creation and the return
// statement are not visible here; presumably a new socket is only opened when
// the lookup found nothing, and peer_sock is returned - confirm.
130 static gras_socket_t get_peer_sock(char*peer) {
131 worker_data_t g = gras_userdata_get();
132 gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
134 peer_sock = gras_socket_client(peer,4000);
135 xbt_dict_set(g->peers,peer,peer_sock,gras_socket_close_voidp);
// Callback registered for the "commands" message in worker(): reads the payload
// as a pointer to an xbt_dynar_t and stores that dynar in the worker data.
// NOTE(review): the return statement and closing brace are not visible here.
139 static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) {
140 worker_data_t g = gras_userdata_get();
141 g->commands = *(xbt_dynar_t*)payload;
// Callback registered for the "go" message in worker(): walks the stored
// command list and executes the commands whose `who` equals the name of this
// process.
// NOTE(review): some original lines are not visible in this view (the cursor
// declaration, break statements, the exception handling around the receive,
// closing braces and the return) - confirm against the complete file.
144 static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
145 worker_data_t g = gras_userdata_get();
147 xbt_workload_elm_t cmd;
149 const char *myname=gras_os_myname();
150 xbt_dynar_foreach(g->commands,cursor,cmd) {
151 xbt_workload_data_chunk_t chunk;
// Only commands whose owner name matches this process are handled.
153 if (!strcmp(cmd->who,myname)) {
// NOTE(review): the use and release of this string are not visible here.
154 char *c = xbt_workload_elm_to_string(cmd);
158 switch (cmd->action) {
159 case XBT_WORKLOAD_COMPUTE:
// d_arg is handed to gras_cpu_burn (presumably an amount of computation - confirm).
160 gras_cpu_burn(cmd->d_arg);
162 case XBT_WORKLOAD_SEND:
// Build a chunk from d_arg truncated to int, then send it as a "chunk" message
// on the socket obtained for the peer named by str_arg.
163 chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
164 gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk);
166 case XBT_WORKLOAD_RECV:
// Wait for a "chunk" message (the first argument, 1000000, is presumably the
// timeout - confirm) and receive it into `chunk`.
168 gras_msg_wait(1000000,"chunk",NULL,&chunk);
// NOTE(review): the format has three conversions but only two arguments are
// given; presumably RETHROW2 supplies the original exception message for the
// last one - confirm.
170 RETHROW2("Exception while waiting for %f bytes from %s: %s",
171 cmd->d_arg,cmd->str_arg);
173 xbt_workload_data_chunk_free(chunk);
// Worker process: allocates its user data, opens its server socket, registers
// the "commands" and "go" callbacks, connects to the master named by argv[1],
// joins the "replay" group and runs the group main loop; afterwards it closes
// its socket and frees the command list and the peers dict.
// NOTE(review): several original lines are not visible in this view (the
// exception variable, the retry loop and its TRY/CATCH framing, closing braces
// and the return) - confirm against the complete file.
181 int worker(int argc,char *argv[]) {
182 worker_data_t globals;
183 gras_init(&argc,argv);
185 globals = gras_userdata_new(s_worker_data_t);
186 /* Create the connections */
187 globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */
188 gras_socket_t master=NULL;
// Handlers for the two control messages sent by the master.
191 gras_cb_register("commands", worker_commands_cb);
192 gras_cb_register("go", worker_go_cb);
// Dict filled by get_peer_sock with the sockets opened towards other peers.
193 globals->peers=xbt_dict_new();
196 INFO2("Sensor %s starting. Connecting to master on %s", gras_os_myname(), argv[1]);
// argv[1] is the master address string handed to gras_socket_client_from_string.
200 master = gras_socket_client_from_string(argv[1]);
// NOTE(review): the statement controlled by this test is not visible; the log
// message below suggests the connection attempt is retried after a failure.
204 if (e.category != system_error)
207 INFO0("Failed to connect. Retry in 0.5 second");
211 /* Join and run the group */
212 amok_pm_group_join(master, "replay", -1);
// 600 is passed to amok_pm_mainloop (presumably a timeout - confirm).
213 amok_pm_mainloop(600);
// Release what this function and the callbacks stored in the worker data.
215 gras_socket_close(globals->mysock);
216 xbt_dynar_free(&(globals->commands));
217 xbt_dict_free(&(globals->peers));