XBT_DEBUG("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
/* Send message to current peer */
- task = task_message_chain_new(me, current_host, prev, next);
- //MSG_task_set_category(task, current_host);
+ task = task_message_chain_new(me, current_host, prev, next, bc->piece_count);
MSG_task_send(task, current_host);
last = current_host;
int broadcaster_send_file(broadcaster_t bc)
{
const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
- msg_comm_t comm = NULL;
+ //msg_comm_t comm = NULL;
msg_task_t task = NULL;
bc->current_piece = 0;
while (bc->current_piece < bc->piece_count) {
- if (xbt_dynar_length(bc->pending_sends) < bc->max_pending_sends) {
- task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE);
- XBT_DEBUG("Sending (isend) piece %d from %s into mailbox %s (current pending %lu)", bc->current_piece, me, bc->first, xbt_dynar_length(bc->pending_sends));
- comm = MSG_task_isend(task, bc->first);
- queue_pending_connection(comm, bc->pending_sends);
- bc->current_piece++;
- } else {
- MSG_process_sleep(0.01);
- }
- process_pending_connections(bc->pending_sends);
- }
-
- return MSG_OK;
-}
-
-int broadcaster_finish(broadcaster_t bc)
-{
- msg_task_t task = NULL;
- const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
- const char *current_host = NULL;
- char **cur = NULL;
-
- xbt_dynar_iterator_seek(bc->it, 0);
-
- /* Send goodbye message to every peer in the order generated by iterator it */
- for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) {
- /* Send message to current peer */
- current_host = *cur;
- task = task_message_end_data_new(me, current_host);
- //MSG_task_set_category(task, current_host);
- MSG_task_send(task, current_host);
+ task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE);
+ XBT_DEBUG("Sending (send) piece %d from %s into mailbox %s", bc->current_piece, me, bc->first);
+ MSG_task_send(task, bc->first);
+ bc->current_piece++;
}
return MSG_OK;
int status;
unsigned int piece_count = PIECE_COUNT;
- XBT_INFO("broadcaster");
+ XBT_DEBUG("broadcaster");
/* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
host_list = build_hostlist_from_hostcount(atoi(argv[1]));
/* TODO: Error checking */
status = broadcaster_send_file(bc);
- status = broadcaster_finish(bc);
broadcaster_destroy(bc);