From 1c4f605e8f8a46fea2709feefa17060c258ae3c5 Mon Sep 17 00:00:00 2001 From: Ichthyostega Date: Thu, 12 Oct 2023 22:00:55 +0200 Subject: [PATCH] Library/Application: switch WorkForce The WorkForce (passive worker pool) has been coded just recently, and -- in anticipation of this refactoring -- directly against std::thread instead of using the old framework. ...the switch is straight-forward, using the default case ...add the ability to decorate the thread-IDs with a running counter --- src/lib/thread.cpp | 11 ++++ src/lib/thread.hpp | 10 ++- src/vault/gear/work-force.hpp | 38 ++++++----- tests/vault/gear/work-force-test.cpp | 43 ++++++------ wiki/thinkPad.ichthyo.mm | 98 +++++++++++++++++++++++++++- 5 files changed, 163 insertions(+), 37 deletions(-) diff --git a/src/lib/thread.cpp b/src/lib/thread.cpp index dfb9f84d3..13c267877 100644 --- a/src/lib/thread.cpp +++ b/src/lib/thread.cpp @@ -35,10 +35,12 @@ #include "lib/util.hpp" #include +#include #include using util::_Fmt; using lib::Literal; +using std::atomic_uint; using std::chrono::steady_clock; using std::chrono_literals::operator ""ms; @@ -58,6 +60,15 @@ namespace thread{ } + /** Helper to create a suffix to the thread-ID with running count */ + string + ThreadWrapper::decorate_with_global_count (string const& rawID) + { + static atomic_uint globalCnt{1}; + return _Fmt{"%s.%03i"} % rawID % globalCnt.fetch_add (+1, std::memory_order_acq_rel); + } + + /** @note implies get_id() != std::thread::id{} ==> it is running */ bool ThreadWrapper::invokedWithinThread() const diff --git a/src/lib/thread.hpp b/src/lib/thread.hpp index 8f1cbb0e2..99f2a706d 100644 --- a/src/lib/thread.hpp +++ b/src/lib/thread.hpp @@ -226,6 +226,8 @@ namespace lib { void handle_after_thread() { } ///< called immediately before end of thread void handle_loose_thread() { } ///< called when destroying wrapper on still running thread + static string decorate_with_global_count (string const&); + /** * allow to detach explicitly — independent from thread-function's state. * @warning this function is borderline dangerous; it might be acceptable @@ -389,7 +391,6 @@ namespace lib { void invokeThreadFunction (ARGS&& ...args) { - if (not Policy::isLive()) return; Policy::handle_begin_thread(); Policy::markThreadStart(); Policy::perform_thread_function (forward (args)...); @@ -484,6 +485,13 @@ namespace lib { return move(*this); } + Launch&& + decorateCounter() + { + id = Policy::decorate_with_global_count (id); + return move(*this); + } + template Launch&& atStart (HOOK&& hook) diff --git a/src/vault/gear/work-force.hpp b/src/vault/gear/work-force.hpp index e1f9d4938..a0246c362 100644 --- a/src/vault/gear/work-force.hpp +++ b/src/vault/gear/work-force.hpp @@ -43,7 +43,7 @@ ** in parallel, yet any other worker about to pick the next task has to wait until ** it is possible to grab the `GroomingToken` exclusively. For the WorkForce this ** usage pattern implies that there is *no explicit synchronisation* -- scaling up - ** and shutting down must be performed non-concurrent. + ** and shutting down must be performed non-concurrently. ** @see work-force-test.cpp ** @see scheduler-commutator.hpp usage as part of the scheduler */ @@ -56,12 +56,12 @@ #include "vault/common.hpp" #include "vault/gear/activity.hpp" #include "lib/meta/function.hpp" +#include "lib/thread.hpp" #include "lib/nocopy.hpp" #include "lib/util.hpp" #include #include -#include #include #include @@ -72,6 +72,7 @@ namespace gear { using std::move; using std::atomic; using util::unConst; + using std::this_thread::sleep_for; namespace work { ///< Details of WorkForce (worker pool) implementation @@ -100,26 +101,35 @@ namespace gear { }; - /** + using Launch = lib::Thread::Launch; + + /*************************************//** * Individual worker thread: * repeatedly pulls the `doWork` functor. */ template - class Runner + class Worker : CONF , util::NonCopyable - , public std::thread { public: - Runner (CONF config) + Worker (CONF config) : CONF{move (config)} - , thread{[this]{ pullWork(); }} + , thread_{Launch{&Worker::pullWork, this} + .threadID("Worker") + .decorateCounter()} { } /** emergency break to trigger cooperative halt */ std::atomic emergency{false}; + /** this Worker starts out active, but may terminate */ + bool isDead() const { return not thread_; } + + private: + lib::Thread thread_; + void pullWork() { @@ -150,9 +160,7 @@ namespace gear { CONF::finalHook (not regularExit); } ERROR_LOG_AND_IGNORE (threadpool, "failure in thread-exit hook") - - thread::detach(); - } + }// Thread will terminate.... activity::Proc idleWait() @@ -160,7 +168,7 @@ namespace gear { ++idleCycles; if (idleCycles < CONF::DISMISS_CYCLES) { - std::this_thread::sleep_for (CONF::IDLE_WAIT); + sleep_for (CONF::IDLE_WAIT); return activity::PASS; } else // idle beyond threshold => terminate worker @@ -185,7 +193,7 @@ namespace gear { class WorkForce : util::NonCopyable { - using Pool = std::list>; + using Pool = std::list>; CONF setup_; Pool workers_; @@ -235,15 +243,15 @@ namespace gear { awaitShutdown() { for (auto& w : workers_) - w.emergency.store(true, std::memory_order_relaxed); + w.emergency.store (true, std::memory_order_relaxed); while (0 < size()) - std::this_thread::sleep_for(setup_.IDLE_WAIT); + sleep_for (setup_.IDLE_WAIT); } size_t size() const { - unConst(workers_).remove_if([](auto& w){ return not w.joinable(); }); + unConst(workers_).remove_if([](auto& w){ return w.isDead(); }); return workers_.size(); } }; diff --git a/tests/vault/gear/work-force-test.cpp b/tests/vault/gear/work-force-test.cpp index da61929c9..2dc397797 100644 --- a/tests/vault/gear/work-force-test.cpp +++ b/tests/vault/gear/work-force-test.cpp @@ -27,7 +27,9 @@ #include "lib/test/run.hpp" #include "vault/gear/work-force.hpp" +#include "lib/thread.hpp" #include "lib/sync.hpp" +#include "lib/test/diagnostic-output.hpp"////////////TODO #include #include @@ -44,6 +46,7 @@ namespace test { using std::this_thread::sleep_for; using namespace std::chrono_literals; using std::chrono::milliseconds; + using lib::Thread; namespace { @@ -213,7 +216,7 @@ namespace test { .withSleepPeriod (10ms)}; wof.incScale(); - sleep_for(50us); + sleep_for(1ms); CHECK (1 == check); @@ -238,7 +241,7 @@ namespace test { .dismissAfter(5)}; wof.incScale(); - sleep_for(100us); + sleep_for(1ms); CHECK (1 == check); @@ -375,12 +378,12 @@ namespace test { CHECK (0 == wof.size()); wof.incScale(); - sleep_for(100us); + sleep_for(1ms); CHECK (1 == uniqueCnt); CHECK (1 == wof.size()); wof.incScale(); - sleep_for(100us); + sleep_for(1ms); CHECK (2 == uniqueCnt); CHECK (2 == wof.size()); @@ -462,22 +465,22 @@ namespace test { atomic pool_scaled_up{false}; atomic shutdown_done{false}; - std::thread operate{[&]{ - {// nested scope... - WorkForce wof{setup (blockingWork)}; - - wof.activate(); - sleep_for(10ms); - CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY); - pool_scaled_up = true; - } // WorkForce goes out of scope => dtor called - - // when reaching this point, dtor has terminated - shutdown_done = true; - operate.detach(); - }}; + Thread operate{"controller" + ,[&] { + {// nested scope... + WorkForce wof{setup (blockingWork)}; + + wof.activate(); + sleep_for (10ms); + CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY); + pool_scaled_up = true; + } // WorkForce goes out of scope => dtor called + + // when reaching this point, dtor has terminated + shutdown_done = true; + }}; - CHECK (operate.joinable()); // operate-thread is in running state + CHECK (operate); // operate-thread is in running state sleep_for(100ms); CHECK (pool_scaled_up); @@ -486,7 +489,7 @@ namespace test { trapped = false; sleep_for(20ms); CHECK (shutdown_done); - CHECK (not operate.joinable()); // operate-thread has detached and terminated + CHECK (not operate); // operate-thread has detached and terminated } }; diff --git a/wiki/thinkPad.ichthyo.mm b/wiki/thinkPad.ichthyo.mm index 778bd8d87..293ce790b 100644 --- a/wiki/thinkPad.ichthyo.mm +++ b/wiki/thinkPad.ichthyo.mm @@ -82276,6 +82276,23 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
+ + + + + + + + + +

+ ...denn der zielt auf eine echte Laufzeit, die benötigt wird um 500 Loop-Durchgänge zu machen +

+ +
+
+
+
@@ -82782,9 +82799,88 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
+ + + + - + + + + + + + +

+ es ist hier sinnvoll, direkt in der Thread-Funktion eine dedizierte Fehlerbehandlung vorzusehen — auch schon im Vorgriff auf weitere Entwicklung +

+ +
+
+ + + + +

+ das heißt, er ist nicht zu verwechseln mit dem atExit-λ aus dem Thread-wrapper Framework +

+ +
+
+ + + + +

+ es muß in keinster Weise noch extra per Lifecycle-Hook implementiert werden, sondern entspricht genau dem Verhalten, das die PolicyLaunchOnly ohnehin bietet +

+ +
+
+ + + + + + + + + + + + + + + + + + + + + + + + +

+ ziemlich dämliche Idee, einen Worker nicht »Worker« zu nennen +

+ +
+ +
+
+
+ + + + + + + + + +