From: Maximiliano Geier Date: Fri, 30 Nov 2012 10:11:05 +0000 (+0100) Subject: chainsend: change message sizes to more realistic values, avoid passing too many... X-Git-Tag: v3_9_rc1~86^2~239 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/6d7f851f116cff677f5d5cb009aa493a7510780c?hp=df18a796ce976b1f7fd63a1317e13b51dc25b833 chainsend: change message sizes to more realistic values, avoid passing too many parameters around, use MSG_task_isend in broadcaster instead of send Signed-off-by: Maximiliano Geier --- diff --git a/examples/msg/chainsend/broadcaster.c b/examples/msg/chainsend/broadcaster.c index c953e369b8..e311175969 100644 --- a/examples/msg/chainsend/broadcaster.c +++ b/examples/msg/chainsend/broadcaster.c @@ -18,15 +18,10 @@ xbt_dynar_t build_hostlist_from_hostcount(int hostcount) return host_list; } -static void delete_hostlist(xbt_dynar_t h) -{ - xbt_dynar_free(&h); -} - -int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it) +int broadcaster_build_chain(broadcaster_t bc) { msg_task_t task = NULL; - char **cur = (char**)xbt_dynar_iterator_next(it); + char **cur = (char**)xbt_dynar_iterator_next(bc->it); const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/ const char *current_host = NULL; const char *prev = NULL; @@ -37,13 +32,13 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar if (cur != NULL) { /* init: prev=NULL, host=current cur, next=next cur */ next = *cur; - *first = next; + bc->first = next; /* This iterator iterates one step ahead: cur is current iterated element, but it's actually the next one in the chain */ do { /* following steps: prev=last, host=next, next=cur */ - cur = (char**)xbt_dynar_iterator_next(it); + cur = (char**)xbt_dynar_iterator_next(bc->it); prev = last; current_host = next; if (cur != NULL) @@ -64,37 +59,42 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar return MSG_OK; } -int broadcaster_send_file(const char *first) +int broadcaster_send_file(broadcaster_t bc) { - const char *me = MSG_host_get_name(MSG_host_self()); + const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/ + msg_comm_t comm = NULL; msg_task_t task = NULL; int status; - int piece_count = PIECE_COUNT; - int cur = 0; - - for (; cur < piece_count; cur++) { - task = task_message_data_new(me, first, NULL, 0); - XBT_DEBUG("Sending (send) from %s into mailbox %s", me, first); - status = MSG_task_send(task, first); - - xbt_assert(status == MSG_OK, "broadcaster_send_file() failed"); + bc->current_piece = 0; + + while (bc->current_piece < bc->piece_count) { + if (xbt_dynar_length(bc->pending_sends) < bc->max_pending_sends) { + task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE); + XBT_DEBUG("Sending (isend) piece %d from %s into mailbox %s (current pending %d)", bc->current_piece, me, bc->first, xbt_dynar_length(bc->pending_sends)); + comm = MSG_task_isend(task, bc->first); + queue_pending_connection(comm, bc->pending_sends); + bc->current_piece++; + } else { + MSG_process_sleep(0.01); + } + process_pending_connections(bc->pending_sends); } return MSG_OK; } -int broadcaster_finish(xbt_dynar_iterator_t it) +int broadcaster_finish(broadcaster_t bc) { msg_task_t task = NULL; const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/ const char *current_host = NULL; char **cur = NULL; - xbt_dynar_iterator_seek(it, 0); + xbt_dynar_iterator_seek(bc->it, 0); /* Send goodbye message to every peer in the order generated by iterator it */ - for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) { + for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) { /* Send message to current peer */ current_host = *cur; task = task_message_end_data_new(me, current_host); @@ -105,31 +105,52 @@ int broadcaster_finish(xbt_dynar_iterator_t it) return MSG_OK; } +broadcaster_t broadcaster_init(xbt_dynar_t host_list) +{ + int status; + broadcaster_t bc = xbt_new(s_broadcaster_t, 1); + + bc->piece_count = PIECE_COUNT; + bc->current_piece = 0; + bc->host_list = host_list; + bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list); + bc->max_pending_sends = MAX_PENDING_SENDS; + bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL); + + status = broadcaster_build_chain(bc); + xbt_assert(status == MSG_OK, "Chain initialization failed"); + + return bc; +} + +static void broadcaster_destroy(broadcaster_t bc) +{ + /* Destroy iterator and hostlist */ + xbt_dynar_iterator_delete(bc->it); + xbt_dynar_free(&bc->pending_sends); + xbt_dynar_free(&bc->host_list); +} /** Emitter function */ int broadcaster(int argc, char *argv[]) { + broadcaster_t bc = NULL; xbt_dynar_t host_list = NULL; const char *first = NULL; - int status = !MSG_OK; + int status; XBT_INFO("broadcaster"); /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */ host_list = build_hostlist_from_hostcount(atoi(argv[1])); - /*host_list = build_hostlist_from_argv(argc, argv);*/ - /* Initialize iterator */ - xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list); + bc = broadcaster_init(host_list); /* TODO: Error checking */ - status = broadcaster_build_chain(&first, host_list, it); - status = broadcaster_send_file(first); - status = broadcaster_finish(it); + status = broadcaster_send_file(bc); + status = broadcaster_finish(bc); - /* Destroy iterator and hostlist */ - xbt_dynar_iterator_delete(it); - delete_hostlist(host_list); + broadcaster_destroy(bc); return status; } diff --git a/examples/msg/chainsend/broadcaster.h b/examples/msg/chainsend/broadcaster.h index 7d69cea3c6..e0c026c040 100644 --- a/examples/msg/chainsend/broadcaster.h +++ b/examples/msg/chainsend/broadcaster.h @@ -12,14 +12,32 @@ #include "iterator.h" #include "common.h" +/* Connection parameters */ +#define MAX_PENDING_SENDS 10 + +/* ``File'' details */ +#define PIECE_SIZE 16384 #define PIECE_COUNT 50 +/* Broadcaster struct */ +typedef struct s_broadcaster { + const char *first; + int piece_count; + int current_piece; + xbt_dynar_t host_list; + xbt_dynar_iterator_t it; + int max_pending_sends; + xbt_dynar_t pending_sends; +} s_broadcaster_t, *broadcaster_t; + xbt_dynar_t build_hostlist_from_hostcount(int hostcount); /* Broadcaster: helper functions */ -int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it); -int broadcaster_send_file(const char *first); -int broadcaster_finish(xbt_dynar_iterator_t it); +broadcaster_t broadcaster_init(xbt_dynar_t host_list); +int broadcaster_build_chain(broadcaster_t bc); +int broadcaster_send_file(broadcaster_t bc); +int broadcaster_finish(broadcaster_t bc); +static void broadcaster_destroy(broadcaster_t bc); /* Tasks */ int broadcaster(int argc, char *argv[]); diff --git a/examples/msg/chainsend/chainsend.tesh b/examples/msg/chainsend/chainsend.tesh index fc4c599ac9..a78a40a485 100644 --- a/examples/msg/chainsend/chainsend.tesh +++ b/examples/msg/chainsend/chainsend.tesh @@ -15,12 +15,12 @@ $ $SG_TEST_EXENV ${bindir:=.}/chainsend ${srcdir:=.}/platform_chainsend.xml ${sr > [ 0.000000] (7:peer@host6) peer > [ 0.000000] (8:peer@host7) peer > [ 0.000000] (9:peer@host8) peer -> [ 93.000000] (2:peer@host1) Waiting for sends to finish before shutdown... -> [ 225.700000] (3:peer@host2) Waiting for sends to finish before shutdown... -> [ 294.700000] (4:peer@host3) Waiting for sends to finish before shutdown... -> [ 298.600000] (5:peer@host4) Waiting for sends to finish before shutdown... -> [ 309.100000] (6:peer@host5) Waiting for sends to finish before shutdown... -> [ 314.300000] (7:peer@host6) Waiting for sends to finish before shutdown... -> [ 318.300000] (8:peer@host7) Waiting for sends to finish before shutdown... -> [ 318.400000] (0:@) Total simulation time: 3.184000e+02 -> [ 318.400000] (9:peer@host8) Waiting for sends to finish before shutdown... +> [ 88.950000] (2:peer@host1) Waiting for sends to finish before shutdown... +> [ 221.070000] (3:peer@host2) Waiting for sends to finish before shutdown... +> [ 289.980000] (4:peer@host3) Waiting for sends to finish before shutdown... +> [ 293.890000] (5:peer@host4) Waiting for sends to finish before shutdown... +> [ 304.300000] (6:peer@host5) Waiting for sends to finish before shutdown... +> [ 310.940000] (7:peer@host6) Waiting for sends to finish before shutdown... +> [ 314.850000] (8:peer@host7) Waiting for sends to finish before shutdown... +> [ 314.930000] (9:peer@host8) Waiting for sends to finish before shutdown... +> [ 316.850000] (0:@) Total simulation time: 3.168500e+02 diff --git a/examples/msg/chainsend/messages.c b/examples/msg/chainsend/messages.c index 177b4e85b0..0bc23cd6c0 100644 --- a/examples/msg/chainsend/messages.c +++ b/examples/msg/chainsend/messages.c @@ -1,19 +1,19 @@ #include "messages.h" -msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox) +msg_task_t task_message_new(e_message_type type, unsigned int len, const char *issuer_hostname, const char *mailbox) { message_t msg = xbt_new(s_message_t, 1); msg->type = type; msg->issuer_hostname = issuer_hostname; msg->mailbox = mailbox; - msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); + msg_task_t task = MSG_task_create(NULL, 0, len, msg); return task; } msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next) { - msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox); + msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, MESSAGE_BUILD_CHAIN_SIZE, issuer_hostname, mailbox); message_t msg = MSG_task_get_data(task); msg->prev_hostname = prev; msg->next_hostname = next; @@ -23,7 +23,7 @@ msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailb msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len) { - msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox); + msg_task_t task = task_message_new(MESSAGE_SEND_DATA, MESSAGE_SEND_DATA_HEADER_SIZE + len, issuer_hostname, mailbox); //if (strcmp(mailbox, "host4") == 0) //MSG_task_set_category(task, mailbox); message_t msg = MSG_task_get_data(task); @@ -35,7 +35,7 @@ msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbo msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox) { - return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox); + return task_message_new(MESSAGE_END_DATA, MESSAGE_END_DATA_SIZE, issuer_hostname, mailbox); } void task_message_delete(void *task) diff --git a/examples/msg/chainsend/messages.h b/examples/msg/chainsend/messages.h index 8e89fd676b..44213051f2 100644 --- a/examples/msg/chainsend/messages.h +++ b/examples/msg/chainsend/messages.h @@ -4,7 +4,9 @@ #include "msg/msg.h" #include "xbt/sysdep.h" -#define MESSAGE_SIZE 1 +#define MESSAGE_BUILD_CHAIN_SIZE 40 +#define MESSAGE_SEND_DATA_HEADER_SIZE 10 +#define MESSAGE_END_DATA_SIZE 20 /* Messages enum */ typedef enum { @@ -25,7 +27,7 @@ typedef struct s_message { } s_message_t, *message_t; /* Message methods */ -msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox); +msg_task_t task_message_new(e_message_type type, unsigned int len, const char *issuer_hostname, const char *mailbox); msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next); msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len); msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox); diff --git a/examples/msg/chainsend/peer.c b/examples/msg/chainsend/peer.c index fa91a7bd07..5605c88dd8 100644 --- a/examples/msg/chainsend/peer.c +++ b/examples/msg/chainsend/peer.c @@ -73,7 +73,7 @@ msg_error_t peer_wait_for_message(peer_t peer) task = NULL; } else { process_pending_connections(peer->pending_sends); - MSG_process_sleep(0.1); + MSG_process_sleep(0.01); } } @@ -105,7 +105,7 @@ void peer_shutdown(peer_t p) XBT_INFO("Waiting for sends to finish before shutdown..."); while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) { process_pending_connections(p->pending_sends); - MSG_process_sleep(0.1); + MSG_process_sleep(1); } xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline"); diff --git a/examples/msg/chainsend/peer.h b/examples/msg/chainsend/peer.h index 58278625ec..a5d99d362e 100644 --- a/examples/msg/chainsend/peer.h +++ b/examples/msg/chainsend/peer.h @@ -7,7 +7,7 @@ #include "messages.h" #include "common.h" -#define PEER_SHUTDOWN_DEADLINE 6000 +#define PEER_SHUTDOWN_DEADLINE 60000 /* Peer struct */ typedef struct s_peer {