Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / examples / c / app-chainsend / broadcaster.c
1 /* Copyright (c) 2012-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "chainsend.h"
7
8 XBT_LOG_NEW_DEFAULT_CATEGORY(broadcaster, "Messages specific for the broadcaster");
9
10 static chain_message_t chain_message_new(sg_mailbox_t prev, sg_mailbox_t next, const unsigned int num_pieces)
11 {
12   chain_message_t msg = xbt_malloc(sizeof(s_chain_message_t));
13   msg->prev_          = prev;
14   msg->next_          = next;
15   msg->num_pieces     = num_pieces;
16
17   return msg;
18 }
19
20 static void broadcaster_build_chain(broadcaster_t bc)
21 {
22   /* Build the chain if there's at least one peer */
23   if (bc->host_count > 0)
24     bc->first = bc->mailboxes[0];
25
26   for (unsigned i = 0; i < bc->host_count; i++) {
27     sg_mailbox_t prev = i > 0 ? bc->mailboxes[i - 1] : NULL;
28     sg_mailbox_t next = i < bc->host_count - 1 ? bc->mailboxes[i + 1] : NULL;
29     XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", sg_host_self_get_name(),
30               sg_mailbox_get_name(bc->mailboxes[i]), prev ? sg_mailbox_get_name(prev) : NULL,
31               next ? sg_mailbox_get_name(next) : NULL);
32     /* Send message to current peer */
33     sg_mailbox_put(bc->mailboxes[i], chain_message_new(prev, next, bc->piece_count), MESSAGE_BUILD_CHAIN_SIZE);
34   }
35 }
36
37 static void broadcaster_send_file(const_broadcaster_t bc)
38 {
39   for (unsigned int current_piece = 0; current_piece < bc->piece_count; current_piece++) {
40     XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg_host_self_get_name(),
41               sg_mailbox_get_name(bc->first));
42     char* file_piece = bprintf("piece-%u", current_piece);
43     sg_comm_t comm   = sg_mailbox_put_async(bc->first, file_piece, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
44     sg_activity_set_push(bc->pending_sends, (sg_activity_t)comm);
45   }
46   sg_activity_set_wait_all(bc->pending_sends);
47 }
48
49 static broadcaster_t broadcaster_init(sg_mailbox_t* mailboxes, unsigned int host_count, unsigned int piece_count)
50 {
51   broadcaster_t bc = xbt_malloc(sizeof(s_broadcaster_t));
52
53   bc->first         = NULL;
54   bc->host_count    = host_count;
55   bc->piece_count   = piece_count;
56   bc->mailboxes     = mailboxes;
57   bc->pending_sends = sg_activity_set_init();
58
59   broadcaster_build_chain(bc);
60
61   return bc;
62 }
63
64 static void broadcaster_destroy(broadcaster_t bc)
65 {
66   sg_activity_set_delete(bc->pending_sends);
67   xbt_free(bc->mailboxes);
68   xbt_free(bc);
69 }
70
71 /** Emitter function  */
72 void broadcaster(int argc, char* argv[])
73 {
74   XBT_DEBUG("broadcaster");
75   xbt_assert(argc > 2);
76   unsigned int host_count = (unsigned int)xbt_str_parse_int(argv[1], "Invalid number of peers");
77
78   sg_mailbox_t* mailboxes = xbt_malloc(sizeof(sg_mailbox_t) * host_count);
79
80   for (unsigned int i = 1; i <= host_count; i++) {
81     char* name = bprintf("node-%u.simgrid.org", i);
82     XBT_DEBUG("%s", name);
83     mailboxes[i - 1] = sg_mailbox_by_name(name);
84     free(name);
85   }
86
87   unsigned int piece_count = (unsigned int)xbt_str_parse_int(argv[2], "Invalid number of pieces");
88
89   broadcaster_t bc = broadcaster_init(mailboxes, host_count, piece_count);
90
91   broadcaster_send_file(bc);
92
93   broadcaster_destroy(bc);
94 }