Thread syncronization and joining

add a custom defined range for states
provide functions for barrier like syncing between 2 threads
provide joinable threads and a thread_join() function
This commit is contained in:
Christian Thaeter 2010-01-18 00:34:53 +01:00
parent d659b1e8e7
commit e55e648f29
2 changed files with 164 additions and 12 deletions

View file

@ -59,18 +59,37 @@ const char* lumiera_threadstate_names[] = {
};
#undef LUMIERA_THREAD_STATE
static void* thread_loop (void* thread)
LUMIERA_ERROR_DEFINE(THREAD, "fatal threads initialization error");
/* thread local storage pointing back to the thread structure of each thread */
static pthread_key_t lumiera_thread_tls;
static pthread_once_t lumiera_thread_initialized = PTHREAD_ONCE_INIT;
static void
lumiera_thread_tls_init (void)
{
if (!!pthread_key_create (&lumiera_thread_tls, NULL))
LUMIERA_DIE (THREAD); /* should never happen */
}
static void*
thread_loop (void* thread)
{
TRACE (threads);
NOBUG_THREAD_ID_SET ("worker");
LumieraThread t = (LumieraThread)thread;
pthread_setspecific (lumiera_thread_tls, t);
pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
REQUIRE (t, "thread does not exist");
LUMIERA_CONDITION_SECTION (threads, &t->signal)
{
t->rh = &lumiera_lock_section_.rh;
do {
// NULL function means: no work to do
INFO (threads, "function %p", t->function);
@ -80,10 +99,21 @@ static void* thread_loop (void* thread)
LUMIERA_CONDITION_WAIT (t->state != LUMIERA_THREADSTATE_IDLE);
INFO (threads, "Thread awaken with state %d", t->state);
} while (t->state != LUMIERA_THREADSTATE_SHUTDOWN);
// SHUTDOWN state
// SHUTDOWN state
if (t->kind & LUMIERA_THREAD_JOINABLE)
{
INFO (threads, "Thread zombified");
/* move error state to data the other thread will it pick up from there */
t->arguments = (void*)lumiera_error ();
t->state = LUMIERA_THREADSTATE_ZOMBIE;
LUMIERA_CONDITION_WAIT (t->state == LUMIERA_THREADSTATE_JOINED);
INFO (threads, "Thread joined");
}
INFO (threads, "Thread Shutdown");
}
TODO ("no error must be pending here, else do app shutdown");
return 0;
}
@ -126,9 +156,10 @@ lumiera_thread_new (enum lumiera_thread_class kind,
struct nobug_flag* flag,
pthread_attr_t* attrs)
{
pthread_once (&lumiera_thread_initialized, lumiera_thread_tls_init);
// TODO: do something with these:
(void) purpose;
REQUIRE (kind < LUMIERA_THREADCLASS_COUNT, "invalid thread kind specified: %d", kind);
REQUIRE (attrs, "invalid pthread attributes structure passed");
LumieraThread self = lumiera_malloc (sizeof (*self));
@ -183,6 +214,69 @@ lumiera_thread_delete (LumieraThread self)
lumiera_free (lumiera_thread_destroy (self));
}
LumieraThread
lumiera_thread_self (void)
{
pthread_once (&lumiera_thread_initialized, lumiera_thread_tls_init);
return pthread_getspecific (lumiera_thread_tls);
}
LumieraThread
lumiera_thread_sync_other (LumieraThread other, int state)
{
TRACE(threads);
LUMIERA_CONDITION_SECTION (threads, &other->signal)
{
REQUIRE (other->state == ~state); TODO("Runtime error when state expectation isnt met");
other->state = state;
LUMIERA_CONDITION_SIGNAL;
}
return other;
}
LumieraThread
lumiera_thread_sync (int state)
{
TRACE(threads);
LumieraThread self = lumiera_thread_self ();
REQUIRE(self, "not a lumiera thread");
self->state = ~state;
TODO("error handing, maybe timed mutex (using the threads heartbeat timeout, shortly before timeout)");
do {
lumiera_condition_wait (&self->signal, &NOBUG_FLAG(threads), self->rh);
} while (self->state != state);
return self;
}
lumiera_err
lumiera_thread_join (LumieraThread thread)
{
TRACE(threads);
lumiera_err ret = NULL;
LUMIERA_CONDITION_SECTION (threads, &thread->signal)
{
LUMIERA_CONDITION_WAIT (thread->state == LUMIERA_THREADSTATE_ZOMBIE);
ret = (lumiera_err)thread->arguments;
thread->state = LUMIERA_THREADSTATE_JOINED;
LUMIERA_CONDITION_SIGNAL; /* kiss it a last goodbye */
}
return ret;
}
/*
// Local Variables:
// mode: C

View file

@ -80,10 +80,15 @@ enum lumiera_thread_class
* flag to let the decision to run the function in a thread open to the backend.
* depending on load it might decide to run it sequentially.
* This has some constraints:
* The condition variable to signal the finish of the thread must not be used.
* The Thread must be very careful with locking, better don't.
* TODO explain syncronization issues
**/
LUMIERA_THREAD_OR_NOT = 1<<16
LUMIERA_THREAD_OR_NOT = 1<<16,
/**
* Thread must be joined finally
**/
LUMIERA_THREAD_JOINABLE = 1<<17
};
#undef LUMIERA_THREAD_CLASS
@ -95,11 +100,13 @@ extern const char* lumiera_threadclass_names[];
// on one hand it could be used to tell the current state of the thread
// on the other, it is used to tell the thread which state to enter on next iteration
#define LUMIERA_THREAD_STATES \
LUMIERA_THREAD_STATE(IDLE) \
LUMIERA_THREAD_STATE(ERROR) \
LUMIERA_THREAD_STATE(IDLE) \
LUMIERA_THREAD_STATE(RUNNING) \
LUMIERA_THREAD_STATE(WAKEUP) \
LUMIERA_THREAD_STATE(SHUTDOWN) \
LUMIERA_THREAD_STATE(ZOMBIE) \
LUMIERA_THREAD_STATE(JOINED) \
LUMIERA_THREAD_STATE(STARTUP)
#define LUMIERA_THREAD_STATE(name) LUMIERA_THREADSTATE_##name,
@ -108,9 +115,12 @@ extern const char* lumiera_threadclass_names[];
* Thread state.
* These are the only states our threads can be in.
*/
typedef enum
typedef enum
{
LUMIERA_THREAD_STATES
LUMIERA_THREADSTATE_CUSTOM_START = 1024,
LUMIERA_THREADSTATE_CUSTOM_END = 32768,
}
lumiera_thread_state;
@ -127,18 +137,20 @@ extern const char* lumiera_threadstate_names[];
struct lumiera_thread_struct
{
llist node; // this should be first for easy casting
// the function and argument can be passed to the thread at creation time
// void (*function)(void*);
// void* arg;
pthread_t id;
// TODO: maybe this condition variable should be renamed when we have a better understanding of how it will be used
lumiera_condition signal; // control signal, state change signal
struct nobug_resource_user** rh;
// the following member could have been called "class" except that it would conflict with C++ keyword
// as consequence, it's been decided to leave the type name containing the word "class",
// while all members/variables called "kind"
enum lumiera_thread_class kind;
int kind;
// this is used both as a command and as a state tracker
lumiera_thread_state state;
int state;
void (*function)(void *);
void * arguments;
};
@ -189,6 +201,52 @@ lumiera_thread_run (enum lumiera_thread_class kind,
const char* purpose,
struct nobug_flag* flag);
/**
* Query the LumieraThread handle of the current thread
*
*
* @return pointer to the (opaque) handle of the current lumiera thread or NULL when this is not a lumiera thread
*/
LumieraThread
lumiera_thread_self (void);
/**
* Thread syncronization
* Lumiera threads can be syncronized with custom states.
* The syncronization primitives act as barrier over 2 threads, any thread reaching a syncronization
* point first is blocked until the other one reaches it with the same state.
* Providing different states is errorneous!
*/
/**
* Syncronize with another threads state
*
* this blocks until/unless the other thread reaches 'state'
*/
LumieraThread
lumiera_thread_sync_other (LumieraThread other, int state);
/**
* Syncronize current thread
*
* signifies that this thread reached 'state' and blocks until/unless
* some other thread synced with this state
* @return on success pointer to self (opaque), or NULL on error
*/
LumieraThread
lumiera_thread_sync (int state);
/**
* Joining threads
* a thread can be set up with the LUMEIRA_THREAD_JOINABLE flag, if so
* then it must be joined finally. Joining clears the error state of the joined thread
* and returns it to the joiner.
*
*/
lumiera_err
lumiera_thread_join (LumieraThread thread);
#endif
/*