From b8e52d008c15df07a17205165404a30b3e227d1d Mon Sep 17 00:00:00 2001 From: Ichthyostega Date: Thu, 7 Sep 2023 17:15:25 +0200 Subject: [PATCH] Workforce: configuration and initialisation of workers - use a template parameter to allow for hook into local facilities (Scheduler) - pass config initialisation down through constructors --- src/vault/gear/work-force.cpp | 4 +- src/vault/gear/work-force.hpp | 63 ++++++-------- tests/vault/gear/work-force-test.cpp | 124 +++++++++++++++++++++++++-- wiki/thinkPad.ichthyo.mm | 115 ++++++++++++++++++++++++- 4 files changed, 258 insertions(+), 48 deletions(-) diff --git a/src/vault/gear/work-force.cpp b/src/vault/gear/work-force.cpp index 31423df24..442f0daa1 100644 --- a/src/vault/gear/work-force.cpp +++ b/src/vault/gear/work-force.cpp @@ -58,7 +58,9 @@ namespace gear { // NA::~NA() { } - const size_t WorkForce::FULL_SIZE = util::max (std::thread::hardware_concurrency(), MINIMAL_CONCURRENCY); + /** default value for full computing capacity is to use all (virtual) cores */ + const size_t work::Config::COMPUTATION_CAPACITY = util::max (std::thread::hardware_concurrency() + , MINIMAL_CONCURRENCY); diff --git a/src/vault/gear/work-force.hpp b/src/vault/gear/work-force.hpp index 1f08657c9..475cb48a8 100644 --- a/src/vault/gear/work-force.hpp +++ b/src/vault/gear/work-force.hpp @@ -38,12 +38,12 @@ #include "vault/common.hpp" #include "vault/gear/activity.hpp" +#include "lib/meta/function.hpp" #include "lib/nocopy.hpp" //#include "lib/symbol.hpp" #include "lib/util.hpp" //#include -#include #include #include #include @@ -56,30 +56,37 @@ namespace gear { // using util::isnil; // using std::string; using std::move; +// using std::forward; using std::atomic; namespace work { + using SIG_WorkFun = activity::Proc(void); + struct Config { - + static const size_t COMPUTATION_CAPACITY; + const size_t EXPECTED_MAX_POOL = 1.5*COMPUTATION_CAPACITY; }; template class Runner - : std::thread - , CONF + : CONF + , public std::thread { public: - Runner() - : thread{} + Runner (CONF config) + : CONF{move (config)} + , thread{[this]{ pullWork(); }} { } private: void pullWork() { + ASSERT_VALID_SIGNATURE (decltype(CONF::doWork), SIG_WorkFun); + try { while (true) { @@ -103,7 +110,7 @@ namespace gear { return activity::PASS; } }; - } + }//(End)namespace work /** @@ -112,32 +119,29 @@ namespace gear { * @see SomeSystem * @see NA_test */ + template class WorkForce : util::NonCopyable { - using WorkFun = std::function; - using Pool = std::vector; + using Pool = std::vector>; - WorkFun workFun_; + CONF setup_; Pool workers_; - atomic halt_{false}; public: - static const size_t FULL_SIZE; - explicit - WorkForce (WorkFun&& fun) - : workFun_{move (fun)} + WorkForce (CONF config) + : setup_{move (config)} , workers_{} { - workers_.reserve (1.5*FULL_SIZE); + workers_.reserve (setup_.EXPECTED_MAX_POOL); } ~WorkForce() { try { - deactivate(); + awaitShutdown(); } ERROR_LOG_AND_IGNORE (threadpool, "defunct worker thread") } @@ -146,16 +150,16 @@ namespace gear { void activate (double degree =1.0) { - halt_ = false; - size_t scale = util::max (size_t(degree*FULL_SIZE), 1u); + size_t scale{setup_.COMPUTATION_CAPACITY}; + scale *= degree; + scale = util::max (scale, 1u); for (uint i = workers_.size(); i < scale; ++i) - workers_.emplace_back ([this]{ pullWork(); }); + workers_.emplace_back (setup_); } void - deactivate() + awaitShutdown() { - halt_ = true; for (auto& w : workers_) if (w.joinable()) w.join(); @@ -163,21 +167,6 @@ namespace gear { } private: - void - pullWork() - { - try { - while (true) - { - activity::Proc res = workFun_(); - if (halt_ or res != activity::PASS) - break; - } - } - ERROR_LOG_AND_IGNORE (threadpool, "defunct worker thread") - } - - }; diff --git a/tests/vault/gear/work-force-test.cpp b/tests/vault/gear/work-force-test.cpp index 78178b57f..d4b44614d 100644 --- a/tests/vault/gear/work-force-test.cpp +++ b/tests/vault/gear/work-force-test.cpp @@ -33,6 +33,7 @@ //#include //#include +#include #include using test::Test; @@ -50,6 +51,9 @@ namespace test { // using lib::time::Offset; // using lib::time::Time; + namespace { + using WorkFun = std::function; + } @@ -64,9 +68,18 @@ namespace test { virtual void run (Arg) { - simpleUsage(); - walkingDeadline(); - setupLalup(); + simpleUsage(); + + verify_pullWork(); + verify_workerHalt(); + verify_workerSleep(); + verify_workerDemote(); + verify_finalHook(); + verify_detectError(); + verify_defaultPool(); + verify_scalePool(); + verify_countActive(); + verify_dtor_blocks(); } @@ -76,8 +89,15 @@ namespace test { simpleUsage() { atomic check{0}; + struct Setup + : work::Config + { + WorkFun doWork; + } + setup; + setup.doWork = [&]{ ++check; return activity::PASS; }; - WorkForce wof{[&]{ ++check; return activity::PASS; }}; + WorkForce wof{setup}; CHECK (0 == check); @@ -90,18 +110,110 @@ namespace test { /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement */ void - walkingDeadline() + verify_pullWork() { } /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement */ void - setupLalup() + verify_workerHalt() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + verify_workerSleep() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + verify_workerDemote() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + verify_finalHook() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + verify_detectError() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + verify_defaultPool() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + verify_scalePool() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + verify_countActive() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + verify_dtor_blocks() + { + } + + + + /** @test TODO + * @todo WIP 9/23 ⟶ define ⟶ implement + */ + void + walkingDeadline() { } }; diff --git a/wiki/thinkPad.ichthyo.mm b/wiki/thinkPad.ichthyo.mm index 82cfd8e9b..9a08d6f7b 100644 --- a/wiki/thinkPad.ichthyo.mm +++ b/wiki/thinkPad.ichthyo.mm @@ -79627,7 +79627,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + @@ -79637,7 +79637,24 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + + + + + + + + + + + +

+ klinkt sich vor Beenden aus: thread::detach() +

+ +
+
+
@@ -79751,8 +79768,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
...es wäre denkbar, daß es sich nicht um einen KILL-Switch handelt, sondern um einen Heartbeat, der z.B. aus den tick()-Aufrufen erneuert werden muß, so daß im Fall einer Verklemmung sich der Scheduler selbst terminiert

- -
+
@@ -79809,6 +79825,59 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
+ + + + + + + +

+ lambda in local class 'vault::gear::test::WorkForce_test::simpleUsage()::Setup' cannot capture variables from the enclosing context +

+ +
+
+ + + + +

+ ...das könnte zwar ein statischer Service sein, der dann sogar im Lebenszyklus der Applikation irgendwo konfiguriert wird +

+ +
+ +
+ + + +
+ + + + + + + +

+ und in jeden zu startenden Thread kopiert +

+ +
+
+ + + + +

+ C++ zwingt uns dazu, explizit das zu tun was ohnehin getan werden muß; da jedoch der Typ der Config per Template-Parameter gewählt wird, ist komplettes Inlining möglich; letztlich wird daher nur ein Pointer auf das Scheduler-Objekt in alle Threads kopiert — exakt das  was wir brauchen +

+ +
+ +
+
@@ -79850,6 +79919,44 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +