Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
47525dd27f2f48e88f3f40c2f1a41f81c5d033ca
[simgrid.git] / examples / cpp / app-chainsend / s4u-app-chainsend.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/s4u.hpp"
7 #include <vector>
8
9 constexpr unsigned PIECE_SIZE                    = 65536;
10 constexpr unsigned MESSAGE_BUILD_CHAIN_SIZE      = 40;
11 constexpr unsigned MESSAGE_SEND_DATA_HEADER_SIZE = 1;
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_chainsend, "Messages specific for chainsend");
14
15 class ChainMessage {
16 public:
17   simgrid::s4u::Mailbox* prev_   = nullptr;
18   simgrid::s4u::Mailbox* next_   = nullptr;
19   unsigned int num_pieces        = 0;
20   explicit ChainMessage(simgrid::s4u::Mailbox* prev, simgrid::s4u::Mailbox* next, const unsigned int num_pieces)
21       : prev_(prev), next_(next), num_pieces(num_pieces)
22   {
23   }
24 };
25
26 class FilePiece {
27 public:
28   FilePiece()  = default;
29 };
30
31 class Peer {
32 public:
33   simgrid::s4u::Mailbox* prev = nullptr;
34   simgrid::s4u::Mailbox* next = nullptr;
35   simgrid::s4u::Mailbox* me   = nullptr;
36   std::vector<simgrid::s4u::CommPtr> pending_recvs;
37   std::vector<simgrid::s4u::CommPtr> pending_sends;
38
39   unsigned long long received_bytes = 0;
40   unsigned int received_pieces      = 0;
41   unsigned int total_pieces         = 0;
42
43   Peer() { me = simgrid::s4u::Mailbox::by_name(simgrid::s4u::Host::current()->get_cname()); }
44
45   void joinChain()
46   {
47     auto msg     = me->get_unique<ChainMessage>();
48     prev         = msg->prev_;
49     next         = msg->next_;
50     total_pieces = msg->num_pieces;
51     XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->get_cname(),
52               prev ? prev->get_cname() : nullptr, next ? next->get_cname() : nullptr);
53   }
54
55   void forwardFile()
56   {
57     FilePiece* received;
58     bool done = false;
59
60     while (not done) {
61       simgrid::s4u::CommPtr comm = me->get_async<FilePiece>(&received);
62       pending_recvs.push_back(comm);
63
64       ssize_t idx = simgrid::s4u::Comm::wait_any(pending_recvs);
65       if (idx != -1) {
66         comm = pending_recvs.at(idx);
67         XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
68         pending_recvs.erase(pending_recvs.begin() + idx);
69         if (next != nullptr) {
70           XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
71           simgrid::s4u::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
72           pending_sends.push_back(send);
73         } else
74           delete received;
75
76         received_pieces++;
77         received_bytes += PIECE_SIZE;
78         XBT_DEBUG("%u pieces received, %llu bytes received", received_pieces, received_bytes);
79         if (received_pieces >= total_pieces) {
80           done = true;
81         }
82       }
83     }
84   }
85 };
86
87 class Broadcaster {
88 public:
89   simgrid::s4u::Mailbox* first = nullptr;
90   std::vector<simgrid::s4u::Mailbox*> mailboxes;
91   unsigned int piece_count;
92
93   void buildChain()
94   {
95     /* Build the chain if there's at least one peer */
96     if (not mailboxes.empty())
97       first = mailboxes.front();
98
99     for (unsigned i = 0; i < mailboxes.size(); i++) {
100       simgrid::s4u::Mailbox* prev = i > 0 ? mailboxes[i - 1] : nullptr;
101       simgrid::s4u::Mailbox* next = i < mailboxes.size() - 1 ? mailboxes[i + 1] : nullptr;
102       XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"",
103                 simgrid::s4u::Host::current()->get_cname(), mailboxes[i]->get_cname(),
104                 prev ? prev->get_cname() : nullptr, next ? next->get_cname() : nullptr);
105       /* Send message to current peer */
106       mailboxes[i]->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE);
107     }
108   }
109
110   void sendFile()
111   {
112     std::vector<simgrid::s4u::CommPtr> pending_sends;
113     for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
114       XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece,
115                 simgrid::s4u::Host::current()->get_cname(), first->get_cname());
116       simgrid::s4u::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
117       pending_sends.push_back(comm);
118     }
119     simgrid::s4u::Comm::wait_all(pending_sends);
120   }
121
122   Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
123   {
124     for (int i = 1; i <= hostcount; i++) {
125       std::string name = std::string("node-") + std::to_string(i) + ".simgrid.org";
126       XBT_DEBUG("%s", name.c_str());
127       mailboxes.push_back(simgrid::s4u::Mailbox::by_name(name));
128     }
129   }
130 };
131
132 static void peer()
133 {
134   XBT_DEBUG("peer");
135
136   Peer p;
137
138   double start_time = simgrid::s4u::Engine::get_clock();
139   p.joinChain();
140   p.forwardFile();
141
142   simgrid::s4u::Comm::wait_all(p.pending_sends);
143   double end_time = simgrid::s4u::Engine::get_clock();
144
145   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,
146            p.received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
147 }
148
149 static void broadcaster(int hostcount, unsigned int piece_count)
150 {
151   XBT_DEBUG("broadcaster");
152
153   Broadcaster bc(hostcount, piece_count);
154   bc.buildChain();
155   bc.sendFile();
156 }
157
158 int main(int argc, char* argv[])
159 {
160   simgrid::s4u::Engine e(&argc, argv);
161
162   e.load_platform(argv[1]);
163
164   simgrid::s4u::Actor::create("broadcaster", e.host_by_name("node-0.simgrid.org"), broadcaster, 8, 256);
165
166   simgrid::s4u::Actor::create("peer", e.host_by_name("node-1.simgrid.org"), peer);
167   simgrid::s4u::Actor::create("peer", e.host_by_name("node-2.simgrid.org"), peer);
168   simgrid::s4u::Actor::create("peer", e.host_by_name("node-3.simgrid.org"), peer);
169   simgrid::s4u::Actor::create("peer", e.host_by_name("node-4.simgrid.org"), peer);
170   simgrid::s4u::Actor::create("peer", e.host_by_name("node-5.simgrid.org"), peer);
171   simgrid::s4u::Actor::create("peer", e.host_by_name("node-6.simgrid.org"), peer);
172   simgrid::s4u::Actor::create("peer", e.host_by_name("node-7.simgrid.org"), peer);
173   simgrid::s4u::Actor::create("peer", e.host_by_name("node-8.simgrid.org"), peer);
174
175   e.run();
176   XBT_INFO("Total simulation time: %e", simgrid::s4u::Engine::get_clock());
177
178   return 0;
179 }