Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
023848edb73868f0647a175de12b7d936c62c373
[simgrid.git] / src / xbt / xbt_os_thread.c
1 /* $Id$ */
2
3 /* xbt_os_thread -- portability layer over the pthread API                  */
4 /* Used in RL to get win/lin portability, and in SG when CONTEXT_THREAD     */
5 /* in SG, when using CONTEXT_UCONTEXT, xbt_os_thread_stub is used instead   */
6
7 /* Copyright 2006,2007 Malek Cherier, Martin Quinson          
8  * All rights reserved.                                                     */
9
10 /* This program is free software; you can redistribute it and/or modify it
11  * under the terms of the license (GNU LGPL) which comes with this package. */
12
13 #include "xbt/sysdep.h"
14 #include "xbt/ex.h"
15 #include "xbt/ex_interface.h" /* We play crude games with exceptions */
16 #include "portable.h"
17 #include "xbt/xbt_os_time.h" /* Portable time facilities */
18 #include "xbt/xbt_os_thread.h" /* This module */
19 #include "xbt_modinter.h" /* Initialization/finalization of this module */
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_sync_os,xbt,"Synchronization mechanism (OS-level)");
22
23 /* ********************************* PTHREAD IMPLEMENTATION ************************************ */
24 #ifdef HAVE_PTHREAD_H
25 #include <pthread.h>
26 #include <semaphore.h>
27
28 typedef struct xbt_os_thread_ {
29    pthread_t t;
30    char *name;
31    void *param;
32    pvoid_f_pvoid_t start_routine;
33    ex_ctx_t *exception;
34 } s_xbt_os_thread_t ;
35 static xbt_os_thread_t main_thread = NULL;
36
37 /* thread-specific data containing the xbt_os_thread_t structure */
38 static pthread_key_t xbt_self_thread_key;
39 static int thread_mod_inited = 0;
40
/* frees the xbt_os_thread_t corresponding to the current thread.
   `d` is the block to release; it is simply forwarded to free(). */
