Merge remote branch 'ct/for_plouj2' into second-tp-attempt

Conflicts:
	src/backend/thread-wrapper.hpp
	src/backend/threadpool.c
	src/backend/threadpool.h
	src/backend/threads.c
This commit is contained in:
Michael Ploujnikov 2010-01-07 07:18:23 -05:00
commit d989babc8a
8 changed files with 192 additions and 106 deletions

View file

@ -47,6 +47,7 @@ lumiera_threadpool_init(void)
{
NOBUG_INIT_FLAG (threadpool);
TRACE (threadpool);
NOBUG_INIT_FLAG (threads);
for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i)
{
@ -66,6 +67,7 @@ void
lumiera_threadpool_destroy(void)
{
TRACE (threadpool);
for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i)
{
TRACE (threadpool, "destroying individual pool #%d", i);
@ -73,7 +75,7 @@ lumiera_threadpool_destroy(void)
{
REQUIRE (0 == threadpool.pool[i].working_thread_count, "%d threads are still running", threadpool.pool[i].working_thread_count);
// TODO need to have a stronger assertion that no threads are really running because they will not even be in the list
INFO (threadpool, "number of threads in the pool=%d", llist_count(&threadpool.pool[i].list));
INFO (threadpool, "number of threads in the pool=%d", llist_count (&threadpool.pool[i].list));
LLIST_WHILE_HEAD (&threadpool.pool[i].list, t)
{
lumiera_thread_delete ((LumieraThread)t);
@ -84,6 +86,7 @@ lumiera_threadpool_destroy(void)
}
}
LumieraThread
lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind,
const char* purpose,
@ -120,6 +123,7 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind,
return ret;
}
// TODO: rename to lumiera_threadpool_park_thread
void
lumiera_threadpool_release_thread(LumieraThread thread)
{

View file

@ -54,8 +54,8 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind,
struct nobug_flag* flag);
/**
* Release a thread
* This ends up putting a (parked/idle) thread back on the list of an appropriate threadpool.
* Park a thread
* This ends up putting a finished thread back on the list of an appropriate threadpool.
* This function doesn't need to be accessible outside of the threadpool implementation.
*/
void
@ -69,10 +69,10 @@ struct lumiera_threadpool_struct
struct
{
llist list;
lumiera_condition sync;
unsigned working_thread_count;
unsigned idle_thread_count;
pthread_attr_t pthread_attrs;
lumiera_condition sync;
} pool[LUMIERA_THREADCLASS_COUNT];
};

View file

