1 /* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "simgrid/s4u.hpp"
/* Simulated size, in bytes, of one piece of the broadcast file (64 KiB). */
constexpr unsigned PIECE_SIZE = 65536;
/* Simulated size of the message that tells a peer who its chain neighbours are. */
constexpr unsigned MESSAGE_BUILD_CHAIN_SIZE = 40;
/* Simulated header overhead added to PIECE_SIZE for every data message. */
constexpr unsigned MESSAGE_SEND_DATA_HEADER_SIZE = 1;
/* Log category `s4u_chainsend`, declared as the default category for this file
 * (presumably the one the XBT_DEBUG / XBT_INFO calls below log to; see the XBT logging docs). */
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_chainsend, "Messages specific for chainsend");
/* ChainMessage members: the message a peer receives to learn its place in the chain.
 * NOTE(review): the class header, the constructor body and the closing brace are not
 * visible in this excerpt. */
/* Mailbox of the previous peer in the chain; nullptr when there is none. */
17 simgrid::s4u::Mailbox* prev_ = nullptr;
/* Mailbox of the next peer in the chain; nullptr when there is none. */
18 simgrid::s4u::Mailbox* next_ = nullptr;
/* Number of file pieces the receiving peer should expect. */
19 unsigned int num_pieces = 0;
/* Builds a message from the two neighbour mailboxes and the piece count.
 * The parameter `num_pieces` shadows the member of the same name; inside the initializer
 * list `num_pieces(num_pieces)` initialises the member from the parameter. */
20 explicit ChainMessage(simgrid::s4u::Mailbox* prev, simgrid::s4u::Mailbox* next, const unsigned int num_pieces)
21 : prev_(prev), next_(next), num_pieces(num_pieces)
/* The mailbox pointers are not owned by the message, so the default destructor is enough. */
24 ~ChainMessage() = default;
/* FilePiece: payload object standing for one piece of the broadcast file.
 * No data members are visible in this excerpt. Instances are created with `new FilePiece()`
 * on the sending side and released with `delete` on the receiving side.
 * NOTE(review): the class header and closing brace are not visible in this excerpt. */
29 FilePiece() = default;
30 ~FilePiece() = default;
/* Peer state. NOTE(review): the class header is not visible in this excerpt. */
/* Mailbox of the previous peer in the chain, if any. */
35 simgrid::s4u::Mailbox* prev = nullptr;
/* Mailbox of the next peer in the chain; received pieces are forwarded to it when it is non-null. */
36 simgrid::s4u::Mailbox* next = nullptr;
/* This peer's own mailbox; set by the constructor. */
37 simgrid::s4u::Mailbox* me = nullptr;
/* Asynchronous receives posted on `me` that have not completed yet. */
38 std::vector<simgrid::s4u::CommPtr> pending_recvs;
/* Asynchronous forwards to `next`; they are waited for once the peer has finished receiving. */
39 std::vector<simgrid::s4u::CommPtr> pending_sends;
/* Total payload received so far, in bytes (PIECE_SIZE is added for every piece). */
41 unsigned long long received_bytes = 0;
/* Pieces received so far. NOTE(review): the increment is not visible in this excerpt. */
42 unsigned int received_pieces = 0;
/* Pieces to expect; copied from ChainMessage::num_pieces. */
43 unsigned int total_pieces = 0;
45 Peer() { me = simgrid::s4u::Mailbox::by_name(simgrid::s4u::Host::current()->get_cname()); }
/* Fragment: handling of the 'BUILD_CHAIN' message.
 * NOTE(review): the enclosing method header is not visible in this excerpt. */
/* Receive one message on this peer's mailbox and interpret the payload as a ChainMessage. */
50 const ChainMessage* msg = static_cast<ChainMessage*>(me->get());
/* NOTE(review): `prev` and `next` are logged below, but their assignment from `msg` is not
 * visible in this excerpt; neither is the release of `msg`. Confirm both exist. */
/* Remember how many pieces this peer has to receive. */
53 total_pieces = msg->num_pieces;
/* NOTE(review): when prev/next is null, nullptr is passed for a %s conversion. With
 * printf-style formatting that is undefined behaviour; confirm how XBT handles it or
 * substitute a literal such as "(none)". */
