Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Start to modernize the remaining old simcalls related to comms
authorSUTER Frederic <frederic.suter@cc.in2p3.fr>
Tue, 1 Feb 2022 10:32:05 +0000 (11:32 +0100)
committerSUTER Frederic <frederic.suter@cc.in2p3.fr>
Tue, 1 Feb 2022 10:32:05 +0000 (11:32 +0100)
+ refactor wait, test, waitany, and testany at the activity level
+ use observers for all the simcalls related to these operations
+ add 2 new examples that allows to test or wait for the completion of
any kind of activities stored in a single vector
+ simplify another example thanks to that
+ implement the to_string and dot_label functions in the observers
instead of in mc::Api

This (big) commit breaks the MC has the dependency check still mixes
old and modern simcalls and is thus borken. This is a work in progress
for the greater good, and towards SG4.

35 files changed:
MANIFEST.in
examples/c/actor-exiting/actor-exiting.tesh
examples/cpp/CMakeLists.txt
examples/cpp/activity-testany/s4u-activity-testany.cpp [new file with mode: 0644]
examples/cpp/activity-testany/s4u-activity-testany.tesh [new file with mode: 0644]
examples/cpp/activity-waitany/s4u-activity-waitany.cpp [new file with mode: 0644]
examples/cpp/activity-waitany/s4u-activity-waitany.tesh [new file with mode: 0644]
examples/cpp/actor-exiting/s4u-actor-exiting.tesh
examples/cpp/io-dependent/s4u-io-dependent.cpp
examples/cpp/io-dependent/s4u-io-dependent.tesh
examples/cpp/mc-bugged1/s4u-mc-bugged1.tesh
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Comm.hpp
src/kernel/activity/ActivityImpl.cpp
src/kernel/activity/ActivityImpl.hpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/kernel/activity/ExecImpl.cpp
src/kernel/activity/ExecImpl.hpp
src/kernel/activity/IoImpl.cpp
src/kernel/activity/IoImpl.hpp
src/kernel/actor/ActorImpl.cpp
src/kernel/actor/SimcallObserver.cpp
src/kernel/actor/SimcallObserver.hpp
src/mc/api.cpp
src/mc/api.hpp
src/mc/mc_base.cpp
src/mc/remote/AppSide.cpp
src/msg/msg_comm.cpp
src/msg/msg_task.cpp
src/s4u/s4u_Activity.cpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Exec.cpp
src/s4u/s4u_Io.cpp
src/smpi/mpi/smpi_request.cpp

index 15d6b34..9ea1765 100644 (file)
@@ -124,6 +124,10 @@ include examples/c/plugin-host-load/plugin-host-load.c
 include examples/c/plugin-host-load/plugin-host-load.tesh
 include examples/c/synchro-semaphore/synchro-semaphore.c
 include examples/c/synchro-semaphore/synchro-semaphore.tesh
+include examples/cpp/activity-testany/s4u-activity-testany.cpp
+include examples/cpp/activity-testany/s4u-activity-testany.tesh
+include examples/cpp/activity-waitany/s4u-activity-waitany.cpp
+include examples/cpp/activity-waitany/s4u-activity-waitany.tesh
 include examples/cpp/actor-create/s4u-actor-create.cpp
 include examples/cpp/actor-create/s4u-actor-create.tesh
 include examples/cpp/actor-create/s4u-actor-create_d.xml
index 0c05238..9ca7685 100644 (file)
@@ -8,6 +8,6 @@ $ ${bindir:=.}/c-actor-exiting ${platfdir}/small_platform.xml "--log=root.fmt:[%
 > [  3.000000] (maestro@) Oops! Deadlock or code not perfectly clean.
 > [  3.000000] (maestro@) 1 actors are still running, waiting for something.
 > [  3.000000] (maestro@) Legend of the following listing: "Actor <pid> (<name>@<host>): <status>"
-> [  3.000000] (maestro@) Actor 3 (C@Ginette): waiting for communication activity 0xdeadbeef () in state WAITING to finish
+> [  3.000000] (maestro@) Actor 3 (C@Ginette): waiting for synchronization activity 0xdeadbeef () in state WAITING to finish
 > [  3.000000] (C@Ginette) I was killed!
 > [  3.000000] (C@Ginette) The backtrace would be displayed here if --log=no_loc would not have been passed
index 8cf74e5..cd6f65e 100644 (file)
@@ -95,7 +95,8 @@ endif()
 
 # Deal with each example
 
-foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
+foreach (example activity-testany activity-waitany
+                 actor-create actor-daemon actor-exiting actor-join actor-kill
                  actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
                  app-bittorrent app-chainsend app-token-ring
                  comm-pingpong comm-ready comm-serialize comm-suspend comm-testany comm-wait comm-waitany comm-waitall comm-waituntil
diff --git a/examples/cpp/activity-testany/s4u-activity-testany.cpp b/examples/cpp/activity-testany/s4u-activity-testany.cpp
new file mode 100644 (file)
index 0000000..6dc8769
--- /dev/null
@@ -0,0 +1,72 @@
+/* Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_testany, "Messages specific for this s4u example");
+
+static void bob()
+{
+  sg4::Mailbox* mbox    = sg4::Mailbox::by_name(std::string("mbox"));
+  const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
+  std::string* payload;
+
+  XBT_INFO("Create my asynchronous activities");
+  auto exec = sg4::this_actor::exec_async(5e9);
+  auto comm = mbox->get_async(&payload);
+  auto io   = disk->read_async(3e8);
+
+  std::vector<sg4::ActivityPtr> pending_activities = {boost::dynamic_pointer_cast<sg4::Activity>(exec),
+                                                      boost::dynamic_pointer_cast<sg4::Activity>(comm),
+                                                      boost::dynamic_pointer_cast<sg4::Activity>(io)};
+
+  XBT_INFO("Sleep_for a while");
+  sg4::this_actor::sleep_for(1);
+
+  XBT_INFO("Test for completed activities");
+  while (not pending_activities.empty()) {
+    ssize_t changed_pos = sg4::Activity::test_any(pending_activities);
+    if (changed_pos != -1) {
+      auto* completed_one = pending_activities[changed_pos].get();
+      if (dynamic_cast<sg4::Comm*>(completed_one))
+        XBT_INFO("Completed a Comm");
+      if (dynamic_cast<sg4::Exec*>(completed_one))
+        XBT_INFO("Completed an Exec");
+      if (dynamic_cast<sg4::Io*>(completed_one))
+        XBT_INFO("Completed an I/O");
+      pending_activities.erase(pending_activities.begin() + changed_pos);
+    } else { // nothing matches, wait for a little bit
+      XBT_INFO("Nothing matches, test again in 0.5s");
+      sg4::this_actor::sleep_for(.5);
+    }
+  }
+  XBT_INFO("Last activity is complete");
+  delete payload;
+}
+
+static void alice()
+{
+  auto* payload = new std::string("Message");
+  XBT_INFO("Send '%s'", payload->c_str());
+  sg4::Mailbox::by_name(std::string("mbox"))->put(payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("bob", e.host_by_name("bob"), bob);
+  sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+
+  e.run();
+
+  return 0;
+}
diff --git a/examples/cpp/activity-testany/s4u-activity-testany.tesh b/examples/cpp/activity-testany/s4u-activity-testany.tesh
new file mode 100644 (file)
index 0000000..ab4d820
--- /dev/null
@@ -0,0 +1,20 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-activity-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
+> [0.00] [alice] Send 'Message'
+> [0.00] [  bob] Create my asynchronous activities
+> [0.00] [  bob] Sleep_for a while
+> [1.00] [  bob] Test for completed activities
+> [1.00] [  bob] Nothing matches, test again in 0.5s
+> [1.50] [  bob] Nothing matches, test again in 0.5s
+> [2.00] [  bob] Nothing matches, test again in 0.5s
+> [2.50] [  bob] Nothing matches, test again in 0.5s
+> [3.00] [  bob] Completed an I/O
+> [3.00] [  bob] Nothing matches, test again in 0.5s
+> [3.50] [  bob] Nothing matches, test again in 0.5s
+> [4.00] [  bob] Nothing matches, test again in 0.5s
+> [4.50] [  bob] Nothing matches, test again in 0.5s
+> [5.00] [  bob] Completed an Exec
+> [5.00] [  bob] Nothing matches, test again in 0.5s
+> [5.50] [  bob] Completed a Comm
+> [5.50] [  bob] Last activity is complete
diff --git a/examples/cpp/activity-waitany/s4u-activity-waitany.cpp b/examples/cpp/activity-waitany/s4u-activity-waitany.cpp
new file mode 100644 (file)
index 0000000..aa13b41
--- /dev/null
@@ -0,0 +1,66 @@
+/* Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+
+static void bob()
+{
+  sg4::Mailbox* mbox    = sg4::Mailbox::by_name(std::string("mbox"));
+  const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
+  std::string* payload;
+
+  XBT_INFO("Create my asynchronous activities");
+  auto exec = sg4::this_actor::exec_async(5e9);
+  auto comm = mbox->get_async(&payload);
+  auto io   = disk->read_async(3e8);
+
+  std::vector<sg4::ActivityPtr> pending_activities = {boost::dynamic_pointer_cast<sg4::Activity>(exec),
+                                                      boost::dynamic_pointer_cast<sg4::Activity>(comm),
+                                                      boost::dynamic_pointer_cast<sg4::Activity>(io)};
+
+  XBT_INFO("Wait for asynchrounous activities to complete");
+  while (not pending_activities.empty()) {
+    ssize_t changed_pos = sg4::Activity::wait_any(pending_activities);
+    if (changed_pos != -1) {
+      auto* completed_one = pending_activities[changed_pos].get();
+      if (dynamic_cast<sg4::Comm*>(completed_one))
+        XBT_INFO("Completed a Comm");
+      if (dynamic_cast<sg4::Exec*>(completed_one))
+        XBT_INFO("Completed an Exec");
+      if (dynamic_cast<sg4::Io*>(completed_one))
+        XBT_INFO("Completed an I/O");
+      pending_activities.erase(pending_activities.begin() + changed_pos);
+    }
+  }
+  XBT_INFO("Last activity is complete");
+  delete payload;
+}
+
+static void alice()
+{
+  auto* payload = new std::string("Message");
+  XBT_INFO("Send '%s'", payload->c_str());
+  sg4::Mailbox::by_name(std::string("mbox"))->put(payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("bob", e.host_by_name("bob"), bob);
+  sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+
+  e.run();
+
+  return 0;
+}
diff --git a/examples/cpp/activity-waitany/s4u-activity-waitany.tesh b/examples/cpp/activity-waitany/s4u-activity-waitany.tesh
new file mode 100644 (file)
index 0000000..7b15aa6
--- /dev/null
@@ -0,0 +1,10 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-activity-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchrounous activities to complete
+> [3.000000] [  bob] Completed an I/O
+> [5.000000] [  bob] Completed an Exec
+> [5.197828] [  bob] Completed a Comm
+> [5.197828] [  bob] Last activity is complete
index c46a118..c5a6742 100644 (file)
@@ -12,7 +12,7 @@ $ ${bindir:=.}/s4u-actor-exiting ${platfdir}/small_platform.xml "--log=root.fmt:
 > [  3.000000] (maestro@) Oops! Deadlock or code not perfectly clean.
 > [  3.000000] (maestro@) 1 actors are still running, waiting for something.
 > [  3.000000] (maestro@) Legend of the following listing: "Actor <pid> (<name>@<host>): <status>"
-> [  3.000000] (maestro@) Actor 3 (C@Ginette): waiting for communication activity 0xdeadbeef () in state WAITING to finish
+> [  3.000000] (maestro@) Actor 3 (C@Ginette): waiting for synchronization activity 0xdeadbeef () in state WAITING to finish
 > [  3.000000] (C@Ginette) I was killed!
 > [  3.000000] (C@Ginette) The backtrace would be displayed here if --log=no_loc would not have been passed
 > [  3.000000] (maestro@) Actor C terminates now
index 486c517..3f83813 100644 (file)
@@ -11,16 +11,18 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_test, "Messages specific for this s4u example")
 
 static void test()
 {
-  std::vector<simgrid::s4u::IoPtr> pending_ios;
+  std::vector<simgrid::s4u::ActivityPtr> pending_activities;
 
   simgrid::s4u::ExecPtr bob_compute = simgrid::s4u::this_actor::exec_init(1e9);
+  pending_activities.push_back(boost::dynamic_pointer_cast<simgrid::s4u::Activity>(bob_compute));
   simgrid::s4u::IoPtr bob_write =
       simgrid::s4u::Host::current()->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::WRITE);
-  pending_ios.push_back(bob_write);
+  pending_activities.push_back(boost::dynamic_pointer_cast<simgrid::s4u::Activity>(bob_write));
   simgrid::s4u::IoPtr carl_read =
       simgrid::s4u::Host::by_name("carl")->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::READ);
-  pending_ios.push_back(carl_read);
+  pending_activities.push_back(boost::dynamic_pointer_cast<simgrid::s4u::Activity>(carl_read));
   simgrid::s4u::ExecPtr carl_compute = simgrid::s4u::Host::by_name("carl")->exec_init(1e9);
+  pending_activities.push_back(boost::dynamic_pointer_cast<simgrid::s4u::Activity>(carl_compute));
 
   // Name the activities (for logging purposes only)
   bob_compute->set_name("bob compute");
@@ -43,13 +45,11 @@ static void test()
   carl_compute->vetoable_start();
 
   // wait for the completion of all activities
-  bob_compute->wait();
-  while (not pending_ios.empty()) {
-    ssize_t changed_pos = simgrid::s4u::Io::wait_any(pending_ios);
-    XBT_INFO("Io '%s' is complete", pending_ios[changed_pos]->get_cname());
-    pending_ios.erase(pending_ios.begin() + changed_pos);
+  while (not pending_activities.empty()) {
+    ssize_t changed_pos = simgrid::s4u::Activity::wait_any(pending_activities);
+    XBT_INFO("Activity '%s' is complete", pending_activities[changed_pos]->get_cname());
+    pending_activities.erase(pending_activities.begin() + changed_pos);
   }
-  carl_compute->wait();
 }
 
 int main(int argc, char* argv[])
index c9b9ecc..b44775f 100644 (file)
@@ -4,10 +4,12 @@
 $ ${bindir:=.}/s4u-io-dependent ${platfdir}/hosts_with_disks.xml --log=s4u_activity.t:verbose "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
 > [  1.000000] (1:bob@bob) 'bob write' is assigned to a resource and all dependencies are solved. Let's start
 > [  1.000000] (1:bob@bob) Remove a dependency from 'bob compute' on 'bob write'
+> [  1.000000] (1:bob@bob) Activity 'bob compute' is complete
 > [  1.100000] (1:bob@bob) 'carl read' is assigned to a resource and all dependencies are solved. Let's start
-> [  1.100000] (1:bob@bob) Io 'bob write' is complete
+> [  1.100000] (1:bob@bob) Activity 'bob write' is complete
 > [  1.100000] (1:bob@bob) Remove a dependency from 'bob write' on 'carl read'
-> [  1.140000] (1:bob@bob) Io 'carl read' is complete
+> [  1.140000] (1:bob@bob) Activity 'carl read' is complete
 > [  1.140000] (1:bob@bob) 'carl compute' is assigned to a resource and all dependencies are solved. Let's start
 > [  1.140000] (1:bob@bob) Remove a dependency from 'carl read' on 'carl compute'
+> [  2.140000] (1:bob@bob) Activity 'carl compute' is complete
 > [  2.140000] (0:maestro@) Simulation time 2.14
index 5e96cde..4983919 100644 (file)
@@ -23,14 +23,14 @@ $ ${bindir:=.}/../../../bin/simgrid-mc ${bindir:=.}/s4u-mc-bugged1 ${platfdir:=.
 > [  0.000000] (0:maestro@) Counter-example execution trace:
 > [  0.000000] (0:maestro@)   [(1)HostA (server)] iRecv(dst=(1)HostA (server), buff=(verbose only), size=(verbose only))
 > [  0.000000] (0:maestro@)   [(2)HostB (client)] iSend(src=(2)HostB (client), buff=(verbose only), size=(verbose only))
-> [  0.000000] (0:maestro@)   [(1)HostA (server)] Wait(comm=(verbose only) [(2)HostB (client)-> (1)HostA (server)])
+> [  0.000000] (0:maestro@)   [(1)HostA (server)] Wait(comm=(verbose only) [(2)HostB (client) -> (1)HostA (server)])
 > [  0.000000] (0:maestro@)   [(1)HostA (server)] iRecv(dst=(1)HostA (server), buff=(verbose only), size=(verbose only))
-> [  0.000000] (0:maestro@)   [(2)HostB (client)] Wait(comm=(verbose only) [(2)HostB (client)-> (1)HostA (server)])
+> [  0.000000] (0:maestro@)   [(2)HostB (client)] Wait(comm=(verbose only) [(2)HostB (client) -> (1)HostA (server)])
 > [  0.000000] (0:maestro@)   [(4)HostD (client)] iSend(src=(4)HostD (client), buff=(verbose only), size=(verbose only))
-> [  0.000000] (0:maestro@)   [(1)HostA (server)] Wait(comm=(verbose only) [(4)HostD (client)-> (1)HostA (server)])
+> [  0.000000] (0:maestro@)   [(1)HostA (server)] Wait(comm=(verbose only) [(4)HostD (client) -> (1)HostA (server)])
 > [  0.000000] (0:maestro@)   [(1)HostA (server)] iRecv(dst=(1)HostA (server), buff=(verbose only), size=(verbose only))
 > [  0.000000] (0:maestro@)   [(3)HostC (client)] iSend(src=(3)HostC (client), buff=(verbose only), size=(verbose only))
-> [  0.000000] (0:maestro@)   [(1)HostA (server)] Wait(comm=(verbose only) [(3)HostC (client)-> (1)HostA (server)])
+> [  0.000000] (0:maestro@)   [(1)HostA (server)] Wait(comm=(verbose only) [(3)HostC (client) -> (1)HostA (server)])
 > [  0.000000] (0:maestro@) Path = 1;2;1;1;2;4;1;1;3;1
 > [  0.000000] (0:maestro@) Expanded states = 22
 > [  0.000000] (0:maestro@) Visited states = 56
index df49b9e..efba3e2 100644 (file)
@@ -141,6 +141,11 @@ public:
    */
   virtual Activity* start() = 0;
   /** Blocks the current actor until the activity is terminated */
+  /** Tests whether the given activity is terminated yet. */
+  virtual bool test();
+  /*! take a vector s4u::ActivityPtr and return the rank of the first finished one (or -1 if none is done). */
+  static ssize_t test_any(const std::vector<ActivityPtr>& activities);
+
   Activity* wait() { return wait_for(-1.0); }
   /** Blocks the current actor until the activity is terminated, or until the timeout is elapsed\n
    *  Raises: timeout exception.*/
@@ -148,6 +153,11 @@ public:
   /** Blocks the current actor until the activity is terminated, or until the time limit is reached\n
    * Raises: timeout exception. */
   void wait_until(double time_limit);
+  /*! take a vector of s4u::ActivityPtr and return when one of them is finished.
+   * The return value is the rank of the first finished ActivityPtr. */
+  static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return wait_any_for(activities, -1); }
+  /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
+  static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout);
 
   /** Cancel that activity */
   Activity* cancel();
@@ -156,8 +166,6 @@ public:
   /** Return a string representation of the activity's state (one of INITED, STARTING, STARTED, CANCELED, FINISHED) */
   const char* get_state_str() const;
   void set_state(Activity::State state) { state_ = state; }
-  /** Tests whether the given activity is terminated yet. */
-  virtual bool test();
 
   /** Blocks the progression of this activity until it gets resumed */
   virtual Activity* suspend();
index 5c746e1..5f581e7 100644 (file)
@@ -102,7 +102,6 @@ public:
 
   Comm* start() override;
   Comm* wait_for(double timeout) override;
-  bool test() override;
 
   /** Start the comm, and ignore its result. It can be completely forgotten after that. */
   Comm* detach();
index ea59bd5..3840cc8 100644 (file)
@@ -4,11 +4,14 @@
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include <simgrid/modelchecker.h>
+#include <simgrid/s4u/Engine.hpp>
 
 #include "src/kernel/activity/ActivityImpl.hpp"
+#include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/activity/SynchroRaw.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/kernel/resource/CpuImpl.hpp"
 #include "src/mc/mc_replay.hpp"
 
 #include <boost/range/algorithm.hpp>
@@ -58,18 +61,52 @@ const char* ActivityImpl::get_state_str() const
   return to_c_str(state_);
 }
 
-bool ActivityImpl::test()
+bool ActivityImpl::test(actor::ActorImpl* issuer)
 {
+  // Associate this simcall to the synchro
+  auto* observer = dynamic_cast<kernel::actor::ActivityTestSimcall*>(issuer->simcall_.observer_);
+  if (observer)
+    register_simcall(&issuer->simcall_);
+
   if (state_ != State::WAITING && state_ != State::RUNNING) {
     finish();
+    issuer->exception_ = nullptr; // Do not propagate exception in that case
     return true;
   }
+
+  if (observer) {
+    observer->set_result(false);
+    issuer->waiting_synchro_ = nullptr;
+    unregister_simcall(&issuer->simcall_);
+    issuer->simcall_answer();
+  }
   return false;
 }
 
