Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make sure this variable is always initialized
[simgrid.git] / examples / gras / replay / replay.c
1 /* Copyright (c) 2009 Da SimGrid Team.  All rights reserved.                */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 /* This example replays a trace as produced by examples/simdag/dax, or other means
7  * This is mainly interesting when run on real platforms, to validate the results
8  * given in the simulator when running SimDag.
9  */
10
11 #include "xbt/ex.h"
12 #include "xbt/log.h"
13 #include "xbt/str.h"
14 #include "xbt/dynar.h"
15 #include "workload.h"
16 #include "gras.h"
17 #include "amok/peermanagement.h"
18 #include <stdio.h>
19
20 int master(int argc,char *argv[]);
21 int worker(int argc,char *argv[]);
22
23 XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example");
24
25 static void declare_msg() {
26   amok_pm_init();
27   xbt_workload_declare_datadesc();
28   gras_msgtype_declare("go",NULL);
29   gras_msgtype_declare("commands",
30       gras_datadesc_dynar(
31           gras_datadesc_by_name("xbt_workload_elm_t"),xbt_workload_elm_free_voidp));
32   gras_msgtype_declare("chunk",gras_datadesc_by_name("xbt_workload_data_chunk_t"));
33 }
34
35 int master(int argc,char *argv[]) {
36
37   gras_init(&argc,argv);
38   declare_msg();
39
40
41   xbt_assert0(argc==3,"usage: replay_master tracefile port");
42   gras_socket_server(atoi(argv[2])); /* open my master socket, even if I don't use it */
43   xbt_dynar_t peers = amok_pm_group_new("replay");            /* group of slaves */
44   xbt_peer_t peer;
45   xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]);
46   xbt_workload_sort_who_date(cmds);
47   unsigned int cursor;
48   xbt_workload_elm_t cmd;
49
50   xbt_ex_t e;
51   xbt_dict_cursor_t dict_cursor;
52
53   xbt_dict_t pals_int=xbt_dict_new();
54   xbt_dynar_foreach(cmds,cursor,cmd) {
55     int *p = xbt_dict_get_or_null(pals_int,cmd->who);
56     if (!p) {
57       p=(int*)0xBEAF;
58       xbt_dict_set(pals_int,cmd->who,&p,NULL);
59     }
60   }
61
62   /* friends, we're ready. Come and play */
63   INFO1("Wait for peers for a while. I need %d peers",xbt_dict_size(pals_int));
64   while (xbt_dynar_length(peers)<xbt_dict_size(pals_int)) {
65     TRY {
66       gras_msg_handle(20);
67     } CATCH(e) {
68       xbt_dynar_foreach(peers,cursor,peer){
69         xbt_dict_remove(pals_int,peer->name);
70       }
71       char *peer_name;
72       void *data;
73       xbt_dict_foreach(pals_int,dict_cursor,peer_name,data) {
74         INFO1("Peer %s didn't showed up",peer_name);
75       }
76       RETHROW;
77     }
78   }
79   INFO1("Got my %ld peers", xbt_dynar_length(peers));
80   xbt_dict_free(&pals_int);
81
82   /* Check who came */
83   xbt_dict_t pals = xbt_dict_new();
84   gras_socket_t pal;
85   xbt_dynar_foreach(peers,cursor, peer) {
86     //INFO1("%s is here",peer->name);
87     gras_socket_t sock = gras_socket_client(peer->name,peer->port);
88     xbt_dict_set(pals,peer->name,sock,NULL);
89   }
90   /* check that we have a dude for every element of the trace */
91   xbt_dynar_foreach(cmds,cursor,cmd) {
92     if (0) {
93       char *str = xbt_workload_elm_to_string(cmd);
94       INFO1("%s",str);
95       free(str);
96     }
97
98     pal=xbt_dict_get_or_null(pals,cmd->who);
99     if (!pal) {
100       CRITICAL1("Process %s didn't came! Abording!",cmd->who);
101       amok_pm_group_shutdown("replay");
102       xbt_dynar_free(&cmds);
103       gras_exit();
104       abort();
105     }
106     gras_msg_send(pal,"commands",&cmds);
107   }
108   INFO0("Sent commands to every processes. Let them start now");
109   xbt_dict_cursor_t dict_it;
110   char *pal_name;
111   xbt_dict_foreach(pals,dict_it,pal_name,pal) {
112     gras_msg_send(pal,"go",NULL);
113   }
114   INFO0("They should be started by now.");
115
116   /* Done, exiting */
117   amok_pm_group_shutdown("replay");
118   xbt_dynar_free(&cmds);
119   gras_exit();
120   return 0;
121 }
122
123 typedef struct {
124   xbt_dynar_t commands;
125   xbt_dict_t peers;
126   gras_socket_t mysock;
127 } s_worker_data_t,*worker_data_t;
128
129
130 static gras_socket_t get_peer_sock(char*peer) {
131   worker_data_t g = gras_userdata_get();
132   gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
133   if (!peer_sock) {
134     peer_sock = gras_socket_client(peer,4000);
135     xbt_dict_set(g->peers,peer,peer_sock,gras_socket_close_voidp);
136   }
137   return peer_sock;
138 }
139 static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) {
140   worker_data_t g = gras_userdata_get();
141   g->commands = *(xbt_dynar_t*)payload;
142   return 0;
143 }
144 static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
145   worker_data_t g = gras_userdata_get();
146   unsigned int cursor;
147   xbt_workload_elm_t cmd;
148   xbt_ex_t e;
149   const char *myname=gras_os_myname();
150   xbt_dynar_foreach(g->commands,cursor,cmd) {
151     xbt_workload_data_chunk_t chunk;
152
153     if (!strcmp(cmd->who,myname)) {
154       char *c = xbt_workload_elm_to_string(cmd);
155       INFO1("%s",c);
156       free(c);
157
158       switch (cmd->action) {
159       case XBT_WORKLOAD_COMPUTE:
160         gras_cpu_burn(cmd->d_arg);
161         break;
162       case XBT_WORKLOAD_SEND:
163         chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
164         gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk);
165         break;
166       case XBT_WORKLOAD_RECV:
167         TRY {
168           gras_msg_wait(1000000,"chunk",NULL,&chunk);
169         } CATCH(e) {
170           RETHROW2("Exception while waiting for %f bytes from %s: %s",
171                 cmd->d_arg,cmd->str_arg);
172         }
173         xbt_workload_data_chunk_free(chunk);
174         break;
175       }
176     }
177   }
178   return 0;
179 }
180
181 int worker(int argc,char *argv[]) {
182   worker_data_t globals;
183   gras_init(&argc,argv);
184   declare_msg();
185   globals = gras_userdata_new(s_worker_data_t);
186   /* Create the connexions */
187   globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */
188   gras_socket_t master=NULL;
189   int connected=0;
190
191   gras_cb_register("commands", worker_commands_cb);
192   gras_cb_register("go", worker_go_cb);
193   globals->peers=xbt_dict_new();
194
195   if (gras_if_RL())
196     INFO2("Sensor %s starting. Connecting to master on %s", gras_os_myname(), argv[1]);
197   while (!connected) {
198     xbt_ex_t e;
199     TRY {
200       master = gras_socket_client_from_string(argv[1]);
201       connected = 1;
202     }
203     CATCH(e) {
204       if (e.category != system_error)
205         RETHROW;
206       xbt_ex_free(e);
207       INFO0("Failed to connect. Retry in 0.5 second");
208       gras_os_sleep(0.5);
209     }
210   }
211   /* Join and run the group */
212   amok_pm_group_join(master, "replay", -1);
213   amok_pm_mainloop(600);
214
215   gras_socket_close(globals->mysock);
216   xbt_dynar_free(&(globals->commands));
217   xbt_dict_free(&(globals->peers));
218   free(globals);
219
220   gras_exit();
221   return 0;
222 }