#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"
+#include <simgrid/smpi/replay.hpp>
#include <boost/algorithm/string/join.hpp>
#include <memory>
#include <numeric>
#include <unordered_map>
-#include <sstream>
#include <vector>
#include <tuple>
// Map key made of three ints: (sender, receiver, tag).
using req_key_t = std::tuple</* sender */ int, /* receiver */ int, /* tag */ int>;
// Associates each (sender, receiver, tag) key with an MPI_Request; the tuple is hashed by hash_tuple::hash.
using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
-static MPI_Datatype MPI_DEFAULT_TYPE;
-
-#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
- { \
- if (action.size() < static_cast<unsigned long>(mandatory + 2)) { \
- std::stringstream ss; \
- for (const auto& elem : action) { \
- ss << elem << " "; \
- } \
- THROWF(arg_error, 0, "%s replay failed.\n" \
- "%zu items were given on the line. First two should be process_id and action. " \
- "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
- "The full line that was given is:\n %s\n" \
- "Please contact the Simgrid team if support is needed", \
- __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional), \
- ss.str().c_str()); \
- } \
- }
static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
{
namespace smpi {
namespace replay {
+MPI_Datatype MPI_DEFAULT_TYPE;
class RequestStorage {
private:
}
};
-/**
- * Base class for all parsers.
- */
-class ActionArgParser {
-public:
- virtual ~ActionArgParser() = default;
- virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
-};
-
-class WaitTestParser : public ActionArgParser {
-public:
- int src;
- int dst;
- int tag;
-
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
// Reads the three mandatory arguments that follow the process id and the action name
// (the first two items of the line): source, destination and tag, in that order.
// No optional argument is accepted.
CHECK_ACTION_PARAMS(action, 3, 0)
// std::stoi throws if a field cannot be converted to an int.
src = std::stoi(action[2]);
dst = std::stoi(action[3]);
tag = std::stoi(action[4]);
}
-};
-class SendRecvParser : public ActionArgParser {
-public:
- /* communication partner; if we send, this is the receiver and vice versa */
- int partner;
- double size;
- int tag;
- MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
-
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
// Three mandatory arguments after the process id and action name, plus one optional argument.
CHECK_ACTION_PARAMS(action, 3, 1)
// First argument: rank of the communication partner.
partner = std::stoi(action[2]);
// NOTE(review): three arguments are mandatory, but only action[2] and the optional action[5]
// are read in the lines shown here -- confirm how action[3] and action[4] are consumed.
// Optional fourth argument: encoded datatype; datatype1 is left untouched when it is absent.
if (action.size() > 5)
datatype1 = simgrid::smpi::Datatype::decode(action[5]);
}
-};
-class ComputeParser : public ActionArgParser {
-public:
- /* communication partner; if we send, this is the receiver and vice versa */
- double flops;
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
// Exactly one mandatory argument follows the process id and action name; no optional ones.
CHECK_ACTION_PARAMS(action, 1, 0)
// That single argument is stored in flops, converted with parse_double.
flops = parse_double(action[2]);
}
-};
-class CollCommParser : public ActionArgParser {
-public:
- double size;
- double comm_size;
- double comp_size;
- int send_size;
- int recv_size;
- int root = 0;
- MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
- MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
-};
-
-class BcastArgParser : public CollCommParser {
-public:
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
// One mandatory argument after the process id and action name, and up to two optional ones.
CHECK_ACTION_PARAMS(action, 1, 2)
// Mandatory argument: stored in size.
size = parse_double(action[2]);
// NOTE(review): action[3] (the first optional argument) is not read in the lines shown here --
// confirm against the full definition.
// Second optional argument: encoded datatype; datatype1 is left untouched when it is absent.
if (action.size() > 4)
datatype1 = simgrid::smpi::Datatype::decode(action[4]);
}
-};
-class ReduceArgParser : public CollCommParser {
-public:
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
// Two mandatory arguments after the process id and action name, and up to two optional ones.
CHECK_ACTION_PARAMS(action, 2, 2)
// First mandatory argument: stored in comm_size.
comm_size = parse_double(action[2]);
// NOTE(review): action[3] and action[4] are not read in the lines shown here --
// confirm against the full definition.
// Last optional argument: encoded datatype; datatype1 is left untouched when it is absent.
if (action.size() > 5)
datatype1 = simgrid::smpi::Datatype::decode(action[5]);
}
-};
-class AllReduceArgParser : public CollCommParser {
-public:
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
// Two mandatory arguments after the process id and action name, and one optional argument.
CHECK_ACTION_PARAMS(action, 2, 1)
// First mandatory argument: stored in comm_size.
comm_size = parse_double(action[2]);
// NOTE(review): action[3] (the second mandatory argument) is not read in the lines shown here --
// confirm against the full definition.
// Optional argument: encoded datatype; datatype1 is left untouched when it is absent.
if (action.size() > 4)
datatype1 = simgrid::smpi::Datatype::decode(action[4]);
}
-};
-class AllToAllArgParser : public CollCommParser {
-public:
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
// Two mandatory arguments after the process id and action name, and one optional argument.
CHECK_ACTION_PARAMS(action, 2, 1)
// comm_size is not taken from the trace line: it is the size of MPI_COMM_WORLD.
comm_size = MPI_COMM_WORLD->size();
// NOTE(review): action[2] through action[4] are not read in the lines shown here --
// confirm against the full definition.
// When a sixth item is present it is decoded into datatype2; otherwise datatype2 is left untouched.
if (action.size() > 5)
datatype2 = simgrid::smpi::Datatype::decode(action[5]);
}
-};
-class GatherArgParser : public CollCommParser {
-public:
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
/* The structure of the gather action for the rank 0 (total 4 processes) is the following:
0 gather 68 68 0 0 0
datatype2 = simgrid::smpi::Datatype::decode(action[5]);
}
}
-};
-class GatherVArgParser : public CollCommParser {
-public:
- int recv_size_sum;
- std::shared_ptr<std::vector<int>> recvcounts;
- std::vector<int> disps;
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
/* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
0 gatherV 68 68 10 10 10 0 0 0
}
recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
}
-};
-class ScatterArgParser : public CollCommParser {
-public:
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
/* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
0 scatter 68 68 0 0 0
if (action.size() > 6)
datatype2 = simgrid::smpi::Datatype::decode(action[6]);
}
-};
-class ScatterVArgParser : public CollCommParser {
-public:
- int recv_size_sum;
- int send_size_sum;
- std::shared_ptr<std::vector<int>> sendcounts;
- std::vector<int> disps;
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
/* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
0 scatterV 68 10 10 10 68 0 0 0
send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
}
-};
-class ReduceScatterArgParser : public CollCommParser {
-public:
- int recv_size_sum;
- std::shared_ptr<std::vector<int>> recvcounts;
- std::vector<int> disps;
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
/* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
0 reduceScatter 275427 275427 275427 204020 11346849 0
}
recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
}
-};
-class AllToAllVArgParser : public CollCommParser {
-public:
- int recv_size_sum;
- int send_size_sum;
- std::shared_ptr<std::vector<int>> recvcounts;
- std::shared_ptr<std::vector<int>> sendcounts;
- std::vector<int> senddisps;
- std::vector<int> recvdisps;
- int send_buf_size;
- int recv_buf_size;
- void parse(simgrid::xbt::ReplayAction& action, std::string name) override
+ void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
{
/* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
0 allToAllV 100 1 7 10 12 100 1 70 10 5
send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
}
-};
-
-/**
- * Base class for all ReplayActions.
- * Note that this class actually implements the behavior of each action
- * while the parsing of the replay arguments is done in the @ActionArgParser class.
- * In other words: The logic goes here, the setup is done by the ActionArgParser.
- */
-template <class T> class ReplayAction {
-protected:
- const std::string name;
- const int my_proc_id;
- T args;
-
-public:
- explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
- virtual ~ReplayAction() = default;
- virtual void execute(simgrid::xbt::ReplayAction& action)
- {
- // Needs to be re-initialized for every action, hence here
- double start_time = smpi_process()->simulated_elapsed();
- args.parse(action, name);
- kernel(action);
- if (name != "Init")
- log_timed_action(action, start_time);
- }
-
- virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
-
- void* send_buffer(int size)
- {
- return smpi_get_tmp_sendbuffer(size);
- }
-
- void* recv_buffer(int size)
- {
- return smpi_get_tmp_recvbuffer(size);
- }
-};
+template<class T>
+void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
+{
+ // Runs one replayed action. The start time needs to be re-initialized for every action, hence it is read here
+ double start_time = smpi_process()->simulated_elapsed();
+ args.parse(action, name); // let the argument parser of type T read its fields from the action line
+ kernel(action); // action-specific work
+ if (name != "Init") // every action except the one named "Init" is passed to log_timed_action
+ log_timed_action(action, start_time);
+}
class WaitAction : public ReplayAction<WaitTestParser> {
private: