From 997fc36c810081eb90b65025358df572b5b7796e Mon Sep 17 00:00:00 2001 From: Ichthyostega Date: Sat, 9 Sep 2023 23:42:13 +0200 Subject: [PATCH] Workforce: implementation complete --- src/vault/gear/work-force.hpp | 47 ++++--- tests/32scheduler.tests | 2 +- .../vault/gear/scheduler-commutator-test.cpp | 2 +- .../vault/gear/scheduler-invocation-test.cpp | 2 +- tests/vault/gear/work-force-test.cpp | 117 +++++++++++++----- wiki/renderengine.html | 8 +- wiki/thinkPad.ichthyo.mm | 47 ++++--- 7 files changed, 153 insertions(+), 72 deletions(-) diff --git a/src/vault/gear/work-force.hpp b/src/vault/gear/work-force.hpp index 526179dbb..e1f9d4938 100644 --- a/src/vault/gear/work-force.hpp +++ b/src/vault/gear/work-force.hpp @@ -23,12 +23,29 @@ /** @file work-force.hpp ** A pool of workers for multithreaded rendering. + ** The Lumiera Render Engine is driven by active workers repeatedly pulling + ** the next planned chunk of work; maintaining the internal organisation of the + ** Scheduler is integrated into that pattern as _just another activity_ performed + ** by the workers. As a consequence, there is no need for a central »master« or + ** coordinator to dispatch individual jobs. As long as the worker pool holds + ** active workers, the engine is in running state. ** + ** The WorkForce (worker pool service) in turn is passive and fulfils the purpose of + ** holding storage for the active worker objects in a list, pruning terminated entries. + ** Some parameters and configuration is provided to the workers, notably a _work functor_ + ** invoked actively to »pull« work. The return value from this `doWork()`-function governs + ** the worker's behaviour, either by prompting to pull further work, by sending a worker + ** into a sleep cycle, or even asking the worker to terminate. 
+ ** + ** @warning concurrency and synchronisation in the Scheduler (which maintains and operates + ** WorkForce) is based on the assumption that _all maintenance and organisational + ** work is done chunk-wise by a single worker._ Other render activities may proceed + ** in parallel, yet any other worker about to pick the next task has to wait until + ** it is possible to grab the `GroomingToken` exclusively. For the WorkForce this + ** usage pattern implies that there is *no explicit synchronisation* -- scaling up + ** and shutting down must be performed non-concurrent. ** @see work-force-test.cpp ** @see scheduler-commutator.hpp usage as part of the scheduler - ** - ** @todo WIP-WIP-WIP 6/2023 »Playback Vertical Slice« - ** */ @@ -40,11 +57,8 @@ #include "vault/gear/activity.hpp" #include "lib/meta/function.hpp" #include "lib/nocopy.hpp" -//#include "lib/symbol.hpp" #include "lib/util.hpp" -#include "lib/format-cout.hpp"////////////////////WIP -//#include #include #include #include @@ -55,21 +69,17 @@ namespace vault{ namespace gear { -// using util::isnil; -// using std::string; using std::move; -// using std::forward; using std::atomic; using util::unConst; - - namespace work { + namespace work { ///< Details of WorkForce (worker pool) implementation using std::chrono::milliseconds; using std::chrono_literals::operator ""ms; - using SIG_WorkFun = activity::Proc(void); ///< config should define callable to perform work - using SIG_FinalHook = void(bool); ///< config should define callable invoked at exit (argument: is error) + using SIG_WorkFun = activity::Proc(void); ///< config should define a callable with this signature to perform work + using SIG_FinalHook = void(bool); ///< config should define callable invoked at exit (argument: isFailure) /** * Base for configuration of the worker pool. 
@@ -90,7 +100,10 @@ namespace gear { }; - /** Individual worker thread: repeatedly pulls the `doWork` functor */ + /** + * Individual worker thread: + * repeatedly pulls the `doWork` functor. + */ template class Runner : CONF @@ -160,9 +173,9 @@ namespace gear { - /*************************************//** + /***********************************************************//** * Pool of worker threads for rendering. - * @note the \tparam CONF configuration/policy base must define + * @note the \tparam CONF configuration/policy base must define: * - `doWork` - the _work-functor_ (with #SIG_WorkFun) * - `finalHook` - called at thread exit * @see WorkForce_test @@ -196,7 +209,7 @@ namespace gear { /** * Activate or scale up the worker pool. * @param degree fraction of the full #COMPUTATION_CAPACITY to activate - * @note will always activate at least one worker; + * @note will always activate at least one worker; will never scale down; * setting values > 1.0 leads to over-provisioning... */ void diff --git a/tests/32scheduler.tests b/tests/32scheduler.tests index 5d1286fd4..49ab44cab 100644 --- a/tests/32scheduler.tests +++ b/tests/32scheduler.tests @@ -38,6 +38,6 @@ END -PLANNED "Worker Thread Service" WorkForce_test < -//#include #include #include #include +#include using test::Test; -//using std::move; -//using util::isSameObject; namespace vault{ @@ -51,14 +45,16 @@ namespace test { using namespace std::chrono_literals; using std::chrono::milliseconds; -// using lib::time::FrameRate; -// using lib::time::Offset; -// using lib::time::Time; namespace { using WorkFun = std::function; using FinalFun = std::function; + /** + * Helper: setup a Worker-Pool configuration for the test. + * Derived from the default configuration, it allows to bind + * a lambda as work-functor and to tweak other parameters. 
+ */ template auto setup (FUN&& workFun) @@ -108,6 +104,7 @@ namespace test { /*************************************************************************//** * @test WorkForce-Service: maintain a pool of active worker threads. + * @warning this test relies on empirical timings and can be brittle. * @see SchedulerUsage_test */ class WorkForce_test : public Test @@ -138,13 +135,13 @@ namespace test { { atomic check{0}; WorkForce wof{setup ([&]{ ++check; return activity::PASS; })}; - + // ^^^ this is the doWork-λ CHECK (0 == check); wof.activate(); sleep_for(20ms); - CHECK (0 < check); + CHECK (0 < check); // λ invoked in the worker threads } @@ -206,7 +203,7 @@ namespace test { - /** @test a worker can be sent to sleep, reducing the poll frequency. + /** @test a worker can be sent to sleep, throttling the poll frequency. */ void verify_workerSleep() @@ -230,7 +227,7 @@ namespace test { /** @test when a worker is sent into sleep-cycles for an extended time, - * the worker terminates itself + * the worker terminates itself. */ void verify_workerDismiss() @@ -260,28 +257,29 @@ namespace test { void verify_finalHook() { - atomic check{0}; + atomic exited{0}; atomic control{activity::PASS}; WorkForce wof{setup([&]{ return activity::Proc(control); }) - .withFinalHook([&](bool){ ++check; })}; + .withFinalHook([&](bool){ ++exited; })}; - CHECK (0 == check); + CHECK (0 == exited); wof.activate(); sleep_for(10ms); CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY); - CHECK (0 == check); + CHECK (0 == exited); control = activity::HALT; sleep_for(10ms); CHECK (0 == wof.size()); - CHECK (check == work::Config::COMPUTATION_CAPACITY); + CHECK (exited == work::Config::COMPUTATION_CAPACITY); } - /** @test TODO - * @todo WIP 9/23 ⟶ define ⟶ implement + /** @test exceptions emanating from within the worker are catched + * and reported by setting the isFailure argument flag of + * the `finalHook` functor invoked at worker termination. 
*/ void verify_detectError() @@ -298,7 +296,6 @@ namespace test { if (isFailure) ++errors; })}; - CHECK (0 == check); CHECK (0 == errors); @@ -308,7 +305,7 @@ namespace test { sleep_for(10us); CHECK (3 == wof.size()); - CHECK (0 < check); + CHECK (0 < check); CHECK (0 == errors); sleep_for(200ms); // wait for the programmed disaster @@ -342,12 +339,75 @@ namespace test { - /** @test TODO - * @todo WIP 9/23 ⟶ define ⟶ implement + /** @test the number of (separate) workers can be scaled up, + * both stepwise and as fraction of full hardware concurrency */ void verify_scalePool() { + /** helper to count distinct thread-IDs */ + class UniqueCnt + : public std::set + , public lib::Sync<> + { + public: + void + mark (std::thread::id const& tID) + { + Lock guard(this); + this->insert(tID); + } + + operator size_t() const + { + Lock guard(this); + return this->size(); + } + } + uniqueCnt; + + WorkForce wof{setup ([&]{ + uniqueCnt.mark(std::this_thread::get_id()); + return activity::PASS; + })}; + + CHECK (0 == uniqueCnt); + CHECK (0 == wof.size()); + + wof.incScale(); + sleep_for(100us); + CHECK (1 == uniqueCnt); + CHECK (1 == wof.size()); + + wof.incScale(); + sleep_for(100us); + CHECK (2 == uniqueCnt); + CHECK (2 == wof.size()); + + + auto fullCnt = work::Config::COMPUTATION_CAPACITY; + + wof.activate (1.0); + sleep_for(1ms); + CHECK (fullCnt == uniqueCnt); + CHECK (fullCnt == wof.size()); + + wof.activate (2.0); + sleep_for(1ms); + CHECK (2*fullCnt == uniqueCnt); + CHECK (2*fullCnt == wof.size()); + + wof.awaitShutdown(); + CHECK (0 == wof.size()); + + uniqueCnt.clear(); + sleep_for(1ms); + CHECK (0 == uniqueCnt); + + wof.activate (0.5); + sleep_for(1ms); + CHECK (fullCnt/2 == uniqueCnt); + CHECK (fullCnt/2 == wof.size()); } @@ -386,7 +446,7 @@ namespace test { * - use a work-functor which keeps all workers blocked * - start the WorkForce within a separate thread * - in this separate thread, cause the WorkForce destructor to be called - * - in the outer (controlling 
thread) release the work-functor blocking + * - in the test main thread release the work-functor blocking * - at this point, all workers return, detect shutdown and terminate */ void @@ -435,5 +495,4 @@ namespace test { LAUNCHER (WorkForce_test, "unit engine"); - -}}} // namespace vault::mem::test +}}} // namespace vault::gear::test diff --git a/wiki/renderengine.html b/wiki/renderengine.html index ce7df8905..f6fec7c74 100644 --- a/wiki/renderengine.html +++ b/wiki/renderengine.html @@ -7167,9 +7167,9 @@ Later on we expect a distinct __query subsystem__ to emerge, presumably embeddin &rarr; QuantiserImpl -
+
//Invoke and control the dependency and time-based execution of  [[render jobs|RenderJob]]//
-The Scheduler acts as the central hub in the implementation of the RenderEngine and coordinates the //processing resources// of the application. Regarding architecture, the Scheduler is located in the Vault-Layer and //running// the Scheduler is equivalent to activating the »Vault Subsystem«. An EngineFaçade acts as entrance point, providing high-level render services to other parts of the application: [[render jobs|RenderJob]] can be activated under various timing and dependency constraints. Internally, the implementation is segregated into two layers
+The Scheduler acts as the central hub in the implementation of the RenderEngine and coordinates the //processing resources// of the application. Regarding architecture, the Scheduler is located in the Vault-Layer and //running// the Scheduler is equivalent to activating the »Vault Subsystem«. An EngineFaçade acts as entrance point, providing high-level render services to other parts of the application: [[render jobs|RenderJob]] can be activated under various timing and dependency constraints. Internally, the implementation is organised into two layers:
 ;Layer-2: Coordination
 :maintains a network of interconnected [[activities|RenderActivity]], tracks dependencies and observes timing constraints
 :coordinates a [[pool of active Workers|SchedulerWorker]] to dispatch the next activities
