Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'coverity_scan' of github.com:mquinson/simgrid
[simgrid.git] / examples / msg / chainsend / broadcaster.c
1 /* Copyright (c) 2012-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "broadcaster.h"
8
9 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_broadcaster,
10                              "Messages specific for the broadcaster");
11
12 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
13 {
14   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), xbt_free_ref);
15   int i;
16   
17   for (i = 1; i <= hostcount; i++) {
18     char *hostname = bprintf("host%d", i);
19     XBT_DEBUG("%s", hostname);
20     xbt_dynar_push(host_list, &hostname);
21   }
22   return host_list;
23 }
24
25 int broadcaster_build_chain(broadcaster_t bc)
26 {
27   msg_task_t task = NULL;
28   char **cur = (char**)xbt_dynar_iterator_next(bc->it);
29   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
30   const char *current_host = NULL;
31   const char *prev = NULL;
32   const char *next = NULL;
33   const char *last = NULL;
34
35   /* Build the chain if there's at least one peer */
36   if (cur != NULL) {
37     /* init: prev=NULL, host=current cur, next=next cur */
38     next = *cur;
39     bc->first = next;
40
41     /* This iterator iterates one step ahead: cur is current iterated element, 
42        but it's actually the next one in the chain */
43     do {
44       /* following steps: prev=last, host=next, next=cur */
45       cur = (char**)xbt_dynar_iterator_next(bc->it);
46       prev = last;
47       current_host = next;
48       if (cur != NULL)
49         next = *cur;
50       else
51         next = NULL;
52       XBT_DEBUG("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
53     
54       /* Send message to current peer */
55       task = task_message_chain_new(prev, next, bc->piece_count);
56       MSG_task_send(task, current_host);
57
58       last = current_host;
59     } while (cur != NULL);
60   }
61
62   return MSG_OK;
63 }
64
65 int broadcaster_send_file(broadcaster_t bc)
66 {
67   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
68   //msg_comm_t comm = NULL;
69   msg_task_t task = NULL;
70
71   bc->current_piece = 0;
72
73   while (bc->current_piece < bc->piece_count) {
74     task = task_message_data_new(NULL, PIECE_SIZE);
75     XBT_DEBUG("Sending (send) piece %d from %s into mailbox %s", bc->current_piece, me, bc->first);
76     MSG_task_send(task, bc->first);
77     bc->current_piece++;
78   }
79
80   return MSG_OK;
81 }
82
83 broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count)
84 {
85   int status;
86   broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
87
88   bc->piece_count = piece_count;
89   bc->current_piece = 0;
90   bc->host_list = host_list;
91   bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
92   bc->max_pending_sends = MAX_PENDING_SENDS;
93   bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
94
95   status = broadcaster_build_chain(bc);
96   xbt_assert(status == MSG_OK, "Chain initialization failed");
97
98   return bc;
99 }
100
101 static void broadcaster_destroy(broadcaster_t bc)
102 {
103   /* Destroy iterator and hostlist */
104   xbt_dynar_iterator_delete(bc->it);
105   xbt_dynar_free(&bc->pending_sends);
106   xbt_dynar_free(&bc->host_list); /* FIXME: host names are not free'd */
107   xbt_free(bc);
108 }
109
110 /** Emitter function  */
111 int broadcaster(int argc, char *argv[])
112 {
113   broadcaster_t bc = NULL;
114   xbt_dynar_t host_list = NULL;
115   int status;
116   unsigned int piece_count = PIECE_COUNT;
117
118   XBT_DEBUG("broadcaster");
119
120   /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
121   host_list = build_hostlist_from_hostcount(xbt_str_parse_int(argv[1], "Invalid number of peers: %s"));
122
123   /* argv[2] is the number of pieces */
124   if (argc > 2) {
125     piece_count = xbt_str_parse_int(argv[2], "Invalid number of pieces: %s");
126     XBT_DEBUG("piece_count set to %d", piece_count);
127   } else {
128     XBT_DEBUG("No piece_count specified, defaulting to %d", piece_count);
129   }
130   bc = broadcaster_init(host_list, piece_count);
131
132   /* TODO: Error checking */
133   status = broadcaster_send_file(bc);
134
135   broadcaster_destroy(bc);
136
137   return status;
138 }