From: mquinson Date: Tue, 13 Oct 2009 13:53:16 +0000 (+0000) Subject: GRAS replayer partially works. Need a real platform to test it now X-Git-Tag: SVN~957 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/d6839b9c710b3b0e54743f4a700131f6105c02c8 GRAS replayer partially works. Need a real platform to test it now git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6757 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/examples/gras/replay/replay.c b/examples/gras/replay/replay.c index 327c8187bb..a7ff823c31 100644 --- a/examples/gras/replay/replay.c +++ b/examples/gras/replay/replay.c @@ -23,44 +23,199 @@ int worker(int argc,char *argv[]); XBT_LOG_NEW_DEFAULT_CATEGORY(replay, "Messages specific to this example"); static void declare_msg() { - gras_datadesc_type_t command_assignment_type; - - + amok_pm_init(); + xbt_workload_declare_datadesc(); + gras_msgtype_declare("go",NULL); + gras_msgtype_declare("commands", + gras_datadesc_dynar( + gras_datadesc_by_name("xbt_workload_elm_t"),xbt_workload_elm_free_voidp)); + gras_msgtype_declare("chunk",gras_datadesc_by_name("xbt_workload_data_chunk_t")); } int master(int argc,char *argv[]) { + gras_init(&argc,argv); - amok_pm_init(); + declare_msg(); + xbt_assert0(argc==3,"usage: replay_master tracefile port"); gras_socket_server(atoi(argv[2])); /* open my master socket, even if I don't use it */ xbt_dynar_t peers = amok_pm_group_new("replay"); /* group of slaves */ + xbt_peer_t peer; xbt_dynar_t cmds = xbt_workload_parse_file(argv[1]); xbt_workload_sort_who_date(cmds); unsigned int cursor; xbt_workload_elm_t cmd; + xbt_ex_t e; + xbt_dict_cursor_t dict_cursor; + + xbt_dict_t pals_int=xbt_dict_new(); xbt_dynar_foreach(cmds,cursor,cmd) { - char *str = xbt_workload_elm_to_string(cmd); - INFO1("%s",str); - free(str); + int *p = xbt_dict_get_or_null(pals_int,cmd->who); + if (!p) { + p=(int*)0xBEAF; + xbt_dict_set(pals_int,cmd->who,&p,NULL); + } } /* friends, we're ready. Come and play */ - INFO0("Wait for peers for 5 sec"); - gras_msg_handleall(5); - INFO1("Got %ld pals", xbt_dynar_length(peers)); - + INFO1("Wait for peers for a while. I need %d peers",xbt_dict_size(pals_int)); + while (xbt_dynar_length(peers)name); + } + char *peer_name; + void *data; + xbt_dict_foreach(pals_int,dict_cursor,peer_name,data) { + INFO1("Peer %s didn't showed up",peer_name); + } + RETHROW; + } + } + INFO1("Got my %ld peers", xbt_dynar_length(peers)); + xbt_dict_free(&pals_int); + + /* Check who came */ + xbt_dict_t pals = xbt_dict_new(); + gras_socket_t pal; + xbt_dynar_foreach(peers,cursor, peer) { + //INFO1("%s is here",peer->name); + gras_socket_t sock = gras_socket_client(peer->name,peer->port); + xbt_dict_set(pals,peer->name,sock,NULL); + } + /* check that we have a dude for every element of the trace */ + xbt_dynar_foreach(cmds,cursor,cmd) { + if (0) { + char *str = xbt_workload_elm_to_string(cmd); + INFO1("%s",str); + free(str); + } + + pal=xbt_dict_get_or_null(pals,cmd->who); + if (!pal) { + CRITICAL1("Process %s didn't came! Abording!",cmd->who); + amok_pm_group_shutdown("replay"); + xbt_dynar_free(&cmds); + gras_exit(); + abort(); + } + gras_msg_send(pal,"commands",&cmds); + } + INFO0("Sent commands to every processes. Let them start now"); + xbt_dict_cursor_t dict_it; + char *pal_name; + xbt_dict_foreach(pals,dict_it,pal_name,pal) { + gras_msg_send(pal,"go",NULL); + } + INFO0("They should be started by now."); /* Done, exiting */ + amok_pm_group_shutdown("replay"); xbt_dynar_free(&cmds); gras_exit(); return 0; } +typedef struct { + xbt_dynar_t commands; + xbt_dict_t peers; + gras_socket_t mysock; +} s_worker_data_t,*worker_data_t; + + +static gras_socket_t get_peer_sock(char*peer) { + worker_data_t g = gras_userdata_get(); + gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer); + if (!peer_sock) { + peer_sock = gras_socket_client(peer,4000); + xbt_dict_set(g->peers,peer,peer_sock,gras_socket_close_voidp); + } + return peer_sock; +} +static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) { + worker_data_t g = gras_userdata_get(); + g->commands = *(xbt_dynar_t*)payload; + return 0; +} +static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) { + worker_data_t g = gras_userdata_get(); + unsigned int cursor; + xbt_workload_elm_t cmd; + xbt_ex_t e; + const char *myname=gras_os_myname(); + xbt_dynar_foreach(g->commands,cursor,cmd) { + xbt_workload_data_chunk_t chunk; + + if (!strcmp(cmd->who,myname)) { + char *c = xbt_workload_elm_to_string(cmd); + INFO1("%s",c); + free(c); + + switch (cmd->action) { + case XBT_WORKLOAD_COMPUTE: + gras_cpu_burn(cmd->d_arg); + break; + case XBT_WORKLOAD_SEND: + chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg)); + gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk); + break; + case XBT_WORKLOAD_RECV: + TRY { + gras_msg_wait(1000000,"chunk",NULL,&chunk); + } CATCH(e) { + RETHROW2("Exception while waiting for %f bytes from %s: %s", + cmd->d_arg,cmd->str_arg); + } + xbt_workload_data_chunk_free(chunk); + break; + } + } + } + return 0; +} + int worker(int argc,char *argv[]) { + worker_data_t globals; gras_init(&argc,argv); - amok_pm_init(); + declare_msg(); + globals = gras_userdata_new(s_worker_data_t); + /* Create the connexions */ + globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */ + gras_socket_t master; + int connected=0; + + gras_cb_register("commands", worker_commands_cb); + gras_cb_register("go", worker_go_cb); + globals->peers=xbt_dict_new(); + + if (gras_if_RL()) + INFO2("Sensor %s starting. Connecting to master on %s", gras_os_myname(), argv[1]); + while (!connected) { + xbt_ex_t e; + TRY { + master = gras_socket_client_from_string(argv[1]); + connected = 1; + } + CATCH(e) { + if (e.category != system_error) + RETHROW; + xbt_ex_free(e); + INFO0("Failed to connect. Retry in 0.5 second"); + gras_os_sleep(0.5); + } + } + /* Join and run the group */ + amok_pm_group_join(master, "replay", -1); + amok_pm_mainloop(600); + + gras_socket_close(globals->mysock); + xbt_dynar_free(&(globals->commands)); + xbt_dict_free(&(globals->peers)); + free(globals); gras_exit(); return 0; diff --git a/examples/gras/replay/replay.xml b/examples/gras/replay/replay.xml index 27a3d7585c..b30850e969 100644 --- a/examples/gras/replay/replay.xml +++ b/examples/gras/replay/replay.xml @@ -1,12 +1,34 @@ - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/gras/replay/workload.h b/examples/gras/replay/workload.h index e4e3543fa1..67f0c0c685 100644 --- a/examples/gras/replay/workload.h +++ b/examples/gras/replay/workload.h @@ -21,6 +21,8 @@ /* one command to do */ typedef struct { + /* keep it in sync with function xbt_workload_declare_datadesc() */ + char *who; /* the slave who should do it */ char *comment; /* a comment placed at the end of the line, if any */ int action; /* 0: compute(darg flops); 1: send darg bytes to strarg; 2: recv darg bytes from strarg */ @@ -37,4 +39,14 @@ XBT_PUBLIC(int) xbt_workload_elm_cmp_who_date(const void* _c1, const void* _c2); XBT_PUBLIC(void) xbt_workload_sort_who_date(xbt_dynar_t c); XBT_PUBLIC(xbt_dynar_t) xbt_workload_parse_file(char *filename); +XBT_PUBLIC(void) xbt_workload_declare_datadesc(void); + + +typedef struct { + int size; + char *chunk; +} s_xbt_workload_data_chunk_t,*xbt_workload_data_chunk_t; +XBT_PUBLIC(xbt_workload_data_chunk_t) xbt_workload_data_chunk_new(int size); +XBT_PUBLIC(void) xbt_workload_data_chunk_free(xbt_workload_data_chunk_t c); + #endif /* XBT_WORKLOAD_H_ */ diff --git a/examples/gras/replay/xbt_workload.c b/examples/gras/replay/xbt_workload.c index ab22b50b07..1dbdff7837 100644 --- a/examples/gras/replay/xbt_workload.c +++ b/examples/gras/replay/xbt_workload.c @@ -12,6 +12,7 @@ #include "xbt/sysdep.h" #include "xbt/str.h" #include "workload.h" +#include "gras/datadesc.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_workload,xbt, "Workload characterisation mecanisms"); @@ -154,3 +155,40 @@ xbt_dynar_t xbt_workload_parse_file(char *filename) { xbt_dynar_free(&in); return cmds; } + + +void xbt_workload_declare_datadesc(void) { + gras_datadesc_type_t ddt; + + ddt = gras_datadesc_struct("s_xbt_workload_elm_t"); + gras_datadesc_struct_append(ddt,"who",gras_datadesc_by_name("string")); + gras_datadesc_struct_append(ddt,"comment",gras_datadesc_by_name("string")); + gras_datadesc_struct_append(ddt,"action",gras_datadesc_by_name("int")); + gras_datadesc_struct_append(ddt,"date",gras_datadesc_by_name("double")); + gras_datadesc_struct_append(ddt,"d_arg",gras_datadesc_by_name("double")); + gras_datadesc_struct_append(ddt,"str_arg",gras_datadesc_by_name("string")); + gras_datadesc_struct_close(ddt); + + gras_datadesc_ref("xbt_workload_elm_t",ddt); + + ddt = gras_datadesc_struct("s_xbt_workload_data_chunk_t"); + gras_datadesc_struct_append(ddt,"size",gras_datadesc_by_name("int")); + gras_datadesc_cb_field_push(ddt, "size"); + gras_datadesc_struct_append(ddt,"chunk",gras_datadesc_ref_pop_arr(gras_datadesc_by_name("char"))); + gras_datadesc_struct_close(ddt); + + gras_datadesc_ref("xbt_workload_data_chunk_t",ddt); +} + + + +xbt_workload_data_chunk_t xbt_workload_data_chunk_new(int size) { + xbt_workload_data_chunk_t res = xbt_new0(s_xbt_workload_data_chunk_t,1); + res->size = size; + res->chunk = xbt_new(char,size-sizeof(res)-sizeof(int)); + return res; +} +void xbt_workload_data_chunk_free(xbt_workload_data_chunk_t c) { + free(c->chunk); + free(c); +}