@ -22,7 +22,6 @@
//TODO: Support library includes//
#include "include/logging.h"
#include "lib/mutex.h"
#include "lib/safeclib.h"
@ -60,14 +59,34 @@ const char* lumiera_threadstate_names[] = {
};
#undef LUMIERA_THREAD_STATE
static void* thread_loop (void* arg)
static void* thread_loop (void* thread)
{
(void)arg;
TRACE(threads);
LumieraThread t = (LumieraThread)thread;
pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
REQUIRE (t, "thread does not exist");
// this seems to deadlock unexpectedly:
LUMIERA_CONDITION_SECTION (threads, &t->signal)
{
do {
// NULL function means: no work to do
INFO(threads, "function %p", t->function);
if (t->function)
t->function (t->arguments);
lumiera_threadpool_release_thread(t);
LUMIERA_CONDITION_WAIT(t->state != LUMIERA_THREADSTATE_IDLE);
INFO(threads, "Thread awaken with state %d", t->state);
} while (t->state != LUMIERA_THREADSTATE_SHUTDOWN);
// SHUTDOWN state
INFO(threads, "Thread Shutdown");
}
return 0;
}
// TODO: new implementation, remove the above one
// maybe this shouldn't return LumieraThread at all
// when this is called it should have already been decided that the function
// shall run in parallel, as a thread
LumieraThread
@ -77,17 +96,21 @@ lumiera_thread_run (enum lumiera_thread_class kind,
const char* purpose,
struct nobug_flag* flag)
{
(void)function;
(void)arg;
TRACE(threads);
// REQUIRE (function, "invalid function");
// ask the threadpool for a thread (it might create a new one)
LumieraThread self = lumiera_threadpool_acquire_thread (kind, purpose, flag);
// TODO: set the function and data to be run
// lumiera_thread_set_func_data (self, start_routine, arg, purpose, flag);
// set the function and data to be run
self->function = function;
self->arguments = arg;
// and let it really run (signal the condition var, the thread waits on it)
LUMIERA_RECCONDITION_SECTION (cond_sync, self->finished)
LUMIERA_RECCONDITION_SIGNAL;
self->state = LUMIERA_THREADSTATE_WAKEUP;
LUMIERA_CONDITION_SECTION (cond_sync, self->finished)
LUMIERA_CONDITION_SIGNAL;
// NOTE: example only, add solid error handling!
@ -105,19 +128,19 @@ lumiera_thread_new (enum lumiera_thread_class kind,
{
// TODO: do something with these:
(void) purpose;
(void) flag;
REQUIRE (kind < LUMIERA_THREADCLASS_COUNT, "invalid thread kind specified: %d", kind);
REQUIRE (attrs, "invalid pthread attributes structure passed");
LumieraThread self = lumiera_malloc (sizeof (*self));
llist_init (&self->node);
lumiera_reccondition_init (&self->finished, "thread-control-condition", flag);
lumiera_condition_init (&self->signal, "thread-control", flag);
self->kind = kind;
self->state = LUMIERA_THREADSTATE_IDLE;
self->state = LUMIERA_THREADSTATE_STARTUP;
self->function = NULL;
self->arguments = NULL;
//REQUIRE (thread_loop);
int error = pthread_create (&self->id, attrs, &thread_loop, self);
ENSURE (error == 0 || EAGAIN == error, "pthread returned %d:%s", error, strerror (error));
ENSURE(error == 0 || EAGAIN == error, "pthread_create returned %d:%s", error, strerror(error));
if (error)
{
// error here can only be EAGAIN, given the above ENSURE
@ -130,19 +153,36 @@ lumiera_thread_new (enum lumiera_thread_class kind,
LumieraThread
lumiera_thread_destroy (LumieraThread self)
{
TRACE(threads);
REQUIRE (self, "trying to destroy an invalid thread");
// TODO: stop the pthread
llist_unlink (&self->node);
lumiera_reccondition_destroy (self->finished, &NOBUG_FLAG (threads));
//kind = 0;
//state = 0;
// get the pthread out of the processing loop
// need to signal to the thread that it should start quitting
// should this be within the section?
LUMIERA_CONDITION_SECTION(threads, &self->signal)
{
REQUIRE (self->state == LUMIERA_THREADSTATE_IDLE, "trying to delete a thread in state other than IDLE (%s)", lumiera_threadstate_names[self->state]);
self->state = LUMIERA_THREADSTATE_SHUTDOWN;
self->function = NULL;
self->arguments = NULL;
LUMIERA_CONDITION_SIGNAL;
}
int error = pthread_join(self->id, NULL);
ENSURE (0 == error, "pthread_join returned %d:%s", error, strerror(error));
// condition has to be destroyed after joining with the thread
lumiera_condition_destroy (&self->signal, &NOBUG_FLAG(threads));
return self;
}
void
lumiera_thread_delete (LumieraThread self)
{
TRACE(threads);
ECHO ("deleting thread");
lumiera_free (lumiera_thread_destroy (self));
}

View file

@ -23,7 +23,7 @@
#define LUMIERA_THREADS_H
//TODO: Support library includes//
#include "lib/reccondition.h"
#include "lib/condition.h"
//TODO: Forward declarations//
@ -34,6 +34,7 @@
//TODO: System includes//
#include <nobug.h>
NOBUG_DECLARE_FLAG (threads);
/**
@ -90,10 +91,16 @@ enum lumiera_thread_class
// defined in threads.c
extern const char* lumiera_threadclass_names[];
#define LUMIERA_THREAD_STATES \
LUMIERA_THREAD_STATE(IDLE) \
LUMIERA_THREAD_STATE(RUNNING) \
LUMIERA_THREAD_STATE(ERROR)
// there is some confusion between the meaning of this
// on one hand it could be used to tell the current state of the thread
// on the other, it is used to tell the thread which state to enter on next iteration
#define LUMIERA_THREAD_STATES \
LUMIERA_THREAD_STATE(IDLE) \
LUMIERA_THREAD_STATE(ERROR) \
LUMIERA_THREAD_STATE(RUNNING) \
LUMIERA_THREAD_STATE(WAKEUP) \
LUMIERA_THREAD_STATE(SHUTDOWN) \
LUMIERA_THREAD_STATE(STARTUP)
#define LUMIERA_THREAD_STATE(name) LUMIERA_THREADSTATE_##name,
@ -124,12 +131,16 @@ struct lumiera_thread_struct
// void (*function)(void*);
// void* arg;
pthread_t id;
LumieraReccondition finished;
// TODO: maybe this condition variable should be renamed when we have a better understanding of how it will be used
lumiera_condition signal; // control signal, state change signal
// the following member could have been called "class" except that it would conflict with C++ keyword
// as consequence, it's been decided to leave the type name containing the word "class",
// while all members/variables called "kind"
enum lumiera_thread_class kind;
// this is used both as a command and as a state tracker
lumiera_thread_state state;
void (*function)(void *);
void * arguments;
};
/**

View file

@ -5,24 +5,11 @@ PLANNED "create"
PLANNED "yield"
PLANNED "cancel"
TEST "Acquire/Release test" basic-acquire-release <<END
err: start by initializing the threadpool
err: acquiring thread 1
err: acquiring thread 2
err: thread 1 kind=INTERACTIVE
err: thread 1 state=IDLE
err: thread 2 kind=IDLE
err: thread 2 state=IDLE
err: releasing thread 1
err: thread 1 has been released
err: releasing thread 2
err: thread 2 has been released
TEST "Most basic threadpool test" threadpool-basic <<END
err: destroying threadpool
err: destroying individual pool #0
err: number of threads in the pool=1
err: deleting thread
err: number of threads in the pool=0
err: destroying the pool mutex
err: pool mutex destroyed
@ -42,48 +29,14 @@ err: destroying the pool mutex
err: pool mutex destroyed
err: destroying individual pool #4
err: number of threads in the pool=1
err: deleting thread
err: number of threads in the pool=0
err: destroying the pool mutex
err: pool mutex destroyed
END
TEST "Many Acquires/Releases test" many-acquire-release <<END
err: destroying threadpool
err: destroying individual pool #0
err: number of threads in the pool=10
err: deleting thread
err: destroying the pool mutex
err: pool mutex destroyed
err: destroying individual pool #1
err: number of threads in the pool=10
err: deleting thread
err: destroying the pool mutex
err: pool mutex destroyed
err: destroying individual pool #2
err: number of threads in the pool=10
err: deleting thread
err: destroying the pool mutex
err: pool mutex destroyed
err: destroying individual pool #3
err: number of threads in the pool=10
err: deleting thread
err: destroying the pool mutex
err: pool mutex destroyed
err: destroying individual pool #4
err: number of threads in the pool=10
err: deleting thread
err: destroying the pool mutex
err: pool mutex destroyed
END
TEST "Too Many Acquires/Releases test" toomany-acquire-release <<END
err: Fatal Error
err: .*
return: !0
TEST "process a function" process-function <<END
err: the input to the function is 440616
err: executing the user function
err: thread quitting
err: the result is 1
END

View file

@ -25,9 +25,46 @@
#include <stdio.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
void is_prime(void * arg)
{
int number = *(int *)arg;
int prime = 1;
for (int x = number; x >= sqrt(number); --x)
{
if (number % x == 0)
{
prime = 0;
break;
}
}
*(int *)arg = prime;
}
TESTS_BEGIN
TEST ("threadpool-basic")
{
lumiera_threadpool_init(100);
lumiera_threadpool_destroy();
}
TEST ("threadpool1")
{
ECHO("start by initializing the threadpool");
lumiera_threadpool_init(100);
LumieraThread t1 =
lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_INTERACTIVE,
"test purpose",
&NOBUG_FLAG(NOBUG_ON));
// lumiera_threadpool_release_thread(t1);
ECHO("acquired thread 1 %p",t1);
lumiera_threadpool_destroy();
}
TEST ("basic-acquire-release")
{
@ -37,12 +74,12 @@ TEST ("basic-acquire-release")
LumieraThread t1 =
lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_INTERACTIVE,
"test purpose",
NULL);
&NOBUG_FLAG(NOBUG_ON));
ECHO("acquiring thread 2");
LumieraThread t2 =
lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_IDLE,
"test purpose",
NULL);
&NOBUG_FLAG(NOBUG_ON));
ECHO("thread 1 kind=%s", lumiera_threadclass_names[t1->kind]);
CHECK(LUMIERA_THREADCLASS_INTERACTIVE == t1->kind);
@ -54,16 +91,17 @@ TEST ("basic-acquire-release")
CHECK(LUMIERA_THREADSTATE_IDLE == t2->state);
ECHO("releasing thread 1");
lumiera_threadpool_release_thread(t1);
//lumiera_threadpool_release_thread(t1);
ECHO("thread 1 has been released");
ECHO("releasing thread 2");
lumiera_threadpool_release_thread(t2);
//lumiera_threadpool_release_thread(t2);
ECHO("thread 2 has been released");
lumiera_threadpool_destroy();
}
#if 0
TEST ("many-acquire-release")
{
@ -79,7 +117,7 @@ TEST ("many-acquire-release")
threads[i+kind*threads_per_pool_count] =
lumiera_threadpool_acquire_thread(kind,
"test purpose",
NULL);
&NOBUG_FLAG(NOBUG_ON));
}
}
@ -107,7 +145,7 @@ TEST ("toomany-acquire-release")
threads[i+kind*threads_per_pool_count] =
lumiera_threadpool_acquire_thread(kind,
"test purpose",
NULL);
&NOBUG_FLAG(NOBUG_ON));
}
}
@ -119,5 +157,48 @@ TEST ("toomany-acquire-release")
lumiera_threadpool_destroy();
}
#endif
TEST ("no-function")
{
LumieraThread t;
lumiera_threadpool_init(10);
t = lumiera_thread_run (LUMIERA_THREADCLASS_INTERACTIVE,
NULL,
NULL,
"process my test function",
&NOBUG_FLAG(NOBUG_ON));
// cleanup
ECHO("wait 1 sec");
usleep(1000000);
ECHO("finished waiting");
lumiera_threadpool_destroy();
}
TEST ("process-function")
{
// this is what the scheduler would do once it figures out what function a job needs to run
LumieraThread t;
int number = 440616;
lumiera_threadpool_init(10);
ECHO ("the input to the function is %d", number);
t = lumiera_thread_run (LUMIERA_THREADCLASS_INTERACTIVE,
&is_prime,
(void *)&number, //void * arg,
"process my test function",
&NOBUG_FLAG(NOBUG_ON)); // struct nobug_flag* flag)
// cleanup
ECHO("wait 1 sec");
usleep(1000000);
ECHO("finished waiting");
lumiera_threadpool_destroy();
}
TESTS_END

View file

@ -47,15 +47,15 @@ void threadfn(void* blah)
void threadsyncfn(void* blah)
{
struct timespec wait = {0,200000000};
LumieraReccondition sync = (LumieraReccondition) blah;
LumieraCondition sync = (LumieraCondition) blah;
ECHO ("thread starting up %s", NOBUG_THREAD_ID_GET);
LUMIERA_RECCONDITION_SECTION(cond_sync, sync)
LUMIERA_CONDITION_SECTION(cond_sync, sync)
{
ECHO ("send startup signal %s", NOBUG_THREAD_ID_GET);
LUMIERA_RECCONDITION_SIGNAL;
LUMIERA_CONDITION_SIGNAL;
ECHO ("wait for trigger %s", NOBUG_THREAD_ID_GET);
LUMIERA_RECCONDITION_WAIT(1);
LUMIERA_CONDITION_WAIT(1);
}
ECHO ("thread running %s", NOBUG_THREAD_ID_GET);
@ -91,7 +91,6 @@ TEST ("simple_thread")
lumiera_thread_run (LUMIERA_THREADCLASS_WORKER,
threadfn,
NULL,
NULL,
argv[1],
NULL);
@ -102,32 +101,31 @@ TEST ("simple_thread")
TEST ("thread_synced")
{
lumiera_reccondition cnd;
lumiera_reccondition_init (&cnd, "threadsync", &NOBUG_FLAG(NOBUG_ON));
lumiera_condition cnd;
lumiera_condition_init (&cnd, "threadsync", &NOBUG_FLAG(NOBUG_ON));
LUMIERA_RECCONDITION_SECTION(cond_sync, &cnd)
LUMIERA_CONDITION_SECTION(cond_sync, &cnd)
{
ECHO ("main before thread %s", NOBUG_THREAD_ID_GET);
lumiera_thread_run (LUMIERA_THREADCLASS_WORKER,
threadsyncfn,
&cnd,
&cnd,
argv[1],
NULL);
ECHO ("main wait for thread being ready %s", NOBUG_THREAD_ID_GET);
LUMIERA_RECCONDITION_WAIT(1);
LUMIERA_CONDITION_WAIT(1);
ECHO ("main trigger thread %s", NOBUG_THREAD_ID_GET);
LUMIERA_RECCONDITION_SIGNAL;
LUMIERA_CONDITION_SIGNAL;
ECHO ("wait for thread end %s", NOBUG_THREAD_ID_GET);
LUMIERA_RECCONDITION_WAIT(1);
LUMIERA_CONDITION_WAIT(1);
ECHO ("thread ended %s", NOBUG_THREAD_ID_GET);
}
lumiera_reccondition_destroy (&cnd, &NOBUG_FLAG(NOBUG_ON));
lumiera_condition_destroy (&cnd, &NOBUG_FLAG(NOBUG_ON));
}
@ -143,7 +141,6 @@ TEST ("mutex_thread")
lumiera_thread_run (LUMIERA_THREADCLASS_WORKER,
mutexfn,
NULL,
NULL,
argv[1],
NULL);

View file

@ -26,7 +26,7 @@
# stop testing on the first failure
export LC_ALL=C
NOBUG_LOGREGEX='^\(\*\*[0-9]*\*\* \)\?[0-9]\{10,\}: \(TRACE\|INFO\|NOTICE\|WARNING\|ERR\|TODO\|PLANNED\|FIXME\|DEPRECATED\|UNIMPLEMENTED\):'
NOBUG_LOGREGEX='^\(\*\*[0-9]*\*\* \)\?[0-9]\{10,\}: \(TRACE\|INFO\|NOTICE\|WARNING\|ERR\|TODO\|PLANNED\|FIXME\|DEPRECATED\|UNIMPLEMENTED\|RESOURCE_ENTER\|RESOURCE_LEAVE\|RESOURCE_STATE\):'
arg0="$0"
srcdir="$(dirname "$arg0")"