diff --git a/src/backend/threadpool.c b/src/backend/threadpool.c index 3dbfd0c92..c77225f78 100644 --- a/src/backend/threadpool.c +++ b/src/backend/threadpool.c @@ -53,11 +53,14 @@ void* pool_thread_loop(void * arg) } void -lumiera_threadpool_init(void) +lumiera_threadpool_init(unsigned limit) { for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i) { llist_init(&threadpool.kind[i].pool); + threadpool.kind[i].max_threads = limit; + threadpool.kind[i].working_thread_count = 0; + threadpool.kind[i].idle_thread_count = 0; lumiera_mutex_init(&threadpool.kind[i].lock,"pool of threads", &NOBUG_FLAG(threadpool)); } } @@ -91,17 +94,34 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind, if (llist_is_empty (&threadpool.kind[kind].pool)) { // TODO: fill in the reccondition argument, currently NULL - ret = lumiera_thread_new (kind, NULL, purpose, flag); - ENSURE (ret, "did not create a valid thread"); + FIXME ("this max thread logic needs to be deeply thought about and made more efficient as well as rebust"); + if (threadpool.kind[kind].working_thread_count + + threadpool.kind[kind].idle_thread_count + < threadpool.kind[kind].max_threads) { + ret = lumiera_thread_new (kind, NULL, purpose, flag); + threadpool.kind[kind].working_thread_count++; + ENSURE (ret, "did not create a valid thread"); + } + else + { + //ERROR (threadpool, "did not create a new thread because per-pool limit was reached: %d", threadpool.kind[kind].max_threads); + LUMIERA_DIE(ERRNO); + } } else { - REQUIRE (&threadpool.kind[kind].lock, "invalid threadpool lock"); // use an existing thread, pick the first one // remove it from the pool's list LUMIERA_MUTEX_SECTION (threadpool, &threadpool.kind[kind].lock) { ret = (LumieraThread)(llist_unlink(llist_head (&threadpool.kind[kind].pool))); + threadpool.kind[kind].working_thread_count++; + threadpool.kind[kind].idle_thread_count--; // cheaper than using llist_count + ENSURE (threadpool.kind[kind].idle_thread_count == + llist_count(&threadpool.kind[kind].pool), + "idle thread count %d is wrong, should be %d", + threadpool.kind[kind].idle_thread_count, + llist_count(&threadpool.kind[kind].pool)); } ENSURE (ret, "did not find a valid thread"); } @@ -118,8 +138,15 @@ lumiera_threadpool_release_thread(LumieraThread thread) // TOOD: currently, locking produces memory leaks // LUMIERA_MUTEX_SECTION (threadpool, &threadpool.kind[thread->kind].lock) // { - //REQUIRE (llist_is_single(&thread->node), "thread already belongs to some list"); + REQUIRE (llist_is_single(&thread->node), "thread already belongs to some list"); llist_insert_head(&threadpool.kind[thread->kind].pool, &thread->node); + threadpool.kind[thread->kind].working_thread_count--; + threadpool.kind[thread->kind].idle_thread_count++; // cheaper than using llist_count + ENSURE (threadpool.kind[thread->kind].idle_thread_count == + llist_count(&threadpool.kind[thread->kind].pool), + "idle thread count %d is wrong, should be %d", + threadpool.kind[thread->kind].idle_thread_count, + llist_count(&threadpool.kind[thread->kind].pool)); // REQUIRE (!llist_is_empty (&threadpool.kind[thread->kind].pool), "thread pool is still empty after insertion"); // } } diff --git a/src/backend/threadpool.h b/src/backend/threadpool.h index 0d2a0257f..fbd45c8a4 100644 --- a/src/backend/threadpool.h +++ b/src/backend/threadpool.h @@ -71,14 +71,18 @@ struct lumiera_threadpool_struct { llist pool; lumiera_mutex lock; + unsigned max_threads; + unsigned working_thread_count; + unsigned idle_thread_count; } kind[LUMIERA_THREADCLASS_COUNT]; }; /** * Initialize the thread pool. + * @param limit the maximum number of threads (idle+working) allowed per pool */ void -lumiera_threadpool_init(void); +lumiera_threadpool_init(unsigned limit); void lumiera_threadpool_destroy(void); diff --git a/src/backend/threads.c b/src/backend/threads.c index 6cf4559be..f4277fd47 100644 --- a/src/backend/threads.c +++ b/src/backend/threads.c @@ -182,12 +182,11 @@ lumiera_thread_new (enum lumiera_thread_class kind, REQUIRE (self); int error = pthread_create (&self->id, &attrs, &thread_loop, self); ENSURE(error == 0 || EAGAIN == error, "pthread returned %d:%s", error, strerror(error)); - FIXME("handle EAGAIN"); if (error) { -#if 0 - return 0; /////TODO temporary addition by Ichthyo; probably we'll set lumiera_error? -#endif + // error here can only be EAGAIN, given the above ENSURE + FIXME ("error is %d:%s, see if this can be improved", error, strerror(error)); + LUMIERA_DIE (ERRNO); } REQUIRE (self, "returning an invalid thread structure"); diff --git a/tests/30backend-threadpool.tests b/tests/30backend-threadpool.tests index 03808507a..a92b32059 100644 --- a/tests/30backend-threadpool.tests +++ b/tests/30backend-threadpool.tests @@ -77,3 +77,9 @@ err: deleting thread err: destroying the pool mutex err: pool mutex destroyed END + +TEST "Too Many Acquires/Releases test" toomany-acquire-release <