static void xbt_os_thread_free_thread_data(void *d)
{
   free(d);
}
45
46 /* callback: context fetching */
47 static ex_ctx_t *_os_thread_ex_ctx(void) {
48   return xbt_os_thread_self()->exception;
49 }
50
51 /* callback: termination */
52 static void _os_thread_ex_terminate(xbt_ex_t * e) {
53   xbt_ex_display(e);
54
55   abort();
56   /* FIXME: there should be a configuration variable to choose to kill everyone or only this one */
57 }
58
59 void xbt_os_thread_mod_init(void) {
60    int errcode;
61    
62    if (thread_mod_inited)
63      return;
64    
65    if ((errcode=pthread_key_create(&xbt_self_thread_key, NULL)))
66      THROW0(system_error,errcode,"pthread_key_create failed for xbt_self_thread_key");
67
68    main_thread=xbt_new(s_xbt_os_thread_t,1);
69    main_thread->name = (char*)"main";
70    main_thread->start_routine = NULL;
71    main_thread->param = NULL;
72    main_thread->exception = xbt_new(ex_ctx_t, 1);
73    XBT_CTX_INITIALIZE(main_thread->exception);
74
75    __xbt_ex_ctx = _os_thread_ex_ctx;
76    __xbt_ex_terminate = _os_thread_ex_terminate;
77
78    thread_mod_inited = 1;
79 }
/* Module finalization: intentionally does nothing. */
void xbt_os_thread_mod_exit(void)
{
   /* FIXME: don't try to free our key on shutdown.
      Valgrind detects no leak if we don't, and whine if we try to.
      This is why pthread_key_delete(xbt_self_thread_key) is not called here. */
}
88
89 static void * wrapper_start_routine(void *s) {
90   xbt_os_thread_t t = s;   
91   int errcode;
92
93   if ((errcode=pthread_setspecific(xbt_self_thread_key,t)))
94     THROW0(system_error,errcode,
95            "pthread_setspecific failed for xbt_self_thread_key");   
96    
97   return (*(t->start_routine))(t->param);
98 }
99 xbt_os_thread_t xbt_os_thread_create(const char*name,
100                                      pvoid_f_pvoid_t start_routine,
101                                      void* param)  {
102    int errcode;
103
104    xbt_os_thread_t res_thread=xbt_new(s_xbt_os_thread_t,1);
105    res_thread->name = xbt_strdup(name);
106    res_thread->start_routine = start_routine;
107    res_thread->param = param;
108    res_thread->exception = xbt_new(ex_ctx_t, 1);
109    XBT_CTX_INITIALIZE(res_thread->exception);
110    
111    if ((errcode = pthread_create(&(res_thread->t), NULL, 
112                                  wrapper_start_routine, res_thread)))
113      THROW1(system_error,errcode, 
114             "pthread_create failed: %s",strerror(errcode));
115
116    return res_thread;
117 }
118
119 const char* xbt_os_thread_name(xbt_os_thread_t t) {
120    return t->name;
121 }
122
123 const char* xbt_os_thread_self_name(void) {
124    xbt_os_thread_t self = xbt_os_thread_self();
125    return self?self->name:"main";
126 }
127 void 
128 xbt_os_thread_join(xbt_os_thread_t thread,void ** thread_return) {
129         
130   int errcode;   
131   
132   if ((errcode = pthread_join(thread->t,thread_return)))
133     THROW1(system_error,errcode, "pthread_join failed: %s",
134            strerror(errcode));
135    if (thread->exception)
136      free(thread->exception);
137
138    if (thread == main_thread) /* just killed main thread */
139      main_thread = NULL;
140
141    free(thread);   
142 }                      
143
/* Terminates the calling thread; `retval` is handed to pthread_exit() as is. */
void xbt_os_thread_exit(int *retval)
{
   pthread_exit(retval);
}
147
148 xbt_os_thread_t xbt_os_thread_self(void) {
149   xbt_os_thread_t res;
150
151   if (!thread_mod_inited)
152     return NULL;
153   
154   res = pthread_getspecific(xbt_self_thread_key);
155   if (!res)
156     res = main_thread;
157
158   return res;
159 }
160
161 #include <sched.h>
/* Gives up the processor through sched_yield(); its result is ignored. */
void xbt_os_thread_yield(void)
{
   sched_yield();
}
165 void xbt_os_thread_cancel(xbt_os_thread_t t) {
166    pthread_cancel(t->t);
167 }
/****** mutex related functions ******/
/* Heap-allocated wrapper around a pthread mutex */
struct xbt_os_mutex_ {
  /* KEEP IT IN SYNC WITH xbt_thread.c */
   pthread_mutex_t m;
};
typedef struct xbt_os_mutex_ s_xbt_os_mutex_t;
173
174 xbt_os_mutex_t xbt_os_mutex_init(void) {
175    xbt_os_mutex_t res = xbt_new(s_xbt_os_mutex_t,1);
176    int errcode;
177    
178    if ((errcode = pthread_mutex_init(&(res->m),NULL)))
179      THROW1(system_error,errcode,"pthread_mutex_init() failed: %s",
180             strerror(errcode));
181    
182    return res;
183 }
184
185 void xbt_os_mutex_acquire(xbt_os_mutex_t mutex) {
186    int errcode;
187    
188    if ((errcode=pthread_mutex_lock(&(mutex->m))))
189      THROW2(system_error,errcode,"pthread_mutex_lock(%p) failed: %s",
190             mutex, strerror(errcode));
191 }
192
193 void xbt_os_mutex_release(xbt_os_mutex_t mutex) {
194    int errcode;
195    
196    if ((errcode=pthread_mutex_unlock(&(mutex->m))))
197      THROW2(system_error,errcode,"pthread_mutex_unlock(%p) failed: %s",
198             mutex, strerror(errcode));
199 }
200
201 void xbt_os_mutex_destroy(xbt_os_mutex_t mutex) {
202    int errcode;
203    
204    if (!mutex) return;
205    
206    if ((errcode=pthread_mutex_destroy(&(mutex->m))))
207      THROW2(system_error,errcode,"pthread_mutex_destroy(%p) failed: %s",
208             mutex, strerror(errcode));
209    free(mutex);
210 }
211
/***** condition related functions *****/
/* Heap-allocated wrapper around a pthread condition variable */
struct xbt_os_cond_ {
  /* KEEP IT IN SYNC WITH xbt_thread.c */
   pthread_cond_t c;
};
typedef struct xbt_os_cond_ s_xbt_os_cond_t;
217
218 xbt_os_cond_t xbt_os_cond_init(void) {
219    xbt_os_cond_t res = xbt_new(s_xbt_os_cond_t,1);
220    int errcode;
221    if ((errcode=pthread_cond_init(&(res->c),NULL)))
222      THROW1(system_error,errcode,"pthread_cond_init() failed: %s",
223             strerror(errcode));
224
225    return res;
226 }
227
228 void xbt_os_cond_wait(xbt_os_cond_t cond, xbt_os_mutex_t mutex) {
229    int errcode;
230    if ((errcode=pthread_cond_wait(&(cond->c),&(mutex->m))))
231      THROW3(system_error,errcode,"pthread_cond_wait(%p,%p) failed: %s",
232             cond,mutex, strerror(errcode));
233 }
234
235 #include <time.h>
236 #include <math.h>
237 void xbt_os_cond_timedwait(xbt_os_cond_t cond, xbt_os_mutex_t mutex, double delay) {
238    int errcode;
239    struct timespec ts_end;
240    double end = delay + xbt_os_time();
241    
242    if (delay < 0) {
243       xbt_os_cond_wait(cond,mutex);
244    } else {
245       ts_end.tv_sec = (time_t) floor(end);
246       ts_end.tv_nsec = (long)  ( ( end - ts_end.tv_sec) * 1000000000);
247       DEBUG3("pthread_cond_timedwait(%p,%p,%p)",&(cond->c),&(mutex->m), &ts_end);
248       switch ( (errcode=pthread_cond_timedwait(&(cond->c),&(mutex->m), &ts_end)) ) {
249        case 0:
250          return;
251        case ETIMEDOUT:
252          THROW3(timeout_error,errcode,"condition %p (mutex %p) wasn't signaled before timeout (%f)",
253                 cond,mutex, delay);
254        default:
255          THROW4(system_error,errcode,"pthread_cond_timedwait(%p,%p,%f) failed: %s",
256                 cond,mutex, delay, strerror(errcode));
257       }   
258    }
259 }
260
261 void xbt_os_cond_signal(xbt_os_cond_t cond) {
262    int errcode;
263    if ((errcode=pthread_cond_signal(&(cond->c))))
264      THROW2(system_error,errcode,"pthread_cond_signal(%p) failed: %s",
265             cond, strerror(errcode));
266 }
267          
268 void xbt_os_cond_broadcast(xbt_os_cond_t cond){
269    int errcode;
270    if ((errcode=pthread_cond_broadcast(&(cond->c))))
271      THROW2(system_error,errcode,"pthread_cond_broadcast(%p) failed: %s",
272             cond, strerror(errcode));
273 }
274 void xbt_os_cond_destroy(xbt_os_cond_t cond){
275    int errcode;
276
277    if (!cond) return;
278
279    if ((errcode=pthread_cond_destroy(&(cond->c))))
280      THROW2(system_error,errcode,"pthread_cond_destroy(%p) failed: %s",
281             cond, strerror(errcode));
282    free(cond);
283 }
284
285 void *xbt_os_thread_getparam(void) {
286    xbt_os_thread_t t = xbt_os_thread_self();
287    return t?t->param:NULL;
288 }
289
/* Heap-allocated wrapper around a POSIX semaphore */
struct xbt_os_sem_ {
   sem_t s;
};
typedef struct xbt_os_sem_ s_xbt_os_sem_t;
293
294 xbt_os_sem_t
295 xbt_os_sem_init(unsigned int value)
296 {
297         xbt_os_sem_t res = xbt_new(s_xbt_os_sem_t,1);
298         
299         if(sem_init(&(res->s),0,value) < 0)
300                 THROW1(system_error,errno,"sem_init() failed: %s",
301             strerror(errno));
302    
303    return res;
304 }
305
306 void 
307 xbt_os_sem_acquire(xbt_os_sem_t sem)
308 {
309         if(!sem)
310                 THROW1(arg_error,EINVAL,"xbt_os_sem_acquire() failed: %s",
311             strerror(EINVAL));
312         
313         if(sem_wait(&(sem->s)) < 0)
314                 THROW1(system_error,errno,"sem_wait() failed: %s",
315             strerror(errno));            
316 }
317
318 void xbt_os_sem_timedacquire(xbt_os_sem_t sem,double timeout)
319 {
320         int errcode;
321         struct timespec ts_end;
322         double end = timeout + xbt_os_time();
323         
324         if(!sem)
325                 THROW1(arg_error,EINVAL,"xbt_os_sem_timedacquire() failed: %s",strerror(EINVAL));
326         
327         if (timeout < 0) 
328         {
329                 xbt_os_sem_acquire(sem);
330         } 
331         else 
332         {
333                 ts_end.tv_sec = (time_t) floor(end);
334                 ts_end.tv_nsec = (long)  ( ( end - ts_end.tv_sec) * 1000000000);
335                 DEBUG2("sem_timedwait(%p,%p)",&(sem->s),&ts_end);
336         
337                 switch ((errcode=sem_timedwait(&(sem->s),&ts_end)))
338                 {
339                         case 0:
340                         return;
341                         
342                         case ETIMEDOUT:
343                         THROW2(timeout_error,errcode,"semaphore %p wasn't signaled before timeout (%f)",sem,timeout);
344                         
345                         default:
346                         THROW3(system_error,errcode,"sem_timedwait(%p,%f) failed: %s",sem,timeout, strerror(errcode));
347                 }   
348         }
349 }
350
351 void 
352 xbt_os_sem_release(xbt_os_sem_t sem)
353 {
354         if(!sem)
355                 THROW1(arg_error,EINVAL,"xbt_os_sem_release() failed: %s",
356             strerror(EINVAL));
357         
358         if(sem_post(&(sem->s)) < 0)
359                 THROW1(system_error,errno,"sem_post() failed: %s",
360             strerror(errno));            
361 }
362
363 void
364 xbt_os_sem_destroy(xbt_os_sem_t sem)
365 {
366         if(!sem)
367                 THROW1(arg_error,EINVAL,"xbt_os_sem_destroy() failed: %s",
368             strerror(EINVAL));
369             
370         if(sem_destroy(&(sem->s)) < 0)
371                 THROW1(system_error,errno,"sem_destroy() failed: %s",
372             strerror(errno));
373 }
374
375 void
376 xbt_os_sem_get_value(xbt_os_sem_t sem, int* svalue)
377 {
378         if(!sem)
379                 THROW1(arg_error,EINVAL,"xbt_os_sem_getvalue() failed: %s",
380             strerror(EINVAL));
381             
382         if(sem_getvalue(&(sem->s),svalue) < 0)
383                 THROW1(system_error,errno,"sem_getvalue() failed: %s",
384             strerror(errno));
385 }
386
387 /* ********************************* WINDOWS IMPLEMENTATION ************************************ */
388
389 #elif defined(WIN32)
390
391 #include <math.h>
392
393 typedef struct xbt_os_thread_ {
394   char *name;
395   HANDLE handle;                  /* the win thread handle        */
396   unsigned long id;               /* the win thread id            */
397   pvoid_f_pvoid_t start_routine;
398   void* param;
399 } s_xbt_os_thread_t ;
400
/* so we can specify the size of the stack of the threads
   (only defined here when the system headers do not provide it) */
