diff --git a/src/backend/threadpool.c b/src/backend/threadpool.c index 6a4f90289..993c7040f 100644 --- a/src/backend/threadpool.c +++ b/src/backend/threadpool.c @@ -47,6 +47,7 @@ lumiera_threadpool_init(void) { NOBUG_INIT_FLAG (threadpool); TRACE (threadpool); + NOBUG_INIT_FLAG (threads); for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i) { @@ -66,6 +67,7 @@ void lumiera_threadpool_destroy(void) { TRACE (threadpool); + for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i) { TRACE (threadpool, "destroying individual pool #%d", i); @@ -73,7 +75,7 @@ lumiera_threadpool_destroy(void) { REQUIRE (0 == threadpool.pool[i].working_thread_count, "%d threads are still running", threadpool.pool[i].working_thread_count); // TODO need to have a stronger assertion that no threads are really running because they will not even be in the list - INFO (threadpool, "number of threads in the pool=%d", llist_count(&threadpool.pool[i].list)); + INFO (threadpool, "number of threads in the pool=%d", llist_count (&threadpool.pool[i].list)); LLIST_WHILE_HEAD (&threadpool.pool[i].list, t) { lumiera_thread_delete ((LumieraThread)t); @@ -84,6 +86,7 @@ lumiera_threadpool_destroy(void) } } + LumieraThread lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, const char* purpose, @@ -120,6 +123,7 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, return ret; } +// TODO: rename to lumiera_threadpool_park_thread void lumiera_threadpool_release_thread(LumieraThread thread) { diff --git a/src/backend/threadpool.h b/src/backend/threadpool.h index a14c87b50..c2ec10785 100644 --- a/src/backend/threadpool.h +++ b/src/backend/threadpool.h @@ -54,8 +54,8 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, struct nobug_flag* flag); /** - * Release a thread - * This ends up putting a (parked/idle) thread back on the list of an appropriate threadpool. + * Park a thread + * This ends up putting a finished thread back on the list of an appropriate threadpool. * This function doesn't need to be accessible outside of the threadpool implementation. */ void @@ -69,10 +69,10 @@ struct lumiera_threadpool_struct struct { llist list; - lumiera_condition sync; unsigned working_thread_count; unsigned idle_thread_count; pthread_attr_t pthread_attrs; + lumiera_condition sync; } pool[LUMIERA_THREADCLASS_COUNT]; }; diff --git a/src/backend/threads.c b/src/backend/threads.c index 7b03f57fe..d7d7a242f 100644 --- a/src/backend/threads.c +++ b/src/backend/threads.c @@ -22,7 +22,6 @@ //TODO: Support library includes// #include "include/logging.h" -#include "lib/mutex.h" #include "lib/safeclib.h" @@ -60,14 +59,34 @@ const char* lumiera_threadstate_names[] = { }; #undef LUMIERA_THREAD_STATE -static void* thread_loop (void* arg) +static void* thread_loop (void* thread) { - (void)arg; + TRACE(threads); + LumieraThread t = (LumieraThread)thread; + + pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); + + REQUIRE (t, "thread does not exist"); + + // this seems to deadlock unexpectedly: + LUMIERA_CONDITION_SECTION (threads, &t->signal) + { + do { + // NULL function means: no work to do + INFO(threads, "function %p", t->function); + if (t->function) + t->function (t->arguments); + lumiera_threadpool_release_thread(t); + LUMIERA_CONDITION_WAIT(t->state != LUMIERA_THREADSTATE_IDLE); + INFO(threads, "Thread awaken with state %d", t->state); + } while (t->state != LUMIERA_THREADSTATE_SHUTDOWN); + // SHUTDOWN state + + INFO(threads, "Thread Shutdown"); + } return 0; } -// TODO: new implementation, remove the above one -// maybe this shouldn't return LumieraThread at all // when this is called it should have already been decided that the function // shall run in parallel, as a thread LumieraThread @@ -77,17 +96,21 @@ lumiera_thread_run (enum lumiera_thread_class kind, const char* purpose, struct nobug_flag* flag) { - (void)function; - (void)arg; + TRACE(threads); + // REQUIRE (function, "invalid function"); + // ask the threadpool for a thread (it might create a new one) LumieraThread self = lumiera_threadpool_acquire_thread (kind, purpose, flag); - // TODO: set the function and data to be run - // lumiera_thread_set_func_data (self, start_routine, arg, purpose, flag); + // set the function and data to be run + self->function = function; + self->arguments = arg; // and let it really run (signal the condition var, the thread waits on it) - LUMIERA_RECCONDITION_SECTION (cond_sync, self->finished) - LUMIERA_RECCONDITION_SIGNAL; + self->state = LUMIERA_THREADSTATE_WAKEUP; + + LUMIERA_CONDITION_SECTION (cond_sync, self->finished) + LUMIERA_CONDITION_SIGNAL; // NOTE: example only, add solid error handling! @@ -105,19 +128,19 @@ lumiera_thread_new (enum lumiera_thread_class kind, { // TODO: do something with these: (void) purpose; - (void) flag; REQUIRE (kind < LUMIERA_THREADCLASS_COUNT, "invalid thread kind specified: %d", kind); REQUIRE (attrs, "invalid pthread attributes structure passed"); LumieraThread self = lumiera_malloc (sizeof (*self)); llist_init (&self->node); - lumiera_reccondition_init (&self->finished, "thread-control-condition", flag); + lumiera_condition_init (&self->signal, "thread-control", flag); self->kind = kind; - self->state = LUMIERA_THREADSTATE_IDLE; + self->state = LUMIERA_THREADSTATE_STARTUP; + self->function = NULL; + self->arguments = NULL; - //REQUIRE (thread_loop); int error = pthread_create (&self->id, attrs, &thread_loop, self); - ENSURE (error == 0 || EAGAIN == error, "pthread returned %d:%s", error, strerror (error)); + ENSURE(error == 0 || EAGAIN == error, "pthread_create returned %d:%s", error, strerror(error)); if (error) { // error here can only be EAGAIN, given the above ENSURE @@ -130,19 +153,36 @@ lumiera_thread_new (enum lumiera_thread_class kind, LumieraThread lumiera_thread_destroy (LumieraThread self) { + TRACE(threads); REQUIRE (self, "trying to destroy an invalid thread"); - // TODO: stop the pthread llist_unlink (&self->node); - lumiera_reccondition_destroy (self->finished, &NOBUG_FLAG (threads)); - //kind = 0; - //state = 0; + + // get the pthread out of the processing loop + // need to signal to the thread that it should start quitting + // should this be within the section? + LUMIERA_CONDITION_SECTION(threads, &self->signal) + { + REQUIRE (self->state == LUMIERA_THREADSTATE_IDLE, "trying to delete a thread in state other than IDLE (%s)", lumiera_threadstate_names[self->state]); + self->state = LUMIERA_THREADSTATE_SHUTDOWN; + self->function = NULL; + self->arguments = NULL; + LUMIERA_CONDITION_SIGNAL; + } + + int error = pthread_join(self->id, NULL); + ENSURE (0 == error, "pthread_join returned %d:%s", error, strerror(error)); + + // condition has to be destroyed after joining with the thread + lumiera_condition_destroy (&self->signal, &NOBUG_FLAG(threads)); + return self; } void lumiera_thread_delete (LumieraThread self) { + TRACE(threads); ECHO ("deleting thread"); lumiera_free (lumiera_thread_destroy (self)); } diff --git a/src/backend/threads.h b/src/backend/threads.h index d91ae7bee..42d76bcfe 100644 --- a/src/backend/threads.h +++ b/src/backend/threads.h @@ -23,7 +23,7 @@ #define LUMIERA_THREADS_H //TODO: Support library includes// -#include "lib/reccondition.h" +#include "lib/condition.h" //TODO: Forward declarations// @@ -34,6 +34,7 @@ //TODO: System includes// #include +NOBUG_DECLARE_FLAG (threads); /** @@ -90,10 +91,16 @@ enum lumiera_thread_class // defined in threads.c extern const char* lumiera_threadclass_names[]; -#define LUMIERA_THREAD_STATES \ - LUMIERA_THREAD_STATE(IDLE) \ - LUMIERA_THREAD_STATE(RUNNING) \ - LUMIERA_THREAD_STATE(ERROR) +// there is some confusion between the meaning of this +// on one hand it could be used to tell the current state of the thread +// on the other, it is used to tell the thread which state to enter on next iteration +#define LUMIERA_THREAD_STATES \ + LUMIERA_THREAD_STATE(IDLE) \ + LUMIERA_THREAD_STATE(ERROR) \ + LUMIERA_THREAD_STATE(RUNNING) \ + LUMIERA_THREAD_STATE(WAKEUP) \ + LUMIERA_THREAD_STATE(SHUTDOWN) \ + LUMIERA_THREAD_STATE(STARTUP) #define LUMIERA_THREAD_STATE(name) LUMIERA_THREADSTATE_##name, @@ -124,12 +131,16 @@ struct lumiera_thread_struct // void (*function)(void*); // void* arg; pthread_t id; - LumieraReccondition finished; + // TODO: maybe this condition variable should be renamed when we have a better understanding of how it will be used + lumiera_condition signal; // control signal, state change signal // the following member could have been called "class" except that it would conflict with C++ keyword // as consequence, it's been decided to leave the type name containing the word "class", // while all members/variables called "kind" enum lumiera_thread_class kind; + // this is used both as a command and as a state tracker lumiera_thread_state state; + void (*function)(void *); + void * arguments; }; /** diff --git a/tests/30backend-threadpool.tests b/tests/30backend-threadpool.tests index c136aff42..2cad43477 100644 --- a/tests/30backend-threadpool.tests +++ b/tests/30backend-threadpool.tests @@ -5,24 +5,11 @@ PLANNED "create" PLANNED "yield" PLANNED "cancel" -TEST "Acquire/Release test" basic-acquire-release < #include +#include +#include + +void is_prime(void * arg) +{ + int number = *(int *)arg; + int prime = 1; + + for (int x = number; x >= sqrt(number); --x) + { + if (number % x == 0) + { + prime = 0; + break; + } + } + *(int *)arg = prime; +} TESTS_BEGIN +TEST ("threadpool-basic") +{ + lumiera_threadpool_init(100); + lumiera_threadpool_destroy(); +} + +TEST ("threadpool1") +{ + ECHO("start by initializing the threadpool"); + lumiera_threadpool_init(100); + LumieraThread t1 = + lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_INTERACTIVE, + "test purpose", + &NOBUG_FLAG(NOBUG_ON)); + // lumiera_threadpool_release_thread(t1); + ECHO("acquired thread 1 %p",t1); + lumiera_threadpool_destroy(); +} + TEST ("basic-acquire-release") { @@ -37,12 +74,12 @@ TEST ("basic-acquire-release") LumieraThread t1 = lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_INTERACTIVE, "test purpose", - NULL); + &NOBUG_FLAG(NOBUG_ON)); ECHO("acquiring thread 2"); LumieraThread t2 = lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_IDLE, "test purpose", - NULL); + &NOBUG_FLAG(NOBUG_ON)); ECHO("thread 1 kind=%s", lumiera_threadclass_names[t1->kind]); CHECK(LUMIERA_THREADCLASS_INTERACTIVE == t1->kind); @@ -54,16 +91,17 @@ TEST ("basic-acquire-release") CHECK(LUMIERA_THREADSTATE_IDLE == t2->state); ECHO("releasing thread 1"); - lumiera_threadpool_release_thread(t1); + //lumiera_threadpool_release_thread(t1); ECHO("thread 1 has been released"); ECHO("releasing thread 2"); - lumiera_threadpool_release_thread(t2); + //lumiera_threadpool_release_thread(t2); ECHO("thread 2 has been released"); lumiera_threadpool_destroy(); } +#if 0 TEST ("many-acquire-release") { @@ -79,7 +117,7 @@ TEST ("many-acquire-release") threads[i+kind*threads_per_pool_count] = lumiera_threadpool_acquire_thread(kind, "test purpose", - NULL); + &NOBUG_FLAG(NOBUG_ON)); } } @@ -107,7 +145,7 @@ TEST ("toomany-acquire-release") threads[i+kind*threads_per_pool_count] = lumiera_threadpool_acquire_thread(kind, "test purpose", - NULL); + &NOBUG_FLAG(NOBUG_ON)); } } @@ -119,5 +157,48 @@ TEST ("toomany-acquire-release") lumiera_threadpool_destroy(); } +#endif + +TEST ("no-function") +{ + LumieraThread t; + + lumiera_threadpool_init(10); + + t = lumiera_thread_run (LUMIERA_THREADCLASS_INTERACTIVE, + NULL, + NULL, + "process my test function", + &NOBUG_FLAG(NOBUG_ON)); + + // cleanup + ECHO("wait 1 sec"); + usleep(1000000); + ECHO("finished waiting"); + lumiera_threadpool_destroy(); +} + +TEST ("process-function") +{ + // this is what the scheduler would do once it figures out what function a job needs to run + LumieraThread t; + int number = 440616; + + lumiera_threadpool_init(10); + + ECHO ("the input to the function is %d", number); + + t = lumiera_thread_run (LUMIERA_THREADCLASS_INTERACTIVE, + &is_prime, + (void *)&number, //void * arg, + "process my test function", + &NOBUG_FLAG(NOBUG_ON)); // struct nobug_flag* flag) + + // cleanup + ECHO("wait 1 sec"); + usleep(1000000); + ECHO("finished waiting"); + lumiera_threadpool_destroy(); +} TESTS_END diff --git a/tests/backend/test-threads.c b/tests/backend/test-threads.c index 76da2f450..ddf43214e 100644 --- a/tests/backend/test-threads.c +++ b/tests/backend/test-threads.c @@ -47,15 +47,15 @@ void threadfn(void* blah) void threadsyncfn(void* blah) { struct timespec wait = {0,200000000}; - LumieraReccondition sync = (LumieraReccondition) blah; + LumieraCondition sync = (LumieraCondition) blah; ECHO ("thread starting up %s", NOBUG_THREAD_ID_GET); - LUMIERA_RECCONDITION_SECTION(cond_sync, sync) + LUMIERA_CONDITION_SECTION(cond_sync, sync) { ECHO ("send startup signal %s", NOBUG_THREAD_ID_GET); - LUMIERA_RECCONDITION_SIGNAL; + LUMIERA_CONDITION_SIGNAL; ECHO ("wait for trigger %s", NOBUG_THREAD_ID_GET); - LUMIERA_RECCONDITION_WAIT(1); + LUMIERA_CONDITION_WAIT(1); } ECHO ("thread running %s", NOBUG_THREAD_ID_GET); @@ -91,7 +91,6 @@ TEST ("simple_thread") lumiera_thread_run (LUMIERA_THREADCLASS_WORKER, threadfn, NULL, - NULL, argv[1], NULL); @@ -102,32 +101,31 @@ TEST ("simple_thread") TEST ("thread_synced") { - lumiera_reccondition cnd; - lumiera_reccondition_init (&cnd, "threadsync", &NOBUG_FLAG(NOBUG_ON)); + lumiera_condition cnd; + lumiera_condition_init (&cnd, "threadsync", &NOBUG_FLAG(NOBUG_ON)); - LUMIERA_RECCONDITION_SECTION(cond_sync, &cnd) + LUMIERA_CONDITION_SECTION(cond_sync, &cnd) { ECHO ("main before thread %s", NOBUG_THREAD_ID_GET); lumiera_thread_run (LUMIERA_THREADCLASS_WORKER, threadsyncfn, &cnd, - &cnd, argv[1], NULL); ECHO ("main wait for thread being ready %s", NOBUG_THREAD_ID_GET); - LUMIERA_RECCONDITION_WAIT(1); + LUMIERA_CONDITION_WAIT(1); ECHO ("main trigger thread %s", NOBUG_THREAD_ID_GET); - LUMIERA_RECCONDITION_SIGNAL; + LUMIERA_CONDITION_SIGNAL; ECHO ("wait for thread end %s", NOBUG_THREAD_ID_GET); - LUMIERA_RECCONDITION_WAIT(1); + LUMIERA_CONDITION_WAIT(1); ECHO ("thread ended %s", NOBUG_THREAD_ID_GET); } - lumiera_reccondition_destroy (&cnd, &NOBUG_FLAG(NOBUG_ON)); + lumiera_condition_destroy (&cnd, &NOBUG_FLAG(NOBUG_ON)); } @@ -143,7 +141,6 @@ TEST ("mutex_thread") lumiera_thread_run (LUMIERA_THREADCLASS_WORKER, mutexfn, NULL, - NULL, argv[1], NULL); diff --git a/tests/test.sh b/tests/test.sh index 951d2fcea..b1fd0d99b 100755 --- a/tests/test.sh +++ b/tests/test.sh @@ -26,7 +26,7 @@ # stop testing on the first failure export LC_ALL=C -NOBUG_LOGREGEX='^\(\*\*[0-9]*\*\* \)\?[0-9]\{10,\}: \(TRACE\|INFO\|NOTICE\|WARNING\|ERR\|TODO\|PLANNED\|FIXME\|DEPRECATED\|UNIMPLEMENTED\):' +NOBUG_LOGREGEX='^\(\*\*[0-9]*\*\* \)\?[0-9]\{10,\}: \(TRACE\|INFO\|NOTICE\|WARNING\|ERR\|TODO\|PLANNED\|FIXME\|DEPRECATED\|UNIMPLEMENTED\|RESOURCE_ENTER\|RESOURCE_LEAVE\|RESOURCE_STATE\):' arg0="$0" srcdir="$(dirname "$arg0")"