Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove unused fields.
[simgrid.git] / examples / msg / chainsend / broadcaster.c
1 #include "broadcaster.h"
2
3 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_broadcaster,
4                              "Messages specific for the broadcaster");
5
6 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
7 {
8   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), xbt_free_ref);
9   int i;
10   
11   for (i = 1; i <= hostcount; i++) {
12     char *hostname = bprintf("host%d", i);
13     XBT_DEBUG("%s", hostname);
14     xbt_dynar_push(host_list, &hostname);
15   }
16   return host_list;
17 }
18
19 int broadcaster_build_chain(broadcaster_t bc)
20 {
21   msg_task_t task = NULL;
22   char **cur = (char**)xbt_dynar_iterator_next(bc->it);
23   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
24   const char *current_host = NULL;
25   const char *prev = NULL;
26   const char *next = NULL;
27   const char *last = NULL;
28
29   /* Build the chain if there's at least one peer */
30   if (cur != NULL) {
31     /* init: prev=NULL, host=current cur, next=next cur */
32     next = *cur;
33     bc->first = next;
34
35     /* This iterator iterates one step ahead: cur is current iterated element, 
36        but it's actually the next one in the chain */
37     do {
38       /* following steps: prev=last, host=next, next=cur */
39       cur = (char**)xbt_dynar_iterator_next(bc->it);
40       prev = last;
41       current_host = next;
42       if (cur != NULL)
43         next = *cur;
44       else
45         next = NULL;
46       XBT_DEBUG("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
47     
48       /* Send message to current peer */
49       task = task_message_chain_new(prev, next, bc->piece_count);
50       MSG_task_send(task, current_host);
51
52       last = current_host;
53     } while (cur != NULL);
54   }
55
56   return MSG_OK;
57 }
58
59 int broadcaster_send_file(broadcaster_t bc)
60 {
61   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
62   //msg_comm_t comm = NULL;
63   msg_task_t task = NULL;
64
65   bc->current_piece = 0;
66
67   while (bc->current_piece < bc->piece_count) {
68     task = task_message_data_new(NULL, PIECE_SIZE);
69     XBT_DEBUG("Sending (send) piece %d from %s into mailbox %s", bc->current_piece, me, bc->first);
70     MSG_task_send(task, bc->first);
71     bc->current_piece++;
72   }
73
74   return MSG_OK;
75 }
76
77 broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count)
78 {
79   int status;
80   broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
81
82   bc->piece_count = piece_count;
83   bc->current_piece = 0;
84   bc->host_list = host_list;
85   bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
86   bc->max_pending_sends = MAX_PENDING_SENDS;
87   bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
88
89   status = broadcaster_build_chain(bc);
90   xbt_assert(status == MSG_OK, "Chain initialization failed");
91
92   return bc;
93 }
94
95 static void broadcaster_destroy(broadcaster_t bc)
96 {
97   /* Destroy iterator and hostlist */
98   xbt_dynar_iterator_delete(bc->it);
99   xbt_dynar_free(&bc->pending_sends);
100   xbt_dynar_free(&bc->host_list); /* FIXME: host names are not free'd */
101   xbt_free(bc);
102 }
103
104 /** Emitter function  */
105 int broadcaster(int argc, char *argv[])
106 {
107   broadcaster_t bc = NULL;
108   xbt_dynar_t host_list = NULL;
109   int status;
110   unsigned int piece_count = PIECE_COUNT;
111
112   XBT_DEBUG("broadcaster");
113
114   /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
115   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
116
117   /* argv[2] is the number of pieces */
118   if (argc > 2) {
119     piece_count = atoi(argv[2]);
120     XBT_DEBUG("piece_count set to %d", piece_count);
121   } else {
122     XBT_DEBUG("No piece_count specified, defaulting to %d", piece_count);
123   }
124   bc = broadcaster_init(host_list, piece_count);
125
126   /* TODO: Error checking */
127   status = broadcaster_send_file(bc);
128
129   broadcaster_destroy(bc);
130
131   return status;
132 }