1 /* Copyright (c) 2007-2010, 2012-2015, 2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "simgrid/s4u.hpp"
/* Size constants passed as the size argument of the mailbox put()/put_async() calls in this file. */
/* PIECE_SIZE is also the amount a peer adds to its received_bytes counter (logged as bytes) for each piece. */
10 #define PIECE_SIZE 65536
/* Size argument used when a ChainMessage is put into a peer's mailbox. */
11 #define MESSAGE_BUILD_CHAIN_SIZE 40
/* Added to PIECE_SIZE to form the size argument of every FilePiece put_async(). */
12 #define MESSAGE_SEND_DATA_HEADER_SIZE 1
/* Presumably the log category that the XBT_DEBUG / XBT_INFO calls in this file report under - confirm against the XBT docs. */
14 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_chainsend, "Messages specific for chainsend");
/* Members of ChainMessage (the class header itself is not part of this excerpt). */
/* One ChainMessage is put into each peer's mailbox while the chain is built; it carries */
/* the two neighbour mailboxes and the piece count handed to its constructor. */
18 simgrid::s4u::MailboxPtr prev_ = nullptr;
19 simgrid::s4u::MailboxPtr next_ = nullptr;
20 unsigned int num_pieces = 0;
/* Stores the three arguments as given. The code that creates these messages can pass a null */
/* prev (first element of the chain), so receivers must be prepared for a null mailbox. */
21 explicit ChainMessage(simgrid::s4u::MailboxPtr prev, simgrid::s4u::MailboxPtr next, const unsigned int num_pieces)
22 : prev_(prev), next_(next), num_pieces(num_pieces)
25 ~ChainMessage() = default;
/* FilePiece (class header not part of this excerpt): only a defaulted constructor and destructor */
/* are visible, no data members. One instance per piece is allocated with new and sent into the */
/* first mailbox of the chain; the simulated size is given separately to put_async(). */
30 FilePiece() = default;
31 ~FilePiece() = default;
/* Per-peer state (the class header is not part of this excerpt). */
/* prev / next: neighbour mailboxes, null until set. The assignments from the received */
/* ChainMessage are not visible in this excerpt. */
36 simgrid::s4u::MailboxPtr prev = nullptr;
37 simgrid::s4u::MailboxPtr next = nullptr;
/* me: this peer's own mailbox, looked up in the constructor. */
38 simgrid::s4u::MailboxPtr me = nullptr;
/* Asynchronous receives and sends that have been started and are kept for later waiting. */
39 std::vector<simgrid::s4u::CommPtr> pending_recvs;
40 std::vector<simgrid::s4u::CommPtr> pending_sends;
/* Progress counters: received_bytes grows by PIECE_SIZE per piece, received_pieces is */
/* compared against total_pieces to detect completion. */
42 unsigned long long received_bytes = 0;
43 unsigned int received_pieces = 0;
44 unsigned int total_pieces = 0;
/* The own mailbox is the one named after the host this code is currently running on. */
46 Peer() { me = simgrid::s4u::Mailbox::byName(simgrid::s4u::Host::current()->getCname()); }
/* --- Chain-joining step (the enclosing method header is not visible in this excerpt) --- */
/* Receive from the own mailbox and treat the payload as a ChainMessage. */
51 ChainMessage* msg = static_cast<ChainMessage*>(me->get());
54 total_pieces = msg->num_pieces;
/* NOTE(review): when prev or next is null, nullptr is passed for a %s conversion - confirm the */
/* logger tolerates that, or substitute a placeholder string. */
55 XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->getCname(),
56 prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr);
/* --- File-forwarding step (enclosing method and loop headers are not visible in this excerpt) --- */
/* Start an asynchronous receive into `received` and queue it. */
66 simgrid::s4u::CommPtr comm = me->get_async(&received);
67 pending_recvs.push_back(comm);
/* idx is used below as a position in pending_recvs; presumably it is the receive that completed - confirm. */
69 int idx = simgrid::s4u::Comm::wait_any(&pending_recvs);
71 comm = pending_recvs.at(idx);
72 XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->getCname());
/* That receive is finished with: drop it from the pending list. */
73 pending_recvs.erase(pending_recvs.begin() + idx);
/* There is a successor: hand the very same payload pointer on to its mailbox without waiting, */
/* and remember the send so it can be waited for later. */
74 if (next != nullptr) {
75 XBT_DEBUG("Sending (asynchronously) from %s to %s", me->getCname(), next->getCname());
76 simgrid::s4u::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
77 pending_sends.push_back(send);
/* NOTE(review): the line between the branch above and this delete is not visible; presumably this */
/* is the no-successor case, where the piece is freed locally instead of forwarded - confirm. */
79 delete static_cast<FilePiece*>(received);
/* Account for one more piece worth of bytes (the piece counter update is not visible here). */
82 received_bytes += PIECE_SIZE;
83 XBT_DEBUG("%u pieces received, %llu bytes received", received_pieces, received_bytes);
/* Every expected piece has arrived; the body of this branch is not visible in this excerpt. */
84 if (received_pieces >= total_pieces) {
/* Broadcaster state (the class header is not part of this excerpt). */
/* first: mailbox the file pieces are sent into; its assignment is not visible in this excerpt. */
94 simgrid::s4u::MailboxPtr first = nullptr;
/* mailboxes: one mailbox per peer, filled by the constructor. */
95 std::vector<simgrid::s4u::MailboxPtr> mailboxes;
/* piece_count: number of pieces to send, initialised from the constructor argument. */
96 unsigned int piece_count;
/* --- Chain construction (the enclosing method header is not visible in this excerpt) --- */
/* Walks the mailbox list and sends each peer a ChainMessage naming its two neighbours. */
100 auto cur = mailboxes.begin();
101 simgrid::s4u::MailboxPtr prev = nullptr;
102 simgrid::s4u::MailboxPtr last = nullptr;
104 /* Build the chain if there's at least one peer */
105 if (cur != mailboxes.end()) {
106 /* init: prev=NULL, host=current cur, next=next cur */
107 simgrid::s4u::MailboxPtr next = *cur;
110 /* This iterator iterates one step ahead: cur is current iterated element, but is actually next in the chain */
112 /* following steps: prev=last, host=next, next=cur */
/* The mailbox addressed in this iteration is the one that was `next` in the previous step. */
115 simgrid::s4u::MailboxPtr current_mailbox = next;
/* The statement guarded by this test is not visible in this excerpt. */
116 if (cur != mailboxes.end())
/* NOTE(review): when prev or next is null, nullptr is passed for a %s conversion - confirm the */
/* logger tolerates that, or substitute a placeholder string. */
121 XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"",
122 simgrid::s4u::Host::current()->getCname(), current_mailbox->getCname(),
123 prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr);
125 /* Send message to current peer */
/* The message is allocated with new here; presumably the receiving peer frees it - confirm, */
/* since no delete of a ChainMessage is visible in this excerpt. */
126 current_mailbox->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE);
128 last = current_mailbox;
129 } while (cur != mailboxes.end());
/* --- Sending the file (the enclosing method header is not visible in this excerpt) --- */
/* Starts one asynchronous send per piece into the `first` mailbox, then waits on the whole batch. */
135 std::vector<simgrid::s4u::CommPtr> pending_sends;
136 for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
137 XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece,
138 simgrid::s4u::Host::current()->getCname(), first->getCname());
/* Each piece is a freshly allocated FilePiece; its simulated size is header size plus PIECE_SIZE. */
139 simgrid::s4u::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
140 pending_sends.push_back(comm);
142 simgrid::s4u::Comm::wait_all(&pending_sends);
/* Constructor: records piece_count and collects the mailboxes named */
/* "node-1.acme.org" through "node-<hostcount>.acme.org", in that order. */
145 Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
147 for (int i = 1; i <= hostcount; i++) {
148 std::string name = std::string("node-") + std::to_string(i) + ".acme.org";
149 XBT_DEBUG("%s", name.c_str());
150 mailboxes.push_back(simgrid::s4u::Mailbox::byName(name));
154 ~Broadcaster() = default;
/* Body fragment of a peer-side function (header not visible in this excerpt; presumably the */
/* `peer` function that main passes to createActor - confirm). */
/* NOTE(review): allocated with raw new; no matching delete is visible in this excerpt. */
161 Peer* p = new Peer();
/* Clock value before the transfer; the calls between here and wait_all are not visible. */
163 double start_time = simgrid::s4u::Engine::getClock();
/* Wait on every send stored in the peer's pending_sends list, then read the clock again. */
167 simgrid::s4u::Comm::wait_all(&p->pending_sends);
168 double end_time = simgrid::s4u::Engine::getClock();
/* Reports elapsed clock time, bytes received, and bytes / 1024 / 1024 divided by the elapsed time. */
/* NOTE(review): if end_time equals start_time the average has a zero divisor. */
170 XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
171 p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
/* Function run by the broadcaster actor (main passes it to createActor). */
/* hostcount:   number of peer mailboxes the Broadcaster constructor will generate. */
/* piece_count: number of file pieces to send. */
176 static void broadcaster(int hostcount, unsigned int piece_count)
178 XBT_DEBUG("broadcaster");
/* NOTE(review): allocated with raw new; the calls made on bc and any matching delete are not */
/* visible in this excerpt. */
180 Broadcaster* bc = new Broadcaster(hostcount, piece_count);
/* Entry point: creates the engine, loads the platform file named by the first command-line */
/* argument, and registers one broadcaster actor plus eight peer actors. */
187 int main(int argc, char* argv[])
189 simgrid::s4u::Engine e(&argc, argv);
/* NOTE(review): argv[1] is used with no argc check visible in this excerpt. */
191 e.loadPlatform(argv[1]);
/* Broadcaster on node-0.acme.org, called with hostcount = 8 and piece_count = 256. */
193 simgrid::s4u::Actor::createActor("broadcaster", simgrid::s4u::Host::by_name("node-0.acme.org"), broadcaster, 8, 256);
/* One peer actor on each of node-1 .. node-8: the same names the Broadcaster constructor */
/* generates for hostcount = 8, so the two lists must be kept in step. */
195 simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-1.acme.org"), peer);
196 simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-2.acme.org"), peer);
197 simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-3.acme.org"), peer);
198 simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-4.acme.org"), peer);
199 simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-5.acme.org"), peer);
200 simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-6.acme.org"), peer);
201 simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-7.acme.org"), peer);
202 simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-8.acme.org"), peer);
/* Logs the engine clock; the statement that runs the simulation is not visible in this excerpt. */
205 XBT_INFO("Total simulation time: %e", simgrid::s4u::Engine::getClock());