Library/Application: switch WorkForce

The WorkForce (passive worker pool) has been coded just recently,
and -- in anticipation of this refactoring -- directly against std::thread
instead of using the old framework.

...the switch is straight-forward, using the default case
...add the ability to decorate the thread-IDs with a running counter
This commit is contained in:
Fischlurch 2023-10-12 22:00:55 +02:00
parent 1ffee39b23
commit 1c4f605e8f
5 changed files with 163 additions and 37 deletions

View file

@ -35,10 +35,12 @@
#include "lib/util.hpp"
#include <chrono>
#include <atomic>
#include <pthread.h>
using util::_Fmt;
using lib::Literal;
using std::atomic_uint;
using std::chrono::steady_clock;
using std::chrono_literals::operator ""ms;
@ -58,6 +60,15 @@ namespace thread{
}
/** Helper to create a suffix to the thread-ID with running count */
string
ThreadWrapper::decorate_with_global_count (string const& rawID)
{
static atomic_uint globalCnt{1};
return _Fmt{"%s.%03i"} % rawID % globalCnt.fetch_add (+1, std::memory_order_acq_rel);
}
/** @note implies get_id() != std::thread::id{} ==> it is running */
bool
ThreadWrapper::invokedWithinThread() const

View file

@ -226,6 +226,8 @@ namespace lib {
void handle_after_thread() { } ///< called immediately before end of thread
void handle_loose_thread() { } ///< called when destroying wrapper on still running thread
static string decorate_with_global_count (string const&);
/**
* allow to detach explicitly independent from thread-function's state.
* @warning this function is borderline dangerous; it might be acceptable
@ -389,7 +391,6 @@ namespace lib {
void
invokeThreadFunction (ARGS&& ...args)
{
if (not Policy::isLive()) return;
Policy::handle_begin_thread();
Policy::markThreadStart();
Policy::perform_thread_function (forward<ARGS> (args)...);
@ -484,6 +485,13 @@ namespace lib {
return move(*this);
}
Launch&&
decorateCounter()
{
id = Policy::decorate_with_global_count (id);
return move(*this);
}
template<typename HOOK>
Launch&&
atStart (HOOK&& hook)

View file

@ -43,7 +43,7 @@
** in parallel, yet any other worker about to pick the next task has to wait until
** it is possible to grab the `GroomingToken` exclusively. For the WorkForce this
** usage pattern implies that there is *no explicit synchronisation* -- scaling up
** and shutting down must be performed non-concurrent.
** and shutting down must be performed non-concurrently.
** @see work-force-test.cpp
** @see scheduler-commutator.hpp usage as part of the scheduler
*/
@ -56,12 +56,12 @@
#include "vault/common.hpp"
#include "vault/gear/activity.hpp"
#include "lib/meta/function.hpp"
#include "lib/thread.hpp"
#include "lib/nocopy.hpp"
#include "lib/util.hpp"
#include <utility>
#include <chrono>
#include <thread>
#include <atomic>
#include <list>
@ -72,6 +72,7 @@ namespace gear {
using std::move;
using std::atomic;
using util::unConst;
using std::this_thread::sleep_for;
namespace work { ///< Details of WorkForce (worker pool) implementation
@ -100,26 +101,35 @@ namespace gear {
};
/**
using Launch = lib::Thread::Launch;
/*************************************//**
* Individual worker thread:
* repeatedly pulls the `doWork` functor.
*/
template<class CONF>
class Runner
class Worker
: CONF
, util::NonCopyable
, public std::thread
{
public:
Runner (CONF config)
Worker (CONF config)
: CONF{move (config)}
, thread{[this]{ pullWork(); }}
, thread_{Launch{&Worker::pullWork, this}
.threadID("Worker")
.decorateCounter()}
{ }
/** emergency break to trigger cooperative halt */
std::atomic<bool> emergency{false};
/** this Worker starts out active, but may terminate */
bool isDead() const { return not thread_; }
private:
lib::Thread thread_;
void
pullWork()
{
@ -150,9 +160,7 @@ namespace gear {
CONF::finalHook (not regularExit);
}
ERROR_LOG_AND_IGNORE (threadpool, "failure in thread-exit hook")
thread::detach();
}
}// Thread will terminate....
activity::Proc
idleWait()
@ -160,7 +168,7 @@ namespace gear {
++idleCycles;
if (idleCycles < CONF::DISMISS_CYCLES)
{
std::this_thread::sleep_for (CONF::IDLE_WAIT);
sleep_for (CONF::IDLE_WAIT);
return activity::PASS;
}
else // idle beyond threshold => terminate worker
@ -185,7 +193,7 @@ namespace gear {
class WorkForce
: util::NonCopyable
{
using Pool = std::list<work::Runner<CONF>>;
using Pool = std::list<work::Worker<CONF>>;
CONF setup_;
Pool workers_;
@ -235,15 +243,15 @@ namespace gear {
awaitShutdown()
{
for (auto& w : workers_)
w.emergency.store(true, std::memory_order_relaxed);
w.emergency.store (true, std::memory_order_relaxed);
while (0 < size())
std::this_thread::sleep_for(setup_.IDLE_WAIT);
sleep_for (setup_.IDLE_WAIT);
}
size_t
size() const
{
unConst(workers_).remove_if([](auto& w){ return not w.joinable(); });
unConst(workers_).remove_if([](auto& w){ return w.isDead(); });
return workers_.size();
}
};

View file

@ -27,7 +27,9 @@
#include "lib/test/run.hpp"
#include "vault/gear/work-force.hpp"
#include "lib/thread.hpp"
#include "lib/sync.hpp"
#include "lib/test/diagnostic-output.hpp"////////////TODO
#include <functional>
#include <thread>
@ -44,6 +46,7 @@ namespace test {
using std::this_thread::sleep_for;
using namespace std::chrono_literals;
using std::chrono::milliseconds;
using lib::Thread;
namespace {
@ -213,7 +216,7 @@ namespace test {
.withSleepPeriod (10ms)};
wof.incScale();
sleep_for(50us);
sleep_for(1ms);
CHECK (1 == check);
@ -238,7 +241,7 @@ namespace test {
.dismissAfter(5)};
wof.incScale();
sleep_for(100us);
sleep_for(1ms);
CHECK (1 == check);
@ -375,12 +378,12 @@ namespace test {
CHECK (0 == wof.size());
wof.incScale();
sleep_for(100us);
sleep_for(1ms);
CHECK (1 == uniqueCnt);
CHECK (1 == wof.size());
wof.incScale();
sleep_for(100us);
sleep_for(1ms);
CHECK (2 == uniqueCnt);
CHECK (2 == wof.size());
@ -462,22 +465,22 @@ namespace test {
atomic<bool> pool_scaled_up{false};
atomic<bool> shutdown_done{false};
std::thread operate{[&]{
{// nested scope...
WorkForce wof{setup (blockingWork)};
wof.activate();
sleep_for(10ms);
CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
pool_scaled_up = true;
} // WorkForce goes out of scope => dtor called
// when reaching this point, dtor has terminated
shutdown_done = true;
operate.detach();
}};
Thread operate{"controller"
,[&] {
{// nested scope...
WorkForce wof{setup (blockingWork)};
wof.activate();
sleep_for (10ms);
CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
pool_scaled_up = true;
} // WorkForce goes out of scope => dtor called
// when reaching this point, dtor has terminated
shutdown_done = true;
}};
CHECK (operate.joinable()); // operate-thread is in running state
CHECK (operate); // operate-thread is in running state
sleep_for(100ms);
CHECK (pool_scaled_up);
@ -486,7 +489,7 @@ namespace test {
trapped = false;
sleep_for(20ms);
CHECK (shutdown_done);
CHECK (not operate.joinable()); // operate-thread has detached and terminated
CHECK (not operate); // operate-thread has detached and terminated
}
};

View file

@ -82276,6 +82276,23 @@ Date:&#160;&#160;&#160;Thu Apr 20 18:53:17 2023 +0200<br/>
</node>
</node>
</node>
<node COLOR="#338800" CREATED="1697144004829" FOLDED="true" ID="ID_1133272068" MODIFIED="1697144107776" TEXT="WorkForece_test">
<icon BUILTIN="button_ok"/>
<node CREATED="1697144009429" ID="ID_974358387" MODIFIED="1697144020943" TEXT="hatte hier std::thread direkt verwendet"/>
<node CREATED="1697144021507" ID="ID_1880048346" MODIFIED="1697144043164" TEXT="Test ist immer noch instabil &#x2014; Timings zu knapp">
<node CREATED="1697144051767" ID="ID_169093956" MODIFIED="1697144065793" TEXT="sollte ehr 1ms geben f&#xfc;r Thread-Start"/>
<node CREATED="1697144068469" ID="ID_296969811" MODIFIED="1697144104370" TEXT="am problematischten ist der Fehler-Test">
<richcontent TYPE="NOTE"><html>
<head/>
<body>
<p>
...denn der zielt auf eine echte Laufzeit, die ben&#246;tigt wird um 500 Loop-Durchg&#228;nge zu machen
</p>
</body>
</html></richcontent>
</node>
</node>
</node>
</node>
<node BACKGROUND_COLOR="#d2beaf" COLOR="#5c4d6e" CREATED="1695394237210" ID="ID_11553358" MODIFIED="1697133471895" TEXT="Applikation umstellen">
<icon BUILTIN="flag-yellow"/>
@ -82782,9 +82799,88 @@ Date:&#160;&#160;&#160;Thu Apr 20 18:53:17 2023 +0200<br/>
</node>
</node>
</node>
<node COLOR="#338800" CREATED="1697135926071" FOLDED="true" ID="ID_447905558" MODIFIED="1697143981730" TEXT="work-force.hpp">
<icon BUILTIN="button_ok"/>
<node CREATED="1697135941988" ID="ID_1931283279" MODIFIED="1697135957808" TEXT="war im Vorgriff auf den Umbau zun&#xe4;chst direkt per std::thread implementiert">
<icon BUILTIN="info"/>
</node>
<node BACKGROUND_COLOR="#d2beaf" COLOR="#5c4d6e" CREATED="1697133453673" ID="ID_1053950423" MODIFIED="1697133470456" TEXT="R&#xfc;ckbau alter code">
<node BACKGROUND_COLOR="#ccb59b" COLOR="#6e2a38" CREATED="1697136061828" ID="ID_1747876427" MODIFIED="1697136128861" TEXT="Baumuster ist der Standard-Fall">
<font ITALIC="true" NAME="SansSerif" SIZE="14"/>
<icon BUILTIN="yes"/>
<node CREATED="1697136134755" ID="ID_1167712294" MODIFIED="1697136186393" TEXT="spezielle Fehlerbehandlung hier sinnvoll">
<richcontent TYPE="NOTE"><html>
<head/>
<body>
<p>
es ist hier sinnvoll, direkt in der Thread-Funktion eine dedizierte Fehlerbehandlung vorzusehen &#8212; auch schon im Vorgriff auf weitere Entwicklung
</p>
</body>
</html></richcontent>
</node>
<node CREATED="1697136209177" ID="ID_1500433483" MODIFIED="1697136267281" TEXT="der CONF::finalHook geh&#xf6;rt hier zum Business-API">
<richcontent TYPE="NOTE"><html>
<head/>
<body>
<p>
das hei&#223;t, er ist nicht zu verwechseln mit dem atExit-&#955; aus dem Thread-wrapper Framework
</p>
</body>
</html></richcontent>
</node>
<node CREATED="1697136289876" ID="ID_610290763" MODIFIED="1697136351821" TEXT="thread.detach() ist hier einfach das normale Verhalten">
<richcontent TYPE="NOTE"><html>
<head/>
<body>
<p>
es mu&#223; in keinster Weise noch extra per Lifecycle-Hook implementiert werden, sondern entspricht genau dem Verhalten, das die PolicyLaunchOnly ohnehin bietet
</p>
</body>
</html></richcontent>
</node>
<node CREATED="1697136668816" ID="ID_419020878" MODIFIED="1697136713713" TEXT="das thread-API mu&#xdf; nicht sichtbar sein &#x27f9; Thread kann Member sein"/>
</node>
<node COLOR="#338800" CREATED="1697136727707" ID="ID_801893805" MODIFIED="1697139035903" TEXT="umstellen">
<icon BUILTIN="button_ok"/>
<node COLOR="#338800" CREATED="1697136731234" ID="ID_741477101" MODIFIED="1697138708777" TEXT="Thread wird Member">
<icon BUILTIN="button_ok"/>
</node>
<node COLOR="#338800" CREATED="1697136828489" ID="ID_1346516939" MODIFIED="1697138711182" TEXT="Thread running-State &#x27fc; isDead()">
<icon BUILTIN="button_ok"/>
</node>
<node COLOR="#338800" CREATED="1697136745649" ID="ID_1842560714" MODIFIED="1697138732881" TEXT="WorkForce::size() nun auf isDead()">
<icon BUILTIN="button_ok"/>
</node>
<node COLOR="#338800" CREATED="1697136876287" ID="ID_1031434844" MODIFIED="1697138733761" TEXT="detach() und weitere Detail-Logik &#xd83d;&#xddd1;">
<icon BUILTIN="button_ok"/>
</node>
<node COLOR="#338800" CREATED="1697138744225" ID="ID_1469260267" MODIFIED="1697138756027" TEXT="ThreadID automatisch dekorieren">
<icon BUILTIN="button_ok"/>
</node>
</node>
<node COLOR="#435e98" CREATED="1697138990557" ID="ID_1523216174" MODIFIED="1697139030736" TEXT="und: Runner &#x27fc; Worker">
<richcontent TYPE="NOTE"><html>
<head/>
<body>
<p>
ziemlich d&#228;mliche Idee, einen Worker nicht &#187;Worker&#171; zu nennen
</p>
</body>
</html></richcontent>
<icon BUILTIN="yes"/>
</node>
</node>
</node>
<node BACKGROUND_COLOR="#d2beaf" COLOR="#5c4d6e" CREATED="1697133453673" ID="ID_1053950423" MODIFIED="1697139678989" TEXT="R&#xfc;ckbau alter Code">
<icon BUILTIN="hourglass"/>
<node COLOR="#338800" CREATED="1697139618834" ID="ID_1256502123" MODIFIED="1697139634445" TEXT="pr&#xfc;fen: std::thread nicht mehr verwendet">
<icon BUILTIN="button_ok"/>
</node>
<node BACKGROUND_COLOR="#eee5c3" COLOR="#990000" CREATED="1697139657093" ID="ID_242211449" MODIFIED="1697139673334" TEXT="alle Dependencies vom alten thread-wrapper verfolgen">
<icon BUILTIN="flag-yellow"/>
</node>
<node BACKGROUND_COLOR="#eee5c3" COLOR="#990000" CREATED="1697139635632" ID="ID_1923775684" MODIFIED="1697139648878" TEXT="vault/thread-wrapper.hpp zur&#xfc;ckbauen">
<icon BUILTIN="flag-yellow"/>
</node>
</node>
</node>
</node>