Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Introduce a Mailbox::get_async() with no payload parameter
author Martin Quinson <martin.quinson@ens-rennes.fr>
Mon, 24 Jul 2023 17:33:41 +0000 (19:33 +0200)
committer Martin Quinson <martin.quinson@ens-rennes.fr>
Mon, 24 Jul 2023 17:33:58 +0000 (19:33 +0200)
You can use the new Comm::get_payload() once the communication is over
to retrieve the payload.

17 files changed:
ChangeLog
examples/python/activityset-testany/activityset-testany.py
examples/python/activityset-waitall/activityset-waitall.py
examples/python/activityset-waitallfor/activityset-waitallfor.py
examples/python/activityset-waitany/activityset-waitany.py
examples/python/comm-testany/comm-testany.py
examples/python/comm-waitallfor/comm-waitallfor.py
examples/python/network-nonlinear/network-nonlinear.py
examples/python/platform-comm-serialize/platform-comm-serialize.py
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Mailbox.hpp
src/bindings/python/simgrid_python.cpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Mailbox.cpp
teshsuite/s4u/CMakeLists.txt

index fd83eff..158d75b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -5,6 +5,8 @@ S4U:
  - New function NetZone::add_route(host1, host2, links) when you don't need gateways
    Also add a variant with s4u::Link, when you don't want to specify the directions
    on symmetric routes.
+ - Introduce a Mailbox::get_async() with no payload parameter. You can use the new 
+   Comm::get_payload() once the communication is over to retrieve the payload.
 
 SMPI:
  - New SMPI_app_instance_join(): wait for the completion of a started MPI instance
@@ -12,6 +14,7 @@ SMPI:
 
 Python:
  - Make the host_load plugin available from Python. See examples/python/plugin-host-load
+ - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
 
 ----------------------------------------------------------------------------
 
index 34f1e9a..6a8901f 100644 (file)
@@ -16,7 +16,7 @@ def bob():
 
   this_actor.info("Create my asynchronous activities")
   exec = this_actor.exec_async(5e9)
-  comm, payload = mbox.get_async()
+  comm = mbox.get_async()
   io   = disk.read_async(300000000)
 
   pending_activities = ActivitySet([exec, comm])
index 36b70b8..4e4d1ee 100644 (file)
@@ -16,7 +16,7 @@ def bob():
 
   this_actor.info("Create my asynchronous activities")
   exec = this_actor.exec_async(5e9)
-  comm, payload = mbox.get_async()
+  comm = mbox.get_async()
   io   = disk.read_async(300000000)
 
   pending_activities = ActivitySet([exec, comm])
index 4f28809..44b3c6f 100644 (file)
@@ -16,7 +16,7 @@ def bob():
 
   this_actor.info("Create my asynchronous activities")
   exec = this_actor.exec_async(5e9)
-  comm, payload = mbox.get_async()
+  comm = mbox.get_async()
   io   = disk.read_async(300000000)
 
   pending_activities = ActivitySet([exec, comm])
index 4f16d7d..88ac531 100644 (file)
@@ -16,7 +16,7 @@ def bob():
 
   this_actor.info("Create my asynchronous activities")
   exec = this_actor.exec_async(5e9)
-  comm, payload = mbox.get_async()
+  comm = mbox.get_async()
   io   = disk.read_async(300000000)
 
   pending_activities = ActivitySet([exec, comm])
index 52220cf..84469b0 100644 (file)
@@ -24,9 +24,9 @@ def create_parser() -> ArgumentParser:
 def rank0():
     rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
     this_actor.info("Post my asynchronous receives")
-    comm1, a1 = rank0_mailbox.get_async()
-    comm2, a2 = rank0_mailbox.get_async()
-    comm3, a3 = rank0_mailbox.get_async()
+    comm1 = rank0_mailbox.get_async()
+    comm2 = rank0_mailbox.get_async()
+    comm3 = rank0_mailbox.get_async()
     pending_comms: List[Comm] = [comm1, comm2, comm3]
 
     this_actor.info("Send some data to rank-1")
