From: Maximiliano Geier Date: Tue, 9 Oct 2012 16:06:57 +0000 (+0200) Subject: First working version with 8 peers I have not evaluated how it works yet, only that... X-Git-Tag: v3_9_rc1~86^2~273 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/69d66050bef9b3b9ec134c84f71f303194d5b13e?hp=124b9aa048dc0af73a539a2761ce5cbc95814f22 First working version with 8 peers I have not evaluated how it works yet, only that every piece is delivered successfully --- diff --git a/examples/msg/kadeploy/deployment_kadeploy.xml b/examples/msg/kadeploy/deployment_kadeploy.xml index 205e5c32fa..1fb385c05f 100644 --- a/examples/msg/kadeploy/deployment_kadeploy.xml +++ b/examples/msg/kadeploy/deployment_kadeploy.xml @@ -4,7 +4,7 @@ - + @@ -17,7 +17,7 @@ - diff --git a/examples/msg/kadeploy/kadeploy.c b/examples/msg/kadeploy/kadeploy.c index ae5dc08ae2..93e69987fc 100644 --- a/examples/msg/kadeploy/kadeploy.c +++ b/examples/msg/kadeploy/kadeploy.c @@ -25,8 +25,11 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy, "Messages specific for kadeploy"); #define MESSAGE_SIZE 1 +#define PIECE_COUNT 100 #define HOSTNAME_LENGTH 20 +#define PEER_SHUTDOWN_DEADLINE 600 + /* Data structures */ @@ -66,6 +69,8 @@ typedef struct s_peer { const char *next; const char *me; int pieces; + xbt_dynar_t pending_sends; + int close_asap; /* TODO: unused */ } s_peer_t, *peer_t; /* Iterator methods */ @@ -94,9 +99,11 @@ int broadcaster_send_file(const char *first); int broadcaster_finish(xbt_dynar_t host_list); /* Peer: helper functions */ -msg_error_t peer_wait_for_message(); +msg_error_t peer_wait_for_message(peer_t peer); int peer_execute_task(peer_t peer, msg_task_t task); void peer_init_chain(peer_t peer, message_t msg); +void peer_shutdown(peer_t p); +void peer_init(peer_t p); /* Initialization stuff */ msg_error_t test_all(const char *platform_file, @@ -120,9 +127,9 @@ xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn) void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it) { int next = it->criteria_fn((xbt_dynar_iterator_t)it); - XBT_INFO("%d current\n", next); + //XBT_INFO("%d current\n", next); if (next < 0) { - XBT_INFO("Nothing to return!\n"); + //XBT_INFO("Nothing to return!\n"); return NULL; } else { xbt_dynar_push(it->indices_list, &next); @@ -176,6 +183,7 @@ msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailb msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len) { msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox); + if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox); message_t msg = MSG_task_get_data(task); msg->data_block = block; msg->data_length = len; @@ -188,7 +196,6 @@ msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *ma return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox); } - void task_message_delete(void *task) { message_t msg = MSG_task_get_data(task); @@ -196,6 +203,31 @@ void task_message_delete(void *task) MSG_task_destroy(task); } +void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q) +{ + xbt_dynar_push(q, &comm); +} + +int process_pending_connections(xbt_dynar_t q) +{ + unsigned int iter; + int status; + int empty = 0; + msg_comm_t comm; + + xbt_dynar_foreach(q, iter, comm) { + empty = 1; + if (MSG_comm_test(comm)) { + MSG_comm_destroy(comm); + status = MSG_comm_get_status(comm); + xbt_assert(status == MSG_OK, __FILE__ ": process_pending_connections() failed"); + xbt_dynar_cursor_rm(q, &iter); + empty = 0; + } + } + return empty; +} + xbt_dynar_t build_hostlist_from_hostcount(int hostcount) { xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL); @@ -206,7 +238,7 @@ xbt_dynar_t build_hostlist_from_hostcount(int hostcount) for (; i < hostcount+1; i++) { hostname = xbt_new(char, HOSTNAME_LENGTH); snprintf(hostname, HOSTNAME_LENGTH, "host%d", i); - XBT_INFO("%s", hostname); + //XBT_INFO("%s", hostname); h = MSG_get_host_by_name(hostname); if (h == NULL) { XBT_INFO("Unknown host %s. Stopping Now! ", hostname); @@ -270,10 +302,11 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list) next = *cur; else next = NULL; - XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next); + //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next); /* Send message to current peer */ task = task_message_chain_new(me, current_host, prev, next); + //MSG_task_set_category(task, current_host); MSG_task_send(task, current_host); last = current_host; @@ -291,19 +324,19 @@ int broadcaster_send_file(const char *first) msg_comm_t comm = NULL; int status; - int piece_count = 10; + int piece_count = PIECE_COUNT; int cur = 0; for (; cur < piece_count; cur++) { /* TODO: stub */ task = task_message_data_new(me, first, NULL, 0); - XBT_INFO("Sending (isend) from %s into mailbox %s", me, first); + //XBT_INFO("Sending (isend) from %s into mailbox %s", me, first); //comm = MSG_task_isend(task, first); - //status = - MSG_task_dsend(task, first, task_message_delete); + status = MSG_task_send(task, first); + //MSG_task_dsend(task, first, task_message_delete); //status = MSG_comm_wait(comm, -1); - //xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed"); + xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed"); //MSG_comm_destroy(comm); } @@ -323,6 +356,7 @@ int broadcaster_finish(xbt_dynar_t host_list) /* Send message to current peer */ current_host = *cur; task = task_message_end_data_new(me, current_host); + //MSG_task_set_category(task, current_host); MSG_task_send(task, current_host); } @@ -365,28 +399,22 @@ void peer_init_chain(peer_t peer, message_t msg) peer->init = 1; } -/* TODO: error checking */ void peer_forward_msg(peer_t peer, message_t msg) { int status; msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0); msg_comm_t comm = NULL; - XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next); - //comm = - //status = - MSG_task_dsend(task, peer->next, task_message_delete); - - //status = MSG_comm_wait(comm, -1); - xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed"); - //MSG_comm_destroy(comm); + //XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next); + comm = MSG_task_isend(task, peer->next); + queue_pending_connection(comm, peer->pending_sends); } int peer_execute_task(peer_t peer, msg_task_t task) { - int done = 0, init = 0; + int done = 0; message_t msg = MSG_task_get_data(task); - XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type); + //XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type); switch (msg->type) { case MESSAGE_BUILD_CHAIN: peer_init_chain(peer, msg); @@ -416,14 +444,13 @@ msg_error_t peer_wait_for_message(peer_t peer) msg_task_t task = NULL; int done = 0; - - /* TODO: Error checking is not correct */ while (!done) { if (comm == NULL) comm = MSG_task_irecv(&task, peer->me); if (MSG_comm_test(comm)) { status = MSG_comm_get_status(comm); + //XBT_INFO("peer_wait_for_message: error code = %d", status); xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed"); MSG_comm_destroy(comm); comm = NULL; @@ -431,6 +458,7 @@ msg_error_t peer_wait_for_message(peer_t peer) task_message_delete(task); task = NULL; } else { + process_pending_connections(peer->pending_sends); MSG_process_sleep(0.01); } } @@ -444,9 +472,28 @@ void peer_init(peer_t p) p->prev = NULL; p->next = NULL; p->pieces = 0; + p->close_asap = 0; + p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL); p->me = MSG_host_get_name(MSG_host_self()); } +void peer_shutdown(peer_t p) +{ + float start_time = MSG_get_clock(); + float end_time = start_time + PEER_SHUTDOWN_DEADLINE; + + XBT_INFO("Waiting for sends to finish before shutdown..."); + while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) { + process_pending_connections(p->pending_sends); + MSG_process_sleep(0.1); + } + + xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline"); + xbt_dynar_free(&p->pending_sends); + + xbt_free(p); +} + /** Peer function */ int peer(int argc, char *argv[]) { @@ -457,8 +504,7 @@ int peer(int argc, char *argv[]) peer_init(p); status = peer_wait_for_message(p); - - xbt_free(p); + peer_shutdown(p); return MSG_OK; } /* end_of_receiver */ @@ -478,6 +524,14 @@ msg_error_t test_all(const char *platform_file, /* Simulation setting */ MSG_create_environment(platform_file); + /* Trace categories */ + TRACE_category_with_color("host0", "0 0 1"); + TRACE_category_with_color("host1", "0 1 0"); + TRACE_category_with_color("host2", "0 1 1"); + TRACE_category_with_color("host3", "1 0 0"); + TRACE_category_with_color("host4", "1 0 1"); + TRACE_category_with_color("host5", "1 1 0"); + /* Application deployment */ MSG_function_register("broadcaster", broadcaster); MSG_function_register("peer", peer); @@ -502,7 +556,6 @@ int main(int argc, char *argv[]) MSG_init(&argc, argv); - /*if (argc <= 3) { XBT_CRITICAL("Usage: %s platform_file deployment_file \n", argv[0]); diff --git a/examples/msg/kadeploy/platform_kadeploy.xml b/examples/msg/kadeploy/platform_kadeploy.xml index 9574825b98..a1fe77ac10 100644 --- a/examples/msg/kadeploy/platform_kadeploy.xml +++ b/examples/msg/kadeploy/platform_kadeploy.xml @@ -7,9 +7,19 @@ + + + + + + + + + + @@ -19,5 +29,20 @@ + + + + + + + + + + + + + + +