Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Change way replay is handled, to allow cohabitation between replay and "classic"...
author Augustin Degomme <augustin.degomme@imag.fr>
Thu, 2 Oct 2014 12:18:08 +0000 (14:18 +0200)
committer Augustin Degomme <augustin.degomme@imag.fr>
Thu, 2 Oct 2014 13:09:18 +0000 (15:09 +0200)
src/smpi/private.h
src/smpi/smpi_base.c
src/smpi/smpi_comm.c
src/smpi/smpi_deployment.c
src/smpi/smpi_global.c
src/smpi/smpi_mpi_dt.c
src/smpi/smpi_replay.c

index b2d4ff2..b027707 100644 (file)
@@ -167,6 +167,8 @@ void smpi_process_simulated_start(void);
 double smpi_process_simulated_elapsed(void);
 void smpi_process_set_sampling(int s);
 int smpi_process_get_sampling(void);
 double smpi_process_simulated_elapsed(void);
 void smpi_process_set_sampling(int s);
 int smpi_process_get_sampling(void);
+void smpi_process_set_replaying(int s);
+int smpi_process_get_replaying(void);
 
 void smpi_deployment_register_process(const char* instance_id, int rank, int index, MPI_Comm**, xbt_bar_t*);
 void smpi_deployment_cleanup_instances(void);
 
 void smpi_deployment_register_process(const char* instance_id, int rank, int index, MPI_Comm**, xbt_bar_t*);
 void smpi_deployment_cleanup_instances(void);
@@ -174,6 +176,9 @@ void smpi_deployment_cleanup_instances(void);
 void smpi_comm_copy_buffer_callback(smx_action_t comm,
                                            void *buff, size_t buff_size);
 
 void smpi_comm_copy_buffer_callback(smx_action_t comm,
                                            void *buff, size_t buff_size);
 
+void smpi_comm_null_copy_buffer_callback(smx_action_t comm,
+                                           void *buff, size_t buff_size);
+
 void print_request(const char *message, MPI_Request request);
 
 int smpi_enabled(void);
 void print_request(const char *message, MPI_Request request);
 
 int smpi_enabled(void);
index 940cbe2..d24ae3a 100644 (file)
@@ -388,7 +388,8 @@ void smpi_mpi_start(MPI_Request request)
     smpi_comm_use(request->comm);
     request->action = simcall_comm_irecv(mailbox, request->buf,
                                          &request->real_size, &match_recv,
     smpi_comm_use(request->comm);
     request->action = simcall_comm_irecv(mailbox, request->buf,
                                          &request->real_size, &match_recv,
-                                         &smpi_comm_copy_buffer_callback,
+                                         !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+                                         : &smpi_comm_null_copy_buffer_callback,
                                          request, -1.0);
         XBT_DEBUG("recv simcall posted");
 
                                          request, -1.0);
         XBT_DEBUG("recv simcall posted");
 
