Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
rewrite the MPI execution sampling facility (previous implem was deeply bugged)
[simgrid.git] / src / smpi / smpi_bench.c
1 /* Copyright (c) 2007, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <math.h> // sqrt
8 #include "private.h"
9 #include "xbt/dict.h"
10 #include "xbt/sysdep.h"
11 #include "xbt/ex.h"
12 #include "surf/surf.h"
13
14 #include <sys/mman.h>
15 #include <sys/stat.h>
16 #include <sys/types.h>
17 #include <errno.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <string.h>
21 #include <stdio.h>
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_bench, smpi,
24                                 "Logging specific to SMPI (benchmarking)");
25
26 /* Shared allocations are handled through shared memory segments.
27  * Associated data and metadata are used as follows:
28  *
29  *                                                                    mmap #1
30  *    `allocs' dict                                                     ---- -.
31  *    ----------      shared_data_t               shared_metadata_t   / |  |  |
32  * .->| <name> | ---> -------------------- <--.   -----------------   | |  |  |
33  * |  ----------      | fd of <name>     |    |   | size of mmap  | --| |  |  |
34  * |                  | count (2)        |    |-- | data          |   \ |  |  |
35  * `----------------- | <name>           |    |   -----------------     ----  |
36  *                    --------------------    |   ^                           |
37  *                                            |   |                           |
38  *                                            |   |   `allocs_metadata' dict  |
39  *                                            |   |   ----------------------  |
40  *                                            |   `-- | <addr of mmap #1>  |<-'
41  *                                            |   .-- | <addr of mmap #2>  |<-.
42  *                                            |   |   ----------------------  |
43  *                                            |   |                           |
44  *                                            |   |                           |
45  *                                            |   |                           |
46  *                                            |   |                   mmap #2 |
47  *                                            |   v                     ---- -'
48  *                                            |   shared_metadata_t   / |  |
49  *                                            |   -----------------   | |  |
50  *                                            |   | size of mmap  | --| |  |
51  *                                            `-- | data          |   | |  |
52  *                                                -----------------   | |  |
53  *                                                                    \ |  |
54  *                                                                      ----
55  */
56
57 #define PTR_STRLEN (2 + 2 * sizeof(void*) + 1)
58
59 xbt_dict_t allocs = NULL;          /* Allocated on first use */
60 xbt_dict_t allocs_metadata = NULL; /* Allocated on first use */
61 xbt_dict_t samples = NULL;         /* Allocated on first use */
62 xbt_dict_t calls = NULL;           /* Allocated on first use */
63 __thread int smpi_current_rank = 0;      /* Updated after each MPI call */
64
65 typedef struct {
66   int fd;
67   int count;
68   char* loc;
69 } shared_data_t;
70
71 typedef struct  {
72   size_t size;
73   shared_data_t* data;
74 } shared_metadata_t;
75
76 static size_t shm_size(int fd) {
77   struct stat st;
78
79   if(fstat(fd, &st) < 0) {
80     xbt_die("Could not stat fd %d: %s", fd, strerror(errno));
81   }
82   return (size_t)st.st_size;
83 }
84
85 static void* shm_map(int fd, size_t size, shared_data_t* data) {
86   void* mem;
87   char loc[PTR_STRLEN];
88   shared_metadata_t* meta;
89
90   if(size > shm_size(fd)) {
91     if(ftruncate(fd, (off_t)size) < 0) {
92       xbt_die("Could not truncate fd %d to %zu: %s", fd, size, strerror(errno));
93     }
94   }
95   mem = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
96   if(mem == MAP_FAILED) {
97     xbt_die("Could not map fd %d: %s", fd, strerror(errno));
98   }
99   if(!allocs_metadata) {
100     allocs_metadata = xbt_dict_new();
101   }
102   snprintf(loc, PTR_STRLEN, "%p", mem);
103   meta = xbt_new(shared_metadata_t, 1);
104   meta->size = size;
105   meta->data = data;
106   xbt_dict_set(allocs_metadata, loc, meta, &free);
107   XBT_DEBUG("MMAP %zu to %p", size, mem);
108   return mem;
109 }
110
111 void smpi_bench_destroy(void)
112 {
113   xbt_dict_free(&allocs);
114   xbt_dict_free(&samples);
115   xbt_dict_free(&calls);
116 }
117
118 static void smpi_execute_flops(double flops)
119 {
120   smx_action_t action;
121   smx_host_t host;
122   host = SIMIX_host_self();
123
124   XBT_DEBUG("Handle real computation time: %f flops", flops);
125   action = simcall_host_execute("computation", host, flops, 1);
126 #ifdef HAVE_TRACING
127   simcall_set_category (action, TRACE_internal_smpi_get_category());
128 #endif
129   simcall_host_execution_wait(action);
130 }
131
132 static void smpi_execute(double duration)
133 {
134   /* FIXME: a global variable would be less expensive to consult than a call to xbt_cfg_get_double() right on the critical path */
135   if (duration >= xbt_cfg_get_double(_surf_cfg_set, "smpi/cpu_threshold")) {
136     XBT_DEBUG("Sleep for %f to handle real computation time", duration);
137     smpi_execute_flops(duration *
138                        xbt_cfg_get_double(_surf_cfg_set,
139                                           "smpi/running_power"));
140   } else {
141     XBT_DEBUG("Real computation took %f while option smpi/cpu_threshold is set to %f => ignore it",
142         duration, xbt_cfg_get_double(_surf_cfg_set, "smpi/cpu_threshold"));
143   }
144 }
145
146 void smpi_bench_begin(void)
147 {
148   xbt_os_timer_start(smpi_process_timer());
149   smpi_current_rank = smpi_process_index();
150 }
151
152 void smpi_bench_end(void)
153 {
154   xbt_os_timer_t timer = smpi_process_timer();
155
156   xbt_os_timer_stop(timer);
157   smpi_execute(xbt_os_timer_elapsed(timer));
158 }
159
160 unsigned int smpi_sleep(unsigned int secs)
161 {
162   smpi_bench_end();
163   smpi_execute((double) secs);
164   smpi_bench_begin();
165   return secs;
166 }
167
168 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
169 {
170   double now;
171   smpi_bench_end();
172   now = SIMIX_get_clock();
173   if (tv) {
174     tv->tv_sec = (time_t)now;
175     tv->tv_usec = (suseconds_t)((now - tv->tv_sec) * 1e6);
176   }
177   smpi_bench_begin();
178   return 0;
179 }
180
181 extern double sg_maxmin_precision;
182 unsigned long long smpi_rastro_resolution (void)
183 {
184   smpi_bench_end();
185   double resolution = (1/sg_maxmin_precision);
186   smpi_bench_begin();
187   return (unsigned long long)resolution;
188 }
189
190 unsigned long long smpi_rastro_timestamp (void)
191 {
192   smpi_bench_end();
193   double now = SIMIX_get_clock();
194
195   unsigned long long sec = (unsigned long long)now;
196   unsigned long long pre = (now - sec) * smpi_rastro_resolution();
197   smpi_bench_begin();
198   return (unsigned long long)sec * smpi_rastro_resolution() + pre;
199 }
200
201 /* ****************************** Functions related to the SMPI_SAMPLE_ macros ************************************/
202 typedef struct {
203   int iters;        /* amount of requested iterations */
204   int count;        /* amount of iterations done so far */
205   double threshold; /* maximal stderr requested (if positive) */
206   double relstderr; /* observed stderr so far */
207   double mean;      /* mean of benched times, to be used if the block is disabled */
208   double sum;       /* sum of benched times (to compute the mean and stderr) */
209   double sum_pow2;  /* sum of the square of the benched times (to compute the stderr) */
210   int benching;     /* 1: we are benchmarking; 0: we have enough data, no bench anymore */
211 } local_data_t;
212
213 static char *sample_location(int global, const char *file, int line) {
214   if (global) {
215     return bprintf("%s:%d", file, line);
216   } else {
217     return bprintf("%s:%d:%d", file, line, smpi_process_index());
218   }
219 }
220 static int sample_enough_benchs(local_data_t *data) {
221   int res = (data->iters > 0 && data->count >= data->iters)
222       || (data->count > 2 && data->threshold > 0.0 && data->relstderr <= data->threshold);
223   XBT_DEBUG("%s (count:%d iter:%d stderr:%f thres:%f mean:%fs)",
224       (res?"enough benchs":"need more data"),
225       data->count, data->iters, data->relstderr, data->threshold, data->mean);
226   return res;
227 }
228
229 void smpi_sample_1(int global, const char *file, int line, int iters, double threshold)
230 {
231   char *loc = sample_location(global, file, line);
232   local_data_t *data;
233
234   smpi_bench_end();     /* Take time from previous, unrelated computation into account */
235   if (!samples)
236     samples = xbt_dict_new_homogeneous(free);
237
238   data = xbt_dict_get_or_null(samples, loc);
239   if (!data) {
240     data = (local_data_t *) xbt_new(local_data_t, 1);
241     data->count = 0;
242     data->sum = 0.0;
243     data->sum_pow2 = 0.0;
244     data->iters = iters;
245     data->threshold = threshold;
246     data->benching = 1; // If we have no data, we need at least one
247     data->mean = 0;
248     xbt_dict_set(samples, loc, data, NULL);
249     XBT_DEBUG("XXXXX First time ever on benched nest %s.",loc);
250   } else {
251     if (data->iters != iters || data->threshold != threshold) {
252       XBT_ERROR("Asked to bench block %s with different settings %d, %f is not %d, %f. How did you manage to give two numbers at the same line??",
253           loc, data->iters, data->threshold, iters,threshold);
254       THROW_IMPOSSIBLE;
255     }
256
257     // if we already have some data, check whether sample_2 should get one more bench or whether it should emulate the computation instead
258     data->benching = !sample_enough_benchs(data);
259     XBT_DEBUG("XXXX Re-entering the benched nest %s. %s",loc, (data->benching?"more benching needed":"we have enough data, skip computes"));
260   }
261   free(loc);
262 }
263
264 int smpi_sample_2(int global, const char *file, int line)
265 {
266   char *loc = sample_location(global, file, line);
267   local_data_t *data;
268
269   xbt_assert(samples, "Y U NO use SMPI_SAMPLE_* macros? Stop messing directly with smpi_sample_* functions!");
270   data = xbt_dict_get(samples, loc);
271   XBT_DEBUG("sample2 %s",loc);
272   free(loc);
273
274   if (data->benching==1) {
275     // we need to run a new bench
276     XBT_DEBUG("benchmarking: count:%d iter:%d stderr:%f thres:%f; mean:%f",
277         data->count, data->iters, data->relstderr, data->threshold, data->mean);
278     smpi_bench_begin();
279     return 1;
280   } else {
281     // Enough data, no more bench (either we got enough data from previous visits to this benched nest, or we just ran one bench and need to bail out now that our job is done).
282     // Just sleep instead
283     XBT_DEBUG("No benchmark (either no need, or just ran one): count >= iter (%d >= %d) or stderr<thres (%f<=%f). apply the %fs delay instead",
284         data->count, data->iters, data->relstderr, data->threshold, data->mean);
285     smpi_execute(data->mean);
286
287     smpi_bench_begin(); // prepare to capture future, unrelated computations
288     return 0;
289   }
290 }
291
292
293 void smpi_sample_3(int global, const char *file, int line)
294 {
295   char *loc = sample_location(global, file, line);
296   local_data_t *data;
297
298   xbt_assert(samples, "Y U NO use SMPI_SAMPLE_* macros? Stop messing directly with smpi_sample_* functions!");
299   data = xbt_dict_get(samples, loc);
300   XBT_DEBUG("sample3 %s",loc);
301
302   if (data->benching==0) {
303     THROW_IMPOSSIBLE;
304   }
305
306   // ok, benchmarking this loop is over
307   xbt_os_timer_stop(smpi_process_timer());
308
309   // update the stats
310   double sample, n;
311   data->count++;
312   sample = xbt_os_timer_elapsed(smpi_process_timer());
313   data->sum += sample;
314   data->sum_pow2 += sample * sample;
315   n = (double)data->count;
316   data->mean = data->sum / n;
317   data->relstderr = sqrt((data->sum_pow2 / n - data->mean * data->mean) / n) / data->mean;
318   if (!sample_enough_benchs(data)) {
319     data->mean = sample; // Still in benching process; We want sample_2 to simulate the exact time of this loop occurrence before leaving, not the mean over the history
320   }
321   XBT_DEBUG("Average mean after %d steps is %f, relative standard error is %f (sample was %f)", data->count,
322       data->mean, data->relstderr, sample);
323
324   // That's enough for now, prevent sample_2 to run the same code over and over
325   data->benching = 0;
326 }
327
328 void smpi_sample_flops(double flops)
329 {
330   smpi_execute_flops(flops);
331 }
332
333 void *smpi_shared_malloc(size_t size, const char *file, int line)
334 {
335   char *loc = bprintf("%zu_%s_%d", (size_t)getpid(), file, line);
336   size_t len = strlen(loc);
337   size_t i;
338   int fd;
339   void* mem;
340   shared_data_t *data;
341
342   for(i = 0; i < len; i++) {
343     /* Make the 'loc' ID be a flat filename */
344     if(loc[i] == '/') {
345       loc[i] = '_';
346     }
347   }
348   if (!allocs) {
349     allocs = xbt_dict_new_homogeneous(free);
350   }
351   data = xbt_dict_get_or_null(allocs, loc);
352   if(!data) {
353     fd = shm_open(loc, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
354     if(fd < 0) {
355       switch(errno) {
356         case EEXIST:
357           xbt_die("Please cleanup /dev/shm/%s", loc);
358         default:
359           xbt_die("An unhandled error occured while opening %s: %s", loc, strerror(errno));
360       }
361     }
362     data = xbt_new(shared_data_t, 1);
363     data->fd = fd;
364     data->count = 1;
365     data->loc = loc;
366     mem = shm_map(fd, size, data);
367     if(shm_unlink(loc) < 0) {
368       XBT_WARN("Could not early unlink %s: %s", loc, strerror(errno));
369     }
370     xbt_dict_set(allocs, loc, data, NULL);
371     XBT_DEBUG("Mapping %s at %p through %d", loc, mem, fd);
372   } else {
373     mem = shm_map(data->fd, size, data);
374     data->count++;
375   }
376   XBT_DEBUG("Malloc %zu in %p (metadata at %p)", size, mem, data);
377   return mem;
378 }
379
380 void smpi_shared_free(void *ptr)
381 {
382   char loc[PTR_STRLEN];
383   shared_metadata_t* meta;
384   shared_data_t* data;
385
386   if (!allocs) {
387     XBT_WARN("Cannot free: nothing was allocated");
388     return;
389   }
390   if(!allocs_metadata) {
391     XBT_WARN("Cannot free: no metadata was allocated");
392   }
393   snprintf(loc, PTR_STRLEN, "%p", ptr);
394   meta = (shared_metadata_t*)xbt_dict_get_or_null(allocs_metadata, loc);
395   if (!meta) {
396     XBT_WARN("Cannot free: %p was not shared-allocated by SMPI", ptr);
397     return;
398   }
399   data = meta->data;
400   if(!data) {
401     XBT_WARN("Cannot free: something is broken in the metadata link");
402     return;
403   }
404   if(munmap(ptr, meta->size) < 0) {
405     XBT_WARN("Unmapping of fd %d failed: %s", data->fd, strerror(errno));
406   }
407   data->count--;
408   if (data->count <= 0) {
409     close(data->fd);
410     xbt_dict_remove(allocs, data->loc);
411     free(data->loc);
412   }
413 }
414
415 int smpi_shared_known_call(const char* func, const char* input) {
416    char* loc = bprintf("%s:%s", func, input);
417    xbt_ex_t ex;
418    int known;
419
420    if(!calls) {
421       calls = xbt_dict_new_homogeneous(NULL);
422    }
423    TRY {
424       xbt_dict_get(calls, loc); /* Succeed or throw */
425       known = 1;
426    }
427    CATCH(ex) {
428       if(ex.category == not_found_error) {
429          known = 0;
430          xbt_ex_free(ex);
431       } else {
432          RETHROW;
433       }
434    }
435    free(loc);
436    return known;
437 }
438
439 void* smpi_shared_get_call(const char* func, const char* input) {
440    char* loc = bprintf("%s:%s", func, input);
441    void* data;
442
443    if(!calls) {
444       calls = xbt_dict_new_homogeneous(NULL);
445    }
446    data = xbt_dict_get(calls, loc);
447    free(loc);
448    return data;
449 }
450
451 void* smpi_shared_set_call(const char* func, const char* input, void* data) {
452    char* loc = bprintf("%s:%s", func, input);
453
454    if(!calls) {
455       calls = xbt_dict_new_homogeneous(NULL);
456    }
457    xbt_dict_set(calls, loc, data, NULL);
458    free(loc);
459    return data;
460 }