54 XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->get_cname(),
55 prev ? prev->get_cname() : nullptr, next ? next->get_cname() : nullptr);
/* Fragment: receive file pieces and pass them down the chain.
 * NOTE(review): the method header, the declaration of `received`, the enclosing loop and
 * its exit handling are not visible in this excerpt. */
/* Post an asynchronous receive on our mailbox; the payload pointer is delivered into `received`. */
65 simgrid::s4u::CommPtr comm = me->get_async(&received);
66 pending_recvs.push_back(comm);
/* Block until one of the pending receives completes; idx is its position in pending_recvs.
 * NOTE(review): a check of idx against a "none" value (e.g. -1) is not visible here. */
68 int idx = simgrid::s4u::Comm::wait_any(&pending_recvs);
70 comm = pending_recvs.at(idx);
71 XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
/* The completed receive no longer needs tracking. */
72 pending_recvs.erase(pending_recvs.begin() + idx);
/* With a successor, forward the very same payload pointer asynchronously and keep the
 * send handle so that it can be waited for later. */
73 if (next != nullptr) {
74 XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
75 simgrid::s4u::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
76 pending_sends.push_back(send);
/* NOTE(review): presumably the else-branch, where the last peer of the chain frees the piece
 * instead of forwarding it. The line between the push_back and this delete is not visible
 * in this excerpt; confirm. */
78 delete static_cast<FilePiece*>(received);
/* Account for the piece that just arrived. */
81 received_bytes += PIECE_SIZE;
82 XBT_DEBUG("%u pieces received, %llu bytes received", received_pieces, received_bytes);
/* All expected pieces have arrived. NOTE(review): the body of this branch is not visible
 * in this excerpt. */
83 if (received_pieces >= total_pieces) {
/* Broadcaster state. NOTE(review): the class header is not visible in this excerpt. */
/* Mailbox of the first peer of the chain; stays nullptr when there are no peers. */
93 simgrid::s4u::Mailbox* first = nullptr;
/* Mailboxes of all peers, in chain order; filled by the constructor. */
94 std::vector<simgrid::s4u::Mailbox*> mailboxes;
/* Number of file pieces to send; set from the constructor argument. */
95 unsigned int piece_count;
/* Fragment: chain construction. Every peer is told which mailboxes precede and follow it.
 * NOTE(review): the enclosing method header and the closing braces are not visible in
 * this excerpt. */
99 /* Build the chain if there's at least one peer */
100 if (not mailboxes.empty())
101 first = mailboxes.front();
/* For peer i: prev is peer i-1 (nullptr for the first peer), next is peer i+1 (nullptr for
 * the last one). `mailboxes.size() - 1` is only evaluated when the vector is non-empty,
 * so the unsigned subtraction cannot wrap here. */
103 for (unsigned i = 0; i < mailboxes.size(); i++) {
104 simgrid::s4u::Mailbox* prev = i > 0 ? mailboxes[i - 1] : nullptr;
105 simgrid::s4u::Mailbox* next = i < mailboxes.size() - 1 ? mailboxes[i + 1] : nullptr;
/* NOTE(review): nullptr is passed for a %s conversion when prev/next is null; confirm that
 * XBT tolerates this or substitute a literal. */
106 XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"",
107 simgrid::s4u::Host::current()->get_cname(), mailboxes[i]->get_cname(),
108 prev ? prev->get_cname() : nullptr, next ? next->get_cname() : nullptr);
109 /* Send message to current peer */
/* The message is heap-allocated and handed over as a raw pointer with a simulated size of
 * MESSAGE_BUILD_CHAIN_SIZE. NOTE(review): the receiver is presumably responsible for
 * deleting it; that delete is not visible in this excerpt. */
110 mailboxes[i]->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE);
/* Fragment: sending the file. One asynchronous send per piece is posted to the first peer,
 * then all of them are waited for.
 * NOTE(review): the enclosing method header and closing braces are not visible in this excerpt. */
116 std::vector<simgrid::s4u::CommPtr> pending_sends;
117 for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
/* NOTE(review): `first` is dereferenced unconditionally, but it is only assigned when
 * `mailboxes` is non-empty; confirm this never runs with zero peers and a non-zero
 * piece_count. */
118 XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece,
119 simgrid::s4u::Host::current()->get_cname(), first->get_cname());
/* Each piece is a fresh heap-allocated FilePiece; the simulated size is header + PIECE_SIZE. */
120 simgrid::s4u::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
121 pending_sends.push_back(comm);
/* Block until every posted send has completed. */
123 simgrid::s4u::Comm::wait_all(&pending_sends);
/* Constructor: stores the piece count and looks up one mailbox per peer.
 * hostcount   - number of peers; mailboxes "node-1.simgrid.org" .. "node-<hostcount>.simgrid.org"
 *               are collected in that order.
 * piece_count - number of file pieces to broadcast.
 * NOTE(review): the braces of the constructor body and of the loop are not visible in
 * this excerpt. */
