From: Maximiliano Geier Date: Fri, 5 Oct 2012 15:57:17 +0000 (+0200) Subject: messages implementation: it dies when trying to forward data from one peer to another... X-Git-Tag: v3_9_rc1~86^2~275 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/e68e17956027521b8a3e8195d1ccd62b28f83aaa messages implementation: it dies when trying to forward data from one peer to another unified message parsing, it parses and executes chain and end of data messages correctly --- diff --git a/examples/msg/kadeploy/kadeploy.c b/examples/msg/kadeploy/kadeploy.c index cc8b0a3ffa..d779acd344 100644 --- a/examples/msg/kadeploy/kadeploy.c +++ b/examples/msg/kadeploy/kadeploy.c @@ -59,6 +59,14 @@ typedef struct s_message { unsigned int data_length; } s_message_t, *message_t; +/* Peer struct */ +typedef struct s_peer { + int init; + const char *prev; + const char *next; + const char *me; +} s_peer_t, *peer_t; + /* Iterator methods */ xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*)); void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it); @@ -80,12 +88,14 @@ xbt_dynar_t build_hostlist_from_hostcount(int hostcount); /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/ /* Broadcaster: helper functions */ -int broadcaster_build_chain(xbt_dynar_t host_list); -int broadcaster_send_file(xbt_dynar_t host_list); +int broadcaster_build_chain(const char **first, xbt_dynar_t host_list); +int broadcaster_send_file(const char *first); int broadcaster_finish(xbt_dynar_t host_list); /* Peer: helper functions */ -int peer_wait_for_init(); +msg_error_t peer_wait_for_message(); +int peer_execute_task(peer_t peer, msg_task_t task); +void peer_init_chain(peer_t peer, message_t msg); /* Initialization stuff */ msg_error_t test_all(const char *platform_file, @@ -231,21 +241,22 @@ void delete_hostlist(xbt_dynar_t h) xbt_dynar_free(&h); } -int broadcaster_build_chain(xbt_dynar_t host_list) +int broadcaster_build_chain(const char **first, xbt_dynar_t host_list) { xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria); msg_task_t task = NULL; char **cur = (char**)xbt_dynar_iterator_next(it); + const char *me = MSG_host_get_name(MSG_host_self()); const char *current_host = NULL; const char *prev = NULL; const char *next = NULL; - const char *me = MSG_host_get_name(MSG_host_self()); const char *last = NULL; /* Build the chain if there's at least one peer */ if (cur != NULL) { /* init: prev=NULL, host=current cur, next=next cur */ next = *cur; + *first = next; /* This iterator iterates one step ahead: cur is current iterated element, but it's actually the next one in the chain */ @@ -272,9 +283,26 @@ int broadcaster_build_chain(xbt_dynar_t host_list) return MSG_OK; } -int broadcaster_send_file(xbt_dynar_t host_list) +int broadcaster_send_file(const char *first) { - /* ... */ + const char *me = MSG_host_get_name(MSG_host_self()); + msg_task_t task = NULL; + msg_comm_t comm = NULL; + int status; + + int piece_count = 10; + int cur = 0; + + for (; cur < piece_count; cur++) { + /* TODO: stub */ + task = task_message_data_new(me, first, NULL, 0); + XBT_INFO("Sending (isend) from %s into mailbox %s", me, first); + comm = MSG_task_isend(task, first); + + status = MSG_comm_wait(comm, -1); + xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed"); + MSG_comm_destroy(comm); + } return MSG_OK; } @@ -289,10 +317,10 @@ int broadcaster_finish(xbt_dynar_t host_list) /* Send goodbye message to every peer */ for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) { - /* Send message to current peer */ - current_host = *cur; - task = task_message_end_data_new(me, current_host); - MSG_task_send(task, current_host); + /* Send message to current peer */ + current_host = *cur; + task = task_message_end_data_new(me, current_host); + MSG_task_send(task, current_host); } return MSG_OK; @@ -314,100 +342,111 @@ int broadcaster(int argc, char *argv[]) /*host_list = build_hostlist_from_argv(argc, argv);*/ /* TODO: Error checking */ - status = broadcaster_build_chain(host_list); - status = broadcaster_send_file(host_list); + status = broadcaster_build_chain(&first, host_list); + status = broadcaster_send_file(first); status = broadcaster_finish(host_list); delete_hostlist(host_list); - /* Latency */ - /*time = MSG_get_clock(); - sprintf(sprintf_buffer_la, "latency task"); - task_la = - MSG_task_create(sprintf_buffer_la, 0.0, task_comm_size_lat, NULL); - task_la->data = xbt_new(double, 1); - *(double *) task_la->data = time; - XBT_INFO("task_la->data = %le", *((double *) task_la->data)); - MSG_task_send(task_la, argv[1]);*/ - - /* Bandwidth */ - /*time = MSG_get_clock(); - sprintf(sprintf_buffer_bw, "bandwidth task"); - task_bw = - MSG_task_create(sprintf_buffer_bw, 0.0, task_comm_size_bw, NULL); - task_bw->data = xbt_new(double, 1); - *(double *) task_bw->data = time; - XBT_INFO("task_bw->data = %le", *((double *) task_bw->data)); - MSG_task_send(task_bw, argv[1]); - */ return status; } -int peer_wait_for_init() +/******************************************************* + * Peer * + *******************************************************/ + +void peer_init_chain(peer_t peer, message_t msg) +{ + peer->prev = msg->prev_hostname; + peer->next = msg->next_hostname; +} + +/* TODO: error checking */ +void peer_forward_msg(peer_t peer, message_t msg) +{ + int status; + msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0); + msg_comm_t comm = NULL; + XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next); + comm = MSG_task_isend(task, peer->next); + + status = MSG_comm_wait(comm, -1); + xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed"); + MSG_comm_destroy(comm); +} + +int peer_execute_task(peer_t peer, msg_task_t task) +{ + int done = 0, init = 0; + message_t msg = MSG_task_get_data(task); + + XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type); + switch (msg->type) { + case MESSAGE_BUILD_CHAIN: + peer_init_chain(peer, msg); + peer->init = 1; + break; + case MESSAGE_SEND_DATA: + xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type); + if (peer->next != NULL) + peer_forward_msg(peer, msg); + break; + case MESSAGE_END_DATA: + xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type); + done = 1; + break; + } + + MSG_task_execute(task); + + return done; +} + +msg_error_t peer_wait_for_message(peer_t peer) { + msg_error_t status; + msg_comm_t comm; + int done = 0; + msg_task_t task = NULL; - const char *me = MSG_host_get_name(MSG_host_self()); - int a = MSG_task_receive(&task, me); + /* TODO: Error checking is not correct */ + while (!done) { + comm = MSG_task_irecv(&task, peer->me); + status = MSG_comm_wait(comm, -1); + xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed"); + MSG_comm_destroy(comm); - if (a == MSG_OK) { - XBT_INFO("Peer %s got message\n", me); + done = peer_execute_task(peer, task); + task_message_delete(task); + task = NULL; } - task_message_delete(task); + return status; +} - return MSG_OK; +void peer_init(peer_t p) +{ + p->init = 0; + p->prev = NULL; + p->next = NULL; + p->me = MSG_host_get_name(MSG_host_self()); } /** Peer function */ int peer(int argc, char *argv[]) { - double time, time1, sender_time; - msg_task_t task_la = NULL; - msg_task_t task_bw = NULL; - int a; - double communication_time = 0; + peer_t p = xbt_new(s_peer_t, 1); + msg_error_t status; XBT_INFO("peer"); - time = MSG_get_clock(); - - a = peer_wait_for_init(); - /* Get Latency */ - /*a = MSG_task_receive(&task_la,MSG_host_get_name(MSG_host_self())); - if (a == MSG_OK) { - time1 = MSG_get_clock(); - sender_time = *((double *) (task_la->data)); - time = sender_time; - communication_time = time1 - time; - XBT_INFO("Task received : %s", task_la->name); - xbt_free(task_la->data); - MSG_task_destroy(task_la); - XBT_INFO("Communic. time %le", communication_time); - XBT_INFO("--- la %f ----", communication_time); - } else { - xbt_die("Unexpected behavior"); - }*/ + peer_init(p); + status = peer_wait_for_message(p); + xbt_free(p); - /* Get Bandwidth */ - /*a = MSG_task_receive(&task_bw,MSG_host_get_name(MSG_host_self())); - if (a == MSG_OK) { - time1 = MSG_get_clock(); - sender_time = *((double *) (task_bw->data)); - time = sender_time; - communication_time = time1 - time; - XBT_INFO("Task received : %s", task_bw->name); - xbt_free(task_bw->data); - MSG_task_destroy(task_bw); - XBT_INFO("Communic. time %le", communication_time); - XBT_INFO("--- bw %f ----", task_comm_size_bw / communication_time); - } else { - xbt_die("Unexpected behavior"); - }*/ - - - return 0; + return MSG_OK; } /* end_of_receiver */