@@ -7245,7 +7245,7 @@ The primary scaling effects exploited to achieve this level of performance are t
 The way other parts of the system are built, requires us to obtain a guaranteed knowledge of some job's termination. It is possible to obtain that knowledge with some limited delay, but it needs to be absolutely reliable (violations leading to segfault). The requirements stated above assume this can be achieved through //jobs with guaranteed execution.// Alternatively we could consider installing specific callbacks -- in this case the scheduler itself has to guarantee the invocation of these callbacks, even if the corresponding job fails or is never invoked. It doesn't seem there is any other option.
 
-
+
The Scheduler //maintains a ''Work Force'' (a pool of workers) to perform the next [[render activities|RenderActivity]] continuously.//
 Each worker runs in a dedicated thread; the Activities are arranged in a way to avoid blocking those worker threads
 * IO operations are performed asynchronously {{red{planned as of 9/23}}}
@@ -7257,7 +7257,7 @@ This leads to a situation where it is more adequate to //distribute the scarce c
 
 Moreover, the actual computation tasks, which can be parallelised, are at least by an order of magnitude more expensive than any administrative work for sorting tasks, checking dependencies and maintaining process state. This leads to a scheme where a worker first performs some »management work«, until encountering the next actual computation job, at which point the worker leaves the //management mode// and transitions into //concurrent work mode//. All workers are expected to be in work mode almost entirely most of the time, and thus we can expect not much contention between workers performing »management work« -- allowing to confine this management work to //single threaded operation,// thereby drastically reducing the complexity of management data structures and memory allocation.
 !!!Regulating workers
