Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f2c6ae26d63795069e0d674273be606a6b5dff5b
[simgrid.git] / src / xbt / xbt_replay.cpp
1 /* Copyright (c) 2010, 2012-2015, 2017. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "src/internal_config.h"
8 #include "xbt/ex.hpp"
9 #include "xbt/log.h"
10 #include "xbt/replay.hpp"
11 #include "xbt/str.h"
12 #include "xbt/sysdep.h"
13
14 #include <boost/algorithm/string.hpp>
15 #include <ctype.h>
16 #include <errno.h>
17 #include <fstream>
18 #include <queue>
19 #include <unordered_map>
20 #include <wchar.h>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(replay,xbt,"Replay trace reader");
23
24 xbt_dict_t xbt_action_queues = nullptr;
25 bool is_replay_active        = false;
26
27 namespace simgrid {
28 namespace xbt {
29
30 std::ifstream* action_fs = nullptr;
31 std::unordered_map<std::string, action_fun> action_funs;
32 typedef std::vector<std::string> ReplayAction;
33
34 static void read_and_trim_line(std::ifstream* fs, std::string* line)
35 {
36   std::getline(*fs, *line);
37   boost::trim(*line);
38   XBT_DEBUG("got from trace: %s", line->c_str());
39 }
40
41 class ReplayReader {
42   std::ifstream* fs;
43   std::string line;
44
45 public:
46   char* filename_;
47   int linenum = 0;
48
49   explicit ReplayReader(const char* filename)
50   {
51     filename_ = xbt_strdup(filename);
52     fs        = new std::ifstream(filename, std::ifstream::in);
53   }
54   ~ReplayReader()
55   {
56     free(filename_);
57     delete fs;
58   }
59   bool get(ReplayAction* action);
60 };
61
62 bool ReplayReader::get(ReplayAction* action)
63 {
64   read_and_trim_line(fs, &line);
65   linenum++;
66
67   if (line.length() > 0 && line.find("#") == std::string::npos) {
68     boost::split(*action, line, boost::is_any_of(" \t"), boost::token_compress_on);
69     return !fs->eof();
70   } else {
71     if (fs->eof())
72       return false;
73     else
74       return this->get(action);
75   }
76 }
77
78 void replay_init()
79 {
80   if (!is_replay_active) {
81     xbt_action_queues = xbt_dict_new_homogeneous(nullptr);
82     is_replay_active  = true;
83   }
84 }
85
86 void replay_exit()
87 {
88   xbt_dict_free(&xbt_action_queues);
89   xbt_action_queues = nullptr;
90 }
91
92 bool replay_is_active()
93 {
94   return is_replay_active;
95 }
96
97 static ReplayAction* get_action(char* name)
98 {
99   ReplayAction* action;
100
101   std::queue<ReplayAction*>* myqueue =
102       static_cast<std::queue<ReplayAction*>*>(xbt_dict_get_or_null(xbt_action_queues, name));
103   if (myqueue == nullptr || myqueue->empty()) { // Nothing stored for me. Read the file further
104     if (action_fs == nullptr) {                 // File closed now. There's nothing more to read. I'm out of here
105       goto todo_done;
106     }
107     // Read lines until I reach something for me (which breaks in loop body) or end of file reached
108     while (!action_fs->eof()) {
109       std::string action_line;
110       read_and_trim_line(action_fs, &action_line);
111       if (action_line.length() > 0 && action_line.find("#") == std::string::npos) {
112         /* we cannot split in place here because we parse&store several lines for the colleagues... */
113         action = new ReplayAction();
114         boost::split(*action, action_line, boost::is_any_of(" \t"), boost::token_compress_on);
115
116         // if it's for me, I'm done
117         std::string evtname = action->front();
118         if (evtname.compare(name) == 0) {
119           return action;
120         } else {
121           // Else, I have to store it for the relevant colleague
122           std::queue<ReplayAction*>* otherqueue =
123               static_cast<std::queue<ReplayAction*>*>(xbt_dict_get_or_null(xbt_action_queues, evtname.c_str()));
124           if (otherqueue == nullptr) { // Damn. Create the queue of that guy
125             otherqueue = new std::queue<ReplayAction*>();
126             xbt_dict_set(xbt_action_queues, evtname.c_str(), otherqueue, nullptr);
127           }
128           otherqueue->push(action);
129         }
130       }
131     }
132     // end of file reached while searching in vain for more work
133   } else {
134     // Get something from my queue and return it
135     action = myqueue->front();
136     myqueue->pop();
137     return action;
138   }
139
140 // All my actions in the file are done and either I or a colleague closed the file. Let's cleanup before leaving.
141 todo_done:
142   if (myqueue != nullptr) {
143     delete myqueue;
144     xbt_dict_remove(xbt_action_queues, name);
145   }
146   return nullptr;
147 }
148
149 static void handle_action(ReplayAction* action)
150 {
151   XBT_DEBUG("%s replays a %s action", action->at(0).c_str(), action->at(1).c_str());
152   char** c_action     = new char*[action->size() + 1];
153   action_fun function = action_funs.at(action->at(1));
154   int i               = 0;
155   for (auto arg : *action) {
156     c_action[i] = xbt_strdup(arg.c_str());
157     i++;
158   }
159   c_action[i] = nullptr;
160   try {
161     function(c_action);
162   } catch (xbt_ex& e) {
163     for (unsigned int j = 0; j < action->size(); j++)
164       xbt_free(c_action[j]);
165     delete[] c_action;
166     action->clear();
167     xbt_die("Replay error:\n %s", e.what());
168   }
169   for (unsigned int j = 0; j < action->size(); j++)
170     xbt_free(c_action[j]);
171   delete[] c_action;
172 }
173
174 /**
175  * \ingroup XBT_replay
176  * \brief function used internally to actually run the replay
177
178  * \param argc argc .
179  * \param argv argv
180  */
181 int replay_runner(int argc, char* argv[])
182 {
183   if (simgrid::xbt::action_fs) { // A unique trace file
184     while (true) {
185       simgrid::xbt::ReplayAction* evt = simgrid::xbt::get_action(argv[0]);
186       if (evt == nullptr)
187         break;
188       simgrid::xbt::handle_action(evt);
189       delete evt;
190     }
191   } else { // Should have got my trace file in argument
192     simgrid::xbt::ReplayAction* evt = new simgrid::xbt::ReplayAction();
193     xbt_assert(argc >= 2, "No '%s' agent function provided, no simulation-wide trace file provided, "
194                           "and no process-wide trace file provided in deployment file. Aborting.",
195                argv[0]);
196     simgrid::xbt::ReplayReader* reader = new simgrid::xbt::ReplayReader(argv[1]);
197     while (reader->get(evt)) {
198       if (evt->at(0).compare(argv[0]) == 0) {
199         simgrid::xbt::handle_action(evt);
200       } else {
201         XBT_WARN("%s:%d: Ignore trace element not for me", reader->filename_, reader->linenum);
202       }
203       evt->clear();
204     }
205     delete evt;
206     delete reader;
207   }
208   return 0;
209 }
210 }
211 }
212
213 /**
214  * \ingroup XBT_replay
215  * \brief Registers a function to handle a kind of action
216  *
217  * Registers a function to handle a kind of action
218  * This table is then used by \ref xbt_replay_action_runner
219  *
220  * The argument of the function is the line describing the action, fields separated by spaces.
221  *
222  * \param action_name the reference name of the action.
223  * \param function prototype given by the type: void...(const char** action)
224  */
225 void xbt_replay_action_register(const char* action_name, action_fun function)
226 {
227   if (!is_replay_active) // If the user registers a function before the start
228     simgrid::xbt::replay_init();
229   simgrid::xbt::action_funs.insert({std::string(action_name), function});
230 }