XBT_DEBUG("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
/* Send message to current peer */
- task = task_message_chain_new(me, current_host, prev, next);
- //MSG_task_set_category(task, current_host);
+ task = task_message_chain_new(me, current_host, prev, next, bc->piece_count);
MSG_task_send(task, current_host);
last = current_host;
return MSG_OK;
}
-int broadcaster_finish(broadcaster_t bc)
-{
- msg_task_t task = NULL;
- const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
- const char *current_host = NULL;
- char **cur = NULL;
-
- xbt_dynar_iterator_seek(bc->it, 0);
-
- /* Send goodbye message to every peer in the order generated by iterator it */
- for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) {
- /* Send message to current peer */
- current_host = *cur;
- task = task_message_end_data_new(me, current_host);
- //MSG_task_set_category(task, current_host);
- MSG_task_send(task, current_host);
- }
-
- return MSG_OK;
-}
-
broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count)
{
int status;
/* TODO: Error checking */
status = broadcaster_send_file(bc);
- status = broadcaster_finish(bc);
broadcaster_destroy(bc);
return task;
}
-msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
+msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next, const unsigned int num_pieces)
{
msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, MESSAGE_BUILD_CHAIN_SIZE, issuer_hostname, mailbox);
message_t msg = MSG_task_get_data(task);
msg->prev_hostname = prev;
msg->next_hostname = next;
+ msg->num_pieces = num_pieces;
return task;
}
msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
{
msg_task_t task = task_message_new(MESSAGE_SEND_DATA, MESSAGE_SEND_DATA_HEADER_SIZE + len, issuer_hostname, mailbox);
- //if (strcmp(mailbox, "host4") == 0)
- //MSG_task_set_category(task, mailbox);
message_t msg = MSG_task_get_data(task);
msg->data_block = block;
msg->data_length = len;
return task;
}
-msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
-{
- return task_message_new(MESSAGE_END_DATA, MESSAGE_END_DATA_SIZE, issuer_hostname, mailbox);
-}
-
void task_message_delete(void *task)
{
message_t msg = MSG_task_get_data(task);
/* Messages enum */
typedef enum {
MESSAGE_BUILD_CHAIN = 0,
- MESSAGE_SEND_DATA,
- MESSAGE_END_DATA
+ MESSAGE_SEND_DATA
} e_message_type;
/* Message struct */
const char *next_hostname;
const char *data_block;
unsigned int data_length;
+ unsigned int num_pieces;
} s_message_t, *message_t;
/* Message methods */
msg_task_t task_message_new(e_message_type type, unsigned int len, const char *issuer_hostname, const char *mailbox);
-msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
+msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next, const unsigned int num_pieces);
msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
-msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
void task_message_delete(void *);
#endif /* KADEPLOY_MESSAGES_H */
{
peer->prev = msg->prev_hostname;
peer->next = msg->next_hostname;
+ peer->total_pieces = msg->num_pieces;
peer->init = 1;
}
peer_forward_msg(peer, msg);
peer->pieces++;
peer->bytes += msg->data_length;
- break;
- case MESSAGE_END_DATA:
- xbt_assert(peer->init, "peer_execute_task() failed: got msg_type %d before initialization", msg->type);
- done = 1;
- XBT_DEBUG("%d pieces receieved", peer->pieces);
+ if (peer->pieces >= peer->total_pieces) {
+ XBT_DEBUG("%d pieces receieved", peer->pieces);
+ done = 1;
+ }
break;
}
p->next = NULL;
p->pieces = 0;
p->bytes = 0;
- p->close_asap = 0;
p->pending_recvs = xbt_dynar_new(sizeof(msg_comm_t), NULL);
p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
p->me = xbt_new(char, HOSTNAME_LENGTH);
unsigned long long bytes;
xbt_dynar_t pending_recvs;
xbt_dynar_t pending_sends;
- int close_asap; /* TODO: unused */
+ unsigned int total_pieces;
} s_peer_t, *peer_t;
/* Peer: helper functions */