#if !defined(STACK_SIZE_PARAM_IS_A_RESERVATION)
#  define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
#endif

/* the default size of the stack of the threads (in bytes)*/
#define XBT_DEFAULT_THREAD_STACK_SIZE   4096

/* key to the TLS containing the xbt_os_thread_t structure;
   obtained from TlsAlloc() in xbt_os_thread_mod_init() */
static unsigned long xbt_self_thread_key;
411
412 void xbt_os_thread_mod_init(void) {
413    xbt_self_thread_key = TlsAlloc();
414 }
415 void xbt_os_thread_mod_exit(void) {
416    
417    if (!TlsFree(xbt_self_thread_key)) 
418      THROW0(system_error,(int)GetLastError(),"TlsFree() failed to cleanup the thread submodule");
419 }
420
421 static DWORD WINAPI  wrapper_start_routine(void *s) {
422   xbt_os_thread_t t = (xbt_os_thread_t)s;
423   void* rv;
424  
425     if(!TlsSetValue(xbt_self_thread_key,t))
426      THROW0(system_error,(int)GetLastError(),"TlsSetValue of data describing the created thread failed");
427    
428    rv = (*(t->start_routine))(t->param);
429
430    return *((DWORD*)rv);
431 }
432
433
434 xbt_os_thread_t xbt_os_thread_create(const char *name,pvoid_f_pvoid_t start_routine,
435                                void* param)  {
436    
437    xbt_os_thread_t t = xbt_new(s_xbt_os_thread_t,1);
438
439    t->name = xbt_strdup(name);
440    t->start_routine = start_routine ;
441    t->param = param;
442    
443    t->handle = CreateThread(NULL,XBT_DEFAULT_THREAD_STACK_SIZE,
444                             (LPTHREAD_START_ROUTINE)wrapper_start_routine,
445                             t,STACK_SIZE_PARAM_IS_A_RESERVATION,&(t->id));
446         
447    if(!t->handle) {
448      xbt_free(t);
449      THROW0(system_error,(int)GetLastError(),"CreateThread failed");
450    }
451    
452    return t;
453 }
454
455 const char* xbt_os_thread_name(xbt_os_thread_t t) {
456    return t->name;
457 }
458
459 const char* xbt_os_thread_self_name(void) {
460    xbt_os_thread_t t = xbt_os_thread_self();
461    return t?t->name:"main";
462 }
463
464 void 
465 xbt_os_thread_join(xbt_os_thread_t thread,void ** thread_return) {
466
467         if(WAIT_OBJECT_0 != WaitForSingleObject(thread->handle,INFINITE))  
468                 THROW0(system_error,(int)GetLastError(), "WaitForSingleObject failed");
469                 
470         if(thread_return){
471                 
472                 if(!GetExitCodeThread(thread->handle,(DWORD*)(*thread_return)))
473                         THROW0(system_error,(int)GetLastError(), "GetExitCodeThread failed");
474         }
475         
476         CloseHandle(thread->handle);
477         free(thread->name);
478         free(thread);
479 }
480
/* Terminates the calling thread with exit code *retval, or 0 when retval
   is NULL. */
void xbt_os_thread_exit(int *retval)
{
   if (!retval)
        ExitThread(0);
   else
        ExitThread(*retval);
}
487
488 xbt_os_thread_t xbt_os_thread_self(void) {
489    return TlsGetValue(xbt_self_thread_key);
490 }
491
492 void *xbt_os_thread_getparam(void) {
493    xbt_os_thread_t t = xbt_os_thread_self();
494    return t->param;
495 }
496
497
/* Gives up the processor by calling Sleep(0). */
void xbt_os_thread_yield(void)
{
   Sleep(0);
}
501 void xbt_os_thread_cancel(xbt_os_thread_t t) {
502    THROW_UNIMPLEMENTED;
503 }
504
505 /****** mutex related functions ******/
506 typedef struct xbt_os_mutex_ {
507   /* KEEP IT IN SYNC WITH xbt_thread.c */
508    CRITICAL_SECTION lock;   
509 } s_xbt_os_mutex_t;
510
511 xbt_os_mutex_t xbt_os_mutex_init(void) {
512    xbt_os_mutex_t res = xbt_new(s_xbt_os_mutex_t,1);
513
514    /* initialize the critical section object */
515    InitializeCriticalSection(&(res->lock));
516    
517    return res;
518 }
519
520 void xbt_os_mutex_acquire(xbt_os_mutex_t mutex) {
521
522    EnterCriticalSection(& mutex->lock);
523 }
524
525 void xbt_os_mutex_release(xbt_os_mutex_t mutex) {
526
527    LeaveCriticalSection (& mutex->lock);
528
529 }
530
531 void xbt_os_mutex_destroy(xbt_os_mutex_t mutex) {
532
533    if (!mutex) return;
534    
535    DeleteCriticalSection(& mutex->lock);                
536    free(mutex);
537 }
538
/***** condition related functions *****/
/* Slots of the events[] array of a condition */
enum { /* KEEP IT IN SYNC WITH xbt_thread.c */
   SIGNAL,      /* = 0: event set by xbt_os_cond_signal()    */
   BROADCAST,   /* = 1: event set by xbt_os_cond_broadcast() */
   MAX_EVENTS   /* = 2: number of events                     */
};
545
546 typedef struct xbt_os_cond_ {
547   /* KEEP IT IN SYNC WITH xbt_thread.c */
548    HANDLE events[MAX_EVENTS];
549    
550    unsigned int waiters_count;           /* the number of waiters                        */
551    CRITICAL_SECTION waiters_count_lock;  /* protect access to waiters_count  */
552 } s_xbt_os_cond_t;
553
554 xbt_os_cond_t xbt_os_cond_init(void) {
555
556    xbt_os_cond_t res = xbt_new0(s_xbt_os_cond_t,1);
557         
558    memset(& res->waiters_count_lock,0,sizeof(CRITICAL_SECTION));
559         
560    /* initialize the critical section object */
561    InitializeCriticalSection(& res->waiters_count_lock);
562         
563    res->waiters_count = 0;
564         
565    /* Create an auto-reset event */
566    res->events[SIGNAL] = CreateEvent (NULL, FALSE, FALSE, NULL); 
567         
568    if(!res->events[SIGNAL]){
569       DeleteCriticalSection(& res->waiters_count_lock);
570       free(res);
571       THROW0(system_error,0,"CreateEvent failed for the signals");
572    }
573         
574    /* Create a manual-reset event. */
575    res->events[BROADCAST] = CreateEvent (NULL, TRUE, FALSE,NULL);
576         
577    if(!res->events[BROADCAST]){
578                 
579       DeleteCriticalSection(& res->waiters_count_lock);         
580       CloseHandle(res->events[SIGNAL]);
581       free(res); 
582       THROW0(system_error,0,"CreateEvent failed for the broadcasts");
583    }
584
585    return res;
586 }
587
588 void xbt_os_cond_wait(xbt_os_cond_t cond, xbt_os_mutex_t mutex) {
589         
590    unsigned long wait_result;
591    int is_last_waiter;
592
593    /* lock the threads counter and increment it */
594    EnterCriticalSection (& cond->waiters_count_lock);
595    cond->waiters_count++;
596    LeaveCriticalSection (& cond->waiters_count_lock);
597                 
598    /* unlock the mutex associate with the condition */
599    LeaveCriticalSection (& mutex->lock);
600         
601    /* wait for a signal (broadcast or no) */
602    wait_result = WaitForMultipleObjects (2, cond->events, FALSE, INFINITE);
603         
604    if(wait_result == WAIT_FAILED)
605      THROW0(system_error,0,"WaitForMultipleObjects failed, so we cannot wait on the condition");
606         
607    /* we have a signal lock the condition */
608    EnterCriticalSection (& cond->waiters_count_lock);
609    cond->waiters_count--;
610         
611    /* it's the last waiter or it's a broadcast ? */
612    is_last_waiter = ((wait_result == WAIT_OBJECT_0 + BROADCAST - 1) && (cond->waiters_count == 0));
613         
614    LeaveCriticalSection (& cond->waiters_count_lock);
615         
616    /* yes it's the last waiter or it's a broadcast
617     * only reset the manual event (the automatic event is reset in the WaitForMultipleObjects() function
618     * by the system. 
619     */
620    if (is_last_waiter)
621       if(!ResetEvent (cond->events[BROADCAST]))
622         THROW0(system_error,0,"ResetEvent failed");
623         
624    /* relock the mutex associated with the condition in accordance with the posix thread specification */
625    EnterCriticalSection (& mutex->lock);
626 }
627 void xbt_os_cond_timedwait(xbt_os_cond_t cond, xbt_os_mutex_t mutex, double delay) {
628    
629          unsigned long wait_result = WAIT_TIMEOUT;
630    int is_last_waiter;
631    unsigned long end = (unsigned long)(delay * 1000);
632
633
634    if (delay < 0) {
635       xbt_os_cond_wait(cond,mutex);
636    } else {
637           DEBUG3("xbt_cond_timedwait(%p,%p,%ul)",&(cond->events),&(mutex->lock),end);
638
639    /* lock the threads counter and increment it */
640    EnterCriticalSection (& cond->waiters_count_lock);
641    cond->waiters_count++;
642    LeaveCriticalSection (& cond->waiters_count_lock);
643                 
644    /* unlock the mutex associate with the condition */
645    LeaveCriticalSection (& mutex->lock);
646    /* wait for a signal (broadcast or no) */
647         
648    wait_result = WaitForMultipleObjects (2, cond->events, FALSE, end);
649         
650    switch(wait_result) {
651      case WAIT_TIMEOUT:
652         THROW3(timeout_error,GetLastError(),"condition %p (mutex %p) wasn't signaled before timeout (%f)",cond,mutex, delay);
653         case WAIT_FAILED:
654      THROW0(system_error,GetLastError(),"WaitForMultipleObjects failed, so we cannot wait on the condition");
655    }
656         
657    /* we have a signal lock the condition */
658    EnterCriticalSection (& cond->waiters_count_lock);
659    cond->waiters_count--;
660         
661    /* it's the last waiter or it's a broadcast ? */
662    is_last_waiter = ((wait_result == WAIT_OBJECT_0 + BROADCAST - 1) && (cond->waiters_count == 0));
663         
664    LeaveCriticalSection (& cond->waiters_count_lock);
665         
666    /* yes it's the last waiter or it's a broadcast
667     * only reset the manual event (the automatic event is reset in the WaitForMultipleObjects() function
668     * by the system. 
669     */
670    if (is_last_waiter)
671       if(!ResetEvent (cond->events[BROADCAST]))
672         THROW0(system_error,0,"ResetEvent failed");
673         
674    /* relock the mutex associated with the condition in accordance with the posix thread specification */
675    EnterCriticalSection (& mutex->lock);
676    }
677         /*THROW_UNIMPLEMENTED;*/
678 }
679
680 void xbt_os_cond_signal(xbt_os_cond_t cond) {
681    int have_waiters;
682
683    EnterCriticalSection (& cond->waiters_count_lock);
684    have_waiters = cond->waiters_count > 0;
685    LeaveCriticalSection (& cond->waiters_count_lock);
686         
687    if (have_waiters)
688      if(!SetEvent(cond->events[SIGNAL]))
689        THROW0(system_error,0,"SetEvent failed");
690        
691    xbt_os_thread_yield();
692 }
693
694 void xbt_os_cond_broadcast(xbt_os_cond_t cond){
695    int have_waiters;
696
697    EnterCriticalSection (& cond->waiters_count_lock);
698    have_waiters = cond->waiters_count > 0;
699    LeaveCriticalSection (& cond->waiters_count_lock);
700         
701    if (have_waiters)
702      SetEvent(cond->events[BROADCAST]);
703 }
704
705 void xbt_os_cond_destroy(xbt_os_cond_t cond){
706    int error = 0;
707    
708    if (!cond) return;
709    
710    if(!CloseHandle(cond->events[SIGNAL]))
711      error = 1;
712         
713    if(!CloseHandle(cond->events[BROADCAST]))
714      error = 1;
715         
716    DeleteCriticalSection(& cond->waiters_count_lock);
717         
718    xbt_free(cond);
719    
720    if (error)
721      THROW0(system_error,0,"Error while destroying the condition");
722 }
723
724 typedef struct xbt_os_sem_ {
725    HANDLE h;
726    unsigned int value;
727    CRITICAL_SECTION value_lock;  /* protect access to value of the semaphore  */
728 }s_xbt_os_sem_t ;
729
730 xbt_os_sem_t
731 xbt_os_sem_init(unsigned int value)
732 {
733         xbt_os_sem_t res;
734         
735         if(value > INT_MAX)
736         THROW1(arg_error,EINVAL,"xbt_os_sem_init() failed: %s",
737             strerror(EINVAL));
738         
739         res = (xbt_os_sem_t)xbt_new0(s_xbt_os_sem_t,1);
740         
741         if(!(res->h = CreateSemaphore(NULL,value,(long)INT_MAX,NULL))) {
742                 THROW1(system_error,GetLastError(),"CreateSemaphore() failed: %s",
743             strerror(GetLastError()));
744             return NULL;
745         }
746   
747         res->value = value;
748         
749         InitializeCriticalSection(&(res->value_lock));
750    
751         return res;
752 }
753
754 void 
755 xbt_os_sem_acquire(xbt_os_sem_t sem)
756 {
757         if(!sem)
758                 THROW1(arg_error,EINVAL,"xbt_os_sem_acquire() failed: %s",
759             strerror(EINVAL));  
760
761         /* wait failure */
762         if(WAIT_OBJECT_0 != WaitForSingleObject(sem->h,INFINITE))
763                 THROW1(system_error,GetLastError(),"WaitForSingleObject() failed: %s",
764                 strerror(GetLastError()));
765         EnterCriticalSection(&(sem->value_lock));
766         sem->value--;
767         LeaveCriticalSection(&(sem->value_lock));
768 }
769
770 void xbt_os_sem_timedacquire(xbt_os_sem_t sem, double timeout)
771 {
772         long seconds;
773         long milliseconds;
774         double end = timeout + xbt_os_time();
775         
776         if(!sem)
777                 THROW1(arg_error,EINVAL,"xbt_os_sem_timedacquire() failed: %s",
778             strerror(EINVAL));  
779         
780          if (timeout < 0) 
781         {
782                 xbt_os_sem_acquire(sem);
783         } 
784         else 
785         {
786                 
787                 seconds = (long) floor(end);
788                 milliseconds = (long)( ( end - seconds) * 1000);
789                 milliseconds += (seconds * 1000);
790                 
791                 switch(WaitForSingleObject(sem->h,milliseconds))
792                 {
793                         case WAIT_OBJECT_0:
794                         EnterCriticalSection(&(sem->value_lock));
795                         sem->value--;
796                         LeaveCriticalSection(&(sem->value_lock));
797                         return;
798                 
799                         case WAIT_TIMEOUT:
800                         THROW2(timeout_error,GetLastError(),"semaphore %p wasn't signaled before timeout (%f)",sem,timeout);
801                         return;
802                 
803                         default:
804                         
805                         THROW3(system_error,GetLastError(),"WaitForSingleObject(%p,%f) failed: %s",sem,timeout, strerror(GetLastError()));
806                 }
807         }
808 }
809
810 void 
811 xbt_os_sem_release(xbt_os_sem_t sem)
812 {
813         if(!sem)
814                 THROW1(arg_error,EINVAL,"xbt_os_sem_post() failed: %s",
815             strerror(EINVAL));
816         
817         if(!ReleaseSemaphore(sem->h,1, NULL)) 
818                 THROW1(system_error,GetLastError(),"ReleaseSemaphore() failed: %s",
819                 strerror(GetLastError()));
820         EnterCriticalSection (&(sem->value_lock));
821         sem->value++;
822         LeaveCriticalSection(&(sem->value_lock));
823 }
824
825 void
826 xbt_os_sem_destroy(xbt_os_sem_t sem)
827 {
828         if(!sem)
829                 THROW1(arg_error,EINVAL,"xbt_os_sem_destroy() failed: %s",
830             strerror(EINVAL));
831         
832         if(!CloseHandle(sem->h)) 
833                 THROW1(system_error,GetLastError(),"CloseHandle() failed: %s",
834                 strerror(GetLastError()));
835          
836          DeleteCriticalSection(&(sem->value_lock));
837                 
838          xbt_free(sem);     
839                 
840 }
841
842 void
843 xbt_os_sem_get_value(xbt_os_sem_t sem, int* svalue)
844 {
845         if(!sem)
846                 THROW1(arg_error,EINVAL,"xbt_os_sem_get_value() failed: %s",
847             strerror(EINVAL));
848         
849         EnterCriticalSection(&(sem->value_lock));  
850         *svalue = sem->value;
851         LeaveCriticalSection(&(sem->value_lock));
852 }
853
854 #endif