+ssize_t ActivityImpl::test_any(actor::ActorImpl* issuer, const std::vector<ActivityImpl*>& activities)
+{
+  if (MC_is_active() || MC_record_replay_is_active()) {
+    int idx = issuer->simcall_.mc_value_;
+    xbt_assert(idx == -1 || activities[idx]->test(issuer));
+    return idx;
+  }
+
+  for (std::size_t i = 0; i < activities.size(); ++i) {
+    if (activities[i]->test(issuer)) {
+      auto* observer = dynamic_cast<kernel::actor::ActivityTestanySimcall*>(issuer->simcall_.observer_);
+      xbt_assert(observer != nullptr);
+      observer->set_result(i);
+      issuer->simcall_answer();
+      return i;
+    }
+  }
+  issuer->simcall_answer();
+  return -1;
+}
+
 void ActivityImpl::wait_for(actor::ActorImpl* issuer, double timeout)
 {
-  XBT_DEBUG("Wait for execution of synchro %p, state %s", this, to_c_str(state_));
+  XBT_DEBUG("Wait for execution of synchro %p, state %s", this, get_state_str());
   xbt_assert(std::isfinite(timeout), "timeout is not finite!");
 
   /* Associate this simcall to the synchro */
@@ -78,14 +115,25 @@ void ActivityImpl::wait_for(actor::ActorImpl* issuer, double timeout)
   xbt_assert(not MC_is_active() && not MC_record_replay_is_active(), "MC is currently not supported here.");
 
   /* If the synchro is already finished then perform the error handling */
-  if (state_ != State::RUNNING) {
+  if (state_ != State::WAITING && state_ != State::RUNNING) {
     finish();
   } else {
+    auto* comm = dynamic_cast<CommImpl*>(this);
+    if (comm != nullptr) {
+      resource::Action* sleep = issuer->get_host()->get_cpu()->sleep(timeout);
+      sleep->set_activity(comm);
+
+      if (issuer == comm->src_actor_)
+        comm->src_timeout_ = sleep;
+      else
+        comm->dst_timeout_ = sleep;
+    }
     /* we need a sleep action (even when the timeout is infinite) to be notified of host failures */
     RawImplPtr synchro(new RawImpl([this, issuer]() {
       this->unregister_simcall(&issuer->simcall_);
       issuer->waiting_synchro_ = nullptr;
-      auto* observer = dynamic_cast<kernel::actor::ActivityWaitSimcall*>(issuer->simcall_.observer_);
+      issuer->exception_       = nullptr;
+      auto* observer           = dynamic_cast<kernel::actor::ActivityWaitSimcall*>(issuer->simcall_.observer_);
       xbt_assert(observer != nullptr);
       observer->set_result(true);
     }));
@@ -94,6 +142,33 @@ void ActivityImpl::wait_for(actor::ActorImpl* issuer, double timeout)
   }
 }
 
+void ActivityImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<ActivityImpl*>& activities, double timeout)
+{
+  XBT_DEBUG("Wait for execution of any synchro");
+  if (timeout < 0.0) {
+    issuer->simcall_.timeout_cb_ = nullptr;
+  } else {
+    issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, &activities]() {
+      issuer->simcall_.timeout_cb_ = nullptr;
+      for (auto* act : activities)
+        act->unregister_simcall(&issuer->simcall_);
+      // default result (-1) is set in actor::ActivityWaitanySimcall
+      issuer->simcall_answer();
+    });
+  }
+
+  for (auto* act : activities) {
+    /* associate this simcall to the the synchro */
+    act->simcalls_.push_back(&issuer->simcall_);
+    /* see if the synchro is already finished */
+    if (act->get_state() != State::WAITING && act->get_state() != State::RUNNING) {
+      act->finish();
+      break;
+    }
+  }
+  XBT_DEBUG("Exit from ActivityImlp::wait_any_for");
+}
+
 void ActivityImpl::suspend()
 {
   if (surf_action_ == nullptr)
@@ -120,6 +195,30 @@ void ActivityImpl::cancel()
   state_ = State::CANCELED;
 }
 
+void ActivityImpl::handle_activity_waitany(smx_simcall_t simcall)
+{
+  /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+   * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
+   * simcall */
+  if (auto* observer = dynamic_cast<actor::ActivityWaitanySimcall*>(simcall->observer_)) {
+    if (simcall->timeout_cb_) {
+      simcall->timeout_cb_->remove();
+      simcall->timeout_cb_ = nullptr;
+    }
+
+    auto activities = observer->get_activities();
+    for (auto* act : activities)
+      act->unregister_simcall(simcall);
+
+    if (not MC_is_active() && not MC_record_replay_is_active()) {
+      auto element   = std::find(activities.begin(), activities.end(), this);
+      int rank       = element != activities.end() ? static_cast<int>(std::distance(activities.begin(), element)) : -1;
+      auto* observer = dynamic_cast<kernel::actor::ActivityWaitanySimcall*>(simcall->observer_);
+      observer->set_result(rank);
+    }
+  }
+}
+
 // boost::intrusive_ptr<Activity> support:
 void intrusive_ptr_add_ref(ActivityImpl* activity)
 {
index 33b1101..7acbe2b 100644 (file)
@@ -65,8 +65,11 @@ public:
   void set_finish_time(double finish_time) { finish_time_ = finish_time; }
   double get_finish_time() const { return finish_time_; }
 
-  virtual bool test();
+  virtual bool test(actor::ActorImpl* issuer);
+  static ssize_t test_any(actor::ActorImpl* issuer, const std::vector<ActivityImpl*>& activities);
+
   virtual void wait_for(actor::ActorImpl* issuer, double timeout);
+  static void wait_any_for(actor::ActorImpl* issuer, const std::vector<ActivityImpl*>& activities, double timeout);
   virtual ActivityImpl& set_timeout(double) { THROW_UNIMPLEMENTED; }
 
   virtual void suspend();
@@ -81,6 +84,7 @@ public:
 
   void register_simcall(smx_simcall_t simcall);
   void unregister_simcall(smx_simcall_t simcall);
+  void handle_activity_waitany(smx_simcall_t simcall);
   void clean_action();
   virtual double get_remaining() const;
   // Support for the boost::intrusive_ptr<ActivityImpl> datatype
index 39fe618..44a6e80 100644 (file)
@@ -185,15 +185,15 @@ void simcall_HANDLER_comm_wait(smx_simcall_t simcall, simgrid::kernel::activity:
   comm->wait_for(simcall->issuer_, timeout);
 }
 
-bool simcall_HANDLER_comm_test(smx_simcall_t, simgrid::kernel::activity::CommImpl* comm)
+bool simcall_HANDLER_comm_test(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comm)
 {
-  return comm->test();
+  return comm->test(simcall->issuer_);
 }
 
 ssize_t simcall_HANDLER_comm_testany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count)
 {
-  std::vector<simgrid::kernel::activity::CommImpl*> comms_vec(comms, comms + count);
-  return simgrid::kernel::activity::CommImpl::test_any(simcall->issuer_, comms_vec);
+  std::vector<simgrid::kernel::activity::ActivityImpl*> comms_vec(comms, comms + count);
+  return simgrid::kernel::activity::ActivityImpl::test_any(simcall->issuer_, comms_vec);
 }
 
 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count,
@@ -387,11 +387,11 @@ void CommImpl::copy_data()
   copied_ = true;
 }
 
-bool CommImpl::test()
+bool CommImpl::test(actor::ActorImpl* issuer)
 {
   if ((MC_is_active() || MC_record_replay_is_active()) && src_actor_ && dst_actor_)
     set_state(State::DONE);
-  return ActivityImpl::test();
+  return ActivityImpl::test(issuer);
 }
 
 void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
@@ -400,7 +400,6 @@ void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
 
   /* Associate this simcall to the wait synchro */
   register_simcall(&issuer->simcall_);
-
   if (MC_is_active() || MC_record_replay_is_active()) {
     int idx = issuer->simcall_.mc_value_;
     if (idx == 0) {
@@ -415,35 +414,7 @@ void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
     finish();
     return;
   }
-
-  /* If the synchro has already finish perform the error handling, */
-  /* otherwise set up a waiting timeout on the right side          */
-  if (get_state() != State::WAITING && get_state() != State::RUNNING) {
-    finish();
-  } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */
-    resource::Action* sleep = issuer->get_host()->get_cpu()->sleep(timeout);
-    sleep->set_activity(this);
-
-    if (issuer == src_actor_)
-      src_timeout_ = sleep;
-    else
-      dst_timeout_ = sleep;
-  }
-}
-
-ssize_t CommImpl::test_any(const actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms)
-{
-  if (MC_is_active() || MC_record_replay_is_active()) {
-    int idx = issuer->simcall_.mc_value_;
-    xbt_assert(idx == -1 || comms[idx]->test());
-    return idx;
-  }
-
-  for (std::size_t i = 0; i < comms.size(); ++i) {
-    if (comms[i]->test())
-      return i;
-  }
-  return -1;
+  ActivityImpl::wait_for(issuer, timeout);
 }
 
 void CommImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms, double timeout)
@@ -458,31 +429,8 @@ void CommImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<CommImpl
     comm->finish();
     return;
   }
-
-  if (timeout < 0.0) {
-    issuer->simcall_.timeout_cb_ = nullptr;
-  } else {
-    issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, comms]() {
-      // FIXME: Vector `comms' is copied here. Use a reference once its lifetime is extended (i.e. when the simcall is
-      // modernized).
-      issuer->simcall_.timeout_cb_ = nullptr;
-      for (auto* comm : comms)
-        comm->unregister_simcall(&issuer->simcall_);
-      simcall_comm_waitany__set__result(&issuer->simcall_, -1);
-      issuer->simcall_answer();
-    });
-  }
-
-  for (auto* comm : comms) {
-    /* associate this simcall to the the synchro */
-    comm->simcalls_.push_back(&issuer->simcall_);
-
-    /* see if the synchro is already finished */
-    if (comm->get_state() != State::WAITING && comm->get_state() != State::RUNNING) {
-      comm->finish();
-      break;
-    }
-  }
+  std::vector<ActivityImpl*> activities(comms.begin(), comms.end());
+  ActivityImpl::wait_any_for(issuer, activities, timeout);
 }
 
 void CommImpl::suspend()
@@ -643,21 +591,8 @@ void CommImpl::finish()
 
     if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
       continue;                                 // if actor handling comm is killed
-    if (simcall->call_ == simix::Simcall::COMM_WAITANY) {
-      CommImpl** comms = simcall_comm_waitany__get__comms(simcall);
-      size_t count     = simcall_comm_waitany__get__count(simcall);
-      for (size_t i = 0; i < count; i++)
-        comms[i]->unregister_simcall(simcall);
-      if (simcall->timeout_cb_) {
-        simcall->timeout_cb_->remove();
-        simcall->timeout_cb_ = nullptr;
-      }
-      if (not MC_is_active() && not MC_record_replay_is_active()) {
-        auto element = std::find(comms, comms + count, this);
-        ssize_t rank = (element != comms + count) ? element - comms : -1;
-        simcall_comm_waitany__set__result(simcall, rank);
-      }
-    }
+
+    handle_activity_waitany(simcall);
 
     /* Check out for errors */
 
@@ -667,29 +602,6 @@ void CommImpl::finish()
       set_exception(simcall->issuer_);
       simcall->issuer_->simcall_answer();
     }
-    /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
-    if (simcall->issuer_->exception_ &&
-        (simcall->call_ == simix::Simcall::COMM_WAITANY || simcall->call_ == simix::Simcall::COMM_TESTANY)) {
-      // First retrieve the rank of our failing synchro
-      CommImpl** comms;
-      size_t count;
-      if (simcall->call_ == simix::Simcall::COMM_WAITANY) {
-        comms = simcall_comm_waitany__get__comms(simcall);
-        count = simcall_comm_waitany__get__count(simcall);
-      } else {
-        /* simcall->call_ == simix::Simcall::COMM_TESTANY */
-        comms = simcall_comm_testany__get__comms(simcall);
-        count = simcall_comm_testany__get__count(simcall);
-      }
-      auto element = std::find(comms, comms + count, this);
-      ssize_t rank = (element != comms + count) ? element - comms : -1;
-      // In order to modify the exception we have to rethrow it:
-      try {
-        std::rethrow_exception(simcall->issuer_->exception_);
-      } catch (Exception& e) {
-        e.set_value(rank);
-      }
-    }
 
     simcall->issuer_->waiting_synchro_ = nullptr;
     simcall->issuer_->activities_.remove(this);
index 87a5091..883aaaa 100644 (file)
@@ -49,9 +49,8 @@ public:
   std::vector<s4u::Link*> get_traversed_links() const;
   void copy_data();
 
-  bool test() override;
+  bool test(actor::ActorImpl* issuer) override;
   void wait_for(actor::ActorImpl* issuer, double timeout) override;
-  static ssize_t test_any(const actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms);
   static void wait_any_for(actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms, double timeout);
 
   CommImpl* start();
index 85ab1e9..2eaaa8f 100644 (file)
@@ -199,26 +199,7 @@ void ExecImpl::finish()
     if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
       continue;                                 // if process handling comm is killed
 
-    /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
-     * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
-     * simcall */
-    if (auto* observer = dynamic_cast<actor::ExecutionWaitanySimcall*>(simcall->observer_)) {
-      const auto& execs = observer->get_execs();
-
-      for (auto* exec : execs)
-        exec->unregister_simcall(simcall);
-
-      if (simcall->timeout_cb_) {
-        simcall->timeout_cb_->remove();
-        simcall->timeout_cb_ = nullptr;
-      }
-
-      if (not MC_is_active() && not MC_record_replay_is_active()) {
-        auto element = std::find(execs.begin(), execs.end(), this);
-        int rank     = element != execs.end() ? static_cast<int>(std::distance(execs.begin(), element)) : -1;
-        observer->set_result(rank);
-      }
-    }
+    handle_activity_waitany(simcall);
 
     set_exception(simcall->issuer_);
 
@@ -259,31 +240,6 @@ ActivityImpl* ExecImpl::migrate(s4u::Host* to)
   return this;
 }
 
-void ExecImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<ExecImpl*>& execs, double timeout)
-{
-  if (timeout < 0.0) {
-    issuer->simcall_.timeout_cb_ = nullptr;
-  } else {
-    issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, &execs]() {
-      issuer->simcall_.timeout_cb_ = nullptr;
-      for (auto* exec : execs)
-        exec->unregister_simcall(&issuer->simcall_);
-      // default result (-1) is set in actor::ExecutionWaitanySimcall
-      issuer->simcall_answer();
-    });
-  }
-
-  for (auto* exec : execs) {
-    /* associate this simcall to the the synchro */
-    exec->simcalls_.push_back(&issuer->simcall_);
-    /* see if the synchro is already finished */
-    if (exec->get_state() != State::WAITING && exec->get_state() != State::RUNNING) {
-      exec->finish();
-      break;
-    }
-  }
-}
-
 /*************
  * Callbacks *
  *************/
index 0aefc02..cbe373f 100644 (file)
@@ -56,7 +56,6 @@ public:
   void finish() override;
 
   void reset();
-  static void wait_any_for(actor::ActorImpl* issuer, const std::vector<ExecImpl*>& execs, double timeout);
 
   static xbt::signal<void(ExecImpl const&, s4u::Host*)> on_migration;
 };
index 8d96916..61c160b 100644 (file)
@@ -147,24 +147,8 @@ void IoImpl::finish()
 
     if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
       continue;                                 // if process handling comm is killed
-    if (auto* observer = dynamic_cast<kernel::actor::IoWaitanySimcall*>(simcall->observer_)) { // simcall is a wait_any?
-      const auto& ios = observer->get_ios();
-
-      for (auto* io : ios) {
-        io->unregister_simcall(simcall);
-
-        if (simcall->timeout_cb_) {
-          simcall->timeout_cb_->remove();
-          simcall->timeout_cb_ = nullptr;
-        }
-      }
-
-      if (not MC_is_active() && not MC_record_replay_is_active()) {
-        auto element = std::find(ios.begin(), ios.end(), this);
-        int rank     = element != ios.end() ? static_cast<int>(std::distance(ios.begin(), element)) : -1;
-        observer->set_result(rank);
-      }
-    }
+
+    handle_activity_waitany(simcall);
 
     set_exception(simcall->issuer_);
 
@@ -173,32 +157,6 @@ void IoImpl::finish()
   }
 }
 
-void IoImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<IoImpl*>& ios, double timeout)
-{
-  if (timeout < 0.0) {
-    issuer->simcall_.timeout_cb_ = nullptr;
-  } else {
-    issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, &ios]() {
-      issuer->simcall_.timeout_cb_ = nullptr;
-      for (auto* io : ios)
-        io->unregister_simcall(&issuer->simcall_);
-      // default result (-1) is set in actor::IoWaitanySimcall
-      issuer->simcall_answer();
-    });
-  }
-
-  for (auto* io : ios) {
-    /* associate this simcall to the the synchro */
-    io->simcalls_.push_back(&issuer->simcall_);
-
-    /* see if the synchro is already finished */
-    if (io->get_state() != State::WAITING && io->get_state() != State::RUNNING) {
-      io->finish();
-      break;
-    }
-  }
-}
-
 } // namespace activity
 } // namespace kernel
 } // namespace simgrid
index bb0b666..1c9dbd5 100644 (file)
@@ -39,7 +39,6 @@ public:
   void post() override;
   void set_exception(actor::ActorImpl* issuer) override;
   void finish() override;
-  static void wait_any_for(actor::ActorImpl* issuer, const std::vector<IoImpl*>& ios, double timeout);
 };
 } // namespace activity
 } // namespace kernel
index c49720c..19ff81c 100644 (file)
@@ -279,7 +279,6 @@ void ActorImpl::yield()
 
   /* Go into sleep and return control to maestro */
   context_->suspend();
-
   /* Ok, maestro returned control to us */
   XBT_DEBUG("Control returned to me: '%s'", get_cname());
 
@@ -305,7 +304,6 @@ void ActorImpl::yield()
       e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
     }
   }
-
 #if HAVE_SMPI
   if (not finished_)
     smpi_switch_data_segment(get_iface());
index 8903daa..b15f77b 100644 (file)
@@ -5,8 +5,10 @@
 
 #include "src/kernel/actor/SimcallObserver.hpp"
 #include "simgrid/s4u/Host.hpp"
+#include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/activity/MutexImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
+#include "src/mc/mc_config.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_observer, mc, "Logging specific to MC simcall observation");
 
@@ -52,10 +54,10 @@ std::string SimcallObserver::to_string(int /*times_considered*/) const
                                      issuer_->get_cname());
 }
 
-std::string SimcallObserver::dot_label() const
+std::string SimcallObserver::dot_label(int /*times_considered*/) const
 {
   if (issuer_->get_host())
-    return xbt::string_printf("[(%ld)%s] ", issuer_->get_pid(), issuer_->get_cname());
+    return xbt::string_printf("[(%ld)%s] ", issuer_->get_pid(), issuer_->get_host()->get_cname());
   return xbt::string_printf("[(%ld)] ", issuer_->get_pid());
 }
 
@@ -64,9 +66,9 @@ std::string RandomSimcall::to_string(int times_considered) const
   return SimcallObserver::to_string(times_considered) + "MC_RANDOM(" + std::to_string(times_considered) + ")";
 }
 
-std::string RandomSimcall::dot_label() const
+std::string RandomSimcall::dot_label(int times_considered) const
 {
-  return SimcallObserver::dot_label() + "MC_RANDOM(" + std::to_string(next_value_) + ")";
+  return SimcallObserver::dot_label(times_considered) + "MC_RANDOM(" + std::to_string(next_value_) + ")";
 }
 
 void RandomSimcall::prepare(int times_considered)
@@ -85,9 +87,9 @@ std::string MutexUnlockSimcall::to_string(int times_considered) const
   return SimcallObserver::to_string(times_considered) + "Mutex UNLOCK";
 }
 
-std::string MutexUnlockSimcall::dot_label() const
+std::string MutexUnlockSimcall::dot_label(int times_considered) const
 {
-  return SimcallObserver::dot_label() + "Mutex UNLOCK";
+  return SimcallObserver::dot_label(times_considered) + "Mutex UNLOCK";
 }
 
 std::string MutexLockSimcall::to_string(int times_considered) const
@@ -100,9 +102,9 @@ std::string MutexLockSimcall::to_string(int times_considered) const
   return res;
 }
 
-std::string MutexLockSimcall::dot_label() const
+std::string MutexLockSimcall::dot_label(int times_considered) const
 {
-  return SimcallObserver::dot_label() + (blocking_ ? "Mutex LOCK" : "Mutex TRYLOCK");
+  return SimcallObserver::dot_label(times_considered) + (blocking_ ? "Mutex LOCK" : "Mutex TRYLOCK");
 }
 
 bool MutexLockSimcall::is_enabled() const
@@ -117,9 +119,9 @@ std::string ConditionWaitSimcall::to_string(int times_considered) const
   return res;
 }
 
-std::string ConditionWaitSimcall::dot_label() const
+std::string ConditionWaitSimcall::dot_label(int times_considered) const
 {
-  return SimcallObserver::dot_label() + "Condition WAIT";
+  return SimcallObserver::dot_label(times_considered) + "Condition WAIT";
 }
 
 bool ConditionWaitSimcall::is_enabled() const
@@ -139,9 +141,9 @@ std::string SemAcquireSimcall::to_string(int times_considered) const
   return res;
 }
 
-std::string SemAcquireSimcall::dot_label() const
+std::string SemAcquireSimcall::dot_label(int times_considered) const
 {
-  return SimcallObserver::dot_label() + "Sem ACQUIRE";
+  return SimcallObserver::dot_label(times_considered) + "Sem ACQUIRE";
 }
 
 bool SemAcquireSimcall::is_enabled() const
@@ -154,28 +156,173 @@ bool SemAcquireSimcall::is_enabled() const
   return true;
 }
 
-std::string ExecutionWaitanySimcall::to_string(int times_considered) const
+int ActivityTestanySimcall::get_max_consider() const
 {
-  std::string res = SimcallObserver::to_string(times_considered) + "Execution WAITANY";
-  res += "(" + (timeout_ == -1.0 ? "" : std::to_string(timeout_)) + ")";
+  // Only Comms are of interest to MC for now. When all types of activities can be consider, this function can simply
+  // return the size of activities_.
+  int count = 0;
+  for (const auto& act : activities_)
+    if (dynamic_cast<activity::CommImpl*>(act) != nullptr)
+      count++;
+  return count;
+}
+
+std::string ActivityTestanySimcall::to_string(int times_considered) const
+{
+  std::string res = SimcallObserver::to_string(times_considered);
+  if (times_considered == -1) {
+    res += "TestAny FALSE(-)";
+  } else {
+    res += "TestAny(" + xbt::string_printf("(%d of %zu)", times_considered + 1, activities_.size());
+  }
+
   return res;
 }
 
-std::string ExecutionWaitanySimcall::dot_label() const
+std::string ActivityTestanySimcall::dot_label(int times_considered) const
 {
-  return SimcallObserver::dot_label() + "Execution WAITANY";
+  std::string res = SimcallObserver::dot_label(times_considered) + "TestAny ";
+  if (times_considered == -1) {
+    res += "FALSE";
+  } else {
+    res += xbt::string_printf("TRUE [%d of %zu]", times_considered + 1, activities_.size());
+  }
+  return res;
 }
 
