From e55e648f29b4c7654bc6a04edc276d5be21cf106 Mon Sep 17 00:00:00 2001 From: Christian Thaeter Date: Mon, 18 Jan 2010 00:34:53 +0100 Subject: [PATCH 1/4] Thread syncronization and joining add a custom defined range for states provide functions for barrier like syncing between 2 threads provide joinable threads and a thread_join() function --- src/backend/threads.c | 100 ++++++++++++++++++++++++++++++++++++++++-- src/backend/threads.h | 76 ++++++++++++++++++++++++++++---- 2 files changed, 164 insertions(+), 12 deletions(-) diff --git a/src/backend/threads.c b/src/backend/threads.c index 3dd1bdbae..3c175e81a 100644 --- a/src/backend/threads.c +++ b/src/backend/threads.c @@ -59,18 +59,37 @@ const char* lumiera_threadstate_names[] = { }; #undef LUMIERA_THREAD_STATE -static void* thread_loop (void* thread) +LUMIERA_ERROR_DEFINE(THREAD, "fatal threads initialization error"); + +/* thread local storage pointing back to the thread structure of each thread */ +static pthread_key_t lumiera_thread_tls; +static pthread_once_t lumiera_thread_initialized = PTHREAD_ONCE_INIT; + +static void +lumiera_thread_tls_init (void) +{ + if (!!pthread_key_create (&lumiera_thread_tls, NULL)) + LUMIERA_DIE (THREAD); /* should never happen */ +} + + +static void* +thread_loop (void* thread) { TRACE (threads); NOBUG_THREAD_ID_SET ("worker"); LumieraThread t = (LumieraThread)thread; + pthread_setspecific (lumiera_thread_tls, t); + pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); REQUIRE (t, "thread does not exist"); LUMIERA_CONDITION_SECTION (threads, &t->signal) { + t->rh = &lumiera_lock_section_.rh; + do { // NULL function means: no work to do INFO (threads, "function %p", t->function); @@ -80,10 +99,21 @@ static void* thread_loop (void* thread) LUMIERA_CONDITION_WAIT (t->state != LUMIERA_THREADSTATE_IDLE); INFO (threads, "Thread awaken with state %d", t->state); } while (t->state != LUMIERA_THREADSTATE_SHUTDOWN); - // SHUTDOWN state + // SHUTDOWN state + + if (t->kind & LUMIERA_THREAD_JOINABLE) + { + INFO (threads, "Thread zombified"); + /* move error state to data the other thread will it pick up from there */ + t->arguments = (void*)lumiera_error (); + t->state = LUMIERA_THREADSTATE_ZOMBIE; + LUMIERA_CONDITION_WAIT (t->state == LUMIERA_THREADSTATE_JOINED); + INFO (threads, "Thread joined"); + } INFO (threads, "Thread Shutdown"); } + TODO ("no error must be pending here, else do app shutdown"); return 0; } @@ -126,9 +156,10 @@ lumiera_thread_new (enum lumiera_thread_class kind, struct nobug_flag* flag, pthread_attr_t* attrs) { + pthread_once (&lumiera_thread_initialized, lumiera_thread_tls_init); + // TODO: do something with these: (void) purpose; - REQUIRE (kind < LUMIERA_THREADCLASS_COUNT, "invalid thread kind specified: %d", kind); REQUIRE (attrs, "invalid pthread attributes structure passed"); LumieraThread self = lumiera_malloc (sizeof (*self)); @@ -183,6 +214,69 @@ lumiera_thread_delete (LumieraThread self) lumiera_free (lumiera_thread_destroy (self)); } + +LumieraThread +lumiera_thread_self (void) +{ + pthread_once (&lumiera_thread_initialized, lumiera_thread_tls_init); + return pthread_getspecific (lumiera_thread_tls); +} + + +LumieraThread +lumiera_thread_sync_other (LumieraThread other, int state) +{ + TRACE(threads); + + LUMIERA_CONDITION_SECTION (threads, &other->signal) + { + REQUIRE (other->state == ~state); TODO("Runtime error when state expectation isnt met"); + other->state = state; + LUMIERA_CONDITION_SIGNAL; + } + return other; +} + + +LumieraThread +lumiera_thread_sync (int state) +{ + TRACE(threads); + + LumieraThread self = lumiera_thread_self (); + REQUIRE(self, "not a lumiera thread"); + + self->state = ~state; + + TODO("error handing, maybe timed mutex (using the threads heartbeat timeout, shortly before timeout)"); + + do { + lumiera_condition_wait (&self->signal, &NOBUG_FLAG(threads), self->rh); + } while (self->state != state); + + return self; +} + + + +lumiera_err +lumiera_thread_join (LumieraThread thread) +{ + TRACE(threads); + lumiera_err ret = NULL; + + LUMIERA_CONDITION_SECTION (threads, &thread->signal) + { + LUMIERA_CONDITION_WAIT (thread->state == LUMIERA_THREADSTATE_ZOMBIE); + ret = (lumiera_err)thread->arguments; + thread->state = LUMIERA_THREADSTATE_JOINED; + LUMIERA_CONDITION_SIGNAL; /* kiss it a last goodbye */ + } + return ret; +} + + + /* // Local Variables: // mode: C diff --git a/src/backend/threads.h b/src/backend/threads.h index 42d76bcfe..34a38eb40 100644 --- a/src/backend/threads.h +++ b/src/backend/threads.h @@ -80,10 +80,15 @@ enum lumiera_thread_class * flag to let the decision to run the function in a thread open to the backend. * depending on load it might decide to run it sequentially. * This has some constraints: - * The condition variable to signal the finish of the thread must not be used. * The Thread must be very careful with locking, better don't. + * TODO explain syncronization issues **/ - LUMIERA_THREAD_OR_NOT = 1<<16 + LUMIERA_THREAD_OR_NOT = 1<<16, + + /** + * Thread must be joined finally + **/ + LUMIERA_THREAD_JOINABLE = 1<<17 }; #undef LUMIERA_THREAD_CLASS @@ -95,11 +100,13 @@ extern const char* lumiera_threadclass_names[]; // on one hand it could be used to tell the current state of the thread // on the other, it is used to tell the thread which state to enter on next iteration #define LUMIERA_THREAD_STATES \ - LUMIERA_THREAD_STATE(IDLE) \ LUMIERA_THREAD_STATE(ERROR) \ + LUMIERA_THREAD_STATE(IDLE) \ LUMIERA_THREAD_STATE(RUNNING) \ LUMIERA_THREAD_STATE(WAKEUP) \ LUMIERA_THREAD_STATE(SHUTDOWN) \ + LUMIERA_THREAD_STATE(ZOMBIE) \ + LUMIERA_THREAD_STATE(JOINED) \ LUMIERA_THREAD_STATE(STARTUP) #define LUMIERA_THREAD_STATE(name) LUMIERA_THREADSTATE_##name, @@ -108,9 +115,12 @@ extern const char* lumiera_threadclass_names[]; * Thread state. * These are the only states our threads can be in. */ -typedef enum +typedef enum { LUMIERA_THREAD_STATES + + LUMIERA_THREADSTATE_CUSTOM_START = 1024, + LUMIERA_THREADSTATE_CUSTOM_END = 32768, } lumiera_thread_state; @@ -127,18 +137,20 @@ extern const char* lumiera_threadstate_names[]; struct lumiera_thread_struct { llist node; // this should be first for easy casting - // the function and argument can be passed to the thread at creation time - // void (*function)(void*); - // void* arg; + pthread_t id; // TODO: maybe this condition variable should be renamed when we have a better understanding of how it will be used lumiera_condition signal; // control signal, state change signal + + struct nobug_resource_user** rh; + // the following member could have been called "class" except that it would conflict with C++ keyword // as consequence, it's been decided to leave the type name containing the word "class", // while all members/variables called "kind" - enum lumiera_thread_class kind; + int kind; + // this is used both as a command and as a state tracker - lumiera_thread_state state; + int state; void (*function)(void *); void * arguments; }; @@ -189,6 +201,52 @@ lumiera_thread_run (enum lumiera_thread_class kind, const char* purpose, struct nobug_flag* flag); +/** + * Query the LumieraThread handle of the current thread + * + * + * @return pointer to the (opaque) handle of the current lumiera thread or NULL when this is not a lumiera thread + */ +LumieraThread +lumiera_thread_self (void); + + +/** + * Thread syncronization + * Lumiera threads can be syncronized with custom states. + * The syncronization primitives act as barrier over 2 threads, any thread reaching a syncronization + * point first is blocked until the other one reaches it with the same state. + * Providing different states is errorneous! + */ + + +/** + * Syncronize with another threads state + * + * this blocks until/unless the other thread reaches 'state' + */ +LumieraThread +lumiera_thread_sync_other (LumieraThread other, int state); + +/** + * Syncronize current thread + * + * signifies that this thread reached 'state' and blocks until/unless + * some other thread synced with this state + * @return on success pointer to self (opaque), or NULL on error + */ +LumieraThread +lumiera_thread_sync (int state); + +/** + * Joining threads + * a thread can be set up with the LUMEIRA_THREAD_JOINABLE flag, if so + * then it must be joined finally. Joining clears the error state of the joined thread + * and returns it to the joiner. + * + */ +lumiera_err +lumiera_thread_join (LumieraThread thread); #endif /* From 87918c657c4d013489dd9f8fcb052c15b7e7a57f Mon Sep 17 00:00:00 2001 From: Christian Thaeter Date: Mon, 18 Jan 2010 14:23:23 +0100 Subject: [PATCH 2/4] check wait condition before loop body --- src/lib/condition.h | 9 ++++----- src/lib/reccondition.h | 9 ++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/lib/condition.h b/src/lib/condition.h index 51abfb61a..9ad3370d7 100644 --- a/src/lib/condition.h +++ b/src/lib/condition.h @@ -92,12 +92,12 @@ * @param expr Conditon which must become true, else the condition variable goes back into sleep */ #define LUMIERA_CONDITION_WAIT(expr) \ - do { \ + while (!(expr)) { \ REQUIRE (lumiera_lock_section_.lock, "Condition mutex not locked"); \ lumiera_condition_wait (lumiera_lock_section_.lock, \ lumiera_lock_section_.flag, \ &lumiera_lock_section_.rh); \ - } while (!(expr)) + } /** @@ -108,15 +108,14 @@ * sets LUMIERA_ERROR_LOCK_TIMEOUT when the timeout passed */ #define LUMIERA_CONDITION_TIMEDWAIT(expr, timeout) \ - do { \ + while (!(expr)) { \ REQUIRE (lumiera_lock_section_.lock, "Condition mutex not locked"); \ if (!lumiera_condition_timedwait (lumiera_lock_section_.lock, \ timeout, \ lumiera_lock_section_.flag, \ &lumiera_lock_section_.rh)) \ break; \ - } while (!(expr)) - + } /** * Signal a condition variable diff --git a/src/lib/reccondition.h b/src/lib/reccondition.h index bbb321861..b8b8fbd97 100644 --- a/src/lib/reccondition.h +++ b/src/lib/reccondition.h @@ -91,12 +91,12 @@ * @param expr Condition which must become true, else the condition variable goes back into sleep */ #define LUMIERA_RECCONDITION_WAIT(expr) \ - do { \ + while (!(expr)) { \ REQUIRE (lumiera_lock_section_.lock, "Reccondition mutex not locked"); \ lumiera_reccondition_wait (lumiera_lock_section_.lock, \ lumiera_lock_section_.flag, \ &lumiera_lock_section_.rh); \ - } while (!(expr)) + } /** @@ -107,15 +107,14 @@ * sets LUMIERA_ERROR_LOCK_TIMEOUT when the timeout passed */ #define LUMIERA_RECCONDITION_TIMEDWAIT(expr, timeout) \ - do { \ + while (!(expr)) { \ REQUIRE (lumiera_lock_section_.lock, "Reccondition mutex not locked"); \ if (!lumiera_reccondition_timedwait (lumiera_lock_section_.lock, \ timeout, \ lumiera_lock_section_.flag, \ &lumiera_lock_section_.rh)) \ break; \ - } while (!(expr)) - + } From cc4dc25f4ad88402988f08cdbd642d5790e8a3e7 Mon Sep 17 00:00:00 2001 From: Christian Thaeter Date: Mon, 18 Jan 2010 16:23:28 +0100 Subject: [PATCH 3/4] while loop is little more solid than do..while --- src/backend/threads.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/threads.c b/src/backend/threads.c index 3c175e81a..d52c7eb2c 100644 --- a/src/backend/threads.c +++ b/src/backend/threads.c @@ -250,9 +250,9 @@ lumiera_thread_sync (int state) TODO("error handing, maybe timed mutex (using the threads heartbeat timeout, shortly before timeout)"); - do { + while (self->state != state) { lumiera_condition_wait (&self->signal, &NOBUG_FLAG(threads), self->rh); - } while (self->state != state); + } return self; } From 7371db8a2c385ef7dfc326268f2bdfd96d933a2d Mon Sep 17 00:00:00 2001 From: Christian Thaeter Date: Mon, 18 Jan 2010 16:24:45 +0100 Subject: [PATCH 4/4] thread deadlines, first implementation --- src/backend/threads.c | 64 +++++++++++++++++++++++++++++++++++++++++++ src/backend/threads.h | 36 ++++++++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/src/backend/threads.c b/src/backend/threads.c index d52c7eb2c..ded04a60e 100644 --- a/src/backend/threads.c +++ b/src/backend/threads.c @@ -33,6 +33,7 @@ //TODO: System includes// #include +#include #include /** @@ -136,6 +137,8 @@ lumiera_thread_run (enum lumiera_thread_class kind, self->function = function; self->arguments = arg; + self->deadline.tv_sec = 0; + // and let it really run (signal the condition var, the thread waits on it) self->state = LUMIERA_THREADSTATE_WAKEUP; @@ -169,6 +172,8 @@ lumiera_thread_new (enum lumiera_thread_class kind, self->state = LUMIERA_THREADSTATE_STARTUP; self->function = NULL; self->arguments = NULL; + self->deadline.tv_sec = 0; + self->deadline.tv_nsec = 0; int error = pthread_create (&self->id, attrs, &thread_loop, self); if (error) @@ -223,6 +228,65 @@ lumiera_thread_self (void) } +/** + * Set a threads deadline + * A thread must finish before its deadline is hit. Otherwise it counts as stalled + * which is a fatal error which might pull the application down. + */ +LumieraThread +lumiera_thread_deadline_set (struct timespec deadline) +{ + LumieraThread self = lumiera_thread_self (); + if (self) + self->deadline = deadline; + return self; +} + + +/** + * Extend a threads deadline + * sets the deadline to now+ms in future. This can be used to implement a heartbeat. + */ +LumieraThread +lumiera_thread_deadline_extend (unsigned ms) +{ + LumieraThread self = lumiera_thread_self (); + if (self) + { + struct timespec deadline; + clock_gettime (CLOCK_REALTIME, &deadline); + deadline.tv_sec += ms / 1000; + deadline.tv_nsec += 1000000 * (ms % 1000); + if (deadline.tv_nsec > 1000000000) + { + deadline.tv_sec += (deadline.tv_nsec / 1000000000); + deadline.tv_nsec %= 1000000000; + } + self->deadline = deadline; + } + + return self; +} + + +/** + * Clear a threads deadline + * Threads without deadline will not be checked against deadlocks (this is the default) + */ +LumieraThread +lumiera_thread_deadline_clear (void) +{ + LumieraThread self = lumiera_thread_self (); + if (self) + { + self->deadline.tv_sec = 0; + self->deadline.tv_nsec = 0; + } + return self; +} + + + LumieraThread lumiera_thread_sync_other (LumieraThread other, int state) { diff --git a/src/backend/threads.h b/src/backend/threads.h index 34a38eb40..131d25911 100644 --- a/src/backend/threads.h +++ b/src/backend/threads.h @@ -142,6 +142,8 @@ struct lumiera_thread_struct // TODO: maybe this condition variable should be renamed when we have a better understanding of how it will be used lumiera_condition signal; // control signal, state change signal + struct timespec deadline; + struct nobug_resource_user** rh; // the following member could have been called "class" except that it would conflict with C++ keyword @@ -210,6 +212,40 @@ lumiera_thread_run (enum lumiera_thread_class kind, LumieraThread lumiera_thread_self (void); +/** + * Heartbeat and Deadlines + * + * Any thread can have an optional 'deadline' which must never be hit. + * This deadlines are lazily checked and if hit this is a fatal error which triggers + * an emergency shutdown. Thus threads are obliged to set and extend their deadlines + * accordingly. + * + */ + +/** + * Set a threads deadline + * A thread must finish before its deadline is hit. Otherwise it counts as stalled + * which is a fatal error which might pull the application down. + */ +LumieraThread +lumiera_thread_deadline_set (struct timespec deadline); + + +/** + * Extend a threads deadline + * sets the deadline to now+ms in future. This can be used to implement a heartbeat. + */ +LumieraThread +lumiera_thread_deadline_extend (unsigned ms); + + +/** + * Clear a threads deadline + * Threads without deadline will not be checked against deadlocks (this is the default) + */ +LumieraThread +lumiera_thread_deadline_clear (void); + /** * Thread syncronization