3 /* xbt_os_thread -- portability layer over the pthread API */
4 /* Used in RL to get win/lin portability, and in SG when CONTEXT_THREAD */
5 /* in SG, when using CONTEXT_UCONTEXT, xbt_os_thread_stub is used instead */
7 /* Copyright 2006,2007 Malek Cherier, Martin Quinson
8 * All rights reserved. */
10 /* This program is free software; you can redistribute it and/or modify it
11 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "xbt/sysdep.h"
15 #include "xbt/ex_interface.h" /* We play crude games with exceptions */
17 #include "xbt/xbt_os_time.h" /* Portable time facilities */
18 #include "xbt/xbt_os_thread.h" /* This module */
19 #include "xbt_modinter.h" /* Initialization/finalization of this module */
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_sync_os,xbt,"Synchronization mechanism (OS-level)");
23 /* ********************************* PTHREAD IMPLEMENTATION ************************************ */
26 #include <semaphore.h>
28 typedef struct xbt_os_thread_ {
32 pvoid_f_pvoid_t start_routine;
35 static xbt_os_thread_t main_thread = NULL;
37 /* thread-specific data containing the xbt_os_thread_t structure */
38 static pthread_key_t xbt_self_thread_key;
39 static int thread_mod_inited = 0;
41 /* frees the xbt_os_thread_t corresponding to the current thread */
42 static void xbt_os_thread_free_thread_data(void*d){
46 /* callback: context fetching */
47 static ex_ctx_t *_os_thread_ex_ctx(void) {
48 return xbt_os_thread_self()->exception;
51 /* callback: termination */
52 static void _os_thread_ex_terminate(xbt_ex_t * e) {
56 /* FIXME: there should be a configuration variable to choose to kill everyone or only this one */
59 void xbt_os_thread_mod_init(void) {
62 if (thread_mod_inited)
65 if ((errcode=pthread_key_create(&xbt_self_thread_key, NULL)))
66 THROW0(system_error,errcode,"pthread_key_create failed for xbt_self_thread_key");
68 main_thread=xbt_new(s_xbt_os_thread_t,1);
69 main_thread->name = (char*)"main";
70 main_thread->start_routine = NULL;
71 main_thread->param = NULL;
72 main_thread->exception = xbt_new(ex_ctx_t, 1);
73 XBT_CTX_INITIALIZE(main_thread->exception);
75 __xbt_ex_ctx = _os_thread_ex_ctx;
76 __xbt_ex_terminate = _os_thread_ex_terminate;
78 thread_mod_inited = 1;
80 void xbt_os_thread_mod_exit(void) {
81 /* FIXME: don't try to free our key on shutdown.
82 Valgrind detects no leak if we don't, and whines if we try to */
85 // if ((errcode=pthread_key_delete(xbt_self_thread_key)))
86 // THROW0(system_error,errcode,"pthread_key_delete failed for xbt_self_thread_key");
89 static void * wrapper_start_routine(void *s) {
90 xbt_os_thread_t t = s;
93 if ((errcode=pthread_setspecific(xbt_self_thread_key,t)))
94 THROW0(system_error,errcode,
95 "pthread_setspecific failed for xbt_self_thread_key");
97 return (*(t->start_routine))(t->param);
99 xbt_os_thread_t xbt_os_thread_create(const char*name,
100 pvoid_f_pvoid_t start_routine,
104 xbt_os_thread_t res_thread=xbt_new(s_xbt_os_thread_t,1);
105 res_thread->name = xbt_strdup(name);
106 res_thread->start_routine = start_routine;
107 res_thread->param = param;
108 res_thread->exception = xbt_new(ex_ctx_t, 1);
109 XBT_CTX_INITIALIZE(res_thread->exception);
111 if ((errcode = pthread_create(&(res_thread->t), NULL,
112 wrapper_start_routine, res_thread)))
113 THROW1(system_error,errcode,
114 "pthread_create failed: %s",strerror(errcode));
119 const char* xbt_os_thread_name(xbt_os_thread_t t) {
123 const char* xbt_os_thread_self_name(void) {
124 xbt_os_thread_t self = xbt_os_thread_self();
125 return self?self->name:"main";
128 xbt_os_thread_join(xbt_os_thread_t thread,void ** thread_return) {
132 if ((errcode = pthread_join(thread->t,thread_return)))
133 THROW1(system_error,errcode, "pthread_join failed: %s",
135 if (thread->exception)
136 free(thread->exception);
138 if (thread == main_thread) /* just killed main thread */
/* Terminates the calling thread, handing retval to pthread_exit() as the
 * thread's exit value. */
void xbt_os_thread_exit(int *retval) {
  void *status = retval;

  pthread_exit(status);
}
148 xbt_os_thread_t xbt_os_thread_self(void) {
151 if (!thread_mod_inited)
154 res = pthread_getspecific(xbt_self_thread_key);
162 void xbt_os_thread_yield(void) {
165 void xbt_os_thread_cancel(xbt_os_thread_t t) {
166 pthread_cancel(t->t);
168 /****** mutex related functions ******/
169 typedef struct xbt_os_mutex_ {
170 /* KEEP IT IN SYNC WITH xbt_thread.c */
174 xbt_os_mutex_t xbt_os_mutex_init(void) {
175 xbt_os_mutex_t res = xbt_new(s_xbt_os_mutex_t,1);
178 if ((errcode = pthread_mutex_init(&(res->m),NULL)))
179 THROW1(system_error,errcode,"pthread_mutex_init() failed: %s",
185 void xbt_os_mutex_acquire(xbt_os_mutex_t mutex) {
188 if ((errcode=pthread_mutex_lock(&(mutex->m))))
189 THROW2(system_error,errcode,"pthread_mutex_lock(%p) failed: %s",
190 mutex, strerror(errcode));
193 void xbt_os_mutex_release(xbt_os_mutex_t mutex) {
196 if ((errcode=pthread_mutex_unlock(&(mutex->m))))
197 THROW2(system_error,errcode,"pthread_mutex_unlock(%p) failed: %s",
198 mutex, strerror(errcode));
201 void xbt_os_mutex_destroy(xbt_os_mutex_t mutex) {
206 if ((errcode=pthread_mutex_destroy(&(mutex->m))))
207 THROW2(system_error,errcode,"pthread_mutex_destroy(%p) failed: %s",
208 mutex, strerror(errcode));
212 /***** condition related functions *****/
213 typedef struct xbt_os_cond_ {
214 /* KEEP IT IN SYNC WITH xbt_thread.c */
218 xbt_os_cond_t xbt_os_cond_init(void) {
219 xbt_os_cond_t res = xbt_new(s_xbt_os_cond_t,1);
221 if ((errcode=pthread_cond_init(&(res->c),NULL)))
222 THROW1(system_error,errcode,"pthread_cond_init() failed: %s",
228 void xbt_os_cond_wait(xbt_os_cond_t cond, xbt_os_mutex_t mutex) {
230 if ((errcode=pthread_cond_wait(&(cond->c),&(mutex->m))))
231 THROW3(system_error,errcode,"pthread_cond_wait(%p,%p) failed: %s",
232 cond,mutex, strerror(errcode));
237 void xbt_os_cond_timedwait(xbt_os_cond_t cond, xbt_os_mutex_t mutex, double delay) {
239 struct timespec ts_end;
240 double end = delay + xbt_os_time();
243 xbt_os_cond_wait(cond,mutex);
245 ts_end.tv_sec = (time_t) floor(end);
246 ts_end.tv_nsec = (long) ( ( end - ts_end.tv_sec) * 1000000000);
247 DEBUG3("pthread_cond_timedwait(%p,%p,%p)",&(cond->c),&(mutex->m), &ts_end);
248 switch ( (errcode=pthread_cond_timedwait(&(cond->c),&(mutex->m), &ts_end)) ) {
252 THROW3(timeout_error,errcode,"condition %p (mutex %p) wasn't signaled before timeout (%f)",
255 THROW4(system_error,errcode,"pthread_cond_timedwait(%p,%p,%f) failed: %s",
256 cond,mutex, delay, strerror(errcode));
261 void xbt_os_cond_signal(xbt_os_cond_t cond) {
263 if ((errcode=pthread_cond_signal(&(cond->c))))
264 THROW2(system_error,errcode,"pthread_cond_signal(%p) failed: %s",
265 cond, strerror(errcode));
268 void xbt_os_cond_broadcast(xbt_os_cond_t cond){
270 if ((errcode=pthread_cond_broadcast(&(cond->c))))
271 THROW2(system_error,errcode,"pthread_cond_broadcast(%p) failed: %s",
272 cond, strerror(errcode));
274 void xbt_os_cond_destroy(xbt_os_cond_t cond){
279 if ((errcode=pthread_cond_destroy(&(cond->c))))
280 THROW2(system_error,errcode,"pthread_cond_destroy(%p) failed: %s",
281 cond, strerror(errcode));
285 void *xbt_os_thread_getparam(void) {
286 xbt_os_thread_t t = xbt_os_thread_self();
287 return t?t->param:NULL;
290 typedef struct xbt_os_sem_ {
295 xbt_os_sem_init(unsigned int value)
297 xbt_os_sem_t res = xbt_new(s_xbt_os_sem_t,1);
299 if(sem_init(&(res->s),0,value) < 0)
300 THROW1(system_error,errno,"sem_init() failed: %s",
307 xbt_os_sem_acquire(xbt_os_sem_t sem)
310 THROW1(arg_error,EINVAL,"xbt_os_sem_acquire() failed: %s",
313 if(sem_wait(&(sem->s)) < 0)
314 THROW1(system_error,errno,"sem_wait() failed: %s",
318 void xbt_os_sem_timedacquire(xbt_os_sem_t sem,double timeout)
321 struct timespec ts_end;
322 double end = timeout + xbt_os_time();
325 THROW1(arg_error,EINVAL,"xbt_os_sem_timedacquire() failed: %s",strerror(EINVAL));
329 xbt_os_sem_acquire(sem);
333 ts_end.tv_sec = (time_t) floor(end);
334 ts_end.tv_nsec = (long) ( ( end - ts_end.tv_sec) * 1000000000);
335 DEBUG2("sem_timedwait(%p,%p)",&(sem->s),&ts_end);
337 switch ((errcode=sem_timedwait(&(sem->s),&ts_end)))
343 THROW2(timeout_error,errcode,"semaphore %p wasn't signaled before timeout (%f)",sem,timeout);
346 THROW3(system_error,errcode,"sem_timedwait(%p,%f) failed: %s",sem,timeout, strerror(errcode));
352 xbt_os_sem_release(xbt_os_sem_t sem)
355 THROW1(arg_error,EINVAL,"xbt_os_sem_release() failed: %s",
358 if(sem_post(&(sem->s)) < 0)
359 THROW1(system_error,errno,"sem_post() failed: %s",
364 xbt_os_sem_destroy(xbt_os_sem_t sem)
367 THROW1(arg_error,EINVAL,"xbt_os_sem_destroy() failed: %s",
370 if(sem_destroy(&(sem->s)) < 0)
371 THROW1(system_error,errno,"sem_destroy() failed: %s",
376 xbt_os_sem_get_value(xbt_os_sem_t sem, int* svalue)
379 THROW1(arg_error,EINVAL,"xbt_os_sem_getvalue() failed: %s",
382 if(sem_getvalue(&(sem->s),svalue) < 0)
383 THROW1(system_error,errno,"sem_getvalue() failed: %s",
387 /* ********************************* WINDOWS IMPLEMENTATION ************************************ */
393 typedef struct xbt_os_thread_ {
395 HANDLE handle; /* the win thread handle */
396 unsigned long id; /* the win thread id */
397 pvoid_f_pvoid_t start_routine;
399 } s_xbt_os_thread_t ;
401 /* so we can specify the size of the stack of the threads */
402 #ifndef STACK_SIZE_PARAM_IS_A_RESERVATION
403 #define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
406 /* the default size of the stack of the threads (in bytes)*/
407 #define XBT_DEFAULT_THREAD_STACK_SIZE 4096
409 /* key to the TLS containing the xbt_os_thread_t structure */
410 static unsigned long xbt_self_thread_key;
/* Initializes the thread module (Windows flavour): allocates the TLS index
 * under which each thread stores its xbt_os_thread_t descriptor.
 *
 * Throws system_error when no TLS index can be allocated. */
void xbt_os_thread_mod_init(void) {
  xbt_self_thread_key = TlsAlloc();

  /* TlsAlloc() reports failure by returning TLS_OUT_OF_INDEXES. Without this
   * check, later TlsSetValue()/TlsGetValue()/TlsFree() calls would be made
   * with an invalid index. */
  if (xbt_self_thread_key == TLS_OUT_OF_INDEXES)
    THROW0(system_error,(int)GetLastError(),"TlsAlloc() failed to set up the thread submodule");
}
/* Finalizes the thread module (Windows flavour) by releasing the TLS index
 * holding the thread descriptors.
 *
 * Throws system_error when TlsFree() fails. */
void xbt_os_thread_mod_exit(void) {
  BOOL released = TlsFree(xbt_self_thread_key);

  if (!released) {
    THROW0(system_error,(int)GetLastError(),"TlsFree() failed to cleanup the thread submodule");
  }
}
/* Thread entry point handed to CreateThread(): stores the descriptor received
 * in s in the TLS slot of the new thread, then runs the user routine with the
 * user parameter.
 *
 * The user routine returns a pointer; when it is non-NULL, the DWORD it
 * points to becomes the thread exit code. A NULL result yields exit code 0.
 *
 * Throws system_error when TlsSetValue() fails. */
static DWORD WINAPI wrapper_start_routine(void *s) {
  xbt_os_thread_t t = (xbt_os_thread_t)s;
  void *rv;

  if(!TlsSetValue(xbt_self_thread_key,t))
    THROW0(system_error,(int)GetLastError(),"TlsSetValue of data describing the created thread failed");

  rv = (*(t->start_routine))(t->param);

  /* a routine may legitimately return NULL: do not dereference it */
  return rv ? *((DWORD*)rv) : 0;
}
434 xbt_os_thread_t xbt_os_thread_create(const char *name,pvoid_f_pvoid_t start_routine,
437 xbt_os_thread_t t = xbt_new(s_xbt_os_thread_t,1);
439 t->name = xbt_strdup(name);
440 t->start_routine = start_routine ;
443 t->handle = CreateThread(NULL,XBT_DEFAULT_THREAD_STACK_SIZE,
444 (LPTHREAD_START_ROUTINE)wrapper_start_routine,
445 t,STACK_SIZE_PARAM_IS_A_RESERVATION,&(t->id));
449 THROW0(system_error,(int)GetLastError(),"CreateThread failed");
455 const char* xbt_os_thread_name(xbt_os_thread_t t) {
459 const char* xbt_os_thread_self_name(void) {
460 xbt_os_thread_t t = xbt_os_thread_self();
461 return t?t->name:"main";
465 xbt_os_thread_join(xbt_os_thread_t thread,void ** thread_return) {
467 if(WAIT_OBJECT_0 != WaitForSingleObject(thread->handle,INFINITE))
468 THROW0(system_error,(int)GetLastError(), "WaitForSingleObject failed");
472 if(!GetExitCodeThread(thread->handle,(DWORD*)(*thread_return)))
473 THROW0(system_error,(int)GetLastError(), "GetExitCodeThread failed");
476 CloseHandle(thread->handle);
481 void xbt_os_thread_exit(int *retval) {
/* Returns the value stored in the calling thread's TLS slot for
 * xbt_self_thread_key, i.e. the descriptor registered by the thread wrapper
 * (whatever TlsGetValue() yields otherwise). */
xbt_os_thread_t xbt_os_thread_self(void) {
  xbt_os_thread_t self = TlsGetValue(xbt_self_thread_key);

  return self;
}
492 void *xbt_os_thread_getparam(void) {
493 xbt_os_thread_t t = xbt_os_thread_self();
498 void xbt_os_thread_yield(void) {
501 void xbt_os_thread_cancel(xbt_os_thread_t t) {
505 /****** mutex related functions ******/
506 typedef struct xbt_os_mutex_ {
507 /* KEEP IT IN SYNC WITH xbt_thread.c */
508 CRITICAL_SECTION lock;
511 xbt_os_mutex_t xbt_os_mutex_init(void) {
512 xbt_os_mutex_t res = xbt_new(s_xbt_os_mutex_t,1);
514 /* initialize the critical section object */
515 InitializeCriticalSection(&(res->lock));
/* Locks mutex by entering its critical section. */
void xbt_os_mutex_acquire(xbt_os_mutex_t mutex) {
  CRITICAL_SECTION *section = &mutex->lock;

  EnterCriticalSection(section);
}
/* Unlocks mutex by leaving its critical section. */
void xbt_os_mutex_release(xbt_os_mutex_t mutex) {
  CRITICAL_SECTION *section = &mutex->lock;

  LeaveCriticalSection(section);
}
531 void xbt_os_mutex_destroy(xbt_os_mutex_t mutex) {
535 DeleteCriticalSection(& mutex->lock);
539 /***** condition related functions *****/
540 enum { /* KEEP IT IN SYNC WITH xbt_thread.c */
546 typedef struct xbt_os_cond_ {
547 /* KEEP IT IN SYNC WITH xbt_thread.c */
548 HANDLE events[MAX_EVENTS];
550 unsigned int waiters_count; /* the number of waiters */
551 CRITICAL_SECTION waiters_count_lock; /* protect access to waiters_count */
554 xbt_os_cond_t xbt_os_cond_init(void) {
556 xbt_os_cond_t res = xbt_new0(s_xbt_os_cond_t,1);
558 memset(& res->waiters_count_lock,0,sizeof(CRITICAL_SECTION));
560 /* initialize the critical section object */
561 InitializeCriticalSection(& res->waiters_count_lock);
563 res->waiters_count = 0;
565 /* Create an auto-reset event */
566 res->events[SIGNAL] = CreateEvent (NULL, FALSE, FALSE, NULL);
568 if(!res->events[SIGNAL]){
569 DeleteCriticalSection(& res->waiters_count_lock);
571 THROW0(system_error,0,"CreateEvent failed for the signals");
574 /* Create a manual-reset event. */
575 res->events[BROADCAST] = CreateEvent (NULL, TRUE, FALSE,NULL);
577 if(!res->events[BROADCAST]){
579 DeleteCriticalSection(& res->waiters_count_lock);
580 CloseHandle(res->events[SIGNAL]);
582 THROW0(system_error,0,"CreateEvent failed for the broadcasts");
588 void xbt_os_cond_wait(xbt_os_cond_t cond, xbt_os_mutex_t mutex) {
590 unsigned long wait_result;
593 /* lock the threads counter and increment it */
594 EnterCriticalSection (& cond->waiters_count_lock);
595 cond->waiters_count++;
596 LeaveCriticalSection (& cond->waiters_count_lock);
598 /* unlock the mutex associated with the condition */
599 LeaveCriticalSection (& mutex->lock);
601 /* wait for a signal (broadcast or not) */
602 wait_result = WaitForMultipleObjects (2, cond->events, FALSE, INFINITE);
604 if(wait_result == WAIT_FAILED)
605 THROW0(system_error,0,"WaitForMultipleObjects failed, so we cannot wait on the condition");
607 /* we have a signal lock the condition */
608 EnterCriticalSection (& cond->waiters_count_lock);
609 cond->waiters_count--;
611 /* it's the last waiter or it's a broadcast ? */
612 is_last_waiter = ((wait_result == WAIT_OBJECT_0 + BROADCAST - 1) && (cond->waiters_count == 0));
614 LeaveCriticalSection (& cond->waiters_count_lock);
616 /* yes it's the last waiter or it's a broadcast
617 * only reset the manual event (the automatic event is reset in the WaitForMultipleObjects() function
621 if(!ResetEvent (cond->events[BROADCAST]))
622 THROW0(system_error,0,"ResetEvent failed");
624 /* relock the mutex associated with the condition in accordance with the posix thread specification */
625 EnterCriticalSection (& mutex->lock);
627 void xbt_os_cond_timedwait(xbt_os_cond_t cond, xbt_os_mutex_t mutex, double delay) {
629 unsigned long wait_result = WAIT_TIMEOUT;
631 unsigned long end = (unsigned long)(delay * 1000);
635 xbt_os_cond_wait(cond,mutex);
637 DEBUG3("xbt_cond_timedwait(%p,%p,%ul)",&(cond->events),&(mutex->lock),end);
639 /* lock the threads counter and increment it */
640 EnterCriticalSection (& cond->waiters_count_lock);
641 cond->waiters_count++;
642 LeaveCriticalSection (& cond->waiters_count_lock);
644 /* unlock the mutex associated with the condition */
645 LeaveCriticalSection (& mutex->lock);
646 /* wait for a signal (broadcast or not) */
648 wait_result = WaitForMultipleObjects (2, cond->events, FALSE, end);
650 switch(wait_result) {
652 THROW3(timeout_error,GetLastError(),"condition %p (mutex %p) wasn't signaled before timeout (%f)",cond,mutex, delay);
654 THROW0(system_error,GetLastError(),"WaitForMultipleObjects failed, so we cannot wait on the condition");
657 /* we have a signal lock the condition */
658 EnterCriticalSection (& cond->waiters_count_lock);
659 cond->waiters_count--;
661 /* it's the last waiter or it's a broadcast ? */
662 is_last_waiter = ((wait_result == WAIT_OBJECT_0 + BROADCAST - 1) && (cond->waiters_count == 0));
664 LeaveCriticalSection (& cond->waiters_count_lock);
666 /* yes it's the last waiter or it's a broadcast
667 * only reset the manual event (the automatic event is reset in the WaitForMultipleObjects() function
671 if(!ResetEvent (cond->events[BROADCAST]))
672 THROW0(system_error,0,"ResetEvent failed");
674 /* relock the mutex associated with the condition in accordance with the posix thread specification */
675 EnterCriticalSection (& mutex->lock);
677 /*THROW_UNIMPLEMENTED;*/
680 void xbt_os_cond_signal(xbt_os_cond_t cond) {
683 EnterCriticalSection (& cond->waiters_count_lock);
684 have_waiters = cond->waiters_count > 0;
685 LeaveCriticalSection (& cond->waiters_count_lock);
688 if(!SetEvent(cond->events[SIGNAL]))
689 THROW0(system_error,0,"SetEvent failed");
691 xbt_os_thread_yield();
694 void xbt_os_cond_broadcast(xbt_os_cond_t cond){
697 EnterCriticalSection (& cond->waiters_count_lock);
698 have_waiters = cond->waiters_count > 0;
699 LeaveCriticalSection (& cond->waiters_count_lock);
702 SetEvent(cond->events[BROADCAST]);
705 void xbt_os_cond_destroy(xbt_os_cond_t cond){
710 if(!CloseHandle(cond->events[SIGNAL]))
713 if(!CloseHandle(cond->events[BROADCAST]))
716 DeleteCriticalSection(& cond->waiters_count_lock);
721 THROW0(system_error,0,"Error while destroying the condition");
724 typedef struct xbt_os_sem_ {
727 CRITICAL_SECTION value_lock; /* protect access to value of the semaphore */
731 xbt_os_sem_init(unsigned int value)
736 THROW1(arg_error,EINVAL,"xbt_os_sem_init() failed: %s",
739 res = (xbt_os_sem_t)xbt_new0(s_xbt_os_sem_t,1);
741 if(!(res->h = CreateSemaphore(NULL,value,(long)INT_MAX,NULL))) {
742 THROW1(system_error,GetLastError(),"CreateSemaphore() failed: %s",
743 strerror(GetLastError()));
749 InitializeCriticalSection(&(res->value_lock));
755 xbt_os_sem_acquire(xbt_os_sem_t sem)
758 THROW1(arg_error,EINVAL,"xbt_os_sem_acquire() failed: %s",
762 if(WAIT_OBJECT_0 != WaitForSingleObject(sem->h,INFINITE))
763 THROW1(system_error,GetLastError(),"WaitForSingleObject() failed: %s",
764 strerror(GetLastError()));
765 EnterCriticalSection(&(sem->value_lock));
767 LeaveCriticalSection(&(sem->value_lock));
770 void xbt_os_sem_timedacquire(xbt_os_sem_t sem, double timeout)
774 double end = timeout + xbt_os_time();
777 THROW1(arg_error,EINVAL,"xbt_os_sem_timedacquire() failed: %s",
782 xbt_os_sem_acquire(sem);
787 seconds = (long) floor(end);
788 milliseconds = (long)( ( end - seconds) * 1000);
789 milliseconds += (seconds * 1000);
791 switch(WaitForSingleObject(sem->h,milliseconds))
794 EnterCriticalSection(&(sem->value_lock));
796 LeaveCriticalSection(&(sem->value_lock));
800 THROW2(timeout_error,GetLastError(),"semaphore %p wasn't signaled before timeout (%f)",sem,timeout);
805 THROW3(system_error,GetLastError(),"WaitForSingleObject(%p,%f) failed: %s",sem,timeout, strerror(GetLastError()));
811 xbt_os_sem_release(xbt_os_sem_t sem)
814 THROW1(arg_error,EINVAL,"xbt_os_sem_post() failed: %s",
817 if(!ReleaseSemaphore(sem->h,1, NULL))
818 THROW1(system_error,GetLastError(),"ReleaseSemaphore() failed: %s",
819 strerror(GetLastError()));
820 EnterCriticalSection (&(sem->value_lock));
822 LeaveCriticalSection(&(sem->value_lock));
826 xbt_os_sem_destroy(xbt_os_sem_t sem)
829 THROW1(arg_error,EINVAL,"xbt_os_sem_destroy() failed: %s",
832 if(!CloseHandle(sem->h))
833 THROW1(system_error,GetLastError(),"CloseHandle() failed: %s",
834 strerror(GetLastError()));
836 DeleteCriticalSection(&(sem->value_lock));
843 xbt_os_sem_get_value(xbt_os_sem_t sem, int* svalue)
846 THROW1(arg_error,EINVAL,"xbt_os_sem_get_value() failed: %s",
849 EnterCriticalSection(&(sem->value_lock));
850 *svalue = sem->value;
851 LeaveCriticalSection(&(sem->value_lock));