-std::string IoWaitanySimcall::to_string(int times_considered) const
+std::string ActivityTestSimcall::to_string(int times_considered) const
 {
-  std::string res = SimcallObserver::to_string(times_considered) + "I/O WAITANY";
-  res += "(" + (timeout_ == -1.0 ? "" : std::to_string(timeout_)) + ")";
+  std::string res = SimcallObserver::to_string(times_considered) + "Test ";
+  auto* comm      = dynamic_cast<activity::CommImpl*>(activity_);
+  if (comm) {
+    if (comm->src_actor_.get() == nullptr || comm->dst_actor_.get() == nullptr) {
+      res += "FALSE(comm=";
+      res += XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose) ? xbt::string_printf("%p)", comm)
+                                                                      : "(verbose only))";
+    } else {
+      res += "TRUE(comm=";
+
+      auto src = comm->src_actor_;
+      auto dst = comm->dst_actor_;
+      res +=
+          XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose) ? xbt::string_printf("%p", comm) : "(verbose only) ";
+      res += xbt::string_printf("[(%ld)%s (%s) ", src->get_pid(), src->get_host()->get_cname(), src->get_cname()) +
+             "-> " +
+             xbt::string_printf("(%ld)%s (%s)])", dst->get_pid(), dst->get_host()->get_cname(), dst->get_cname());
+    }
+  }
+  return res;
+}
+
+std::string ActivityTestSimcall::dot_label(int times_considered) const
+{
+  std::string res = SimcallObserver::dot_label(times_considered) + "Test ";
+  auto* comm      = dynamic_cast<activity::CommImpl*>(activity_);
+  if (comm && (comm->src_actor_.get() == nullptr || comm->dst_actor_.get() == nullptr)) {
+    res += "FALSE";
+  } else {
+    res += "TRUE";
+  }
+  return res;
+}
+
+std::string ActivityWaitSimcall::to_string(int times_considered) const
+{
+  std::string res = SimcallObserver::to_string(times_considered);
+  auto* comm      = dynamic_cast<activity::CommImpl*>(activity_);
+  if (comm == nullptr)
+    return res;
+  if (times_considered == -1) {
+    res += "WaitTimeout(comm=" + XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose)
+               ? xbt::string_printf("%p)", comm)
+               : "(verbose only))";
+  } else {
+    res += "Wait(comm=";
+
+    auto src = comm->src_actor_;
+    auto dst = comm->dst_actor_;
+    res +=
+        XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose) ? xbt::string_printf("%p", comm) : "(verbose only) ";
+    res += xbt::string_printf("[(%ld)%s (%s) ", src->get_pid(), src->get_host()->get_cname(), src->get_cname()) +
+           "-> " + xbt::string_printf("(%ld)%s (%s)])", dst->get_pid(), dst->get_host()->get_cname(), dst->get_cname());
+  }
   return res;
 }
 
-std::string IoWaitanySimcall::dot_label() const
+std::string ActivityWaitSimcall::dot_label(int times_considered) const
 {
-  return SimcallObserver::dot_label() + "I/O WAITANY";
+  std::string res = SimcallObserver::dot_label(times_considered);
+  res += (times_considered == -1) ? "WaitTimeout " : "Wait ";
+
+  auto* comm = dynamic_cast<activity::CommImpl*>(activity_);
+  if (comm) {
+    auto src = comm->src_actor_;
+    auto dst = comm->dst_actor_;
+    res += " [(" + std::to_string(src ? src->get_pid() : 0) + ")";
+    res += "->(" + std::to_string(dst ? dst->get_pid() : 0) + ")]";
+  }
+  return res;
+}
+
+bool ActivityWaitSimcall::is_enabled() const
+{
+  /* FIXME: check also that src and dst processes are not suspended */
+  const auto* comm = dynamic_cast<activity::CommImpl*>(activity_);
+  if (comm == nullptr)
+    return true;
+
+  if (comm->src_timeout_ || comm->dst_timeout_) {
+    /* If it has a timeout it will be always be enabled (regardless of who declared the timeout),
+     * because even if the communication is not ready, it can timeout and won't block. */
+    if (_sg_mc_timeout == 1)
+      return true;
+  }
+  /* On the other hand if it hasn't a timeout, check if the comm is ready.*/
+  else if (comm->detached() && comm->src_actor_ == nullptr && comm->get_state() == activity::State::READY)
+    return (comm->dst_actor_ != nullptr);
+  return (comm->src_actor_ && comm->dst_actor_);
+}
+
+std::string ActivityWaitanySimcall::dot_label(int times_considered) const
+{
+  return SimcallObserver::dot_label(times_considered) +
+         xbt::string_printf("WaitAny [%d of %zu]", times_considered + 1, activities_.size());
+}
+
+bool ActivityWaitanySimcall::is_enabled() const
+{
+  // FIXME: deal with other kind of activities (Exec and I/Os)
+  for (auto act : activities_) {
+    const auto* comm = dynamic_cast<activity::CommImpl*>(act);
+    if (comm != nullptr && comm->src_actor_ && comm->dst_actor_)
+      return true;
+  }
+  return false;
+}
+
+int ActivityWaitanySimcall::get_max_consider() const
+{
+  // Only Comms are of interest to MC for now. When all types of activities can be consider, this function can simply
+  // return the size of activities_.
+  int count = 0;
+  for (const auto& act : activities_)
+    if (dynamic_cast<activity::CommImpl*>(act) != nullptr)
+      count++;
+  return count;
+}
+
+std::string ActivityWaitanySimcall::to_string(int times_considered) const
+{
+  std::string res = SimcallObserver::to_string(times_considered) + "WaitAny(";
+  size_t count    = activities_.size();
+  if (count > 0) {
+    if (auto* comm = dynamic_cast<kernel::activity::CommImpl*>(activities_[times_considered]))
+      res += "comm=" + XBT_LOG_ISENABLED(mc_observer, xbt_log_priority_verbose)
+                 ? xbt::string_printf("%p", comm)
+                 : "(verbose only)" + xbt::string_printf("(%d of %zu))", times_considered + 1, count);
+  } else
+    res += "comm at idx " + std::to_string(times_considered) + ")";
+  return res;
 }
 
 } // namespace actor
index bdafcf4..d9a7b7f 100644 (file)
@@ -60,14 +60,14 @@ public:
    * Most simcalls are not visible from the MC because they don't have an observer at all. */
   virtual bool is_visible() const { return true; }
   virtual std::string to_string(int times_considered) const = 0;
-  virtual std::string dot_label() const                     = 0;
+  virtual std::string dot_label(int times_considered) const = 0;
 };
 
 template <class T> class ResultingSimcall : public SimcallObserver {
   T result_;
 
 public:
-  ResultingSimcall(smx_actor_t actor, T default_result) : SimcallObserver(actor), result_(default_result) {}
+  ResultingSimcall(ActorImpl* actor, T default_result) : SimcallObserver(actor), result_(default_result) {}
   void set_result(T res) { result_ = res; }
   T get_result() const { return result_; }
 };
@@ -78,7 +78,7 @@ class RandomSimcall : public SimcallObserver {
   int next_value_ = 0;
 
 public:
-  RandomSimcall(smx_actor_t actor, int min, int max) : SimcallObserver(actor), min_(min), max_(max)
+  RandomSimcall(ActorImpl* actor, int min, int max) : SimcallObserver(actor), min_(min), max_(max)
   {
     xbt_assert(min < max);
   }
@@ -91,7 +91,7 @@ public:
   int get_max_consider() const override;
   void prepare(int times_considered) override;
   std::string to_string(int times_considered) const override;
-  std::string dot_label() const override;
+  std::string dot_label(int times_considered) const override;
   int get_value() const { return next_value_; }
   bool depends(SimcallObserver* other) override;
 };
@@ -100,7 +100,7 @@ class MutexSimcall : public SimcallObserver {
   activity::MutexImpl* const mutex_;
 
 public:
-  MutexSimcall(smx_actor_t actor, activity::MutexImpl* mutex) : SimcallObserver(actor), mutex_(mutex) {}
+  MutexSimcall(ActorImpl* actor, activity::MutexImpl* mutex) : SimcallObserver(actor), mutex_(mutex) {}
   activity::MutexImpl* get_mutex() const { return mutex_; }
   bool depends(SimcallObserver* other) override;
 };
