Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove explicit conversion to std::string when it's not required.
[simgrid.git] / examples / cpp / app-chainsend / s4u-app-chainsend.cpp
1 /* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/s4u.hpp"
7 #include <vector>
8
9 constexpr unsigned PIECE_SIZE                    = 65536;
10 constexpr unsigned MESSAGE_BUILD_CHAIN_SIZE      = 40;
11 constexpr unsigned MESSAGE_SEND_DATA_HEADER_SIZE = 1;
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_chainsend, "Messages specific for chainsend");
14 namespace sg4 = simgrid::s4u;
15
16 class ChainMessage {
17 public:
18   sg4::Mailbox* prev_            = nullptr;
19   sg4::Mailbox* next_            = nullptr;
20   unsigned int num_pieces        = 0;
21   explicit ChainMessage(sg4::Mailbox* prev, sg4::Mailbox* next, const unsigned int num_pieces)
22       : prev_(prev), next_(next), num_pieces(num_pieces)
23   {
24   }
25 };
26
27 class FilePiece {
28 public:
29   FilePiece()  = default;
30 };
31
32 class Peer {
33 public:
34   sg4::Mailbox* prev = nullptr;
35   sg4::Mailbox* next = nullptr;
36   sg4::Mailbox* me   = nullptr;
37   std::vector<sg4::CommPtr> pending_recvs;
38   std::vector<sg4::CommPtr> pending_sends;
39
40   unsigned long long received_bytes = 0;
41   unsigned int received_pieces      = 0;
42   unsigned int total_pieces         = 0;
43
44   Peer() { me = sg4::Mailbox::by_name(sg4::Host::current()->get_cname()); }
45
46   void joinChain()
47   {
48     auto msg     = me->get_unique<ChainMessage>();
49     prev         = msg->prev_;
50     next         = msg->next_;
51     total_pieces = msg->num_pieces;
52     XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->get_cname(),
53               prev ? prev->get_cname() : nullptr, next ? next->get_cname() : nullptr);
54   }
55
56   void forwardFile()
57   {
58     FilePiece* received;
59     bool done = false;
60
61     while (not done) {
62       sg4::CommPtr comm = me->get_async<FilePiece>(&received);
63       pending_recvs.push_back(comm);
64
65       ssize_t idx = sg4::Comm::wait_any(pending_recvs);
66       if (idx != -1) {
67         comm = pending_recvs.at(idx);
68         XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
69         pending_recvs.erase(pending_recvs.begin() + idx);
70         if (next != nullptr) {
71           XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
72           sg4::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
73           pending_sends.push_back(send);
74         } else
75           delete received;
76
77         received_pieces++;
78         received_bytes += PIECE_SIZE;
79         XBT_DEBUG("%u pieces received, %llu bytes received", received_pieces, received_bytes);
80         if (received_pieces >= total_pieces) {
81           done = true;
82         }
83       }
84     }
85   }
86 };
87
88 class Broadcaster {
89 public:
90   sg4::Mailbox* first = nullptr;
91   std::vector<sg4::Mailbox*> mailboxes;
92   unsigned int piece_count;
93
94   void buildChain()
95   {
96     /* Build the chain if there's at least one peer */
97     if (not mailboxes.empty())
98       first = mailboxes.front();
99
100     for (unsigned i = 0; i < mailboxes.size(); i++) {
101       sg4::Mailbox* prev = i > 0 ? mailboxes[i - 1] : nullptr;
102       sg4::Mailbox* next = i < mailboxes.size() - 1 ? mailboxes[i + 1] : nullptr;
103       XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"",
104                 sg4::Host::current()->get_cname(), mailboxes[i]->get_cname(), prev ? prev->get_cname() : nullptr,
105                 next ? next->get_cname() : nullptr);
106       /* Send message to current peer */
107       mailboxes[i]->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE);
108     }
109   }
110
111   void sendFile()
112   {
113     std::vector<sg4::CommPtr> pending_sends;
114     for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
115       XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg4::Host::current()->get_cname(),
116                 first->get_cname());
117       sg4::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
118       pending_sends.push_back(comm);
119     }
120     sg4::Comm::wait_all(pending_sends);
121   }
122
123   Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
124   {
125     for (int i = 1; i <= hostcount; i++) {
126       std::string name = "node-" + std::to_string(i) + ".simgrid.org";
127       XBT_DEBUG("%s", name.c_str());
128       mailboxes.push_back(sg4::Mailbox::by_name(name));
129     }
130   }
131 };
132
133 static void peer()
134 {
135   XBT_DEBUG("peer");
136
137   Peer p;
138
139   double start_time = sg4::Engine::get_clock();
140   p.joinChain();
141   p.forwardFile();
142
143   sg4::Comm::wait_all(p.pending_sends);
144   double end_time = sg4::Engine::get_clock();
145
146   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,
147            p.received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
148 }
149
150 static void broadcaster(int hostcount, unsigned int piece_count)
151 {
152   XBT_DEBUG("broadcaster");
153
154   Broadcaster bc(hostcount, piece_count);
155   bc.buildChain();
156   bc.sendFile();
157 }
158
159 int main(int argc, char* argv[])
160 {
161   sg4::Engine e(&argc, argv);
162
163   e.load_platform(argv[1]);
164
165   sg4::Actor::create("broadcaster", e.host_by_name("node-0.simgrid.org"), broadcaster, 8, 256);
166
167   sg4::Actor::create("peer", e.host_by_name("node-1.simgrid.org"), peer);
168   sg4::Actor::create("peer", e.host_by_name("node-2.simgrid.org"), peer);
169   sg4::Actor::create("peer", e.host_by_name("node-3.simgrid.org"), peer);
170   sg4::Actor::create("peer", e.host_by_name("node-4.simgrid.org"), peer);
171   sg4::Actor::create("peer", e.host_by_name("node-5.simgrid.org"), peer);
172   sg4::Actor::create("peer", e.host_by_name("node-6.simgrid.org"), peer);
173   sg4::Actor::create("peer", e.host_by_name("node-7.simgrid.org"), peer);
174   sg4::Actor::create("peer", e.host_by_name("node-8.simgrid.org"), peer);
175
176   e.run();
177   XBT_INFO("Total simulation time: %e", sg4::Engine::get_clock());
178
179   return 0;
180 }