Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
BMF: remove warning since the solver can be used by any model
[simgrid.git] / src / s4u / s4u_Comm.cpp
index cff5e62..eec97a7 100644 (file)
@@ -3,18 +3,18 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-//#include "src/msg/msg_private.hpp"
-//#include "xbt/log.h"
-
+#include <cmath>
 #include <simgrid/Exception.hpp>
 #include <simgrid/comm.h>
 #include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Mailbox.hpp>
 
+#include "mc/mc.h"
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/mc/mc_replay.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
 
@@ -24,7 +24,7 @@ xbt::signal<void(Comm const&)> Comm::on_send;
 xbt::signal<void(Comm const&)> Comm::on_recv;
 xbt::signal<void(Comm const&)> Comm::on_completion;
 
-CommPtr Comm::set_copy_data_callback(void (*callback)(kernel::activity::CommImpl*, void*, size_t))
+CommPtr Comm::set_copy_data_callback(const std::function<void(kernel::activity::CommImpl*, void*, size_t)>& callback)
 {
   copy_data_function_ = callback;
   return this;
@@ -34,8 +34,8 @@ void Comm::copy_buffer_callback(kernel::activity::CommImpl* comm, void* buff, si
 {
   XBT_DEBUG("Copy the data over");
   memcpy(comm->dst_buff_, buff, buff_size);
-  if (comm->detached()) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the
-                          // original buffer available to the application ASAP
+  if (comm->is_detached()) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the
+                             // original buffer available to the application ASAP
     xbt_free(buff);
     comm->src_buff_ = nullptr;
   }
@@ -60,6 +60,91 @@ Comm::~Comm()
   }
 }
 
+void Comm::send(kernel::actor::ActorImpl* sender, const Mailbox* mbox, double task_size, double rate, void* src_buff,
+                size_t src_buff_size,
+                const std::function<bool(void*, void*, simgrid::kernel::activity::CommImpl*)>& match_fun,
+                const std::function<void(simgrid::kernel::activity::CommImpl*, void*, size_t)>& copy_data_fun,
+                void* data, double timeout)
+{
+  /* checking for infinite values */
+  xbt_assert(std::isfinite(task_size), "task_size is not finite!");
+  xbt_assert(std::isfinite(rate), "rate is not finite!");
+  xbt_assert(std::isfinite(timeout), "timeout is not finite!");
+
+  xbt_assert(mbox, "No rendez-vous point defined for send");
+
+  if (MC_is_active() || MC_record_replay_is_active()) {
+    /* the model-checker wants two separate simcalls, and wants comm to be nullptr during the simcall */
+    simgrid::kernel::activity::ActivityImplPtr comm = nullptr;
+
+    simgrid::kernel::actor::CommIsendSimcall send_observer{
+        sender,  mbox->get_impl(), task_size, rate, static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
+        nullptr, copy_data_fun,    data,      false};
+    comm = simgrid::kernel::actor::simcall_answered(
+        [&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer);
+
+    simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout};
+    if (simgrid::kernel::actor::simcall_blocking(
+            [&wait_observer] {
+              wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
+            },
+            &wait_observer)) {
+      throw simgrid::TimeoutException(XBT_THROW_POINT, "Timeouted");
+    }
+    comm = nullptr;
+  } else {
+    simgrid::kernel::actor::CommIsendSimcall observer(sender, mbox->get_impl(), task_size, rate,
+                                                      static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
+                                                      nullptr, copy_data_fun, data, false);
+    simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
+      simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer);
+      comm->wait_for(observer.get_issuer(), timeout);
+    });
+  }
+}
+
+void Comm::recv(kernel::actor::ActorImpl* receiver, const Mailbox* mbox, void* dst_buff, size_t* dst_buff_size,
+                const std::function<bool(void*, void*, simgrid::kernel::activity::CommImpl*)>& match_fun,
+                const std::function<void(simgrid::kernel::activity::CommImpl*, void*, size_t)>& copy_data_fun,
+                void* data, double timeout, double rate)
+{
+  xbt_assert(std::isfinite(timeout), "timeout is not finite!");
+  xbt_assert(mbox, "No rendez-vous point defined for recv");
+
+  if (MC_is_active() || MC_record_replay_is_active()) {
+    /* the model-checker wants two separate simcalls, and wants comm to be nullptr during the simcall */
+    simgrid::kernel::activity::ActivityImplPtr comm = nullptr;
+
+    simgrid::kernel::actor::CommIrecvSimcall observer{receiver,
+                                                      mbox->get_impl(),
+                                                      static_cast<unsigned char*>(dst_buff),
+                                                      dst_buff_size,
+                                                      match_fun,
+                                                      copy_data_fun,
+                                                      data,
+                                                      rate};
+    comm = simgrid::kernel::actor::simcall_answered(
+        [&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer);
+
+    simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout};
+    if (simgrid::kernel::actor::simcall_blocking(
+            [&wait_observer] {
+              wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
+            },
+            &wait_observer)) {
+      throw simgrid::TimeoutException(XBT_THROW_POINT, "Timeouted");
+    }
+    comm = nullptr;
+  } else {
+    simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox->get_impl(), static_cast<unsigned char*>(dst_buff),
+                                                      dst_buff_size, match_fun, copy_data_fun, data, rate);
+    simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
+      simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer);
+      comm->wait_for(observer.get_issuer(), timeout);
+    });
+  }
+}
+
 CommPtr Comm::sendto_init()
 {
   CommPtr res(new Comm());
@@ -306,13 +391,13 @@ Comm* Comm::wait_for(double timeout)
         return vetoable_start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls
       } else if (src_buff_ != nullptr) {
         on_send(*this);
-        simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
-                          copy_data_function_, get_data<void>(), timeout);
+        send(sender_, mailbox_, remains_, rate_, src_buff_, src_buff_size_, match_fun_, copy_data_function_,
+             get_data<void>(), timeout);
 
       } else { // Receiver
         on_recv(*this);
-        simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
-                          get_data<void>(), timeout, rate_);
+        recv(receiver_, mailbox_, dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_, get_data<void>(),
+             timeout, rate_);
       }
       break;
     case State::STARTED: