From 55eb4a6777966d160968eb4bfeac1ffa9028ec24 Mon Sep 17 00:00:00 2001 From: Augustin Degomme Date: Thu, 2 Oct 2014 14:18:08 +0200 Subject: [PATCH] Change way replay is handled, to allow cohabitation between replay and "classic" SMPI --- src/smpi/private.h | 5 +++++ src/smpi/smpi_base.c | 12 +++++++----- src/smpi/smpi_comm.c | 23 +++++++++-------------- src/smpi/smpi_deployment.c | 3 ++- src/smpi/smpi_global.c | 22 ++++++++++++++++++++++ src/smpi/smpi_mpi_dt.c | 7 +++++-- src/smpi/smpi_replay.c | 7 ++++--- 7 files changed, 54 insertions(+), 25 deletions(-) diff --git a/src/smpi/private.h b/src/smpi/private.h index b2d4ff20ee..b02770769c 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -167,6 +167,8 @@ void smpi_process_simulated_start(void); double smpi_process_simulated_elapsed(void); void smpi_process_set_sampling(int s); int smpi_process_get_sampling(void); +void smpi_process_set_replaying(int s); +int smpi_process_get_replaying(void); void smpi_deployment_register_process(const char* instance_id, int rank, int index, MPI_Comm**, xbt_bar_t*); void smpi_deployment_cleanup_instances(void); @@ -174,6 +176,9 @@ void smpi_deployment_cleanup_instances(void); void smpi_comm_copy_buffer_callback(smx_action_t comm, void *buff, size_t buff_size); +void smpi_comm_null_copy_buffer_callback(smx_action_t comm, + void *buff, size_t buff_size); + void print_request(const char *message, MPI_Request request); int smpi_enabled(void); diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index 940cbe2bd3..d24ae3aa3f 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -388,7 +388,8 @@ void smpi_mpi_start(MPI_Request request) smpi_comm_use(request->comm); request->action = simcall_comm_irecv(mailbox, request->buf, &request->real_size, &match_recv, - &smpi_comm_copy_buffer_callback, + !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback + : &smpi_comm_null_copy_buffer_callback, request, -1.0); XBT_DEBUG("recv simcall posted"); @@ -451,7 +452,7 @@ void smpi_mpi_start(MPI_Request request) request->refcount++; if(request->old_type->has_subtype == 0){ oldbuf = request->buf; - if (!_xbt_replay_is_active() && oldbuf && request->size!=0){ + if (!smpi_process_get_replaying() && oldbuf && request->size!=0){ if((smpi_privatize_global_variables) && ((char*)request->buf >= start_data_exe) && ((char*)request->buf < start_data_exe + size_data_exe )){ @@ -474,7 +475,8 @@ void smpi_mpi_start(MPI_Request request) buf, request->real_size, &match_send, &xbt_free_f, // how to free the userdata if a detached send fails - &smpi_comm_copy_buffer_callback, + !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback + : &smpi_comm_null_copy_buffer_callback, request, // detach if msg size < eager/rdv switch limit request->detached); @@ -690,7 +692,7 @@ static void finish_wait(MPI_Request * request, MPI_Status * status) MPI_Datatype datatype = req->old_type; if((req->flags & ACCUMULATE) || (datatype->has_subtype == 1)){ - if (!_xbt_replay_is_active()){ + if (!smpi_process_get_replaying()){ if( smpi_privatize_global_variables && ((char*)req->old_buf >= start_data_exe) && ((char*)req->old_buf < start_data_exe + size_data_exe ) @@ -1403,7 +1405,7 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, if(src != root) { // FIXME: possibly overkill we we have contiguous/noncontiguous data // mapping... - if (!_xbt_replay_is_active()) + if (!smpi_process_get_replaying()) tmpbufs[index] = xbt_malloc(count * dataext); else tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext); diff --git a/src/smpi/smpi_comm.c b/src/smpi/smpi_comm.c index c6da369a53..cfc23cd615 100644 --- a/src/smpi/smpi_comm.c +++ b/src/smpi/smpi_comm.c @@ -10,10 +10,8 @@ #include "smpi_mpi_dt_private.h" #include "limits.h" #include "simix/smx_private.h" -#include "xbt/replay.h" #include "colls/colls.h" -extern int is_replay_active ; XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi, "Logging specific to SMPI (comm)"); @@ -134,8 +132,6 @@ void smpi_comm_set_leaders_comm(MPI_Comm comm, MPI_Comm leaders){ } void smpi_comm_set_intra_comm(MPI_Comm comm, MPI_Comm leaders){ - if (comm == MPI_COMM_UNINITIALIZED) - comm = smpi_process_comm_world(); comm->intra_comm=leaders; } @@ -158,9 +154,8 @@ MPI_Comm smpi_comm_get_leaders_comm(MPI_Comm comm){ } MPI_Comm smpi_comm_get_intra_comm(MPI_Comm comm){ - if (comm == MPI_COMM_UNINITIALIZED) - comm = smpi_process_comm_world(); - if(comm==MPI_COMM_WORLD) return smpi_process_get_comm_intra(); + if (comm == MPI_COMM_UNINITIALIZED || comm==MPI_COMM_WORLD) + return smpi_process_get_comm_intra(); else return comm->intra_comm; } @@ -303,9 +298,9 @@ void smpi_comm_init_smp(MPI_Comm comm){ // say to SimGrid that we are not in replay for a while, because we need // the buffers to be copied for the following calls int replaying = 0; //cache data to set it back again after - if(_xbt_replay_is_active()){ - replaying = 1; - is_replay_active = 0 ; + if(smpi_process_get_replaying()){ + replaying=1; + smpi_process_set_replaying(0); } if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables @@ -398,7 +393,7 @@ void smpi_comm_init_smp(MPI_Comm comm){ MPI_Comm leader_comm = MPI_COMM_NULL; - if(comm!=MPI_COMM_WORLD){ + if(MPI_COMM_WORLD!=MPI_COMM_UNINITIALIZED && comm!=MPI_COMM_WORLD){ //create leader_communicator for (i=0; i< leader_group_size;i++) smpi_group_set_mapping(leaders_group, leader_list[i], i); @@ -423,7 +418,7 @@ void smpi_comm_init_smp(MPI_Comm comm){ // Are the nodes uniform ? = same number of process/node int my_local_size=smpi_comm_size(comm_intra); if(smpi_comm_rank(comm_intra)==0) { - int* non_uniform_map = xbt_malloc(sizeof(int)*leader_group_size); + int* non_uniform_map = xbt_malloc0(sizeof(int)*leader_group_size); smpi_coll_tuned_allgather_mpich(&my_local_size, 1, MPI_INT, non_uniform_map, 1, MPI_INT, leader_comm); for(i=0; i < leader_group_size; i++) { @@ -460,7 +455,7 @@ void smpi_comm_init_smp(MPI_Comm comm){ smpi_mpi_allreduce(&is_blocked, &(global_blocked), 1, MPI_INT, MPI_LAND, comm); - if(comm==MPI_COMM_WORLD){ + if(MPI_COMM_WORLD==SMPI_UNINITIALIZED || comm==MPI_COMM_WORLD){ if(smpi_comm_rank(comm)==0){ comm->is_blocked=global_blocked; } @@ -470,6 +465,6 @@ void smpi_comm_init_smp(MPI_Comm comm){ xbt_free(leader_list); if(replaying==1) - is_replay_active = 1; + smpi_process_set_replaying(1); } diff --git a/src/smpi/smpi_deployment.c b/src/smpi/smpi_deployment.c index 33177ac333..30df611365 100644 --- a/src/smpi/smpi_deployment.c +++ b/src/smpi/smpi_deployment.c @@ -88,7 +88,8 @@ void smpi_deployment_cleanup_instances(){ s_smpi_mpi_instance_t* instance = NULL; char *name = NULL; xbt_dict_foreach(smpi_instances, cursor, name, instance) { - while (smpi_group_unuse(smpi_comm_group(instance->comm_world)) > 0); + if(instance->comm_world!=MPI_COMM_NULL) + while (smpi_group_unuse(smpi_comm_group(instance->comm_world)) > 0); xbt_free(instance->comm_world); xbt_barrier_destroy(instance->finalization_barrier); } diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index 03d06e1c30..01f56e9ae0 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -35,6 +35,7 @@ typedef struct s_smpi_process_data { char state; int sampling; /* inside an SMPI_SAMPLE_ block? */ char* instance_id; + int replaying; /* is the process replaying a trace */ xbt_bar_t finalization_barrier; } s_smpi_process_data_t; @@ -91,6 +92,7 @@ void smpi_process_init(int *argc, char ***argv) if(temp_bar != NULL) data->finalization_barrier = temp_bar; data->index = index; data->instance_id = instance_id; + data->replaying = 0; xbt_free(simcall_process_get_data(proc)); simcall_process_set_data(proc, data); if (*argc > 3) { @@ -173,6 +175,19 @@ void smpi_process_mark_as_initialized(void) process_data[index_to_process_data[index]]->state = SMPI_INITIALIZED; } +void smpi_process_set_replaying(int value){ + int index = smpi_process_index(); + if ((index != MPI_UNDEFINED) && (process_data[index_to_process_data[index]]->state != SMPI_FINALIZED)) + process_data[index_to_process_data[index]]->replaying = value; +} + +int smpi_process_get_replaying(){ + int index = smpi_process_index(); + if (index != MPI_UNDEFINED) + return process_data[index_to_process_data[index]]->replaying; + else return _xbt_replay_is_active(); +} + int smpi_global_size(void) { @@ -358,6 +373,13 @@ void smpi_comm_copy_buffer_callback(smx_action_t comm, } + +void smpi_comm_null_copy_buffer_callback(smx_action_t comm, + void *buff, size_t buff_size) +{ + return; +} + static void smpi_check_options(){ //check correctness of MPI parameters diff --git a/src/smpi/smpi_mpi_dt.c b/src/smpi/smpi_mpi_dt.c index 29843af1a1..e943324a7c 100644 --- a/src/smpi/smpi_mpi_dt.c +++ b/src/smpi/smpi_mpi_dt.c @@ -223,7 +223,7 @@ int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype, count = sendcount < recvcount ? sendcount : recvcount; if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) { - if(!_xbt_replay_is_active()) memcpy(recvbuf, sendbuf, count); + if(!smpi_process_get_replaying()) memcpy(recvbuf, sendbuf, count); } else if (sendtype->has_subtype == 0) { @@ -1615,11 +1615,14 @@ void smpi_op_destroy(MPI_Op op) void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len, MPI_Datatype * datatype) { + if(op==MPI_OP_NULL) + return; + if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables XBT_DEBUG("Applying operation, switch to the right data frame "); smpi_switch_data_segment(smpi_process_index()); } - if(!_xbt_replay_is_active()) + if(!smpi_process_get_replaying()) op->func(invec, inoutvec, len, datatype); } diff --git a/src/smpi/smpi_replay.c b/src/smpi/smpi_replay.c index 9b5ff76cc8..261aec6413 100644 --- a/src/smpi/smpi_replay.c +++ b/src/smpi/smpi_replay.c @@ -33,7 +33,7 @@ static void log_timed_action (const char *const *action, double clock){ //allocate a single buffer for all sends, growing it if needed void* smpi_get_tmp_sendbuffer(int size){ - if (!_xbt_replay_is_active()) + if (!smpi_process_get_replaying()) return xbt_malloc(size); if (sendbuffer_size