From: mquinson Date: Thu, 15 Oct 2009 11:54:49 +0000 (+0000) Subject: Try to do all communications in parallel (not quite working) X-Git-Tag: SVN~930 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/b51a186124ce97844fc071cd2f1d8859dbac1346 Try to do all communications in parallel (not quite working) git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6784 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/examples/gras/replay/replay.c b/examples/gras/replay/replay.c index a8e99962fe..f7b23fbea3 100644 --- a/examples/gras/replay/replay.c +++ b/examples/gras/replay/replay.c @@ -12,6 +12,7 @@ #include "xbt/log.h" #include "xbt/str.h" #include "xbt/dynar.h" +#include "xbt/synchro.h" #include "workload.h" #include "gras.h" #include "amok/peermanagement.h" @@ -132,7 +133,7 @@ static gras_socket_t get_peer_sock(char*peer) { gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer); if (!peer_sock) { peer_sock = gras_socket_client(peer,4000); - xbt_dict_set(g->peers,peer,peer_sock,gras_socket_close_voidp); + xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp); } return peer_sock; } @@ -141,15 +142,38 @@ static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload) { g->commands = *(xbt_dynar_t*)payload; return 0; } +static void do_command(int rank, void*c) { + xbt_ex_t e; + xbt_workload_elm_t cmd = *(xbt_workload_elm_t*)c; + xbt_workload_data_chunk_t chunk; + + if (cmd->action == XBT_WORKLOAD_SEND) { + chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg)); + INFO2("Send %.f bytes to %s",cmd->d_arg,cmd->str_arg); + gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk); + + } else if (cmd->action == XBT_WORKLOAD_RECV) { + INFO2("Recv %.f bytes from %s",cmd->d_arg,cmd->str_arg); + TRY { + gras_msg_wait(1000000,"chunk",NULL,&chunk); + } CATCH(e) { + RETHROW2("Exception while waiting for %f bytes from %s: %s", + cmd->d_arg,cmd->str_arg); + } + xbt_workload_data_chunk_free(chunk); + + } else { + xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(cmd))); + } +} static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) { worker_data_t g = gras_userdata_get(); unsigned int cursor; xbt_workload_elm_t cmd; - xbt_ex_t e; const char *myname=gras_os_myname(); - xbt_dynar_foreach(g->commands,cursor,cmd) { - xbt_workload_data_chunk_t chunk; + xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL); + xbt_dynar_foreach(g->commands,cursor,cmd) { if (!strcmp(cmd->who,myname)) { char *c = xbt_workload_elm_to_string(cmd); INFO1("%s",c); @@ -157,24 +181,29 @@ static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) { switch (cmd->action) { case XBT_WORKLOAD_COMPUTE: + /* If any communication were queued, do them in parallel */ + if (xbt_dynar_length(cmd_to_go)){ + xbt_dynar_dopar(cmd_to_go,do_command); + INFO0("Communications all done"); + xbt_dynar_reset(cmd_to_go); + } + INFO1("Compute %.f flops",cmd->d_arg); gras_cpu_burn(cmd->d_arg); break; case XBT_WORKLOAD_SEND: - chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg)); - gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk); - break; case XBT_WORKLOAD_RECV: - TRY { - gras_msg_wait(1000000,"chunk",NULL,&chunk); - } CATCH(e) { - RETHROW2("Exception while waiting for %f bytes from %s: %s", - cmd->d_arg,cmd->str_arg); - } - xbt_workload_data_chunk_free(chunk); + /* queue communications for later realization in parallel */ + xbt_dynar_push(cmd_to_go,&cmd); break; } } } + /* do in parallel any communication still queued */ + if (xbt_dynar_length(cmd_to_go)){ + xbt_dynar_dopar(cmd_to_go,do_command); + xbt_dynar_reset(cmd_to_go); + } + return 0; }