const char *prev;
const char *next;
const char *me;
+ int pieces;
} s_peer_t, *peer_t;
/* Iterator methods */
/* TODO: stub */
task = task_message_data_new(me, first, NULL, 0);
XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
- comm = MSG_task_isend(task, first);
+ //comm = MSG_task_isend(task, first);
+ //status =
+ MSG_task_dsend(task, first, task_message_delete);
- status = MSG_comm_wait(comm, -1);
- xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
- MSG_comm_destroy(comm);
+ //status = MSG_comm_wait(comm, -1);
+ //xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
+ //MSG_comm_destroy(comm);
}
return MSG_OK;
{
peer->prev = msg->prev_hostname;
peer->next = msg->next_hostname;
+ peer->init = 1;
}
/* TODO: error checking */
msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
msg_comm_t comm = NULL;
XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
- comm = MSG_task_isend(task, peer->next);
+ //comm =
+ //status =
+ MSG_task_dsend(task, peer->next, task_message_delete);
- status = MSG_comm_wait(comm, -1);
+ //status = MSG_comm_wait(comm, -1);
xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed");
- MSG_comm_destroy(comm);
+ //MSG_comm_destroy(comm);
}
int peer_execute_task(peer_t peer, msg_task_t task)
switch (msg->type) {
case MESSAGE_BUILD_CHAIN:
peer_init_chain(peer, msg);
- peer->init = 1;
break;
case MESSAGE_SEND_DATA:
xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
if (peer->next != NULL)
peer_forward_msg(peer, msg);
+ peer->pieces++;
break;
case MESSAGE_END_DATA:
xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
done = 1;
+ XBT_INFO("%d pieces receieved", peer->pieces);
break;
}
msg_error_t peer_wait_for_message(peer_t peer)
{
msg_error_t status;
- msg_comm_t comm;
+ msg_comm_t comm = NULL;
+ msg_task_t task = NULL;
int done = 0;
- msg_task_t task = NULL;
/* TODO: Error checking is not correct */
while (!done) {
- comm = MSG_task_irecv(&task, peer->me);
- status = MSG_comm_wait(comm, -1);
- xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
- MSG_comm_destroy(comm);
-
- done = peer_execute_task(peer, task);
- task_message_delete(task);
- task = NULL;
+ if (comm == NULL)
+ comm = MSG_task_irecv(&task, peer->me);
+
+ if (MSG_comm_test(comm)) {
+ status = MSG_comm_get_status(comm);
+ xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
+ MSG_comm_destroy(comm);
+ comm = NULL;
+ done = peer_execute_task(peer, task);
+ task_message_delete(task);
+ task = NULL;
+ } else {
+ MSG_process_sleep(0.01);
+ }
}
return status;
p->init = 0;
p->prev = NULL;
p->next = NULL;
+ p->pieces = 0;
p->me = MSG_host_get_name(MSG_host_self());
}