#include "amok/peermanagement.h"
#include <stdio.h>
+#include "simix/simix.h"
+
int master(int argc,char *argv[]);
int worker(int argc,char *argv[]);
}
/* check that we have a dude for every element of the trace */
xbt_dynar_foreach(cmds,cursor,cmd) {
- if (0) {
- char *str = xbt_workload_elm_to_string(cmd);
- INFO1("%s",str);
- free(str);
- }
-
pal=xbt_dict_get_or_null(pals,cmd->who);
if (!pal) {
CRITICAL1("Process %s didn't came! Abording!",cmd->who);
gras_exit();
abort();
}
+ }
+ /* Send the commands to every pal */
+ char *pal_name;
+ xbt_dict_foreach(pals,dict_cursor,pal_name,pal) {
gras_msg_send(pal,"commands",&cmds);
}
INFO0("Sent commands to every processes. Let them start now");
xbt_dict_cursor_t dict_it;
- char *pal_name;
xbt_dict_foreach(pals,dict_it,pal_name,pal) {
gras_msg_send(pal,"go",NULL);
}
- INFO0("They should be started by now.");
+ INFO0("They should be started by now. Wait for their completion.");
+
+ for (cursor=0;cursor<xbt_dynar_length(peers);cursor++) {
+ gras_msg_wait(-1,"go",NULL,NULL);
+ }
/* Done, exiting */
- amok_pm_group_shutdown("replay");
+ //amok_pm_group_shutdown("replay");
xbt_dynar_free(&cmds);
gras_exit();
return 0;
worker_data_t g = gras_userdata_get();
gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
if (!peer_sock) {
+ INFO1("Create a socket to %s",peer);
peer_sock = gras_socket_client(peer,4000);
xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp);
}
xbt_workload_data_chunk_t chunk;
if (cmd->action == XBT_WORKLOAD_SEND) {
+ gras_socket_t sock = get_peer_sock(cmd->str_arg);
chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
- INFO2("Send %.f bytes to %s",cmd->d_arg,cmd->str_arg);
- gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk);
+ INFO4("Send %.f bytes to %s %s:%d",cmd->d_arg,cmd->str_arg,
+ gras_socket_peer_name(sock),gras_socket_peer_port(sock));
+ gras_msg_send_(sock,gras_msgtype_by_name("chunk"),&chunk);
+ INFO2("Done sending %.f bytes to %s",cmd->d_arg,cmd->str_arg);
} else if (cmd->action == XBT_WORKLOAD_RECV) {
INFO2("Recv %.f bytes from %s",cmd->d_arg,cmd->str_arg);
TRY {
gras_msg_wait(1000000,"chunk",NULL,&chunk);
} CATCH(e) {
+ SIMIX_display_process_status();
RETHROW2("Exception while waiting for %f bytes from %s: %s",
cmd->d_arg,cmd->str_arg);
}
xbt_workload_data_chunk_free(chunk);
+ INFO2("Done receiving %.f bytes from %s",cmd->d_arg,cmd->str_arg);
} else {
xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(cmd)));
}
}
-static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
- worker_data_t g = gras_userdata_get();
- unsigned int cursor;
- xbt_workload_elm_t cmd;
- const char *myname=gras_os_myname();
- xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
-
- xbt_dynar_foreach(g->commands,cursor,cmd) {
- if (!strcmp(cmd->who,myname)) {
- char *c = xbt_workload_elm_to_string(cmd);
- INFO1("%s",c);
- free(c);
-
- switch (cmd->action) {
- case XBT_WORKLOAD_COMPUTE:
- /* If any communication were queued, do them in parallel */
- if (xbt_dynar_length(cmd_to_go)){
- xbt_dynar_dopar(cmd_to_go,do_command);
- INFO0("Communications all done");
- xbt_dynar_reset(cmd_to_go);
- }
- INFO1("Compute %.f flops",cmd->d_arg);
- gras_cpu_burn(cmd->d_arg);
- break;
- case XBT_WORKLOAD_SEND:
- case XBT_WORKLOAD_RECV:
- /* queue communications for later realization in parallel */
- xbt_dynar_push(cmd_to_go,&cmd);
- break;
- }
- }
- }
- /* do in parallel any communication still queued */
- if (xbt_dynar_length(cmd_to_go)){
- xbt_dynar_dopar(cmd_to_go,do_command);
- xbt_dynar_reset(cmd_to_go);
- }
-
- return 0;
-}
-
int worker(int argc,char *argv[]) {
+ xbt_ex_t e;
worker_data_t globals;
gras_init(&argc,argv);
declare_msg();
int connected=0;
gras_cb_register("commands", worker_commands_cb);
- gras_cb_register("go", worker_go_cb);
globals->peers=xbt_dict_new();
if (gras_if_RL())
}
/* Join and run the group */
amok_pm_group_join(master, "replay", -1);
- amok_pm_mainloop(600);
+ gras_msg_handle(60); // command message
+ gras_msg_wait(60,"go",NULL,NULL);
+ {
+ worker_data_t g = gras_userdata_get();
+ unsigned int cursor;
+ xbt_workload_elm_t cmd;
+ const char *myname=gras_os_myname();
+ xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
+
+ xbt_dynar_foreach(g->commands,cursor,cmd) {
+ if (!strcmp(cmd->who,myname)) {
+ char *c = xbt_workload_elm_to_string(cmd);
+ // INFO1("TODO: %s",c);
+ free(c);
+
+ switch (cmd->action) {
+ case XBT_WORKLOAD_COMPUTE:
+ /* If any communications were queued, do them in parallel */
+ if (xbt_dynar_length(cmd_to_go)){
+ TRY {
+ xbt_dynar_dopar(cmd_to_go,do_command);
+ xbt_dynar_reset(cmd_to_go);
+ } CATCH(e) {
+ SIMIX_display_process_status();
+ }
+ INFO0("Communications all done");
+ xbt_dynar_reset(cmd_to_go);
+ }
+ INFO1("Compute %.f flops",cmd->d_arg);
+ gras_cpu_burn(cmd->d_arg);
+ INFO1("Done computing %.f flops",cmd->d_arg);
+ break;
+ case XBT_WORKLOAD_SEND:
+ /* Create the socket from the main thread, since it seems to fail when done from a dopar thread */
+ get_peer_sock(cmd->str_arg);
+ case XBT_WORKLOAD_RECV:
+ /* queue communications for later realization in parallel */
+ xbt_dynar_push(cmd_to_go,&cmd);
+ break;
+ }
+ }
+ }
+ /* do in parallel any communication still queued */
+ INFO1("Do %ld pending communications after end of TODO list",xbt_dynar_length(cmd_to_go));
+ if (xbt_dynar_length(cmd_to_go)){
+ xbt_dynar_dopar(cmd_to_go,do_command);
+ xbt_dynar_reset(cmd_to_go);
+ }
+ }
+
+ gras_msg_send(master,"go",NULL);
+// amok_pm_group_leave(master, "replay");
gras_socket_close(globals->mysock);
xbt_dynar_free(&(globals->commands));