Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Prevent user from making call to MPI within a SMPI_SAMPLE_ bloc.
[simgrid.git] / src / smpi / smpi_bench.c
index f3926a7..7e69951 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007, 2009, 2010. The SimGrid Team.
+/* Copyright (c) 2007, 2009-2013. The SimGrid Team.
  * All rights reserved.                                                     */
 
 /* This program is free software; you can redistribute it and/or modify it
@@ -8,9 +8,13 @@
 #include "xbt/dict.h"
 #include "xbt/sysdep.h"
 #include "xbt/ex.h"
+#include "xbt/hash.h"
 #include "surf/surf.h"
+#include "simgrid/sg_config.h"
 
+#ifndef WIN32
 #include <sys/mman.h>
+#endif
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <errno.h>
@@ -62,6 +66,9 @@ xbt_dict_t samples = NULL;         /* Allocated on first use */
 xbt_dict_t calls = NULL;           /* Allocated on first use */
 __thread int smpi_current_rank = 0;      /* Updated after each MPI call */
 
+double smpi_cpu_threshold;
+double smpi_running_power;
+
 typedef struct {
   int fd;
   int count;
@@ -82,6 +89,7 @@ static size_t shm_size(int fd) {
   return (size_t)st.st_size;
 }
 
+#ifndef WIN32
 static void* shm_map(int fd, size_t size, shared_data_t* data) {
   void* mem;
   char loc[PTR_STRLEN];
@@ -92,25 +100,28 @@ static void* shm_map(int fd, size_t size, shared_data_t* data) {
       xbt_die("Could not truncate fd %d to %zu: %s", fd, size, strerror(errno));
     }
   }
+
   mem = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
   if(mem == MAP_FAILED) {
     xbt_die("Could not map fd %d: %s", fd, strerror(errno));
   }
   if(!allocs_metadata) {
-    allocs_metadata = xbt_dict_new();
+    allocs_metadata = xbt_dict_new_homogeneous(xbt_free);
   }
   snprintf(loc, PTR_STRLEN, "%p", mem);
   meta = xbt_new(shared_metadata_t, 1);
   meta->size = size;
   meta->data = data;
-  xbt_dict_set(allocs_metadata, loc, meta, &free);
+  xbt_dict_set(allocs_metadata, loc, meta, NULL);
   XBT_DEBUG("MMAP %zu to %p", size, mem);
   return mem;
 }
+#endif
 
 void smpi_bench_destroy(void)
 {
   xbt_dict_free(&allocs);
+  xbt_dict_free(&allocs_metadata);
   xbt_dict_free(&samples);
   xbt_dict_free(&calls);
 }
