Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
simpler S4u version of chainsend
authorFrederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 14 Dec 2017 16:27:07 +0000 (17:27 +0100)
committerFrederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 14 Dec 2017 16:27:07 +0000 (17:27 +0100)
examples/s4u/CMakeLists.txt
examples/s4u/app-chainsend/s4u-app-chainsend.cpp [new file with mode: 0644]
examples/s4u/app-chainsend/s4u-app-chainsend.tesh [new file with mode: 0644]

index 4e1b704..21024ed 100644 (file)
@@ -1,5 +1,5 @@
 foreach (example actor-create actor-daemon actor-join actor-kill actor-lifetime actor-migration actor-suspend actor-yield
 foreach (example actor-create actor-daemon actor-join actor-kill actor-lifetime actor-migration actor-suspend actor-yield
-                 app-masterworker app-pingpong app-token-ring
+                 app-chainsend app-masterworker app-pingpong app-token-ring
                  async-wait async-waitany async-waitall
                  cloud-simple
                  energy-link energy-pstate energy-ptask energy-vm
                  async-wait async-waitany async-waitall
                  cloud-simple
                  energy-link energy-pstate energy-ptask energy-vm
@@ -68,7 +68,7 @@ set(txt_files     ${txt_files}    ${CMAKE_CURRENT_SOURCE_DIR}/replay-comm/s4u-re
                                   ${CMAKE_CURRENT_SOURCE_DIR}/README.doc                                   PARENT_SCOPE)
 
 foreach(example actor-create actor-daemon actor-join actor-kill actor-lifetime actor-migration actor-suspend actor-yield
                                   ${CMAKE_CURRENT_SOURCE_DIR}/README.doc                                   PARENT_SCOPE)
 
 foreach(example actor-create actor-daemon actor-join actor-kill actor-lifetime actor-migration actor-suspend actor-yield
-                app-bittorrent app-masterworker app-pingpong app-token-ring 
+                app-bittorrent app-chainsend app-masterworker app-pingpong app-token-ring 
                 async-wait async-waitall async-waitany
                 cloud-simple
                 dht-chord 
                 async-wait async-waitall async-waitany
                 cloud-simple
                 dht-chord 
diff --git a/examples/s4u/app-chainsend/s4u-app-chainsend.cpp b/examples/s4u/app-chainsend/s4u-app-chainsend.cpp
new file mode 100644 (file)
index 0000000..1cf5920
--- /dev/null
@@ -0,0 +1,208 @@
+/* Copyright (c) 2007-2010, 2012-2015, 2017. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <vector>
+
+#define PIECE_SIZE 65536
+#define MESSAGE_BUILD_CHAIN_SIZE 40
+#define MESSAGE_SEND_DATA_HEADER_SIZE 1
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_chainsend, "Messages specific for chainsend");
+
+class ChainMessage {
+public:
+  simgrid::s4u::MailboxPtr prev_ = nullptr;
+  simgrid::s4u::MailboxPtr next_ = nullptr;
+  unsigned int num_pieces        = 0;
+  explicit ChainMessage(simgrid::s4u::MailboxPtr prev, simgrid::s4u::MailboxPtr next, const unsigned int num_pieces)
+      : prev_(prev), next_(next), num_pieces(num_pieces)
+  {
+  }
+  ~ChainMessage() = default;
+};
+
+class FilePiece {
+public:
+  FilePiece()  = default;
+  ~FilePiece() = default;
+};
+
+class Peer {
+public:
+  simgrid::s4u::MailboxPtr prev = nullptr;
+  simgrid::s4u::MailboxPtr next = nullptr;
+  simgrid::s4u::MailboxPtr me   = nullptr;
+  std::vector<simgrid::s4u::CommPtr> pending_recvs;
+  std::vector<simgrid::s4u::CommPtr> pending_sends;
+
+  unsigned long long received_bytes = 0;
+  unsigned int received_pieces      = 0;
+  unsigned int total_pieces         = 0;
+
+  Peer() { me = simgrid::s4u::Mailbox::byName(simgrid::s4u::Host::current()->getCname()); }
+  ~Peer()     = default;
+
+  void joinChain()
+  {
+    ChainMessage* msg = static_cast<ChainMessage*>(me->get());
+    prev              = msg->prev_;
+    next              = msg->next_;
+    total_pieces      = msg->num_pieces;
+    XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->getCname(),
+              prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr);
+    delete msg;
+  }
+
+  void forwardFile()
+  {
+    void* received;
+    bool done = false;
+
+    while (not done) {
+      simgrid::s4u::CommPtr comm = me->get_async(&received);
+      pending_recvs.push_back(comm);
+
+      int idx = simgrid::s4u::Comm::wait_any(&pending_recvs);
+      if (idx != -1) {
+        comm = pending_recvs.at(idx);
+        XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->getCname());
+        pending_recvs.erase(pending_recvs.begin() + idx);
+        if (next != nullptr) {
+          XBT_DEBUG("Sending (asynchronously) from %s to %s", me->getCname(), next->getCname());
+          simgrid::s4u::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
+          pending_sends.push_back(send);
+        } else
+          delete static_cast<FilePiece*>(received);
+
+        received_pieces++;
+        received_bytes += PIECE_SIZE;
+        XBT_DEBUG("%u pieces received, %llu bytes received", received_pieces, received_bytes);
+        if (received_pieces >= total_pieces) {
+          done = true;
+        }
+      }
+    }
+  }
+};
+
+class Broadcaster {
+public:
+  simgrid::s4u::MailboxPtr first = nullptr;
+  std::vector<simgrid::s4u::MailboxPtr> mailboxes;
+  unsigned int piece_count;
+
+  void buildChain()
+  {
+    auto cur                      = mailboxes.begin();
+    simgrid::s4u::MailboxPtr prev = nullptr;
+    simgrid::s4u::MailboxPtr last = nullptr;
+
+    /* Build the chain if there's at least one peer */
+    if (cur != mailboxes.end()) {
+      /* init: prev=NULL, host=current cur, next=next cur */
+      simgrid::s4u::MailboxPtr next = *cur;
+      first                         = next;
+
+      /* This iterator iterates one step ahead: cur is current iterated element, but is actually next in the chain */
+      do {
+        /* following steps: prev=last, host=next, next=cur */
+        ++cur;
+        prev                                     = last;
+        simgrid::s4u::MailboxPtr current_mailbox = next;
+        if (cur != mailboxes.end())
+          next = *cur;
+        else
+          next = nullptr;
+
+        XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"",
+                  simgrid::s4u::Host::current()->getCname(), current_mailbox->getCname(),
+                  prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr);
+
+        /* Send message to current peer */
+        current_mailbox->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE);
+
+        last = current_mailbox;
+      } while (cur != mailboxes.end());
+    }
+  }
+
+  void sendFile()
+  {
+    std::vector<simgrid::s4u::CommPtr> pending_sends;
+    for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
+      XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece,
+                simgrid::s4u::Host::current()->getCname(), first->getCname());
+      simgrid::s4u::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
+      pending_sends.push_back(comm);
+    }
+    simgrid::s4u::Comm::wait_all(&pending_sends);
+  }
+
+  Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
+  {
+    for (int i = 1; i <= hostcount; i++) {
+      std::string name = std::string("node-") + std::to_string(i) + ".acme.org";
+      XBT_DEBUG("%s", name.c_str());
+      mailboxes.push_back(simgrid::s4u::Mailbox::byName(name));
+    }
+  }
+
+  ~Broadcaster() = default;
+};
+
+static void peer()
+{
+  XBT_DEBUG("peer");
+
+  Peer* p = new Peer();
+
+  double start_time = simgrid::s4u::Engine::getClock();
+  p->joinChain();
+  p->forwardFile();
+
+  simgrid::s4u::Comm::wait_all(&p->pending_sends);
+  double end_time = simgrid::s4u::Engine::getClock();
+
+  XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
+           p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
+
+  delete p;
+}
+
+static void broadcaster(int hostcount, unsigned int piece_count)
+{
+  XBT_DEBUG("broadcaster");
+
+  Broadcaster* bc = new Broadcaster(hostcount, piece_count);
+  bc->buildChain();
+  bc->sendFile();
+
+  delete bc;
+}
+
+int main(int argc, char* argv[])
+{
+  simgrid::s4u::Engine e(&argc, argv);
+
+  e.loadPlatform(argv[1]);
+
+  simgrid::s4u::Actor::createActor("broadcaster", simgrid::s4u::Host::by_name("node-0.acme.org"), broadcaster, 8, 256);
+
+  simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-1.acme.org"), peer);
+  simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-2.acme.org"), peer);
+  simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-3.acme.org"), peer);
+  simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-4.acme.org"), peer);
+  simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-5.acme.org"), peer);
+  simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-6.acme.org"), peer);
+  simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-7.acme.org"), peer);
+  simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-8.acme.org"), peer);
+
+  e.run();
+  XBT_INFO("Total simulation time: %e", simgrid::s4u::Engine::getClock());
+
+  return 0;
+}
diff --git a/examples/s4u/app-chainsend/s4u-app-chainsend.tesh b/examples/s4u/app-chainsend/s4u-app-chainsend.tesh
new file mode 100644 (file)
index 0000000..86fc41c
--- /dev/null
@@ -0,0 +1,16 @@
+#! ./tesh
+
+p Testing the chainsend S4U implementation
+
+! timeout 60
+! output sort 19
+$ $SG_TEST_EXENV ${bindir:=.}/s4u-app-chainsend ${platfdir}/cluster.xml "--log=root.fmt:[%12.6r]%e(%i:%P@%h)%e%m%n"
+> [    2.214423] (2:peer@node-1.acme.org) ### 2.214423 16777216 bytes (Avg 7.225360 MB/s); copy finished (simulated).
+> [    2.222796] (3:peer@node-2.acme.org) ### 2.222796 16777216 bytes (Avg 7.198141 MB/s); copy finished (simulated).
+> [    2.231170] (4:peer@node-3.acme.org) ### 2.231170 16777216 bytes (Avg 7.171127 MB/s); copy finished (simulated).
+> [    2.239543] (5:peer@node-4.acme.org) ### 2.239543 16777216 bytes (Avg 7.144314 MB/s); copy finished (simulated).
+> [    2.247917] (6:peer@node-5.acme.org) ### 2.247917 16777216 bytes (Avg 7.117701 MB/s); copy finished (simulated).
+> [    2.256290] (7:peer@node-6.acme.org) ### 2.256290 16777216 bytes (Avg 7.091286 MB/s); copy finished (simulated).
+> [    2.264637] (0:maestro@) Total simulation time: 2.264637e+00
+> [    2.264637] (8:peer@node-7.acme.org) ### 2.264637 16777216 bytes (Avg 7.065151 MB/s); copy finished (simulated).
+> [    2.264637] (9:peer@node-8.acme.org) ### 2.264637 16777216 bytes (Avg 7.065151 MB/s); copy finished (simulated).