+typedef struct {  /* per-worker state; worker() allocates it as the GRAS process userdata */
+ xbt_dynar_t commands;  /* command list, stored by worker_commands_cb() from the "commands" message payload */
+ xbt_dict_t peers;  /* cache mapping a peer name to its client socket, filled on demand by get_peer_sock() */
+ gras_socket_t mysock;  /* server socket this worker opens on port 4000 in worker() */
+} s_worker_data_t, *worker_data_t;
+
+
+/**
+ * Return the client socket used to talk to @a peer, opening it on first use.
+ *
+ * Sockets are cached in the `peers` dictionary of the process userdata,
+ * keyed by the peer name. A peer that is not cached yet gets a new client
+ * socket to port 4000, which is stored in the dictionary before returning.
+ *
+ * @param peer  name of the peer to reach
+ * @return the cached or newly created socket
+ */
+static gras_socket_t get_peer_sock(char *peer)
+{
+  worker_data_t state = gras_userdata_get();
+  gras_socket_t cached = xbt_dict_get_or_null(state->peers, peer);
+
+  if (cached) {
+    return cached;
+  }
+
+  /* First contact with this peer: connect, then remember the socket. */
+  XBT_INFO("Create a socket to %s", peer);
+  gras_socket_t fresh = gras_socket_client(peer, 4000);
+  /* No free function is registered for the stored value
+   * (gras_socket_close_voidp would be the candidate). */
+  xbt_dict_set(state->peers, peer, fresh, NULL);
+  return fresh;
+}
+
+/**
+ * Message callback registered for "commands" messages.
+ *
+ * Stores the dynar carried by the message into the `commands` field of the
+ * process userdata.
+ *
+ * @param ctx      callback context; not used here
+ * @param payload  pointer to the received xbt_dynar_t
+ * @return always 0
+ */
+static int worker_commands_cb(gras_msg_cb_ctx_t ctx, void *payload)
+{
+  xbt_dynar_t *received = (xbt_dynar_t *) payload;
+  worker_data_t state = gras_userdata_get();
+
+  state->commands = *received;
+  return 0;
+}
+
+/**
+ * Execute one communication command; used as the per-element callback of
+ * xbt_dynar_dopar().
+ *
+ * - XBT_WORKLOAD_SEND: builds a data chunk of cmd->d_arg bytes, sends it as a
+ *   "chunk" message to the peer named cmd->str_arg, then releases the chunk.
+ * - XBT_WORKLOAD_RECV: waits for a "chunk" message and frees the received
+ *   chunk. If the wait raises an exception, the process status is displayed
+ *   and the exception is rethrown with extra context.
+ * - Any other action aborts through xbt_die().
+ *
+ * @param rank  index passed by the caller; not used here
+ * @param c     pointer to the xbt_workload_elm_t to execute
+ */
+static void do_command(int rank, void *c)
+{
+  xbt_ex_t e;
+  xbt_workload_elm_t cmd = *(xbt_workload_elm_t *) c;
+  xbt_workload_data_chunk_t chunk;
+
+  if (cmd->action == XBT_WORKLOAD_SEND) {
+    gras_socket_t sock = get_peer_sock(cmd->str_arg);
+    chunk = xbt_workload_data_chunk_new((int) (cmd->d_arg));
+    XBT_INFO("Send %.f bytes to %s %s:%d", cmd->d_arg, cmd->str_arg,
+             gras_socket_peer_name(sock), gras_socket_peer_port(sock));
+    gras_msg_send_(sock, gras_msgtype_by_name("chunk"), &chunk);
+    /* The chunk only exists to be sent; release it once the send has
+     * returned so that each SEND command does not leak one chunk.
+     * NOTE(review): assumes gras_msg_send_ keeps no reference to the
+     * payload after returning -- confirm against the GRAS documentation. */
+    xbt_workload_data_chunk_free(chunk);
+    XBT_INFO("Done sending %.f bytes to %s", cmd->d_arg, cmd->str_arg);
+
+  } else if (cmd->action == XBT_WORKLOAD_RECV) {
+    XBT_INFO("Recv %.f bytes from %s", cmd->d_arg, cmd->str_arg);
+    TRY {
+      gras_msg_wait(1000000, "chunk", NULL, &chunk);
+    }
+    CATCH(e) {
+      SIMIX_display_process_status();
+      /* The trailing %s is presumably filled in by RETHROW2 with the
+       * original exception message -- confirm against xbt/ex.h. */
+      RETHROW2("Exception while waiting for %f bytes from %s: %s",
+               cmd->d_arg, cmd->str_arg);
+    }
+    xbt_workload_data_chunk_free(chunk);
+    XBT_INFO("Done receiving %.f bytes from %s", cmd->d_arg, cmd->str_arg);
+
+  } else {
+    xbt_die("unknown command: %s", xbt_workload_elm_to_string(cmd));
+  }
+}
+
+int worker(int argc, char *argv[])
+{
+ xbt_ex_t e;
+ worker_data_t globals;
+ gras_init(&argc, argv);
+ declare_msg();
+ globals = gras_userdata_new(s_worker_data_t);
+ /* Create the connexions */
+ globals->mysock = gras_socket_server(4000); /* FIXME: shouldn't be hardcoded */
+ gras_socket_t master = NULL;
+ int connected = 0;
+
+ gras_cb_register("commands", worker_commands_cb);
+ globals->peers = xbt_dict_new();
+
+ if (gras_if_RL())
+ XBT_INFO("Sensor %s starting. Connecting to master on %s",
+ gras_os_myname(), argv[1]);
+ while (!connected) {
+ xbt_ex_t e;
+ TRY {
+ master = gras_socket_client_from_string(argv[1]);
+ connected = 1;
+ }
+ CATCH(e) {
+ if (e.category != system_error)
+ RETHROW;
+ xbt_ex_free(e);
+ XBT_INFO("Failed to connect. Retry in 0.5 second");
+ gras_os_sleep(0.5);
+ }
+ }
+ /* Join and run the group */
+ amok_pm_group_join(master, "replay", -1);
+ gras_msg_handle(60); // command message
+ gras_msg_wait(60, "go", NULL, NULL);
+ {
+ worker_data_t g = gras_userdata_get();
+ unsigned int cursor;
+ xbt_workload_elm_t cmd;
+ const char *myname = gras_os_myname();
+ xbt_dynar_t cmd_to_go =
+ xbt_dynar_new(sizeof(xbt_workload_elm_t), NULL);
+
+ xbt_dynar_foreach(g->commands, cursor, cmd) {
+ if (!strcmp(cmd->who, myname)) {
+ char *c = xbt_workload_elm_to_string(cmd);
+ // XBT_INFO("TODO: %s",c);
+ free(c);
+
+ switch (cmd->action) {
+ case XBT_WORKLOAD_COMPUTE:
+ /* If any communication were queued, do them in parallel */
+ if (xbt_dynar_length(cmd_to_go)) {
+ TRY {
+ xbt_dynar_dopar(cmd_to_go, do_command);
+ xbt_dynar_reset(cmd_to_go);
+ }
+ CATCH(e) {
+ SIMIX_display_process_status();
+ }
+ XBT_INFO("Communications all done");
+ xbt_dynar_reset(cmd_to_go);
+ }
+ XBT_INFO("Compute %.f flops", cmd->d_arg);
+ gras_cpu_burn(cmd->d_arg);
+ XBT_INFO("Done computing %.f flops", cmd->d_arg);
+ break;
+ case XBT_WORKLOAD_SEND:
+ /* Create the socket from the main thread since it seems to fail when done from a dopar thread */
+ get_peer_sock(cmd->str_arg);
+ case XBT_WORKLOAD_RECV:
+ /* queue communications for later realization in parallel */
+ xbt_dynar_push(cmd_to_go, &cmd);
+ break;
+ }
+ }
+ }
+ /* do in parallel any communication still queued */
+ XBT_INFO("Do %ld pending communications after end of TODO list",
+ xbt_dynar_length(cmd_to_go));
+ if (xbt_dynar_length(cmd_to_go)) {
+ xbt_dynar_dopar(cmd_to_go, do_command);
+ xbt_dynar_reset(cmd_to_go);
+ }
+ }
+
+ gras_msg_send(master, "go", NULL);
+// amok_pm_group_leave(master, "replay");
+
+ gras_socket_close(globals->mysock);
+ xbt_dynar_free(&(globals->commands));
+ xbt_dict_free(&(globals->peers));
+ free(globals);