X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/e68e17956027521b8a3e8195d1ccd62b28f83aaa..124b9aa048dc0af73a539a2761ce5cbc95814f22:/examples/msg/kadeploy/kadeploy.c diff --git a/examples/msg/kadeploy/kadeploy.c b/examples/msg/kadeploy/kadeploy.c index d779acd344..ae5dc08ae2 100644 --- a/examples/msg/kadeploy/kadeploy.c +++ b/examples/msg/kadeploy/kadeploy.c @@ -65,6 +65,7 @@ typedef struct s_peer { const char *prev; const char *next; const char *me; + int pieces; } s_peer_t, *peer_t; /* Iterator methods */ @@ -297,11 +298,13 @@ int broadcaster_send_file(const char *first) /* TODO: stub */ task = task_message_data_new(me, first, NULL, 0); XBT_INFO("Sending (isend) from %s into mailbox %s", me, first); - comm = MSG_task_isend(task, first); + //comm = MSG_task_isend(task, first); + //status = + MSG_task_dsend(task, first, task_message_delete); - status = MSG_comm_wait(comm, -1); - xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed"); - MSG_comm_destroy(comm); + //status = MSG_comm_wait(comm, -1); + //xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed"); + //MSG_comm_destroy(comm); } return MSG_OK; @@ -359,6 +362,7 @@ void peer_init_chain(peer_t peer, message_t msg) { peer->prev = msg->prev_hostname; peer->next = msg->next_hostname; + peer->init = 1; } /* TODO: error checking */ @@ -368,11 +372,13 @@ void peer_forward_msg(peer_t peer, message_t msg) msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0); msg_comm_t comm = NULL; XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next); - comm = MSG_task_isend(task, peer->next); + //comm = + //status = + MSG_task_dsend(task, peer->next, task_message_delete); - status = MSG_comm_wait(comm, -1); + //status = MSG_comm_wait(comm, -1); xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed"); - MSG_comm_destroy(comm); + //MSG_comm_destroy(comm); } int peer_execute_task(peer_t peer, msg_task_t task) @@ -384,16 +390,17 @@ int peer_execute_task(peer_t peer, msg_task_t task) switch (msg->type) { case MESSAGE_BUILD_CHAIN: peer_init_chain(peer, msg); - peer->init = 1; break; case MESSAGE_SEND_DATA: xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type); if (peer->next != NULL) peer_forward_msg(peer, msg); + peer->pieces++; break; case MESSAGE_END_DATA: xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type); done = 1; + XBT_INFO("%d pieces receieved", peer->pieces); break; } @@ -405,21 +412,27 @@ int peer_execute_task(peer_t peer, msg_task_t task) msg_error_t peer_wait_for_message(peer_t peer) { msg_error_t status; - msg_comm_t comm; + msg_comm_t comm = NULL; + msg_task_t task = NULL; int done = 0; - msg_task_t task = NULL; /* TODO: Error checking is not correct */ while (!done) { - comm = MSG_task_irecv(&task, peer->me); - status = MSG_comm_wait(comm, -1); - xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed"); - MSG_comm_destroy(comm); - - done = peer_execute_task(peer, task); - task_message_delete(task); - task = NULL; + if (comm == NULL) + comm = MSG_task_irecv(&task, peer->me); + + if (MSG_comm_test(comm)) { + status = MSG_comm_get_status(comm); + xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed"); + MSG_comm_destroy(comm); + comm = NULL; + done = peer_execute_task(peer, task); + task_message_delete(task); + task = NULL; + } else { + MSG_process_sleep(0.01); + } } return status; @@ -430,6 +443,7 @@ void peer_init(peer_t p) p->init = 0; p->prev = NULL; p->next = NULL; + p->pieces = 0; p->me = MSG_host_get_name(MSG_host_self()); }