Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
authorFrederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 16 Mar 2017 17:26:28 +0000 (18:26 +0100)
committerFrederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 16 Mar 2017 17:26:28 +0000 (18:26 +0100)
examples/s4u/actions-comm/s4u_actions-comm.cpp
include/xbt/replay.hpp
src/msg/msg_actions.cpp
src/smpi/smpi_replay.cpp
src/xbt/xbt_replay.cpp

index e669955..d62c029 100644 (file)
@@ -38,7 +38,7 @@ public:
       argc    = 2;
       argv[1] = &args.at(1)[0];
     }
-    xbt_replay_action_runner(argc, argv);
+    simgrid::xbt::replay_runner(argc, argv);
   }
 
   void operator()()
@@ -86,7 +86,7 @@ int main(int argc, char *argv[])
 {
   simgrid::s4u::Engine* e = new simgrid::s4u::Engine(&argc, argv);
   /* Explicit initialization of the action module is required now*/
-  _xbt_replay_action_init();
+  simgrid::xbt::replay_init();
 
   xbt_assert(argc > 2, "Usage: %s platform_file deployment_file [action_files]\n"
                        "\t# if all actions are in the same file\n"
@@ -96,7 +96,7 @@ int main(int argc, char *argv[])
              argv[0], argv[0], argv[0]);
 
   e->loadPlatform(argv[1]);
-  e->registerDefault(&xbt_replay_action_runner);
+  e->registerDefault(&simgrid::xbt::replay_runner);
   e->registerFunction<Replayer>("p0");
   e->registerFunction<Replayer>("p1");
   e->loadDeployment(argv[2]);
@@ -120,7 +120,7 @@ int main(int argc, char *argv[])
 
   XBT_INFO("Simulation time %g", e->getClock());
 
-  _xbt_replay_action_exit(); /* Explicit finalization of the action module */
+  simgrid::xbt::replay_exit(); /* Explicit finalization of the action module */
 
   return 0;
 }
index 7b55760..a317f34 100644 (file)
 #include "xbt/dict.h"
 #ifdef __cplusplus
 #include <fstream>
+#include <queue>
+#include <unordered_map>
 
 namespace simgrid {
 namespace xbt {
 /* To split the file if a unique one is given (specific variable for the other case live in runner()) */
+typedef std::vector<std::string> ReplayAction;
+static std::unordered_map<std::string, std::queue<ReplayAction*>*> action_queues;
+
 XBT_PUBLIC_DATA(std::ifstream*) action_fs;
 XBT_PUBLIC(bool) replay_is_active();
+XBT_PUBLIC(void) replay_init();
+XBT_PUBLIC(void) replay_exit();
+XBT_PUBLIC(int) replay_runner(int argc, char* argv[]);
 }
 }
 #endif
@@ -25,14 +33,7 @@ XBT_PUBLIC(bool) replay_is_active();
 SG_BEGIN_DECL()
 
 typedef void (*action_fun)(const char* const* args);
-
-XBT_PUBLIC_DATA(xbt_dict_t) xbt_action_queues;
-
 XBT_PUBLIC(void) xbt_replay_action_register(const char* action_name, action_fun function);
-XBT_PUBLIC(int) xbt_replay_action_runner(int argc, char* argv[]);
-
-XBT_PUBLIC(void) _xbt_replay_action_init();
-XBT_PUBLIC(void) _xbt_replay_action_exit();
 
 SG_END_DECL()
 