@@ -451,7 +452,7 @@ void smpi_mpi_start(MPI_Request request)
       request->refcount++;
       if(request->old_type->has_subtype == 0){
         oldbuf = request->buf;
       request->refcount++;
       if(request->old_type->has_subtype == 0){
         oldbuf = request->buf;
-        if (!_xbt_replay_is_active() && oldbuf && request->size!=0){
+        if (!smpi_process_get_replaying() && oldbuf && request->size!=0){
           if((smpi_privatize_global_variables)
              && ((char*)request->buf >= start_data_exe)
              && ((char*)request->buf < start_data_exe + size_data_exe )){
           if((smpi_privatize_global_variables)
              && ((char*)request->buf >= start_data_exe)
              && ((char*)request->buf < start_data_exe + size_data_exe )){
@@ -474,7 +475,8 @@ void smpi_mpi_start(MPI_Request request)
                          buf, request->real_size,
                          &match_send,
                          &xbt_free_f, // how to free the userdata if a detached send fails
                          buf, request->real_size,
                          &match_send,
                          &xbt_free_f, // how to free the userdata if a detached send fails
-                         &smpi_comm_copy_buffer_callback,
+                         !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+                         : &smpi_comm_null_copy_buffer_callback,
                          request,
                          // detach if msg size < eager/rdv switch limit
                          request->detached);
                          request,
                          // detach if msg size < eager/rdv switch limit
                          request->detached);
@@ -690,7 +692,7 @@ static void finish_wait(MPI_Request * request, MPI_Status * status)
     MPI_Datatype datatype = req->old_type;
 
     if((req->flags & ACCUMULATE) || (datatype->has_subtype == 1)){
     MPI_Datatype datatype = req->old_type;
 
     if((req->flags & ACCUMULATE) || (datatype->has_subtype == 1)){
-      if (!_xbt_replay_is_active()){
+      if (!smpi_process_get_replaying()){
         if( smpi_privatize_global_variables
             && ((char*)req->old_buf >= start_data_exe)
             && ((char*)req->old_buf < start_data_exe + size_data_exe )
         if( smpi_privatize_global_variables
             && ((char*)req->old_buf >= start_data_exe)
             && ((char*)req->old_buf < start_data_exe + size_data_exe )
@@ -1403,7 +1405,7 @@ void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
       if(src != root) {
         // FIXME: possibly overkill when we have contiguous/noncontiguous data
         //  mapping...
       if(src != root) {
         // FIXME: possibly overkill when we have contiguous/noncontiguous data
         //  mapping...
-           if (!_xbt_replay_is_active())
+           if (!smpi_process_get_replaying())
           tmpbufs[index] = xbt_malloc(count * dataext);
            else
              tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
           tmpbufs[index] = xbt_malloc(count * dataext);
            else
              tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
index c6da369..cfc23cd 100644 (file)
 #include "smpi_mpi_dt_private.h"
 #include "limits.h"
 #include "simix/smx_private.h"
 #include "smpi_mpi_dt_private.h"
 #include "limits.h"
 #include "simix/smx_private.h"
-#include "xbt/replay.h"
 #include "colls/colls.h"
 
 #include "colls/colls.h"
 
-extern int is_replay_active ;
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi,
                                 "Logging specific to SMPI (comm)");
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi,
                                 "Logging specific to SMPI (comm)");
@@ -134,8 +132,6 @@ void smpi_comm_set_leaders_comm(MPI_Comm comm, MPI_Comm leaders){
 }
 
 void smpi_comm_set_intra_comm(MPI_Comm comm, MPI_Comm leaders){
 }
 
 void smpi_comm_set_intra_comm(MPI_Comm comm, MPI_Comm leaders){
-  if (comm == MPI_COMM_UNINITIALIZED)
-    comm = smpi_process_comm_world();
   comm->intra_comm=leaders;
 }
 
   comm->intra_comm=leaders;
 }
 
@@ -158,9 +154,8 @@ MPI_Comm smpi_comm_get_leaders_comm(MPI_Comm comm){
 }
 
 MPI_Comm smpi_comm_get_intra_comm(MPI_Comm comm){
 }
 
 MPI_Comm smpi_comm_get_intra_comm(MPI_Comm comm){
-  if (comm == MPI_COMM_UNINITIALIZED)
-    comm = smpi_process_comm_world();
-  if(comm==MPI_COMM_WORLD) return smpi_process_get_comm_intra();
+  if (comm == MPI_COMM_UNINITIALIZED || comm==MPI_COMM_WORLD) 
+    return smpi_process_get_comm_intra();
   else return comm->intra_comm;
 }
 
   else return comm->intra_comm;
 }
 
@@ -303,9 +298,9 @@ void smpi_comm_init_smp(MPI_Comm comm){
   // say to SimGrid that we are not in replay for a while, because we need 
   // the buffers to be copied for the following calls
   int replaying = 0; //cache data to set it back again after
   // say to SimGrid that we are not in replay for a while, because we need 
   // the buffers to be copied for the following calls
   int replaying = 0; //cache data to set it back again after
-  if(_xbt_replay_is_active()){
-    replaying = 1;
-    is_replay_active = 0 ;
+  if(smpi_process_get_replaying()){
+   replaying=1;
+   smpi_process_set_replaying(0);
   }
 
   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
   }
 
   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
@@ -398,7 +393,7 @@ void smpi_comm_init_smp(MPI_Comm comm){
 
 
   MPI_Comm leader_comm = MPI_COMM_NULL;
 
 
   MPI_Comm leader_comm = MPI_COMM_NULL;
-  if(comm!=MPI_COMM_WORLD){
+  if(MPI_COMM_WORLD!=MPI_COMM_UNINITIALIZED && comm!=MPI_COMM_WORLD){
     //create leader_communicator
     for (i=0; i< leader_group_size;i++)
       smpi_group_set_mapping(leaders_group, leader_list[i], i);
     //create leader_communicator
     for (i=0; i< leader_group_size;i++)
       smpi_group_set_mapping(leaders_group, leader_list[i], i);
@@ -423,7 +418,7 @@ void smpi_comm_init_smp(MPI_Comm comm){
   // Are the nodes uniform ? = same number of process/node
   int my_local_size=smpi_comm_size(comm_intra);
   if(smpi_comm_rank(comm_intra)==0) {
   // Are the nodes uniform ? = same number of process/node
   int my_local_size=smpi_comm_size(comm_intra);
   if(smpi_comm_rank(comm_intra)==0) {
-    int* non_uniform_map = xbt_malloc(sizeof(int)*leader_group_size);
+    int* non_uniform_map = xbt_malloc0(sizeof(int)*leader_group_size);
     smpi_coll_tuned_allgather_mpich(&my_local_size, 1, MPI_INT,
         non_uniform_map, 1, MPI_INT, leader_comm);
     for(i=0; i < leader_group_size; i++) {
     smpi_coll_tuned_allgather_mpich(&my_local_size, 1, MPI_INT,
         non_uniform_map, 1, MPI_INT, leader_comm);
     for(i=0; i < leader_group_size; i++) {
@@ -460,7 +455,7 @@ void smpi_comm_init_smp(MPI_Comm comm){
   smpi_mpi_allreduce(&is_blocked, &(global_blocked), 1,
             MPI_INT, MPI_LAND, comm);
 
   smpi_mpi_allreduce(&is_blocked, &(global_blocked), 1,
             MPI_INT, MPI_LAND, comm);
 
-  if(comm==MPI_COMM_WORLD){
+  if(MPI_COMM_WORLD==MPI_COMM_UNINITIALIZED || comm==MPI_COMM_WORLD){
     if(smpi_comm_rank(comm)==0){
         comm->is_blocked=global_blocked;
     }
     if(smpi_comm_rank(comm)==0){
         comm->is_blocked=global_blocked;
     }
@@ -470,6 +465,6 @@ void smpi_comm_init_smp(MPI_Comm comm){
   xbt_free(leader_list);
   
   if(replaying==1)
   xbt_free(leader_list);
   
   if(replaying==1)
-    is_replay_active = 1;
+    smpi_process_set_replaying(1);
 }
 
 }
 
index 33177ac..30df611 100644 (file)
@@ -88,7 +88,8 @@ void smpi_deployment_cleanup_instances(){
   s_smpi_mpi_instance_t* instance = NULL;
   char *name = NULL;
   xbt_dict_foreach(smpi_instances, cursor, name, instance) {
   s_smpi_mpi_instance_t* instance = NULL;
   char *name = NULL;
   xbt_dict_foreach(smpi_instances, cursor, name, instance) {
-    while (smpi_group_unuse(smpi_comm_group(instance->comm_world)) > 0);
+    if(instance->comm_world!=MPI_COMM_NULL)
+      while (smpi_group_unuse(smpi_comm_group(instance->comm_world)) > 0);
     xbt_free(instance->comm_world);
     xbt_barrier_destroy(instance->finalization_barrier);
   }
     xbt_free(instance->comm_world);
     xbt_barrier_destroy(instance->finalization_barrier);
   }
index 03d06e1..01f56e9 100644 (file)
@@ -35,6 +35,7 @@ typedef struct s_smpi_process_data {
   char state;
   int sampling;                 /* inside an SMPI_SAMPLE_ block? */
   char* instance_id;
   char state;
   int sampling;                 /* inside an SMPI_SAMPLE_ block? */
   char* instance_id;
+  int replaying;                /* is the process replaying a trace */
   xbt_bar_t finalization_barrier;
 } s_smpi_process_data_t;
 
   xbt_bar_t finalization_barrier;
 } s_smpi_process_data_t;
 
@@ -91,6 +92,7 @@ void smpi_process_init(int *argc, char ***argv)
     if(temp_bar != NULL) data->finalization_barrier = temp_bar;
     data->index = index;
     data->instance_id = instance_id;
     if(temp_bar != NULL) data->finalization_barrier = temp_bar;
     data->index = index;
     data->instance_id = instance_id;
+    data->replaying = 0;
     xbt_free(simcall_process_get_data(proc));
     simcall_process_set_data(proc, data);
     if (*argc > 3) {
     xbt_free(simcall_process_get_data(proc));
     simcall_process_set_data(proc, data);
     if (*argc > 3) {
@@ -173,6 +175,19 @@ void smpi_process_mark_as_initialized(void)
     process_data[index_to_process_data[index]]->state = SMPI_INITIALIZED;
 }
 
     process_data[index_to_process_data[index]]->state = SMPI_INITIALIZED;
 }
 
+void smpi_process_set_replaying(int value){
+  int index = smpi_process_index();
+  if ((index != MPI_UNDEFINED) && (process_data[index_to_process_data[index]]->state != SMPI_FINALIZED))
+    process_data[index_to_process_data[index]]->replaying = value;
+}
+
+int smpi_process_get_replaying(){
+  int index = smpi_process_index();
+  if (index != MPI_UNDEFINED)
+    return process_data[index_to_process_data[index]]->replaying;
+  else return _xbt_replay_is_active();
+}
+
 
 int smpi_global_size(void)
 {
 
 int smpi_global_size(void)
 {
@@ -358,6 +373,13 @@ void smpi_comm_copy_buffer_callback(smx_action_t comm,
 
 }
 
 
 }
 
+
+void smpi_comm_null_copy_buffer_callback(smx_action_t comm,
+                                           void *buff, size_t buff_size)
+{
+  return;
+}
+
 static void smpi_check_options(){
   //check correctness of MPI parameters
 
 static void smpi_check_options(){
   //check correctness of MPI parameters
 
index 29843af..e943324 100644 (file)
@@ -223,7 +223,7 @@ int smpi_datatype_copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
     count = sendcount < recvcount ? sendcount : recvcount;
 
     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
     count = sendcount < recvcount ? sendcount : recvcount;
 
     if(sendtype->has_subtype == 0 && recvtype->has_subtype == 0) {
-      if(!_xbt_replay_is_active()) memcpy(recvbuf, sendbuf, count);
+      if(!smpi_process_get_replaying()) memcpy(recvbuf, sendbuf, count);
     }
     else if (sendtype->has_subtype == 0)
     {
     }
     else if (sendtype->has_subtype == 0)
     {
@@ -1615,11 +1615,14 @@ void smpi_op_destroy(MPI_Op op)
 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
                    MPI_Datatype * datatype)
 {
 void smpi_op_apply(MPI_Op op, void *invec, void *inoutvec, int *len,
                    MPI_Datatype * datatype)
 {
+  if(op==MPI_OP_NULL)
+    return;
+
   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
     XBT_DEBUG("Applying operation, switch to the right data frame ");
     smpi_switch_data_segment(smpi_process_index());
   }
 
   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
     XBT_DEBUG("Applying operation, switch to the right data frame ");
     smpi_switch_data_segment(smpi_process_index());
   }
 
-  if(!_xbt_replay_is_active())
+  if(!smpi_process_get_replaying())
   op->func(invec, inoutvec, len, datatype);
 }
   op->func(invec, inoutvec, len, datatype);
 }
index 9b5ff76..261aec6 100644 (file)
@@ -33,7 +33,7 @@ static void log_timed_action (const char *const *action, double clock){
 
 //allocate a single buffer for all sends, growing it if needed
 void* smpi_get_tmp_sendbuffer(int size){
 
 //allocate a single buffer for all sends, growing it if needed
 void* smpi_get_tmp_sendbuffer(int size){
-  if (!_xbt_replay_is_active())
+  if (!smpi_process_get_replaying())
        return xbt_malloc(size);
   if (sendbuffer_size<size){
     sendbuffer=xbt_realloc(sendbuffer,size);
        return xbt_malloc(size);
   if (sendbuffer_size<size){
     sendbuffer=xbt_realloc(sendbuffer,size);
@@ -43,7 +43,7 @@ void* smpi_get_tmp_sendbuffer(int size){
 }
 //allocate a single buffer for all recv
 void* smpi_get_tmp_recvbuffer(int size){
 }
 //allocate a single buffer for all recv
 void* smpi_get_tmp_recvbuffer(int size){
-  if (!_xbt_replay_is_active())
+  if (!smpi_process_get_replaying())
        return xbt_malloc(size);
   if (recvbuffer_size<size){
     recvbuffer=xbt_realloc(recvbuffer,size);
        return xbt_malloc(size);
   if (recvbuffer_size<size){
     recvbuffer=xbt_realloc(recvbuffer,size);
@@ -53,7 +53,7 @@ void* smpi_get_tmp_recvbuffer(int size){
 }
 
 void smpi_free_tmp_buffer(void* buf){
 }
 
 void smpi_free_tmp_buffer(void* buf){
-  if (!_xbt_replay_is_active())
+  if (!smpi_process_get_replaying())
     xbt_free(buf);
 }
 
     xbt_free(buf);
 }
 
@@ -988,6 +988,7 @@ static void action_allToAllv(const char *const *action) {
 void smpi_replay_init(int *argc, char***argv){
   smpi_process_init(argc, argv);
   smpi_process_mark_as_initialized();
 void smpi_replay_init(int *argc, char***argv){
   smpi_process_init(argc, argv);
   smpi_process_mark_as_initialized();
+  smpi_process_set_replaying(1);
 #ifdef HAVE_TRACING
   int rank = smpi_process_index();
   TRACE_smpi_init(rank);
 #ifdef HAVE_TRACING
   int rank = smpi_process_index();
   TRACE_smpi_init(rank);