Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
enable wait_any and wait_any_for for asynchronous executions
author Frederic Suter <frederic.suter@cc.in2p3.fr>
Fri, 17 May 2019 07:16:21 +0000 (09:16 +0200)
committer Frederic Suter <frederic.suter@cc.in2p3.fr>
Fri, 17 May 2019 07:16:21 +0000 (09:16 +0200)
(No MC or failure support yet)

13 files changed:
examples/s4u/CMakeLists.txt
examples/s4u/exec-waitany/s4u-exec-waitany.cpp [new file with mode: 0644]
examples/s4u/exec-waitany/s4u-exec-waitany.tesh [new file with mode: 0644]
include/simgrid/s4u/Exec.hpp
include/simgrid/simix.h
src/kernel/activity/ExecImpl.cpp
src/s4u/s4u_Exec.cpp
src/simix/libsmx.cpp
src/simix/popping_accessors.hpp
src/simix/popping_bodies.cpp
src/simix/popping_enum.h
src/simix/popping_generated.cpp
src/simix/simcalls.in

index 18b19d1..e387d8c 100644 (file)
@@ -9,7 +9,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
                  cloud-capping cloud-migration cloud-simple
                  energy-exec energy-boot energy-link energy-vm
                  engine-filtering
-                 exec-async exec-basic exec-dvfs exec-ptask exec-remote
+                 exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany
                  io-async io-file-system io-file-remote io-storage-raw
                  platform-failures platform-profile platform-properties
                  plugin-hostload
