Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'v3_9_x' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid into v3_9_x
[simgrid.git] / examples / msg / chainsend / broadcaster.c
1 #include "broadcaster.h"
2
3 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_broadcaster,
4                              "Messages specific for the broadcaster");
5
6 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
7 {
8   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
9   char *hostname = NULL;
10   int i = 1;
11   
12   for (; i < hostcount+1; i++) {
13     hostname = xbt_new(char, HOSTNAME_LENGTH);
14     snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
15     XBT_DEBUG("%s", hostname);
16     xbt_dynar_push(host_list, &hostname);
17   }
18   return host_list;
19 }
20
21 int broadcaster_build_chain(broadcaster_t bc)
22 {
23   msg_task_t task = NULL;
24   char **cur = (char**)xbt_dynar_iterator_next(bc->it);
25   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
26   const char *current_host = NULL;
27   const char *prev = NULL;
28   const char *next = NULL;
29   const char *last = NULL;
30
31   /* Build the chain if there's at least one peer */
32   if (cur != NULL) {
33     /* init: prev=NULL, host=current cur, next=next cur */
34     next = *cur;
35     bc->first = next;
36
37     /* This iterator iterates one step ahead: cur is current iterated element, 
38        but it's actually the next one in the chain */
39     do {
40       /* following steps: prev=last, host=next, next=cur */
41       cur = (char**)xbt_dynar_iterator_next(bc->it);
42       prev = last;
43       current_host = next;
44       if (cur != NULL)
45         next = *cur;
46       else
47         next = NULL;
48       XBT_DEBUG("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
49     
50       /* Send message to current peer */
51       task = task_message_chain_new(me, current_host, prev, next);
52       //MSG_task_set_category(task, current_host);
53       MSG_task_send(task, current_host);
54
55       last = current_host;
56     } while (cur != NULL);
57   }
58
59   return MSG_OK;
60 }
61
62 int broadcaster_send_file(broadcaster_t bc)
63 {
64   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
65   //msg_comm_t comm = NULL;
66   msg_task_t task = NULL;
67
68   bc->current_piece = 0;
69
70   while (bc->current_piece < bc->piece_count) {
71     task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE);
72     XBT_DEBUG("Sending (send) piece %d from %s into mailbox %s", bc->current_piece, me, bc->first);
73     MSG_task_send(task, bc->first);
74     bc->current_piece++;
75   }
76
77   return MSG_OK;
78 }
79
80 int broadcaster_finish(broadcaster_t bc)
81 {
82   msg_task_t task = NULL;
83   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
84   const char *current_host = NULL;
85   char **cur = NULL;
86
87   xbt_dynar_iterator_seek(bc->it, 0);
88
89   /* Send goodbye message to every peer in the order generated by iterator it */
90   for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) {
91     /* Send message to current peer */
92     current_host = *cur;
93     task = task_message_end_data_new(me, current_host);
94     //MSG_task_set_category(task, current_host);
95     MSG_task_send(task, current_host);
96   }
97
98   return MSG_OK;
99 }
100
101 broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count)
102 {
103   int status;
104   broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
105
106   bc->piece_count = piece_count;
107   bc->current_piece = 0;
108   bc->host_list = host_list;
109   bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
110   bc->max_pending_sends = MAX_PENDING_SENDS;
111   bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
112
113   status = broadcaster_build_chain(bc);
114   xbt_assert(status == MSG_OK, "Chain initialization failed");
115
116   return bc;
117 }
118
119 static void broadcaster_destroy(broadcaster_t bc)
120 {
121   /* Destroy iterator and hostlist */
122   xbt_dynar_iterator_delete(bc->it);
123   xbt_dynar_free(&bc->pending_sends);
124   xbt_dynar_free(&bc->host_list);
125 }
126
127 /** Emitter function  */
128 int broadcaster(int argc, char *argv[])
129 {
130   broadcaster_t bc = NULL;
131   xbt_dynar_t host_list = NULL;
132   int status;
133   unsigned int piece_count = PIECE_COUNT;
134
135   XBT_DEBUG("broadcaster");
136
137   /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
138   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
139
140   /* argv[2] is the number of pieces */
141   if (argc > 2) {
142     piece_count = atoi(argv[2]);
143     XBT_DEBUG("piece_count set to %d", piece_count);
144   } else {
145     XBT_DEBUG("No piece_count specified, defaulting to %d", piece_count);
146   }
147   bc = broadcaster_init(host_list, piece_count);
148
149   /* TODO: Error checking */
150   status = broadcaster_send_file(bc);
151   status = broadcaster_finish(bc);
152
153   broadcaster_destroy(bc);
154
155   return status;
156 }