1 #include "broadcaster.h"
/* Log category for this file: named msg_broadcaster, with the description
   string given below. Presumably the XBT_DEBUG calls in this file report to
   it as their default category -- confirm against the XBT logging docs. */
3 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_broadcaster,
4 "Messages specific for the broadcaster");
/*
 * Build a dynar of heap-allocated host name strings of the form "host<i>".
 *
 * hostcount: bounds the generated indices; the loop runs while
 *            i < hostcount+1.
 *
 * NOTE(review): the declarations of `hostname` and `i` (including the start
 * value of i) and the return statement are not visible in this excerpt;
 * presumably the function returns host_list -- confirm.
 */
6 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
/* Elements are char* pointers. NULL is passed as the second argument
   (presumably "no per-element free function" -- TODO confirm), so nothing
   visible here releases the strings allocated in the loop below. */
8 xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
12 for (; i < hostcount+1; i++) {
/* One fresh HOSTNAME_LENGTH-char buffer per host; snprintf is bounded by
   that same size. */
13 hostname = xbt_new(char, HOSTNAME_LENGTH);
14 snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
15 XBT_DEBUG("%s", hostname);
/* The address of the pointer is passed; the element size is sizeof(char*),
   so the list holds the pointer, not a copy of the characters. */
16 xbt_dynar_push(host_list, &hostname);
/*
 * Walk the host iterator bc->it and send each peer a "chain" task built from
 * (me, current_host, prev, next), using the peer's name as the mailbox.
 *
 * NOTE(review): several lines of this function are not visible in this
 * excerpt (the guard on `cur`, the opening of the do-loop, the updates of
 * prev/current_host/next/last, and the return). The caller asserts that the
 * result equals MSG_OK, so presumably that is what is returned on success
 * -- confirm.
 */
21 int broadcaster_build_chain(broadcaster_t bc)
23 msg_task_t task = NULL;
/* First element is fetched up front; the loop below fetches the following
   one at the start of each pass. */
24 char **cur = (char**)xbt_dynar_iterator_next(bc->it);
25 const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
26 const char *current_host = NULL;
27 const char *prev = NULL;
28 const char *next = NULL;
29 const char *last = NULL;
31 /* Build the chain if there's at least one peer */
33 /* init: prev=NULL, host=current cur, next=next cur */
37 /* This iterator iterates one step ahead: cur is current iterated element,
38 but it's actually the next one in the chain */
40 /* following steps: prev=last, host=next, next=cur */
41 cur = (char**)xbt_dynar_iterator_next(bc->it);
/* NOTE(review): prev and next are initialised to NULL above; if either can
   still be NULL at this point, "%s" receives a null pointer -- verify. */
48 XBT_DEBUG("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
50 /* Send message to current peer */
51 task = task_message_chain_new(me, current_host, prev, next);
52 //MSG_task_set_category(task, current_host);
/* The return value of MSG_task_send is not captured on this line. */
53 MSG_task_send(task, current_host);
/* Loop ends once the iterator has returned NULL for the look-ahead element. */
56 } while (cur != NULL);
/*
 * Send bc->piece_count data tasks of PIECE_SIZE to the mailbox bc->first,
 * keeping at most bc->max_pending_sends communications queued in
 * bc->pending_sends.
 *
 * NOTE(review): the increment of bc->current_piece, the closing of the
 * if/while bodies and the return statement are not visible in this excerpt;
 * the exact placement of the sleep and of process_pending_connections
 * relative to the `if` should be confirmed against the full file.
 */
62 int broadcaster_send_file(broadcaster_t bc)
64 const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
65 msg_comm_t comm = NULL;
66 msg_task_t task = NULL;
/* Restart from the first piece on every call. */
68 bc->current_piece = 0;
70 while (bc->current_piece < bc->piece_count) {
/* Only start a new send while the pending queue is below its limit. */
71 if (xbt_dynar_length(bc->pending_sends) < bc->max_pending_sends) {
72 task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE);
73 XBT_DEBUG("Sending (isend) piece %d from %s into mailbox %s (current pending %lu)", bc->current_piece, me, bc->first, xbt_dynar_length(bc->pending_sends));
/* The comm handle returned by the isend is kept in pending_sends so it can
   be handled later by process_pending_connections. */
74 comm = MSG_task_isend(task, bc->first);
75 queue_pending_connection(comm, bc->pending_sends);
/* Presumably the back-off taken when the queue is full -- TODO confirm which
   branch this line belongs to. */
78 MSG_process_sleep(0.01);
80 process_pending_connections(bc->pending_sends);
/*
 * Rewind the host iterator and send an "end data" task to each peer it
 * yields, using current_host as the mailbox.
 *
 * NOTE(review): the declaration of `cur`, the assignment of current_host
 * inside the loop, and the return statement are not visible in this excerpt.
 */
86 int broadcaster_finish(broadcaster_t bc)
88 msg_task_t task = NULL;
89 const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
90 const char *current_host = NULL;
/* The iterator was already consumed while building the chain, so it is
   repositioned at index 0 before being walked again. */
93 xbt_dynar_iterator_seek(bc->it, 0);
95 /* Send goodbye message to every peer in the order generated by iterator it */
96 for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) {
97 /* Send message to current peer */
99 task = task_message_end_data_new(me, current_host);
100 //MSG_task_set_category(task, current_host);
/* The return value of MSG_task_send is not captured on this line. */
101 MSG_task_send(task, current_host);
/*
 * Allocate a broadcaster, fill in its fields and build the peer chain.
 *
 * host_list:   dynar of host names; stored in the broadcaster as-is (no copy
 *              is made on the visible lines).
 * piece_count: number of pieces to be sent later.
 *
 * Asserts that broadcaster_build_chain() returned MSG_OK.
 *
 * NOTE(review): the declaration of `status` and the return statement are not
 * visible in this excerpt; presumably the function returns bc -- confirm.
 */
107 broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count)
110 broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
112 bc->piece_count = piece_count;
113 bc->current_piece = 0;
114 bc->host_list = host_list;
/* Iterator over host_list, created with forward_indices_list (presumably
   yields the hosts in index order -- TODO confirm). */
115 bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
116 bc->max_pending_sends = MAX_PENDING_SENDS;
/* Queue of msg_comm_t handles; NULL is passed as the second argument. */
117 bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
/* bc->first is not assigned on any line visible here; broadcaster_send_file
   reads it, so it is presumably set while the chain is built -- verify. */
119 status = broadcaster_build_chain(bc);
120 xbt_assert(status == MSG_OK, "Chain initialization failed");
/*
 * Release the resources held by a broadcaster: its host iterator, its
 * pending-sends dynar and its host list dynar.
 *
 * NOTE(review): no line visible in this excerpt frees bc itself or the
 * individual host name strings stored in host_list -- check the full file
 * for a possible leak.
 */
125 static void broadcaster_destroy(broadcaster_t bc)
127 /* Destroy iterator and hostlist */
/* The iterator is deleted before the host_list it was created over. */
128 xbt_dynar_iterator_delete(bc->it);
129 xbt_dynar_free(&bc->pending_sends);
130 xbt_dynar_free(&bc->host_list);
133 /** Emitter function */
/*
 * Entry point of the broadcaster: builds the host list from argv[1], reads
 * an optional piece count from argv[2], initialises the broadcaster, sends
 * the file, sends the end messages, then destroys the broadcaster.
 *
 * argv[1]: host count, parsed with atoi.
 * argv[2]: number of pieces, parsed with atoi; PIECE_COUNT is the initial
 *          value of piece_count.
 *
 * NOTE(review): the declaration of `status`, the argc checks guarding the
 * argv accesses, and the return statement are not visible in this excerpt.
 */
134 int broadcaster(int argc, char *argv[])
136 broadcaster_t bc = NULL;
137 xbt_dynar_t host_list = NULL;
139 unsigned int piece_count = PIECE_COUNT;
141 XBT_DEBUG("broadcaster");
143 /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
/* atoi gives no error indication; a non-numeric argv[1] yields 0. */
144 host_list = build_hostlist_from_hostcount(atoi(argv[1]));
146 /* argv[2] is the number of pieces */
148 piece_count = atoi(argv[2]);
/* NOTE(review): piece_count is unsigned int but is printed with %d here and
   on the defaulting message below. */
149 XBT_DEBUG("piece_count set to %d", piece_count);
151 XBT_DEBUG("No piece_count specified, defaulting to %d", piece_count);
153 bc = broadcaster_init(host_list, piece_count);
155 /* TODO: Error checking */
/* The status of broadcaster_send_file is overwritten by the next call
   without being inspected on any visible line. */
156 status = broadcaster_send_file(bc);
157 status = broadcaster_finish(bc);
/* host_list was handed to bc in broadcaster_init and is freed through
   bc->host_list inside broadcaster_destroy. */
159 broadcaster_destroy(bc);