From 12780ec312d53404edc201ccab2e2bb0286df719 Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Thu, 14 Dec 2017 17:27:07 +0100 Subject: [PATCH] simpler S4u version of chainsend --- examples/s4u/CMakeLists.txt | 4 +- .../s4u/app-chainsend/s4u-app-chainsend.cpp | 208 ++++++++++++++++++ .../s4u/app-chainsend/s4u-app-chainsend.tesh | 16 ++ 3 files changed, 226 insertions(+), 2 deletions(-) create mode 100644 examples/s4u/app-chainsend/s4u-app-chainsend.cpp create mode 100644 examples/s4u/app-chainsend/s4u-app-chainsend.tesh diff --git a/examples/s4u/CMakeLists.txt b/examples/s4u/CMakeLists.txt index 4e1b704a69..21024edc7a 100644 --- a/examples/s4u/CMakeLists.txt +++ b/examples/s4u/CMakeLists.txt @@ -1,5 +1,5 @@ foreach (example actor-create actor-daemon actor-join actor-kill actor-lifetime actor-migration actor-suspend actor-yield - app-masterworker app-pingpong app-token-ring + app-chainsend app-masterworker app-pingpong app-token-ring async-wait async-waitany async-waitall cloud-simple energy-link energy-pstate energy-ptask energy-vm @@ -68,7 +68,7 @@ set(txt_files ${txt_files} ${CMAKE_CURRENT_SOURCE_DIR}/replay-comm/s4u-re ${CMAKE_CURRENT_SOURCE_DIR}/README.doc PARENT_SCOPE) foreach(example actor-create actor-daemon actor-join actor-kill actor-lifetime actor-migration actor-suspend actor-yield - app-bittorrent app-masterworker app-pingpong app-token-ring + app-bittorrent app-chainsend app-masterworker app-pingpong app-token-ring async-wait async-waitall async-waitany cloud-simple dht-chord diff --git a/examples/s4u/app-chainsend/s4u-app-chainsend.cpp b/examples/s4u/app-chainsend/s4u-app-chainsend.cpp new file mode 100644 index 0000000000..1cf5920fc0 --- /dev/null +++ b/examples/s4u/app-chainsend/s4u-app-chainsend.cpp @@ -0,0 +1,208 @@ +/* Copyright (c) 2007-2010, 2012-2015, 2017. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "simgrid/s4u.hpp" +#include + +#define PIECE_SIZE 65536 +#define MESSAGE_BUILD_CHAIN_SIZE 40 +#define MESSAGE_SEND_DATA_HEADER_SIZE 1 + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_chainsend, "Messages specific for chainsend"); + +class ChainMessage { +public: + simgrid::s4u::MailboxPtr prev_ = nullptr; + simgrid::s4u::MailboxPtr next_ = nullptr; + unsigned int num_pieces = 0; + explicit ChainMessage(simgrid::s4u::MailboxPtr prev, simgrid::s4u::MailboxPtr next, const unsigned int num_pieces) + : prev_(prev), next_(next), num_pieces(num_pieces) + { + } + ~ChainMessage() = default; +}; + +class FilePiece { +public: + FilePiece() = default; + ~FilePiece() = default; +}; + +class Peer { +public: + simgrid::s4u::MailboxPtr prev = nullptr; + simgrid::s4u::MailboxPtr next = nullptr; + simgrid::s4u::MailboxPtr me = nullptr; + std::vector pending_recvs; + std::vector pending_sends; + + unsigned long long received_bytes = 0; + unsigned int received_pieces = 0; + unsigned int total_pieces = 0; + + Peer() { me = simgrid::s4u::Mailbox::byName(simgrid::s4u::Host::current()->getCname()); } + ~Peer() = default; + + void joinChain() + { + ChainMessage* msg = static_cast(me->get()); + prev = msg->prev_; + next = msg->next_; + total_pieces = msg->num_pieces; + XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", me->getCname(), + prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr); + delete msg; + } + + void forwardFile() + { + void* received; + bool done = false; + + while (not done) { + simgrid::s4u::CommPtr comm = me->get_async(&received); + pending_recvs.push_back(comm); + + int idx = simgrid::s4u::Comm::wait_any(&pending_recvs); + if (idx != -1) { + comm = pending_recvs.at(idx); + XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->getCname()); + pending_recvs.erase(pending_recvs.begin() + idx); + if (next != nullptr) { + XBT_DEBUG("Sending (asynchronously) from %s to %s", me->getCname(), next->getCname()); + simgrid::s4u::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE); + pending_sends.push_back(send); + } else + delete static_cast(received); + + received_pieces++; + received_bytes += PIECE_SIZE; + XBT_DEBUG("%u pieces received, %llu bytes received", received_pieces, received_bytes); + if (received_pieces >= total_pieces) { + done = true; + } + } + } + } +}; + +class Broadcaster { +public: + simgrid::s4u::MailboxPtr first = nullptr; + std::vector mailboxes; + unsigned int piece_count; + + void buildChain() + { + auto cur = mailboxes.begin(); + simgrid::s4u::MailboxPtr prev = nullptr; + simgrid::s4u::MailboxPtr last = nullptr; + + /* Build the chain if there's at least one peer */ + if (cur != mailboxes.end()) { + /* init: prev=NULL, host=current cur, next=next cur */ + simgrid::s4u::MailboxPtr next = *cur; + first = next; + + /* This iterator iterates one step ahead: cur is current iterated element, but is actually next in the chain */ + do { + /* following steps: prev=last, host=next, next=cur */ + ++cur; + prev = last; + simgrid::s4u::MailboxPtr current_mailbox = next; + if (cur != mailboxes.end()) + next = *cur; + else + next = nullptr; + + XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", + simgrid::s4u::Host::current()->getCname(), current_mailbox->getCname(), + prev ? prev->getCname() : nullptr, next ? next->getCname() : nullptr); + + /* Send message to current peer */ + current_mailbox->put(new ChainMessage(prev, next, piece_count), MESSAGE_BUILD_CHAIN_SIZE); + + last = current_mailbox; + } while (cur != mailboxes.end()); + } + } + + void sendFile() + { + std::vector pending_sends; + for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) { + XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, + simgrid::s4u::Host::current()->getCname(), first->getCname()); + simgrid::s4u::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE); + pending_sends.push_back(comm); + } + simgrid::s4u::Comm::wait_all(&pending_sends); + } + + Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count) + { + for (int i = 1; i <= hostcount; i++) { + std::string name = std::string("node-") + std::to_string(i) + ".acme.org"; + XBT_DEBUG("%s", name.c_str()); + mailboxes.push_back(simgrid::s4u::Mailbox::byName(name)); + } + } + + ~Broadcaster() = default; +}; + +static void peer() +{ + XBT_DEBUG("peer"); + + Peer* p = new Peer(); + + double start_time = simgrid::s4u::Engine::getClock(); + p->joinChain(); + p->forwardFile(); + + simgrid::s4u::Comm::wait_all(&p->pending_sends); + double end_time = simgrid::s4u::Engine::getClock(); + + XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes, + p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time)); + + delete p; +} + +static void broadcaster(int hostcount, unsigned int piece_count) +{ + XBT_DEBUG("broadcaster"); + + Broadcaster* bc = new Broadcaster(hostcount, piece_count); + bc->buildChain(); + bc->sendFile(); + + delete bc; +} + +int main(int argc, char* argv[]) +{ + simgrid::s4u::Engine e(&argc, argv); + + e.loadPlatform(argv[1]); + + simgrid::s4u::Actor::createActor("broadcaster", simgrid::s4u::Host::by_name("node-0.acme.org"), broadcaster, 8, 256); + + simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-1.acme.org"), peer); + simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-2.acme.org"), peer); + simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-3.acme.org"), peer); + simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-4.acme.org"), peer); + simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-5.acme.org"), peer); + simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-6.acme.org"), peer); + simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-7.acme.org"), peer); + simgrid::s4u::Actor::createActor("peer", simgrid::s4u::Host::by_name("node-8.acme.org"), peer); + + e.run(); + XBT_INFO("Total simulation time: %e", simgrid::s4u::Engine::getClock()); + + return 0; +} diff --git a/examples/s4u/app-chainsend/s4u-app-chainsend.tesh b/examples/s4u/app-chainsend/s4u-app-chainsend.tesh new file mode 100644 index 0000000000..86fc41c8b4 --- /dev/null +++ b/examples/s4u/app-chainsend/s4u-app-chainsend.tesh @@ -0,0 +1,16 @@ +#! ./tesh + +p Testing the chainsend S4U implementation + +! timeout 60 +! output sort 19 +$ $SG_TEST_EXENV ${bindir:=.}/s4u-app-chainsend ${platfdir}/cluster.xml "--log=root.fmt:[%12.6r]%e(%i:%P@%h)%e%m%n" +> [ 2.214423] (2:peer@node-1.acme.org) ### 2.214423 16777216 bytes (Avg 7.225360 MB/s); copy finished (simulated). +> [ 2.222796] (3:peer@node-2.acme.org) ### 2.222796 16777216 bytes (Avg 7.198141 MB/s); copy finished (simulated). +> [ 2.231170] (4:peer@node-3.acme.org) ### 2.231170 16777216 bytes (Avg 7.171127 MB/s); copy finished (simulated). +> [ 2.239543] (5:peer@node-4.acme.org) ### 2.239543 16777216 bytes (Avg 7.144314 MB/s); copy finished (simulated). +> [ 2.247917] (6:peer@node-5.acme.org) ### 2.247917 16777216 bytes (Avg 7.117701 MB/s); copy finished (simulated). +> [ 2.256290] (7:peer@node-6.acme.org) ### 2.256290 16777216 bytes (Avg 7.091286 MB/s); copy finished (simulated). +> [ 2.264637] (0:maestro@) Total simulation time: 2.264637e+00 +> [ 2.264637] (8:peer@node-7.acme.org) ### 2.264637 16777216 bytes (Avg 7.065151 MB/s); copy finished (simulated). +> [ 2.264637] (9:peer@node-8.acme.org) ### 2.264637 16777216 bytes (Avg 7.065151 MB/s); copy finished (simulated). -- 2.20.1