Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4f790901022ebe8aff67e4906e4d10e4707f61ce
[simgrid.git] / examples / gras / replay / replay.c
1 /* Copyright (c) 2009 Da SimGrid Team.  All rights reserved.                */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 /* This example replays a trace as produced by examples/simdag/dax, or other means
7  * This is mainly interesting when run on real platforms, to validate the results
8  * given in the simulator when running SimDag.
9  */
10
11 #include "xbt/ex.h"
12 #include "xbt/log.h"
13 #include "xbt/str.h"
14 #include "xbt/dynar.h"
15 #include "xbt/synchro.h"
16 #include "workload.h"
17 #include "gras.h"
18 #include "amok/peermanagement.h"
19 #include <stdio.h>
20
21 #include "simix/simix.h"
22
23 int master(int argc,char *argv[]);
24 int worker(int argc,char *argv[]);
25
26 XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example");
27
28 static void declare_msg() {
29   amok_pm_init();
30   xbt_workload_declare_datadesc();
31   gras_msgtype_declare("go",NULL);
32   gras_msgtype_declare("commands",
33       gras_datadesc_dynar(
34           gras_datadesc_by_name("xbt_workload_elm_t"),xbt_workload_elm_free_voidp));
35   gras_msgtype_declare("chunk",gras_datadesc_by_name("xbt_workload_data_chunk_t"));
36 }
37
38 int master(int argc,char *argv[]) {
39
40   gras_init(&argc,argv);
41   declare_msg();
42
43
44   xbt_assert0(argc==3,"usage: replay_master tracefile port");
45   gras_socket_server(atoi(argv[2])); /* open my master socket, even if I don't use it */
46   xbt_dynar_t peers = amok_pm_group_new("replay");            /* group of slaves */
47   xbt_peer_t peer;
48   xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]);
49   xbt_workload_sort_who_date(cmds);
50   unsigned int cursor;
51   xbt_workload_elm_t cmd;
52
53   xbt_ex_t e;
54   xbt_dict_cursor_t dict_cursor;
55
56   xbt_dict_t pals_int=xbt_dict_new();
57   xbt_dynar_foreach(cmds,cursor,cmd) {
58     int *p = xbt_dict_get_or_null(pals_int,cmd->who);
59     if (!p) {
60       p=(int*)0xBEAF;
61       xbt_dict_set(pals_int,cmd->who,&p,NULL);
62     }
63   }
64
65   /* friends, we're ready. Come and play */
66   INFO1("Wait for peers for a while. I need %d peers",xbt_dict_size(pals_int));
67   while (xbt_dynar_length(peers)<xbt_dict_size(pals_int)) {
68     TRY {
69       gras_msg_handle(20);
70     } CATCH(e) {
71       xbt_dynar_foreach(peers,cursor,peer){
72         xbt_dict_remove(pals_int,peer->name);
73       }
74       char *peer_name;
75       void *data;
76       xbt_dict_foreach(pals_int,dict_cursor,peer_name,data) {
77         INFO1("Peer %s didn't showed up",peer_name);
78       }
79       RETHROW;
80     }
81   }
82   INFO1("Got my %ld peers", xbt_dynar_length(peers));
83   xbt_dict_free(&pals_int);
84
85   /* Check who came */
86   xbt_dict_t pals = xbt_dict_new();
87   gras_socket_t pal;
88   xbt_dynar_foreach(peers,cursor, peer) {
89     //INFO1("%s is here",peer->name);
90     gras_socket_t sock = gras_socket_client(peer->name,peer->port);
91     xbt_dict_set(pals,peer->name,sock,NULL);
92   }
93   /* check that we have a dude for every element of the trace */
94   xbt_dynar_foreach(cmds,cursor,cmd) {
95     pal=xbt_dict_get_or_null(pals,cmd->who);
96     if (!pal) {
97       CRITICAL1("Process %s didn't came! Abording!",cmd->who);
98       amok_pm_group_shutdown("replay");
99       xbt_dynar_free(&cmds);
100       gras_exit();
101       abort();
102     }
103   }
104   /* Send the commands to every pal */
105   char *pal_name;
106   xbt_dict_foreach(pals,dict_cursor,pal_name,pal) {
107     gras_msg_send(pal,"commands",&cmds);
108   }
109   INFO0("Sent commands to every processes. Let them start now");
110   xbt_dict_cursor_t dict_it;
111   xbt_dict_foreach(pals,dict_it,pal_name,pal) {
112     gras_msg_send(pal,"go",NULL);
113   }
114   INFO0("They should be started by now. Wait for their completion.");
115
116   for (cursor=0;cursor<xbt_dynar_length(peers);cursor++) {
117     gras_msg_wait(-1,"go",NULL,NULL);
118   }
119
120   /* Done, exiting */
121   //amok_pm_group_shutdown("replay");
122   xbt_dynar_free(&cmds);
123   gras_exit();
124   return 0;
125 }
126
127 typedef struct {
128   xbt_dynar_t commands;
129   xbt_dict_t peers;
130   gras_socket_t mysock;
131 } s_worker_data_t,*worker_data_t;
132
133
134 static gras_socket_t get_peer_sock(char*peer) {
135   worker_data_t g = gras_userdata_get();
136   gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
137   if (!peer_sock) {
138     INFO1("Create a socket to %s",peer);
139     peer_sock = gras_socket_client(peer,4000);
140     xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp);
141   }
142   return peer_sock;
143 }
144 static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) {
145   worker_data_t g = gras_userdata_get();
146   g->commands = *(xbt_dynar_t*)payload;
147   return 0;
148 }
149 static void do_command(int rank, void*c) {
150   xbt_ex_t e;
151   xbt_workload_elm_t cmd = *(xbt_workload_elm_t*)c;
152   xbt_workload_data_chunk_t chunk;
153
154   if (cmd->action == XBT_WORKLOAD_SEND) {
155     gras_socket_t sock = get_peer_sock(cmd->str_arg);
156     chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
157     INFO4("Send %.f bytes to %s %s:%d",cmd->d_arg,cmd->str_arg,
158         gras_socket_peer_name(sock),gras_socket_peer_port(sock));
159     gras_msg_send_(sock,gras_msgtype_by_name("chunk"),&chunk);
160     INFO2("Done sending %.f bytes to %s",cmd->d_arg,cmd->str_arg);
161
162   } else if (cmd->action == XBT_WORKLOAD_RECV) {
163     INFO2("Recv %.f bytes from %s",cmd->d_arg,cmd->str_arg);
164     TRY {
165       gras_msg_wait(1000000,"chunk",NULL,&chunk);
166     } CATCH(e) {
167       SIMIX_display_process_status();
168       RETHROW2("Exception while waiting for %f bytes from %s: %s",
169             cmd->d_arg,cmd->str_arg);
170     }
171     xbt_workload_data_chunk_free(chunk);
172     INFO2("Done receiving %.f bytes from %s",cmd->d_arg,cmd->str_arg);
173
174   } else {
175     xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(cmd)));
176   }
177 }
178 int worker(int argc,char *argv[]) {
179   xbt_ex_t e;
180   worker_data_t globals;
181   gras_init(&argc,argv);
182   declare_msg();
183   globals = gras_userdata_new(s_worker_data_t);
184   /* Create the connexions */
185   globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */
186   gras_socket_t master=NULL;
187   int connected=0;
188
189   gras_cb_register("commands", worker_commands_cb);
190   globals->peers=xbt_dict_new();
191
192   if (gras_if_RL())
193     INFO2("Sensor %s starting. Connecting to master on %s", gras_os_myname(), argv[1]);
194   while (!connected) {
195     xbt_ex_t e;
196     TRY {
197       master = gras_socket_client_from_string(argv[1]);
198       connected = 1;
199     }
200     CATCH(e) {
201       if (e.category != system_error)
202         RETHROW;
203       xbt_ex_free(e);
204       INFO0("Failed to connect. Retry in 0.5 second");
205       gras_os_sleep(0.5);
206     }
207   }
208   /* Join and run the group */
209   amok_pm_group_join(master, "replay", -1);
210   gras_msg_handle(60); // command message
211   gras_msg_wait(60,"go",NULL,NULL);
212   {
213     worker_data_t g = gras_userdata_get();
214     unsigned int cursor;
215     xbt_workload_elm_t cmd;
216     const char *myname=gras_os_myname();
217     xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
218
219     xbt_dynar_foreach(g->commands,cursor,cmd) {
220       if (!strcmp(cmd->who,myname)) {
221         char *c = xbt_workload_elm_to_string(cmd);
222   //      INFO1("TODO: %s",c);
223         free(c);
224
225         switch (cmd->action) {
226         case XBT_WORKLOAD_COMPUTE:
227           /* If any communication were queued, do them in parallel */
228           if (xbt_dynar_length(cmd_to_go)){
229             TRY {
230               xbt_dynar_dopar(cmd_to_go,do_command);
231               xbt_dynar_reset(cmd_to_go);
232             } CATCH(e) {
233               SIMIX_display_process_status();
234             }
235             INFO0("Communications all done");
236             xbt_dynar_reset(cmd_to_go);
237           }
238           INFO1("Compute %.f flops",cmd->d_arg);
239           gras_cpu_burn(cmd->d_arg);
240           INFO1("Done computing %.f flops",cmd->d_arg);
241           break;
242         case XBT_WORKLOAD_SEND:
243           /* Create the socket from main thread since it seems to fails when done from dopar thread */
244           get_peer_sock(cmd->str_arg);
245         case XBT_WORKLOAD_RECV:
246           /* queue communications for later realization in parallel */
247           xbt_dynar_push(cmd_to_go,&cmd);
248           break;
249         }
250       }
251     }
252     /* do in parallel any communication still queued */
253     INFO1("Do %ld pending communications after end of TODO list",xbt_dynar_length(cmd_to_go));
254     if (xbt_dynar_length(cmd_to_go)){
255       xbt_dynar_dopar(cmd_to_go,do_command);
256       xbt_dynar_reset(cmd_to_go);
257     }
258   }
259
260   gras_msg_send(master,"go",NULL);
261 //  amok_pm_group_leave(master, "replay");
262
263   gras_socket_close(globals->mysock);
264   xbt_dynar_free(&(globals->commands));
265   xbt_dict_free(&(globals->peers));
266   free(globals);
267
268   gras_exit();
269   return 0;
270 }