Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
replace another old simcall in MSG
[simgrid.git] / src / msg / msg_gos.cpp
index 8f3b098..6f04930 100644 (file)
@@ -6,6 +6,7 @@
 #include "simgrid/Exception.hpp"
 #include <cmath>
 
+#include "simgrid/s4u/Comm.hpp"
 #include "simgrid/s4u/Mailbox.hpp"
 #include "src/instr/instr_private.hpp"
 #include "src/kernel/activity/ExecImpl.hpp"
@@ -238,8 +239,6 @@ msg_error_t MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, d
   if (host)
     THROW_UNIMPLEMENTED;
 
-  TRACE_msg_task_get_start();
-
   /* Sanity check */
   xbt_assert(task, "Null pointer for the task storage");
 
@@ -248,8 +247,9 @@ msg_error_t MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, d
 
   /* Try to receive it by calling SIMIX network layer */
   try {
-    simcall_comm_recv(MSG_process_self()->get_impl(), mailbox->get_impl(), task, nullptr, nullptr, nullptr, nullptr,
-                      timeout, rate);
+    void* payload;
+    mailbox->get_init()->set_dst_data(&payload, sizeof(msg_task_t*))->set_rate(rate)->wait_for(timeout);
+    *task = static_cast<msg_task_t>(payload);
     XBT_DEBUG("Got task %s from %s", (*task)->name, mailbox->get_cname());
     (*task)->simdata->setNotUsed();
   } catch (simgrid::HostFailureException& e) {
@@ -480,12 +480,12 @@ int MSG_comm_testany(xbt_dynar_t comms)
   int finished_index = -1;
 
   /* Create the equivalent array with SIMIX objects: */
-  std::vector<simgrid::kernel::activity::ActivityImplPtr> s_comms;
+  std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
   s_comms.reserve(xbt_dynar_length(comms));
   msg_comm_t comm;
   unsigned int cursor;
   xbt_dynar_foreach(comms, cursor, comm) {
-    s_comms.push_back(comm->s_comm);
+    s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm.get()));
   }
 
   msg_error_t status = MSG_OK;
@@ -575,20 +575,18 @@ int MSG_comm_waitany(xbt_dynar_t comms)
 {
   int finished_index = -1;
 
-  /* create the equivalent dynar with SIMIX objects */
-  xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_activity_t), [](void*ptr){
-    intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
-  });
+  /* Create the equivalent array with SIMIX objects: */
+  std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
+  s_comms.reserve(xbt_dynar_length(comms));
   msg_comm_t comm;
   unsigned int cursor;
   xbt_dynar_foreach(comms, cursor, comm) {
-    intrusive_ptr_add_ref(comm->s_comm.get());
-    xbt_dynar_push_as(s_comms, simgrid::kernel::activity::ActivityImpl*, comm->s_comm.get());
+    s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm.get()));
   }
 
   msg_error_t status = MSG_OK;
   try {
-    finished_index = simcall_comm_waitany(s_comms, -1);
+    finished_index = simcall_comm_waitany(s_comms.data(), s_comms.size(), -1);
   } catch (simgrid::TimeoutError& e) {
     finished_index = e.value;
     status         = MSG_TIMEOUT;
@@ -603,7 +601,6 @@ int MSG_comm_waitany(xbt_dynar_t comms)
   }
 
   xbt_assert(finished_index != -1, "WaitAny returned -1");
-  xbt_dynar_free(&s_comms);
 
   comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
   /* the communication is finished */
@@ -642,15 +639,12 @@ msg_task_t MSG_comm_get_task(msg_comm_t comm)
 
 /**
  * @brief This function is called by SIMIX in kernel mode to copy the data of a comm.
- * @param synchro the comm
+ * @param comm the comm
  * @param buff the data copied
  * @param buff_size size of the buffer
  */
-void MSG_comm_copy_data_from_SIMIX(smx_activity_t synchro, void* buff, size_t buff_size)
+void MSG_comm_copy_data_from_SIMIX(simgrid::kernel::activity::CommImpl* comm, void* buff, size_t buff_size)
 {
-  simgrid::kernel::activity::CommImplPtr comm =
-      boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(synchro);
-
   SIMIX_comm_copy_pointer_callback(comm, buff, buff_size);
 
   // notify the user callback if any
@@ -736,13 +730,12 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl
 
   /* Try to send it by calling SIMIX network layer */
   try {
-    smx_activity_t comm = nullptr; /* MC needs the comm to be set to nullptr during the simix call  */
-    comm = simcall_comm_isend(SIMIX_process_self(), mailbox->get_impl(), t_simdata->bytes_amount, t_simdata->rate, task,
-                              sizeof(void*), nullptr, nullptr, nullptr, nullptr, 0);
+    simgrid::s4u::CommPtr comm = mailbox->put_init(task, t_simdata->bytes_amount)->set_rate(t_simdata->rate);
+    t_simdata->comm            = boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(comm->get_impl());
+    comm->start();
     if (TRACE_is_enabled() && task->category != nullptr)
-      simgrid::simix::simcall([comm, task] { comm->set_category(task->category); });
-    t_simdata->comm = boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(comm);
-    simcall_comm_wait(comm, timeout);
+      simgrid::simix::simcall([comm, task] { comm->get_impl()->set_category(task->category); });
+    comm->wait_for(timeout);
   } catch (simgrid::TimeoutError& e) {
     ret = MSG_TIMEOUT;
   } catch (simgrid::CancelException& e) {
@@ -757,7 +750,6 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl
     t_simdata->setNotUsed();
   }
 
-  TRACE_msg_task_put_end();
   return ret;
 }
 
@@ -795,14 +787,9 @@ msg_error_t MSG_task_send_with_timeout_bounded(msg_task_t task, const char *alia
  */
 int MSG_task_listen_from(const char *alias)
 {
-  simgrid::s4u::MailboxPtr mbox = simgrid::s4u::Mailbox::by_name(alias);
-  simgrid::kernel::activity::CommImplPtr comm =
-      boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(mbox->front());
-
-  if (not comm)
-    return -1;
+  simgrid::kernel::activity::CommImplPtr comm = simgrid::s4u::Mailbox::by_name(alias)->front();
 
-  return MSG_process_get_PID(static_cast<msg_task_t>(comm->src_buff_)->simdata->sender);
+  return comm ? MSG_process_get_PID(static_cast<msg_task_t>(comm->src_buff_)->simdata->sender) : -1;
 }
 
 /**