diff --git a/examples/s4u/exec-waitany/s4u-exec-waitany.cpp b/examples/s4u/exec-waitany/s4u-exec-waitany.cpp
new file mode 100644 (file)
index 0000000..6597dc6
--- /dev/null
@@ -0,0 +1,62 @@
+/* Copyright (c) 2019. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_waitany, "Messages specific for this s4u example");
+
+static void worker(bool with_timeout)
+{
+  /* Vector in which we store all pending executions*/
+  std::vector<simgrid::s4u::ExecPtr> pending_executions;
+
+  for (int i = 0; i < 3; i++) {
+    std::string name = std::string("Exec-") + std::to_string(i);
+    double amount    = (6 * (i % 2) + i + 1) * simgrid::s4u::this_actor::get_host()->get_speed();
+
+    simgrid::s4u::ExecPtr exec = simgrid::s4u::this_actor::exec_init(amount)->set_name(name);
+    pending_executions.push_back(exec);
+    exec->start();
+
+    XBT_INFO("Activity %s has started for %.0f seconds", name.c_str(),
+             amount / simgrid::s4u::this_actor::get_host()->get_speed());
+  }
+
+  /* Now that executions were initiated, wait for their completion, in order of termination.
+   *
+   * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are
+   * terminated.
+   */
+  while (not pending_executions.empty()) {
+    int pos;
+    if (with_timeout)
+      pos = simgrid::s4u::Exec::wait_any_for(&pending_executions, 4);
+    else
+      pos = simgrid::s4u::Exec::wait_any(&pending_executions);
+
+    if (pos < 0) {
+      XBT_INFO("Do not wait any longer for an activity");
+      pending_executions.clear();
+    } else {
+      XBT_INFO("Activity '%s' (at position %d) is complete", pending_executions[pos]->get_cname(), pos);
+      pending_executions.erase(pending_executions.begin() + pos);
+    }
+    XBT_INFO("%lu activities remain pending", pending_executions.size());
+  }
+}
+
+/* Loads the platform given as first argument and runs two workers on host "Tremblay": one waiting without limit,
+ * one waiting with a timeout. */
+int main(int argc, char* argv[])
+{
+  simgrid::s4u::Engine e(&argc, argv);
+  /* argv[1] is read below: refuse to run without a platform file instead of reading past argv */
+  xbt_assert(argc > 1, "Usage: %s platform_file\n", argv[0]);
+  e.load_platform(argv[1]);
+  simgrid::s4u::Actor::create("worker", simgrid::s4u::Host::by_name("Tremblay"), worker, false);
+  simgrid::s4u::Actor::create("worker_timeout", simgrid::s4u::Host::by_name("Tremblay"), worker, true);
+  e.run();
+
+  return 0;
+}
diff --git a/examples/s4u/exec-waitany/s4u-exec-waitany.tesh b/examples/s4u/exec-waitany/s4u-exec-waitany.tesh
new file mode 100644 (file)
index 0000000..072cab5
--- /dev/null
@@ -0,0 +1,22 @@
+#!/usr/bin/env tesh
+
+! output sort 19
+$ ${bindir:=.}/s4u-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
+> [  0.000000] [        worker] Activity Exec-0 has started for 1 seconds
+> [  0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds
+> [  0.000000] [        worker] Activity Exec-1 has started for 8 seconds
+> [  0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds
+> [  0.000000] [        worker] Activity Exec-2 has started for 3 seconds
+> [  0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds
+> [  1.000000] [worker_timeout] Activity 'Exec-0' (at position 0) is complete
+> [  1.000000] [worker_timeout] 2 activities remain pending
+> [  1.000000] [        worker] Activity 'Exec-0' (at position 0) is complete
+> [  1.000000] [        worker] 2 activities remain pending
+> [  3.000000] [worker_timeout] Activity 'Exec-2' (at position 1) is complete
+> [  3.000000] [worker_timeout] 1 activities remain pending
+> [  3.000000] [        worker] Activity 'Exec-2' (at position 1) is complete
+> [  3.000000] [        worker] 1 activities remain pending
+> [  7.000000] [worker_timeout] Do not wait any longer for an activity
+> [  7.000000] [worker_timeout] 0 activities remain pending
+> [  8.000000] [        worker] Activity 'Exec-1' (at position 0) is complete
+> [  8.000000] [        worker] 0 activities remain pending
index ecfa6ce..d1c62ef 100644 (file)
@@ -52,6 +52,12 @@ public:
 
   Exec* wait() override;
   Exec* wait_for(double timeout) override;
+  /*! Take a vector of s4u::ExecPtr and return when one of them is finished, without any time limit.
+   * The return value is the rank (index in @p execs) of the first finished ExecPtr. */
+  static int wait_any(std::vector<ExecPtr>* execs) { return wait_any_for(execs, -1); }
+  /*! Same as wait_any, but with a timeout expressed in simulated time; a negative timeout means no timeout.
+   * If the timeout expires before any execution finishes, -1 is returned. */
+  static int wait_any_for(std::vector<ExecPtr>* execs, double timeout);
+
   bool test() override;
 
   ExecPtr set_bound(double bound);
@@ -60,6 +66,8 @@ public:
   ExecPtr set_tracing_category(const std::string& category);
   ExecPtr set_timeout(double timeout);
   Exec* cancel() override;
+  /*! Returns the name of this execution (the name_ member), by reference so no copy is made at each call. */
+  const std::string& get_name() const { return name_; }
+  /*! Returns the name of this execution as a C string, e.g. for printf-style logging. */
+  const char* get_cname() const { return name_.c_str(); }
 
   XBT_ATTRIB_DEPRECATED_v324("Please use Exec::wait_for()") void wait(double t) override { wait_for(t); }
 };
index 99de58b..32de0a0 100644 (file)
@@ -180,6 +180,8 @@ XBT_ATTRIB_DEPRECATED_v325("Please use CommImpl::finish()") XBT_PUBLIC void SIMI
 /******************************* Host simcalls ********************************/
 #ifdef __cplusplus
 XBT_PUBLIC e_smx_state_t simcall_execution_wait(const smx_activity_t& execution);
+XBT_PUBLIC unsigned int simcall_execution_waitany_for(simgrid::kernel::activity::ExecImpl* execs[], size_t count,
+                                                      double timeout);
 XBT_PUBLIC bool simcall_execution_test(const smx_activity_t& execution);
 #endif
 
index 8e1c3e8..e7d8a2c 100644 (file)
@@ -13,6 +13,8 @@
 
 #include "simgrid/s4u/Host.hpp"
 
+#include <boost/range/algorithm.hpp>
+
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_process);
 
 void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro)
@@ -46,6 +48,41 @@ void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::acti
   simcall_execution_test__set__result(simcall, res);
 }
 
