Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright headers.
[simgrid.git] / teshsuite / msg / app-chainsend / broadcaster.c
1 /* Copyright (c) 2012-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "broadcaster.h"
7
8 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_broadcaster, "Messages specific for the broadcaster");
9
10 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
11 {
12   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), xbt_free_ref);
13   for (int i = 1; i <= hostcount; i++) {
14     char* hostname = bprintf("host%d", i);
15     XBT_DEBUG("%s", hostname);
16     xbt_dynar_push(host_list, &hostname);
17   }
18   return host_list;
19 }
20
21 int broadcaster_build_chain(broadcaster_t bc)
22 {
23   msg_task_t task          = NULL;
24   char** cur               = (char**)xbt_dynar_iterator_next(bc->it);
25   const char* me           = MSG_host_get_name(MSG_host_self());
26   const char* current_host = NULL;
27   const char* prev         = NULL;
28   const char* next         = NULL;
29   const char* last         = NULL;
30
31   /* Build the chain if there's at least one peer */
32   if (cur != NULL) {
33     /* init: prev=NULL, host=current cur, next=next cur */
34     next      = *cur;
35     bc->first = next;
36
37     /* This iterator iterates one step ahead: cur is current iterated element, but is actually next in the chain */
38     do {
39       /* following steps: prev=last, host=next, next=cur */
40       cur          = (char**)xbt_dynar_iterator_next(bc->it);
41       prev         = last;
42       current_host = next;
43       if (cur != NULL)
44         next = *cur;
45       else
46         next = NULL;
47       XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
48
49       /* Send message to current peer */
50       task = task_message_chain_new(prev, next, bc->piece_count);
51       MSG_task_send(task, current_host);
52
53       last = current_host;
54     } while (cur != NULL);
55   }
56
57   return MSG_OK;
58 }
59
60 int broadcaster_send_file(broadcaster_t bc)
61 {
62   const char* me  = MSG_host_get_name(MSG_host_self());
63   msg_task_t task = NULL;
64
65   bc->current_piece = 0;
66
67   while (bc->current_piece < bc->piece_count) {
68     task = task_message_data_new(NULL, PIECE_SIZE);
69     XBT_DEBUG("Sending (send) piece %d from %s into mailbox %s", bc->current_piece, me, bc->first);
70     MSG_task_send(task, bc->first);
71     bc->current_piece++;
72   }
73
74   return MSG_OK;
75 }
76
77 broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count)
78 {
79   int status;
80   broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
81
82   bc->first             = NULL;
83   bc->piece_count       = piece_count;
84   bc->current_piece     = 0;
85   bc->host_list         = host_list;
86   bc->it                = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
87   bc->max_pending_sends = MAX_PENDING_SENDS;
88   bc->pending_sends     = xbt_dynar_new(sizeof(msg_comm_t), NULL);
89
90   status = broadcaster_build_chain(bc);
91   xbt_assert(status == MSG_OK, "Chain initialization failed");
92
93   return bc;
94 }
95
96 static void broadcaster_destroy(broadcaster_t bc)
97 {
98   /* Destroy iterator and hostlist */
99   xbt_dynar_iterator_delete(bc->it);
100   xbt_dynar_free(&bc->pending_sends);
101   xbt_dynar_free(&bc->host_list);
102   xbt_free(bc);
103 }
104
105 /** Emitter function  */
106 int broadcaster(int argc, char* argv[])
107 {
108   unsigned int piece_count = PIECE_COUNT;
109
110   XBT_DEBUG("broadcaster");
111
112   /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
113   xbt_dynar_t host_list = build_hostlist_from_hostcount(xbt_str_parse_int(argv[1], "Invalid number of peers: %s"));
114
115   /* argv[2] is the number of pieces */
116   if (argc > 2) {
117     piece_count = xbt_str_parse_int(argv[2], "Invalid number of pieces: %s");
118     XBT_DEBUG("piece_count set to %u", piece_count);
119   } else {
120     XBT_DEBUG("No piece_count specified, defaulting to %u", piece_count);
121   }
122   broadcaster_t bc = broadcaster_init(host_list, piece_count);
123
124   /* TODO: Error checking */
125   int status = broadcaster_send_file(bc);
126
127   broadcaster_destroy(bc);
128
129   return status;
130 }