Begin implementing a 'soft' thread count limit per pool.
Add LUMIERA_DIE in cases where this soft limit is reached; add LUMIERA_DIE when pthread_create fails to create threads (serious); add a test which tries to break the soft limit.
This commit is contained in:
parent
bd076e4210
commit
af80622ef5
5 changed files with 76 additions and 12 deletions
|
|
@ -53,11 +53,14 @@ void* pool_thread_loop(void * arg)
|
|||
}
|
||||
|
||||
void
|
||||
lumiera_threadpool_init(void)
|
||||
lumiera_threadpool_init(unsigned limit)
|
||||
{
|
||||
for (int i = 0; i < LUMIERA_THREADCLASS_COUNT; ++i)
|
||||
{
|
||||
llist_init(&threadpool.kind[i].pool);
|
||||
threadpool.kind[i].max_threads = limit;
|
||||
threadpool.kind[i].working_thread_count = 0;
|
||||
threadpool.kind[i].idle_thread_count = 0;
|
||||
lumiera_mutex_init(&threadpool.kind[i].lock,"pool of threads", &NOBUG_FLAG(threadpool));
|
||||
}
|
||||
}
|
||||
|
|
@ -91,17 +94,34 @@ lumiera_threadpool_acquire_thread(enum lumiera_thread_class kind,
|
|||
if (llist_is_empty (&threadpool.kind[kind].pool))
|
||||
{
|
||||
// TODO: fill in the recondition argument, currently NULL
|
||||
ret = lumiera_thread_new (kind, NULL, purpose, flag);
|
||||
ENSURE (ret, "did not create a valid thread");
|
||||
FIXME ("this max thread logic needs to be deeply thought about and made more efficient as well as rebust");
|
||||
if (threadpool.kind[kind].working_thread_count
|
||||
+ threadpool.kind[kind].idle_thread_count
|
||||
< threadpool.kind[kind].max_threads) {
|
||||
ret = lumiera_thread_new (kind, NULL, purpose, flag);
|
||||
threadpool.kind[kind].working_thread_count++;
|
||||
ENSURE (ret, "did not create a valid thread");
|
||||
}
|
||||
else
|
||||
{
|
||||
//ERROR (threadpool, "did not create a new thread because per-pool limit was reached: %d", threadpool.kind[kind].max_threads);
|
||||
LUMIERA_DIE(ERRNO);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
REQUIRE (&threadpool.kind[kind].lock, "invalid threadpool lock");
|
||||
// use an existing thread, pick the first one
|
||||
// remove it from the pool's list
|
||||
LUMIERA_MUTEX_SECTION (threadpool, &threadpool.kind[kind].lock)
|
||||
{
|
||||
ret = (LumieraThread)(llist_unlink(llist_head (&threadpool.kind[kind].pool)));
|
||||
threadpool.kind[kind].working_thread_count++;
|
||||
threadpool.kind[kind].idle_thread_count--; // cheaper than using llist_count
|
||||
ENSURE (threadpool.kind[kind].idle_thread_count ==
|
||||
llist_count(&threadpool.kind[kind].pool),
|
||||
"idle thread count %d is wrong, should be %d",
|
||||
threadpool.kind[kind].idle_thread_count,
|
||||
llist_count(&threadpool.kind[kind].pool));
|
||||
}
|
||||
ENSURE (ret, "did not find a valid thread");
|
||||
}
|
||||
|
|
@ -118,8 +138,15 @@ lumiera_threadpool_release_thread(LumieraThread thread)
|
|||
// TODO: currently, locking produces memory leaks
|
||||
// LUMIERA_MUTEX_SECTION (threadpool, &threadpool.kind[thread->kind].lock)
|
||||
// {
|
||||
//REQUIRE (llist_is_single(&thread->node), "thread already belongs to some list");
|
||||
REQUIRE (llist_is_single(&thread->node), "thread already belongs to some list");
|
||||
llist_insert_head(&threadpool.kind[thread->kind].pool, &thread->node);
|
||||
threadpool.kind[thread->kind].working_thread_count--;
|
||||
threadpool.kind[thread->kind].idle_thread_count++; // cheaper than using llist_count
|
||||
ENSURE (threadpool.kind[thread->kind].idle_thread_count ==
|
||||
llist_count(&threadpool.kind[thread->kind].pool),
|
||||
"idle thread count %d is wrong, should be %d",
|
||||
threadpool.kind[thread->kind].idle_thread_count,
|
||||
llist_count(&threadpool.kind[thread->kind].pool));
|
||||
// REQUIRE (!llist_is_empty (&threadpool.kind[thread->kind].pool), "thread pool is still empty after insertion");
|
||||
// }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -71,14 +71,18 @@ struct lumiera_threadpool_struct
|
|||
{
|
||||
llist pool;
|
||||
lumiera_mutex lock;
|
||||
unsigned max_threads;
|
||||
unsigned working_thread_count;
|
||||
unsigned idle_thread_count;
|
||||
} kind[LUMIERA_THREADCLASS_COUNT];
|
||||
};
|
||||
|
||||
/**
|
||||
* Initialize the thread pool.
|
||||
* @param limit the maximum number of threads (idle+working) allowed per pool
|
||||
*/
|
||||
void
|
||||
lumiera_threadpool_init(void);
|
||||
lumiera_threadpool_init(unsigned limit);
|
||||
|
||||
void
|
||||
lumiera_threadpool_destroy(void);
|
||||
|
|
|
|||
|
|
@ -182,12 +182,11 @@ lumiera_thread_new (enum lumiera_thread_class kind,
|
|||
REQUIRE (self);
|
||||
int error = pthread_create (&self->id, &attrs, &thread_loop, self);
|
||||
ENSURE(error == 0 || EAGAIN == error, "pthread returned %d:%s", error, strerror(error));
|
||||
FIXME("handle EAGAIN");
|
||||
if (error)
|
||||
{
|
||||
#if 0
|
||||
return 0; /////TODO temporary addition by Ichthyo; probably we'll set lumiera_error?
|
||||
#endif
|
||||
// error here can only be EAGAIN, given the above ENSURE
|
||||
FIXME ("error is %d:%s, see if this can be improved", error, strerror(error));
|
||||
LUMIERA_DIE (ERRNO);
|
||||
}
|
||||
|
||||
REQUIRE (self, "returning an invalid thread structure");
|
||||
|
|
|
|||
|
|
@ -77,3 +77,9 @@ err: deleting thread
|
|||
err: destroying the pool mutex
|
||||
err: pool mutex destroyed
|
||||
END
|
||||
|
||||
TEST "Too Many Acquires/Releases test" toomany-acquire-release <<END
|
||||
err: Fatal Error
|
||||
err: .*
|
||||
return: !0
|
||||
END
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ TESTS_BEGIN
|
|||
TEST ("basic-acquire-release")
|
||||
{
|
||||
ECHO("start by initializing the threadpool");
|
||||
lumiera_threadpool_init();
|
||||
lumiera_threadpool_init(100);
|
||||
ECHO("acquiring thread 1");
|
||||
LumieraThread t1 =
|
||||
lumiera_threadpool_acquire_thread(LUMIERA_THREAD_INTERACTIVE,
|
||||
|
|
@ -69,7 +69,35 @@ TEST ("many-acquire-release")
|
|||
|
||||
const int threads_per_pool_count = 50;
|
||||
|
||||
lumiera_threadpool_init();
|
||||
lumiera_threadpool_init(50);
|
||||
LumieraThread threads[threads_per_pool_count*LUMIERA_THREADCLASS_COUNT];
|
||||
|
||||
for (int kind = 0; kind < LUMIERA_THREADCLASS_COUNT; ++kind)
|
||||
{
|
||||
for (int i = 0; i < threads_per_pool_count; ++i)
|
||||
{
|
||||
threads[i+kind*threads_per_pool_count] =
|
||||
lumiera_threadpool_acquire_thread(kind,
|
||||
"test purpose",
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < threads_per_pool_count*LUMIERA_THREADCLASS_COUNT; ++i)
|
||||
{
|
||||
lumiera_threadpool_release_thread(threads[i]);
|
||||
}
|
||||
|
||||
lumiera_threadpool_destroy();
|
||||
|
||||
}
|
||||
|
||||
TEST ("toomany-acquire-release")
|
||||
{
|
||||
|
||||
const int threads_per_pool_count = 51;
|
||||
|
||||
lumiera_threadpool_init(50);
|
||||
LumieraThread threads[threads_per_pool_count*LUMIERA_THREADCLASS_COUNT];
|
||||
|
||||
for (int kind = 0; kind < LUMIERA_THREADCLASS_COUNT; ++kind)
|
||||
|
|
|
|||
Loading…
Reference in a new issue