@@ -111,21 +111,21 @@ class MutexUnlockSimcall : public MutexSimcall {
 public:
   SimcallObserver* clone() override { return new MutexUnlockSimcall(get_issuer(), get_mutex()); }
   std::string to_string(int times_considered) const override;
-  std::string dot_label() const override;
+  std::string dot_label(int times_considered) const override;
 };
 
 class MutexLockSimcall : public MutexSimcall {
   const bool blocking_;
 
 public:
-  MutexLockSimcall(smx_actor_t actor, activity::MutexImpl* mutex, bool blocking = true)
+  MutexLockSimcall(ActorImpl* actor, activity::MutexImpl* mutex, bool blocking = true)
       : MutexSimcall(actor, mutex), blocking_(blocking)
   {
   }
   SimcallObserver* clone() override { return new MutexLockSimcall(get_issuer(), get_mutex(), blocking_); }
   bool is_enabled() const override;
   std::string to_string(int times_considered) const override;
-  std::string dot_label() const override;
+  std::string dot_label(int times_considered) const override;
 };
 
 class ConditionWaitSimcall : public ResultingSimcall<bool> {
@@ -134,7 +134,7 @@ class ConditionWaitSimcall : public ResultingSimcall<bool> {
   const double timeout_;
 
 public:
-  ConditionWaitSimcall(smx_actor_t actor, activity::ConditionVariableImpl* cond, activity::MutexImpl* mutex,
+  ConditionWaitSimcall(ActorImpl* actor, activity::ConditionVariableImpl* cond, activity::MutexImpl* mutex,
                        double timeout = -1.0)
       : ResultingSimcall(actor, false), cond_(cond), mutex_(mutex), timeout_(timeout)
   {
@@ -143,7 +143,7 @@ public:
   bool is_enabled() const override;
   bool is_visible() const override { return false; }
   std::string to_string(int times_considered) const override;
-  std::string dot_label() const override;
+  std::string dot_label(int times_considered) const override;
   activity::ConditionVariableImpl* get_cond() const { return cond_; }
   activity::MutexImpl* get_mutex() const { return mutex_; }
   double get_timeout() const { return timeout_; }
@@ -154,7 +154,7 @@ class SemAcquireSimcall : public ResultingSimcall<bool> {
   const double timeout_;
 
 public:
-  SemAcquireSimcall(smx_actor_t actor, activity::SemaphoreImpl* sem, double timeout = -1.0)
+  SemAcquireSimcall(ActorImpl* actor, activity::SemaphoreImpl* sem, double timeout = -1.0)
       : ResultingSimcall(actor, false), sem_(sem), timeout_(timeout)
   {
   }
@@ -162,61 +162,80 @@ public:
   bool is_enabled() const override;
   bool is_visible() const override { return false; }
   std::string to_string(int times_considered) const override;
-  std::string dot_label() const override;
+  std::string dot_label(int times_considered) const override;
   activity::SemaphoreImpl* get_sem() const { return sem_; }
   double get_timeout() const { return timeout_; }
 };
 
-class ActivityWaitSimcall : public ResultingSimcall<bool> {
+class ActivityTestSimcall : public ResultingSimcall<bool> {
   activity::ActivityImpl* const activity_;
-  const double timeout_;
 
 public:
-  ActivityWaitSimcall(smx_actor_t actor, activity::ActivityImpl* activity, double timeout)
-      : ResultingSimcall(actor, false), activity_(activity), timeout_(timeout)
+  ActivityTestSimcall(ActorImpl* actor, activity::ActivityImpl* activity)
+      : ResultingSimcall(actor, true), activity_(activity)
   {
   }
-  SimcallObserver* clone() override { return new ActivityWaitSimcall(get_issuer(), activity_, timeout_); }
-  bool is_visible() const override { return false; }
-  std::string to_string(int times_considered) const override { return SimcallObserver::to_string(times_considered); }
-  std::string dot_label() const override { return SimcallObserver::dot_label(); }
+  SimcallObserver* clone() override { return new ActivityTestSimcall(get_issuer(), activity_); }
+  bool is_visible() const override { return true; }
+  std::string to_string(int times_considered) const override;
+  std::string dot_label(int times_considered) const override;
   activity::ActivityImpl* get_activity() const { return activity_; }
-  double get_timeout() const { return timeout_; }
 };
 
-class ExecutionWaitanySimcall : public ResultingSimcall<ssize_t> {
-  const std::vector<activity::ExecImpl*>& execs_;
+class ActivityTestanySimcall : public ResultingSimcall<ssize_t> {
+  const std::vector<activity::ActivityImpl*>& activities_;
+
+public:
+  ActivityTestanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities)
+      : ResultingSimcall(actor, -1), activities_(activities)
+  {
+  }
+  SimcallObserver* clone() override { return new ActivityTestanySimcall(get_issuer(), activities_); }
+  bool is_visible() const override { return true; }
+  int get_max_consider() const override;
+  std::string to_string(int times_considered) const override;
+  std::string dot_label(int times_considered) const override;
+  const std::vector<activity::ActivityImpl*>& get_activities() const { return activities_; }
+};
+
+class ActivityWaitSimcall : public ResultingSimcall<bool> {
+  activity::ActivityImpl* activity_;
   const double timeout_;
 
 public:
-  ExecutionWaitanySimcall(smx_actor_t actor, const std::vector<activity::ExecImpl*>& execs, double timeout)
-      : ResultingSimcall(actor, -1), execs_(execs), timeout_(timeout)
+  ActivityWaitSimcall(ActorImpl* actor, activity::ActivityImpl* activity, double timeout)
+      : ResultingSimcall(actor, false), activity_(activity), timeout_(timeout)
   {
   }
-  SimcallObserver* clone() override { return new ExecutionWaitanySimcall(get_issuer(), execs_, timeout_); }
-  bool is_visible() const override { return false; }
+  SimcallObserver* clone() override { return new ActivityWaitSimcall(get_issuer(), activity_, timeout_); }
+  bool is_visible() const override { return true; }
+  bool is_enabled() const override;
   std::string to_string(int times_considered) const override;
-  std::string dot_label() const override;
-  const std::vector<activity::ExecImpl*>& get_execs() const { return execs_; }
+  std::string dot_label(int times_considered) const override;
+  activity::ActivityImpl* get_activity() const { return activity_; }
+  void set_activity(activity::ActivityImpl* activity) { activity_ = activity; }
   double get_timeout() const { return timeout_; }
 };
 
-class IoWaitanySimcall : public ResultingSimcall<ssize_t> {
-  const std::vector<activity::IoImpl*>& ios_;
+class ActivityWaitanySimcall : public ResultingSimcall<ssize_t> {
+  const std::vector<activity::ActivityImpl*>& activities_;
   const double timeout_;
 
 public:
-  IoWaitanySimcall(smx_actor_t actor, const std::vector<activity::IoImpl*>& ios, double timeout)
-      : ResultingSimcall(actor, -1), ios_(ios), timeout_(timeout)
+  ActivityWaitanySimcall(ActorImpl* actor, const std::vector<activity::ActivityImpl*>& activities, double timeout)
+      : ResultingSimcall(actor, -1), activities_(activities), timeout_(timeout)
   {
   }
-  SimcallObserver* clone() override { return new IoWaitanySimcall(get_issuer(), ios_, timeout_); }
-  bool is_visible() const override { return false; }
+  SimcallObserver* clone() override { return new ActivityWaitanySimcall(get_issuer(), activities_, timeout_); }
+  bool is_enabled() const override;
+  bool is_visible() const override { return true; }
+  int get_max_consider() const override;
   std::string to_string(int times_considered) const override;
-  std::string dot_label() const override;
-  const std::vector<activity::IoImpl*>& get_ios() const { return ios_; }
+  std::string dot_label(int times_considered) const override;
+  const std::vector<activity::ActivityImpl*>& get_activities() const { return activities_; }
   double get_timeout() const { return timeout_; }
 };
+
 } // namespace actor
 } // namespace kernel
 } // namespace simgrid
index 073e242..a02f0fc 100644 (file)
@@ -52,32 +52,30 @@ static std::string buff_size_to_string(size_t buff_size)
   return XBT_LOG_ISENABLED(Api, xbt_log_priority_verbose) ? std::to_string(buff_size) : "(verbose only)";
 }
 
-static void simcall_translate(smx_simcall_t req,
-                              simgrid::mc::Remote<simgrid::kernel::activity::CommImpl>& buffered_comm);
+static void simcall_translate(smx_simcall_t req, Remote<kernel::activity::CommImpl>& buffered_comm);
 
 static bool request_is_enabled_by_idx(const RemoteProcess& process, smx_simcall_t req, unsigned int idx)
 {
-  kernel::activity::CommImpl* remote_act = nullptr;
+  kernel::activity::ActivityImpl* remote_act = nullptr;
+  if (auto wait = dynamic_cast<kernel::actor::ActivityWaitSimcall*>(req->observer_))
+    /* FIXME: check also that src and dst processes are not suspended */
+    remote_act = wait->get_activity();
+  else if (auto waitany = dynamic_cast<kernel::actor::ActivityWaitanySimcall*>(req->observer_))
+    remote_act = waitany->get_activities().at(idx);
+  else if (auto testany = dynamic_cast<kernel::actor::ActivityTestanySimcall*>(req->observer_))
+    remote_act = testany->get_activities().at(idx);
+
   switch (req->call_) {
     case Simcall::COMM_WAIT:
-      /* FIXME: check also that src and dst processes are not suspended */
-      remote_act = simcall_comm_wait__getraw__comm(req);
-      break;
-
     case Simcall::COMM_WAITANY:
-      remote_act = process.read(remote(simcall_comm_waitany__get__comms(req) + idx));
-      break;
-
     case Simcall::COMM_TESTANY:
-      remote_act = process.read(remote(simcall_comm_testany__get__comms(req) + idx));
       break;
-
     default:
       return true;
   }
 
   Remote<kernel::activity::CommImpl> temp_comm;
-  process.read(temp_comm, remote(remote_act));
+  process.read(temp_comm, remote(static_cast<kernel::activity::CommImpl*>(remote_act)));
   const kernel::activity::CommImpl* comm = temp_comm.get_buffer();
   return comm->src_actor_.get() && comm->dst_actor_.get();
 }
@@ -227,12 +225,12 @@ static void simcall_translate(smx_simcall_t req,
   }
 }
 
-simgrid::kernel::activity::CommImpl* Api::get_comm_or_nullptr(smx_simcall_t const r) const
+kernel::activity::CommImpl* Api::get_comm_or_nullptr(smx_simcall_t const r) const
 {
-  if (r->call_ == Simcall::COMM_WAIT)
-    return simcall_comm_wait__get__comm(r);
-  if (r->call_ == Simcall::COMM_TEST)
-    return simcall_comm_test__get__comm(r);
+  if (auto wait = dynamic_cast<kernel::actor::ActivityWaitSimcall*>(r->observer_))
+    return static_cast<kernel::activity::CommImpl*>(wait->get_activity());
+  if (auto test = dynamic_cast<kernel::actor::ActivityTestSimcall*>(r->observer_))
+    return static_cast<kernel::activity::CommImpl*>(test->get_activity());
   return nullptr;
 }
 
@@ -749,73 +747,14 @@ std::string Api::request_to_string(smx_simcall_t req, int value) const
       break;
     }
 
-    case Simcall::COMM_WAIT: {
-      simgrid::kernel::activity::CommImpl* remote_act = simcall_comm_wait__get__comm(req);
-      if (value == -1) {
-        type = "WaitTimeout";
-        args = "comm=" + pointer_to_string(remote_act);
-      } else {
-        type = "Wait";
-
-        simgrid::mc::Remote<simgrid::kernel::activity::CommImpl> temp_activity;
-        const simgrid::kernel::activity::CommImpl* act;
-        mc_model_checker->get_remote_process().read(temp_activity, remote(remote_act));
-        act = temp_activity.get_buffer();
-
-        smx_actor_t src_proc =
-            mc_model_checker->get_remote_process().resolve_actor(simgrid::mc::remote(act->src_actor_.get()));
-        smx_actor_t dst_proc =
-            mc_model_checker->get_remote_process().resolve_actor(simgrid::mc::remote(act->dst_actor_.get()));
-        args = "comm=" + pointer_to_string(remote_act);
-        args += " [" + get_actor_string(src_proc) + "-> " + get_actor_string(dst_proc) + "]";
-      }
-      break;
-    }
-
-    case Simcall::COMM_TEST: {
-      simgrid::kernel::activity::CommImpl* remote_act = simcall_comm_test__get__comm(req);
-      simgrid::mc::Remote<simgrid::kernel::activity::CommImpl> temp_activity;
-      const simgrid::kernel::activity::CommImpl* act;
-      mc_model_checker->get_remote_process().read(temp_activity, remote(remote_act));
-      act = temp_activity.get_buffer();
-
-      if (act->src_actor_.get() == nullptr || act->dst_actor_.get() == nullptr) {
-        type = "Test FALSE";
-        args = "comm=" + pointer_to_string(remote_act);
-      } else {
-        type = "Test TRUE";
-
-        smx_actor_t src_proc =
-            mc_model_checker->get_remote_process().resolve_actor(simgrid::mc::remote(act->src_actor_.get()));
-        smx_actor_t dst_proc =
-            mc_model_checker->get_remote_process().resolve_actor(simgrid::mc::remote(act->dst_actor_.get()));
-        args = "comm=" + pointer_to_string(remote_act);
-        args += " [" + get_actor_string(src_proc) + " -> " + get_actor_string(dst_proc) + "]";
-      }
-      break;
-    }
-
-    case Simcall::COMM_WAITANY: {
-      type         = "WaitAny";
-      size_t count = simcall_comm_waitany__get__count(req);
-      if (count > 0) {
-        simgrid::kernel::activity::CommImpl* remote_sync;
-        remote_sync =
-            mc_model_checker->get_remote_process().read(remote(simcall_comm_waitany__get__comms(req) + value));
-        args = "comm=" + pointer_to_string(remote_sync) + xbt::string_printf("(%d of %zu)", value + 1, count);
-      } else
-        args = "comm at idx " + std::to_string(value);
-      break;
-    }
-
+    case Simcall::COMM_WAIT:
+      // See ActivityWaitSimcall::to_string(int times_considered)
+    case Simcall::COMM_TEST:
+      // See ActivityTestSimcall::to_string(int times_considered)
+    case Simcall::COMM_WAITANY:
+      // See ActivityWaitanySimcall::to_string(int times_considered)
     case Simcall::COMM_TESTANY:
