* under the terms of the license (GNU LGPL) which comes with this package. */
#include "gras_config.h"
#include <unistd.h>
+
#ifndef _XBT_WIN32
#include <sys/syscall.h>
#endif
#ifdef HAVE_FUTEX_H
- #include <linux/futex.h>
-#else
- #include "xbt/xbt_os_thread.h"
+#include <linux/futex.h>
#endif
+
#include "xbt/parmap.h"
#include "xbt/log.h"
#include "xbt/function_types.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
XBT_LOG_NEW_SUBCATEGORY(xbt_parmap_unit, xbt_parmap, "parmap unit testing");
/* Status flag of a parmap (stored in parmap->status).
 * xbt_parmap_destroy() writes PARMAP_DESTROY into parmap->status before it
 * signals the sync event (in the HAVE_FUTEX_H build). */
-typedef enum{
+typedef enum {
PARMAP_WORK = 0,   /* zero value; presumably "process the current job" -- confirm in the worker loop */
PARMAP_DESTROY     /* set by xbt_parmap_destroy() */
} e_xbt_parmap_flag_t;
#ifdef HAVE_FUTEX_H
/* Futex-based synchronization event (only compiled when HAVE_FUTEX_H is set).
 * The event code in this file counts arrivals in thread_counter and, once the
 * count reaches threads_to_wait, bumps `done` and wakes a waiter on it. */
-typedef struct s_xbt_event{
+typedef struct s_xbt_event {
int work;                       /* sampled by the event code; NOTE(review): presumably the futex word workers sleep on -- confirm */
int done;                       /* futex word: waited on with futex_wait(), incremented then futex_wake()d */
unsigned int thread_counter;    /* incremented atomically with __sync_add_and_fetch() */
unsigned int threads_to_wait;   /* arrival threshold; xbt_parmap_new() sets it to num_workers */
-}s_xbt_event_t, *xbt_event_t;
+} s_xbt_event_t, *xbt_event_t;
void xbt_event_init(xbt_event_t event);
void xbt_event_signal(xbt_event_t event);
static void *_xbt_parmap_worker_main(void *parmap);
#ifdef HAVE_FUTEX_H
- static void futex_wait(int *uaddr, int val);
- static void futex_wake(int *uaddr, int val);
+static void futex_wait(int *uaddr, int val);
+static void futex_wake(int *uaddr, int val);
#endif
+
xbt_parmap_t xbt_parmap_new(unsigned int num_workers)
{
unsigned int i;
parmap->sync_event->threads_to_wait = num_workers;
#endif
/* Create the pool of worker threads */
- for(i=0; i < num_workers; i++){
+ for (i = 0; i < num_workers; i++) {
worker = xbt_os_thread_create(NULL, _xbt_parmap_worker_main, parmap, NULL);
xbt_os_thread_detach(worker);
}
}
void xbt_parmap_destroy(xbt_parmap_t parmap)
-{
+{
parmap->status = PARMAP_DESTROY;
#ifdef HAVE_FUTEX_H
xbt_event_signal(parmap->sync_event);
xbt_free(parmap);
}
- void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
+void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
{
/* Assign resources to worker threads*/
parmap->fun = fun;
XBT_DEBUG("Job done");
}
-void* xbt_parmap_next(xbt_parmap_t parmap) {
-
+void* xbt_parmap_next(xbt_parmap_t parmap)
+{
unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
if (index < xbt_dynar_length(parmap->data)) {
return xbt_dynar_get_as(parmap->data, index, void*);
return NULL;
}
/* Return the worker id: the value of xbt_os_thread_get_extra_data() cast to
 * unsigned long (presumably the extra data attached to the calling thread --
 * confirm against xbt_os_thread). The parmap argument is not used. */
-unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap) {
+unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap)
+{
return (unsigned long) xbt_os_thread_get_extra_data();
}
void xbt_event_init(xbt_event_t event)
{
int myflag = event->done;
- if(event->thread_counter < event->threads_to_wait)
+ if (event->thread_counter < event->threads_to_wait) {
futex_wait(&event->done, myflag);
+ }
}
void xbt_event_signal(xbt_event_t event)
myflag = event->work;
mycount = __sync_add_and_fetch(&event->thread_counter, 1);
- if(mycount == event->threads_to_wait){
+ if (mycount == event->threads_to_wait) {
event->done++;
futex_wake(&event->done, 1);
}
unsigned int mycount;
mycount = __sync_add_and_fetch(&event->thread_counter, 1);
- if(mycount == event->threads_to_wait){
+ if (mycount == event->threads_to_wait) {
event->done++;
futex_wake(&event->done, 1);
}
XBT_TEST_SUITE("parmap", "Parallel Map");
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(xbt_parmap_unit);
-
-
xbt_parmap_t parmap;
void fun(void *arg);
{
xbt_test_add("Create the parmap");
- unsigned long i,j;
+ unsigned long i, j;
xbt_dynar_t data = xbt_dynar_new(sizeof(void *), NULL);
/* Create the parallel map */
parmap = xbt_parmap_new(10);
- for(j=0; j < 100; j++){
+ for(j = 0; j < 100; j++) {
xbt_dynar_push_as(data, void *, (void *)j);
}
- for(i=0; i < 5; i++)
+ for (i = 0; i < 5; i++) {
xbt_parmap_apply(parmap, fun, data);
+ }
/* Destroy the parmap */
xbt_parmap_destroy(parmap);