index 0f38795..d72490b 100644 (file)
@@ -18,7 +18,7 @@ from typing import List
 from uuid import uuid4
 import sys
 
-from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
+from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
 
 
 SIMULATED_JOB_SIZE_BYTES = 1024
@@ -78,12 +78,11 @@ def worker(worker_id: str):
 @dataclass
 class AsyncJobResult:
     job: Job
-    result_comm: Comm
-    async_data: PyGetAsync
+    comm: Comm
 
     @property
     def complete(self) -> bool:
-        return self.result_comm.test()
+        return self.comm.test()
 
     @property
     def status(self) -> str:
@@ -103,20 +102,19 @@ def client(client_id: str, jobs: List[float], wait_timeout: float):
             result_mailbox=result_mailbox
         ), SIMULATED_JOB_SIZE_BYTES)
         out_comm.detach()
-        result_comm, async_data = result_mailbox.get_async()
+        result_comm = result_mailbox.get_async()
         async_job_results.append(AsyncJobResult(
             job=job,
-            result_comm=result_comm,
-            async_data=async_data
+            comm=result_comm
         ))
     this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
-    completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
+    completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout)
     logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
     logger(f"received {completed_comms}/{len(async_job_results)} results")
     for result in async_job_results:
         this_actor.info(f"{result.job.job_id}"
                         f" status={result.status}"
-                        f" result_payload={result.async_data.get() if result.complete else ''}")
+                        f" result_payload={result.comm.get_payload() if result.complete else ''}")
 
 
 def main():
index 7bc1218..96b0981 100644 (file)
@@ -50,21 +50,18 @@ class Receiver:
     def __call__(self):
         mbox = Mailbox.by_name("receiver")
 
-        pending_msgs = []
         pending_comms = []
 
         this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
         for _ in range(self.msg_count):
-            comm, data = mbox.get_async()
+            comm = mbox.get_async()
             pending_comms.append(comm)
-            pending_msgs.append(data)
 
         while pending_comms:
             index = Comm.wait_any(pending_comms)
-            msg = pending_msgs[index].get()
+            msg = pending_comms[index].get_payload()
             this_actor.info("I got '%s'." % msg)
             del pending_comms[index]
-            del pending_msgs[index]
 
 ####################################################################################################
 def link_nonlinear(link: Link, capacity: float, n: int) -> float:
index bbd871d..eaa880c 100644 (file)
@@ -6,7 +6,7 @@
 from typing import List, Tuple
 import sys
 
-from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
+from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
 
 
 RECEIVER_MAILBOX_NAME = "receiver"
@@ -44,14 +44,13 @@ class Receiver(object):
 
     def __call__(self):
         # List in which we store all incoming msgs
-        pending_comms: List[Tuple[Comm, PyGetAsync]] = []
+        pending_comms: List[Comm] = []
         this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
         for _ in range(self.messages_count):
             pending_comms.append(self.mailbox.get_async())
         while pending_comms:
-            index = Comm.wait_any([comm for (comm, _) in pending_comms])
-            _, async_data = pending_comms[index]
-            this_actor.info(f"I got '{async_data.get()}'.")
+            index = Comm.wait_any(pending_comms)
+            this_actor.info(f"I got '{pending_comms[index].get_payload()}'.")
             pending_comms.pop(index)
 
 
index 3b1f417..c8a0156 100644 (file)
@@ -151,6 +151,9 @@ public:
   void* get_dst_data() const { return dst_buff_; }
   /** Retrieve the size of the received data. Not to be mixed with @ref Activity::get_remaining()  */
   size_t get_dst_data_size() const { return dst_buff_size_; }
+  /** Retrieve the payload associated to the communication. You can only do that once the comm is (gracefully)
+   * terminated, and it is only setup by the default copy_data callback (not the SMPI one) */
+  void* get_payload() const;
 
   /* Common functions */
 