-      if (value == -1) {
-        type = "TestAny FALSE";
-        args = "-";
-      } else {
-        type = "TestAny";
-        args = xbt::string_printf("(%d of %zu)", value + 1, simcall_comm_testany__get__count(req));
-      }
+      // See ActivityTestanySimcall::to_string(int times_considered)
       break;
 
     default:
@@ -847,51 +786,13 @@ std::string Api::request_get_dot_output(smx_simcall_t req, int value) const
         break;
 
       case Simcall::COMM_WAIT:
-        if (value == -1) {
-          label = "[" + get_actor_dot_label(issuer) + "] WaitTimeout";
-        } else {
-          kernel::activity::ActivityImpl* remote_act = simcall_comm_wait__get__comm(req);
-          Remote<kernel::activity::CommImpl> temp_comm;
-          mc_model_checker->get_remote_process().read(temp_comm,
-                                                      remote(static_cast<kernel::activity::CommImpl*>(remote_act)));
-          const kernel::activity::CommImpl* comm = temp_comm.get_buffer();
-
-          const kernel::actor::ActorImpl* src_proc =
-              mc_model_checker->get_remote_process().resolve_actor(mc::remote(comm->src_actor_.get()));
-          const kernel::actor::ActorImpl* dst_proc =
-              mc_model_checker->get_remote_process().resolve_actor(mc::remote(comm->dst_actor_.get()));
-          label = "[" + get_actor_dot_label(issuer) + "] Wait";
-          label += " [(" + std::to_string(src_proc ? src_proc->get_pid() : 0) + ")";
-          label += "->(" + std::to_string(dst_proc ? dst_proc->get_pid() : 0) + ")]";
-        }
-        break;
-
-      case Simcall::COMM_TEST: {
-        kernel::activity::ActivityImpl* remote_act = simcall_comm_test__get__comm(req);
-        Remote<simgrid::kernel::activity::CommImpl> temp_comm;
-        mc_model_checker->get_remote_process().read(temp_comm,
-                                                    remote(static_cast<kernel::activity::CommImpl*>(remote_act)));
-        const kernel::activity::CommImpl* comm = temp_comm.get_buffer();
-        if (comm->src_actor_.get() == nullptr || comm->dst_actor_.get() == nullptr) {
-          label = "[" + get_actor_dot_label(issuer) + "] Test FALSE";
-        } else {
-          label = "[" + get_actor_dot_label(issuer) + "] Test TRUE";
-        }
-        break;
-      }
-
+        // See ActivityWaitSimcall::dot_label(int times_considered)
+      case Simcall::COMM_TEST:
+        // See ActivityTestSimcall::dot_label(int times_considered)
       case Simcall::COMM_WAITANY:
-        label = "[" + get_actor_dot_label(issuer) + "] WaitAny";
-        label += xbt::string_printf(" [%d of %zu]", value + 1, simcall_comm_waitany__get__count(req));
-        break;
-
+        // See ActivityWaittanySimcall::dot_label(int times_considered)
       case Simcall::COMM_TESTANY:
-        if (value == -1) {
-          label = "[" + get_actor_dot_label(issuer) + "] TestAny FALSE";
-        } else {
-          label = "[" + get_actor_dot_label(issuer) + "] TestAny TRUE";
-          label += xbt::string_printf(" [%d of %zu]", value + 1, simcall_comm_testany__get__count(req));
-        }
+        // See ActivityTestanySimcall::dot_label(int times_considered)
         break;
 
       default:
index d68b0af..6989844 100644 (file)
@@ -55,10 +55,11 @@ private:
   simgrid::kernel::activity::CommImpl* get_comm_or_nullptr(smx_simcall_t const r) const;
   bool request_depend_asymmetric(smx_simcall_t r1, smx_simcall_t r2) const;
   simgrid::mc::ActorInformation* actor_info_cast(smx_actor_t actor) const;
+
+public:
   std::string get_actor_string(smx_actor_t actor) const;
   std::string get_actor_dot_label(smx_actor_t actor) const;
 
-public:
   // No copy:
   Api(Api const&) = delete;
   void operator=(Api const&) = delete;
index 4dd3168..84c8591 100644 (file)
@@ -74,7 +74,7 @@ void execute_actors()
  *  - if the wait will succeed immediately (if both peer of the comm are there already or if the mutex is available)
  *  - if a timeout is provided, because we can fire the timeout if the transition is not ready without blocking in this
  * transition for ever.
- *
+ * This is controlled in the is_enabled() method of the corresponding observers.
  */
 // Called from both MCer and MCed:
 bool actor_is_enabled(smx_actor_t actor)
@@ -94,43 +94,11 @@ bool actor_is_enabled(smx_actor_t actor)
   if (req->observer_ != nullptr)
     return req->observer_->is_enabled();
 
-  using simix::Simcall;
-  switch (req->call_) {
-    case Simcall::NONE:
-      return false;
-
-    case Simcall::COMM_WAIT: {
-      /* FIXME: check also that src and dst processes are not suspended */
-      const kernel::activity::CommImpl* act = simcall_comm_wait__getraw__comm(req);
-
-      if (act->src_timeout_ || act->dst_timeout_) {
-        /* If it has a timeout it will be always be enabled (regardless of who declared the timeout),
-         * because even if the communication is not ready, it can timeout and won't block. */
-        if (_sg_mc_timeout == 1)
-          return true;
-      }
-      /* On the other hand if it hasn't a timeout, check if the comm is ready.*/
-      else if (act->detached() && act->src_actor_ == nullptr &&
-               act->get_state() == simgrid::kernel::activity::State::READY)
-        return (act->dst_actor_ != nullptr);
-      return (act->src_actor_ && act->dst_actor_);
-    }
-
-    case Simcall::COMM_WAITANY: {
-      simgrid::kernel::activity::CommImpl** comms = simcall_comm_waitany__get__comms(req);
-      size_t count                                = simcall_comm_waitany__get__count(req);
-      for (unsigned int index = 0; index < count; ++index) {
-        auto const* comm = comms[index];
-        if (comm->src_actor_ && comm->dst_actor_)
-          return true;
-      }
-      return false;
-    }
-
-    default:
-      /* The rest of the requests are always enabled */
-      return true;
-  }
+  if (req->call_ == simix::Simcall::NONE)
+    return false;
+  else
+    /* The rest of the requests are always enabled */
+    return true;
 }
 
 /* This is the list of requests that are visible from the checker algorithm.
@@ -145,8 +113,7 @@ bool request_is_visible(const s_smx_simcall* req)
     return req->observer_->is_visible();
 
   using simix::Simcall;
-  return req->call_ == Simcall::COMM_ISEND || req->call_ == Simcall::COMM_IRECV || req->call_ == Simcall::COMM_WAIT ||
-         req->call_ == Simcall::COMM_WAITANY || req->call_ == Simcall::COMM_TEST || req->call_ == Simcall::COMM_TESTANY;
+  return req->call_ == Simcall::COMM_ISEND || req->call_ == Simcall::COMM_IRECV;
 }
 
 }
index 6ca3ec8..2841ecb 100644 (file)
@@ -175,7 +175,7 @@ void AppSide::handle_messages() const
         const kernel::actor::ActorImpl* actor = kernel::actor::ActorImpl::by_pid(msg_simcall->aid);
         xbt_assert(actor != nullptr, "Invalid pid %ld", msg_simcall->aid);
         xbt_assert(actor->simcall_.observer_, "The transition of %s has no observer", actor->get_cname());
-        std::string value = actor->simcall_.observer_->dot_label();
+        std::string value = actor->simcall_.observer_->dot_label(msg_simcall->time_considered);
 
         // Send result:
         s_mc_message_simcall_to_string_answer_t answer{MessageType::SIMCALL_TO_STRING_ANSWER, {0}};
index 5f8cdbc..318d93c 100644 (file)
@@ -87,17 +87,16 @@ int MSG_comm_testany(const_xbt_dynar_t comms)
   ssize_t finished_index = -1;
 
   /* Create the equivalent array with SIMIX objects: */
-  std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
+  std::vector<simgrid::s4u::CommPtr> s_comms;
   s_comms.reserve(xbt_dynar_length(comms));
   msg_comm_t comm;
   unsigned int cursor;
-  xbt_dynar_foreach (comms, cursor, comm) {
-    s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm->get_impl()));
-  }
+  xbt_dynar_foreach (comms, cursor, comm)
+    s_comms.push_back(comm->s_comm);
 
   msg_error_t status = MSG_OK;
   try {
-    finished_index = simcall_comm_testany(s_comms.data(), s_comms.size());
+    finished_index = simgrid::s4u::Comm::test_any(s_comms);
   } catch (const simgrid::TimeoutException& e) {
     finished_index = e.get_value();
     status         = MSG_TIMEOUT;
@@ -164,17 +163,17 @@ int MSG_comm_waitany(const_xbt_dynar_t comms)
   ssize_t finished_index = -1;
 
   /* Create the equivalent array with SIMIX objects: */
-  std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
+  std::vector<simgrid::s4u::CommPtr> s_comms;
   s_comms.reserve(xbt_dynar_length(comms));
   msg_comm_t comm;
   unsigned int cursor;
   xbt_dynar_foreach (comms, cursor, comm) {
-    s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm->get_impl()));
+    s_comms.push_back(comm->s_comm);
   }
 
   msg_error_t status = MSG_OK;
   try {
-    finished_index = simcall_comm_waitany(s_comms.data(), s_comms.size(), -1);
+    finished_index = simgrid::s4u::Comm::wait_any_for(s_comms, -1);
   } catch (const simgrid::TimeoutException& e) {
     finished_index = e.get_value();
     status         = MSG_TIMEOUT;
index d17bed8..fc934d2 100644 (file)
@@ -737,7 +737,7 @@ void MSG_task_set_bytes_amount(msg_task_t task, double data_size)
  */
 double MSG_task_get_remaining_communication(const_msg_task_t task)
 {
-  XBT_DEBUG("calling simcall_communication_get_remains(%p)", task->comm.get());
+  XBT_DEBUG("calling s4u::Comm::get_remaining (%p)", task->comm.get());
   return task->comm->get_remaining();
 }
 
index 8db0c38..eee17e8 100644 (file)
@@ -5,6 +5,7 @@
 
 #include <simgrid/Exception.hpp>
 #include <simgrid/s4u/Activity.hpp>
+#include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Exec.hpp>
 #include <simgrid/s4u/Io.hpp>
@@ -52,6 +53,8 @@ Activity* Activity::wait_for(double timeout)
     vetoable_start();
 
   if (state_ == State::FAILED) {
+    if (dynamic_cast<Comm*>(this))
+      throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed comm");
     if (dynamic_cast<Exec*>(this))
       throw HostFailureException(XBT_THROW_POINT, "Cannot wait for a failed exec");
     if (dynamic_cast<Io*>(this))
@@ -78,14 +81,51 @@ bool Activity::test()
   if (state_ == State::INITED || state_ == State::STARTING)
     this->vetoable_start();
 
-  if (kernel::actor::simcall([this] { return this->get_impl()->test(); })) {
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityTestSimcall observer{issuer, pimpl_.get()};
+  if (kernel::actor::simcall_blocking([&observer] { observer.get_activity()->test(observer.get_issuer()); },
+                                      &observer)) {
     complete(State::FINISHED);
     return true;
   }
-
   return false;
 }
 
+ssize_t Activity::test_any(const std::vector<ActivityPtr>& activities)
+{
+  std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
+  std::transform(begin(activities), end(activities), begin(ractivities),
+                 [](const ActivityPtr& act) { return act->pimpl_.get(); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityTestanySimcall observer{issuer, ractivities};
+  ssize_t changed_pos = kernel::actor::simcall_blocking(
+      [&observer] { kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); },
+      &observer);
+  if (changed_pos != -1)
+    activities.at(changed_pos)->complete(State::FINISHED);
+  return changed_pos;
+}
+
+ssize_t Activity::wait_any_for(const std::vector<ActivityPtr>& activities, double timeout)
+{
+  std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
+  std::transform(begin(activities), end(activities), begin(ractivities),
+                 [](const ActivityPtr& activity) { return activity->pimpl_.get(); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityWaitanySimcall observer{issuer, ractivities, timeout};
+  ssize_t changed_pos = kernel::actor::simcall_blocking(
+      [&observer] {
+        kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+                                                     observer.get_timeout());
+      },
+      &observer);
+  if (changed_pos != -1)
+    activities.at(changed_pos)->complete(State::FINISHED);
+  return changed_pos;
+}
+
 Activity* Activity::cancel()
 {
   kernel::actor::simcall([this] {
index 81f3df1..1c90aad 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
 
@@ -38,12 +39,12 @@ Comm::~Comm()
 
 ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
 {
-  std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
-  std::transform(begin(comms), end(comms), begin(rcomms),
-                 [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
+  std::vector<ActivityPtr> activities;
+  for (const auto& comm : comms)
+    activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
   ssize_t changed_pos;
   try {
-    changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
+    changed_pos = Activity::wait_any_for(activities, timeout);
   } catch (const NetworkFailureException& e) {
     for (auto c : comms) {
       if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
@@ -52,8 +53,6 @@ ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
     }
     e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
   }
-  if (changed_pos != -1)
-    comms.at(changed_pos)->complete(State::FINISHED);
   return changed_pos;
 }
 
@@ -227,7 +226,6 @@ Comm* Comm::start()
     on_recv(*this);
     pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
                                 copy_data_function_, get_data<void>(), rate_);
-
   } else {
     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
   }
@@ -252,12 +250,13 @@ Comm* Comm::start()
  *                Negative values denote infinite wait times. 0 as a timeout returns immediately. */
 Comm* Comm::wait_for(double timeout)
 {
+  XBT_DEBUG("Calling Comm::wait_for with state %s", get_state_str());
+  kernel::actor::ActorImpl* issuer = nullptr;
   switch (state_) {
     case State::FINISHED:
       break;
     case State::FAILED:
       throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
-
     case State::INITED:
     case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
       if (from_ != nullptr || to_ != nullptr) {
@@ -273,11 +272,17 @@ Comm* Comm::wait_for(double timeout)
                           get_data<void>(), timeout, rate_);
       }
       break;
-
     case State::STARTED:
       try {
-        simcall_comm_wait(get_impl(), timeout);
+        issuer = kernel::actor::ActorImpl::self();
+        kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
+        if (kernel::actor::simcall_blocking(
+                [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
+                &observer)) {
+          throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+        }
       } catch (const NetworkFailureException& e) {
+        issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
         complete(State::FAILED);
         e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
       }
@@ -295,13 +300,10 @@ Comm* Comm::wait_for(double timeout)
 
 ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
 {
-  std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
-  std::transform(begin(comms), end(comms), begin(rcomms),
-                 [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
-  ssize_t changed_pos = simcall_comm_testany(rcomms.data(), rcomms.size());
-  if (changed_pos != -1)
-    comms.at(changed_pos)->complete(State::FINISHED);
-  return changed_pos;
+  std::vector<ActivityPtr> activities;
+  for (const auto& comm : comms)
+    activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
+  return Activity::test_any(activities);
 }
 
 Comm* Comm::detach()
@@ -314,24 +316,6 @@ Comm* Comm::detach()
   return this;
 }
 
-bool Comm::test() // TODO: merge with Activity::test, once modernized
-{
-  xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::STARTING ||
-             state_ == State::CANCELED || state_ == State::FINISHED);
-
-  if (state_ == State::CANCELED || state_ == State::FINISHED)
-    return true;
-
-  if (state_ == State::INITED || state_ == State::STARTING)
-    this->vetoable_start();
-
-  if (simcall_comm_test(get_impl())) {
-    complete(State::FINISHED);
-    return true;
-  }
-  return false;
-}
-
 Mailbox* Comm::get_mailbox() const
 {
   return mailbox_;
index 633284d..14f8e74 100644 (file)
@@ -61,20 +61,10 @@ Exec* Exec::start()
 
 ssize_t Exec::wait_any_for(const std::vector<ExecPtr>& execs, double timeout)
 {
-  std::vector<kernel::activity::ExecImpl*> rexecs(execs.size());
-  std::transform(begin(execs), end(execs), begin(rexecs),
-                 [](const ExecPtr& exec) { return static_cast<kernel::activity::ExecImpl*>(exec->pimpl_.get()); });
-
-  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-  kernel::actor::ExecutionWaitanySimcall observer{issuer, rexecs, timeout};
-  ssize_t changed_pos = kernel::actor::simcall_blocking(
-      [&observer] {
-        kernel::activity::ExecImpl::wait_any_for(observer.get_issuer(), observer.get_execs(), observer.get_timeout());
-      },
-      &observer);
-  if (changed_pos != -1)
-    execs.at(changed_pos)->complete(State::FINISHED);
-  return changed_pos;
+  std::vector<ActivityPtr> activities;
+  for (const auto& exec : execs)
+    activities.push_back(boost::dynamic_pointer_cast<Activity>(exec));
+  return Activity::wait_any_for(activities, timeout);
 }
 
 /** @brief change the execution bound
index bf092d9..77856e7 100644 (file)
@@ -41,20 +41,10 @@ Io* Io::start()
 
 ssize_t Io::wait_any_for(const std::vector<IoPtr>& ios, double timeout)
 {
-  std::vector<kernel::activity::IoImpl*> rios(ios.size());
-  std::transform(begin(ios), end(ios), begin(rios),
-                 [](const IoPtr& io) { return static_cast<kernel::activity::IoImpl*>(io->pimpl_.get()); });
-
-  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-  kernel::actor::IoWaitanySimcall observer{issuer, rios, timeout};
-  ssize_t changed_pos = kernel::actor::simcall_blocking(
-      [&observer] {
-        kernel::activity::IoImpl::wait_any_for(observer.get_issuer(), observer.get_ios(), observer.get_timeout());
-      },
-      &observer);
-  if (changed_pos != -1)
-    ios.at(changed_pos)->complete(State::FINISHED);
-  return changed_pos;
+  std::vector<ActivityPtr> activities;
+  for (const auto& io : ios)
+    activities.push_back(boost::dynamic_pointer_cast<Activity>(io));
+  return Activity::wait_any_for(activities, timeout);
 }
 
 IoPtr Io::set_disk(const_sg_disk_t disk)
index 07b98cb..ffe10b5 100644 (file)
@@ -8,14 +8,16 @@
 #include "mc/mc.h"
 #include "private.hpp"
 #include "simgrid/Exception.hpp"
+#include "simgrid/s4u/ConditionVariable.hpp"
 #include "simgrid/s4u/Exec.hpp"
 #include "simgrid/s4u/Mutex.hpp"
-#include "simgrid/s4u/ConditionVariable.hpp"
 #include "smpi_comm.hpp"
 #include "smpi_datatype.hpp"
 #include "smpi_host.hpp"
 #include "smpi_op.hpp"
 #include "src/kernel/activity/CommImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
 #include "src/mc/mc_replay.hpp"
 #include "src/smpi/include/smpi_actor.hpp"
 
@@ -659,7 +661,10 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
   if (((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) == 0) {
     if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
       try{
-        *flag = simcall_comm_test((*request)->action_.get());
+        kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+        kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()};
+        *flag = kernel::actor::simcall_blocking([&observer] { observer.get_activity()->test(observer.get_issuer()); },
+                                                &observer);
       } catch (const Exception&) {
         *flag = 0;
         return ret;
@@ -725,7 +730,7 @@ int Request::testsome(int incount, MPI_Request requests[], int *count, int *indi
 
 int Request::testany(int count, MPI_Request requests[], int *index, int* flag, MPI_Status * status)
 {
-  std::vector<simgrid::kernel::activity::CommImpl*> comms;
+  std::vector<simgrid::kernel::activity::ActivityImpl*> comms;
   comms.reserve(count);
 
   *flag = 0;
@@ -735,7 +740,7 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M
   std::vector<int> map; /** Maps all matching comms back to their location in requests **/
   for (int i = 0; i < count; i++) {
     if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
-      comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
+      comms.push_back(requests[i]->action_.get());
       map.push_back(i);
     }
   }
@@ -746,7 +751,11 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M
       simgrid::s4u::this_actor::sleep_for(nsleeps * smpi_test_sleep);
     ssize_t i;
     try{
-      i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches!
+      kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+      kernel::actor::ActivityTestanySimcall observer{issuer, comms};
+      i = kernel::actor::simcall_blocking(
+          [&observer] { kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); },
+          &observer);
     } catch (const Exception&) {
       XBT_DEBUG("Exception in testany");
       return 0;
@@ -1056,7 +1065,11 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
   if ((*request)->action_ != nullptr){
       try{
         // this is not a detached send
-        simcall_comm_wait((*request)->action_.get(), -1.0);
+        kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+        kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1};
+        kernel::actor::simcall_blocking(
+            [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
+            &observer);
       } catch (const CancelException&) {
         XBT_VERB("Request cancelled");
       }
@@ -1097,7 +1110,7 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
 
   if(count > 0) {
     // Wait for a request to complete
-    std::vector<simgrid::kernel::activity::CommImpl*> comms;
+    std::vector<simgrid::kernel::activity::ActivityImpl*> comms;
     std::vector<int> map;
     XBT_DEBUG("Wait for one of %d", count);
     for(int i = 0; i < count; i++) {
@@ -1105,7 +1118,7 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
           not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
         if (requests[i]->action_ != nullptr) {
           XBT_DEBUG("Waiting any %p ", requests[i]);
-          comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
+          comms.push_back(requests[i]->action_.get());
           map.push_back(i);
         } else {
           // This is a finished detached request, let's return this one
@@ -1124,7 +1137,14 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
       XBT_DEBUG("Enter waitany for %zu comms", comms.size());
       ssize_t i;
       try{
-        i = simcall_comm_waitany(comms.data(), comms.size(), -1);
+        kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+        kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1};
+        i = kernel::actor::simcall_blocking(
+            [&observer] {
+              kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+                                                           observer.get_timeout());
+            },
+            &observer);
       } catch (const CancelException&) {
         XBT_INFO("request cancelled");
         i = -1;