3 /* xbt_os_thread -- portability layer over the pthread API */
4 /* Used in RL to get win/lin portability, and in SG when CONTEXT_THREAD */
5 /* in SG, when using CONTEXT_UCONTEXT, xbt_os_thread_stub is used instead */
7 /* Copyright 2006,2007 Malek Cherier, Martin Quinson
8 * All right reserved. */
10 /* This program is free software; you can redistribute it and/or modify it
11 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "xbt/sysdep.h"
15 #include "xbt/ex_interface.h" /* We play crude games with exceptions */
17 #include "xbt/xbt_os_time.h" /* Portable time facilities */
18 #include "xbt/xbt_os_thread.h" /* This module */
19 #include "xbt_modinter.h" /* Initialization/finalization of this module */
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_sync_os,xbt,"Synchronization mechanism (OS-level)");
23 /* ********************************* PTHREAD IMPLEMENTATION ************************************ */
26 #include <semaphore.h>
28 typedef struct xbt_os_thread_ {
32 pvoid_f_pvoid_t start_routine;
35 static xbt_os_thread_t main_thread = NULL;
37 /* thread-specific data containing the xbt_os_thread_t structure */
38 static pthread_key_t xbt_self_thread_key;
39 static int thread_mod_inited = 0;
41 /* frees the xbt_os_thread_t corresponding to the current thread */
42 static void xbt_os_thread_free_thread_data(void*d){
46 /* callback: context fetching */
47 static ex_ctx_t *_os_thread_ex_ctx(void) {
48 return xbt_os_thread_self()->exception;
51 /* callback: termination */
52 static void _os_thread_ex_terminate(xbt_ex_t * e) {
56 /* FIXME: there should be a configuration variable to choose to kill everyone or only this one */
59 void xbt_os_thread_mod_init(void) {
62 if (thread_mod_inited)
65 if ((errcode=pthread_key_create(&xbt_self_thread_key, NULL)))
66 THROW0(system_error,errcode,"pthread_key_create failed for xbt_self_thread_key");
68 main_thread=xbt_new(s_xbt_os_thread_t,1);
69 main_thread->name = (char*)"main";
70 main_thread->start_routine = NULL;
71 main_thread->param = NULL;
72 main_thread->exception = xbt_new(ex_ctx_t, 1);
73 XBT_CTX_INITIALIZE(main_thread->exception);
75 __xbt_ex_ctx = _os_thread_ex_ctx;
76 __xbt_ex_terminate = _os_thread_ex_terminate;
78 thread_mod_inited = 1;
80 void xbt_os_thread_mod_exit(void) {
81 /* FIXME: don't try to free our key on shutdown.
82 Valgrind detects no leak if we don't, and whine if we try to */
85 // if ((errcode=pthread_key_delete(xbt_self_thread_key)))
86 // THROW0(system_error,errcode,"pthread_key_delete failed for xbt_self_thread_key");
89 static void * wrapper_start_routine(void *s) {
90 xbt_os_thread_t t = s;
93 if ((errcode=pthread_setspecific(xbt_self_thread_key,t)))
94 THROW0(system_error,errcode,
95 "pthread_setspecific failed for xbt_self_thread_key");
97 return (*(t->start_routine))(t->param);
99 xbt_os_thread_t xbt_os_thread_create(const char*name,
100 pvoid_f_pvoid_t start_routine,
104 xbt_os_thread_t res_thread=xbt_new(s_xbt_os_thread_t,1);
105 res_thread->name = xbt_strdup(name);
106 res_thread->start_routine = start_routine;
107 res_thread->param = param;
108 res_thread->exception = xbt_new(ex_ctx_t, 1);
109 XBT_CTX_INITIALIZE(res_thread->exception);
111 if ((errcode = pthread_create(&(res_thread->t), NULL,
112 wrapper_start_routine, res_thread)))
113 THROW1(system_error,errcode,
114 "pthread_create failed: %s",strerror(errcode));
119 const char* xbt_os_thread_name(xbt_os_thread_t t) {
123 const char* xbt_os_thread_self_name(void) {
124 xbt_os_thread_t self = xbt_os_thread_self();
125 return self?self->name:"main";
128 xbt_os_thread_join(xbt_os_thread_t thread,void ** thread_return) {
132 if ((errcode = pthread_join(thread->t,thread_return)))
133 THROW1(system_error,errcode, "pthread_join failed: %s",
135 if (thread->exception)
136 free(thread->exception);
138 if (thread == main_thread) /* just killed main thread */
144 void xbt_os_thread_exit(int *retval) {
145 pthread_exit(retval);
148 xbt_os_thread_t xbt_os_thread_self(void) {
151 if (!thread_mod_inited)
154 res = pthread_getspecific(xbt_self_thread_key);
162 void xbt_os_thread_yield(void) {
165 void xbt_os_thread_cancel(xbt_os_thread_t t) {
166 pthread_cancel(t->t);
168 /****** mutex related functions ******/
169 typedef struct xbt_os_mutex_ {
170 /* KEEP IT IN SYNC WITH xbt_thread.c */
174 xbt_os_mutex_t xbt_os_mutex_init(void) {
175 xbt_os_mutex_t res = xbt_new(s_xbt_os_mutex_t,1);
178 if ((errcode = pthread_mutex_init(&(res->m),NULL)))
179 THROW1(system_error,errcode,"pthread_mutex_init() failed: %s",
185 void xbt_os_mutex_acquire(xbt_os_mutex_t mutex) {
188 if ((errcode=pthread_mutex_lock(&(mutex->m))))
189 THROW2(system_error,errcode,"pthread_mutex_lock(%p) failed: %s",
190 mutex, strerror(errcode));
193 void xbt_os_mutex_release(xbt_os_mutex_t mutex) {
196 if ((errcode=pthread_mutex_unlock(&(mutex->m))))
197 THROW2(system_error,errcode,"pthread_mutex_unlock(%p) failed: %s",
198 mutex, strerror(errcode));
201 void xbt_os_mutex_destroy(xbt_os_mutex_t mutex) {
206 if ((errcode=pthread_mutex_destroy(&(mutex->m))))
207 THROW2(system_error,errcode,"pthread_mutex_destroy(%p) failed: %s",
208 mutex, strerror(errcode));
212 /***** condition related functions *****/
213 typedef struct xbt_os_cond_ {
214 /* KEEP IT IN SYNC WITH xbt_thread.c */
218 xbt_os_cond_t xbt_os_cond_init(void) {
219 xbt_os_cond_t res = xbt_new(s_xbt_os_cond_t,1);
221 if ((errcode=pthread_cond_init(&(res->c),NULL)))
222 THROW1(system_error,errcode,"pthread_cond_init() failed: %s",
228 void xbt_os_cond_wait(xbt_os_cond_t cond, xbt_os_mutex_t mutex) {
230 if ((errcode=pthread_cond_wait(&(cond->c),&(mutex->m))))
231 THROW3(system_error,errcode,"pthread_cond_wait(%p,%p) failed: %s",
232 cond,mutex, strerror(errcode));
237 void xbt_os_cond_timedwait(xbt_os_cond_t cond, xbt_os_mutex_t mutex, double delay) {
239 struct timespec ts_end;
240 double end = delay + xbt_os_time();
243 xbt_os_cond_wait(cond,mutex);
245 ts_end.tv_sec = (time_t) floor(end);
246 ts_end.tv_nsec = (long) ( ( end - ts_end.tv_sec) * 1000000000);
247 DEBUG3("pthread_cond_timedwait(%p,%p,%p)",&(cond->c),&(mutex->m), &ts_end);
248 switch ( (errcode=pthread_cond_timedwait(&(cond->c),&(mutex->m), &ts_end)) ) {
252 THROW3(timeout_error,errcode,"condition %p (mutex %p) wasn't signaled before timeout (%f)",
255 THROW4(system_error,errcode,"pthread_cond_timedwait(%p,%p,%f) failed: %s",
256 cond,mutex, delay, strerror(errcode));
261 void xbt_os_cond_signal(xbt_os_cond_t cond) {
263 if ((errcode=pthread_cond_signal(&(cond->c))))
264 THROW2(system_error,errcode,"pthread_cond_signal(%p) failed: %s",
265 cond, strerror(errcode));
268 void xbt_os_cond_broadcast(xbt_os_cond_t cond){
270 if ((errcode=pthread_cond_broadcast(&(cond->c))))
271 THROW2(system_error,errcode,"pthread_cond_broadcast(%p) failed: %s",
272 cond, strerror(errcode));
274 void xbt_os_cond_destroy(xbt_os_cond_t cond){
279 if ((errcode=pthread_cond_destroy(&(cond->c))))
280 THROW2(system_error,errcode,"pthread_cond_destroy(%p) failed: %s",
281 cond, strerror(errcode));
285 void *xbt_os_thread_getparam(void) {
286 xbt_os_thread_t t = xbt_os_thread_self();
287 return t?t->param:NULL;
290 typedef struct xbt_os_sem_ {
295 xbt_os_sem_init(unsigned int value)
297 xbt_os_sem_t res = xbt_new(s_xbt_os_sem_t,1);
299 if(sem_init(&(res->s),0,value) < 0)
300 THROW1(system_error,errno,"sem_init() failed: %s",
307 xbt_os_sem_acquire(xbt_os_sem_t sem)
310 THROW0(arg_error,EINVAL,"Cannot acquire of the NULL semaphore");
312 if(sem_wait(&(sem->s)) < 0)
313 THROW1(system_error,errno,"sem_wait() failed: %s",
317 void xbt_os_sem_timedacquire(xbt_os_sem_t sem,double timeout)
319 /* mac os x have not the sem_timedwait() function */
320 #ifndef HAVE_SEM_TIMEDWAIT
324 struct timespec ts_end;
325 double end = timeout + xbt_os_time();
328 THROW0(arg_error,EINVAL,"Cannot acquire of the NULL semaphore");
332 xbt_os_sem_acquire(sem);
336 ts_end.tv_sec = (time_t) floor(end);
337 ts_end.tv_nsec = (long) ( ( end - ts_end.tv_sec) * 1000000000);
338 DEBUG2("sem_timedwait(%p,%p)",&(sem->s),&ts_end);
340 switch ((errcode=sem_timedwait(&(sem->s),&ts_end)))
346 THROW2(timeout_error,errcode,"semaphore %p wasn't signaled before timeout (%f)",sem,timeout);
349 THROW3(system_error,errcode,"sem_timedwait(%p,%f) failed: %s",sem,timeout, strerror(errcode));
356 xbt_os_sem_release(xbt_os_sem_t sem)
359 THROW0(arg_error,EINVAL,"Cannot release of the NULL semaphore");
361 if(sem_post(&(sem->s)) < 0)
362 THROW1(system_error,errno,"sem_post() failed: %s",
367 xbt_os_sem_destroy(xbt_os_sem_t sem)
372 if(sem_destroy(&(sem->s)) < 0)
373 THROW1(system_error,errno,"sem_destroy() failed: %s",
378 xbt_os_sem_get_value(xbt_os_sem_t sem, int* svalue)
381 THROW0(arg_error,EINVAL,"Cannot get the value of the NULL semaphore");
383 if(sem_getvalue(&(sem->s),svalue) < 0)
384 THROW1(system_error,errno,"sem_getvalue() failed: %s",
388 /* ********************************* WINDOWS IMPLEMENTATION ************************************ */
394 typedef struct xbt_os_thread_ {
396 HANDLE handle; /* the win thread handle */
397 unsigned long id; /* the win thread id */
398 pvoid_f_pvoid_t start_routine;
400 } s_xbt_os_thread_t ;
402 /* so we can specify the size of the stack of the threads */
403 #ifndef STACK_SIZE_PARAM_IS_A_RESERVATION
404 #define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
407 /* the default size of the stack of the threads (in bytes)*/
408 #define XBT_DEFAULT_THREAD_STACK_SIZE 4096
410 /* key to the TLS containing the xbt_os_thread_t structure */
411 static unsigned long xbt_self_thread_key;
413 void xbt_os_thread_mod_init(void) {
414 xbt_self_thread_key = TlsAlloc();
416 void xbt_os_thread_mod_exit(void) {
418 if (!TlsFree(xbt_self_thread_key))
419 THROW0(system_error,(int)GetLastError(),"TlsFree() failed to cleanup the thread submodule");
422 static DWORD WINAPI wrapper_start_routine(void *s) {
423 xbt_os_thread_t t = (xbt_os_thread_t)s;
426 if(!TlsSetValue(xbt_self_thread_key,t))
427 THROW0(system_error,(int)GetLastError(),"TlsSetValue of data describing the created thread failed");
429 rv = (*(t->start_routine))(t->param);
431 return *((DWORD*)rv);
435 xbt_os_thread_t xbt_os_thread_create(const char *name,pvoid_f_pvoid_t start_routine,
438 xbt_os_thread_t t = xbt_new(s_xbt_os_thread_t,1);
440 t->name = xbt_strdup(name);
441 t->start_routine = start_routine ;
444 t->handle = CreateThread(NULL,XBT_DEFAULT_THREAD_STACK_SIZE,
445 (LPTHREAD_START_ROUTINE)wrapper_start_routine,
446 t,STACK_SIZE_PARAM_IS_A_RESERVATION,&(t->id));
450 THROW0(system_error,(int)GetLastError(),"CreateThread failed");
456 const char* xbt_os_thread_name(xbt_os_thread_t t) {
460 const char* xbt_os_thread_self_name(void) {
461 xbt_os_thread_t t = xbt_os_thread_self();
462 return t?t->name:"main";
466 xbt_os_thread_join(xbt_os_thread_t thread,void ** thread_return) {
468 if(WAIT_OBJECT_0 != WaitForSingleObject(thread->handle,INFINITE))
469 THROW0(system_error,(int)GetLastError(), "WaitForSingleObject failed");
473 if(!GetExitCodeThread(thread->handle,(DWORD*)(*thread_return)))
474 THROW0(system_error,(int)GetLastError(), "GetExitCodeThread failed");
477 CloseHandle(thread->handle);
482 void xbt_os_thread_exit(int *retval) {
489 xbt_os_thread_t xbt_os_thread_self(void) {
490 return TlsGetValue(xbt_self_thread_key);
493 void *xbt_os_thread_getparam(void) {
494 xbt_os_thread_t t = xbt_os_thread_self();
499 void xbt_os_thread_yield(void) {
502 void xbt_os_thread_cancel(xbt_os_thread_t t) {
506 /****** mutex related functions ******/
507 typedef struct xbt_os_mutex_ {
508 /* KEEP IT IN SYNC WITH xbt_thread.c */
509 CRITICAL_SECTION lock;
512 xbt_os_mutex_t xbt_os_mutex_init(void) {
513 xbt_os_mutex_t res = xbt_new(s_xbt_os_mutex_t,1);
515 /* initialize the critical section object */
516 InitializeCriticalSection(&(res->lock));
521 void xbt_os_mutex_acquire(xbt_os_mutex_t mutex) {
523 EnterCriticalSection(& mutex->lock);
526 void xbt_os_mutex_release(xbt_os_mutex_t mutex) {
528 LeaveCriticalSection (& mutex->lock);
532 void xbt_os_mutex_destroy(xbt_os_mutex_t mutex) {
536 DeleteCriticalSection(& mutex->lock);
540 /***** condition related functions *****/
541 enum { /* KEEP IT IN SYNC WITH xbt_thread.c */
547 typedef struct xbt_os_cond_ {
548 /* KEEP IT IN SYNC WITH xbt_thread.c */
549 HANDLE events[MAX_EVENTS];
551 unsigned int waiters_count; /* the number of waiters */
552 CRITICAL_SECTION waiters_count_lock; /* protect access to waiters_count */
555 xbt_os_cond_t xbt_os_cond_init(void) {
557 xbt_os_cond_t res = xbt_new0(s_xbt_os_cond_t,1);
559 memset(& res->waiters_count_lock,0,sizeof(CRITICAL_SECTION));
561 /* initialize the critical section object */
562 InitializeCriticalSection(& res->waiters_count_lock);
564 res->waiters_count = 0;
566 /* Create an auto-reset event */
567 res->events[SIGNAL] = CreateEvent (NULL, FALSE, FALSE, NULL);
569 if(!res->events[SIGNAL]){
570 DeleteCriticalSection(& res->waiters_count_lock);
572 THROW0(system_error,0,"CreateEvent failed for the signals");
575 /* Create a manual-reset event. */
576 res->events[BROADCAST] = CreateEvent (NULL, TRUE, FALSE,NULL);
578 if(!res->events[BROADCAST]){
580 DeleteCriticalSection(& res->waiters_count_lock);
581 CloseHandle(res->events[SIGNAL]);
583 THROW0(system_error,0,"CreateEvent failed for the broadcasts");
589 void xbt_os_cond_wait(xbt_os_cond_t cond, xbt_os_mutex_t mutex) {
591 unsigned long wait_result;
594 /* lock the threads counter and increment it */
595 EnterCriticalSection (& cond->waiters_count_lock);
596 cond->waiters_count++;
597 LeaveCriticalSection (& cond->waiters_count_lock);
599 /* unlock the mutex associate with the condition */
600 LeaveCriticalSection (& mutex->lock);
602 /* wait for a signal (broadcast or no) */
603 wait_result = WaitForMultipleObjects (2, cond->events, FALSE, INFINITE);
605 if(wait_result == WAIT_FAILED)
606 THROW0(system_error,0,"WaitForMultipleObjects failed, so we cannot wait on the condition");
608 /* we have a signal lock the condition */
609 EnterCriticalSection (& cond->waiters_count_lock);
610 cond->waiters_count--;
612 /* it's the last waiter or it's a broadcast ? */
613 is_last_waiter = ((wait_result == WAIT_OBJECT_0 + BROADCAST - 1) && (cond->waiters_count == 0));
615 LeaveCriticalSection (& cond->waiters_count_lock);
617 /* yes it's the last waiter or it's a broadcast
618 * only reset the manual event (the automatic event is reset in the WaitForMultipleObjects() function
622 if(!ResetEvent (cond->events[BROADCAST]))
623 THROW0(system_error,0,"ResetEvent failed");
625 /* relock the mutex associated with the condition in accordance with the posix thread specification */
626 EnterCriticalSection (& mutex->lock);
628 void xbt_os_cond_timedwait(xbt_os_cond_t cond, xbt_os_mutex_t mutex, double delay) {
630 unsigned long wait_result = WAIT_TIMEOUT;
632 unsigned long end = (unsigned long)(delay * 1000);
636 xbt_os_cond_wait(cond,mutex);
638 DEBUG3("xbt_cond_timedwait(%p,%p,%ul)",&(cond->events),&(mutex->lock),end);
640 /* lock the threads counter and increment it */
641 EnterCriticalSection (& cond->waiters_count_lock);
642 cond->waiters_count++;
643 LeaveCriticalSection (& cond->waiters_count_lock);
645 /* unlock the mutex associate with the condition */
646 LeaveCriticalSection (& mutex->lock);
647 /* wait for a signal (broadcast or no) */
649 wait_result = WaitForMultipleObjects (2, cond->events, FALSE, end);
651 switch(wait_result) {
653 THROW3(timeout_error,GetLastError(),"condition %p (mutex %p) wasn't signaled before timeout (%f)",cond,mutex, delay);
655 THROW0(system_error,GetLastError(),"WaitForMultipleObjects failed, so we cannot wait on the condition");
658 /* we have a signal lock the condition */
659 EnterCriticalSection (& cond->waiters_count_lock);
660 cond->waiters_count--;
662 /* it's the last waiter or it's a broadcast ? */
663 is_last_waiter = ((wait_result == WAIT_OBJECT_0 + BROADCAST - 1) && (cond->waiters_count == 0));
665 LeaveCriticalSection (& cond->waiters_count_lock);
667 /* yes it's the last waiter or it's a broadcast
668 * only reset the manual event (the automatic event is reset in the WaitForMultipleObjects() function
672 if(!ResetEvent (cond->events[BROADCAST]))
673 THROW0(system_error,0,"ResetEvent failed");
675 /* relock the mutex associated with the condition in accordance with the posix thread specification */
676 EnterCriticalSection (& mutex->lock);
678 /*THROW_UNIMPLEMENTED;*/
681 void xbt_os_cond_signal(xbt_os_cond_t cond) {
684 EnterCriticalSection (& cond->waiters_count_lock);
685 have_waiters = cond->waiters_count > 0;
686 LeaveCriticalSection (& cond->waiters_count_lock);
689 if(!SetEvent(cond->events[SIGNAL]))
690 THROW0(system_error,0,"SetEvent failed");
692 xbt_os_thread_yield();
695 void xbt_os_cond_broadcast(xbt_os_cond_t cond){
698 EnterCriticalSection (& cond->waiters_count_lock);
699 have_waiters = cond->waiters_count > 0;
700 LeaveCriticalSection (& cond->waiters_count_lock);
703 SetEvent(cond->events[BROADCAST]);
706 void xbt_os_cond_destroy(xbt_os_cond_t cond){
711 if(!CloseHandle(cond->events[SIGNAL]))
714 if(!CloseHandle(cond->events[BROADCAST]))
717 DeleteCriticalSection(& cond->waiters_count_lock);
722 THROW0(system_error,0,"Error while destroying the condition");
725 typedef struct xbt_os_sem_ {
728 CRITICAL_SECTION value_lock; /* protect access to value of the semaphore */
732 xbt_os_sem_init(unsigned int value)
737 THROW1(arg_error,value,"Semaphore initial value too big: %ud cannot be stored as a signed int",
740 res = (xbt_os_sem_t)xbt_new0(s_xbt_os_sem_t,1);
742 if(!(res->h = CreateSemaphore(NULL,value,(long)INT_MAX,NULL))) {
743 THROW1(system_error,GetLastError(),"CreateSemaphore() failed: %s",
744 strerror(GetLastError()));
750 InitializeCriticalSection(&(res->value_lock));
756 xbt_os_sem_acquire(xbt_os_sem_t sem)
759 THROW0(arg_error,EINVAL,"Cannot acquire the NULL semaphore");
762 if(WAIT_OBJECT_0 != WaitForSingleObject(sem->h,INFINITE))
763 THROW1(system_error,GetLastError(),"WaitForSingleObject() failed: %s",
764 strerror(GetLastError()));
765 EnterCriticalSection(&(sem->value_lock));
767 LeaveCriticalSection(&(sem->value_lock));
770 void xbt_os_sem_timedacquire(xbt_os_sem_t sem, double timeout)
774 double end = timeout + xbt_os_time();
777 THROW0(arg_error,EINVAL,"Cannot acquire the NULL semaphore");
781 xbt_os_sem_acquire(sem);
786 seconds = (long) floor(end);
787 milliseconds = (long)( ( end - seconds) * 1000);
788 milliseconds += (seconds * 1000);
790 switch(WaitForSingleObject(sem->h,milliseconds))
793 EnterCriticalSection(&(sem->value_lock));
795 LeaveCriticalSection(&(sem->value_lock));
799 THROW2(timeout_error,GetLastError(),"semaphore %p wasn't signaled before timeout (%f)",sem,timeout);
804 THROW3(system_error,GetLastError(),"WaitForSingleObject(%p,%f) failed: %s",sem,timeout, strerror(GetLastError()));
810 xbt_os_sem_release(xbt_os_sem_t sem)
813 THROW0(arg_error,EINVAL,"Cannot release the NULL semaphore");
815 if(!ReleaseSemaphore(sem->h,1, NULL))
816 THROW1(system_error,GetLastError(),"ReleaseSemaphore() failed: %s",
817 strerror(GetLastError()));
818 EnterCriticalSection (&(sem->value_lock));
820 LeaveCriticalSection(&(sem->value_lock));
824 xbt_os_sem_destroy(xbt_os_sem_t sem)
828 if(!CloseHandle(sem->h))
829 THROW1(system_error,GetLastError(),"CloseHandle() failed: %s",
830 strerror(GetLastError()));
832 DeleteCriticalSection(&(sem->value_lock));
839 xbt_os_sem_get_value(xbt_os_sem_t sem, int* svalue)
842 THROW0(arg_error,EINVAL,"Cannot get the value of the NULL semaphore");
844 EnterCriticalSection(&(sem->value_lock));
845 *svalue = sem->value;
846 LeaveCriticalSection(&(sem->value_lock));