diff --git a/src/backend/thread-wrapper.hpp b/src/backend/thread-wrapper.hpp index 3f897b582..f8ec2bc87 100644 --- a/src/backend/thread-wrapper.hpp +++ b/src/backend/thread-wrapper.hpp @@ -177,10 +177,10 @@ namespace backend { lumiera_thread_run ( kind , &run // invoking the run helper and.. , this // passing this start context as parameter - , joinCond // maybe wait-blocking for the thread to terminate , purpose.c() , logging_flag ); + (void)joinCond; if (!res) throw lumiera::error::State("failed to create new thread."); diff --git a/src/backend/threadpool.c b/src/backend/threadpool.c index 66ecd4e33..2a33978d6 100644 --- a/src/backend/threadpool.c +++ b/src/backend/threadpool.c @@ -53,21 +53,22 @@ void* pool_thread_loop(void * arg) } void -lumiera_threadpool_init(unsigned limit) +lumiera_threadpool_init() { for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i) { llist_init(&threadpool.pool[i].list); - threadpool.pool[i].max_threads = limit; - threadpool.pool[i].working_thread_count = 0; + threadpool.pool[i].working_thread_count = 0; threadpool.pool[i].idle_thread_count = 0; //TODO: configure each pools' pthread_attrs appropriately pthread_attr_init (&threadpool.pool[i].pthread_attrs); - pthread_attr_setdetachstate (&threadpool.pool[i].pthread_attrs, PTHREAD_CREATE_DETACHED); + // cehteh prefers that threads are joinable by default + //pthread_attr_setdetachstate (&threadpool.pool[i].pthread_attrs, PTHREAD_CREATE_DETACHED); //cancel... lumiera_mutex_init(&threadpool.pool[i].lock,"pool of threads", &NOBUG_FLAG(threadpool)); + lumiera_reccondition_init (&threadpool.pool[i].signal, "thread-signal", &NOBUG_FLAG(threadpool)); } } @@ -78,10 +79,16 @@ lumiera_threadpool_destroy(void) for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i) { ECHO ("destroying individual pool #%d", i); - // no locking is done at this point - ECHO ("number of threads in the pool=%d", llist_count(&threadpool.pool[i].list)); - LLIST_WHILE_HEAD(&threadpool.pool[i].list, thread) - lumiera_thread_delete((LumieraThread)thread); + LUMIERA_MUTEX_SECTION (threadpool, &threadpool.pool[i].lock) + { + REQUIRE (0 == threadpool.pool[i].working_thread_count, "%d threads are running", threadpool.pool[i].working_thread_count); + // TODO need to have a stronger assertion that no threads are really running because they will not even be in the list + ECHO ("number of threads in the pool=%d", llist_count(&threadpool.pool[i].list)); + LLIST_WHILE_HEAD(&threadpool.pool[i].list, t) + { + lumiera_thread_delete((LumieraThread)t); + } + } ECHO ("destroying the pool mutex"); lumiera_mutex_destroy (&threadpool.pool[i].lock, &NOBUG_FLAG (threadpool)); ECHO ("pool mutex destroyed"); @@ -89,6 +96,15 @@ lumiera_threadpool_destroy(void) } } +void lumiera_threadpool_unlink(LumieraThread thread) +{ + REQUIRE (thread, "invalid thread given"); + REQUIRE (thread->kind < LUMIERA_THREADCLASS_COUNT, "thread belongs to an unknown pool kind: %d", thread->kind); + llist_unlink(&thread->node); + ENSURE (llist_is_empty(&thread->node), "failed to unlink the thread"); +} + + LumieraThread lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, const char* purpose, @@ -97,52 +113,45 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, LumieraThread ret; REQUIRE (kind < LUMIERA_THREADCLASS_COUNT, "unknown pool kind specified: %d", kind); - if (llist_is_empty (&threadpool.pool[kind].list)) - { - // TODO: fill in the reccondition argument, currently NULL - FIXME ("this max thread logic needs to be deeply thought about and made more efficient as well as rebust"); - if (threadpool.pool[kind].working_thread_count - + threadpool.pool[kind].idle_thread_count - < threadpool.pool[kind].max_threads) { - ret = lumiera_thread_new (kind, NULL, purpose, flag, + LUMIERA_RECCONDITION_SECTION (threadpool, &threadpool.pool[kind].signal) + { + if (llist_is_empty (&threadpool.pool[kind].list)) + { + + ret = lumiera_thread_new (kind, purpose, flag, &threadpool.pool[kind].pthread_attrs); - threadpool.pool[kind].working_thread_count++; + threadpool.pool[kind].idle_thread_count++; ENSURE (ret, "did not create a valid thread"); + LUMIERA_RECCONDITION_WAIT (!llist_is_empty (&threadpool.pool[kind].list)); } - else - { - //ERROR (threadpool, "did not create a new thread because per-pool limit was reached: %d", threadpool.pool[kind].max_threads); - LUMIERA_DIE(ERRNO); - } - } - else - { - // use an existing thread, pick the first one - // remove it from the pool's list - LUMIERA_MUTEX_SECTION (threadpool, &threadpool.pool[kind].lock) - { - ret = (LumieraThread)(llist_unlink(llist_head (&threadpool.pool[kind].list))); - threadpool.pool[kind].working_thread_count++; - threadpool.pool[kind].idle_thread_count--; // cheaper than using llist_count - ENSURE (threadpool.pool[kind].idle_thread_count == - llist_count(&threadpool.pool[kind].list), - "idle thread count %d is wrong, should be %d", - threadpool.pool[kind].idle_thread_count, - llist_count(&threadpool.pool[kind].list)); - } - ENSURE (ret, "did not find a valid thread"); - } - return ret; + // use an existing thread, pick the first one + // remove it from the pool's list + ret = (LumieraThread)(llist_unlink(llist_head (&threadpool.pool[kind].list))); + REQUIRE (ret->state == LUMIERA_THREADSTATE_IDLE, "trying to return a non-idle thread (state=%s)", lumiera_threadstate_names[ret->state]); + threadpool.pool[kind].working_thread_count++; + threadpool.pool[kind].idle_thread_count--; // cheaper than using llist_count + ENSURE (threadpool.pool[kind].idle_thread_count == + llist_count(&threadpool.pool[kind].list), + "idle thread count %d is wrong, should be %d", + threadpool.pool[kind].idle_thread_count, + llist_count(&threadpool.pool[kind].list)); + ENSURE (ret, "did not find a valid thread"); + } + return ret; } +// TODO: rename to lumiera_threadpool_park_thread void -lumiera_threadpool_release_thread(LumieraThread thread) +lumiera_threadpool_park_thread(LumieraThread thread) { REQUIRE (thread, "invalid thread given"); REQUIRE (thread->kind < LUMIERA_THREADCLASS_COUNT, "thread belongs to an unknown pool kind: %d", thread->kind); - LUMIERA_MUTEX_SECTION (threadpool, &threadpool.pool[thread->kind].lock) + REQUIRE (thread->state != LUMIERA_THREADSTATE_IDLE, "trying to park an already idle thread"); + + LUMIERA_RECCONDITION_SECTION (threadpool, &threadpool.pool[thread->kind].signal) { + thread->state = LUMIERA_THREADSTATE_IDLE; REQUIRE (llist_is_single(&thread->node), "thread already belongs to some list"); llist_insert_head(&threadpool.pool[thread->kind].list, &thread->node); threadpool.pool[thread->kind].working_thread_count--; @@ -153,6 +162,7 @@ lumiera_threadpool_release_thread(LumieraThread thread) threadpool.pool[thread->kind].idle_thread_count, llist_count(&threadpool.pool[thread->kind].list)); // REQUIRE (!llist_is_empty (&threadpool.pool[thread->kind].list), "thread pool is still empty after insertion"); + LUMIERA_RECCONDITION_BROADCAST; } } diff --git a/src/backend/threadpool.h b/src/backend/threadpool.h index 03b56a4fe..de545876c 100644 --- a/src/backend/threadpool.h +++ b/src/backend/threadpool.h @@ -55,12 +55,12 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, struct nobug_flag* flag); /** - * Release a thread - * This ends up putting a (parked/idle) thread back on the list of an appropriate threadpool. + * Park a thread + * This ends up putting a finished thread back on the list of an appropriate threadpool. * This function doesn't need to be accessible outside of the threadpool implementation. */ void -lumiera_threadpool_release_thread(LumieraThread thread); +lumiera_threadpool_park_thread(LumieraThread thread); typedef struct lumiera_threadpool_struct lumiera_threadpool; typedef lumiera_threadpool* LumieraThreadpool; @@ -71,23 +71,28 @@ struct lumiera_threadpool_struct { llist list; lumiera_mutex lock; - unsigned max_threads; unsigned working_thread_count; unsigned idle_thread_count; pthread_attr_t pthread_attrs; + lumiera_reccondition signal; } pool[LUMIERA_THREADCLASS_COUNT]; }; /** * Initialize the thread pool. - * @param limit the maximum number of threads (idle+working) allowed per pool */ void -lumiera_threadpool_init(unsigned limit); +lumiera_threadpool_init(); void lumiera_threadpool_destroy(void); +/** + * Just remove the thread structure from an associated pool list. + */ +void +lumiera_threadpool_unlink(LumieraThread thread); + #endif /* // Local Variables: diff --git a/src/backend/threads.c b/src/backend/threads.c index 0520de288..a75b7f81b 100644 --- a/src/backend/threads.c +++ b/src/backend/threads.c @@ -67,9 +67,29 @@ struct lumiera_thread_mockup LumieraReccondition finished; }; -static void* thread_loop (void* arg) +static void* thread_loop (void* thread) { - (void)arg; + LumieraThread t = (LumieraThread)thread; + + pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); + + REQUIRE (t, "thread does not exist"); + + ECHO ("entering section 1"); + // this seems to deadlock unexpectedly: + LUMIERA_RECCONDITION_SECTION (threads, &t->signal) + { + do { + // NULL function means: no work to do + if (t->function) + t->function (t->arguments); + lumiera_threadpool_park_thread(t); + LUMIERA_RECCONDITION_WAIT(t->state != LUMIERA_THREADSTATE_IDLE); + } while (t->state != LUMIERA_THREADSTATE_SHUTDOWN); + // SHUTDOWN state + + ECHO ("thread quitting"); + } return 0; } @@ -79,22 +99,23 @@ LumieraThread lumiera_thread_run (enum lumiera_thread_class kind, void (*function)(void *), void * arg, - LumieraReccondition finished, const char* purpose, struct nobug_flag* flag) { - (void)finished; - (void)function; - (void)arg; + REQUIRE (function, "invalid function"); + // ask the threadpool for a thread (it might create a new one) LumieraThread self = lumiera_threadpool_acquire_thread(kind, purpose, flag); - // TODO: set the function and data to be run - // lumiera_thread_set_func_data (self, start_routine, arg, purpose, flag); + // set the function and data to be run + self->function = function; + self->arguments = arg; // and let it really run (signal the condition var, the thread waits on it) - LUMIERA_RECCONDITION_SECTION(cond_sync, self->finished) - LUMIERA_RECCONDITION_SIGNAL; + self->state = LUMIERA_THREADSTATE_WAKEUP; + ECHO ("entering section 2"); + LUMIERA_RECCONDITION_SECTION(threads, &self->signal) + LUMIERA_RECCONDITION_BROADCAST; // NOTE: example only, add solid error handling! @@ -106,29 +127,25 @@ lumiera_thread_run (enum lumiera_thread_class kind, */ LumieraThread lumiera_thread_new (enum lumiera_thread_class kind, - LumieraReccondition finished, const char* purpose, struct nobug_flag* flag, pthread_attr_t* attrs) { // TODO: do something with these: (void) purpose; - (void) flag; REQUIRE (kind < LUMIERA_THREADCLASS_COUNT, "invalid thread kind specified: %d", kind); REQUIRE (attrs, "invalid pthread attributes structure passed"); - //REQUIRE (finished, "invalid finished flag passed"); - - LumieraThread self = lumiera_malloc (sizeof (*self)); llist_init(&self->node); - self->finished = finished; + lumiera_reccondition_init (&self->signal, "thread-control", flag); self->kind = kind; - self->state = LUMIERA_THREADSTATE_IDLE; + self->state = LUMIERA_THREADSTATE_STARTUP; + self->function = NULL; + self->arguments = NULL; - //REQUIRE (thread_loop); int error = pthread_create (&self->id, attrs, &thread_loop, self); - ENSURE(error == 0 || EAGAIN == error, "pthread returned %d:%s", error, strerror(error)); + ENSURE(error == 0 || EAGAIN == error, "pthread_create returned %d:%s", error, strerror(error)); if (error) { // error here can only be EAGAIN, given the above ENSURE @@ -143,12 +160,26 @@ lumiera_thread_destroy (LumieraThread self) { REQUIRE (self, "trying to destroy an invalid thread"); - // TODO: stop the pthread - llist_unlink(&self->node); - //finished = NULL; // or free(finished)? - lumiera_reccondition_destroy (self->finished, &NOBUG_FLAG(threads)); - //kind = 0; - //state = 0; + lumiera_threadpool_unlink(self); + + // get the pthread out of the processing loop + // need to signal to the thread that it should start quitting + // should this be within the section? + LUMIERA_RECCONDITION_SECTION(threads, &self->signal) + { + REQUIRE (self->state == LUMIERA_THREADSTATE_IDLE, "trying to delete a thread in state other than IDLE (%s)", lumiera_threadstate_names[self->state]); + self->state = LUMIERA_THREADSTATE_SHUTDOWN; + self->function = NULL; + self->arguments = NULL; + LUMIERA_RECCONDITION_SIGNAL; + } + + int error = pthread_join(self->id, NULL); + ENSURE (0 == error, "pthread_join returned %d:%s", error, strerror(error)); + + // condition has to be destroyed after joining with the thread + lumiera_reccondition_destroy (&self->signal, &NOBUG_FLAG(threads)); + return self; } diff --git a/src/backend/threads.h b/src/backend/threads.h index ac8a163fd..a55926123 100644 --- a/src/backend/threads.h +++ b/src/backend/threads.h @@ -90,10 +90,16 @@ enum lumiera_thread_class // defined in threads.c extern const char* lumiera_threadclass_names[]; -#define LUMIERA_THREAD_STATES \ - LUMIERA_THREAD_STATE(IDLE) \ - LUMIERA_THREAD_STATE(RUNNING) \ - LUMIERA_THREAD_STATE(ERROR) +// there is some confusion between the meaning of this +// on one hand it could be used to tell the current state of the thread +// on the other, it is used to tell the thread which state to enter on next iteration +#define LUMIERA_THREAD_STATES \ + LUMIERA_THREAD_STATE(IDLE) \ + LUMIERA_THREAD_STATE(ERROR) \ + LUMIERA_THREAD_STATE(RUNNING) \ + LUMIERA_THREAD_STATE(WAKEUP) \ + LUMIERA_THREAD_STATE(SHUTDOWN) \ + LUMIERA_THREAD_STATE(STARTUP) #define LUMIERA_THREAD_STATE(name) LUMIERA_THREADSTATE_##name, @@ -125,12 +131,15 @@ struct lumiera_thread_struct // void* arg; pthread_t id; // TODO: maybe this condition variable should be renamed when we have a better understanding of how it will be used - LumieraReccondition finished; + lumiera_reccondition signal; // control signal, state change signal // the following member could have been called "class" except that it would conflict with C++ keyword // as consequence, it's been decided to leave the type name containing the word "class", // while all members/variables called "kind" enum lumiera_thread_class kind; + // this is used both as a command and as a state tracker lumiera_thread_state state; + void (*function)(void *); + void * arguments; }; /** @@ -138,7 +147,6 @@ struct lumiera_thread_struct */ LumieraThread lumiera_thread_new (enum lumiera_thread_class kind, - LumieraReccondition finished, const char* purpose, struct nobug_flag* flag, pthread_attr_t* attrs); @@ -162,8 +170,6 @@ lumiera_thread_delete (LumieraThread self); * @param kind class of the thread to start * @param function pointer to a function to execute in a thread (returning void, not void* as in pthreads) * @param arg generic pointer passed to the thread - * @param finished a condition variable to be broadcasted, if not NULL. - * The associated mutex should be locked at thread_run time already, else the signal can get lost. * @param purpose descriptive name of this thread, used by NoBug * @param flag NoBug flag used for logging the thread startup and return */ @@ -171,7 +177,6 @@ LumieraThread lumiera_thread_run (enum lumiera_thread_class kind, void (*function)(void *), void * arg, - LumieraReccondition finished, const char* purpose, struct nobug_flag* flag); diff --git a/tests/30backend-threadpool.tests b/tests/30backend-threadpool.tests index c136aff42..2cad43477 100644 --- a/tests/30backend-threadpool.tests +++ b/tests/30backend-threadpool.tests @@ -5,24 +5,11 @@ PLANNED "create" PLANNED "yield" PLANNED "cancel" -TEST "Acquire/Release test" basic-acquire-release < #include +#include + +void is_prime(void * arg) +{ + int number = *(int *)arg; + int prime = 1; + + for (int x = number; x >= sqrt(number); --x) + { + if (number % x == 0) + { + prime = 0; + break; + } + } + *(int *)arg = prime; +} TESTS_BEGIN +TEST ("threadpool-basic") +{ + lumiera_threadpool_init(100); + lumiera_threadpool_destroy(); +} + +TEST ("threadpool1") +{ + ECHO("start by initializing the threadpool"); + lumiera_threadpool_init(100); + LumieraThread t1 = + lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_INTERACTIVE, + "test purpose", + &NOBUG_FLAG(NOBUG_ON)); + // lumiera_threadpool_release_thread(t1); + ECHO("acquired thread 1 %p",t1); + lumiera_threadpool_destroy(); +} + TEST ("basic-acquire-release") { @@ -37,12 +73,12 @@ TEST ("basic-acquire-release") LumieraThread t1 = lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_INTERACTIVE, "test purpose", - NULL); + &NOBUG_FLAG(NOBUG_ON)); ECHO("acquiring thread 2"); LumieraThread t2 = lumiera_threadpool_acquire_thread(LUMIERA_THREADCLASS_IDLE, "test purpose", - NULL); + &NOBUG_FLAG(NOBUG_ON)); ECHO("thread 1 kind=%s", lumiera_threadclass_names[t1->kind]); CHECK(LUMIERA_THREADCLASS_INTERACTIVE == t1->kind); @@ -54,16 +90,17 @@ TEST ("basic-acquire-release") CHECK(LUMIERA_THREADSTATE_IDLE == t2->state); ECHO("releasing thread 1"); - lumiera_threadpool_release_thread(t1); + //lumiera_threadpool_release_thread(t1); ECHO("thread 1 has been released"); ECHO("releasing thread 2"); - lumiera_threadpool_release_thread(t2); + //lumiera_threadpool_release_thread(t2); ECHO("thread 2 has been released"); lumiera_threadpool_destroy(); } +#if 0 TEST ("many-acquire-release") { @@ -79,7 +116,7 @@ TEST ("many-acquire-release") threads[i+kind*threads_per_pool_count] = lumiera_threadpool_acquire_thread(kind, "test purpose", - NULL); + &NOBUG_FLAG(NOBUG_ON)); } } @@ -107,7 +144,7 @@ TEST ("toomany-acquire-release") threads[i+kind*threads_per_pool_count] = lumiera_threadpool_acquire_thread(kind, "test purpose", - NULL); + &NOBUG_FLAG(NOBUG_ON)); } } @@ -119,5 +156,26 @@ TEST ("toomany-acquire-release") lumiera_threadpool_destroy(); } +#endif + +TEST ("process-function") +{ + // this is what the scheduler would do once it figures out what function a job needs to run + LumieraThread t; + int number = 440616; + + lumiera_threadpool_init(10); + + ECHO ("the input to the function is %d", number); + + t = lumiera_thread_run (LUMIERA_THREADCLASS_INTERACTIVE, + &is_prime, + (void *)&number, //void * arg, + "process my test function", + &NOBUG_FLAG(NOBUG_ON)); // struct nobug_flag* flag) + + // cleanup + lumiera_threadpool_destroy(); +} TESTS_END diff --git a/tests/backend/test-threads.c b/tests/backend/test-threads.c index 76da2f450..21786ae8b 100644 --- a/tests/backend/test-threads.c +++ b/tests/backend/test-threads.c @@ -91,7 +91,6 @@ TEST ("simple_thread") lumiera_thread_run (LUMIERA_THREADCLASS_WORKER, threadfn, NULL, - NULL, argv[1], NULL); @@ -112,7 +111,6 @@ TEST ("thread_synced") lumiera_thread_run (LUMIERA_THREADCLASS_WORKER, threadsyncfn, &cnd, - &cnd, argv[1], NULL); @@ -143,7 +141,6 @@ TEST ("mutex_thread") lumiera_thread_run (LUMIERA_THREADCLASS_WORKER, mutexfn, NULL, - NULL, argv[1], NULL); diff --git a/tests/test.sh b/tests/test.sh index 951d2fcea..b1fd0d99b 100755 --- a/tests/test.sh +++ b/tests/test.sh @@ -26,7 +26,7 @@ # stop testing on the first failure export LC_ALL=C -NOBUG_LOGREGEX='^\(\*\*[0-9]*\*\* \)\?[0-9]\{10,\}: \(TRACE\|INFO\|NOTICE\|WARNING\|ERR\|TODO\|PLANNED\|FIXME\|DEPRECATED\|UNIMPLEMENTED\):' +NOBUG_LOGREGEX='^\(\*\*[0-9]*\*\* \)\?[0-9]\{10,\}: \(TRACE\|INFO\|NOTICE\|WARNING\|ERR\|TODO\|PLANNED\|FIXME\|DEPRECATED\|UNIMPLEMENTED\|RESOURCE_ENTER\|RESOURCE_LEAVE\|RESOURCE_STATE\):' arg0="$0" srcdir="$(dirname "$arg0")"