+#include <stdlib.h>
#include "xbt/log.h"
#include "xbt/str.h"
#include "xbt/dynar.h"
+#include "xbt/synchro.h"
#include "workload.h"
#include "gras.h"
#include "amok/peermanagement.h"
gras_socket_t peer_sock = xbt_dict_get_or_null(g->peers,peer);
if (!peer_sock) {
peer_sock = gras_socket_client(peer,4000);
- xbt_dict_set(g->peers,peer,peer_sock,gras_socket_close_voidp);
+ xbt_dict_set(g->peers,peer,peer_sock,NULL);//gras_socket_close_voidp);
}
return peer_sock;
}
g->commands = *(xbt_dynar_t*)payload;
return 0;
}
+/* Carry out one queued communication command.
+ *
+ * rank: index handed over by the caller; not used in this function.
+ * c:    address of an xbt_workload_elm_t handle (the handle is read by value).
+ *
+ * SEND:  builds a data chunk sized from d_arg and sends it as a "chunk"
+ *        message on the socket returned by get_peer_sock(str_arg).
+ * RECV:  waits for a "chunk" message, then releases the received chunk.
+ *        An exception raised while waiting is rethrown with extra context.
+ * Other: aborts through xbt_die() with a description of the command.
+ */
+static void do_command(int rank, void*c) {
+  xbt_workload_elm_t elm = *(xbt_workload_elm_t*)c;
+  xbt_workload_data_chunk_t data;
+  xbt_ex_t err;
+
+  if (elm->action == XBT_WORKLOAD_SEND) {
+    data = xbt_workload_data_chunk_new((int)(elm->d_arg));
+    INFO2("Send %.f bytes to %s",elm->d_arg,elm->str_arg);
+    gras_msg_send(get_peer_sock(elm->str_arg),"chunk",&data);
+    return;
+  }
+
+  if (elm->action == XBT_WORKLOAD_RECV) {
+    INFO2("Recv %.f bytes from %s",elm->d_arg,elm->str_arg);
+    /* No jump statement may sit inside the TRY block itself; the return
+     * below is placed after the whole TRY/CATCH construct. */
+    TRY {
+      gras_msg_wait(1000000,"chunk",NULL,&data);
+    } CATCH(err) {
+      RETHROW2("Exception while waiting for %f bytes from %s: %s",
+               elm->d_arg,elm->str_arg);
+    }
+    xbt_workload_data_chunk_free(data);
+    return;
+  }
+
+  /* Neither a send nor a receive: nothing sensible can be done. */
+  xbt_die(bprintf("unknown command: %s",xbt_workload_elm_to_string(elm)));
+}
+/* Callback run when the worker is told to start: replays the commands of
+ * g->commands whose `who` field equals this process' name.
+ *
+ * SEND and RECV commands are queued in cmd_to_go; the queue is executed with
+ * xbt_dynar_dopar(..., do_command) right before each COMPUTE command and once
+ * more after the last command. COMPUTE commands call gras_cpu_burn(d_arg).
+ *
+ * ctx, payload: not used by this callback.
+ * Returns 0.
+ */
static int worker_go_cb(gras_msg_cb_ctx_t ctx, void *payload) {
worker_data_t g = gras_userdata_get();
unsigned int cursor;
xbt_workload_elm_t cmd;
- xbt_ex_t e;
const char *myname=gras_os_myname();
- xbt_dynar_foreach(g->commands,cursor,cmd) {
- xbt_workload_data_chunk_t chunk;
+ xbt_dynar_t cmd_to_go = xbt_dynar_new(sizeof(xbt_workload_elm_t),NULL);
+ xbt_dynar_foreach(g->commands,cursor,cmd) {
if (!strcmp(cmd->who,myname)) {
char *c = xbt_workload_elm_to_string(cmd);
INFO1("%s",c);
+ /* the description string is allocated for us; release it once logged */
+ free(c);
switch (cmd->action) {
case XBT_WORKLOAD_COMPUTE:
+ /* If any communication were queued, do them in parallel */
+ if (xbt_dynar_length(cmd_to_go)){
+ xbt_dynar_dopar(cmd_to_go,do_command);
+ INFO0("Communications all done");
+ xbt_dynar_reset(cmd_to_go);
+ }
+ INFO1("Compute %.f flops",cmd->d_arg);
gras_cpu_burn(cmd->d_arg);
break;
case XBT_WORKLOAD_SEND:
- chunk = xbt_workload_data_chunk_new((int)(cmd->d_arg));
- gras_msg_send(get_peer_sock(cmd->str_arg),"chunk",&chunk);
- break;
case XBT_WORKLOAD_RECV:
- TRY {
- gras_msg_wait(1000000,"chunk",NULL,&chunk);
- } CATCH(e) {
- RETHROW2("Exception while waiting for %f bytes from %s: %s",
- cmd->d_arg,cmd->str_arg);
- }
- xbt_workload_data_chunk_free(chunk);
+ /* queue communications for later realization in parallel */
+ xbt_dynar_push(cmd_to_go,&cmd);
break;
}
}
}
+ /* do in parallel any communication still queued */
+ if (xbt_dynar_length(cmd_to_go)){
+ xbt_dynar_dopar(cmd_to_go,do_command);
+ xbt_dynar_reset(cmd_to_go);
+ }
+
+ /* the queue is rebuilt on every invocation, so release the container here;
+ * the queued elements still belong to g->commands (no free function set) */
+ xbt_dynar_free(&cmd_to_go);
+
return 0;
}