#include <xbt.h>
#include <xbt/replay.h>
+#define KEY_SIZE (sizeof(int) * 2 + 1) /* NOTE(review): not referenced by the code visible here; sizeof(int)*2+1 holds an int printed in hex plus the NUL, which is too small for the "%d" keys built below -- confirm intent or drop */
+
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
int communicator_size = 0;
static int active_processes = 0;
-xbt_dynar_t *reqq = NULL;
+xbt_dict_t reqq = NULL; /* pending-request queues: maps the process index, printed with "%d", to that process's dynar of MPI_Request */
MPI_Datatype MPI_DEFAULT_TYPE;
MPI_Datatype MPI_CURRENT_TYPE;
}
}
+
+/* Look up the request queue of the calling process.
+ *
+ * reqq maps the process index, printed in decimal, to that process's dynar
+ * of MPI_Request. The key is built with bprintf(), which this file already
+ * uses for its trace labels, instead of the non-standard asprintf(); the
+ * string it returns is released with free() once the lookup is done.
+ * bprintf() is assumed to abort by itself when allocation fails -- TODO
+ * confirm.
+ *
+ * Returns whatever xbt_dict_get() yields for that key.
+ * NOTE(review): presumably a queue must already have been registered with
+ * set_reqq_self() (action_init does so) -- confirm how xbt_dict_get() reacts
+ * to a missing key.
+ */
+static xbt_dynar_t get_reqq_self(void){
+  char *key = bprintf("%d", smpi_process_index());
+  xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
+  free(key);
+
+  return dynar_mpi_request;
+}
+
+/* Destructor given to the dict for each stored queue.
+ *
+ * The dict hands back the stored value as a void*; it is a dynar, so it has
+ * to be released through the dynar API rather than with a bare free(), which
+ * would leave the dynar's element storage behind. xbt_dynar_free_container()
+ * is the call the array-based code used on these queues.
+ */
+static void free_reqq_entry(void *queue){
+  xbt_dynar_t dynar = (xbt_dynar_t) queue;
+  xbt_dynar_free_container(&dynar);
+}
+
+/* Register mpi_request as the request queue of the calling process.
+ *
+ * The entry is stored in reqq under the process index printed in decimal,
+ * the same key get_reqq_self() builds. The dict takes ownership of the
+ * dynar: free_reqq_entry() is attached as its destructor, so a queue that is
+ * replaced by a later call, or still present when the dict is freed, is
+ * released properly.
+ *
+ * The key comes from bprintf(), which this file already uses for its trace
+ * labels, and is released with free() after the insertion. bprintf() is
+ * assumed to abort by itself when allocation fails -- TODO confirm.
+ */
+static void set_reqq_self(xbt_dynar_t mpi_request){
+  char *key = bprintf("%d", smpi_process_index());
+  xbt_dict_set(reqq, key, mpi_request, free_reqq_entry);
+  free(key);
+}
+
+
//allocate a single buffer for all sends, growing it if needed
void* smpi_get_tmp_sendbuffer(int size){
if (!smpi_process_get_replaying())
static void action_init(const char *const *action)
{
- int i;
XBT_DEBUG("Initialize the counters");
CHECK_ACTION_PARAMS(action, 0, 1);
if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
active_processes = smpi_process_count();
if (!reqq) {
+ reqq = xbt_dict_new(); /* only the first caller creates the shared dict of per-process request queues */
+ }
+ /* Unlike the dict creation above, this runs on every call: the calling process registers a fresh, empty queue of MPI_Request under its own index. */
+ set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
+
+ /* Superseded array-based setup (one dynar per process, all allocated by the first caller), kept here commented out:
reqq=xbt_new0(xbt_dynar_t,active_processes);
for(i=0;i<active_processes;i++){
reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
}
}
+ */
}
static void action_finalize(const char *const *action)
TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
request->send = 1;
- xbt_dynar_push(reqq[smpi_process_index()],&request);
+ xbt_dynar_push(get_reqq_self(),&request);
log_timed_action (action, clock);
}
TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
request->recv = 1;
- xbt_dynar_push(reqq[smpi_process_index()],&request);
+ xbt_dynar_push(get_reqq_self(),&request);
log_timed_action (action, clock);
}
MPI_Status status;
int flag = TRUE;
- request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
+ request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
//if request is null here, this may mean that a previous test has succeeded
//Different times in traced application and replayed version may lead to this
//In this case, ignore the extra calls.
/* push back request in dynar to be caught by a subsequent wait. if the test
* did succeed, the request is now NULL.
*/
- xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
+ xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
TRACE_smpi_testing_out(rank);
}
MPI_Request request;
MPI_Status status;
- xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
+ xbt_assert(xbt_dynar_length(get_reqq_self()),
"action wait not preceded by any irecv or isend: %s",
xbt_str_join_array(action," "));
- request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
+ request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
if (!request){
/* Assuming that the trace is well formed, this mean the comm might have
int count_requests=0;
unsigned int i=0;
- count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
+ count_requests=xbt_dynar_length(get_reqq_self());
if (count_requests>0) {
MPI_Request requests[count_requests];
/* The reqq is an array of dynars. Its index corresponds to the rank.
Thus each rank saves its own requests to the array request. */
- xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
+ xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
//save information from requests
xbt_dynar_free(&dsts);
xbt_dynar_free(&recvs);
- int freedrank=smpi_process_index();
- xbt_dynar_free_container(&(reqq[freedrank]));
- reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
+ //TODO xbt_dynar_free_container(get_reqq_self());
+ set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
}
log_timed_action (action, clock);
}
xbt_free(recvdisps);
}
-void smpi_replay_init(int *argc, char***argv){
+void smpi_replay_run(int *argc, char***argv){
+ /* Phase 1: initialise the SMPI process and flag it as replaying. This function now covers what used to be smpi_replay_init() and smpi_replay_finalize(). */
smpi_process_init(argc, argv);
smpi_process_mark_as_initialized();
smpi_process_set_replaying(1);
TRACE_smpi_computing_init(rank);
instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
extra->type = TRACING_INIT;
- TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
- TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
+ char *operation =bprintf("%s_init",__FUNCTION__); /* trace label is "<function>_init" here and "<function>_finalize" further down; each string is released with free() */
+ TRACE_smpi_collective_in(rank, -1, operation, extra);
+ TRACE_smpi_collective_out(rank, -1, operation);
+ free(operation);
- if (!action_funs){
- _xbt_replay_action_init();
+ if (!_xbt_replay_action_init()) { /* NOTE(review): handlers are registered only when this returns 0 -- presumably meaning "not initialised before"; confirm */
xbt_replay_action_register("init", action_init);
xbt_replay_action_register("finalize", action_finalize);
xbt_replay_action_register("comm_size", action_comm_size);
XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
smpi_execute_flops(value);
} else {
- //UGLY done to force context switch to be sure that all MSG_process begin initialization
- XBT_VERB("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
+ //UGLY: force a context switch to be sure that all MSG_processes begin initialization
+ XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
smpi_execute_flops(0.0);
}
+ /* Phase 2: run the replay itself */
xbt_replay_action_runner(*argc, *argv);
-}
-int smpi_replay_finalize(){
+ /* Phase 3: finalize everything (formerly smpi_replay_finalize) */
double sim_time= 1.;
/* One active process will stop. Decrease the counter*/
XBT_DEBUG("There are %lu elements in reqq[*]",
- xbt_dynar_length(reqq[smpi_process_index()]));
- if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
- int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
+ xbt_dynar_length(get_reqq_self()));
+ if (!xbt_dynar_is_empty(get_reqq_self())){ /* requests are still queued for this process: copy them out and wait for all of them */
+ int count_requests=xbt_dynar_length(get_reqq_self());
MPI_Request requests[count_requests];
MPI_Status status[count_requests];
unsigned int i;
- xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
+ xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
smpi_mpi_waitall(count_requests, requests, status);
active_processes--;
} else {
}
- xbt_dynar_free_container(&(reqq[smpi_process_index()]));
+ //TODO: this process's queue is no longer released here; the removed code called xbt_dynar_free_container() on reqq[index] -- decide where the queue from get_reqq_self() gets freed
if(!active_processes){
XBT_INFO("Simulation time %f", sim_time);
_xbt_replay_action_exit();
xbt_free(sendbuffer);
xbt_free(recvbuffer);
- xbt_free(reqq);
+ //xbt_free(reqq); (reqq is a dict now, hence xbt_dict_free below)
+ xbt_dict_free(&reqq); //NOTE(review): the original author questioned whether this call is needed ("data have been freed ???"); no other visible code releases the dict
reqq = NULL;
}
-
- int rank = smpi_process_index();
- instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
- extra->type = TRACING_FINALIZE;
- TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
+ instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
+ extra_fin->type = TRACING_FINALIZE;
+ operation =bprintf("%s_finalize",__FUNCTION__); /* the pointer is reused; the "_init" string was already freed above */
+ TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
smpi_process_finalize();
- TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
+ TRACE_smpi_collective_out(rank, -1, operation);
TRACE_smpi_finalize(smpi_process_index());
smpi_process_destroy();
- return MPI_SUCCESS;
+ free(operation);
}