Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix test pmm_rl by synchronizing the slaves [Arnaud Giersch]
[simgrid.git] / examples / gras / replay / replay.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /* This example replays a trace as produced by examples/simdag/dax, or other means
8  * This is mainly interesting when run on real platforms, to validate the results
9  * given in the simulator when running SimDag.
10  */
11
12 #include "xbt/ex.h"
13 #include "xbt/log.h"
14 #include "xbt/str.h"
15 #include "xbt/dynar.h"
16 #include "xbt/synchro.h"
17 #include "workload.h"
18 #include "gras.h"
19 #include "amok/peermanagement.h"
20 #include <stdio.h>
21
22 #include "simix/simix.h"
23
24 int master(int argc,char *argv[]);
25 int worker(int argc,char *argv[]);
26
27 XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example");
28
29 static void declare_msg() {
30   amok_pm_init();
31   xbt_workload_declare_datadesc();
32   gras_msgtype_declare("go",NULL);
33   gras_msgtype_declare("commands",
34       gras_datadesc_dynar(
35           gras_datadesc_by_name("xbt_workload_elm_t"),xbt_workload_elm_free_voidp));
36   gras_msgtype_declare("chunk",gras_datadesc_by_name("xbt_workload_data_chunk_t"));
37 }
38
39 int master(int argc,char *argv[]) {
40
41   gras_init(&argc,argv);
42   declare_msg();
43
44
45   xbt_assert0(argc==3,"usage: replay_master tracefile port");
46   gras_socket_server(atoi(argv[2])); /* open my master socket, even if I don't use it */
47   xbt_dynar_t peers = amok_pm_group_new("replay");            /* group of slaves */
48   xbt_peer_t peer;
49   xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]);
50   xbt_workload_sort_who_date(cmds);
51   unsigned int cursor;
52   xbt_workload_elm_t cmd;
53
54   xbt_ex_t e;
55   xbt_dict_cursor_t dict_cursor;
56
57   xbt_dict_t pals_int=xbt_dict_new();
58   xbt_dynar_foreach(cmds,cursor,cmd) {
59     int *p = xbt_dict_get_or_null(pals_int,cmd->who);
60     if (!p) {
61       p=(int*)0xBEAF;
62       xbt_dict_set(pals_int,cmd->who,&p,NULL);
63     }
64   }
65
66   /* friends, we're ready. Come and play */
67   INFO1("Wait for peers for a while. I need %d peers",xbt_dict_size(pals_int));
68   while (xbt_dynar_length(peers)<xbt_dict_size(pals_int)) {
69     TRY {
70       gras_msg_handle(20);
71     } CATCH(e) {
72       xbt_dynar_foreach(peers,cursor,peer){
73         xbt_dict_remove(pals_int,peer->name);
74       }
75       char *peer_name;
76       void *data;
77       xbt_dict_foreach(pals_int,dict_cursor,peer_name,data) {
78         INFO1("Peer %s didn't showed up",peer_name);
79       }
80       RETHROW;
81     }
82   }
83   INFO1("Got my %ld peers", xbt_dynar_length(peers));
84   xbt_dict_free(&pals_int);
85
86   /* Check who came */
87   xbt_dict_t pals = xbt_dict_new();
88   gras_socket_t pal;
89   xbt_dynar_foreach(peers,cursor, peer) {
90     //INFO1("%s is here",peer->name);
91     gras_socket_t sock = gras_socket_client(peer->name,peer->port);
92     xbt_dict_set(pals,peer->name,sock,NULL);
93   }
94   /* check that we have a dude for every element of the trace */
95   xbt_dynar_foreach(cmds,cursor,cmd) {
96     pal=xbt_dict_get_or_null(pals,cmd->who);
97     if (!pal) {
98       CRITICAL1("Process %s didn't came! Abording!",cmd->who);
99       amok_pm_group_shutdown("replay");
100       xbt_dynar_free(&cmds);
101       gras_exit();
102       abort();
103     }
104   }
105   /* Send the commands to every pal */
106   char *pal_name;
107   xbt_dict_foreach(pals,dict_cursor,pal_name,pal) {
108     gras_msg_send(pal,"commands",&cmds);
109   }
110   INFO0("Sent commands to every processes. Let them start now");
111   xbt_dict_cursor_t dict_it;
112   xbt_dict_foreach(pals,dict_it,pal_name,pal) {
113     gras_msg_send(pal,"go",NULL);
114   }
115   INFO0("They should be started by now. Wait for their completion.");
116
117   for (cursor=0;cursor<xbt_dynar_length(peers);cursor++) {
118     gras_msg_wait(-1,"go",NULL,NULL);
119   }
120
121   /* Done, exiting */
122   //amok_pm_group_shutdown("replay");
123   xbt_dynar_free(&cmds);
124   gras_exit();
125   return 0;
126 }
127
128 typedef struct {
129   xbt_dynar_t commands;
130   xbt_dict_t peers;
131   gras_socket_t mysock;
132 } s_worker_data_t,*worker_data_t;
133
134
135 static gras_socket_t get_peer_sock(char*peer) {
136   worker_data_t g = gras_userdata_get();
137   gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
138   if (!peer_sock) {
139     INFO1("Create a socket to %s",peer);
140     peer_sock = gras_socket_client(peer,4000);
141     xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp);
142   }
143   return peer_sock;
144 }
145 static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) {
146   worker_data_t g = gras_userdata_get();
147   g->commands = *(xbt_dynar_t*)payload;
148   return 0;
149 }
150 static void do_command(int rank, void*c) {
151   xbt_ex_t e;
152   xbt_workload_elm_t cmd = *(xbt_workload_elm_t*)c;
153   xbt_workload_data_chunk_t chunk;
154
155   if (cmd->action == XBT_WORKLOAD_SEND) {
156     gras_socket_t sock = get_peer_sock(cmd->str_arg);
157     chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
158     INFO4("Send %.f bytes to %s %s:%d",cmd->d_arg,cmd->str_arg,
159         gras_socket_peer_name(sock),gras_socket_peer_port(sock));
160     gras_msg_send_(sock,gras_msgtype_by_name("chunk"),&chunk);
161     INFO2("Done sending %.f bytes to %s",cmd->d_arg,cmd->str_arg);
162
163   } else if (cmd->action == XBT_WORKLOAD_RECV) {
164     INFO2("Recv %.f bytes from %s",cmd->d_arg,cmd->str_arg);
165     TRY {
166       gras_msg_wait(1000000,"chunk",NULL,&chunk);
167     } CATCH(e) {
168       SIMIX_display_process_status();
169       RETHROW2("Exception while waiting for %f bytes from %s: %s",
170             cmd->d_arg,cmd->str_arg);
171     }
172     xbt_workload_data_chunk_free(chunk);
173     INFO2("Done receiving %.f bytes from %s",cmd->d_arg,cmd->str_arg);
174
175   } else {
176     xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(cmd)));
177   }
178 }
179 int worker(int argc,char *argv[]) {
180   xbt_ex_t e;
181   worker_data_t globals;
182   gras_init(&argc,argv);
183   declare_msg();
184   globals = gras_userdata_new(s_worker_data_t);
185   /* Create the connexions */
186   globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */
187   gras_socket_t master=NULL;
188   int connected=0;
189
190   gras_cb_register("commands", worker_commands_cb);
191   globals->peers=xbt_dict_new();
192
193   if (gras_if_RL())
194     INFO2("Sensor %s starting. Connecting to master on %s", gras_os_myname(), argv[1]);
195   while (!connected) {
196     xbt_ex_t e;
197     TRY {
198       master = gras_socket_client_from_string(argv[1]);
199       connected = 1;
200     }
201     CATCH(e) {
202       if (e.category != system_error)
203         RETHROW;
204       xbt_ex_free(e);
205       INFO0("Failed to connect. Retry in 0.5 second");
206       gras_os_sleep(0.5);
207     }
208   }
209   /* Join and run the group */
210   amok_pm_group_join(master, "replay", -1);
211   gras_msg_handle(60); // command message
212   gras_msg_wait(60,"go",NULL,NULL);
213   {
214     worker_data_t g = gras_userdata_get();
215     unsigned int cursor;
216     xbt_workload_elm_t cmd;
217     const char *myname=gras_os_myname();
218     xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
219
220     xbt_dynar_foreach(g->commands,cursor,cmd) {
221       if (!strcmp(cmd->who,myname)) {
222         char *c = xbt_workload_elm_to_string(cmd);
223   //      INFO1("TODO: %s",c);
224         free(c);
225
226         switch (cmd->action) {
227         case XBT_WORKLOAD_COMPUTE:
228           /* If any communication were queued, do them in parallel */
229           if (xbt_dynar_length(cmd_to_go)){
230             TRY {
231               xbt_dynar_dopar(cmd_to_go,do_command);
232               xbt_dynar_reset(cmd_to_go);
233             } CATCH(e) {
234               SIMIX_display_process_status();
235             }
236             INFO0("Communications all done");
237             xbt_dynar_reset(cmd_to_go);
238           }
239           INFO1("Compute %.f flops",cmd->d_arg);
240           gras_cpu_burn(cmd->d_arg);
241           INFO1("Done computing %.f flops",cmd->d_arg);
242           break;
243         case XBT_WORKLOAD_SEND:
244           /* Create the socket from main thread since it seems to fails when done from dopar thread */
245           get_peer_sock(cmd->str_arg);
246         case XBT_WORKLOAD_RECV:
247           /* queue communications for later realization in parallel */
248           xbt_dynar_push(cmd_to_go,&cmd);
249           break;
250         }
251       }
252     }
253     /* do in parallel any communication still queued */
254     INFO1("Do %ld pending communications after end of TODO list",xbt_dynar_length(cmd_to_go));
255     if (xbt_dynar_length(cmd_to_go)){
256       xbt_dynar_dopar(cmd_to_go,do_command);
257       xbt_dynar_reset(cmd_to_go);
258     }
259   }
260
261   gras_msg_send(master,"go",NULL);
262 //  amok_pm_group_leave(master, "replay");
263
264   gras_socket_close(globals->mysock);
265   xbt_dynar_free(&(globals->commands));
266   xbt_dict_free(&(globals->peers));
267   free(globals);
268
269   gras_exit();
270   return 0;
271 }