diff --git a/src/backend/threadpool.c b/src/backend/threadpool.c
index fc6041b5f..6a4f90289 100644
--- a/src/backend/threadpool.c
+++ b/src/backend/threadpool.c
@@ -45,6 +45,9 @@ NOBUG_DEFINE_FLAG_PARENT (threadpool, threads_dbg); /*TODO insert a suitable/bet
 void
 lumiera_threadpool_init(void)
 {
+  NOBUG_INIT_FLAG (threadpool);
+  TRACE (threadpool);
+
   for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i)
     {
       llist_init (&threadpool.pool[i].list);
@@ -55,24 +58,28 @@ lumiera_threadpool_init(void)
       pthread_attr_init (&threadpool.pool[i].pthread_attrs);
 
       //cancel...
-      lumiera_mutex_init (&threadpool.pool[i].lock,"pool of threads", &NOBUG_FLAG (threadpool));
+      lumiera_condition_init (&threadpool.pool[i].sync,"pool of threads", &NOBUG_FLAG (threadpool));
     }
 }
 
 void
 lumiera_threadpool_destroy(void)
 {
-  ECHO ("destroying threadpool");
+  TRACE (threadpool);
   for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i)
     {
-      ECHO ("destroying individual pool #%d", i);
-      // no locking is done at this point
-      ECHO ("number of threads in the pool=%d", llist_count (&threadpool.pool[i].list));
-      LLIST_WHILE_HEAD (&threadpool.pool[i].list, thread)
-        lumiera_thread_delete ((LumieraThread)thread);
-      ECHO ("destroying the pool mutex");
-      lumiera_mutex_destroy (&threadpool.pool[i].lock, &NOBUG_FLAG (threadpool));
-      ECHO ("pool mutex destroyed");
+      TRACE (threadpool, "destroying individual pool #%d", i);
+      LUMIERA_CONDITION_SECTION (threadpool, &threadpool.pool[i].sync)
+        {
+          REQUIRE (0 == threadpool.pool[i].working_thread_count, "%d threads are still running", threadpool.pool[i].working_thread_count);
+          // TODO need to have a stronger assertion that no threads are really running because they will not even be in the list
+          INFO (threadpool, "number of threads in the pool=%d", llist_count(&threadpool.pool[i].list));
+          LLIST_WHILE_HEAD (&threadpool.pool[i].list, t)
+            {
+              lumiera_thread_delete ((LumieraThread)t);
+            }
+        }
+      lumiera_condition_destroy (&threadpool.pool[i].sync, &NOBUG_FLAG (threadpool));
       pthread_attr_destroy (&threadpool.pool[i].pthread_attrs);
     }
 }
@@ -82,55 +89,61 @@
 lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind,
                                   const char* purpose,
                                   struct nobug_flag* flag)
 {
-  LumieraThread ret;
+  TRACE (threadpool);
+  LumieraThread ret = NULL;
 
   REQUIRE (kind < LUMIERA_THREADCLASS_COUNT, "unknown pool kind specified: %d", kind);
-  if (llist_is_empty (&threadpool.pool[kind].list))
+
+  LUMIERA_CONDITION_SECTION (threadpool, &threadpool.pool[kind].sync)
     {
-      // TODO: fill in the reccondition argument, currently NULL
-      ret = lumiera_thread_new (kind, NULL, purpose, flag,
-                                &threadpool.pool[kind].pthread_attrs);
+      if (llist_is_empty (&threadpool.pool[kind].list))
+        {
+          ret = lumiera_thread_new (kind, purpose, flag,
+                                    &threadpool.pool[kind].pthread_attrs);
+          ENSURE (ret, "did not create a valid thread");
+          TODO ("no error handing, let the resourcecollector do it, no need when returning the thread");
+          LUMIERA_CONDITION_WAIT (!llist_is_empty (&threadpool.pool[kind].list));
+        }
+      // use an existing thread, pick the first one
+      // remove it from the pool's list
+      ret = (LumieraThread)(llist_unlink (llist_head (&threadpool.pool[kind].list)));
+      REQUIRE (ret->state == LUMIERA_THREADSTATE_IDLE, "trying to return a non-idle thread (state=%s)", lumiera_threadstate_names[ret->state]);
       threadpool.pool[kind].working_thread_count++;
-      ENSURE (ret, "did not create a valid thread");
+      threadpool.pool[kind].idle_thread_count--; // cheaper than using llist_count
+      ENSURE (threadpool.pool[kind].idle_thread_count ==
+              llist_count (&threadpool.pool[kind].list),
+              "idle thread count %d is wrong, should be %d",
+              threadpool.pool[kind].idle_thread_count,
+              llist_count (&threadpool.pool[kind].list));
+      ENSURE (ret, "did not find a valid thread");
     }
-  else
-    {
-      // use an existing thread, pick the first one
-      // remove it from the pool's list
-      LUMIERA_MUTEX_SECTION (threadpool, &threadpool.pool[kind].lock)
-        {
-          ret = (LumieraThread)(llist_unlink (llist_head (&threadpool.pool[kind].list)));
-          threadpool.pool[kind].working_thread_count++;
-          threadpool.pool[kind].idle_thread_count--; // cheaper than using llist_count
-          ENSURE (threadpool.pool[kind].idle_thread_count ==
-                  llist_count (&threadpool.pool[kind].list),
-                  "idle thread count %d is wrong, should be %d",
-                  threadpool.pool[kind].idle_thread_count,
-                  llist_count (&threadpool.pool[kind].list));
-        }
-      ENSURE (ret, "did not find a valid thread");
-    }
 
   return ret;
 }
 
 void
 lumiera_threadpool_release_thread(LumieraThread thread)
 {
+  TRACE (threadpool);
   REQUIRE (thread, "invalid thread given");
   REQUIRE (thread->kind < LUMIERA_THREADCLASS_COUNT, "thread belongs to an unknown pool kind: %d", thread->kind);
-  LUMIERA_MUTEX_SECTION (threadpool, &threadpool.pool[thread->kind].lock)
+  REQUIRE (thread->state != LUMIERA_THREADSTATE_IDLE, "trying to park an already idle thread");
+  LUMIERA_CONDITION_SECTION (threadpool, &threadpool.pool[thread->kind].sync)
     {
-      REQUIRE (llist_is_single (&thread->node), "thread already belongs to some list");
+      thread->state = LUMIERA_THREADSTATE_IDLE;
+      REQUIRE (llist_is_empty (&thread->node), "thread already belongs to some list");
       llist_insert_head (&threadpool.pool[thread->kind].list, &thread->node);
-      threadpool.pool[thread->kind].working_thread_count--;
-      threadpool.pool[thread->kind].idle_thread_count++; // cheaper than using llist_count
-      ENSURE (threadpool.pool[thread->kind].idle_thread_count ==
-              llist_count (&threadpool.pool[thread->kind].list),
-              "idle thread count %d is wrong, should be %d",
-              threadpool.pool[thread->kind].idle_thread_count,
-              llist_count (&threadpool.pool[thread->kind].list));
-      // REQUIRE (!llist_is_empty (&threadpool.pool[thread->kind].list), "thread pool is still empty after insertion");
+
+      threadpool.pool[thread->kind].working_thread_count--;
+      threadpool.pool[thread->kind].idle_thread_count++; // cheaper than using llist_count
+
+      ENSURE (threadpool.pool[thread->kind].idle_thread_count ==
+              llist_count (&threadpool.pool[thread->kind].list),
+              "idle thread count %d is wrong, should be %d",
+              threadpool.pool[thread->kind].idle_thread_count,
+              llist_count (&threadpool.pool[thread->kind].list));
+      REQUIRE (!llist_is_empty (&threadpool.pool[thread->kind].list), "thread pool is still empty after insertion");
+      LUMIERA_CONDITION_BROADCAST;
    }
 }
diff --git a/src/backend/threadpool.h b/src/backend/threadpool.h
index 0c56de62d..a14c87b50 100644
--- a/src/backend/threadpool.h
+++ b/src/backend/threadpool.h
@@ -23,9 +23,8 @@
 #define LUMIERA_THREADPOOL_H
 
 //TODO: Support library includes//
-#include "lib/reccondition.h"
+#include "lib/condition.h"
 #include "lib/llist.h"
-#include "lib/mutex.h"
 
 
 //TODO: Forward declarations//
@@ -70,7 +69,7 @@ struct lumiera_threadpool_struct
   struct
   {
     llist list;
-    lumiera_mutex lock;
+    lumiera_condition sync;
     unsigned working_thread_count;
     unsigned idle_thread_count;
     pthread_attr_t pthread_attrs;