126 Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
/* Numbering starts at 1; main() places the broadcaster on node-0. */
128 for (int i = 1; i <= hostcount; i++) {
129 std::string name = std::string("node-") + std::to_string(i) + ".simgrid.org";
130 XBT_DEBUG("%s", name.c_str());
131 mailboxes.push_back(simgrid::s4u::Mailbox::by_name(name));
/* Defaulted destructor: no visible code deletes the Mailbox pointers held by the broadcaster. */
135 ~Broadcaster() = default;
/* Fragment: body of the peer actor.
 * NOTE(review): the function header and the statements between the visible lines
 * (presumably the chain setup and the receive/forward phase) are not visible in this excerpt. */
/* NOTE(review): allocated with a raw `new`; the matching delete is not visible in this
 * excerpt. Confirm it exists, or use an automatic object. */
142 Peer* p = new Peer();
/* Simulated time at which this peer starts working. */
144 double start_time = simgrid::s4u::Engine::get_clock();
/* Every forward to the next peer must have completed before the clock is stopped. */
148 simgrid::s4u::Comm::wait_all(&p->pending_sends);
149 double end_time = simgrid::s4u::Engine::get_clock();
/* Reports elapsed simulated time, bytes received and average throughput. The figure is
 * labelled "MB/s" but computed with 1024-based divisions. If end_time equals start_time
 * the floating-point division yields inf or NaN rather than trapping. */
151 XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
152 p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
/* Broadcaster actor entry point.
 * hostcount   - number of peers, forwarded to the Broadcaster constructor.
 * piece_count - number of file pieces, forwarded to the Broadcaster constructor.
 * NOTE(review): the braces and the rest of the body (what is done with `bc`, and its release)
 * are not visible in this excerpt. */
157 static void broadcaster(int hostcount, unsigned int piece_count)
159 XBT_DEBUG("broadcaster");
/* NOTE(review): allocated with a raw `new`; confirm the matching delete exists. */
161 Broadcaster* bc = new Broadcaster(hostcount, piece_count);
/* Program entry point: creates the engine, loads the platform file named by argv[1], places one
 * broadcaster on node-0 and one peer on each of node-1 .. node-8, and finally logs the
 * simulated clock.
 * NOTE(review): the braces, the call that actually runs the simulation and the return
 * statement are not visible in this excerpt. */
168 int main(int argc, char* argv[])
170 simgrid::s4u::Engine e(&argc, argv);
/* NOTE(review): argv[1] is used without a visible check that argc > 1. */
172 e.load_platform(argv[1]);
/* The trailing 8 and 256 are handed to broadcaster() as hostcount and piece_count. */
174 simgrid::s4u::Actor::create("broadcaster", simgrid::s4u::Host::by_name("node-0.simgrid.org"), broadcaster, 8, 256);
/* One peer per host; the number of peers has to match the hostcount given to the broadcaster,
 * whose constructor builds the same "node-<i>.simgrid.org" names for i = 1..hostcount. */
176 simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-1.simgrid.org"), peer);
177 simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-2.simgrid.org"), peer);
178 simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-3.simgrid.org"), peer);
179 simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-4.simgrid.org"), peer);
180 simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-5.simgrid.org"), peer);
181 simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-6.simgrid.org"), peer);
182 simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-7.simgrid.org"), peer);
183 simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-8.simgrid.org"), peer);
/* Logs the value of the simulated clock at this point. */
186 XBT_INFO("Total simulation time: %e", simgrid::s4u::Engine::get_clock());