Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MailboxPtr looks like a smart pointer, but it's not. Kill it.
[simgrid.git] / examples / s4u / app-chainsend / s4u-app-chainsend.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/s4u.hpp"
7 #include <vector>
8
9 constexpr unsigned PIECE_SIZE                    = 65536;
10 constexpr unsigned MESSAGE_BUILD_CHAIN_SIZE      = 40;
11 constexpr unsigned MESSAGE_SEND_DATA_HEADER_SIZE = 1;
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_chainsend, "Messages specific for chainsend");
14
15 class ChainMessage {
16 public:
17   simgrid::s4u::Mailbox* prev_   = nullptr;
18   simgrid::s4u::Mailbox* next_   = nullptr;
19   unsigned int num_pieces        = 0;
20   explicit ChainMessage(simgrid::s4u::Mailbox* prev, simgrid::s4u::Mailbox* next, const unsigned int num_pieces)
21       : prev_(prev), next_(next), num_pieces(num_pieces)
22   {
23   }
24   ~ChainMessage() = default;
25 };
26
27 class FilePiece {
28 public:
29   FilePiece()  = default;
30   ~FilePiece() = default;
31 };
32
33 class Peer {
34 public:
35   simgrid::s4u::Mailbox* prev = nullptr;
36   simgrid::s4u::Mailbox* next = nullptr;
37   simgrid::s4u::Mailbox* me   = nullptr;
38   std::vector<simgrid::s4u::CommPtr> pending_recvs;
39   std::vector<simgrid::s4u::CommPtr> pending_sends;
40
41   unsigned long long received_bytes = 0;
42   unsigned int received_pieces      = 0;
43   unsigned int total_pieces         = 0;
44
45   Peer() { me = simgrid::s4u::Mailbox::by_name(simgrid::s4u::Host::current()->get_cname()); }
46   ~Peer()     = default;
47
48   void joinChain()
49   {
50     ChainMessage* msg = static_cast<ChainMessage*>(me->get());
51     prev              = msg->prev_;
52     next              = msg->next_;
53     total_pieces      = msg->num_pieces;
54     XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->get_cname(),
55               prev ? prev->get_cname() : nullptr, next ? next->get_cname() : nullptr);
56     delete msg;
57   }
58
59   void forwardFile()
60   {
61     void* received;
62     bool done = false;
63
64     while (not done) {
65       simgrid::s4u::CommPtr comm = me->get_async(&received);
66       pending_recvs.push_back(comm);
67
68       int idx = simgrid::s4u::Comm::wait_any(&pending_recvs);
69       if (idx != -1) {
70         comm = pending_recvs.at(idx);
71         XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
72         pending_recvs.erase(pending_recvs.begin() + idx);
73         if (next != nullptr) {
74           XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
75           simgrid::s4u::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
76           pending_sends.push_back(send);
77         } else
78           delete static_cast<FilePiece*>(received);
79
80         received_pieces++;
81         received_bytes += PIECE_SIZE;
82         XBT_DEBUG("%u pieces received, %llu bytes received", received_pieces, received_bytes);
83         if (received_pieces >= total_pieces) {
84           done = true;
85         }
86       }
87     }
88   }
89 };
90
91 class Broadcaster {
92 public:
93   simgrid::s4u::Mailbox* first = nullptr;
94   std::vector<simgrid::s4u::Mailbox*> mailboxes;
95   unsigned int piece_count;
96
97   void buildChain()
98   {
99     auto cur                      = mailboxes.begin();
100     simgrid::s4u::Mailbox* prev   = nullptr;
101     simgrid::s4u::Mailbox* last   = nullptr;
102
103     /* Build the chain if there's at least one peer */
104     if (cur != mailboxes.end()) {
105       /* init: prev=NULL, host=current cur, next=next cur */
106       simgrid::s4u::Mailbox* next   = *cur;
107       first                         = next;
108
109       /* This iterator iterates one step ahead: cur is current iterated element, but is actually next in the chain */
110       do {
111         /* following steps: prev=last, host=next, next=cur */
112         ++cur;
113         prev                                     = last;
114         simgrid::s4u::Mailbox* current_mailbox   = next;
115         if (cur != mailboxes.end())
116           next = *cur;
117         else
118           next = nullptr;
119
120         XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"",
121                   simgrid::s4u::Host::current()->get_cname(), current_mailbox->get_cname(),
122                   prev ? prev->get_cname() : nullptr, next ? next->get_cname() : nullptr);
123
124         /* Send message to current peer */
125         current_mailbox->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE);
126
127         last = current_mailbox;
128       } while (cur != mailboxes.end());
129     }
130   }
131
132   void sendFile()
133   {
134     std::vector<simgrid::s4u::CommPtr> pending_sends;
135     for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
136       XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece,
137                 simgrid::s4u::Host::current()->get_cname(), first->get_cname());
138       simgrid::s4u::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
139       pending_sends.push_back(comm);
140     }
141     simgrid::s4u::Comm::wait_all(&pending_sends);
142   }
143
144   Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
145   {
146     for (int i = 1; i <= hostcount; i++) {
147       std::string name = std::string("node-") + std::to_string(i) + ".simgrid.org";
148       XBT_DEBUG("%s", name.c_str());
149       mailboxes.push_back(simgrid::s4u::Mailbox::by_name(name));
150     }
151   }
152
153   ~Broadcaster() = default;
154 };
155
156 static void peer()
157 {
158   XBT_DEBUG("peer");
159
160   Peer* p = new Peer();
161
162   double start_time = simgrid::s4u::Engine::get_clock();
163   p->joinChain();
164   p->forwardFile();
165
166   simgrid::s4u::Comm::wait_all(&p->pending_sends);
167   double end_time = simgrid::s4u::Engine::get_clock();
168
169   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
170            p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
171
172   delete p;
173 }
174
175 static void broadcaster(int hostcount, unsigned int piece_count)
176 {
177   XBT_DEBUG("broadcaster");
178
179   Broadcaster* bc = new Broadcaster(hostcount, piece_count);
180   bc->buildChain();
181   bc->sendFile();
182
183   delete bc;
184 }
185
186 int main(int argc, char* argv[])
187 {
188   simgrid::s4u::Engine e(&argc, argv);
189
190   e.load_platform(argv[1]);
191
192   simgrid::s4u::Actor::create("broadcaster", simgrid::s4u::Host::by_name("node-0.simgrid.org"), broadcaster, 8, 256);
193
194   simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-1.simgrid.org"), peer);
195   simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-2.simgrid.org"), peer);
196   simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-3.simgrid.org"), peer);
197   simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-4.simgrid.org"), peer);
198   simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-5.simgrid.org"), peer);
199   simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-6.simgrid.org"), peer);
200   simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-7.simgrid.org"), peer);
201   simgrid::s4u::Actor::create("peer", simgrid::s4u::Host::by_name("node-8.simgrid.org"), peer);
202
203   e.run();
204   XBT_INFO("Total simulation time: %e", simgrid::s4u::Engine::get_clock());
205
206   return 0;
207 }