diff --git a/src/vault/gear/load-controller.hpp b/src/vault/gear/load-controller.hpp index 21b05e259..6068df373 100644 --- a/src/vault/gear/load-controller.hpp +++ b/src/vault/gear/load-controller.hpp @@ -65,10 +65,19 @@ ** assigning new tasks, while workers returning from idle state are typically ** sent back into idle state, unless there is direct need for more capacity. ** + ** # Load indicator + ** A fusion of some operational values is used to build a heuristic indicator + ** of current scheduler load. These values can be retrieved with low overhead. + ** - the fraction of maximal concurrency actually used + ** - a sampling of the lag, i.e. the average distance to the next task; + ** this observation is sampled whenever a worker asks for more work. + ** ** @see scheduler.hpp + ** @see SchedulerLoadControl_test + ** @see SchedulerService_test::verify_LoadFactor() ** @see SchedulerStress_test ** - ** @todo WIP-WIP-WIP 10/2023 »Playback Vertical Slice« + ** @todo WIP-WIP 11/2023 »Playback Vertical Slice« ** */ @@ -109,6 +118,7 @@ namespace gear { using std::chrono_literals::operator ""us; using std::function; using std::atomic_int64_t; + using std::memory_order_relaxed; namespace { // Scheduler default config @@ -122,12 +132,16 @@ namespace gear { Duration SLEEP_HORIZON{_uTicks (20ms)}; Duration WORK_HORIZON {_uTicks ( 5ms)}; Duration NEAR_HORIZON {_uTicks (50us)}; + + const double LAG_SAMPLE_DAMPING = 2; ///< smoothing factor for exponential moving average of lag; } /** * Controller to coordinate resource usage related to the Scheduler. + * - implements the schematics for capacity redistribution + * - provides some performance indicators, notably the LoadController::effectiveLoad() * @todo WIP-WIP 10/2023 gradually filling in functionality as needed * @see BlockFlow * @see Scheduler @@ -161,32 +175,65 @@ namespace gear { atomic_int64_t sampledLag_{0}; + /** + * @internal evaluate the situation encountered when a worker calls for work. + * @remark this function updates an exponential moving average of schedule + * head distance in a concurrency safe way. The value sampled is + * clamped to prevent poisoning by excess peaks. + * @warning Called from a hot path, with the potential to create congestion. + * Measurements indicate single call < 200ns and < 5µs when contended. + */ void markLagSample (Time head, Time now) - { - double headroom = _raw(std::clamp (now - (head.isRegular()? head:now) - , -SLEEP_HORIZON - , WORK_HORIZON)); - const int64_t N = wiring_.maxCapacity * 3; - int64_t average = sampledLag_.load (std::memory_order_relaxed); + { // negative when free capacity + double lag = _raw(std::clamp (now - (head.isRegular()? head:now) + , -SLEEP_HORIZON + , WORK_HORIZON)); + const double alpha = LAG_SAMPLE_DAMPING / (1 + wiring_.maxCapacity); + int64_t average = sampledLag_.load (memory_order_relaxed); int64_t newAverage; - do newAverage = std::floor ((headroom + (N-1)*average) / N); - while (not sampledLag_.compare_exchange_weak (average, newAverage, std::memory_order_relaxed)); + do newAverage = std::floor (lag*alpha + (1-alpha)*average); + while (not sampledLag_.compare_exchange_weak (average, newAverage, memory_order_relaxed)); } public: + /** + * @return guess of current scheduler pressure + * @remark the value is sampled at the points where workers pull work. + * Since these »capacity events« happen randomly, the current + * distance to the schedule head hints at either free headroom + * or overload leading to congestion. + * @see #markLagSample + */ int64_t - lag() + averageLag() const { - return sampledLag_.load (std::memory_order_relaxed); + return sampledLag_.load (memory_order_relaxed); } + + /** + * @internal (re)set the currently seen average lag. + * @return the previous average value + * @remark intended for unit testing and state reset; + * thread-save. Regular use not recommended. + */ + int64_t + setCurrentAverageLag (int64_t lag) + { + return sampledLag_.exchange(lag, memory_order_relaxed); + } + /** * @return guess of current load relative to full load + * @remark based on the fusion of several state values, + * which can be retrieved with low overhead + * - the used fraction of possible concurrency + * - [sampling of distance to next task](\ref averageLag) */ double - effectiveLoad() + effectiveLoad() const { - double lag = sampledLag_.load (std::memory_order_relaxed); + double lag = sampledLag_.load (memory_order_relaxed); lag -= 200; lag /= _raw(WORK_HORIZON); lag *= 10; diff --git a/tests/vault/gear/scheduler-load-control-test.cpp b/tests/vault/gear/scheduler-load-control-test.cpp index dccd9bcf5..035307662 100644 --- a/tests/vault/gear/scheduler-load-control-test.cpp +++ b/tests/vault/gear/scheduler-load-control-test.cpp @@ -34,9 +34,12 @@ #include "lib/test/diagnostic-output.hpp"/////////////////////TODO //#include +#include using test::Test; using std::move; + +using std::chrono::microseconds; //using util::isSameObject; @@ -71,6 +74,7 @@ namespace test { tendNextActivity(); classifyCapacity(); scatteredReCheck(); + indicateAverageLoad(); walkingDeadline(); } @@ -288,6 +292,88 @@ namespace test { + + /** @test verify fusion of sampled observations to guess average scheduler load + * - use a rigged wiring of the load controller to verify calculation + * based on known values of current _concurrency_ and _schedule pressure_ + * - scheduling on average 200µs behind nominal schedule is considered + * the regular balanced state and thus defined as 100% schedule pressure + * - if congestion builds up to 1/10 of WORK_HORIZON, 200% overload is indicated + * - on the other hand, if workers appear on average 200µs before the typical + * balanced state, the resulting headroom is defined to constitute 50% pressure + * - the pressure value is multiplied with the degree of concurrency + * - the pressure is sampled from the lag (distance of current time to the + * next activity to schedule), which is observed whenever a worker + * calls in to retrieve more work. These calls happen randomly. + * @todo WIP 10/23 ✔ define ⟶ ✔ implement + */ + void + indicateAverageLoad() + { + uint maxThreads = 10; + uint currThreads = 0; + + LoadController::Wiring setup; + setup.maxCapacity = maxThreads; + setup.currWorkForceSize = [&]{ return currThreads; }; + // rigged setup to verify calculated load indicator + LoadController lctrl{move(setup)}; + + CHECK (0 == lctrl.averageLag()); + CHECK (0 == lctrl.effectiveLoad()); + + // Manipulate the sampled average lag (in µs) + lctrl.setCurrentAverageLag (200); + // Scheduling 200µs behind nominal start time -> 100% schedule pressure + + currThreads = 5; + CHECK (0.5 == lctrl.effectiveLoad()); + currThreads = 8; + CHECK (0.8 == lctrl.effectiveLoad()); + currThreads = 10; + CHECK (1.0 == lctrl.effectiveLoad()); + + // congestion +500µs -> 200% schedule pressure + lctrl.setCurrentAverageLag (200+500); + CHECK (2.0 == lctrl.effectiveLoad()); + + lctrl.setCurrentAverageLag (200+500+500); + CHECK (3.0 == lctrl.effectiveLoad()); // -> 300% + + // if average headroom 500µs -> 50% load + lctrl.setCurrentAverageLag (200-500); + CHECK (0.5 == lctrl.effectiveLoad()); + CHECK (-300 == lctrl.averageLag()); + + lctrl.setCurrentAverageLag (200-500-500-500); + CHECK (0.25 == lctrl.effectiveLoad()); + CHECK (-1300 == lctrl.averageLag()); + + // load indicator is always modulated by concurrency level + currThreads = 2; + CHECK (0.05 == lctrl.effectiveLoad()); + + // average lag is sampled from the situation when workers call in + Time head = Time::ZERO; + TimeVar curr = Time{1,0}; + lctrl.markIncomingCapacity (head,curr); + CHECK (-882 == lctrl.averageLag()); + + lctrl.markIncomingCapacity (head,curr); + CHECK (-540 == lctrl.averageLag()); + + curr = Time{0,1}; + lctrl.markIncomingCapacity (head,curr); + lctrl.markIncomingCapacity (head,curr); + CHECK (1291 == lctrl.averageLag()); + + curr = head - Time{0,2}; + lctrl.markIncomingCapacity (head,curr); + CHECK (-2581 == lctrl.averageLag()); + } + + + /** @test TODO * @todo WIP 10/23 🔁 define ⟶ implement */ diff --git a/tests/vault/gear/scheduler-service-test.cpp b/tests/vault/gear/scheduler-service-test.cpp index 1440b25f4..07626f5c2 100644 --- a/tests/vault/gear/scheduler-service-test.cpp +++ b/tests/vault/gear/scheduler-service-test.cpp @@ -167,7 +167,7 @@ SHOW_EXPR(wuff()) sleep_for(50us); cout << wuff() << " +++ Load: "< - + - - - +

