/************************** Comunication simcalls *****************************/
/***** Rendez-vous points *****/
-XBT_PUBLIC(smx_mailbox_t) simcall_mbox_create(const char *name);
XBT_PUBLIC(void) simcall_mbox_set_receiver(smx_mailbox_t mbox , smx_actor_t process);
/***** Communication simcalls *****/
}
if(mbox)
- SIMIX_mbox_remove(mbox, this);
-
+ mbox->remove(this);
}
void simgrid::kernel::activity::Comm::suspend()
{
/* if the synchro is a waiting state means that it is still in a mbox */
/* so remove from it and delete it */
if (state == SIMIX_WAITING) {
- SIMIX_mbox_remove(mbox, this);
+ mbox->remove(this);
state = SIMIX_CANCELED;
}
else if (!MC_is_active() /* when running the MC there are no surf actions */
MailboxPtr Mailbox::byName(const char*name)
{
- // FIXME: there is a race condition here where two actors run Mailbox::byName
- // on a non-existent mailbox during the same scheduling round. Both will be
- // interrupted in the simcall creating the underlying simix mbox.
- // Only one simix object will be created, but two S4U objects will be created.
- // Only one S4U object will be stored in the hashmap and used, and the other
- // one will be leaked.
- smx_mailbox_t mbox = SIMIX_mbox_get_by_name(name);
- if (mbox == nullptr)
- mbox = simcall_mbox_create(name);
+ simix::MailboxImpl* mbox = simix::MailboxImpl::byNameOrNull(name);
+ if (mbox == nullptr) {
+ mbox = simix::kernelImmediate([name] {
+ return simix::MailboxImpl::byNameOrCreate(name);
+ });
+ }
return MailboxPtr(&mbox->piface_, true);
}
{
xbt_dict_free(&mailboxes);
}
-
-/******************************************************************************/
-/* Rendez-Vous Points */
-/******************************************************************************/
-
-smx_mailbox_t SIMIX_mbox_create(const char* name)
-{
- xbt_assert(name, "Mailboxes must have a name");
- /* two processes may have pushed the same mbox_create simcall at the same time */
- smx_mailbox_t mbox = static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
- if (!mbox) {
- mbox = new simgrid::simix::MailboxImpl(name);
- XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
- xbt_dict_set(mailboxes, mbox->name_, mbox, nullptr);
- }
- return mbox;
-}
-
void SIMIX_mbox_free(void* data)
{
XBT_DEBUG("mbox free %p", data);
delete mbox;
}
-smx_mailbox_t SIMIX_mbox_get_by_name(const char* name)
-{
- return static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
-}
+/******************************************************************************/
+/* Rendez-Vous Points */
+/******************************************************************************/
/**
* \brief set the receiver of the rendez vous point to allow eager sends
mbox->permanent_receiver = process;
}
-/**
- * \brief Pushes a communication synchro into a rendez-vous point
- * \param mbox The mailbox
- * \param synchro The communication synchro
+namespace simgrid {
+namespace simix {
+/** @brief Returns the mailbox of that name, or nullptr */
+MailboxImpl* MailboxImpl::byNameOrNull(const char* name)
+{
+ return static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
+}
+/** @brief Returns the mailbox of that name, newly created on need */
+MailboxImpl* MailboxImpl::byNameOrCreate(const char* name)
+{
+ xbt_assert(name, "Mailboxes must have a name");
+ /* two processes may have pushed the same mbox_create simcall at the same time */
+ smx_mailbox_t mbox = static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
+ if (!mbox) {
+ mbox = new simgrid::simix::MailboxImpl(name);
+ XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
+ xbt_dict_set(mailboxes, mbox->name_, mbox, nullptr);
+ }
+ return mbox;
+}
+/** @brief Pushes a communication activity into a mailbox
+ * @param activity What to add
*/
-void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t synchro)
+void MailboxImpl::push(smx_activity_t synchro)
{
simgrid::kernel::activity::Comm* comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
- mbox->comm_queue.push_back(comm);
- comm->mbox = mbox;
+ this->comm_queue.push_back(comm);
+ comm->mbox = this;
}
-/**
- * \brief Removes a communication synchro from a rendez-vous point
- * \param mbox The rendez-vous point
- * \param synchro The communication synchro
+/** @brief Removes a communication activity from a mailbox
+ * @param activity What to remove
*/
-void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_activity_t synchro)
+void MailboxImpl::remove(smx_activity_t activity)
{
- simgrid::kernel::activity::Comm* comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
+ simgrid::kernel::activity::Comm* comm = static_cast<simgrid::kernel::activity::Comm*>(activity);
comm->mbox = nullptr;
- for (auto it = mbox->comm_queue.begin(); it != mbox->comm_queue.end(); it++)
+ for (auto it = this->comm_queue.begin(); it != this->comm_queue.end(); it++)
if (*it == comm) {
- mbox->comm_queue.erase(it);
+ this->comm_queue.erase(it);
return;
}
- xbt_die("Cannot remove the comm %p that is not part of the mailbox %s", comm, mbox->name_);
+ xbt_die("Cannot remove the comm %p that is not part of the mailbox %s", comm, this->name_);
+}
+}
}
/** @brief Rendez-vous point datatype */
class MailboxImpl {
-public:
explicit MailboxImpl(const char* name)
: piface_(this), name_(xbt_strdup(name)), comm_queue(MAX_MAILBOX_SIZE), done_comm_queue(MAX_MAILBOX_SIZE)
{
}
+
+public:
~MailboxImpl() { xbt_free(name_); }
+ static MailboxImpl* byNameOrNull(const char* name);
+ static MailboxImpl* byNameOrCreate(const char* name);
+ void push(smx_activity_t synchro);
+ void remove(smx_activity_t activity);
simgrid::s4u::Mailbox piface_; // Our interface
char* name_;
- boost::circular_buffer_space_optimized<smx_activity_t> comm_queue;
+
boost::intrusive_ptr<simgrid::simix::ActorImpl> permanent_receiver; //process which the mailbox is attached to
+ boost::circular_buffer_space_optimized<smx_activity_t> comm_queue;
boost::circular_buffer_space_optimized<smx_activity_t> done_comm_queue;//messages already received in the permanent receive mode
};
}
XBT_PRIVATE void SIMIX_mailbox_exit();
-XBT_PRIVATE smx_mailbox_t SIMIX_mbox_create(const char* name);
-XBT_PRIVATE smx_mailbox_t SIMIX_mbox_get_by_name(const char* name);
-XBT_PRIVATE void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_activity_t comm);
-XBT_PRIVATE void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t synchro);
-
XBT_PRIVATE void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_actor_t proc);
#endif /* SIMIX_MAILBOXIMPL_H */
return (e_smx_state_t) simcall_BODY_process_sleep(duration);
}
-/**
- * \ingroup simix_mbox_management
- * \brief Creates a new rendez-vous point
- * \param name The name of the rendez-vous point
- * \return The created rendez-vous point
- */
-smx_mailbox_t simcall_mbox_create(const char *name)
-{
- return simcall_BODY_mbox_create(name);
-}
-
void simcall_mbox_set_receiver(smx_mailbox_t mbox, smx_actor_t process)
{
simcall_BODY_mbox_set_receiver(mbox, process);
simgrid::simix::marshal<smx_actor_t>(simcall->result, result);
}
-static inline const char* simcall_mbox_create__get__name(smx_simcall_t simcall) {
- return simgrid::simix::unmarshal<const char*>(simcall->args[0]);
-}
-static inline void simcall_mbox_create__set__name(smx_simcall_t simcall, const char* arg) {
- simgrid::simix::marshal<const char*>(simcall->args[0], arg);
-}
-static inline smx_mailbox_t simcall_mbox_create__get__result(smx_simcall_t simcall){
- return simgrid::simix::unmarshal<smx_mailbox_t>(simcall->result);
-}
-static inline void simcall_mbox_create__set__result(smx_simcall_t simcall, smx_mailbox_t result){
- simgrid::simix::marshal<smx_mailbox_t>(simcall->result, result);
-}
-
static inline smx_mailbox_t simcall_mbox_set_receiver__get__mbox(smx_simcall_t simcall) {
return simgrid::simix::unmarshal<smx_mailbox_t>(simcall->args[0]);
}
return simcall<smx_actor_t, smx_actor_t>(SIMCALL_PROCESS_RESTART, process);
}
-inline static smx_mailbox_t simcall_BODY_mbox_create(const char* name) {
- /* Go to that function to follow the code flow through the simcall barrier */
- if (0) SIMIX_mbox_create(name);
- return simcall<smx_mailbox_t, const char*>(SIMCALL_MBOX_CREATE, name);
- }
-
inline static void simcall_BODY_mbox_set_receiver(smx_mailbox_t mbox, smx_actor_t receiver) {
/* Go to that function to follow the code flow through the simcall barrier */
if (0) SIMIX_mbox_set_receiver(mbox, receiver);
SIMCALL_PROCESS_ON_EXIT,
SIMCALL_PROCESS_AUTO_RESTART_SET,
SIMCALL_PROCESS_RESTART,
- SIMCALL_MBOX_CREATE,
SIMCALL_MBOX_SET_RECEIVER,
SIMCALL_COMM_IPROBE,
SIMCALL_COMM_SEND,
"SIMCALL_PROCESS_ON_EXIT",
"SIMCALL_PROCESS_AUTO_RESTART_SET",
"SIMCALL_PROCESS_RESTART",
- "SIMCALL_MBOX_CREATE",
"SIMCALL_MBOX_SET_RECEIVER",
"SIMCALL_COMM_IPROBE",
"SIMCALL_COMM_SEND",
SIMIX_simcall_answer(simcall);
break;
-case SIMCALL_MBOX_CREATE:
- simgrid::simix::marshal<smx_mailbox_t>(simcall->result, SIMIX_mbox_create(simgrid::simix::unmarshal<const char*>(simcall->args[0])));
- SIMIX_simcall_answer(simcall);
- break;
-
case SIMCALL_MBOX_SET_RECEIVER:
SIMIX_mbox_set_receiver(simgrid::simix::unmarshal<smx_mailbox_t>(simcall->args[0]), simgrid::simix::unmarshal<smx_actor_t>(simcall->args[1]));
SIMIX_simcall_answer(simcall);
void process_auto_restart_set(smx_actor_t process, int auto_restart) [[nohandler]];
smx_actor_t process_restart(smx_actor_t process);
-smx_mailbox_t mbox_create(const char* name) [[nohandler]];
void mbox_set_receiver(smx_mailbox_t mbox, smx_actor_t receiver) [[nohandler]];
smx_activity_t comm_iprobe(smx_mailbox_t mbox, int type, int src, int tag, simix_match_func_t match_fun, void* data);
XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
}else{
- SIMIX_mbox_push(mbox, this_synchro);
+ mbox->push(this_synchro);
}
} else {
XBT_DEBUG("Receive already pushed");
if (!other_synchro) {
XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
other_synchro = this_synchro;
- SIMIX_mbox_push(mbox, this_synchro);
+ mbox->push(this_synchro);
} else {
simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
if (!other_synchro) {
XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
other_synchro = this_synchro;
- SIMIX_mbox_push(mbox, this_synchro);
+ mbox->push(this_synchro);
} else {
this_synchro->unref();
simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
/* If the synchro is still in a rendez-vous point then remove from it */
if (comm->mbox)
- SIMIX_mbox_remove(comm->mbox, synchro);
+ comm->mbox->remove(synchro);
XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+#include "mc/mc.h"
#include "private.h"
#include "private.hpp"
+#include "simgrid/s4u/Mailbox.hpp"
+#include "simgrid/sg_config.h"
#include "smpi_mpi_dt_private.h"
-#include "mc/mc.h"
+#include "src/kernel/activity/SynchroComm.hpp"
#include "src/mc/mc_record.h"
-#include "xbt/replay.h"
-#include "surf/surf.h"
-#include "src/simix/smx_private.h"
-#include "simgrid/sg_config.h"
#include "src/mc/mc_replay.h"
#include "src/msg/msg_private.h"
-#include "src/kernel/activity/SynchroComm.hpp"
+#include "src/simix/smx_private.h"
+#include "surf/surf.h"
+#include "xbt/replay.h"
#include <float.h> /* DBL_MAX */
#include <fstream>
double simulated;
int *argc;
char ***argv;
- smx_mailbox_t mailbox;
- smx_mailbox_t mailbox_small;
+ simgrid::s4u::MailboxPtr mailbox;
+ simgrid::s4u::MailboxPtr mailbox_small;
xbt_mutex_t mailboxes_mutex;
xbt_os_timer_t timer;
MPI_Comm comm_self;
data->argc = argc;
data->argv = argv;
// set the process attached to the mailbox
- simcall_mbox_set_receiver(data->mailbox_small, proc);
+ simcall_mbox_set_receiver(data->mailbox_small->getImpl(), proc);
XBT_DEBUG("<%d> New process in the game: %p", index, proc);
}
xbt_assert(smpi_process_data(),
smx_mailbox_t smpi_process_mailbox()
{
smpi_process_data_t data = smpi_process_data();
- return data->mailbox;
+ return data->mailbox->getImpl();
}
smx_mailbox_t smpi_process_mailbox_small()
{
smpi_process_data_t data = smpi_process_data();
- return data->mailbox_small;
+ return data->mailbox_small->getImpl();
}
xbt_mutex_t smpi_process_mailboxes_mutex()
smx_mailbox_t smpi_process_remote_mailbox(int index)
{
smpi_process_data_t data = smpi_process_remote_data(index);
- return data->mailbox;
+ return data->mailbox->getImpl();
}
smx_mailbox_t smpi_process_remote_mailbox_small(int index)
{
smpi_process_data_t data = smpi_process_remote_data(index);
- return data->mailbox_small;
+ return data->mailbox_small->getImpl();
}
xbt_mutex_t smpi_process_remote_mailboxes_mutex(int index)
process_data[i] = new s_smpi_process_data_t;
process_data[i]->argc = nullptr;
process_data[i]->argv = nullptr;
- process_data[i]->mailbox = simcall_mbox_create(get_mailbox_name(name, i));
- process_data[i]->mailbox_small = simcall_mbox_create(get_mailbox_name_small(name, i));
+ process_data[i]->mailbox = simgrid::s4u::Mailbox::byName(get_mailbox_name(name, i));
+ process_data[i]->mailbox_small = simgrid::s4u::Mailbox::byName(get_mailbox_name_small(name, i));
process_data[i]->mailboxes_mutex = xbt_mutex_init();
process_data[i]->timer = xbt_os_timer_new();
if (MC_is_active())