From: Frederic Suter Date: Thu, 16 Mar 2017 17:26:28 +0000 (+0100) Subject: Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid X-Git-Tag: v3_15~85 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/f021475d9021fca0a6a72dd499aa1dc5dd1d2448?hp=fa54dd19890f64268c0654470eea2f3e04d6a7f7 Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid --- diff --git a/examples/s4u/actions-comm/s4u_actions-comm.cpp b/examples/s4u/actions-comm/s4u_actions-comm.cpp index e6699558a2..d62c029e11 100644 --- a/examples/s4u/actions-comm/s4u_actions-comm.cpp +++ b/examples/s4u/actions-comm/s4u_actions-comm.cpp @@ -38,7 +38,7 @@ public: argc = 2; argv[1] = &args.at(1)[0]; } - xbt_replay_action_runner(argc, argv); + simgrid::xbt::replay_runner(argc, argv); } void operator()() @@ -86,7 +86,7 @@ int main(int argc, char *argv[]) { simgrid::s4u::Engine* e = new simgrid::s4u::Engine(&argc, argv); /* Explicit initialization of the action module is required now*/ - _xbt_replay_action_init(); + simgrid::xbt::replay_init(); xbt_assert(argc > 2, "Usage: %s platform_file deployment_file [action_files]\n" "\t# if all actions are in the same file\n" @@ -96,7 +96,7 @@ int main(int argc, char *argv[]) argv[0], argv[0], argv[0]); e->loadPlatform(argv[1]); - e->registerDefault(&xbt_replay_action_runner); + e->registerDefault(&simgrid::xbt::replay_runner); e->registerFunction("p0"); e->registerFunction("p1"); e->loadDeployment(argv[2]); @@ -120,7 +120,7 @@ int main(int argc, char *argv[]) XBT_INFO("Simulation time %g", e->getClock()); - _xbt_replay_action_exit(); /* Explicit finalization of the action module */ + simgrid::xbt::replay_exit(); /* Explicit finalization of the action module */ return 0; } diff --git a/include/xbt/replay.hpp b/include/xbt/replay.hpp index 7b55760f71..a317f34c92 100644 --- a/include/xbt/replay.hpp +++ b/include/xbt/replay.hpp @@ -12,12 +12,20 @@ #include "xbt/dict.h" #ifdef __cplusplus #include +#include +#include namespace simgrid { namespace xbt { /* To split the file if a unique one is given (specific variable for the other case live in runner()) */ +typedef std::vector ReplayAction; +static std::unordered_map*> action_queues; + XBT_PUBLIC_DATA(std::ifstream*) action_fs; XBT_PUBLIC(bool) replay_is_active(); +XBT_PUBLIC(void) replay_init(); +XBT_PUBLIC(void) replay_exit(); +XBT_PUBLIC(int) replay_runner(int argc, char* argv[]); } } #endif @@ -25,14 +33,7 @@ XBT_PUBLIC(bool) replay_is_active(); SG_BEGIN_DECL() typedef void (*action_fun)(const char* const* args); - -XBT_PUBLIC_DATA(xbt_dict_t) xbt_action_queues; - XBT_PUBLIC(void) xbt_replay_action_register(const char* action_name, action_fun function); -XBT_PUBLIC(int) xbt_replay_action_runner(int argc, char* argv[]); - -XBT_PUBLIC(void) _xbt_replay_action_init(); -XBT_PUBLIC(void) _xbt_replay_action_exit(); SG_END_DECL() diff --git a/src/msg/msg_actions.cpp b/src/msg/msg_actions.cpp index 6edbf1b5ea..916aa87422 100644 --- a/src/msg/msg_actions.cpp +++ b/src/msg/msg_actions.cpp @@ -14,13 +14,13 @@ SG_BEGIN_DECL() void MSG_action_init() { - _xbt_replay_action_init(); - MSG_function_register_default(xbt_replay_action_runner); + simgrid::xbt::replay_init(); + MSG_function_register_default(simgrid::xbt::replay_runner); } void MSG_action_exit() { - _xbt_replay_action_exit(); + simgrid::xbt::replay_exit(); } /** \ingroup msg_trace_driven @@ -31,22 +31,18 @@ void MSG_action_exit() */ msg_error_t MSG_action_trace_run(char *path) { - msg_error_t res; - char *name; - xbt_dynar_t todo; - xbt_dict_cursor_t cursor; - if (path) { simgrid::xbt::action_fs = new std::ifstream(path, std::ifstream::in); } - res = MSG_main(); - if (!xbt_dict_is_empty(xbt_action_queues)) { + msg_error_t res = MSG_main(); + + if (!simgrid::xbt::action_queues.empty()) { XBT_WARN("Not all actions got consumed. If the simulation ended successfully (without deadlock)," " you may want to add new processes to your deployment file."); - xbt_dict_foreach(xbt_action_queues, cursor, name, todo) { - XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name); + for (auto actions_of : simgrid::xbt::action_queues) { + XBT_WARN("Still %zu actions for %s", actions_of.second->size(), actions_of.first.c_str()); } } @@ -55,9 +51,6 @@ msg_error_t MSG_action_trace_run(char *path) simgrid::xbt::action_fs = nullptr; } - xbt_dict_free(&xbt_action_queues); - xbt_action_queues = xbt_dict_new_homogeneous(nullptr); - return res; } diff --git a/src/smpi/smpi_replay.cpp b/src/smpi/smpi_replay.cpp index 3292e7c398..031d0d8824 100644 --- a/src/smpi/smpi_replay.cpp +++ b/src/smpi/smpi_replay.cpp @@ -909,7 +909,7 @@ void smpi_replay_run(int *argc, char***argv){ TRACE_smpi_collective_in(rank, -1, operation, extra); TRACE_smpi_collective_out(rank, -1, operation); xbt_free(operation); - _xbt_replay_action_init(); + simgrid::xbt::replay_init(); xbt_replay_action_register("init", action_init); xbt_replay_action_register("finalize", action_finalize); xbt_replay_action_register("comm_size", action_comm_size); @@ -950,7 +950,7 @@ void smpi_replay_run(int *argc, char***argv){ } /* Actually run the replay */ - xbt_replay_action_runner(*argc, *argv); + simgrid::xbt::replay_runner(*argc, *argv); /* and now, finalize everything */ /* One active process will stop. Decrease the counter*/ @@ -973,7 +973,7 @@ void smpi_replay_run(int *argc, char***argv){ if(active_processes==0){ /* Last process alive speaking: end the simulated timer */ XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed()); - _xbt_replay_action_exit(); + simgrid::xbt::replay_exit(); xbt_free(sendbuffer); xbt_free(recvbuffer); } diff --git a/src/xbt/xbt_replay.cpp b/src/xbt/xbt_replay.cpp index 69b45d5cde..40b5cee45a 100644 --- a/src/xbt/xbt_replay.cpp +++ b/src/xbt/xbt_replay.cpp @@ -14,22 +14,23 @@ #include #include #include -#include -#include -#include #include XBT_LOG_NEW_DEFAULT_SUBCATEGORY(replay,xbt,"Replay trace reader"); +bool is_replay_active = false; + namespace simgrid { namespace xbt { -bool is_replay_active = false; -typedef std::vector ReplayAction; +std::ifstream* action_fs = nullptr; +std::unordered_map action_funs; -bool replay_is_active() +static void read_and_trim_line(std::ifstream* fs, std::string* line) { - return is_replay_active; + std::getline(*fs, *line); + boost::trim(*line); + XBT_DEBUG("got from trace: %s", line->c_str()); } class ReplayReader { @@ -55,9 +56,7 @@ public: bool ReplayReader::get(ReplayAction* action) { - std::getline(*fs, line); - boost::trim(line); - XBT_DEBUG("got from trace: %s", line.c_str()); + read_and_trim_line(fs, &line); linenum++; if (line.length() > 0 && line.find("#") == std::string::npos) { @@ -70,146 +69,41 @@ bool ReplayReader::get(ReplayAction* action) return this->get(action); } } -std::ifstream* action_fs = nullptr; -} -} - -std::unordered_map xbt_action_funs; -xbt_dict_t xbt_action_queues = nullptr; - -bool is_replay_active = false; - -static simgrid::xbt::ReplayAction* action_get_action(char* name); - -/** - * \ingroup XBT_replay - * \brief Registers a function to handle a kind of action - * - * Registers a function to handle a kind of action - * This table is then used by \ref xbt_replay_action_runner - * - * The argument of the function is the line describing the action, fields separated by spaces. - * - * \param action_name the reference name of the action. - * \param function prototype given by the type: void...(const char** action) - */ -void xbt_replay_action_register(const char *action_name, action_fun function) -{ - if (!is_replay_active) // If the user registers a function before the start - _xbt_replay_action_init(); - xbt_action_funs.insert({std::string(action_name), function}); -} - -void _xbt_replay_action_init() +void replay_init() { if (!is_replay_active) { - xbt_action_queues = xbt_dict_new_homogeneous(nullptr); is_replay_active = true; } } -void _xbt_replay_action_exit() +void replay_exit() { - xbt_dict_free(&xbt_action_queues); - xbt_action_queues = nullptr; } -/** - * \ingroup XBT_replay - * \brief function used internally to actually run the replay - - * \param argc argc . - * \param argv argv - */ -int xbt_replay_action_runner(int argc, char *argv[]) +bool replay_is_active() { - if (simgrid::xbt::action_fs) { // A unique trace file - while (true) { - simgrid::xbt::ReplayAction* evt = action_get_action(argv[0]); - if (evt == nullptr) - break; - - char** args = new char*[evt->size() + 1]; - int i = 0; - for (auto arg : *evt) { - args[i] = xbt_strdup(arg.c_str()); - i++; - } - args[i] = nullptr; - action_fun function = xbt_action_funs.at(evt->at(1)); - - try { - function(args); - } - catch(xbt_ex& e) { - xbt_die("Replay error :\n %s", e.what()); - } - for (unsigned int j = 0; j < evt->size(); j++) - xbt_free(args[j]); - delete[] args; - delete evt; - } - } else { // Should have got my trace file in argument - simgrid::xbt::ReplayAction* evt = new simgrid::xbt::ReplayAction(); - xbt_assert(argc >= 2, - "No '%s' agent function provided, no simulation-wide trace file provided, " - "and no process-wide trace file provided in deployment file. Aborting.", argv[0] - ); - simgrid::xbt::ReplayReader* reader = new simgrid::xbt::ReplayReader(argv[1]); - while (reader->get(evt)) { - if (evt->at(0).compare(argv[0]) == 0) { - char** args = new char*[evt->size() + 1]; - int i = 0; - for (auto arg : *evt) { - args[i] = xbt_strdup(arg.c_str()); - i++; - } - args[i] = nullptr; - action_fun function = xbt_action_funs.at(evt->at(1)); - try { - function(args); - } catch(xbt_ex& e) { - for (unsigned int j = 0; j < evt->size(); j++) - xbt_free(args[j]); - delete[] args; - evt->clear(); - xbt_die("Replay error on line %d of file %s :\n %s", reader->linenum, reader->filename_, e.what()); - } - for (unsigned int j = 0; j < evt->size(); j++) - xbt_free(args[j]); - delete[] args; - } else { - XBT_WARN("%s:%d: Ignore trace element not for me", reader->filename_, reader->linenum); - } - evt->clear(); - } - delete evt; - delete reader; - } - return 0; + return is_replay_active; } -static simgrid::xbt::ReplayAction* action_get_action(char* name) +static ReplayAction* get_action(char* name) { - simgrid::xbt::ReplayAction* action; + ReplayAction* action; - std::queue* myqueue = - (std::queue*)xbt_dict_get_or_null(xbt_action_queues, name); - if (myqueue == nullptr || myqueue->empty()) { // nothing stored for me. Read the file further - if (simgrid::xbt::action_fs == nullptr) { // File closed now. There's nothing more to read. I'm out of here + std::queue* myqueue = nullptr; + if (action_queues.find(std::string(name)) != action_queues.end()) + myqueue = action_queues.at(std::string(name)); + if (myqueue == nullptr || myqueue->empty()) { // Nothing stored for me. Read the file further + if (action_fs == nullptr) { // File closed now. There's nothing more to read. I'm out of here goto todo_done; } // Read lines until I reach something for me (which breaks in loop body) or end of file reached - while (!simgrid::xbt::action_fs->eof()) { + while (!action_fs->eof()) { std::string action_line; - std::getline(*simgrid::xbt::action_fs, action_line); - // cleanup and split the string I just read - boost::trim(action_line); - XBT_DEBUG("got from trace: %s", action_line.c_str()); + read_and_trim_line(action_fs, &action_line); if (action_line.length() > 0 && action_line.find("#") == std::string::npos) { /* we cannot split in place here because we parse&store several lines for the colleagues... */ - action = new simgrid::xbt::ReplayAction(); + action = new ReplayAction(); boost::split(*action, action_line, boost::is_any_of(" \t"), boost::token_compress_on); // if it's for me, I'm done @@ -218,11 +112,12 @@ static simgrid::xbt::ReplayAction* action_get_action(char* name) return action; } else { // Else, I have to store it for the relevant colleague - std::queue* otherqueue = - (std::queue*)xbt_dict_get_or_null(xbt_action_queues, evtname.c_str()); + std::queue* otherqueue = nullptr; + if (action_queues.find(evtname) != action_queues.end()) + otherqueue = action_queues.at(evtname); if (otherqueue == nullptr) { // Damn. Create the queue of that guy - otherqueue = new std::queue(); - xbt_dict_set(xbt_action_queues, evtname.c_str(), otherqueue, nullptr); + otherqueue = new std::queue(); + action_queues.insert({evtname, otherqueue}); } otherqueue->push(action); } @@ -240,7 +135,90 @@ static simgrid::xbt::ReplayAction* action_get_action(char* name) todo_done: if (myqueue != nullptr) { delete myqueue; - xbt_dict_remove(xbt_action_queues, name); + action_queues.erase(std::string(name)); } return nullptr; } + +static void handle_action(ReplayAction* action) +{ + XBT_DEBUG("%s replays a %s action", action->at(0).c_str(), action->at(1).c_str()); + char** c_action = new char*[action->size() + 1]; + action_fun function = action_funs.at(action->at(1)); + int i = 0; + for (auto arg : *action) { + c_action[i] = xbt_strdup(arg.c_str()); + i++; + } + c_action[i] = nullptr; + try { + function(c_action); + } catch (xbt_ex& e) { + for (unsigned int j = 0; j < action->size(); j++) + xbt_free(c_action[j]); + delete[] c_action; + action->clear(); + xbt_die("Replay error:\n %s", e.what()); + } + for (unsigned int j = 0; j < action->size(); j++) + xbt_free(c_action[j]); + delete[] c_action; +} + +/** + * \ingroup XBT_replay + * \brief function used internally to actually run the replay + + * \param argc argc . + * \param argv argv + */ +int replay_runner(int argc, char* argv[]) +{ + if (simgrid::xbt::action_fs) { // A unique trace file + while (true) { + simgrid::xbt::ReplayAction* evt = simgrid::xbt::get_action(argv[0]); + if (evt == nullptr) + break; + simgrid::xbt::handle_action(evt); + delete evt; + } + } else { // Should have got my trace file in argument + simgrid::xbt::ReplayAction* evt = new simgrid::xbt::ReplayAction(); + xbt_assert(argc >= 2, "No '%s' agent function provided, no simulation-wide trace file provided, " + "and no process-wide trace file provided in deployment file. Aborting.", + argv[0]); + simgrid::xbt::ReplayReader* reader = new simgrid::xbt::ReplayReader(argv[1]); + while (reader->get(evt)) { + if (evt->at(0).compare(argv[0]) == 0) { + simgrid::xbt::handle_action(evt); + } else { + XBT_WARN("%s:%d: Ignore trace element not for me", reader->filename_, reader->linenum); + } + evt->clear(); + } + delete evt; + delete reader; + } + return 0; +} +} +} + +/** + * \ingroup XBT_replay + * \brief Registers a function to handle a kind of action + * + * Registers a function to handle a kind of action + * This table is then used by \ref xbt_replay_action_runner + * + * The argument of the function is the line describing the action, fields separated by spaces. + * + * \param action_name the reference name of the action. + * \param function prototype given by the type: void...(const char** action) + */ +void xbt_replay_action_register(const char* action_name, action_fun function) +{ + if (!is_replay_active) // If the user registers a function before the start + simgrid::xbt::replay_init(); + simgrid::xbt::action_funs.insert({std::string(action_name), function}); +}