/*
  SCHEDULER.hpp  -  coordination of render activities under timing and dependency constraints
  
  Copyright (C)  Lumiera.org
    2023,        Hermann Vosseler
  
  This program is free software; you can redistribute it and/or
  modify it under the terms of the GNU General Public License as
  published by the Free Software Foundation; either version 2 of
  the License, or (at your option) any later version.
  
  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
  
  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/


/** @file scheduler.hpp
 ** Service for coordination and dispatch of render activities.
 ** The implementation of scheduling services is provided by an integration
 ** of two layers of functionality:
 ** - Layer-1 allows enqueuing and prioritising of render activity records
 ** - Layer-2 connects and coordinates activities to conduct complex calculations
 **
 ** Additionally, a [custom allocation scheme](\ref BlockFlow) is involved,
 ** a [notification service](\ref EngineObserver) and the execution environment
 ** for the low-level [»Activity Language«](\ref ActivityLang). Some operational
 ** control and load management is delegated to the \ref LoadController.
 **
 ** The *purpose* of the »Scheduler Service« in the Lumiera Render Engine
 ** is to coordinate the execution of [»Render Jobs«](\ref vault::gear::Job),
 ** which can be controlled by a timing scheme, but also triggered in response
 ** to some prerequisite event, most notably the completion of IO work.
 **
 ** # Thread coordination
 ** The typical situation encountered when rendering media is the demand to
 ** distribute rather scarce computation resources to various self-contained
 ** tasks, sequenced in temporal and dependency order.
 ** In addition, some internal management work
 ** must be conducted to order these tasks, generate further tasks and coordinate
 ** the dependencies. Overall, any such internal work is by orders of magnitude
 ** less expensive than the actual media calculations, which reach up into the
 ** range of 1-10 milliseconds, possibly even way more (seconds for expensive
 ** computations). For this reason, the Scheduler in the Lumiera Render Engine
 ** uses a pool of workers, each representing one unit of computation resource
 ** (a »core«), and these workers will _pull work actively,_ rather than
 ** distributing, queuing and dispatching tasks to a passive set of workers.
 ** Notably, also the »management work« is performed by the workers themselves,
 ** to the degree necessary to retrieve the next piece of computation.
 ** So there is no dedicated »queue manager« — scheduling is driven by the workers.
 **
 ** Assuming that this internal work is comparatively cheap to perform, a choice
 ** was made to handle any internal state changes of the Scheduler exclusively
 ** in single-threaded mode. This is achieved by an atomic lock, maintained in
 ** [Layer-2 of the Scheduler implementation](\ref SchedulerCommutator::groomingToken_).
 ** Any thread looking for more work will pull a pre-configured functor, which
 ** is implemented by the [work-function](\ref Scheduler::getWork()). The thread
 ** will attempt to acquire the lock, designated as »grooming-token« -- but only
 ** if this is necessary to perform internal changes. Since workers are calling
 ** in randomly, in many cases there might be no task to perform at the moment,
 ** and the worker can be instructed to go into a sleep cycle and call back later.
 ** On the other hand, when load is high, workers are instructed to call back
 ** immediately again to find the next piece of work.
 ** Based on an assessment of
 ** the current [»head time«](\ref SchedulerInvocation::headTime), a quick
 ** decision will be made if the thread's capacity is useful right now,
 ** or if this capacity will be re-focussed into another zone of the
 ** scheduler's time axis, based on the distance to the next task.
 **
 ** If however a thread is put to work, it will dequeue an entry from
 ** the head of the [priority queue](\ref SchedulerInvocation::pullHead),
 ** and start interpreting this entry as a _chain of render activities_ with
 ** the help of the [»Activity Language«](\ref ActivityLang::dispatchChain).
 ** In the typical scenario, after some preparatory checks and notifications,
 ** the thread [transitions into work mode](\ref Scheduler::ExecutionCtx::work),
 ** which entails [dropping the grooming-token](\ref SchedulerCommutator::dropGroomingToken).
 ** Since the scheduler queue only stores references to render activities, which are
 ** allocated in a [special arrangement](\ref BlockFlow) exploiting the known deadline
 ** time of each task, further processing can commence concurrently.
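The mutual-exclusion core of the grooming-token hand-off described above can be
illustrated in isolation. The following is a simplified sketch, *not* the actual
SchedulerCommutator code — the class `GroomingToken` and its members are invented
for this example — assuming the token is merely an atomic slot holding the id of
the owning thread:

```cpp
#include <atomic>
#include <thread>
#include <cassert>

// Hypothetical sketch of a »grooming-token«: an atomic slot holding the
// owner's thread-id; only the thread holding the token may mutate internals.
class GroomingToken
  {
    std::atomic<std::thread::id> owner_{};   // default thread::id ≙ "unowned"
    
  public:
    bool
    acquire()   // attempt to claim the token for the current thread
      {
        std::thread::id expected{};          // expect "unowned"
        return owner_.compare_exchange_strong (expected, std::this_thread::get_id())
            or expected == std::this_thread::get_id();    // re-entrant for the owner
      }
    
    bool
    holds()  const                           // does this thread hold the token?
      {
        return owner_.load() == std::this_thread::get_id();
      }
    
    void
    drop()                                   // release after internal changes
      {
        owner_.store (std::thread::id{});
      }
  };
```

The sketch only demonstrates the coordination principle: uncontended acquisition,
re-entrant behaviour for the current owner, and an explicit drop once internal
state changes are complete; the real implementation is additionally interwoven
with capacity management.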
 **
 ** @see SchedulerService_test Component integration test
 ** @see SchedulerStress_test
 ** @see SchedulerUsage_test
 ** @see SchedulerInvocation Layer-1
 ** @see SchedulerCommutator Layer-2
 ** @see activity.hpp description of »Render Activities«
 **
 ** @todo WIP-WIP 10/2023 »Playback Vertical Slice«
 **
 */


#ifndef SRC_VAULT_GEAR_SCHEDULER_H_
#define SRC_VAULT_GEAR_SCHEDULER_H_


#include "lib/error.hpp"
#include "vault/gear/block-flow.hpp"
#include "vault/gear/work-force.hpp"
#include "vault/gear/activity-lang.hpp"
#include "vault/gear/scheduler-commutator.hpp"
#include "vault/gear/scheduler-invocation.hpp"
#include "vault/gear/load-controller.hpp"
#include "vault/gear/engine-observer.hpp"
#include "vault/real-clock.hpp"
//#include "lib/symbol.hpp"
#include "lib/nocopy.hpp"
//#include "lib/util.hpp"

#include <utility>
#include <thread>


namespace vault{
namespace gear {
  
//  using util::isnil;
//  using std::string;
  using std::move;
  using lib::time::Time;
  using lib::time::FSecs;
  using lib::time::Offset;
  using lib::time::Duration;
  
  namespace test { // declared friend for test access
    class SchedulerService_test;
  }
  
  namespace { // Scheduler default config
    
    const auto IDLE_WAIT = 20ms;              ///< sleep-recheck cycle for workers deemed _idle_
    const size_t DISMISS_CYCLES = 100;        ///< number of wait cycles before an idle worker terminates completely
    Offset POLL_WAIT_DELAY{FSecs(1,1000)};    ///< delay until re-evaluating a condition previously found unsatisfied
    Offset DUTY_CYCLE_PERIOD{FSecs(1,20)};    ///< period of the regular scheduler »tick« for state maintenance
    Offset DUTY_CYCLE_TOLERANCE{FSecs(1,10)}; ///< maximum slip tolerated on duty-cycle start before triggering Scheduler-emergency
  }
  
  
  
  /******************************************************//**
   * »Scheduler-Service« : coordinate render activities.
   * @todo WIP-WIP 10/2023
   * @see BlockFlow
   * @see SchedulerUsage_test
   */
  class Scheduler
    : util::NonCopyable
    {
      /** Binding of worker callbacks to the scheduler implementation */
      struct Setup : work::Config
        {
          Scheduler& scheduler;
          
          activity::Proc
          doWork()
            {
              return scheduler.getWork();
            }
          
          void
          finalHook (bool isFailure)
            {
              scheduler.handleWorkerTermination (isFailure);
            }
        };
      
      
      SchedulerInvocation layer1_;
      SchedulerCommutator layer2_;
      WorkForce workForce_;
      
      ActivityLang activityLang_;
      LoadController loadControl_;
      EngineObserver& engineObserver_;
      
    public:
      Scheduler (BlockFlowAlloc& activityAllocator
                ,EngineObserver& engineObserver)
        : layer1_{}
        , layer2_{}
        , workForce_{Setup{IDLE_WAIT, DISMISS_CYCLES, *this}}
        , activityLang_{activityAllocator}
        , loadControl_{connectMonitoring()}
        , engineObserver_{engineObserver}
        { }
      
      
      bool
      empty()  const
        {
          return layer1_.empty();
        }
      
      
      /**
       * Spark the engine's self-regulation cycle and power up the WorkForce.
       * @note set off automatically when [put to use](\ref #seedCalcStream);
       *       while active, the [duty-cycle](\ref #handleDutyCycle) retains
       *       itself, albeit bound to disengage when falling empty.
       */
      void
      ignite()
        {
          TRACE (engine, "Ignite Scheduler Dispatch.");
          handleDutyCycle (RealClock::now());
          if (not empty())
            workForce_.activate();
        }
      
      
      /**
       * Bring down processing destructively as fast as possible.
       * Dismiss worker threads as soon as possible, and clear the queues.
       * @warning actually running Activities cannot be aborted, but anything
       *          not yet scheduled will be discarded, irrespective of dependencies.
       * @remark there should never be any need to call this in regular operation,
       *         since an empty scheduler disengages automatically.
       */
      void
      terminateProcessing()
        {
          TRACE (engine, "Forcibly terminate Scheduler Dispatch.");
          workForce_.awaitShutdown();
          layer1_.discardSchedule();
        }
      
      
      /** */
      double
      getLoadIndicator()
        {
          UNIMPLEMENTED("load indicator");
        }
      
      /** */
      void
      seedCalcStream()
        {
          UNIMPLEMENTED("get it going");
        }
      
      /** */
      void
      buildJob()
        {
          UNIMPLEMENTED("wrap the ActivityTerm");
        }
      
      
      /**
       * The worker-Functor: called by the active Workers from the
       * \ref WorkForce to pull and perform the actual render Activities.
       */
      activity::Proc getWork();
      
      
    private:
      void handleDutyCycle (Time now);
      void handleWorkerTermination (bool isFailure);
      void triggerEmergency();
      
      /** send this thread into a targeted short-time wait */
      activity::Proc scatteredDelay (Time now, LoadController::Capacity capacity);
      
      
      /**
       * monad-like step sequence: perform a sequence of steps,
       * as long as the result remains activity::PASS
       */
      struct WorkerInstruction
        {
          activity::Proc lastResult = activity::PASS;
          
          /** exposes the latest verdict as overall result
           * @note returning activity::SKIP from the dispatch
           *       signals early exit, which is acquitted here.
           */
          operator activity::Proc()
            {
              return activity::SKIP == lastResult?
                        activity::PASS : lastResult;
            }
          
          template<class FUN>
          WorkerInstruction
          performStep (FUN step)
            {
              if (activity::PASS == lastResult)
                lastResult = step();
              return move (*this);
            }
        };
      
      
      /** @internal connect state signals for use by the LoadController */
      LoadController::Wiring
      connectMonitoring()
        {
          LoadController::Wiring setup;
          setup.maxCapacity = work::Config::COMPUTATION_CAPACITY;
          return setup;
        }
      
      /** @internal expose a binding for Activity execution */
      class ExecutionCtx;
      
      /** open private backdoor for tests */
      friend class test::SchedulerService_test;
    };
  
  
  
  
  /**
   * @remark when due, the scheduled Activities are performed within the
   *         [Activity-Language execution environment](\ref ActivityLang::dispatchChain());
   *         some aspects of Activity _activation_ however require external functionality,
   *         which — for the purpose of language definition — was abstracted as _Execution-context._
   *         The implementation of these binding functions fills in the relevant external effects
   *         and is in fact supplied by the implementation internals of the Scheduler itself.
   */
  class Scheduler::ExecutionCtx
    : private Scheduler
    {
    public:
      static ExecutionCtx&
      from (Scheduler& self)
        {
          return static_cast<ExecutionCtx&> (self);
        }
      
      
      /* ==== Implementation of the Concept ExecutionCtx ==== */
      
      /**
       * λ-post: enqueue for time-bound execution, possibly dispatch immediately.
       * This is the »main entrance« to get some Activity scheduled.
       * @remark the \a ctx argument is redundant (helpful for test/mocking)
       */
      activity::Proc
      post (Time when, Activity* chain, ExecutionCtx& ctx)
        {
          return layer2_.postDispatch (chain, when, ctx, layer1_);
        }
      
      void
      work (Time, size_t)
        {
          UNIMPLEMENTED ("λ-work");
        }
      
      void
      done (Time, size_t)
        {
          UNIMPLEMENTED ("λ-done");
        }
      
      activity::Proc
      tick (Time)
        {
          UNIMPLEMENTED ("λ-tick");
        }
      
      Offset
      getWaitDelay()
        {
          return POLL_WAIT_DELAY;
        }
      
      /** access high-resolution-clock, rounded to µ-Ticks */
      Time
      getSchedTime()
        {
          return RealClock::now();
        }
    };
  
  
  
  
  /**
   * @remarks this function is invoked from within the worker thread(s) and will
   *          - decide if and how the capacity of this worker shall be used right now
   *          - possibly go into a short targeted wait state to redirect capacity at a better time point
   *          - and most notably commence with dispatch of render Activities, to calculate media data.
   * @return an instruction for the work::Worker how to proceed next:
   *          - activity::PASS causes the worker to poll again immediately
   *          - activity::SLEEP induces a sleep state
   *          - activity::HALT terminates the worker
   * @note Under some circumstances, this function depends on acquiring the »grooming-token«,
   *       which is an atomic lock to ensure only one thread at a time can alter scheduler internals.
   *       In the regular processing sequence, this token is dropped after dequeuing and processing
   *       some Activities, yet prior to invoking the actual »Render Job«. Explicitly dropping the
   *       token at the end of this function is a safeguard against deadlocking the system.
   *       If some other thread happens to hold the token, SchedulerCommutator::findWork
   *       will bail out, leading to active spinning wait for the current thread.
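The short-circuiting step chain employed below can be demonstrated standalone.
The following sketch substitutes a simplified stand-in enum and struct for the
real `activity::Proc` and `WorkerInstruction` (the names `Proc` and `StepChain`
are invented for this illustration): each step runs only while the preceding
result was PASS, and a SKIP verdict is acquitted to PASS in the final conversion.

```cpp
#include <cassert>
#include <utility>

// Simplified stand-in for activity::Proc (illustrative, not the real enum)
enum Proc { PASS, SKIP, WAIT, HALT };

// Monad-like step sequence: perform steps as long as the result remains PASS
struct StepChain
  {
    Proc lastResult = PASS;
    
    // final verdict: SKIP signals early exit and is acquitted to PASS
    operator Proc()
      {
        return SKIP == lastResult? PASS : lastResult;
      }
    
    template<class FUN>
    StepChain
    performStep (FUN step)
      {
        if (PASS == lastResult)
          lastResult = step();        // invoke only while chain still passes
        return std::move (*this);
      }
  };
```

Once a step yields anything other than PASS, all following steps are skipped
without further dispatch, which is exactly the behaviour `Scheduler::getWork()`
relies on.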
   */
  inline activity::Proc
  Scheduler::getWork()
  {
    auto self = std::this_thread::get_id();
    auto& ctx = ExecutionCtx::from (*this);
    try {
        auto res = WorkerInstruction{}
                      .performStep([&]{
                                      Time now  = ctx.getSchedTime();
                                      Time head = layer1_.headTime();
                                      return scatteredDelay (now, loadControl_.markIncomingCapacity (head,now));
                                   })
                      .performStep([&]{
                                      Time now = ctx.getSchedTime();
                                      Activity* act = layer2_.findWork (layer1_, now);
                                      return ctx.post (now, act, ctx);
                                   })
                      .performStep([&]{
                                      Time now  = ctx.getSchedTime();
                                      Time head = layer1_.headTime();
                                      return scatteredDelay (now, loadControl_.markOutgoingCapacity (head,now));
                                   });
        // ensure lock clean-up
        if (res != activity::PASS and layer2_.holdsGroomingToken (self))
          layer2_.dropGroomingToken();
        return res;
      }
    catch(...)
      {
        if (layer2_.holdsGroomingToken (self))
          layer2_.dropGroomingToken();
        throw;
      }
  }
  
  
  /**
   * A worker [asking for work](\ref #getWork) constitutes free capacity,
   * which can be redirected into a focused zone of the scheduler time axis
   * where it is most likely to be useful, unless there is active work to
   * be carried out right away.
   * @param capacity classification of the capacity to employ this thread
   * @return how to proceed further with this worker
   *         - activity::PASS indicates to proceed or call back immediately
   *         - activity::SKIP causes exiting this round, yet calling back again
   *         - activity::WAIT exits and places the worker into sleep mode
   * @note as part of the regular work processing, this function may
   *       place the current thread into a short-term targeted sleep.
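For illustration, such a classification of free capacity by the distance to the
next head time might look like the following sketch. The function
`classifyCapacity` and the microsecond thresholds are invented for this example;
the real LoadController implements a considerably more elaborate scheme.

```cpp
#include <cassert>

// Illustrative capacity classification (not the real LoadController enum)
enum Capacity { DISPATCH, SPINTIME, TENDNEXT, IDLEWAIT };

// Classify incoming worker capacity by distance to the next planned
// head time, given both time points in microseconds (thresholds invented).
inline Capacity
classifyCapacity (long long now_us, long long head_us)
  {
    long long distance = head_us - now_us;
    if (distance <= 0)     return DISPATCH;   // work is due: use this thread right now
    if (distance < 200)    return SPINTIME;   // imminent: spin and poll again
    if (distance < 20000)  return TENDNEXT;   // near future: tend the head entry, then sleep briefly
    return IDLEWAIT;                          // far ahead: send the worker to sleep
  }
```

The point of the sketch is merely the structure of the decision: capacity close
to the head time is kept hot, while capacity far from any due work is parked.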
   */
  inline activity::Proc
  Scheduler::scatteredDelay (Time now, LoadController::Capacity capacity)
  {
    auto doTargetedSleep = [&]
          { // ensure not to block the Scheduler after management work
            auto self = std::this_thread::get_id();
            if (layer2_.holdsGroomingToken (self))
              layer2_.dropGroomingToken();
            // relocate this thread (capacity) to a time where it is more useful
            Offset targetedDelay = loadControl_.scatteredDelayTime (now, capacity);
            std::this_thread::sleep_for (std::chrono::microseconds (_raw(targetedDelay)));
          };
    auto doTendNextHead = [&]
          {
            Time head = layer1_.headTime();
            auto self = std::this_thread::get_id();
            if (not loadControl_.tendedNext (head)
                and (layer2_.holdsGroomingToken (self)
                     or layer2_.acquireGoomingToken()))
              loadControl_.tendNext (head);
          };
    
    switch (capacity) {
      case LoadController::DISPATCH:
        return activity::PASS;
      case LoadController::SPINTIME:
        std::this_thread::yield();
        return activity::SKIP;   // prompts to abort the chain but call again immediately
      case LoadController::IDLEWAIT:
        return activity::WAIT;   // prompts to switch this thread into sleep mode
      case LoadController::TENDNEXT:
        doTendNextHead();
        doTargetedSleep();       // let this thread wait until the next head time is due
        return activity::SKIP;
      default:
        doTargetedSleep();
        return activity::SKIP;   // prompts to abort this processing-chain for good
      }
  }
  
  
  /**
   * »Tick-hook« : code to maintain a sane running status.
   * This function will be invoked [regularly](\ref DUTY_CYCLE_PERIOD) while the scheduler
   * is actively processing; in fact this function determines when the scheduler falls empty
   * and can be shut down — and thus regular invocation is equivalent to running state.
   * Code for all kinds of status updates, low-level clean-up and maintenance work related
   * to the building blocks of the scheduler shall be added here.
   * It will be invoked from within some (random) worker thread, frequently enough
   * to appear to humans as an immediate response, yet with a sufficiently large
   * time period to amortise even slightly more computationally expensive work;
   * IO and possibly blocking operations should be avoided here though.
   * Exceptions emanating from here will shut down the engine.
   */
  inline void
  Scheduler::handleDutyCycle (Time now)
  {
    // consolidate queue content
    layer1_.feedPrioritisation();
    //////////////////////////////////////////////////////////////////////OOO clean-up of outdated tasks here
    if (layer1_.isOutOfTime())
      {
        triggerEmergency();
        return; // leave everything as-is
      }
    // clean-up of obsolete Activities
    activityLang_.discardBefore (now);
    
    loadControl_.updateState (now);
    
    if (not empty())
      { // prepare the next duty cycle »tick«
        Time nextTick = now + DUTY_CYCLE_PERIOD;
        Time deadline = nextTick + DUTY_CYCLE_TOLERANCE;
        auto& ctx = ExecutionCtx::from (*this);
        Activity& tickActivity = activityLang_.createTick (deadline);
        ctx.post (nextTick, &tickActivity, ctx);
      }
  }
  
  
  /**
   * Callback invoked whenever a worker-thread is about to exit.
   * @param isFailure if the exit was caused by an uncaught exception
   */
  inline void
  Scheduler::handleWorkerTermination (bool isFailure)
  {
    if (isFailure)
      triggerEmergency();
    else
      loadControl_.markWorkerExit();
  }
  
  
  /**
   * Trip the emergency brake and unwind processing, while retaining all state.
   */
  inline void
  Scheduler::triggerEmergency()
  {
    UNIMPLEMENTED ("scheduler overrun -- trigger Emergency");
  }
  
  
}} // namespace vault::gear
#endif /*SRC_VAULT_GEAR_SCHEDULER_H_*/