Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
reorg and more functions in namespace
[simgrid.git] / src / xbt / xbt_replay.cpp
1 /* Copyright (c) 2010, 2012-2015, 2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "src/internal_config.h"
8 #include "xbt/ex.hpp"
9 #include "xbt/log.h"
10 #include "xbt/replay.hpp"
11 #include "xbt/str.h"
12 #include "xbt/sysdep.h"
13
14 #include <boost/algorithm/string.hpp>
15 #include <ctype.h>
16 #include <errno.h>
17 #include <fstream>
18 #include <queue>
19 #include <unordered_map>
20 #include <wchar.h>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(replay,xbt,"Replay trace reader");
23
24 xbt_dict_t xbt_action_queues = nullptr;
25 bool is_replay_active        = false;
26
27 namespace simgrid {
28 namespace xbt {
29
30 std::ifstream* action_fs = nullptr;
31 std::unordered_map<std::string, action_fun> action_funs;
32 typedef std::vector<std::string> ReplayAction;
33
34 static void read_and_trim_line(std::ifstream* fs, std::string* line)
35 {
36   std::getline(*fs, *line);
37   boost::trim(*line);
38   XBT_DEBUG("got from trace: %s", line->c_str());
39 }
40
41 class ReplayReader {
42   std::ifstream* fs;
43   std::string line;
44
45 public:
46   char* filename_;
47   int linenum = 0;
48
49   explicit ReplayReader(const char* filename)
50   {
51     filename_ = xbt_strdup(filename);
52     fs        = new std::ifstream(filename, std::ifstream::in);
53   }
54   ~ReplayReader()
55   {
56     free(filename_);
57     delete fs;
58   }
59   bool get(ReplayAction* action);
60 };
61
62 bool ReplayReader::get(ReplayAction* action)
63 {
64   read_and_trim_line(fs, &line);
65   linenum++;
66
67   if (line.length() > 0 && line.find("#") == std::string::npos) {
68     boost::split(*action, line, boost::is_any_of(" \t"), boost::token_compress_on);
69     return !fs->eof();
70   } else {
71     if (fs->eof())
72       return false;
73     else
74       return this->get(action);
75   }
76 }
77
78 void replay_init()
79 {
80   if (!is_replay_active) {
81     xbt_action_queues = xbt_dict_new_homogeneous(nullptr);
82     is_replay_active  = true;
83   }
84 }
85
86 void replay_exit()
87 {
88   xbt_dict_free(&xbt_action_queues);
89   xbt_action_queues = nullptr;
90 }
91
92 bool replay_is_active()
93 {
94   return is_replay_active;
95 }
96
97 static ReplayAction* get_action(char* name)
98 {
99   ReplayAction* action;
100
101   std::queue<ReplayAction*>* myqueue = (std::queue<ReplayAction*>*)xbt_dict_get_or_null(xbt_action_queues, name);
102   if (myqueue == nullptr || myqueue->empty()) { // Nothing stored for me. Read the file further
103     if (action_fs == nullptr) {                 // File closed now. There's nothing more to read. I'm out of here
104       goto todo_done;
105     }
106     // Read lines until I reach something for me (which breaks in loop body) or end of file reached
107     while (!action_fs->eof()) {
108       std::string action_line;
109       read_and_trim_line(action_fs, &action_line);
110       if (action_line.length() > 0 && action_line.find("#") == std::string::npos) {
111         /* we cannot split in place here because we parse&store several lines for the colleagues... */
112         action = new ReplayAction();
113         boost::split(*action, action_line, boost::is_any_of(" \t"), boost::token_compress_on);
114
115         // if it's for me, I'm done
116         std::string evtname = action->front();
117         if (evtname.compare(name) == 0) {
118           return action;
119         } else {
120           // Else, I have to store it for the relevant colleague
121           std::queue<ReplayAction*>* otherqueue =
122               (std::queue<ReplayAction*>*)xbt_dict_get_or_null(xbt_action_queues, evtname.c_str());
123           if (otherqueue == nullptr) { // Damn. Create the queue of that guy
124             otherqueue = new std::queue<ReplayAction*>();
125             xbt_dict_set(xbt_action_queues, evtname.c_str(), otherqueue, nullptr);
126           }
127           otherqueue->push(action);
128         }
129       }
130     }
131     // end of file reached while searching in vain for more work
132   } else {
133     // Get something from my queue and return it
134     action = myqueue->front();
135     myqueue->pop();
136     return action;
137   }
138
139 // All my actions in the file are done and either I or a colleague closed the file. Let's cleanup before leaving.
140 todo_done:
141   if (myqueue != nullptr) {
142     delete myqueue;
143     xbt_dict_remove(xbt_action_queues, name);
144   }
145   return nullptr;
146 }
147
148 static void handle_action(ReplayAction* action)
149 {
150   XBT_DEBUG("%s replays a %s action", action->at(0).c_str(), action->at(1).c_str());
151   char** c_action     = new char*[action->size() + 1];
152   action_fun function = action_funs.at(action->at(1));
153   int i               = 0;
154   for (auto arg : *action) {
155     c_action[i] = xbt_strdup(arg.c_str());
156     i++;
157   }
158   c_action[i] = nullptr;
159   try {
160     function(c_action);
161   } catch (xbt_ex& e) {
162     for (unsigned int j = 0; j < action->size(); j++)
163       xbt_free(c_action[j]);
164     delete[] c_action;
165     action->clear();
166     xbt_die("Replay error:\n %s", e.what());
167   }
168   for (unsigned int j = 0; j < action->size(); j++)
169     xbt_free(c_action[j]);
170   delete[] c_action;
171 }
172
173 /**
174  * \ingroup XBT_replay
175  * \brief function used internally to actually run the replay
176
177  * \param argc argc .
178  * \param argv argv
179  */
180 int replay_runner(int argc, char* argv[])
181 {
182   if (simgrid::xbt::action_fs) { // A unique trace file
183     while (true) {
184       simgrid::xbt::ReplayAction* evt = simgrid::xbt::get_action(argv[0]);
185       if (evt == nullptr)
186         break;
187       simgrid::xbt::handle_action(evt);
188       delete evt;
189     }
190   } else { // Should have got my trace file in argument
191     simgrid::xbt::ReplayAction* evt = new simgrid::xbt::ReplayAction();
192     xbt_assert(argc >= 2, "No '%s' agent function provided, no simulation-wide trace file provided, "
193                           "and no process-wide trace file provided in deployment file. Aborting.",
194                argv[0]);
195     simgrid::xbt::ReplayReader* reader = new simgrid::xbt::ReplayReader(argv[1]);
196     while (reader->get(evt)) {
197       if (evt->at(0).compare(argv[0]) == 0) {
198         simgrid::xbt::handle_action(evt);
199       } else {
200         XBT_WARN("%s:%d: Ignore trace element not for me", reader->filename_, reader->linenum);
201       }
202       evt->clear();
203     }
204     delete evt;
205     delete reader;
206   }
207   return 0;
208 }
209 }
210 }
211
212 /**
213  * \ingroup XBT_replay
214  * \brief Registers a function to handle a kind of action
215  *
216  * Registers a function to handle a kind of action
217  * This table is then used by \ref xbt_replay_action_runner
218  *
219  * The argument of the function is the line describing the action, fields separated by spaces.
220  *
221  * \param action_name the reference name of the action.
222  * \param function prototype given by the type: void...(const char** action)
223  */
224 void xbt_replay_action_register(const char* action_name, action_fun function)
225 {
226   if (!is_replay_active) // If the user registers a function before the start
227     simgrid::xbt::replay_init();
228   simgrid::xbt::action_funs.insert({std::string(action_name), function});
229 }