index 6edbf1b..916aa87 100644 (file)
@@ -14,13 +14,13 @@ SG_BEGIN_DECL()
 
 void MSG_action_init()
 {
-  _xbt_replay_action_init();
-  MSG_function_register_default(xbt_replay_action_runner);
+  simgrid::xbt::replay_init();
+  MSG_function_register_default(simgrid::xbt::replay_runner);
 }
 
 void MSG_action_exit()
 {
-  _xbt_replay_action_exit();
+  simgrid::xbt::replay_exit();
 }
 
 /** \ingroup msg_trace_driven
@@ -31,22 +31,18 @@ void MSG_action_exit()
  */
 msg_error_t MSG_action_trace_run(char *path)
 {
-  msg_error_t res;
-  char *name;
-  xbt_dynar_t todo;
-  xbt_dict_cursor_t cursor;
-
   if (path) {
     simgrid::xbt::action_fs = new std::ifstream(path, std::ifstream::in);
   }
-  res = MSG_main();
 
-  if (!xbt_dict_is_empty(xbt_action_queues)) {
+  msg_error_t res = MSG_main();
+
+  if (!simgrid::xbt::action_queues.empty()) {
     XBT_WARN("Not all actions got consumed. If the simulation ended successfully (without deadlock),"
              " you may want to add new processes to your deployment file.");
 
-    xbt_dict_foreach(xbt_action_queues, cursor, name, todo) {
-      XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
+    for (auto actions_of : simgrid::xbt::action_queues) {
+      XBT_WARN("Still %zu actions for %s", actions_of.second->size(), actions_of.first.c_str());
     }
   }
 
@@ -55,9 +51,6 @@ msg_error_t MSG_action_trace_run(char *path)
     simgrid::xbt::action_fs = nullptr;
   }
 
-  xbt_dict_free(&xbt_action_queues);
-  xbt_action_queues = xbt_dict_new_homogeneous(nullptr);
-
   return res;
 }
 
index 3292e7c..031d0d8 100644 (file)
@@ -909,7 +909,7 @@ void smpi_replay_run(int *argc, char***argv){
   TRACE_smpi_collective_in(rank, -1, operation, extra);
   TRACE_smpi_collective_out(rank, -1, operation);
   xbt_free(operation);
-  _xbt_replay_action_init();
+  simgrid::xbt::replay_init();
   xbt_replay_action_register("init",       action_init);
   xbt_replay_action_register("finalize",   action_finalize);
   xbt_replay_action_register("comm_size",  action_comm_size);
@@ -950,7 +950,7 @@ void smpi_replay_run(int *argc, char***argv){
   }
 
   /* Actually run the replay */
-  xbt_replay_action_runner(*argc, *argv);
+  simgrid::xbt::replay_runner(*argc, *argv);
 
   /* and now, finalize everything */
   /* One active process will stop. Decrease the counter*/
@@ -973,7 +973,7 @@ void smpi_replay_run(int *argc, char***argv){
   if(active_processes==0){
     /* Last process alive speaking: end the simulated timer */
     XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
-    _xbt_replay_action_exit();
+    simgrid::xbt::replay_exit();
     xbt_free(sendbuffer);
     xbt_free(recvbuffer);
   }
index 69b45d5..40b5cee 100644 (file)
 #include <boost/algorithm/string.hpp>
 #include <ctype.h>
 #include <errno.h>
-#include <fstream>
-#include <queue>
-#include <unordered_map>
 #include <wchar.h>
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(replay,xbt,"Replay trace reader");
 
+bool is_replay_active        = false;
+
 namespace simgrid {
 namespace xbt {
 
-bool is_replay_active = false;
-typedef std::vector<std::string> ReplayAction;
+std::ifstream* action_fs = nullptr;
+std::unordered_map<std::string, action_fun> action_funs;
 
-bool replay_is_active()
+static void read_and_trim_line(std::ifstream* fs, std::string* line)
 {
-  return is_replay_active;
+  std::getline(*fs, *line);
+  boost::trim(*line);
+  XBT_DEBUG("got from trace: %s", line->c_str());
 }
 
 class ReplayReader {
@@ -55,9 +56,7 @@ public:
 
 bool ReplayReader::get(ReplayAction* action)
 {
-  std::getline(*fs, line);
-  boost::trim(line);
-  XBT_DEBUG("got from trace: %s", line.c_str());
+  read_and_trim_line(fs, &line);
   linenum++;
 
   if (line.length() > 0 && line.find("#") == std::string::npos) {
@@ -70,146 +69,41 @@ bool ReplayReader::get(ReplayAction* action)
       return this->get(action);
   }
 }
-std::ifstream* action_fs = nullptr;
-}
-}
-
-std::unordered_map<std::string, action_fun> xbt_action_funs;
 
-xbt_dict_t xbt_action_queues = nullptr;
-
-bool is_replay_active = false;
-
-static simgrid::xbt::ReplayAction* action_get_action(char* name);
-
-/**
- * \ingroup XBT_replay
- * \brief Registers a function to handle a kind of action
- *
- * Registers a function to handle a kind of action
- * This table is then used by \ref xbt_replay_action_runner
- *
- * The argument of the function is the line describing the action, fields separated by spaces.
- *
- * \param action_name the reference name of the action.
- * \param function prototype given by the type: void...(const char** action)
- */
-void xbt_replay_action_register(const char *action_name, action_fun function)
-{
-  if (!is_replay_active) // If the user registers a function before the start
-    _xbt_replay_action_init();
-  xbt_action_funs.insert({std::string(action_name), function});
-}
-
-void _xbt_replay_action_init()
+void replay_init()
 {
   if (!is_replay_active) {
-    xbt_action_queues = xbt_dict_new_homogeneous(nullptr);
     is_replay_active  = true;
   }
 }
 
-void _xbt_replay_action_exit()
+void replay_exit()
 {
-  xbt_dict_free(&xbt_action_queues);
-  xbt_action_queues = nullptr;
 }
 
-/**
- * \ingroup XBT_replay
- * \brief function used internally to actually run the replay
-
- * \param argc argc .
- * \param argv argv
- */
-int xbt_replay_action_runner(int argc, char *argv[])
+bool replay_is_active()
 {
-  if (simgrid::xbt::action_fs) { // A unique trace file
-    while (true) {
-      simgrid::xbt::ReplayAction* evt = action_get_action(argv[0]);
-      if (evt == nullptr)
-        break;
-
-      char** args = new char*[evt->size() + 1];
-      int i       = 0;
-      for (auto arg : *evt) {
-        args[i] = xbt_strdup(arg.c_str());
-        i++;
-      }
-      args[i]             = nullptr;
-      action_fun function = xbt_action_funs.at(evt->at(1));
-
-      try {
-        function(args);
-      }
-      catch(xbt_ex& e) {
-        xbt_die("Replay error :\n %s", e.what());
-      }
-      for (unsigned int j = 0; j < evt->size(); j++)
-        xbt_free(args[j]);
-      delete[] args;
-      delete evt;
-    }
-  } else {                      // Should have got my trace file in argument
-    simgrid::xbt::ReplayAction* evt = new simgrid::xbt::ReplayAction();
-    xbt_assert(argc >= 2,
-                "No '%s' agent function provided, no simulation-wide trace file provided, "
-                "and no process-wide trace file provided in deployment file. Aborting.", argv[0]
-        );
-    simgrid::xbt::ReplayReader* reader = new simgrid::xbt::ReplayReader(argv[1]);
-    while (reader->get(evt)) {
-      if (evt->at(0).compare(argv[0]) == 0) {
-        char** args = new char*[evt->size() + 1];
-        int i       = 0;
-        for (auto arg : *evt) {
-          args[i] = xbt_strdup(arg.c_str());
-          i++;
-        }
-        args[i]             = nullptr;
-        action_fun function = xbt_action_funs.at(evt->at(1));
-        try {
-          function(args);
-        } catch(xbt_ex& e) {
-          for (unsigned int j = 0; j < evt->size(); j++)
-            xbt_free(args[j]);
-          delete[] args;
-          evt->clear();
-          xbt_die("Replay error on line %d of file %s :\n %s", reader->linenum, reader->filename_, e.what());
-        }
-        for (unsigned int j = 0; j < evt->size(); j++)
-          xbt_free(args[j]);
-        delete[] args;
-      } else {
-        XBT_WARN("%s:%d: Ignore trace element not for me", reader->filename_, reader->linenum);
-      }
-      evt->clear();
-    }
-    delete evt;
-    delete reader;
-  }
-  return 0;
+  return is_replay_active;
 }
 
-static simgrid::xbt::ReplayAction* action_get_action(char* name)
+static ReplayAction* get_action(char* name)
 {
-  simgrid::xbt::ReplayAction* action;
+  ReplayAction* action;
 
-  std::queue<simgrid::xbt::ReplayAction*>* myqueue =
-      (std::queue<simgrid::xbt::ReplayAction*>*)xbt_dict_get_or_null(xbt_action_queues, name);
-  if (myqueue == nullptr || myqueue->empty()) { // nothing stored for me. Read the file further
-    if (simgrid::xbt::action_fs == nullptr) {   // File closed now. There's nothing more to read. I'm out of here
+  std::queue<ReplayAction*>* myqueue = nullptr;
+  if (action_queues.find(std::string(name)) != action_queues.end())
+    myqueue = action_queues.at(std::string(name));
+  if (myqueue == nullptr || myqueue->empty()) { // Nothing stored for me. Read the file further
+    if (action_fs == nullptr) {                 // File closed now. There's nothing more to read. I'm out of here
       goto todo_done;
     }
     // Read lines until I reach something for me (which breaks in loop body) or end of file reached
-    while (!simgrid::xbt::action_fs->eof()) {
+    while (!action_fs->eof()) {
       std::string action_line;
-      std::getline(*simgrid::xbt::action_fs, action_line);
-      // cleanup and split the string I just read
-      boost::trim(action_line);
-      XBT_DEBUG("got from trace: %s", action_line.c_str());
+      read_and_trim_line(action_fs, &action_line);
       if (action_line.length() > 0 && action_line.find("#") == std::string::npos) {
         /* we cannot split in place here because we parse&store several lines for the colleagues... */
-        action = new simgrid::xbt::ReplayAction();
+        action = new ReplayAction();
         boost::split(*action, action_line, boost::is_any_of(" \t"), boost::token_compress_on);
 
         // if it's for me, I'm done
@@ -218,11 +112,12 @@ static simgrid::xbt::ReplayAction* action_get_action(char* name)
           return action;
         } else {
           // Else, I have to store it for the relevant colleague
-          std::queue<simgrid::xbt::ReplayAction*>* otherqueue =
-              (std::queue<simgrid::xbt::ReplayAction*>*)xbt_dict_get_or_null(xbt_action_queues, evtname.c_str());
+          std::queue<ReplayAction*>* otherqueue = nullptr;
+          if (action_queues.find(evtname) != action_queues.end())
+            otherqueue = action_queues.at(evtname);
           if (otherqueue == nullptr) { // Damn. Create the queue of that guy
-            otherqueue = new std::queue<simgrid::xbt::ReplayAction*>();
-            xbt_dict_set(xbt_action_queues, evtname.c_str(), otherqueue, nullptr);
+            otherqueue = new std::queue<ReplayAction*>();
+            action_queues.insert({evtname, otherqueue});
           }
           otherqueue->push(action);
         }
@@ -240,7 +135,90 @@ static simgrid::xbt::ReplayAction* action_get_action(char* name)
 todo_done:
   if (myqueue != nullptr) {
     delete myqueue;
-    xbt_dict_remove(xbt_action_queues, name);
+    action_queues.erase(std::string(name));
   }
   return nullptr;
 }
+
+static void handle_action(ReplayAction* action)
+{
+  XBT_DEBUG("%s replays a %s action", action->at(0).c_str(), action->at(1).c_str());
+  char** c_action     = new char*[action->size() + 1];
+  action_fun function = action_funs.at(action->at(1));
+  int i               = 0;
+  for (auto arg : *action) {
+    c_action[i] = xbt_strdup(arg.c_str());
+    i++;
+  }
+  c_action[i] = nullptr;
+  try {
+    function(c_action);
+  } catch (xbt_ex& e) {
+    for (unsigned int j = 0; j < action->size(); j++)
+      xbt_free(c_action[j]);
+    delete[] c_action;
+    action->clear();
+    xbt_die("Replay error:\n %s", e.what());
+  }
+  for (unsigned int j = 0; j < action->size(); j++)
+    xbt_free(c_action[j]);
+  delete[] c_action;
+}
+
+/**
+ * \ingroup XBT_replay
+ * \brief function used internally to actually run the replay
+
+ * \param argc argc .
+ * \param argv argv
+ */
+int replay_runner(int argc, char* argv[])
+{
+  if (simgrid::xbt::action_fs) { // A unique trace file
+    while (true) {
+      simgrid::xbt::ReplayAction* evt = simgrid::xbt::get_action(argv[0]);
+      if (evt == nullptr)
+        break;
+      simgrid::xbt::handle_action(evt);
+      delete evt;
+    }
+  } else { // Should have got my trace file in argument
+    simgrid::xbt::ReplayAction* evt = new simgrid::xbt::ReplayAction();
+    xbt_assert(argc >= 2, "No '%s' agent function provided, no simulation-wide trace file provided, "
+                          "and no process-wide trace file provided in deployment file. Aborting.",
+               argv[0]);
+    simgrid::xbt::ReplayReader* reader = new simgrid::xbt::ReplayReader(argv[1]);
+    while (reader->get(evt)) {
+      if (evt->at(0).compare(argv[0]) == 0) {
+        simgrid::xbt::handle_action(evt);
+      } else {
+        XBT_WARN("%s:%d: Ignore trace element not for me", reader->filename_, reader->linenum);
+      }
+      evt->clear();
+    }
+    delete evt;
+    delete reader;
+  }
+  return 0;
+}
+}
+}
+
+/**
+ * \ingroup XBT_replay
+ * \brief Registers a function to handle a kind of action
+ *
+ * Registers a function to handle a kind of action
+ * This table is then used by \ref xbt_replay_action_runner
+ *
+ * The argument of the function is the line describing the action, fields separated by spaces.
+ *
+ * \param action_name the reference name of the action.
+ * \param function prototype given by the type: void...(const char** action)
+ */
+void xbt_replay_action_register(const char* action_name, action_fun function)
+{
+  if (!is_replay_active) // If the user registers a function before the start
+    simgrid::xbt::replay_init();
+  simgrid::xbt::action_funs.insert({std::string(action_name), function});
+}