Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
1cf5920fc0baa5b3723b61e1c1809ee96f2c7763
[simgrid.git] / examples / s4u / app-chainsend / s4u-app-chainsend.cpp
1 /* Copyright (c) 2007-2010, 2012-2015, 2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "simgrid/s4u.hpp"
8 #include <vector>
9
10 #define PIECE_SIZE 65536
11 #define MESSAGE_BUILD_CHAIN_SIZE 40
12 #define MESSAGE_SEND_DATA_HEADER_SIZE 1
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_chainsend, "Messages specific for chainsend");
15
16 class ChainMessage {
17 public:
18   simgrid::s4u::MailboxPtr prev_ = nullptr;
19   simgrid::s4u::MailboxPtr next_ = nullptr;
20   unsigned int num_pieces        = 0;
21   explicit ChainMessage(simgrid::s4u::MailboxPtr prev, simgrid::s4u::MailboxPtr next, const unsigned int num_pieces)
22       : prev_(prev), next_(next), num_pieces(num_pieces)
23   {
24   }
25   ~ChainMessage() = default;
26 };
27
28 class FilePiece {
29 public:
30   FilePiece()  = default;
31   ~FilePiece() = default;
32 };
33
34 class Peer {
35 public:
36   simgrid::s4u::MailboxPtr prev = nullptr;
37   simgrid::s4u::MailboxPtr next = nullptr;
38   simgrid::s4u::MailboxPtr me   = nullptr;
39   std::vector<simgrid::s4u::CommPtr> pending_recvs;
40   std::vector<simgrid::s4u::CommPtr> pending_sends;
41
42   unsigned long long received_bytes = 0;
43   unsigned int received_pieces      = 0;
44   unsigned int total_pieces         = 0;
45
46   Peer() { me = simgrid::s4u::Mailbox::byName(simgrid::s4u::Host::current()->getCname()); }
47   ~Peer()     = default;
48
49   void joinChain()
50   {
51     ChainMessage* msg = static_cast<ChainMessage*>(me->get());
52     prev              = msg->prev_;
53     next              = msg->next_;
54     total_pieces      = msg->num_pieces;
55     XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->getCname(),
56               prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr);
57     delete msg;
58   }
59
60   void forwardFile()
61   {
62     void* received;
63     bool done = false;
64
65     while (not done) {
66       simgrid::s4u::CommPtr comm = me->get_async(&received);
67       pending_recvs.push_back(comm);
68
69       int idx = simgrid::s4u::Comm::wait_any(&pending_recvs);
70       if (idx != -1) {
71         comm = pending_recvs.at(idx);
72         XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->getCname());
73         pending_recvs.erase(pending_recvs.begin() + idx);
74         if (next != nullptr) {
75           XBT_DEBUG("Sending (asynchronously) from %s to %s", me->getCname(), next->getCname());
76           simgrid::s4u::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
77           pending_sends.push_back(send);
78         } else
79           delete static_cast<FilePiece*>(received);
80
81         received_pieces++;
82         received_bytes += PIECE_SIZE;
83         XBT_DEBUG("%u pieces received, %llu bytes received", received_pieces, received_bytes);
84         if (received_pieces >= total_pieces) {
85           done = true;
86         }
87       }
88     }
89   }
90 };
91
92 class Broadcaster {
93 public:
94   simgrid::s4u::MailboxPtr first = nullptr;
95   std::vector<simgrid::s4u::MailboxPtr> mailboxes;
96   unsigned int piece_count;
97
98   void buildChain()
99   {
100     auto cur                      = mailboxes.begin();
101     simgrid::s4u::MailboxPtr prev = nullptr;
102     simgrid::s4u::MailboxPtr last = nullptr;
103
104     /* Build the chain if there's at least one peer */
105     if (cur != mailboxes.end()) {
106       /* init: prev=NULL, host=current cur, next=next cur */
107       simgrid::s4u::MailboxPtr next = *cur;
108       first                         = next;
109
110       /* This iterator iterates one step ahead: cur is current iterated element, but is actually next in the chain */
111       do {
112         /* following steps: prev=last, host=next, next=cur */
113         ++cur;
114         prev                                     = last;
115         simgrid::s4u::MailboxPtr current_mailbox = next;
116         if (cur != mailboxes.end())
117           next = *cur;
118         else
119           next = nullptr;
120
121         XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"",
122                   simgrid::s4u::Host::current()->getCname(), current_mailbox->getCname(),
123                   prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr);
124
125         /* Send message to current peer */
126         current_mailbox->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE);
127
128         last = current_mailbox;
129       } while (cur != mailboxes.end());
130     }
131   }
132
133   void sendFile()
134   {
135     std::vector<simgrid::s4u::CommPtr> pending_sends;
136     for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
137       XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece,
138                 simgrid::s4u::Host::current()->getCname(), first->getCname());
139       simgrid::s4u::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
140       pending_sends.push_back(comm);
141     }
142     simgrid::s4u::Comm::wait_all(&pending_sends);
143   }
144
145   Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
146   {
147     for (int i = 1; i <= hostcount; i++) {
148       std::string name = std::string("node-") + std::to_string(i) + ".acme.org";
149       XBT_DEBUG("%s", name.c_str());
150       mailboxes.push_back(simgrid::s4u::Mailbox::byName(name));
151     }
152   }
153
154   ~Broadcaster() = default;
155 };
156
157 static void peer()
158 {
159   XBT_DEBUG("peer");
160
161   Peer* p = new Peer();
162
163   double start_time = simgrid::s4u::Engine::getClock();
164   p->joinChain();
165   p->forwardFile();
166
167   simgrid::s4u::Comm::wait_all(&p->pending_sends);
168   double end_time = simgrid::s4u::Engine::getClock();
169
170   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
171            p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
172
173   delete p;
174 }
175
176 static void broadcaster(int hostcount, unsigned int piece_count)
177 {
178   XBT_DEBUG("broadcaster");
179
180   Broadcaster* bc = new Broadcaster(hostcount, piece_count);
181   bc->buildChain();
182   bc->sendFile();
183
184   delete bc;
185 }
186
187 int main(int argc, char* argv[])
188 {
189   simgrid::s4u::Engine e(&argc, argv);
190
191   e.loadPlatform(argv[1]);
192
193   simgrid::s4u::Actor::createActor("broadcaster", simgrid::s4u::Host::by_name("node-0.acme.org"), broadcaster, 8, 256);
194
195   simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-1.acme.org"), peer);
196   simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-2.acme.org"), peer);
197   simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-3.acme.org"), peer);
198   simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-4.acme.org"), peer);
199   simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-5.acme.org"), peer);
200   simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-6.acme.org"), peer);
201   simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-7.acme.org"), peer);
202   simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-8.acme.org"), peer);
203
204   e.run();
205   XBT_INFO("Total simulation time: %e", simgrid::s4u::Engine::getClock());
206
207   return 0;
208 }