diff --git a/src/backend/threadpool.c b/src/backend/threadpool.c index ea88b4503..9de1e6d81 100644 --- a/src/backend/threadpool.c +++ b/src/backend/threadpool.c @@ -39,6 +39,7 @@ static lumiera_threadpool threadpool; */ NOBUG_DEFINE_FLAG_PARENT (threadpool, threads_dbg); /*TODO insert a suitable/better parent flag here */ +LUMIERA_ERROR_DEFINE(THREADPOOL_OFFLINE, "tried to acquire thread while threadpool is not available"); //code goes here// @@ -56,6 +57,7 @@ lumiera_threadpool_init(void) llist_init (&threadpool.pool[i].idle_list); threadpool.pool[i].thread_count = 0; threadpool.pool[i].idle_thread_count = 0; + threadpool.pool[i].status = LUMIERA_THREADPOOL_ONLINE; //TODO: configure each pools' pthread_attrs appropriately pthread_attr_init (&threadpool.pool[i].pthread_attrs); @@ -70,9 +72,27 @@ lumiera_threadpool_destroy(void) { TRACE (threadpool); + /* set all threadpools offline must be done first, since running threads may attempt to start new ones */ + for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i) + LUMIERA_CONDITION_SECTION (threadpool, &threadpool.pool[i].sync) + threadpool.pool[i].status = LUMIERA_THREADPOOL_OFFLINE; + + /* wait that all theads have finished */ + for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i) + { + LUMIERA_CONDITION_SECTION (threadpool, &threadpool.pool[i].sync) + { + TODO ("check threads deadlines, kill them when they are stalled"); + TODO ("for threads without deadline use a timeout from config system, 500ms or so by default"); + LUMIERA_CONDITION_WAIT(llist_is_empty (&threadpool.pool[i].working_list)); + } + } + + /* now we can delete all threads */ for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i) { TRACE (threadpool, "destroying individual pool #%d", i); + LUMIERA_CONDITION_SECTION (threadpool, &threadpool.pool[i].sync) { REQUIRE (threadpool.pool[i].thread_count @@ -100,11 +120,14 @@ lumiera_threadpool_destroy(void) } } - +/** + * + * @return thread handle or NULL on error (lumiera error will be set) + */ LumieraThread -lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, - const char* purpose, - struct nobug_flag* flag) +lumiera_threadpool_acquire_thread (enum lumiera_thread_class kind, + const char* purpose, + struct nobug_flag* flag) { TRACE (threadpool); LumieraThread ret = NULL; @@ -113,34 +136,39 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, LUMIERA_CONDITION_SECTION (threadpool, &threadpool.pool[kind].sync) { - if (llist_is_empty (&threadpool.pool[kind].idle_list)) + if (threadpool.pool[kind].status != LUMIERA_THREADPOOL_ONLINE) + LUMIERA_ERROR_SET_WARNING (threadpool, THREADPOOL_OFFLINE, purpose); + else { - ret = lumiera_thread_new (kind, purpose, flag, - &threadpool.pool[kind].pthread_attrs); - ENSURE (ret, "did not create a valid thread"); - TODO ("no error handing, let the resourcecollector do it, no need when returning the thread"); - threadpool.pool[kind].thread_count++; - LUMIERA_CONDITION_WAIT (!llist_is_empty (&threadpool.pool[kind].idle_list)); + if (llist_is_empty (&threadpool.pool[kind].idle_list)) + { + ret = lumiera_thread_new (kind, purpose, flag, + &threadpool.pool[kind].pthread_attrs); + ENSURE (ret, "did not create a valid thread"); + TODO ("no error handing, let the resourcecollector do it, no need when returning the thread"); + threadpool.pool[kind].thread_count++; + LUMIERA_CONDITION_WAIT (!llist_is_empty (&threadpool.pool[kind].idle_list)); + } + // use an existing thread, pick the first one + // remove it from the pool's list + ret = (LumieraThread) (llist_head (&threadpool.pool[kind].idle_list)); + + ENSURE (ret, "did not find a valid thread"); + + REQUIRE (ret->state == LUMIERA_THREADSTATE_IDLE, "trying to return a non-idle thread (state=%s)", lumiera_threadstate_names[ret->state]); + + // move thread to the working_list + llist_insert_head (&threadpool.pool[kind].working_list, &ret->node); + + threadpool.pool[kind].idle_thread_count--; // cheaper than using llist_count + REQUIRE ((int)(llist_count (&threadpool.pool[kind].working_list) + + llist_count (&threadpool.pool[kind].idle_list)) + == threadpool.pool[kind].thread_count, + "threadpool counter miscalculation (working_list count = %u, idle_list count = %u, thread_count = %d )", + llist_count (&threadpool.pool[kind].working_list), + llist_count (&threadpool.pool[kind].idle_list), + threadpool.pool[kind].thread_count); } - // use an existing thread, pick the first one - // remove it from the pool's list - ret = (LumieraThread) (llist_head (&threadpool.pool[kind].idle_list)); - - ENSURE (ret, "did not find a valid thread"); - - REQUIRE (ret->state == LUMIERA_THREADSTATE_IDLE, "trying to return a non-idle thread (state=%s)", lumiera_threadstate_names[ret->state]); - - // move thread to the working_list - llist_insert_head (&threadpool.pool[kind].working_list, &ret->node); - - threadpool.pool[kind].idle_thread_count--; // cheaper than using llist_count - REQUIRE ((int)(llist_count (&threadpool.pool[kind].working_list) - + llist_count (&threadpool.pool[kind].idle_list)) - == threadpool.pool[kind].thread_count, - "threadpool counter miscalculation (working_list count = %u, idle_list count = %u, thread_count = %d )", - llist_count (&threadpool.pool[kind].working_list), - llist_count (&threadpool.pool[kind].idle_list), - threadpool.pool[kind].thread_count); } return ret; } diff --git a/src/backend/threadpool.h b/src/backend/threadpool.h index 47d90e6fc..5a3455d39 100644 --- a/src/backend/threadpool.h +++ b/src/backend/threadpool.h @@ -64,6 +64,11 @@ lumiera_threadpool_release_thread(LumieraThread thread); typedef struct lumiera_threadpool_struct lumiera_threadpool; typedef lumiera_threadpool* LumieraThreadpool; +enum lumiera_threadpool_state { + LUMIERA_THREADPOOL_OFFLINE, + LUMIERA_THREADPOOL_ONLINE +}; + struct lumiera_threadpool_struct { struct @@ -74,6 +79,7 @@ struct lumiera_threadpool_struct int idle_thread_count; pthread_attr_t pthread_attrs; lumiera_condition sync; + enum lumiera_threadpool_state status; } pool[LUMIERA_THREADCLASS_COUNT]; }; diff --git a/tests/backend/test-threadpool.c b/tests/backend/test-threadpool.c index 2acb8b861..952bfac2b 100644 --- a/tests/backend/test-threadpool.c +++ b/tests/backend/test-threadpool.c @@ -172,8 +172,6 @@ TEST ("no-function") &NOBUG_FLAG(NOBUG_ON)); // cleanup - ECHO("wait 1 sec"); - usleep(1000000); ECHO("finished waiting"); lumiera_threadpool_destroy(); } @@ -195,8 +193,6 @@ TEST ("process-function") &NOBUG_FLAG(NOBUG_ON)); // struct nobug_flag* flag) // cleanup - ECHO("wait 1 sec"); - usleep(1000000); ECHO("finished waiting"); lumiera_threadpool_destroy(); }