index b5723df..b8eddff 100644 (file)
@@ -118,6 +118,9 @@ public:
   CommPtr get_init();
   /** Creates and start an async data reception to that mailbox */
   template <typename T> CommPtr get_async(T** data);
+  /** Creates and start an async data reception to that mailbox. Since the data location is not provided, you'll have to
+   * use Comm::get_payload once the comm terminates */
+  CommPtr get_async();
 
   /** Blocking data reception */
   template <typename T> T* get();
index a8e9ce7..04f0a16 100644 (file)
@@ -76,15 +76,6 @@ std::string get_simgrid_version()
   sg_version_get(&major, &minor, &patch);
   return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch);
 }
-
-/** @brief Wrap for mailbox::get_async */
-class PyGetAsync {
-  std::unique_ptr<PyObject*> data = std::make_unique<PyObject*>();
-
-public:
-  PyObject** get() const { return data.get(); }
-};
-
 } // namespace
 
 PYBIND11_DECLARE_HOLDER_TYPE(T, boost::intrusive_ptr<T>)
@@ -634,26 +625,14 @@ PYBIND11_MODULE(simgrid, m)
           "get", [](Mailbox* self) { return py::reinterpret_steal<py::object>(self->get<PyObject>()); },
           py::call_guard<py::gil_scoped_release>(), "Blocking data reception")
       .def(
-          "get_async",
-          [](Mailbox* self) -> std::tuple<CommPtr, PyGetAsync> {
-            PyGetAsync wrap;
-            auto comm = self->get_async(wrap.get());
-            return std::make_tuple(std::move(comm), std::move(wrap));
-          },
+          "get_async", [](Mailbox* self) -> CommPtr { return self->get_async(); },
           py::call_guard<py::gil_scoped_release>(),
           "Non-blocking data reception. Use data.get() to get the python object after the communication has finished")
       .def("set_receiver", &Mailbox::set_receiver, py::call_guard<py::gil_scoped_release>(),
            "Sets the actor as permanent receiver");
 