+void simcall_HANDLER_execution_waitany_for(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execs[],
+                                           size_t count, double timeout)
+{
+  if (timeout < 0.0) {
+    simcall->timer = nullptr;
+  } else {
+    simcall->timer = simgrid::simix::Timer::set(SIMIX_get_clock() + timeout, [simcall]() {
+      simgrid::kernel::activity::ExecImpl** execs = simcall_execution_waitany_for__get__execs(simcall);
+      size_t count                                = simcall_execution_waitany_for__get__count(simcall);
+
+      for (size_t i = 0; i < count; i++) {
+        // Remove the first occurence of simcall:
+        auto* exec = execs[i];
+        auto j     = boost::range::find(exec->simcalls_, simcall);
+        if (j != exec->simcalls_.end())
+          exec->simcalls_.erase(j);
+      }
+      simcall_execution_waitany_for__set__result(simcall, -1);
+      SIMIX_simcall_answer(simcall);
+    });
+  }
+
+  for (size_t i = 0; i < count; i++) {
+    /* associate this simcall to the the synchro */
+    auto* exec = execs[i];
+    exec->simcalls_.push_back(simcall);
+
+    /* see if the synchro is already finished */
+    if (exec->state_ != SIMIX_WAITING && exec->state_ != SIMIX_RUNNING) {
+      exec->finish();
+      break;
+    }
+  }
+}
+
 namespace simgrid {
 namespace kernel {
 namespace activity {
@@ -180,6 +217,37 @@ void ExecImpl::finish()
   while (not simcalls_.empty()) {
     smx_simcall_t simcall = simcalls_.front();
     simcalls_.pop_front();
+
+    /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+     * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
+     * simcall */
+
+    if (simcall->call == SIMCALL_NONE) // FIXME: maybe a better way to handle this case
+      continue;                        // if process handling comm is killed
+    if (simcall->call == SIMCALL_EXECUTION_WAITANY_FOR) {
+      simgrid::kernel::activity::ExecImpl** execs = simcall_execution_waitany_for__get__execs(simcall);
+      size_t count                                = simcall_execution_waitany_for__get__count(simcall);
+
+      for (size_t i = 0; i < count; i++) {
+        // Remove the first occurence of simcall:
+        auto* exec = execs[i];
+        auto j     = boost::range::find(exec->simcalls_, simcall);
+        if (j != exec->simcalls_.end())
+          exec->simcalls_.erase(j);
+
+        if (simcall->timer) {
+          simcall->timer->remove();
+          simcall->timer = nullptr;
+        }
+      }
+
+      if (not MC_is_active() && not MC_record_replay_is_active()) {
+        ExecImpl** element = std::find(execs, execs + count, this);
+        int rank           = (element != execs + count) ? element - execs : -1;
+        simcall_execution_waitany_for__set__result(simcall, rank);
+      }
+    }
+
     switch (state_) {
 
       case SIMIX_DONE:
@@ -212,8 +280,6 @@ void ExecImpl::finish()
     }
 
     simcall->issuer->waiting_synchro = nullptr;
-    simcall_execution_wait__set__result(simcall, state_);
-
     /* Fail the process if the host is down */
     if (simcall->issuer->get_host()->is_on())
       SIMIX_simcall_answer(simcall);
index 918a705..feecbdd 100644 (file)
@@ -53,6 +53,14 @@ Exec* Exec::wait_for(double)
   THROW_UNIMPLEMENTED;
 }
 
+/* Blocks until one execution of `execs` finishes and returns its index, or returns -1 if `timeout` (simulated time,
+ * negative meaning no timeout) expires first. */
+int Exec::wait_any_for(std::vector<ExecPtr>* execs, double timeout)
+{
+  /* The kernel expects a plain array of ExecImpl*; a vector owns it, so no manual new[] is needed */
+  std::vector<kernel::activity::ExecImpl*> rexecs(execs->size());
+  std::transform(begin(*execs), end(*execs), rexecs.begin(),
+                 [](const ExecPtr& exec) { return static_cast<kernel::activity::ExecImpl*>(exec->pimpl_.get()); });
+  /* The -1 timeout sentinel travels through the unsigned return type of simcall_execution_waitany_for():
+   * convert it back explicitly so that callers see -1. */
+  return static_cast<int>(simcall_execution_waitany_for(rexecs.data(), rexecs.size(), timeout));
+}
+
 Exec* Exec::cancel()
 {
   simix::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
index 278a3ec..9261394 100644 (file)
@@ -39,6 +39,11 @@ bool simcall_execution_test(const smx_activity_t& execution)
   return simcall_BODY_execution_test(static_cast<simgrid::kernel::activity::ExecImpl*>(execution.get()));
 }
 
+unsigned int simcall_execution_waitany_for(simgrid::kernel::activity::ExecImpl* execs[], size_t count, double timeout)
+{
+  /* The simcall answers the rank of the first finished execution, or -1 on timeout; that sentinel is carried
+   * through this unsigned return type as its modular equivalent (UINT_MAX). */
+  const int rank = simcall_BODY_execution_waitany_for(execs, count, timeout);
+  return static_cast<unsigned int>(rank);
+}
+
 void simcall_process_join(smx_actor_t process, double timeout)
 {
   simcall_BODY_process_join(process, timeout);
index 98a6521..ac60891 100644 (file)
@@ -115,6 +115,56 @@ static inline void simcall_execution_wait__set__result(smx_simcall_t simcall, in
   simgrid::simix::marshal<int>(simcall->result, result);
 }
 
+/* Accessors for SIMCALL_EXECUTION_WAITANY_FOR: args[0] holds the ExecImpl* array, args[1] its element count,
+ * args[2] the timeout, and `result` the int answered to the issuer (rank of the finished execution, -1 on timeout).
+ * get__ unmarshals a value, getraw__ unmarshals the raw stored value, set__ marshals a new one.
+ * NOTE(review): this file looks generated from simcalls.in; change the generator rather than editing by hand. */
+static inline simgrid::kernel::activity::ExecImpl** simcall_execution_waitany_for__get__execs(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl**>(simcall->args[0]);
+}
+static inline simgrid::kernel::activity::ExecImpl** simcall_execution_waitany_for__getraw__execs(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal_raw<simgrid::kernel::activity::ExecImpl**>(simcall->args[0]);
+}
+static inline void simcall_execution_waitany_for__set__execs(smx_simcall_t simcall,
+                                                             simgrid::kernel::activity::ExecImpl** arg)
+{
+  simgrid::simix::marshal<simgrid::kernel::activity::ExecImpl**>(simcall->args[0], arg);
+}
+static inline size_t simcall_execution_waitany_for__get__count(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal<size_t>(simcall->args[1]);
+}
+static inline size_t simcall_execution_waitany_for__getraw__count(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal_raw<size_t>(simcall->args[1]);
+}
+static inline void simcall_execution_waitany_for__set__count(smx_simcall_t simcall, size_t arg)
+{
+  simgrid::simix::marshal<size_t>(simcall->args[1], arg);
+}
+static inline double simcall_execution_waitany_for__get__timeout(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal<double>(simcall->args[2]);
+}
+static inline double simcall_execution_waitany_for__getraw__timeout(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal_raw<double>(simcall->args[2]);
+}
+static inline void simcall_execution_waitany_for__set__timeout(smx_simcall_t simcall, double arg)
+{
+  simgrid::simix::marshal<double>(simcall->args[2], arg);
+}
+static inline int simcall_execution_waitany_for__get__result(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal<int>(simcall->result);
+}
+static inline int simcall_execution_waitany_for__getraw__result(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal_raw<int>(simcall->result);
+}
+static inline void simcall_execution_waitany_for__set__result(smx_simcall_t simcall, int result)
+{
+  simgrid::simix::marshal<int>(simcall->result, result);
+}
+
 static inline simgrid::kernel::activity::ExecImpl* simcall_execution_test__get__execution(smx_simcall_t simcall)
 {
   return simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall->args[0]);
@@ -1029,6 +1079,9 @@ XBT_PRIVATE void simcall_HANDLER_process_suspend(smx_simcall_t simcall, smx_acto
 XBT_PRIVATE void simcall_HANDLER_process_join(smx_simcall_t simcall, smx_actor_t process, double timeout);
 XBT_PRIVATE void simcall_HANDLER_process_sleep(smx_simcall_t simcall, double duration);
 XBT_PRIVATE void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution);
+XBT_PRIVATE void simcall_HANDLER_execution_waitany_for(smx_simcall_t simcall,
+                                                       simgrid::kernel::activity::ExecImpl** execs, size_t count,
+                                                       double timeout);
 XBT_PRIVATE void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution);
 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t sender, smx_mailbox_t mbox,
                                            double task_size, double rate, unsigned char* src_buff, size_t src_buff_size,
@@ -1059,4 +1112,4 @@ XBT_PRIVATE void simcall_HANDLER_cond_wait_timeout(smx_simcall_t simcall, smx_co
 XBT_PRIVATE void simcall_HANDLER_sem_acquire(smx_simcall_t simcall, smx_sem_t sem);
 XBT_PRIVATE void simcall_HANDLER_sem_acquire_timeout(smx_simcall_t simcall, smx_sem_t sem, double timeout);
 XBT_PRIVATE void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* io);
-XBT_PRIVATE int simcall_HANDLER_mc_random(smx_simcall_t simcall, int min, int max);
\ No newline at end of file
+XBT_PRIVATE int simcall_HANDLER_mc_random(smx_simcall_t simcall, int min, int max);
index 348a3e2..148b2e8 100644 (file)
@@ -67,6 +67,15 @@ inline static int simcall_BODY_execution_wait(simgrid::kernel::activity::ExecImp
   return simcall<int, simgrid::kernel::activity::ExecImpl*>(SIMCALL_EXECUTION_WAIT, execution);
 }
 
+/* Actor-side stub of SIMCALL_EXECUTION_WAITANY_FOR (declared [[block]] in simcalls.in): marshals the arguments and
+ * returns the int answered by the kernel (rank of the finished execution, or -1 on timeout).
+ * NOTE(review): this file looks generated from simcalls.in; change the generator rather than editing by hand. */
+inline static int simcall_BODY_execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count,
+                                                     double timeout)
+{
+  if (0) /* Go to that function to follow the code flow through the simcall barrier */
+    simcall_HANDLER_execution_waitany_for(&SIMIX_process_self()->simcall, execs, count, timeout);
+  return simcall<int, simgrid::kernel::activity::ExecImpl**, size_t, double>(SIMCALL_EXECUTION_WAITANY_FOR, execs,
+                                                                             count, timeout);
+}
+
 inline static bool simcall_BODY_execution_test(simgrid::kernel::activity::ExecImpl* execution)
 {
   if (0) /* Go to that function to follow the code flow through the simcall barrier */
index 692b303..45a78da 100644 (file)
@@ -23,6 +23,7 @@ typedef enum {
   SIMCALL_PROCESS_JOIN,
   SIMCALL_PROCESS_SLEEP,
   SIMCALL_EXECUTION_WAIT,
+  SIMCALL_EXECUTION_WAITANY_FOR,
   SIMCALL_EXECUTION_TEST,
   SIMCALL_COMM_SEND,
   SIMCALL_COMM_ISEND,
index 2d55ceb..4457433 100644 (file)
@@ -30,6 +30,7 @@ const char* simcall_names[] = {
     "SIMCALL_PROCESS_JOIN",
     "SIMCALL_PROCESS_SLEEP",
     "SIMCALL_EXECUTION_WAIT",
+    "SIMCALL_EXECUTION_WAITANY_FOR",
     "SIMCALL_EXECUTION_TEST",
     "SIMCALL_COMM_SEND",
     "SIMCALL_COMM_ISEND",
@@ -79,6 +80,12 @@ case SIMCALL_EXECUTION_WAIT:
   simcall_HANDLER_execution_wait(simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall->args[0]));
   break;
 
+case SIMCALL_EXECUTION_WAITANY_FOR:
+  simcall_HANDLER_execution_waitany_for(
+      simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl**>(simcall->args[0]),
+      simgrid::simix::unmarshal<size_t>(simcall->args[1]), simgrid::simix::unmarshal<double>(simcall->args[2]));
+  break;
+
 case SIMCALL_EXECUTION_TEST:
   simcall_HANDLER_execution_test(simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall->args[0]));
   break;
index 10d0ee0..8dd096b 100644 (file)
@@ -40,6 +40,7 @@ int  process_join(smx_actor_t process, double timeout) [[block]];
 int  process_sleep(double duration) [[block]];
 
 int           execution_wait(simgrid::kernel::activity::ExecImpl* execution) [[block]];
+int           execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout) [[block]];
 bool          execution_test(simgrid::kernel::activity::ExecImpl* execution) [[block]];
 
 void           comm_send(smx_actor_t sender, smx_mailbox_t mbox, double task_size, double rate, unsigned char* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout) [[block]];