- New function NetZone::add_route(host1, host2, links) when you don't need gateways
Also add a variant with s4u::Link, when you don't want to specify the directions
on symmetric routes.
+ - Introduce a Mailbox::get_async() with no payload parameter. You can use the new
+ Comm::get_payload() once the communication is over to retrieve the payload.
SMPI:
- New SMPI_app_instance_join(): wait for the completion of a started MPI instance
Python:
- Make the host_load plugin available from Python. See examples/python/plugin-host-load
+ - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
----------------------------------------------------------------------------
this_actor.info("Create my asynchronous activities")
exec = this_actor.exec_async(5e9)
- comm, payload = mbox.get_async()
+ comm = mbox.get_async()
io = disk.read_async(300000000)
pending_activities = ActivitySet([exec, comm])
this_actor.info("Create my asynchronous activities")
exec = this_actor.exec_async(5e9)
- comm, payload = mbox.get_async()
+ comm = mbox.get_async()
io = disk.read_async(300000000)
pending_activities = ActivitySet([exec, comm])
this_actor.info("Create my asynchronous activities")
exec = this_actor.exec_async(5e9)
- comm, payload = mbox.get_async()
+ comm = mbox.get_async()
io = disk.read_async(300000000)
pending_activities = ActivitySet([exec, comm])
this_actor.info("Create my asynchronous activities")
exec = this_actor.exec_async(5e9)
- comm, payload = mbox.get_async()
+ comm = mbox.get_async()
io = disk.read_async(300000000)
pending_activities = ActivitySet([exec, comm])
def rank0():
rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
this_actor.info("Post my asynchronous receives")
- comm1, a1 = rank0_mailbox.get_async()
- comm2, a2 = rank0_mailbox.get_async()
- comm3, a3 = rank0_mailbox.get_async()
+ comm1 = rank0_mailbox.get_async()
+ comm2 = rank0_mailbox.get_async()
+ comm3 = rank0_mailbox.get_async()
pending_comms: List[Comm] = [comm1, comm2, comm3]
this_actor.info("Send some data to rank-1")
from uuid import uuid4
import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
+from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
SIMULATED_JOB_SIZE_BYTES = 1024
@dataclass
class AsyncJobResult:
job: Job
- result_comm: Comm
- async_data: PyGetAsync
+ comm: Comm
@property
def complete(self) -> bool:
- return self.result_comm.test()
+ return self.comm.test()
@property
def status(self) -> str:
result_mailbox=result_mailbox
), SIMULATED_JOB_SIZE_BYTES)
out_comm.detach()
- result_comm, async_data = result_mailbox.get_async()
+ result_comm = result_mailbox.get_async()
async_job_results.append(AsyncJobResult(
job=job,
- result_comm=result_comm,
- async_data=async_data
+ comm=result_comm
))
this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
- completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
+ completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout)
logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
logger(f"received {completed_comms}/{len(async_job_results)} results")
for result in async_job_results:
this_actor.info(f"{result.job.job_id}"
f" status={result.status}"
- f" result_payload={result.async_data.get() if result.complete else ''}")
+ f" result_payload={result.comm.get_payload() if result.complete else ''}")
def main():
def __call__(self):
mbox = Mailbox.by_name("receiver")
- pending_msgs = []
pending_comms = []
this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
for _ in range(self.msg_count):
- comm, data = mbox.get_async()
+ comm = mbox.get_async()
pending_comms.append(comm)
- pending_msgs.append(data)
while pending_comms:
index = Comm.wait_any(pending_comms)
- msg = pending_msgs[index].get()
+ msg = pending_comms[index].get_payload()
this_actor.info("I got '%s'." % msg)
del pending_comms[index]
- del pending_msgs[index]
####################################################################################################
def link_nonlinear(link: Link, capacity: float, n: int) -> float:
from typing import List, Tuple
import sys
-from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
+from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
RECEIVER_MAILBOX_NAME = "receiver"
def __call__(self):
# List in which we store all incoming msgs
- pending_comms: List[Tuple[Comm, PyGetAsync]] = []
+ pending_comms: List[Comm] = []
this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
for _ in range(self.messages_count):
pending_comms.append(self.mailbox.get_async())
while pending_comms:
- index = Comm.wait_any([comm for (comm, _) in pending_comms])
- _, async_data = pending_comms[index]
- this_actor.info(f"I got '{async_data.get()}'.")
+ index = Comm.wait_any(pending_comms)
+ this_actor.info(f"I got '{pending_comms[index].get_payload()}'.")
pending_comms.pop(index)
void* get_dst_data() const { return dst_buff_; }
/** Retrieve the size of the received data. Not to be mixed with @ref Activity::get_remaining() */
size_t get_dst_data_size() const { return dst_buff_size_; }
+ /** Retrieve the payload associated to the communication. You can only do that once the comm is (gracefully)
+ * terminated, and it is only setup by the default copy_data callback (not the SMPI one) */
+ void* get_payload() const;
/* Common functions */
CommPtr get_init();
/** Creates and start an async data reception to that mailbox */
template <typename T> CommPtr get_async(T** data);
+ /** Creates and start an async data reception to that mailbox. Since the data location is not provided, you'll have to
+ * use Comm::get_payload once the comm terminates */
+ CommPtr get_async();
/** Blocking data reception */
template <typename T> T* get();
sg_version_get(&major, &minor, &patch);
return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch);
}
-
-/** @brief Wrap for mailbox::get_async */
-class PyGetAsync {
- std::unique_ptr<PyObject*> data = std::make_unique<PyObject*>();
-
-public:
- PyObject** get() const { return data.get(); }
-};
-
} // namespace
PYBIND11_DECLARE_HOLDER_TYPE(T, boost::intrusive_ptr<T>)
"get", [](Mailbox* self) { return py::reinterpret_steal<py::object>(self->get<PyObject>()); },
py::call_guard<py::gil_scoped_release>(), "Blocking data reception")
.def(
- "get_async",
- [](Mailbox* self) -> std::tuple<CommPtr, PyGetAsync> {
- PyGetAsync wrap;
- auto comm = self->get_async(wrap.get());
- return std::make_tuple(std::move(comm), std::move(wrap));
- },
+ "get_async", [](Mailbox* self) -> CommPtr { return self->get_async(); },
py::call_guard<py::gil_scoped_release>(),
"Non-blocking data reception. Use data.get() to get the python object after the communication has finished")
.def("set_receiver", &Mailbox::set_receiver, py::call_guard<py::gil_scoped_release>(),
"Sets the actor as permanent receiver");
- /* Class PyGetAsync */
- py::class_<PyGetAsync>(m, "PyGetAsync", "Wrapper for async get communications")
- .def(py::init<>())
- .def(
- "get", [](const PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
- "Get python object after async communication in receiver side");
-
/* class Activity */
- py::class_<Activity, ActivityPtr>(m, "Activityy", "Activity. See the C++ documentation for details.");
+ py::class_<Activity, ActivityPtr>(m, "Activity", "Activity. See the C++ documentation for details.");
/* Class Comm */
py::class_<Comm, CommPtr, Activity>(m, "Comm", "Communication. See the C++ documentation for details.")
"Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
.def("wait_until", &Comm::wait_until, py::call_guard<py::gil_scoped_release>(), py::arg("time_limit"),
"Block until the completion of that communication, or raises TimeoutException after the specified time.")
+ .def(
+ "get_payload",
+ [](const Comm* self) { return py::reinterpret_steal<py::object>((PyObject*)self->get_payload()); },
+ py::call_guard<py::gil_scoped_release>(),
+ "Retrieve the message's payload of a get_async. You cannot call this until after the comm termination.")
.def("detach", py::overload_cast<>(&Comm::detach), py::return_value_policy::reference_internal,
py::call_guard<py::gil_scoped_release>(),
"Start the comm, and ignore its result. It can be completely forgotten after that.")
std::function<void(CommImpl*, void*, size_t)> CommImpl::copy_data_callback_ = [](kernel::activity::CommImpl* comm,
void* buff, size_t buff_size) {
xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
- *(void**)(comm->dst_buff_) = buff;
+ if (comm->dst_buff_ != nullptr) // get_async provided a buffer
+ *(void**)(comm->dst_buff_) = buff;
+ comm->payload_ = buff; // Setup what will be retrieved by s4u::Comm::get_payload()
};
void CommImpl::set_copy_data_callback(const std::function<void(CommImpl*, void*, size_t)>& callback)
{
size_t buff_size = src_buff_size_;
/* If there is no data to copy then return */
- if (not src_buff_ || not dst_buff_ || copied_)
+ if (not src_buff_ || not dst_buff_size_ || copied_)
return;
XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
unsigned char* dst_buff_ = nullptr;
size_t src_buff_size_ = 0;
size_t* dst_buff_size_ = nullptr;
+ void* payload_ = nullptr; // If dst_buff_ is NULL, the default copy callback puts the data here
void* src_data_ = nullptr; /* User data associated to the communication */
void* dst_data_ = nullptr;
#include <cmath>
#include <simgrid/Exception.hpp>
#include <simgrid/comm.h>
+#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Engine.hpp>
#include <simgrid/s4u/Mailbox.hpp>
return this;
}
+void* Comm::get_payload() const
+{
+ xbt_assert(get_state() == State::FINISHED,
+ "You can only retrieve the payload of a communication that gracefully terminated, but its state is %s.",
+ get_state_str());
+ return static_cast<kernel::activity::CommImpl*>(pimpl_.get())->payload_;
+}
+
Actor* Comm::get_sender() const
{
kernel::actor::ActorImplPtr sender = nullptr;
{
xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
"You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
+
+ auto myself = kernel::actor::ActorImpl::self();
+
if (get_source() != nullptr || get_destination() != nullptr) {
xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be.");
xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
});
fire_on_start();
fire_on_this_start();
- } else if (src_buff_ != nullptr) { // Sender side
+ } else if (myself == sender_) {
on_send(*this);
on_this_send(*this);
kernel::actor::CommIsendSimcall observer{sender_,
"Isend"};
pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
&observer);
- } else if (dst_buff_ != nullptr) { // Receiver side
+ } else if (myself == receiver_) {
xbt_assert(not detached_, "Receive cannot be detached");
on_recv(*this);
on_this_recv(*this);
return res;
}
+CommPtr Mailbox::get_async()
+{
+ CommPtr res = get_init()->set_dst_data(nullptr, sizeof(void*));
+ res->start();
+ return res;
+}
+
kernel::activity::ActivityImplPtr
Mailbox::iprobe(int type, const std::function<bool(void*, void*, kernel::activity::CommImpl*)>& match_fun, void* data)
{
foreach(x actor actor-autorestart actor-suspend
activity-lifecycle
- comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for
+ comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for
cloud-interrupt-migration cloud-two-execs
monkey-masterworkers monkey-semaphore
concurrent_rw