Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[s4u] Allocate Mailbox on the heap and return MailboxPtr
authorGabriel Corona <gabriel.corona@loria.fr>
Mon, 4 Jul 2016 12:37:51 +0000 (14:37 +0200)
committerGabriel Corona <gabriel.corona@loria.fr>
Wed, 6 Jul 2016 11:47:51 +0000 (13:47 +0200)
include/simgrid/msg.h
include/simgrid/s4u/mailbox.hpp
include/simgrid/simix.h
include/xbt/synchro_core.h
src/msg/msg_gos.cpp
src/s4u/s4u_engine.cpp
src/s4u/s4u_mailbox.cpp
src/simix/libsmx.cpp
src/simix/smx_network.cpp
src/simix/smx_network_private.h

index 681572d..b3dce5f 100644 (file)
@@ -26,7 +26,7 @@ SG_BEGIN_DECL()
  * #MSG_task_send and friends) hide this object behind a string
  * alias. That mean that you don't provide the mailbox on which you
  * want to send your task, but only the name of this mailbox. */
-typedef struct s_smx_mailbox *msg_mailbox_t;
+typedef smx_mailbox_t msg_mailbox_t;
 
 /* ******************************** Environment ************************************ */
 typedef simgrid_As *msg_as_t;
index 1d30515..4238ecf 100644 (file)
@@ -8,7 +8,7 @@
 
 #include <string>
 
-#include <boost/unordered_map.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 #include <xbt/base.h>
 
@@ -27,20 +27,29 @@ namespace s4u {
  */
 XBT_PUBLIC_CLASS Mailbox {
   friend Comm;
+  friend simgrid::s4u::Engine;
+  friend simgrid::simix::Mailbox;
+
+  smx_mailbox_t pimpl_;
+
+  Mailbox(smx_mailbox_t mbox): pimpl_(mbox) {}
 
-private:
-  Mailbox(const char*name, smx_mailbox_t inferior);
-public:
-  ~Mailbox();
-  
 protected:
   smx_mailbox_t getInferior() { return pimpl_; }
 
 public:
+
+  // We don't have to manage the lifetime of mailboxes:
+  friend void intrusive_ptr_add_ref(Mailbox*) {}
+  friend void intrusive_ptr_release(Mailbox*) {}
+  using Ptr = boost::intrusive_ptr<Mailbox>;
+
   /** Get the name of that mailbox */
   const char *getName();
+
   /** Retrieve the mailbox associated to the given string */
-  static Mailbox *byName(const char *name);
+  static Ptr byName(const char *name);
+
   /** Returns whether the mailbox contains queued communications */
   bool empty();
 
@@ -50,15 +59,13 @@ public:
    * This models the real behavior of TCP and MPI communications, amongst other.
    */
   void setReceiver(Actor* process);
+
   /** Return the process declared as permanent receiver, or nullptr if none **/
   Actor& receiver();
-
-private:
-  std::string name_;
-  smx_mailbox_t pimpl_;
-  static boost::unordered_map<std::string, Mailbox *> *mailboxes;
-  friend s4u::Engine;
 };
+
+using MailboxPtr = Mailbox::Ptr;
+
 }} // namespace simgrid::s4u
 
 XBT_PUBLIC(sg_mbox_t) sg_mbox_by_name(const char*name);
index 53e7b0b..d9b6007 100644 (file)
@@ -32,22 +32,21 @@ namespace simix {
   class Context;
   class ContextFactory;
   class Mutex;
+  class Mailbox;
 }
 }
 
 typedef simgrid::simix::Context *smx_context_t;
 typedef simgrid::simix::Process *smx_process_t;
-
-/**
- * \ingroup simix_synchro_management
- */
 typedef simgrid::simix::Mutex   *smx_mutex_t;
+typedef simgrid::simix::Mailbox *smx_mailbox_t;
 
 #else
 
 typedef struct s_smx_context *smx_context_t;
 typedef struct s_smx_process *smx_process_t;
 typedef struct s_smx_mutex   *smx_mutex_t;
+typedef struct s_smx_mailbox *smx_mailbox_t;
 
 #endif
 