-  /* Class PyGetAsync */
-  py::class_<PyGetAsync>(m, "PyGetAsync", "Wrapper for async get communications")
-      .def(py::init<>())
-      .def(
-          "get", [](const PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
-          "Get python object after async communication in receiver side");
-
   /* class Activity */
-  py::class_<Activity, ActivityPtr>(m, "Activityy", "Activity. See the C++ documentation for details.");
+  py::class_<Activity, ActivityPtr>(m, "Activity", "Activity. See the C++ documentation for details.");
 
   /* Class Comm */
   py::class_<Comm, CommPtr, Activity>(m, "Comm", "Communication. See the C++ documentation for details.")
@@ -693,6 +672,11 @@ PYBIND11_MODULE(simgrid, m)
            "Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
       .def("wait_until", &Comm::wait_until, py::call_guard<py::gil_scoped_release>(), py::arg("time_limit"),
            "Block until the completion of that communication, or raises TimeoutException after the specified time.")
+      .def(
+          "get_payload",
+          [](const Comm* self) { return py::reinterpret_steal<py::object>((PyObject*)self->get_payload()); },
+          py::call_guard<py::gil_scoped_release>(),
+          "Retrieve the message's payload of a get_async. You cannot call this until after the comm termination.")
       .def("detach", py::overload_cast<>(&Comm::detach), py::return_value_policy::reference_internal,
            py::call_guard<py::gil_scoped_release>(),
            "Start the comm, and ignore its result. It can be completely forgotten after that.")
index e5c1ddf..25a7661 100644 (file)
@@ -37,7 +37,9 @@ CommImpl::CommImpl()
 std::function<void(CommImpl*, void*, size_t)> CommImpl::copy_data_callback_ = [](kernel::activity::CommImpl* comm,
                                                                                  void* buff, size_t buff_size) {
   xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
-  *(void**)(comm->dst_buff_) = buff;
+  if (comm->dst_buff_ != nullptr) // get_async provided a buffer
+    *(void**)(comm->dst_buff_) = buff;
+  comm->payload_ = buff; // Setup what will be retrieved by s4u::Comm::get_payload()
 };
 
 void CommImpl::set_copy_data_callback(const std::function<void(CommImpl*, void*, size_t)>& callback)
@@ -192,7 +194,7 @@ void CommImpl::copy_data()
 {
   size_t buff_size = src_buff_size_;
   /* If there is no data to copy then return */
-  if (not src_buff_ || not dst_buff_ || copied_)
+  if (not src_buff_ || not dst_buff_size_ || copied_)
     return;
 
   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
index 4fcf67c..f3d9839 100644 (file)
@@ -98,6 +98,7 @@ expectations of the other side, too. See  */
   unsigned char* dst_buff_ = nullptr;
   size_t src_buff_size_    = 0;
   size_t* dst_buff_size_   = nullptr;
+  void* payload_           = nullptr; // If dst_buff_ is NULL, the default copy callback puts the data here
 
   void* src_data_ = nullptr; /* User data associated to the communication */
   void* dst_data_ = nullptr;
index 4b7030e..404321a 100644 (file)
@@ -6,6 +6,7 @@
 #include <cmath>
 #include <simgrid/Exception.hpp>
 #include <simgrid/comm.h>
+#include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Mailbox.hpp>
@@ -283,6 +284,14 @@ CommPtr Comm::set_payload_size(uint64_t bytes)
   return this;
 }
 
+void* Comm::get_payload() const
+{
+  xbt_assert(get_state() == State::FINISHED,
+             "You can only retrieve the payload of a communication that gracefully terminated, but its state is %s.",
+             get_state_str());
+  return static_cast<kernel::activity::CommImpl*>(pimpl_.get())->payload_;
+}
+
 Actor* Comm::get_sender() const
 {
   kernel::actor::ActorImplPtr sender = nullptr;
@@ -309,6 +318,9 @@ Comm* Comm::do_start()
 {
   xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
              "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
+
+  auto myself = kernel::actor::ActorImpl::self();
+
   if (get_source() != nullptr || get_destination() != nullptr) {
     xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be.");
     xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
@@ -320,7 +332,7 @@ Comm* Comm::do_start()
     });
     fire_on_start();
     fire_on_this_start();
-  } else if (src_buff_ != nullptr) { // Sender side
+  } else if (myself == sender_) {
     on_send(*this);
     on_this_send(*this);
     kernel::actor::CommIsendSimcall observer{sender_,
@@ -337,7 +349,7 @@ Comm* Comm::do_start()
                                              "Isend"};
     pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
                                              &observer);
-  } else if (dst_buff_ != nullptr) { // Receiver side
+  } else if (myself == receiver_) {
     xbt_assert(not detached_, "Receive cannot be detached");
     on_recv(*this);
     on_this_recv(*this);
index 6ce4a03..76613fb 100644 (file)
@@ -127,6 +127,13 @@ CommPtr Mailbox::get_init()
   return res;
 }
 
+CommPtr Mailbox::get_async()
+{
+  CommPtr res = get_init()->set_dst_data(nullptr, sizeof(void*));
+  res->start();
+  return res;
+}
+
 kernel::activity::ActivityImplPtr
 Mailbox::iprobe(int type, const std::function<bool(void*, void*, kernel::activity::CommImpl*)>& match_fun, void* data)
 {
index 56d9e61..0ea50b2 100644 (file)
@@ -6,7 +6,7 @@ endforeach()
 
 foreach(x actor actor-autorestart actor-suspend
         activity-lifecycle
-        comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for
+        comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for
         cloud-interrupt-migration cloud-two-execs
        monkey-masterworkers monkey-semaphore
         concurrent_rw