@@ -119,8 +130,7 @@ void smpi_execute_flops(double flops) {
   smx_action_t action;
   smx_host_t host;
   host = SIMIX_host_self();
-
-  XBT_DEBUG("Handle real computation time: %f flops", flops);
+  XBT_DEBUG("Handle real computation time: %g flops", flops);
   action = simcall_host_execute("computation", host, flops, 1);
 #ifdef HAVE_TRACING
   simcall_set_category (action, TRACE_internal_smpi_get_category());
@@ -130,48 +140,77 @@ void smpi_execute_flops(double flops) {
 
 static void smpi_execute(double duration)
 {
-  /* FIXME: a global variable would be less expensive to consult than a call to xbt_cfg_get_double() right on the critical path */
-  if (duration >= xbt_cfg_get_double(_surf_cfg_set, "smpi/cpu_threshold")) {
-    XBT_DEBUG("Sleep for %f to handle real computation time", duration);
-    smpi_execute_flops(duration *
-                       xbt_cfg_get_double(_surf_cfg_set,
-                                          "smpi/running_power"));
+  if (duration >= smpi_cpu_threshold) {
+    XBT_DEBUG("Sleep for %g to handle real computation time", duration);
+    double flops = duration * smpi_running_power;
+#ifdef HAVE_TRACING
+    int rank = smpi_process_index();
+    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
+    extra->type=TRACING_COMPUTING;
+    extra->comp_size=flops;
+    TRACE_smpi_computing_in(rank, extra);
+#endif
+    smpi_execute_flops(flops);
+
+#ifdef HAVE_TRACING
+    TRACE_smpi_computing_out(rank);
+#endif
+
   } else {
-    XBT_DEBUG("Real computation took %f while option smpi/cpu_threshold is set to %f => ignore it",
-        duration, xbt_cfg_get_double(_surf_cfg_set, "smpi/cpu_threshold"));
+    XBT_DEBUG("Real computation took %g while option smpi/cpu_threshold is set to %g => ignore it",
+              duration, smpi_cpu_threshold);
   }
 }
 
 void smpi_bench_begin(void)
 {
-  xbt_os_timer_start(smpi_process_timer());
+  xbt_os_threadtimer_start(smpi_process_timer());
   smpi_current_rank = smpi_process_index();
 }
 
 void smpi_bench_end(void)
 {
   xbt_os_timer_t timer = smpi_process_timer();
-
-  xbt_os_timer_stop(timer);
+  xbt_os_threadtimer_stop(timer);
+  if (smpi_process_get_sampling()) {
+    XBT_CRITICAL("Cannot do recursive benchmarks.");
+    XBT_CRITICAL("Are you trying to make a call to MPI within a SMPI_SAMPLE_ block?");
+    xbt_backtrace_display_current();
+    xbt_die("Aborting.");
+  }
   smpi_execute(xbt_os_timer_elapsed(timer));
 }
 
 unsigned int smpi_sleep(unsigned int secs)
 {
+  smx_action_t action;
+
   smpi_bench_end();
-  smpi_execute((double) secs);
+
+  double flops = (double) secs*simcall_host_get_speed(SIMIX_host_self());
+  XBT_DEBUG("Sleep for: %f flops", flops);
+  action = simcall_host_execute("computation", SIMIX_host_self(), flops, 1);
+  #ifdef HAVE_TRACING
+    simcall_set_category (action, TRACE_internal_smpi_get_category());
+  #endif
+  simcall_host_execution_wait(action);
+
   smpi_bench_begin();
   return secs;
 }
 
-int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
+int smpi_gettimeofday(struct timeval *tv)
 {
   double now;
   smpi_bench_end();
   now = SIMIX_get_clock();
   if (tv) {
     tv->tv_sec = (time_t)now;
+#ifdef WIN32
+    tv->tv_usec = (useconds_t)((now - tv->tv_sec) * 1e6);
+#else
     tv->tv_usec = (suseconds_t)((now - tv->tv_sec) * 1e6);
+#endif
   }
   smpi_bench_begin();
   return 0;
@@ -199,13 +238,13 @@ unsigned long long smpi_rastro_timestamp (void)
 
 /* ****************************** Functions related to the SMPI_SAMPLE_ macros ************************************/
 typedef struct {
-  int iters;        /* amount of requested iterations */
-  int count;        /* amount of iterations done so far */
   double threshold; /* maximal stderr requested (if positive) */
   double relstderr; /* observed stderr so far */
   double mean;      /* mean of benched times, to be used if the block is disabled */
   double sum;       /* sum of benched times (to compute the mean and stderr) */
   double sum_pow2;  /* sum of the square of the benched times (to compute the stderr) */
+  int iters;        /* amount of requested iterations */
+  int count;        /* amount of iterations done so far */
   int benching;     /* 1: we are benchmarking; 0: we have enough data, no bench anymore */
 } local_data_t;
 
@@ -236,6 +275,8 @@ void smpi_sample_1(int global, const char *file, int line, int iters, double thr
   local_data_t *data;
 
   smpi_bench_end();     /* Take time from previous, unrelated computation into account */
+  smpi_process_set_sampling(1);
+
   if (!samples)
     samples = xbt_dict_new_homogeneous(free);
 
@@ -264,35 +305,36 @@ void smpi_sample_1(int global, const char *file, int line, int iters, double thr
     data->benching = !sample_enough_benchs(data);
     XBT_DEBUG("XXXX Re-entering the benched nest %s. %s",loc, (data->benching?"more benching needed":"we have enough data, skip computes"));
   }
-  free(loc);
+  xbt_free(loc);
 }
 
 int smpi_sample_2(int global, const char *file, int line)
 {
   char *loc = sample_location(global, file, line);
   local_data_t *data;
+  int res;
 
   xbt_assert(samples, "Y U NO use SMPI_SAMPLE_* macros? Stop messing directly with smpi_sample_* functions!");
   data = xbt_dict_get(samples, loc);
   XBT_DEBUG("sample2 %s",loc);
-  free(loc);
+  xbt_free(loc);
 
   if (data->benching==1) {
     // we need to run a new bench
     XBT_DEBUG("benchmarking: count:%d iter:%d stderr:%f thres:%f; mean:%f",
         data->count, data->iters, data->relstderr, data->threshold, data->mean);
-    smpi_bench_begin();
-    return 1;
+    res = 1;
   } else {
     // Enough data, no more bench (either we got enough data from previous visits to this benched nest, or we just ran one bench and need to bail out now that our job is done).
     // Just sleep instead
     XBT_DEBUG("No benchmark (either no need, or just ran one): count >= iter (%d >= %d) or stderr<thres (%f<=%f). apply the %fs delay instead",
         data->count, data->iters, data->relstderr, data->threshold, data->mean);
     smpi_execute(data->mean);
-
-    smpi_bench_begin(); // prepare to capture future, unrelated computations
-    return 0;
+    smpi_process_set_sampling(0);
+    res = 0; // prepare to capture future, unrelated computations
   }
+  smpi_bench_begin();
+  return res;
 }
 
 
@@ -304,13 +346,14 @@ void smpi_sample_3(int global, const char *file, int line)
   xbt_assert(samples, "Y U NO use SMPI_SAMPLE_* macros? Stop messing directly with smpi_sample_* functions!");
   data = xbt_dict_get(samples, loc);
   XBT_DEBUG("sample3 %s",loc);
+  xbt_free(loc);
 
   if (data->benching==0) {
     THROW_IMPOSSIBLE;
   }
 
   // ok, benchmarking this loop is over
-  xbt_os_timer_stop(smpi_process_timer());
+  xbt_os_threadtimer_stop(smpi_process_timer());
 
   // update the stats
   double sample, n;
@@ -331,87 +374,127 @@ void smpi_sample_3(int global, const char *file, int line)
   data->benching = 0;
 }
 
-void *smpi_shared_malloc(size_t size, const char *file, int line)
+#ifndef WIN32
+static void smpi_shared_alloc_free(void *p)
 {
-  char *loc = bprintf("%zu_%s_%d", (size_t)getpid(), file, line);
-  size_t len = strlen(loc);
-  size_t i;
-  int fd;
-  void* mem;
-  shared_data_t *data;
+  shared_data_t *data = p;
+  xbt_free(data->loc);
+  xbt_free(data);
+}
 
-  for(i = 0; i < len; i++) {
-    /* Make the 'loc' ID be a flat filename */
-    if(loc[i] == '/') {
-      loc[i] = '_';
+static char *smpi_shared_alloc_hash(char *loc)
+{
+  char hash[42];
+  char s[7];
+  unsigned val;
+  int i, j;
+
+  xbt_sha(loc, hash);
+  hash[41] = '\0';
+  s[6] = '\0';
+  loc = xbt_realloc(loc, 30);
+  loc[0] = '/';
+  for (i = 0; i < 40; i += 6) { /* base64 encode */
+    memcpy(s, hash + i, 6);
+    val = strtoul(s, NULL, 16);
+    for (j = 0; j < 4; j++) {
+      unsigned char x = (val >> (18 - 3 * j)) & 0x3f;
+      loc[1 + 4 * i / 6 + j] =
+        "ABCDEFGHIJKLMNOPQRSTUVZXYZabcdefghijklmnopqrstuvzxyz0123456789-_"[x];
     }
   }
-  if (!allocs) {
-    allocs = xbt_dict_new_homogeneous(free);
-  }
-  data = xbt_dict_get_or_null(allocs, loc);
-  if(!data) {
-    fd = shm_open(loc, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
-    if(fd < 0) {
-      switch(errno) {
-        case EEXIST:
-          xbt_die("Please cleanup /dev/shm/%s", loc);
-        default:
-          xbt_die("An unhandled error occured while opening %s: %s", loc, strerror(errno));
-      }
+  loc[29] = '\0';
+  return loc;
+}
+
+void *smpi_shared_malloc(size_t size, const char *file, int line)
+{
+  void* mem;
+  if (sg_cfg_get_boolean("smpi/use_shared_malloc")){
+    char *loc = bprintf("%zu_%s_%d", (size_t)getpid(), file, line);
+    int fd;
+    shared_data_t *data;
+    loc = smpi_shared_alloc_hash(loc); /* hash loc, in order to have something
+                                        * not too long */
+    if (!allocs) {
+      allocs = xbt_dict_new_homogeneous(smpi_shared_alloc_free);
     }
-    data = xbt_new(shared_data_t, 1);
-    data->fd = fd;
-    data->count = 1;
-    data->loc = loc;
-    mem = shm_map(fd, size, data);
-    if(shm_unlink(loc) < 0) {
-      XBT_WARN("Could not early unlink %s: %s", loc, strerror(errno));
+    data = xbt_dict_get_or_null(allocs, loc);
+    if (!data) {
+      fd = shm_open(loc, O_RDWR | O_CREAT | O_EXCL,
+                    S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+      if (fd < 0) {
+        switch(errno) {
+          case EEXIST:
+            xbt_die("Please cleanup /dev/shm/%s", loc);
+          default:
+            xbt_die("An unhandled error occured while opening %s. shm_open: %s", loc, strerror(errno));
+        }
+      }
+      data = xbt_new(shared_data_t, 1);
+      data->fd = fd;
+      data->count = 1;
+      data->loc = loc;
+      mem = shm_map(fd, size, data);
+      if (shm_unlink(loc) < 0) {
+        XBT_WARN("Could not early unlink %s. shm_unlink: %s", loc, strerror(errno));
+      }
+      xbt_dict_set(allocs, loc, data, NULL);
+      XBT_DEBUG("Mapping %s at %p through %d", loc, mem, fd);
+    } else {
+      xbt_free(loc);
+      mem = shm_map(data->fd, size, data);
+      data->count++;
     }
-    xbt_dict_set(allocs, loc, data, NULL);
-    XBT_DEBUG("Mapping %s at %p through %d", loc, mem, fd);
+    XBT_DEBUG("Shared malloc %zu in %p (metadata at %p)", size, mem, data);
   } else {
-    mem = shm_map(data->fd, size, data);
-    data->count++;
+    mem = xbt_malloc(size);
+    XBT_DEBUG("Classic malloc %zu in %p", size, mem);
   }
-  XBT_DEBUG("Malloc %zu in %p (metadata at %p)", size, mem, data);
+
   return mem;
 }
-
 void smpi_shared_free(void *ptr)
 {
   char loc[PTR_STRLEN];
   shared_metadata_t* meta;
   shared_data_t* data;
-
-  if (!allocs) {
-    XBT_WARN("Cannot free: nothing was allocated");
-    return;
-  }
-  if(!allocs_metadata) {
-    XBT_WARN("Cannot free: no metadata was allocated");
-  }
-  snprintf(loc, PTR_STRLEN, "%p", ptr);
-  meta = (shared_metadata_t*)xbt_dict_get_or_null(allocs_metadata, loc);
-  if (!meta) {
-    XBT_WARN("Cannot free: %p was not shared-allocated by SMPI", ptr);
-    return;
-  }
-  data = meta->data;
-  if(!data) {
-    XBT_WARN("Cannot free: something is broken in the metadata link");
-    return;
-  }
-  if(munmap(ptr, meta->size) < 0) {
-    XBT_WARN("Unmapping of fd %d failed: %s", data->fd, strerror(errno));
-  }
-  data->count--;
-  if (data->count <= 0) {
-    close(data->fd);
-    xbt_dict_remove(allocs, data->loc);
-    free(data->loc);
+  if (sg_cfg_get_boolean("smpi/use_shared_malloc")){
+  
+    if (!allocs) {
+      XBT_WARN("Cannot free: nothing was allocated");
+      return;
+    }
+    if(!allocs_metadata) {
+      XBT_WARN("Cannot free: no metadata was allocated");
+    }
+    snprintf(loc, PTR_STRLEN, "%p", ptr);
+    meta = (shared_metadata_t*)xbt_dict_get_or_null(allocs_metadata, loc);
+    if (!meta) {
+      XBT_WARN("Cannot free: %p was not shared-allocated by SMPI", ptr);
+      return;
+    }
+    data = meta->data;
+    if(!data) {
+      XBT_WARN("Cannot free: something is broken in the metadata link");
+      return;
+    }
+    if(munmap(ptr, meta->size) < 0) {
+      XBT_WARN("Unmapping of fd %d failed: %s", data->fd, strerror(errno));
+    }
+    data->count--;
+    XBT_DEBUG("Shared free - no removal - of %p, count = %d", ptr, data->count);
+    if (data->count <= 0) {
+      close(data->fd);
+      xbt_dict_remove(allocs, data->loc);
+      XBT_DEBUG("Shared free - with removal - of %p", ptr);
+    }
+  }else{
+    XBT_DEBUG("Classic free of %p", ptr);
+    xbt_free(ptr);
   }
 }
+#endif
 
 int smpi_shared_known_call(const char* func, const char* input) {
    char* loc = bprintf("%s:%s", func, input);