@@ -110,10 +109,6 @@ typedef enum {
 /** @} */
 
 /******************************* Networking ***********************************/
-/**
- * \ingroup simix_mbox_management
- */
-typedef struct s_smx_mailbox *smx_mailbox_t;
 
 /* Process creation/destruction callbacks */
 typedef void (*void_pfn_smxprocess_t) (smx_process_t);
index b859f17..c3378ad 100644 (file)
@@ -29,7 +29,6 @@ SG_BEGIN_DECL()
  *  @{
  */
 
-
 /** @brief Thread mutex data type (opaque object)
  *  @hideinitializer
  */
index d3b5016..9e0378e 100644 (file)
@@ -839,7 +839,8 @@ msg_error_t MSG_task_send_with_timeout_bounded(msg_task_t task, const char *alia
 int MSG_task_listen(const char *alias)
 {
   smx_mailbox_t mbox = MSG_mailbox_get_by_alias(alias);
-  return !MSG_mailbox_is_empty(mbox) || (mbox->permanent_receiver && !mbox->done_comm_queue->empty());
+  return !MSG_mailbox_is_empty(mbox) ||
+    (mbox->permanent_receiver && !mbox->done_comm_queue.empty());
 }
 
 /** \ingroup msg_task_usage
index 6bf7a98..3df3b92 100644 (file)
@@ -39,7 +39,6 @@ Engine *Engine::instance() {
 void Engine::shutdown() {
   delete s4u::Engine::instance_;
   s4u::Engine::instance_ = nullptr;
-  delete s4u::Mailbox::mailboxes;
   delete s4u::Storage::storages_;
 }
 
index c8c6149..d7f4869 100644 (file)
@@ -17,29 +17,21 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_channel,s4u,"S4U Communication Mailboxes");
 namespace simgrid {
 namespace s4u {
 
-boost::unordered_map <std::string, s4u::Mailbox *> *s4u::Mailbox::mailboxes = new boost::unordered_map<std::string, s4u::Mailbox*> ();
-
-Mailbox::Mailbox(const char*name, smx_mailbox_t inferior) {
-  pimpl_ = inferior;
-  name_ = name;
-  mailboxes->insert({name, this});
-}
 const char *Mailbox::getName() {
-  return name_.c_str();
+  return pimpl_->name;
 }
-Mailbox *Mailbox::byName(const char*name) {
-  s4u::Mailbox *res;
-  try {
-    res = mailboxes->at(name);
-  } catch (std::out_of_range& e) {
-    // FIXME: there is a potential race condition here where two actors run Mailbox::byName on a non-existent mailbox
-    // during the same scheduling round. Both will be interrupted in the simcall creating the underlying simix mbox.
-    // Only one simix object will be created, but two S4U objects will be created.
-    // Only one S4U object will be stored in the hashmap and used, and the other one will be leaked.
-    new Mailbox(name,simcall_mbox_create(name));
-    res = mailboxes->at(name); // Use the stored one, even if it's not the one I created myself.
-  }
-  return res;
+
+MailboxPtr Mailbox::byName(const char*name) {
+  // FIXME: there is a race condition here where two actors run Mailbox::byName
+  // on a non-existent mailbox during the same scheduling round. Both will be
+  // interrupted in the simcall creating the underlying simix mbox.
+  // Only one simix object will be created, but two S4U objects will be created.
+  // Only one S4U object will be stored in the hashmap and used, and the other
+  // one will be leaked.
+  smx_mailbox_t mbox = simcall_mbox_get_by_name(name);
+  if (mbox == nullptr)
+    mbox = simcall_mbox_create(name);
+  return MailboxPtr(&mbox->mbox_, true);
 }
 
 bool Mailbox::empty() {
@@ -61,7 +53,7 @@ Actor& Mailbox::receiver() {
 /*------- C functions -------*/
 
 sg_mbox_t sg_mbox_by_name(const char*name){
-  return simgrid::s4u::Mailbox::byName(name);
+  return simgrid::s4u::Mailbox::byName(name).get();
 }
 int sg_mbox_is_empty(sg_mbox_t mbox) {
   return mbox->empty();
index 253116d..8f41f62 100644 (file)
@@ -642,8 +642,7 @@ smx_mailbox_t simcall_mbox_get_by_name(const char *name)
  */
 smx_synchro_t simcall_mbox_front(smx_mailbox_t mbox)
 {
-
-  return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
+  return mbox->comm_queue.empty() ? nullptr : mbox->comm_queue.front();
 }
 
 void simcall_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
index e6ce384..db3b4fd 100644 (file)
@@ -45,14 +45,8 @@ smx_mailbox_t SIMIX_mbox_create(const char *name)
   xbt_assert(name, "Mailboxes must have a name");
   /* two processes may have pushed the same mbox_create simcall at the same time */
   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
-
   if (!mbox) {
-    mbox = new s_smx_mailbox_t();
-    mbox->name = xbt_strdup(name);
-    mbox->comm_queue = new std::deque<smx_synchro_t>();
-    mbox->done_comm_queue = nullptr; // Allocated on need only
-    mbox->permanent_receiver=nullptr;
-
+    mbox = new simgrid::simix::Mailbox(name);
     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
     xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
   }
@@ -63,9 +57,6 @@ void SIMIX_mbox_free(void *data)
 {
   XBT_DEBUG("mbox free %p", data);
   smx_mailbox_t mbox = (smx_mailbox_t) data;
-  xbt_free(mbox->name);
-  delete mbox->comm_queue;
-  delete mbox->done_comm_queue;
   delete mbox;
 }
 
@@ -81,9 +72,7 @@ smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
  */
 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
 {
-  mbox->permanent_receiver=process;
-  if (mbox->done_comm_queue == nullptr)
-    mbox->done_comm_queue = new std::deque<smx_synchro_t>();
+  mbox->permanent_receiver = process;
 }
 
 /**
@@ -94,8 +83,7 @@ void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
 {
   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
-
-  mbox->comm_queue->push_back(comm);
+  mbox->comm_queue.push_back(comm);
   comm->mbox = mbox;
 }
 
@@ -109,9 +97,9 @@ void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
 
   comm->mbox = nullptr;
-  for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
+  for (auto it = mbox->comm_queue.begin(); it != mbox->comm_queue.end(); it++)
     if (*it == comm) {
-      mbox->comm_queue->erase(it);
+      mbox->comm_queueerase(it);
       return;
     }
   xbt_die("Cannot remove this comm that is not part of the mailbox");
@@ -190,7 +178,7 @@ XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_
    *
    * If it is not found then push our communication into the rendez-vous point */
   smx_synchro_t other_synchro =
-      _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
+      _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
 
 
@@ -203,7 +191,7 @@ XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_
       other_synchro->state = SIMIX_READY;
       other_comm->dst_proc=mbox->permanent_receiver.get();
       other_comm->ref();
-      mbox->done_comm_queue->push_back(other_synchro);
+      mbox->done_comm_queue.push_back(other_synchro);
       other_comm->mbox=mbox;
       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
 
@@ -274,16 +262,16 @@ smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void
     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
     void *data, double rate)
 {
-  XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
+  XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
   simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
 
   smx_synchro_t other_synchro;
   //communication already done, get it inside the fifo of completed comms
-  if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
+  if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
 
     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
     //find a match in the already received fifo
-    other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
+    other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
     //if not found, assume the receiver came first, register it to the mailbox in the classical way
     if (!other_synchro)  {
       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
@@ -308,10 +296,10 @@ smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void
      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
      *
      * If it is not found then push our communication into the rendez-vous point */
-    other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
+    other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
 
     if (!other_synchro) {
-      XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
+      XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
       other_synchro = this_synchro;
       SIMIX_mbox_push(mbox, this_synchro);
     } else {
@@ -356,7 +344,7 @@ smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t m
 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
 {
-  XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
+  XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
   simgrid::simix::Comm* this_comm;
   int smx_type;
   if(type == 1){
@@ -367,14 +355,15 @@ smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int
     smx_type = SIMIX_COMM_SEND;
   } 
   smx_synchro_t other_synchro=nullptr;
-  if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
+  if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
-    other_synchro =
-        _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
+    other_synchro = _find_matching_comm(&mbox->done_comm_queue,
+      (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
   }
   if (!other_synchro){
     XBT_DEBUG("check if we have more luck in the normal mailbox");
-    other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
+    other_synchro = _find_matching_comm(&mbox->comm_queue,
+      (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
   }
 
   if(other_synchro)
index 420e31e..ca85abf 100644 (file)
@@ -8,19 +8,39 @@
 #define _SIMIX_NETWORK_PRIVATE_H
 
 #include <deque>
-#include <xbt/base.h>
+#include <string>
+
 #include <boost/intrusive_ptr.hpp>
 
+#include <xbt/base.h>
+
+#include <simgrid/s4u/mailbox.hpp>
+
 #include "simgrid/simix.h"
 #include "popping_private.h"
+#include "src/simix/smx_process_private.h"
+
+namespace simgrid {
+namespace simix {
 
 /** @brief Rendez-vous point datatype */
-typedef struct s_smx_mailbox {
-  char *name = nullptr;
-  std::deque<smx_synchro_t> *comm_queue = nullptr;
+
+class Mailbox {
+public:
+  Mailbox(const char* name) : mbox_(this), name(xbt_strdup(name)) {}
+  ~Mailbox() {
+    xbt_free(name);
+  }
+
+  simgrid::s4u::Mailbox mbox_;
+  char* name;
+  std::deque<smx_synchro_t> comm_queue;
   boost::intrusive_ptr<simgrid::simix::Process> permanent_receiver; //process which the mailbox is attached to
-  std::deque<smx_synchro_t> *done_comm_queue = nullptr;//messages already received in the permanent receive mode
-} s_smx_mailbox_t;
+  std::deque<smx_synchro_t> done_comm_queue;//messages already received in the permanent receive mode
+};
+
+}
+}
 
 XBT_PRIVATE void SIMIX_mailbox_exit(void);