3 /* xbt_os_thread -- portability layer over the pthread API */
4 /* Used in RL to get win/lin portability, and in SG when CONTEXT_THREAD */
5 /* in SG, when using CONTEXT_UCONTEXT, xbt_os_thread_stub is used instead */
7 /* Copyright 2006,2007 Malek Cherier, Martin Quinson
8 * All right reserved. */
10 /* This program is free software; you can redistribute it and/or modify it
11 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "xbt/sysdep.h"
15 #include "xbt/ex_interface.h" /* We play crude games with exceptions */
17 #include "xbt/xbt_os_time.h" /* Portable time facilities */
18 #include "xbt/xbt_os_thread.h" /* This module */
19 #include "xbt_modinter.h" /* Initialization/finalization of this module */
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_sync_os,xbt,"Synchronization mechanism (OS-level)");
23 /* ********************************* PTHREAD IMPLEMENTATION ************************************ */
26 #include <semaphore.h>
28 typedef struct xbt_os_thread_ {
32 pvoid_f_pvoid_t start_routine;
35 static xbt_os_thread_t main_thread = NULL;
37 /* thread-specific data containing the xbt_os_thread_t structure */
38 static pthread_key_t xbt_self_thread_key;
39 static int thread_mod_inited = 0;
41 /* frees the xbt_os_thread_t corresponding to the current thread */
42 static void xbt_os_thread_free_thread_data(void*d){
46 /* callback: context fetching */
47 static ex_ctx_t *_os_thread_ex_ctx(void) {
48 return xbt_os_thread_self()->exception;
51 /* callback: termination */
52 static void _os_thread_ex_terminate(xbt_ex_t * e) {
56 /* FIXME: there should be a configuration variable to choose to kill everyone or only this one */
59 void xbt_os_thread_mod_init(void) {
62 if (thread_mod_inited)
65 if ((errcode=pthread_key_create(&xbt_self_thread_key, NULL)))
66 THROW0(system_error,errcode,"pthread_key_create failed for xbt_self_thread_key");
68 main_thread=xbt_new(s_xbt_os_thread_t,1);
69 main_thread->name = (char*)"main";
70 main_thread->start_routine = NULL;
71 main_thread->param = NULL;
72 main_thread->exception = xbt_new(ex_ctx_t, 1);
73 XBT_CTX_INITIALIZE(main_thread->exception);
75 __xbt_ex_ctx = _os_thread_ex_ctx;
76 __xbt_ex_terminate = _os_thread_ex_terminate;
78 thread_mod_inited = 1;
80 void xbt_os_thread_mod_exit(void) {
81 /* FIXME: don't try to free our key on shutdown.
82 Valgrind detects no leak if we don't, and whine if we try to */
85 // if ((errcode=pthread_key_delete(xbt_self_thread_key)))
86 // THROW0(system_error,errcode,"pthread_key_delete failed for xbt_self_thread_key");
89 static void * wrapper_start_routine(void *s) {
90 xbt_os_thread_t t = s;
93 if ((errcode=pthread_setspecific(xbt_self_thread_key,t)))
94 THROW0(system_error,errcode,
95 "pthread_setspecific failed for xbt_self_thread_key");
97 return (*(t->start_routine))(t->param);
99 xbt_os_thread_t xbt_os_thread_create(const char*name,
100 pvoid_f_pvoid_t start_routine,
104 xbt_os_thread_t res_thread=xbt_new(s_xbt_os_thread_t,1);
105 res_thread->name = xbt_strdup(name);
106 res_thread->start_routine = start_routine;
107 res_thread->param = param;
108 res_thread->exception = xbt_new(ex_ctx_t, 1);
109 XBT_CTX_INITIALIZE(res_thread->exception);
111 if ((errcode = pthread_create(&(res_thread->t), NULL,
112 wrapper_start_routine, res_thread)))
113 THROW1(system_error,errcode,
114 "pthread_create failed: %s",strerror(errcode));
119 const char* xbt_os_thread_name(xbt_os_thread_t t) {
123 const char* xbt_os_thread_self_name(void) {
124 xbt_os_thread_t self = xbt_os_thread_self();
125 return self?self->name:"main";
128 xbt_os_thread_join(xbt_os_thread_t thread,void ** thread_return) {
132 if ((errcode = pthread_join(thread->t,thread_return)))
133 THROW1(system_error,errcode, "pthread_join failed: %s",
135 if (thread->exception)
136 free(thread->exception);
138 if (thread == main_thread) /* just killed main thread */
144 void xbt_os_thread_exit(int *retval) {
145 pthread_exit(retval);
148 xbt_os_thread_t xbt_os_thread_self(void) {
151 if (!thread_mod_inited)
154 res = pthread_getspecific(xbt_self_thread_key);
162 void xbt_os_thread_yield(void) {
165 void xbt_os_thread_cancel(xbt_os_thread_t t) {
166 pthread_cancel(t->t);
168 /****** mutex related functions ******/
169 typedef struct xbt_os_mutex_ {
170 /* KEEP IT IN SYNC WITH xbt_thread.c */
174 xbt_os_mutex_t xbt_os_mutex_init(void) {
175 xbt_os_mutex_t res = xbt_new(s_xbt_os_mutex_t,1);
178 if ((errcode = pthread_mutex_init(&(res->m),NULL)))
179 THROW1(system_error,errcode,"pthread_mutex_init() failed: %s",
185 void xbt_os_mutex_acquire(xbt_os_mutex_t mutex) {
188 if ((errcode=pthread_mutex_lock(&(mutex->m))))
189 THROW2(system_error,errcode,"pthread_mutex_lock(%p) failed: %s",
190 mutex, strerror(errcode));
193 void xbt_os_mutex_release(xbt_os_mutex_t mutex) {
196 if ((errcode=pthread_mutex_unlock(&(mutex->m))))
197 THROW2(system_error,errcode,"pthread_mutex_unlock(%p) failed: %s",
198 mutex, strerror(errcode));
201 void xbt_os_mutex_destroy(xbt_os_mutex_t mutex) {
206 if ((errcode=pthread_mutex_destroy(&(mutex->m))))
207 THROW2(system_error,errcode,"pthread_mutex_destroy(%p) failed: %s",
208 mutex, strerror(errcode));
212 /***** condition related functions *****/
213 typedef struct xbt_os_cond_ {
214 /* KEEP IT IN SYNC WITH xbt_thread.c */
218 xbt_os_cond_t xbt_os_cond_init(void) {
219 xbt_os_cond_t res = xbt_new(s_xbt_os_cond_t,1);
221 if ((errcode=pthread_cond_init(&(res->c),NULL)))
222 THROW1(system_error,errcode,"pthread_cond_init() failed: %s",
228 void xbt_os_cond_wait(xbt_os_cond_t cond, xbt_os_mutex_t mutex) {
230 if ((errcode=pthread_cond_wait(&(cond->c),&(mutex->m))))
231 THROW3(system_error,errcode,"pthread_cond_wait(%p,%p) failed: %s",
232 cond,mutex, strerror(errcode));
237 void xbt_os_cond_timedwait(xbt_os_cond_t cond, xbt_os_mutex_t mutex, double delay) {
239 struct timespec ts_end;
240 double end = delay + xbt_os_time();
243 xbt_os_cond_wait(cond,mutex);
245 ts_end.tv_sec = (time_t) floor(end);
246 ts_end.tv_nsec = (long) ( ( end - ts_end.tv_sec) * 1000000000);
247 DEBUG3("pthread_cond_timedwait(%p,%p,%p)",&(cond->c),&(mutex->m), &ts_end);
248 switch ( (errcode=pthread_cond_timedwait(&(cond->c),&(mutex->m), &ts_end)) ) {
252 THROW3(timeout_error,errcode,"condition %p (mutex %p) wasn't signaled before timeout (%f)",
255 THROW4(system_error,errcode,"pthread_cond_timedwait(%p,%p,%f) failed: %s",
256 cond,mutex, delay, strerror(errcode));
261 void xbt_os_cond_signal(xbt_os_cond_t cond) {
263 if ((errcode=pthread_cond_signal(&(cond->c))))
264 THROW2(system_error,errcode,"pthread_cond_signal(%p) failed: %s",
265 cond, strerror(errcode));
268 void xbt_os_cond_broadcast(xbt_os_cond_t cond){
270 if ((errcode=pthread_cond_broadcast(&(cond->c))))
271 THROW2(system_error,errcode,"pthread_cond_broadcast(%p) failed: %s",
272 cond, strerror(errcode));
274 void xbt_os_cond_destroy(xbt_os_cond_t cond){
279 if ((errcode=pthread_cond_destroy(&(cond->c))))
280 THROW2(system_error,errcode,"pthread_cond_destroy(%p) failed: %s",
281 cond, strerror(errcode));
285 void *xbt_os_thread_getparam(void) {
286 xbt_os_thread_t t = xbt_os_thread_self();
287 return t?t->param:NULL;
290 typedef struct xbt_os_sem_ {
295 xbt_os_sem_init(unsigned int value)
297 xbt_os_sem_t res = xbt_new(s_xbt_os_sem_t,1);
299 if(sem_init(&(res->s),0,value) < 0)
300 THROW1(system_error,errno,"sem_init() failed: %s",
307 xbt_os_sem_acquire(xbt_os_sem_t sem)
310 THROW1(arg_error,EINVAL,"xbt_os_sem_acquire() failed: %s",
313 if(sem_wait(&(sem->s)) < 0)
314 THROW1(system_error,errno,"sem_wait() failed: %s",
318 void xbt_os_sem_timedacquire(xbt_os_sem_t sem,double timeout)
320 /* mac os x have not the sem_timedwait() function */
321 #ifndef HAVE_SEM_TIMEDWAIT
325 struct timespec ts_end;
326 double end = timeout + xbt_os_time();
329 THROW1(arg_error,EINVAL,"xbt_os_sem_timedacquire() failed: %s",strerror(EINVAL));
333 xbt_os_sem_acquire(sem);
337 ts_end.tv_sec = (time_t) floor(end);
338 ts_end.tv_nsec = (long) ( ( end - ts_end.tv_sec) * 1000000000);
339 DEBUG2("sem_timedwait(%p,%p)",&(sem->s),&ts_end);
341 switch ((errcode=sem_timedwait(&(sem->s),&ts_end)))
347 THROW2(timeout_error,errcode,"semaphore %p wasn't signaled before timeout (%f)",sem,timeout);
350 THROW3(system_error,errcode,"sem_timedwait(%p,%f) failed: %s",sem,timeout, strerror(errcode));
357 xbt_os_sem_release(xbt_os_sem_t sem)
360 THROW1(arg_error,EINVAL,"xbt_os_sem_release() failed: %s",
363 if(sem_post(&(sem->s)) < 0)
364 THROW1(system_error,errno,"sem_post() failed: %s",
369 xbt_os_sem_destroy(xbt_os_sem_t sem)
372 THROW1(arg_error,EINVAL,"xbt_os_sem_destroy() failed: %s",
375 if(sem_destroy(&(sem->s)) < 0)
376 THROW1(system_error,errno,"sem_destroy() failed: %s",
381 xbt_os_sem_get_value(xbt_os_sem_t sem, int* svalue)
384 THROW1(arg_error,EINVAL,"xbt_os_sem_getvalue() failed: %s",
387 if(sem_getvalue(&(sem->s),svalue) < 0)
388 THROW1(system_error,errno,"sem_getvalue() failed: %s",
392 /* ********************************* WINDOWS IMPLEMENTATION ************************************ */
398 typedef struct xbt_os_thread_ {
400 HANDLE handle; /* the win thread handle */
401 unsigned long id; /* the win thread id */
402 pvoid_f_pvoid_t start_routine;
404 } s_xbt_os_thread_t ;
406 /* so we can specify the size of the stack of the threads */
407 #ifndef STACK_SIZE_PARAM_IS_A_RESERVATION
408 #define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
411 /* the default size of the stack of the threads (in bytes)*/
412 #define XBT_DEFAULT_THREAD_STACK_SIZE 4096
414 /* key to the TLS containing the xbt_os_thread_t structure */
415 static unsigned long xbt_self_thread_key;
417 void xbt_os_thread_mod_init(void) {
418 xbt_self_thread_key = TlsAlloc();
420 void xbt_os_thread_mod_exit(void) {
422 if (!TlsFree(xbt_self_thread_key))
423 THROW0(system_error,(int)GetLastError(),"TlsFree() failed to cleanup the thread submodule");
426 static DWORD WINAPI wrapper_start_routine(void *s) {
427 xbt_os_thread_t t = (xbt_os_thread_t)s;
430 if(!TlsSetValue(xbt_self_thread_key,t))
431 THROW0(system_error,(int)GetLastError(),"TlsSetValue of data describing the created thread failed");
433 rv = (*(t->start_routine))(t->param);
435 return *((DWORD*)rv);
439 xbt_os_thread_t xbt_os_thread_create(const char *name,pvoid_f_pvoid_t start_routine,
442 xbt_os_thread_t t = xbt_new(s_xbt_os_thread_t,1);
444 t->name = xbt_strdup(name);
445 t->start_routine = start_routine ;
448 t->handle = CreateThread(NULL,XBT_DEFAULT_THREAD_STACK_SIZE,
449 (LPTHREAD_START_ROUTINE)wrapper_start_routine,
450 t,STACK_SIZE_PARAM_IS_A_RESERVATION,&(t->id));
454 THROW0(system_error,(int)GetLastError(),"CreateThread failed");
460 const char* xbt_os_thread_name(xbt_os_thread_t t) {
464 const char* xbt_os_thread_self_name(void) {
465 xbt_os_thread_t t = xbt_os_thread_self();
466 return t?t->name:"main";
470 xbt_os_thread_join(xbt_os_thread_t thread,void ** thread_return) {
472 if(WAIT_OBJECT_0 != WaitForSingleObject(thread->handle,INFINITE))
473 THROW0(system_error,(int)GetLastError(), "WaitForSingleObject failed");
477 if(!GetExitCodeThread(thread->handle,(DWORD*)(*thread_return)))
478 THROW0(system_error,(int)GetLastError(), "GetExitCodeThread failed");
481 CloseHandle(thread->handle);
486 void xbt_os_thread_exit(int *retval) {
493 xbt_os_thread_t xbt_os_thread_self(void) {
494 return TlsGetValue(xbt_self_thread_key);
497 void *xbt_os_thread_getparam(void) {
498 xbt_os_thread_t t = xbt_os_thread_self();
503 void xbt_os_thread_yield(void) {
506 void xbt_os_thread_cancel(xbt_os_thread_t t) {
510 /****** mutex related functions ******/
511 typedef struct xbt_os_mutex_ {
512 /* KEEP IT IN SYNC WITH xbt_thread.c */
513 CRITICAL_SECTION lock;
516 xbt_os_mutex_t xbt_os_mutex_init(void) {
517 xbt_os_mutex_t res = xbt_new(s_xbt_os_mutex_t,1);
519 /* initialize the critical section object */
520 InitializeCriticalSection(&(res->lock));
525 void xbt_os_mutex_acquire(xbt_os_mutex_t mutex) {
527 EnterCriticalSection(& mutex->lock);
530 void xbt_os_mutex_release(xbt_os_mutex_t mutex) {
532 LeaveCriticalSection (& mutex->lock);
536 void xbt_os_mutex_destroy(xbt_os_mutex_t mutex) {
540 DeleteCriticalSection(& mutex->lock);
544 /***** condition related functions *****/
545 enum { /* KEEP IT IN SYNC WITH xbt_thread.c */
551 typedef struct xbt_os_cond_ {
552 /* KEEP IT IN SYNC WITH xbt_thread.c */
553 HANDLE events[MAX_EVENTS];
555 unsigned int waiters_count; /* the number of waiters */
556 CRITICAL_SECTION waiters_count_lock; /* protect access to waiters_count */
559 xbt_os_cond_t xbt_os_cond_init(void) {
561 xbt_os_cond_t res = xbt_new0(s_xbt_os_cond_t,1);
563 memset(& res->waiters_count_lock,0,sizeof(CRITICAL_SECTION));
565 /* initialize the critical section object */
566 InitializeCriticalSection(& res->waiters_count_lock);
568 res->waiters_count = 0;
570 /* Create an auto-reset event */
571 res->events[SIGNAL] = CreateEvent (NULL, FALSE, FALSE, NULL);
573 if(!res->events[SIGNAL]){
574 DeleteCriticalSection(& res->waiters_count_lock);
576 THROW0(system_error,0,"CreateEvent failed for the signals");
579 /* Create a manual-reset event. */
580 res->events[BROADCAST] = CreateEvent (NULL, TRUE, FALSE,NULL);
582 if(!res->events[BROADCAST]){
584 DeleteCriticalSection(& res->waiters_count_lock);
585 CloseHandle(res->events[SIGNAL]);
587 THROW0(system_error,0,"CreateEvent failed for the broadcasts");
593 void xbt_os_cond_wait(xbt_os_cond_t cond, xbt_os_mutex_t mutex) {
595 unsigned long wait_result;
598 /* lock the threads counter and increment it */
599 EnterCriticalSection (& cond->waiters_count_lock);
600 cond->waiters_count++;
601 LeaveCriticalSection (& cond->waiters_count_lock);
603 /* unlock the mutex associate with the condition */
604 LeaveCriticalSection (& mutex->lock);
606 /* wait for a signal (broadcast or no) */
607 wait_result = WaitForMultipleObjects (2, cond->events, FALSE, INFINITE);
609 if(wait_result == WAIT_FAILED)
610 THROW0(system_error,0,"WaitForMultipleObjects failed, so we cannot wait on the condition");
612 /* we have a signal lock the condition */
613 EnterCriticalSection (& cond->waiters_count_lock);
614 cond->waiters_count--;
616 /* it's the last waiter or it's a broadcast ? */
617 is_last_waiter = ((wait_result == WAIT_OBJECT_0 + BROADCAST - 1) && (cond->waiters_count == 0));
619 LeaveCriticalSection (& cond->waiters_count_lock);
621 /* yes it's the last waiter or it's a broadcast
622 * only reset the manual event (the automatic event is reset in the WaitForMultipleObjects() function
626 if(!ResetEvent (cond->events[BROADCAST]))
627 THROW0(system_error,0,"ResetEvent failed");
629 /* relock the mutex associated with the condition in accordance with the posix thread specification */
630 EnterCriticalSection (& mutex->lock);
632 void xbt_os_cond_timedwait(xbt_os_cond_t cond, xbt_os_mutex_t mutex, double delay) {
634 unsigned long wait_result = WAIT_TIMEOUT;
636 unsigned long end = (unsigned long)(delay * 1000);
640 xbt_os_cond_wait(cond,mutex);
642 DEBUG3("xbt_cond_timedwait(%p,%p,%ul)",&(cond->events),&(mutex->lock),end);
644 /* lock the threads counter and increment it */
645 EnterCriticalSection (& cond->waiters_count_lock);
646 cond->waiters_count++;
647 LeaveCriticalSection (& cond->waiters_count_lock);
649 /* unlock the mutex associate with the condition */
650 LeaveCriticalSection (& mutex->lock);
651 /* wait for a signal (broadcast or no) */
653 wait_result = WaitForMultipleObjects (2, cond->events, FALSE, end);
655 switch(wait_result) {
657 THROW3(timeout_error,GetLastError(),"condition %p (mutex %p) wasn't signaled before timeout (%f)",cond,mutex, delay);
659 THROW0(system_error,GetLastError(),"WaitForMultipleObjects failed, so we cannot wait on the condition");
662 /* we have a signal lock the condition */
663 EnterCriticalSection (& cond->waiters_count_lock);
664 cond->waiters_count--;
666 /* it's the last waiter or it's a broadcast ? */
667 is_last_waiter = ((wait_result == WAIT_OBJECT_0 + BROADCAST - 1) && (cond->waiters_count == 0));
669 LeaveCriticalSection (& cond->waiters_count_lock);
671 /* yes it's the last waiter or it's a broadcast
672 * only reset the manual event (the automatic event is reset in the WaitForMultipleObjects() function
676 if(!ResetEvent (cond->events[BROADCAST]))
677 THROW0(system_error,0,"ResetEvent failed");
679 /* relock the mutex associated with the condition in accordance with the posix thread specification */
680 EnterCriticalSection (& mutex->lock);
682 /*THROW_UNIMPLEMENTED;*/
685 void xbt_os_cond_signal(xbt_os_cond_t cond) {
688 EnterCriticalSection (& cond->waiters_count_lock);
689 have_waiters = cond->waiters_count > 0;
690 LeaveCriticalSection (& cond->waiters_count_lock);
693 if(!SetEvent(cond->events[SIGNAL]))
694 THROW0(system_error,0,"SetEvent failed");
696 xbt_os_thread_yield();
699 void xbt_os_cond_broadcast(xbt_os_cond_t cond){
702 EnterCriticalSection (& cond->waiters_count_lock);
703 have_waiters = cond->waiters_count > 0;
704 LeaveCriticalSection (& cond->waiters_count_lock);
707 SetEvent(cond->events[BROADCAST]);
710 void xbt_os_cond_destroy(xbt_os_cond_t cond){
715 if(!CloseHandle(cond->events[SIGNAL]))
718 if(!CloseHandle(cond->events[BROADCAST]))
721 DeleteCriticalSection(& cond->waiters_count_lock);
726 THROW0(system_error,0,"Error while destroying the condition");
729 typedef struct xbt_os_sem_ {
732 CRITICAL_SECTION value_lock; /* protect access to value of the semaphore */
736 xbt_os_sem_init(unsigned int value)
741 THROW1(arg_error,EINVAL,"xbt_os_sem_init() failed: %s",
744 res = (xbt_os_sem_t)xbt_new0(s_xbt_os_sem_t,1);
746 if(!(res->h = CreateSemaphore(NULL,value,(long)INT_MAX,NULL))) {
747 THROW1(system_error,GetLastError(),"CreateSemaphore() failed: %s",
748 strerror(GetLastError()));
754 InitializeCriticalSection(&(res->value_lock));
760 xbt_os_sem_acquire(xbt_os_sem_t sem)
763 THROW1(arg_error,EINVAL,"xbt_os_sem_acquire() failed: %s",
767 if(WAIT_OBJECT_0 != WaitForSingleObject(sem->h,INFINITE))
768 THROW1(system_error,GetLastError(),"WaitForSingleObject() failed: %s",
769 strerror(GetLastError()));
770 EnterCriticalSection(&(sem->value_lock));
772 LeaveCriticalSection(&(sem->value_lock));
775 void xbt_os_sem_timedacquire(xbt_os_sem_t sem, double timeout)
779 double end = timeout + xbt_os_time();
782 THROW1(arg_error,EINVAL,"xbt_os_sem_timedacquire() failed: %s",
787 xbt_os_sem_acquire(sem);
792 seconds = (long) floor(end);
793 milliseconds = (long)( ( end - seconds) * 1000);
794 milliseconds += (seconds * 1000);
796 switch(WaitForSingleObject(sem->h,milliseconds))
799 EnterCriticalSection(&(sem->value_lock));
801 LeaveCriticalSection(&(sem->value_lock));
805 THROW2(timeout_error,GetLastError(),"semaphore %p wasn't signaled before timeout (%f)",sem,timeout);
810 THROW3(system_error,GetLastError(),"WaitForSingleObject(%p,%f) failed: %s",sem,timeout, strerror(GetLastError()));
816 xbt_os_sem_release(xbt_os_sem_t sem)
819 THROW1(arg_error,EINVAL,"xbt_os_sem_post() failed: %s",
822 if(!ReleaseSemaphore(sem->h,1, NULL))
823 THROW1(system_error,GetLastError(),"ReleaseSemaphore() failed: %s",
824 strerror(GetLastError()));
825 EnterCriticalSection (&(sem->value_lock));
827 LeaveCriticalSection(&(sem->value_lock));
831 xbt_os_sem_destroy(xbt_os_sem_t sem)
834 THROW1(arg_error,EINVAL,"xbt_os_sem_destroy() failed: %s",
837 if(!CloseHandle(sem->h))
838 THROW1(system_error,GetLastError(),"CloseHandle() failed: %s",
839 strerror(GetLastError()));
841 DeleteCriticalSection(&(sem->value_lock));
848 xbt_os_sem_get_value(xbt_os_sem_t sem, int* svalue)
851 THROW1(arg_error,EINVAL,"xbt_os_sem_get_value() failed: %s",
854 EnterCriticalSection(&(sem->value_lock));
855 *svalue = sem->value;
856 LeaveCriticalSection(&(sem->value_lock));