From: mquinson
Date: Thu, 15 Oct 2009 16:26:07 +0000 (+0000)
Subject: Various fixes. Working (in SG) for ForkJoins, but not for Montage.
X-Git-Tag: SVN~922
X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/8961684b8ad3a24b5d940510fb65dd650753ac41

Various fixes. Working (in SG) for ForkJoins, but not for Montage.

* Change the synchronization scheme for ending: workers send "go" to the
  master instead of having the master call amok_pm_group_end() right after
  the setup. That killed the workers way too early.
* Make sure that workers create all mandatory sockets to the others, since
  dopar threads have real difficulties speaking to the listener thread to
  add new sockets.
* Instead of having a callback to the "go" message in the worker that does
  all the work, explicitly wait for that message and move the code into the
  worker's main().

Like I said, there is still a deadlock when trying to replay Montage_25, but
this may be related to the fact that this workflow seems somehow broken: the
"cimages.tbl" file, generated by root and consumed by ID00021@mImgTbl, is
mangled in the dot output, and thus presumably also in the in-memory
representation. More to come tomorrow.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6792 48e7efb5-ca39-0410-a469-dd3cf9ba447f
---

diff --git a/examples/gras/replay/replay.c b/examples/gras/replay/replay.c
index f7b23fbea3..4f79090102 100644
--- a/examples/gras/replay/replay.c
+++ b/examples/gras/replay/replay.c
@@ -18,6 +18,8 @@
 #include "amok/peermanagement.h"
 #include
 
+#include "simix/simix.h"
+
 int master(int argc,char *argv[]);
 int worker(int argc,char *argv[]);
@@ -90,12 +92,6 @@ int master(int argc,char *argv[]) {
   }
   /* check that we have a dude for every element of the trace */
   xbt_dynar_foreach(cmds,cursor,cmd) {
-    if (0) {
-      char *str = xbt_workload_elm_to_string(cmd);
-      INFO1("%s",str);
-      free(str);
-    }
-
     pal=xbt_dict_get_or_null(pals,cmd->who);
     if (!pal) {
       CRITICAL1("Process %s didn't came! Abording!",cmd->who);
@@ -104,18 +100,25 @@ int master(int argc,char *argv[]) {
       gras_exit();
       abort();
     }
+  }
+  /* Send the commands to every pal */
+  char *pal_name;
+  xbt_dict_foreach(pals,dict_cursor,pal_name,pal) {
     gras_msg_send(pal,"commands",&cmds);
   }
   INFO0("Sent commands to every processes. Let them start now");
   xbt_dict_cursor_t dict_it;
-  char *pal_name;
   xbt_dict_foreach(pals,dict_it,pal_name,pal) {
     gras_msg_send(pal,"go",NULL);
   }
-  INFO0("They should be started by now.");
+  INFO0("They should be started by now. Wait for their completion.");
+
+  for (cursor=0;cursor

   gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
   if (!peer_sock) {
+    INFO1("Create a socket to %s",peer);
     peer_sock = gras_socket_client(peer,4000);
     xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp);
   }
@@ -148,66 +152,31 @@ static void do_command(int rank, void*c) {
   xbt_workload_data_chunk_t chunk;

   if (cmd->action == XBT_WORKLOAD_SEND) {
+    gras_socket_t sock = get_peer_sock(cmd->str_arg);
     chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
-    INFO2("Send %.f bytes to %s",cmd->d_arg,cmd->str_arg);
-    gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk);
+    INFO4("Send %.f bytes to %s %s:%d",cmd->d_arg,cmd->str_arg,
+          gras_socket_peer_name(sock),gras_socket_peer_port(sock));
+    gras_msg_send_(sock,gras_msgtype_by_name("chunk"),&chunk);
+    INFO2("Done sending %.f bytes to %s",cmd->d_arg,cmd->str_arg);
   } else if (cmd->action == XBT_WORKLOAD_RECV) {
     INFO2("Recv %.f bytes from %s",cmd->d_arg,cmd->str_arg);
     TRY {
       gras_msg_wait(1000000,"chunk",NULL,&chunk);
     } CATCH(e) {
+      SIMIX_display_process_status();
       RETHROW2("Exception while waiting for %f bytes from %s: %s",
                cmd->d_arg,cmd->str_arg);
     }
     xbt_workload_data_chunk_free(chunk);
+    INFO2("Done receiving %.f bytes from %s",cmd->d_arg,cmd->str_arg);
   } else {
     xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(cmd)));
   }
 }

-static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
-  worker_data_t g = gras_userdata_get();
-  unsigned int cursor;
-  xbt_workload_elm_t cmd;
-  const char *myname=gras_os_myname();
-  xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
-
-  xbt_dynar_foreach(g->commands,cursor,cmd) {
-    if (!strcmp(cmd->who,myname)) {
-      char *c = xbt_workload_elm_to_string(cmd);
-      INFO1("%s",c);
-      free(c);
-
-      switch (cmd->action) {
-      case XBT_WORKLOAD_COMPUTE:
-        /* If any communication were queued, do them in parallel */
-        if (xbt_dynar_length(cmd_to_go)){
-          xbt_dynar_dopar(cmd_to_go,do_command);
-          INFO0("Communications all done");
-          xbt_dynar_reset(cmd_to_go);
-        }
-        INFO1("Compute %.f flops",cmd->d_arg);
-        gras_cpu_burn(cmd->d_arg);
-        break;
-      case XBT_WORKLOAD_SEND:
-      case XBT_WORKLOAD_RECV:
-        /* queue communications for later realization in parallel */
-        xbt_dynar_push(cmd_to_go,&cmd);
-        break;
-      }
-    }
-  }
-  /* do in parallel any communication still queued */
-  if (xbt_dynar_length(cmd_to_go)){
-    xbt_dynar_dopar(cmd_to_go,do_command);
-    xbt_dynar_reset(cmd_to_go);
-  }
-
-  return 0;
-}
-
 int worker(int argc,char *argv[]) {
+  xbt_ex_t e;
   worker_data_t globals;

   gras_init(&argc,argv);
   declare_msg();
@@ -218,7 +187,6 @@ int worker(int argc,char *argv[]) {
   int connected=0;

   gras_cb_register("commands", worker_commands_cb);
-  gras_cb_register("go", worker_go_cb);

   globals->peers=xbt_dict_new();
   if (gras_if_RL())
@@ -239,7 +207,58 @@
   }
   /* Join and run the group */
   amok_pm_group_join(master, "replay", -1);
-  amok_pm_mainloop(600);
+  gras_msg_handle(60); // command message
+  gras_msg_wait(60,"go",NULL,NULL);
+  {
+    worker_data_t g = gras_userdata_get();
+    unsigned int cursor;
+    xbt_workload_elm_t cmd;
+    const char *myname=gras_os_myname();
+    xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
+
+    xbt_dynar_foreach(g->commands,cursor,cmd) {
+      if (!strcmp(cmd->who,myname)) {
+        char *c = xbt_workload_elm_to_string(cmd);
+        // INFO1("TODO: %s",c);
+        free(c);
+
+        switch (cmd->action) {
+        case XBT_WORKLOAD_COMPUTE:
+          /* If any communication were queued, do them in parallel */
+          if (xbt_dynar_length(cmd_to_go)){
+            TRY {
+              xbt_dynar_dopar(cmd_to_go,do_command);
+              xbt_dynar_reset(cmd_to_go);
+            } CATCH(e) {
+              SIMIX_display_process_status();
+            }
+            INFO0("Communications all done");
+            xbt_dynar_reset(cmd_to_go);
+          }
+          INFO1("Compute %.f flops",cmd->d_arg);
+          gras_cpu_burn(cmd->d_arg);
+          INFO1("Done computing %.f flops",cmd->d_arg);
+          break;
+        case XBT_WORKLOAD_SEND:
+          /* Create the socket from main thread since it seems to fails when done from dopar thread */
+          get_peer_sock(cmd->str_arg);
+        case XBT_WORKLOAD_RECV:
+          /* queue communications for later realization in parallel */
+          xbt_dynar_push(cmd_to_go,&cmd);
+          break;
+        }
+      }
+    }
+    /* do in parallel any communication still queued */
+    INFO1("Do %ld pending communications after end of TODO list",xbt_dynar_length(cmd_to_go));
+    if (xbt_dynar_length(cmd_to_go)){
+      xbt_dynar_dopar(cmd_to_go,do_command);
+      xbt_dynar_reset(cmd_to_go);
+    }
+  }
+
+  gras_msg_send(master,"go",NULL);
+//  amok_pm_group_leave(master, "replay");
   gras_socket_close(globals->mysock);
   xbt_dynar_free(&(globals->commands));
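
For readers unfamiliar with the GRAS/AMOK primitives used above, here is a hypothetical, self-contained pthread sketch of the ending scheme described in the first bullet. It is not SimGrid code and every name in it is illustrative: each worker signals the coordinator when its share of the workload is done, and the coordinator only ends the run after every signal has arrived, instead of right after setup.

/* Illustrative sketch only: workers report completion to the coordinator,
 * which waits for all of them before tearing the group down.
 * Build with: cc -pthread sketch.c */
#include <pthread.h>
#include <stdio.h>

#define NB_WORKERS 4

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  all_done = PTHREAD_COND_INITIALIZER;
static int done_count = 0;

static void *worker_main(void *arg) {
  long id = (long)arg;
  printf("worker %ld: replaying its share of the workload\n", id);
  /* ... the real work would happen here ... */
  pthread_mutex_lock(&lock);
  done_count++;                    /* counterpart of sending "go" to the master */
  pthread_cond_signal(&all_done);
  pthread_mutex_unlock(&lock);
  return NULL;
}

int main(void) {
  pthread_t workers[NB_WORKERS];
  for (long i = 0; i < NB_WORKERS; i++)
    pthread_create(&workers[i], NULL, worker_main, (void *)i);

  /* The coordinator waits for one completion signal per worker before
   * ending the run, instead of killing everybody right after setup. */
  pthread_mutex_lock(&lock);
  while (done_count < NB_WORKERS)
    pthread_cond_wait(&all_done, &lock);
  pthread_mutex_unlock(&lock);

  for (long i = 0; i < NB_WORKERS; i++)
    pthread_join(workers[i], NULL);
  printf("coordinator: every worker reported completion, safe to end the group\n");
  return 0;
}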