Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
dependencies support for Comm (needs testing)
authorFrederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 30 Jan 2020 10:53:13 +0000 (11:53 +0100)
committerFrederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 30 Jan 2020 11:03:27 +0000 (12:03 +0100)
examples/s4u/async-ready/s4u-async-ready.tesh
include/simgrid/s4u/Activity.hpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Mailbox.cpp

index d5d2591..9b6d9a8 100644 (file)
@@ -86,6 +86,6 @@ $ ${bindir:=.}/s4u-async-ready ${platfdir}/small_platform_fatpipe.xml s4u-async-
 > [  0.110000] (3:peer@Perl) I got a 'Message 5 from peer 1'.
 > [  0.110000] (3:peer@Perl) I got a 'finalize'.
 > [  0.110000] (3:peer@Perl) I'm done, just waiting for my peers to receive the messages before exiting
-> [  0.110000] (3:peer@Perl) Goodbye now!
 > [  0.110000] (1:peer@Tremblay) Goodbye now!
 > [  0.110000] (2:peer@Ruby) Goodbye now!
+> [  0.110000] (3:peer@Perl) Goodbye now!
index 1c87d16..87fb4f6 100644 (file)
@@ -105,7 +105,6 @@ public:
   {
     state_ = State::STARTING;
     if (not has_dependencies()) {
-      state_ = State::STARTED;
       XBT_CDEBUG(s4u_activity, "All dependencies are solved, let's start '%s'", get_cname());
       start();
     }
index b9a8811..8ccb862 100644 (file)
@@ -36,7 +36,10 @@ int Comm::wait_any_for(const std::vector<CommPtr>* comms, double timeout)
   std::unique_ptr<kernel::activity::CommImpl* []> rcomms(new kernel::activity::CommImpl*[comms->size()]);
   std::transform(begin(*comms), end(*comms), rcomms.get(),
                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
-  return simcall_comm_waitany(rcomms.get(), comms->size(), timeout);
+  int changed_pos = simcall_comm_waitany(rcomms.get(), comms->size(), timeout);
+  if (changed_pos != -1)
+    comms->at(changed_pos)->release_dependencies();
+  return changed_pos;
 }
 
 void Comm::wait_all(const std::vector<CommPtr>* comms)
@@ -82,6 +85,7 @@ CommPtr Comm::set_src_data(void* buff, size_t size)
   src_buff_size_ = size;
   return this;
 }
+
 CommPtr Comm::set_dst_data(void** buff)
 {
   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
@@ -116,8 +120,8 @@ CommPtr Comm::set_tracing_category(const std::string& category)
 
 Comm* Comm::start()
 {
-  xbt_assert(get_state() == State::INITED, "You cannot use %s() once your communication started (not implemented)",
-             __FUNCTION__);
+  xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
+             "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
 
   if (src_buff_ != nullptr) { // Sender side
     on_sender_start(*Actor::self());
@@ -154,7 +158,8 @@ Comm* Comm::wait_for(double timeout)
     case State::FINISHED:
       break;
 
-    case State::INITED: // It's not started yet. Do it in one simcall
+    case State::INITED:
+    case State::STARTING: // It's not started yet. Do it in one simcall
       if (src_buff_ != nullptr) {
         on_sender_start(*Actor::self());
         simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
@@ -166,12 +171,14 @@ Comm* Comm::wait_for(double timeout)
                           get_user_data(), timeout, rate_);
       }
       state_ = State::FINISHED;
+      this->release_dependencies();
       break;
 
     case State::STARTED:
       simcall_comm_wait(pimpl_, timeout);
       on_completion(*Actor::self());
       state_ = State::FINISHED;
+      this->release_dependencies();
       break;
 
     case State::CANCELED:
@@ -182,12 +189,16 @@ Comm* Comm::wait_for(double timeout)
   }
   return this;
 }
+
 int Comm::test_any(const std::vector<CommPtr>* comms)
 {
   std::unique_ptr<kernel::activity::CommImpl* []> rcomms(new kernel::activity::CommImpl*[comms->size()]);
   std::transform(begin(*comms), end(*comms), rcomms.get(),
                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
-  return simcall_comm_testany(rcomms.get(), comms->size());
+  int changed_pos = simcall_comm_testany(rcomms.get(), comms->size());
+  if (changed_pos != -1)
+    comms->at(changed_pos)->release_dependencies();
+  return changed_pos;
 }
 
 Comm* Comm::detach()
@@ -196,7 +207,8 @@ Comm* Comm::detach()
              __FUNCTION__);
   xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs");
   detached_ = true;
-  return start();
+  vetoable_start();
+  return this;
 }
 
 Comm* Comm::cancel()
@@ -216,11 +228,12 @@ bool Comm::test()
   if (state_ == State::FINISHED)
     return true;
 
-  if (state_ == State::INITED)
-    this->start();
+  if (state_ == State::INITED || state_ == State::STARTING)
+    this->vetoable_start();
 
   if (simcall_comm_test(pimpl_)) {
     state_ = State::FINISHED;
+    this->release_dependencies();
     return true;
   }
   return false;
index 3fd04ac..810fd7f 100644 (file)
@@ -94,7 +94,7 @@ s4u::CommPtr Mailbox::put_async(void* payload, uint64_t simulated_size_in_bytes)
   xbt_assert(payload != nullptr, "You cannot send nullptr");
 
   s4u::CommPtr res = put_init(payload, simulated_size_in_bytes);
-  res->start();
+  res->vetoable_start();
   return res;
 }
 void Mailbox::put(void* payload, uint64_t simulated_size_in_bytes)
@@ -104,6 +104,7 @@ void Mailbox::put(void* payload, uint64_t simulated_size_in_bytes)
   CommPtr c = put_init();
   c->set_remaining(simulated_size_in_bytes);
   c->set_src_data(payload);
+  c->vetoable_start();
   c->wait();
 }
 /** Blocking send with timeout */
@@ -114,7 +115,7 @@ void Mailbox::put(void* payload, uint64_t simulated_size_in_bytes, double timeou
   CommPtr c = put_init();
   c->set_remaining(simulated_size_in_bytes);
   c->set_src_data(payload);
-  // c->start() is optional.
+  c->vetoable_start();
   c->wait_for(timeout);
 }
 
@@ -129,7 +130,7 @@ s4u::CommPtr Mailbox::get_async(void** data)
 {
   s4u::CommPtr res = get_init();
   res->set_dst_data(data, sizeof(*data));
-  res->start();
+  res->vetoable_start();
   return res;
 }
 
@@ -138,6 +139,7 @@ void* Mailbox::get()
   void* res = nullptr;
   CommPtr c = get_init();
   c->set_dst_data(&res, sizeof(res));
+  c->vetoable_start();
   c->wait();
   return res;
 }
@@ -146,6 +148,7 @@ void* Mailbox::get(double timeout)
   void* res = nullptr;
   CommPtr c = get_init();
   c->set_dst_data(&res, sizeof(res));
+  c->vetoable_start();
   c->wait_for(timeout);
   return res;
 }