1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "parmap_private.h"
/* Logging category used by every DEBUG*/INFO* call in this module. */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt,
                                "parmap: parallel map");

/* Entry point executed by each worker thread of the pool (defined below). */
static void *_xbt_parmap_worker_main(void *parmap);
14 xbt_parmap_t xbt_parmap_new(unsigned int num_workers)
17 xbt_os_thread_t worker = NULL;
19 DEBUG1("Create new parmap (%u workers)", num_workers);
21 /* Initialize thread pool data structure */
22 xbt_parmap_t parmap = xbt_new0(s_xbt_parmap_t, 1);
23 parmap->mutex = xbt_os_mutex_init();
24 parmap->job_posted = xbt_os_cond_init();
25 parmap->all_done = xbt_os_cond_init();
26 parmap->flags = xbt_new0(e_xbt_parmap_flag_t, num_workers + 1);
27 parmap->num_workers = num_workers;
28 parmap->num_idle_workers = 0;
29 parmap->workers_max_id = 0;
31 /* Init our flag to wait (for workers' initialization) */
32 parmap->flags[num_workers] = PARMAP_WAIT;
34 /* Create the pool of worker threads */
35 for(i=0; i < num_workers; i++){
36 worker = xbt_os_thread_create(NULL, _xbt_parmap_worker_main, parmap, NULL);
37 xbt_os_thread_detach(worker);
40 /* wait for the workers to initialize */
41 xbt_os_mutex_acquire(parmap->mutex);
42 while(parmap->flags[num_workers] == PARMAP_WAIT)
43 xbt_os_cond_wait(parmap->all_done, parmap->mutex);
44 xbt_os_mutex_release(parmap->mutex);
49 void xbt_parmap_destroy(xbt_parmap_t parmap)
51 DEBUG1("Destroy parmap %p", parmap);
55 /* Lock the parmap, then signal every worker an wait for each to finish */
56 xbt_os_mutex_acquire(parmap->mutex);
57 for(i=0; i < parmap->num_workers; i++){
58 parmap->flags[i] = PARMAP_DESTROY;
61 xbt_os_cond_broadcast(parmap->job_posted);
62 while(parmap->num_workers){
63 DEBUG1("Still %u workers, waiting...", parmap->num_workers);
64 xbt_os_cond_wait(parmap->all_done, parmap->mutex);
67 /* Destroy pool's data structures */
68 xbt_os_cond_destroy(parmap->job_posted);
69 xbt_os_cond_destroy(parmap->all_done);
70 xbt_free(parmap->flags);
71 xbt_os_mutex_release(parmap->mutex);
72 xbt_os_mutex_destroy(parmap->mutex);
76 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
79 unsigned int myflag_idx = parmap->num_workers;
81 /* Assign resources to worker threads */
82 xbt_os_mutex_acquire(parmap->mutex);
85 parmap->num_idle_workers = 0;
87 /* Set worker flags to work */
88 for(i=0; i < parmap->num_workers; i++){
89 parmap->flags[i] = PARMAP_WORK;
92 /* Set our flag to wait (for the job to be completed)*/
93 parmap->flags[myflag_idx] = PARMAP_WAIT;
95 /* Notify workers that there is a job */
96 xbt_os_cond_broadcast(parmap->job_posted);
97 DEBUG0("Job dispatched, lets wait...");
99 /* wait for the workers to finish */
100 while(parmap->flags[myflag_idx] == PARMAP_WAIT)
101 xbt_os_cond_wait(parmap->all_done, parmap->mutex);
107 xbt_os_mutex_release(parmap->mutex);
111 static void *_xbt_parmap_worker_main(void *arg)
113 unsigned int data_start, data_end, data_size, worker_id;
114 xbt_parmap_t parmap = (xbt_parmap_t)arg;
116 /* Fetch a worker id */
117 xbt_os_mutex_acquire(parmap->mutex);
118 worker_id = parmap->workers_max_id++;
119 xbt_os_mutex_release(parmap->mutex);
121 DEBUG1("New worker thread created (%u)", worker_id);
123 /* Worker's main loop */
125 xbt_os_mutex_acquire(parmap->mutex);
126 parmap->flags[worker_id] = PARMAP_WAIT;
127 parmap->num_idle_workers++;
129 /* If everybody is done set the parmap work flag and signal it */
130 if(parmap->num_idle_workers == parmap->num_workers){
131 DEBUG1("Worker %u: All done, signal the parmap", worker_id);
132 parmap->flags[parmap->num_workers] = PARMAP_WORK;
133 xbt_os_cond_signal(parmap->all_done);
136 /* If the wait flag is set then ... wait. */
137 while(parmap->flags[worker_id] == PARMAP_WAIT)
138 xbt_os_cond_wait(parmap->job_posted, parmap->mutex);
140 DEBUG1("Worker %u got a job", worker_id);
142 /* If we are shutting down, the last worker is going to signal the
143 * parmap so it can finish destroying the data structure */
144 if(parmap->flags[worker_id] == PARMAP_DESTROY){
145 DEBUG1("Shutting down worker %u", worker_id);
146 parmap->num_workers--;
147 if(parmap->num_workers == 0)
148 xbt_os_cond_signal(parmap->all_done);
149 xbt_os_mutex_release(parmap->mutex);
152 xbt_os_mutex_release(parmap->mutex);
154 /* Compute how much data does every worker gets */
155 data_size = (xbt_dynar_length(parmap->data) / parmap->num_workers)
156 + ((xbt_dynar_length(parmap->data) % parmap->num_workers) ? 1 : 0);
158 /* Each worker data segment starts in a position associated with its id*/
159 data_start = data_size * worker_id;
161 /* The end of the worker data segment must be bounded by the end of the data vector */
162 data_end = MIN(data_start + data_size, xbt_dynar_length(parmap->data));
164 DEBUG4("Worker %u: data_start=%u data_end=%u (data_size=%u)", worker_id, data_start, data_end, data_size);
166 /* While the worker don't pass the end of it data segment apply the function */
167 while(data_start < data_end){
168 parmap->fun(*(void **)xbt_dynar_get_ptr(parmap->data, data_start));
178 XBT_TEST_SUITE("parmap", "Parallel Map");
180 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap_unit, xbt,
190 INFO1("I'm job %lu", (unsigned long)arg);
193 XBT_TEST_UNIT("basic", test_parmap_basic, "Basic usage")
195 xbt_test_add0("Create the parmap");
198 xbt_dynar_t data = xbt_dynar_new(sizeof(void *), NULL);
200 /* Create the parallel map */
201 parmap = xbt_parmap_new(5);
203 for(j=0; j < 200; j++){
204 xbt_dynar_push_as(data, void *, (void *)j);
207 xbt_parmap_apply(parmap, fun, data);
209 /* Destroy the parmap */
210 xbt_parmap_destroy(parmap);
213 #endif /* SIMGRID_TEST */