...weil wir im Moment immer mit Vollast einsteigen und dann zwar im Idle-Fall herunterregeln; letzteres passiert typischerweise aber schnell. Also im Test sehe ich daher immer nur 1.0 oder keine Last @@ -82652,9 +82650,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- - - +

...das kann aber erst beurteilt werden, wenn wir echte Last-Szenarien kennen. Denkbar wäre, daß z.B. ein real-Time Render in etwa mit 3 Cores über die Runden kommt @@ -82666,13 +82662,11 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + - + - - - +

  • @@ -82689,7 +82683,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    - + @@ -82703,19 +82697,18 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    - - + + - - + + + - - - +

     Exponential MA: @@ -82739,9 +82732,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    - - - +

    Bei einem moving-Average interessiert es uns eigentlich nicht, wann genau der Wert gilt und für wen er gilt. Es interessiert uns nur, was am Ende rausgekommen ist, bzw. wie der Trend im Moment so ist @@ -82751,9 +82742,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    - - - +

    wir ziehen das aktuelle MA »jetzt« @@ -82770,9 +82759,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    - - - +

    und das läßt sich mit zwei Atomic-Operationen realisieren @@ -82780,17 +82767,36 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    - - + + + + + + + + +

    + wenn man auch nur ansatzweise bedenkt, wie Assembly funktioniert +

    + +
    +
    + +
    - - + + + + + + + - + @@ -82837,17 +82843,54 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    - + + + + +

    + Microbenchmark der Meß-Funktion +

    + +
    + + +
    - - + + + + + + + + + + + + + + + + + + + + + +

    + Scheduler::connectMonitoring() +

    + +
    + +
    @@ -88848,6 +88891,46 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    + + + + + + + +

    + Und zwar weil wir +

    +
      +
    • + ein Moving-Average im Spiel haben, das es sehr schwer macht, bestimmte Werte zu erzielen +
    • +
    • + das Moving-Average in einer privaten Variablen steckt (ist auch gut so, da es ein Atomic sein muß wegen concurrency) +
    • +
    + +
    + +
    + + + + + + +

    + Alternative wäre, den Test zum Friend zu machen. Das will ich aber möglichst nicht zur Gewohnheit werden lassen. Hier könnte man das umgehen, da wegen der Concurrency ein Setter sogar sinnvoll ist: der würde dann einen atomic-swap machen +

    + +
    + +
    + + + +
    +
    @@ -89497,9 +89580,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    - - - +

    ...weil es eben ein Microbenchmark des reinen Dispatch-Aufrufs ist, und ohne das ganze Drumherum mit Grooming-Token, Queue-Verwaltung und Kapazitäts-Verteilung @@ -90830,6 +90911,29 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
    + + + + +

    + EMA(i) = α · value(i) + (1- α) · EMA(i-1) +

    +

    + +

    +

    + Im Grunde gibt es keine Herleitung, denn das ganze Verfahren ist pragmatisch. +

    +

    + In meiner Rationalisierung wäre dann α ≔ 1/N  ⟹ 1-α = (N-1)/N +

    +

    + In der Wirtschaft (Chart-Analyse) findet man oft α ≔ damp / (N+1)  und damp ≔ 2 ist die geläufige Wahl. Damit kommt man auf die oft zitierte Faustregel, daß ein neuer Wert in einen 10-day EMA mit 18% eingeht (2/11) +

    + +
    + +