-The behaviour of individual workers is guided solely by the return-value flag from the work-functor. Consequently, no shared flags and no direct synchronisation whatsoever is required //within the {{{WorkForce}}} implementation.// -- notwithstanding the fact that the implementation //within the work-functor// obviously needs some concurrency coordination to produce these return values, since this the whole point is to invoke this functor concurrently. The following aspects of worker behaviour can be directed:
+The behaviour of individual workers is guided solely by the return-value flag from the work-functor. Consequently, no shared flags and no direct synchronisation whatsoever is required //within the {{{WorkForce}}} implementation.// -- notwithstanding the fact that the implementation //within the work-functor// obviously needs some concurrency coordination to produce these return values, since the whole point is to invoke this functor concurrently. The following aspects of worker behaviour can be directed:
 * returning {{{activity::PASS}}} instructs the worker to re-invoke the work-functor in the same thread immediately
 * returning {{{activity::WAIT}}} requests an //idle-wait cycle//
 * any other value, notably {{{activity::HALT}}} causes the worker to terminate
diff --git a/wiki/thinkPad.ichthyo.mm b/wiki/thinkPad.ichthyo.mm
index 56e217462..e1b65b4d9 100644
--- a/wiki/thinkPad.ichthyo.mm
+++ b/wiki/thinkPad.ichthyo.mm
@@ -79444,10 +79444,10 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + - + @@ -79526,13 +79526,17 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + + + + + @@ -79619,14 +79623,15 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + + - - + + @@ -79869,7 +79874,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + @@ -80031,9 +80036,10 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- - - + + + + @@ -80042,7 +80048,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + @@ -80065,15 +80071,15 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- - + + - - + + @@ -90133,9 +90139,12 @@ class Something - + - + + + + @@ -90171,8 +90180,8 @@ class Something - - + +