Merge remote branch 'ct/backend_devel' into second-tp-attempt

This commit is contained in:
Michael Ploujnikov 2010-01-18 11:54:40 -05:00
commit ecbd29846d
4 changed files with 272 additions and 22 deletions

View file

@ -33,6 +33,7 @@
//TODO: System includes//
#include <pthread.h>
#include <time.h>
#include <errno.h>
/**
@ -59,18 +60,37 @@ const char* lumiera_threadstate_names[] = {
};
#undef LUMIERA_THREAD_STATE
static void* thread_loop (void* thread)
LUMIERA_ERROR_DEFINE(THREAD, "fatal threads initialization error");
/* thread local storage pointing back to the thread structure of each thread */
static pthread_key_t lumiera_thread_tls;
static pthread_once_t lumiera_thread_initialized = PTHREAD_ONCE_INIT;
static void
lumiera_thread_tls_init (void)
{
if (!!pthread_key_create (&lumiera_thread_tls, NULL))
LUMIERA_DIE (THREAD); /* should never happen */
}
static void*
thread_loop (void* thread)
{
TRACE (threads);
NOBUG_THREAD_ID_SET ("worker");
LumieraThread t = (LumieraThread)thread;
pthread_setspecific (lumiera_thread_tls, t);
pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
REQUIRE (t, "thread does not exist");
LUMIERA_CONDITION_SECTION (threads, &t->signal)
{
t->rh = &lumiera_lock_section_.rh;
do {
// NULL function means: no work to do
INFO (threads, "function %p", t->function);
@ -80,10 +100,21 @@ static void* thread_loop (void* thread)
LUMIERA_CONDITION_WAIT (t->state != LUMIERA_THREADSTATE_IDLE);
INFO (threads, "Thread awaken with state %d", t->state);
} while (t->state != LUMIERA_THREADSTATE_SHUTDOWN);
// SHUTDOWN state
// SHUTDOWN state
if (t->kind & LUMIERA_THREAD_JOINABLE)
{
INFO (threads, "Thread zombified");
/* move error state to data the other thread will it pick up from there */
t->arguments = (void*)lumiera_error ();
t->state = LUMIERA_THREADSTATE_ZOMBIE;
LUMIERA_CONDITION_WAIT (t->state == LUMIERA_THREADSTATE_JOINED);
INFO (threads, "Thread joined");
}
INFO (threads, "Thread Shutdown");
}
TODO ("no error must be pending here, else do app shutdown");
return 0;
}
@ -106,6 +137,8 @@ lumiera_thread_run (enum lumiera_thread_class kind,
self->function = function;
self->arguments = arg;
self->deadline.tv_sec = 0;
// and let it really run (signal the condition var, the thread waits on it)
self->state = LUMIERA_THREADSTATE_WAKEUP;
@ -126,9 +159,10 @@ lumiera_thread_new (enum lumiera_thread_class kind,
struct nobug_flag* flag,
pthread_attr_t* attrs)
{
pthread_once (&lumiera_thread_initialized, lumiera_thread_tls_init);
// TODO: do something with these:
(void) purpose;
REQUIRE (kind < LUMIERA_THREADCLASS_COUNT, "invalid thread kind specified: %d", kind);
REQUIRE (attrs, "invalid pthread attributes structure passed");
LumieraThread self = lumiera_malloc (sizeof (*self));
@ -138,6 +172,8 @@ lumiera_thread_new (enum lumiera_thread_class kind,
self->state = LUMIERA_THREADSTATE_STARTUP;
self->function = NULL;
self->arguments = NULL;
self->deadline.tv_sec = 0;
self->deadline.tv_nsec = 0;
int error = pthread_create (&self->id, attrs, &thread_loop, self);
if (error)
@ -183,6 +219,128 @@ lumiera_thread_delete (LumieraThread self)
lumiera_free (lumiera_thread_destroy (self));
}
LumieraThread
lumiera_thread_self (void)
{
pthread_once (&lumiera_thread_initialized, lumiera_thread_tls_init);
return pthread_getspecific (lumiera_thread_tls);
}
/**
* Set a threads deadline
* A thread must finish before its deadline is hit. Otherwise it counts as stalled
* which is a fatal error which might pull the application down.
*/
LumieraThread
lumiera_thread_deadline_set (struct timespec deadline)
{
LumieraThread self = lumiera_thread_self ();
if (self)
self->deadline = deadline;
return self;
}
/**
* Extend a threads deadline
* sets the deadline to now+ms in future. This can be used to implement a heartbeat.
*/
LumieraThread
lumiera_thread_deadline_extend (unsigned ms)
{
LumieraThread self = lumiera_thread_self ();
if (self)
{
struct timespec deadline;
clock_gettime (CLOCK_REALTIME, &deadline);
deadline.tv_sec += ms / 1000;
deadline.tv_nsec += 1000000 * (ms % 1000);
if (deadline.tv_nsec > 1000000000)
{
deadline.tv_sec += (deadline.tv_nsec / 1000000000);
deadline.tv_nsec %= 1000000000;
}
self->deadline = deadline;
}
return self;
}
/**
* Clear a threads deadline
* Threads without deadline will not be checked against deadlocks (this is the default)
*/
LumieraThread
lumiera_thread_deadline_clear (void)
{
LumieraThread self = lumiera_thread_self ();
if (self)
{
self->deadline.tv_sec = 0;
self->deadline.tv_nsec = 0;
}
return self;
}
LumieraThread
lumiera_thread_sync_other (LumieraThread other, int state)
{
TRACE(threads);
LUMIERA_CONDITION_SECTION (threads, &other->signal)
{
REQUIRE (other->state == ~state); TODO("Runtime error when state expectation isnt met");
other->state = state;
LUMIERA_CONDITION_SIGNAL;
}
return other;
}
LumieraThread
lumiera_thread_sync (int state)
{
TRACE(threads);
LumieraThread self = lumiera_thread_self ();
REQUIRE(self, "not a lumiera thread");
self->state = ~state;
TODO("error handing, maybe timed mutex (using the threads heartbeat timeout, shortly before timeout)");
while (self->state != state) {
lumiera_condition_wait (&self->signal, &NOBUG_FLAG(threads), self->rh);
}
return self;
}
lumiera_err
lumiera_thread_join (LumieraThread thread)
{
TRACE(threads);
lumiera_err ret = NULL;
LUMIERA_CONDITION_SECTION (threads, &thread->signal)
{
LUMIERA_CONDITION_WAIT (thread->state == LUMIERA_THREADSTATE_ZOMBIE);
ret = (lumiera_err)thread->arguments;
thread->state = LUMIERA_THREADSTATE_JOINED;
LUMIERA_CONDITION_SIGNAL; /* kiss it a last goodbye */
}
return ret;
}
/*
// Local Variables:
// mode: C

View file

@ -80,10 +80,15 @@ enum lumiera_thread_class
* flag to let the decision to run the function in a thread open to the backend.
* depending on load it might decide to run it sequentially.
* This has some constraints:
* The condition variable to signal the finish of the thread must not be used.
* The Thread must be very careful with locking, better don't.
* TODO explain syncronization issues
**/
LUMIERA_THREAD_OR_NOT = 1<<16
LUMIERA_THREAD_OR_NOT = 1<<16,
/**
* Thread must be joined finally
**/
LUMIERA_THREAD_JOINABLE = 1<<17
};
#undef LUMIERA_THREAD_CLASS
@ -95,11 +100,13 @@ extern const char* lumiera_threadclass_names[];
// on one hand it could be used to tell the current state of the thread
// on the other, it is used to tell the thread which state to enter on next iteration
#define LUMIERA_THREAD_STATES \
LUMIERA_THREAD_STATE(IDLE) \
LUMIERA_THREAD_STATE(ERROR) \
LUMIERA_THREAD_STATE(IDLE) \
LUMIERA_THREAD_STATE(RUNNING) \
LUMIERA_THREAD_STATE(WAKEUP) \
LUMIERA_THREAD_STATE(SHUTDOWN) \
LUMIERA_THREAD_STATE(ZOMBIE) \
LUMIERA_THREAD_STATE(JOINED) \
LUMIERA_THREAD_STATE(STARTUP)
#define LUMIERA_THREAD_STATE(name) LUMIERA_THREADSTATE_##name,
@ -108,9 +115,12 @@ extern const char* lumiera_threadclass_names[];
* Thread state.
* These are the only states our threads can be in.
*/
typedef enum
typedef enum
{
LUMIERA_THREAD_STATES
LUMIERA_THREADSTATE_CUSTOM_START = 1024,
LUMIERA_THREADSTATE_CUSTOM_END = 32768,
}
lumiera_thread_state;
@ -127,18 +137,22 @@ extern const char* lumiera_threadstate_names[];
struct lumiera_thread_struct
{
llist node; // this should be first for easy casting
// the function and argument can be passed to the thread at creation time
// void (*function)(void*);
// void* arg;
pthread_t id;
// TODO: maybe this condition variable should be renamed when we have a better understanding of how it will be used
lumiera_condition signal; // control signal, state change signal
struct timespec deadline;
struct nobug_resource_user** rh;
// the following member could have been called "class" except that it would conflict with C++ keyword
// as consequence, it's been decided to leave the type name containing the word "class",
// while all members/variables called "kind"
enum lumiera_thread_class kind;
int kind;
// this is used both as a command and as a state tracker
lumiera_thread_state state;
int state;
void (*function)(void *);
void * arguments;
};
@ -189,6 +203,86 @@ lumiera_thread_run (enum lumiera_thread_class kind,
const char* purpose,
struct nobug_flag* flag);
/**
* Query the LumieraThread handle of the current thread
*
*
* @return pointer to the (opaque) handle of the current lumiera thread or NULL when this is not a lumiera thread
*/
LumieraThread
lumiera_thread_self (void);
/**
* Heartbeat and Deadlines
*
* Any thread can have an optional 'deadline' which must never be hit.
* This deadlines are lazily checked and if hit this is a fatal error which triggers
* an emergency shutdown. Thus threads are obliged to set and extend their deadlines
* accordingly.
*
*/
/**
* Set a threads deadline
* A thread must finish before its deadline is hit. Otherwise it counts as stalled
* which is a fatal error which might pull the application down.
*/
LumieraThread
lumiera_thread_deadline_set (struct timespec deadline);
/**
* Extend a threads deadline
* sets the deadline to now+ms in future. This can be used to implement a heartbeat.
*/
LumieraThread
lumiera_thread_deadline_extend (unsigned ms);
/**
* Clear a threads deadline
* Threads without deadline will not be checked against deadlocks (this is the default)
*/
LumieraThread
lumiera_thread_deadline_clear (void);
/**
* Thread syncronization
* Lumiera threads can be syncronized with custom states.
* The syncronization primitives act as barrier over 2 threads, any thread reaching a syncronization
* point first is blocked until the other one reaches it with the same state.
* Providing different states is errorneous!
*/
/**
* Syncronize with another threads state
*
* this blocks until/unless the other thread reaches 'state'
*/
LumieraThread
lumiera_thread_sync_other (LumieraThread other, int state);
/**
* Syncronize current thread
*
* signifies that this thread reached 'state' and blocks until/unless
* some other thread synced with this state
* @return on success pointer to self (opaque), or NULL on error
*/
LumieraThread
lumiera_thread_sync (int state);
/**
* Joining threads
* a thread can be set up with the LUMEIRA_THREAD_JOINABLE flag, if so
* then it must be joined finally. Joining clears the error state of the joined thread
* and returns it to the joiner.
*
*/
lumiera_err
lumiera_thread_join (LumieraThread thread);
#endif
/*

View file

@ -92,12 +92,12 @@
* @param expr Conditon which must become true, else the condition variable goes back into sleep
*/
#define LUMIERA_CONDITION_WAIT(expr) \
do { \
while (!(expr)) { \
REQUIRE (lumiera_lock_section_.lock, "Condition mutex not locked"); \
lumiera_condition_wait (lumiera_lock_section_.lock, \
lumiera_lock_section_.flag, \
&lumiera_lock_section_.rh); \
} while (!(expr))
}
/**
@ -108,15 +108,14 @@
* sets LUMIERA_ERROR_LOCK_TIMEOUT when the timeout passed
*/
#define LUMIERA_CONDITION_TIMEDWAIT(expr, timeout) \
do { \
while (!(expr)) { \
REQUIRE (lumiera_lock_section_.lock, "Condition mutex not locked"); \
if (!lumiera_condition_timedwait (lumiera_lock_section_.lock, \
timeout, \
lumiera_lock_section_.flag, \
&lumiera_lock_section_.rh)) \
break; \
} while (!(expr))
}
/**
* Signal a condition variable

View file

@ -91,12 +91,12 @@
* @param expr Condition which must become true, else the condition variable goes back into sleep
*/
#define LUMIERA_RECCONDITION_WAIT(expr) \
do { \
while (!(expr)) { \
REQUIRE (lumiera_lock_section_.lock, "Reccondition mutex not locked"); \
lumiera_reccondition_wait (lumiera_lock_section_.lock, \
lumiera_lock_section_.flag, \
&lumiera_lock_section_.rh); \
} while (!(expr))
}
/**
@ -107,15 +107,14 @@
* sets LUMIERA_ERROR_LOCK_TIMEOUT when the timeout passed
*/
#define LUMIERA_RECCONDITION_TIMEDWAIT(expr, timeout) \
do { \
while (!(expr)) { \
REQUIRE (lumiera_lock_section_.lock, "Reccondition mutex not locked"); \
if (!lumiera_reccondition_timedwait (lumiera_lock_section_.lock, \
timeout, \
lumiera_lock_section_.flag, \
&lumiera_lock_section_.rh)) \
break; \
} while (!(expr))
}