Scheduler-test: reorganise planning-job entrance and coordination

This amounts to a rather massive refactoring, prompted by the enduring problems
observed when pressing the scheduler. All the various glitches and (fixed) crashes
are related to the way how planning-jobs enter the schedule items,
which is also closely tied to the difficulties getting the locking
for planning-jobs correct.

The solution pursued hereby is to reorder the main avenues into the
scheduler implementation. There is now a streamlined main entrance,
which **always** enqueues only, allowing to omit most checks and
coordination. On the other hand, the complete coordination and dispatch
of the work capacity is now shifted down into the SchedulerCommutator,
thereby linking all coordination and access control close together
into a single implementation facility.

If this works out as intended
 - several repeated checks on the Grooming-Token could be omitted (performance)
 - the planning-job would no longer be able to loose / drop the Token,
   thereby running enforcedly single-threaded (as was the original intention)
 - since all planning effectively originates from planning-jobs, this
   would allow to omit many safety barriers and complexities at the
   scheduler entrance avenue, since now all entries just go into the queue.

WIP: tests pass compiler, but must be adapted / reworked
This commit is contained in:
Fischlurch 2023-12-25 23:18:43 +01:00
parent 100252acdf
commit 09f0e92ea3
7 changed files with 1307 additions and 379 deletions

View file

@ -112,7 +112,7 @@ namespace gear {
namespace blockFlow {///< Parametrisation of Scheduler memory management scheme
/** limit for maximum number of blocks allowed in Epoch expansion
* @note SchedulerCommutator::sanityCheck() defines a similar limit,
* @note Scheduler::sanityCheck() defines a similar limit,
* but there the same reasoning is translated into a hard limit for
* deadlines to be < 20sec, while this limit here will only be triggered
* if the current block duration has been lowered to the OVERLOAD_LIMIT

View file

@ -57,10 +57,10 @@
** - silently dispose of any outdated entries
** - use the [Activity Language environment](\ref ActivityLang) to _perform_
** the retrieved chain within some worker thread; this is called _dispatch_
** The central cross road of this implementation is the #postDispatch function.
** The main entrance point into this implementation is the #postChain function.
** @see SchedulerCommutator::acquireGroomingToken()
** @see SchedulerCommutator::findWork()
** @see SchedulerCommutator::postDispatch()
** @see SchedulerCommutator::postChain()
** @see SchedulerCommutator_test
** @see scheduler.hpp usage
**
@ -74,6 +74,7 @@
#include "vault/common.hpp"
#include "vault/gear/activity.hpp"
#include "vault/gear/scheduler-invocation.hpp"
#include "vault/gear/load-controller.hpp"
#include "vault/gear/activity-lang.hpp"
#include "lib/time/timevalue.hpp"
#include "lib/format-string.hpp"
@ -98,7 +99,6 @@ namespace gear {
namespace { // Configuration / Scheduling limit
Offset FUTURE_PLANNING_LIMIT{FSecs{20}}; ///< limit timespan of deadline into the future (~360 MiB max)
microseconds GROOMING_WAIT_CYCLE{70us}; ///< wait-sleep in case a thread must forcibly acquire the Grooming-Token
/** convenient short-notation, also used by SchedulerService */
@ -174,37 +174,6 @@ namespace gear {
void
sanityCheck (ActivationEvent const& event, Time now)
{
if (event.startTime() == Time::ANYTIME)
throw error::Fatal ("Attempt to schedule an Activity without valid start time");
if (event.deathTime() == Time::NEVER)
throw error::Fatal ("Attempt to schedule an Activity without valid deadline");
Offset toDeadline{now, event.deathTime()};
if (toDeadline > FUTURE_PLANNING_LIMIT)
throw error::Fatal (util::_Fmt{"Attempt to schedule Activity %s "
"with a deadline by %s into the future"}
% *event.activity
% toDeadline);
}
/**
* Decide if Activities shall be performed now and in this thread.
* @param when the indicated time of start of the first Activity
* @param now current scheduler time
* @return allow dispatch if time is due and GroomingToken was acquired
* @warning accesses current ThreadID and changes GroomingToken state.
*/
bool
decideDispatchNow (Time when, Time now)
{
return (when <= now)
and (holdsGroomingToken (thisThread())
or acquireGoomingToken());
}
/** tend to the input queue if possible */
void
maybeFeed (SchedulerInvocation& layer1)
@ -236,48 +205,22 @@ namespace gear {
}
/***************************************************//**
/***********************************************************//**
* This is the primary entrance point to the Scheduler.
* Engage into activity as controlled by given start time.
* Attempts to acquire the GroomingToken if Activity is due
* immediately, otherwise just enqueue it for prioritisation.
* @param chain the Render Activities to put into action
* @param when the indicated time of start for these
* @param executionCtx abstracted execution environment for
* Render Activities (typically backed by the
* Scheduler as a whole, including notifications
* Place the given event into the schedule, with prioritisation
* according to its start time.
* @param event the chain of Render Activities to be scheduled,
* including start time and deadline
* @return Status value to indicate how to proceed processing
* - activity::PASS continue processing in regular operation
* - activity::WAIT nothing to do now, check back later
* - activity::HALT serious problem, cease processing
* - activity::KICK to contend (spin) on GroomingToken
* @note Attempts to acquire the GroomingToken for immediate
* processing, but not for just enqueuing planned tasks.
* Never drops the GroomingToken explicitly (unless when
* switching from grooming-mode to work-mode in the course
* of processing the given Activity chain regularly).
* @note Never attempts to acquire the GroomingToken itself,
* but if current thread holds the token, the task can
* be placed directly into the scheduler queue.
*/
template<class EXE>
activity::Proc
postDispatch (ActivationEvent event
,EXE& executionCtx
,SchedulerInvocation& layer1
)
{
if (!event) return activity::KICK;
Time now = executionCtx.getSchedTime();
sanityCheck (event, now);
if (decideDispatchNow (event.startTime(), now))
return ActivityLang::dispatchChain (event, executionCtx);
else
instructFollowUp (event,layer1);
return activity::PASS;
}
activity::Proc
instructFollowUp (ActivationEvent event
,SchedulerInvocation& layer1 )
postChain (ActivationEvent event, SchedulerInvocation& layer1)
{
if (holdsGroomingToken (thisThread()))
layer1.feedPrioritisation (move (event));
@ -285,11 +228,183 @@ namespace gear {
layer1.instruct (move (event));
return activity::PASS;
}
/**
* Implementation of the worker-Functor:
* - redirect work capacity in accordance to current scheduler and load
* - dequeue and dispatch the Activity chains from the queue to perform the render jobs.
*/
template<class DISPATCH, class CLOCK>
activity::Proc
dispatchCapacity (SchedulerInvocation&, LoadController&, DISPATCH, CLOCK);
private:
activity::Proc
scatteredDelay (Time now, Time head
,LoadController& loadController
,LoadController::Capacity capacity);
void
ensureDroppedGroomingToken()
{
if (holdsGroomingToken (thisThread()))
dropGroomingToken();
}
/**
* monad-like step sequence: perform sequence of steps,
* as long as the result remains activity::PASS
*/
struct WorkerInstruction
{
activity::Proc lastResult = activity::PASS;
/** exposes the latest verdict as overall result
* @note returning activity::SKIP from the dispatch
* signals early exit, which is acquitted here. */
operator activity::Proc()
{
return activity::SKIP == lastResult? activity::PASS
: lastResult;
}
template<class FUN>
WorkerInstruction
performStep (FUN step)
{
if (activity::PASS == lastResult)
lastResult = step();
return move(*this);
}
};
};
/**
* @remarks this function is invoked from within the worker thread(s) and will
* - decide if and how the capacity of this worker shall be used right now
* - possibly go into a short targeted wait state to redirect capacity at a better time point
* - and most notably commence with dispatch of render Activities, to calculate media data.
* @return an instruction for the work::Worker how to proceed next:
* - activity::PASS causes the worker to poll again immediately
* - activity::KICK to contend (spin) on GroomingToken
* - activity::WAIT induces a sleep state
* - activity::HALT terminates the worker
* @note Under some circumstances, this function depends on acquiring the »grooming-token«,
* which is an atomic lock to ensure only one thread at a time can alter scheduler internals.
* In the regular processing sequence, this token is dropped after dequeuing and processing
* some Activities, yet prior to invoking the actual »Render Job«. Explicitly dropping the
* token at the end of this function is a safeguard against deadlocking the system.
* If some other thread happens to hold the token, SchedulerCommutator::findWork
* will bail out, leading to active spinning wait for the current thread.
*/
template<class DISPATCH, class CLOCK>
inline activity::Proc
SchedulerCommutator::dispatchCapacity (SchedulerInvocation& layer1
,LoadController& loadController
,DISPATCH executeActivity
,CLOCK getSchedTime
)
{
try {
auto res = WorkerInstruction{}
.performStep([&]{
maybeFeed(layer1);
Time now = getSchedTime();
Time head = layer1.headTime();
return scatteredDelay(now, head, loadController,
loadController.markIncomingCapacity (head,now));
})
.performStep([&]{
Time now = getSchedTime();
auto toDispatch = findWork (layer1,now);
if (not toDispatch) return activity::KICK; // contention
return executeActivity (toDispatch);
})
.performStep([&]{
maybeFeed(layer1);
Time now = getSchedTime();
Time head = layer1.headTime();
return scatteredDelay(now, head, loadController,
loadController.markOutgoingCapacity (head,now));
});
// ensure lock clean-up
if (res != activity::PASS)
ensureDroppedGroomingToken();
return res;
}
catch(...)
{
ensureDroppedGroomingToken();
throw;
}
}
/**
* A worker [asking for work](\ref #doWork) constitutes free capacity,
* which can be redirected into a focused zone of the scheduler time axis
* where it is most likely to be useful, unless there is active work to
* be carried out right away.
* @param capacity classification of the capacity to employ this thread
* @return how to proceed further with this worker
* - activity::PASS indicates to proceed or call back immediately
* - activity::SKIP causes to exit this round, yet call back again
* - activity::KICK signals contention (not emitted here)
* - activity::WAIT exits and places the worker into sleep mode
* @note as part of the regular work processing, this function may
* place the current thread into a short-term targeted sleep.
*/
inline activity::Proc
SchedulerCommutator::scatteredDelay (Time now, Time head
,LoadController& loadController
,LoadController::Capacity capacity)
{
auto doTargetedSleep = [&]
{ // ensure not to block the Scheduler after management work
ensureDroppedGroomingToken();
// relocate this thread(capacity) to a time where its more useful
Offset targetedDelay = loadController.scatteredDelayTime (now, capacity);
std::this_thread::sleep_for (std::chrono::microseconds (_raw(targetedDelay)));
};
auto doTendNextHead = [&]
{
if (not loadController.tendedNext(head)
and (holdsGroomingToken(thisThread())
or acquireGoomingToken()))
loadController.tendNext(head);
};
switch (capacity) {
case LoadController::DISPATCH:
return activity::PASS;
case LoadController::SPINTIME:
std::this_thread::yield();
return activity::SKIP; // prompts to abort chain but call again immediately
case LoadController::IDLEWAIT:
return activity::WAIT; // prompts to switch this thread into sleep mode
case LoadController::TENDNEXT:
doTendNextHead();
doTargetedSleep(); // let this thread wait until next head time is due
return activity::SKIP;
default:
doTargetedSleep();
return activity::SKIP; // prompts to abort this processing-chain for good
}
}
class SchedulerCommutator::ScopedGroomingGuard
: util::MoveOnly
{

View file

@ -50,7 +50,7 @@
** is triggered in the SchedulerService, should such an entry
** [miss it's deadline](\ref SchedulerInvocation::isOutOfTime())
** @see SchedulerCommutator::findWork()
** @see SchedulerCommutator::postDispatch()
** @see SchedulerCommutator::postChain()
** @see SchedulerInvocation_test
** @see SchedulerUsage_test integrated usage
**

View file

@ -57,7 +57,7 @@
** in single-threaded mode. This is achieved by an atomic lock, maintained in
** [Layer-2 of the Scheduler implementation](\ref SchedulerCommutator::groomingToken_).
** Any thread looking for more work will pull a pre-configured functor, which
** is implemented by the [work-function](\ref Scheduler::getWork()). The thread
** is implemented by the [work-function](\ref Scheduler::doWork()). The thread
** will attempt to acquire the lock, designated as »grooming-token« -- but only
** if this is necessary to perform internal changes. Since workers are calling
** in randomly, in many cases there might be no task to perform at the moment,
@ -83,7 +83,7 @@
** Notably _internal processing_ (e.g. planning of new jobs) will _not_ drop
** the token, since it must be able to change the schedule. Such internal
** tasks can be processed in row and will be confined to a single thread
** (there is a special treatment at the end of #getWork() to achieve that).
** (there is a special treatment at the end of #doWork() to achieve that).
** As a safety net, the grooming-token will automatically be dropped after
** catching an exception, or when a thread is sent to sleep.
**
@ -130,7 +130,6 @@ namespace gear {
using lib::time::Time;
using lib::time::FSecs;
using lib::time::Offset;
using lib::time::Duration;
namespace test { // declared friend for test access
class SchedulerService_test;
@ -142,6 +141,7 @@ namespace gear {
const size_t DISMISS_CYCLES = 100; ///< number of wait cycles before an idle worker terminates completely
Offset DUTY_CYCLE_PERIOD{FSecs(1,20)}; ///< period of the regular scheduler »tick« for state maintenance.
Offset DUTY_CYCLE_TOLERANCE{FSecs(1,10)}; ///< maximum slip tolerated on duty-cycle start before triggering Scheduler-emergency
Offset FUTURE_PLANNING_LIMIT{FSecs{20}}; ///< limit timespan of deadline into the future (~360 MiB max)
}
@ -224,7 +224,7 @@ namespace gear {
struct Setup : work::Config
{
Scheduler& scheduler;
activity::Proc doWork() { return scheduler.getWork(); }
activity::Proc doWork() { return scheduler.doWork(); }
void finalHook (bool _) { scheduler.handleWorkerTermination(_);}
};
@ -291,7 +291,9 @@ namespace gear {
/**
*
* @return a synthetic indicator fused from several observations
* - 1.0 defines full work capacity yet no significant congestion
* - values > 2.0 indicate overload
*/
double
getLoadIndicator()
@ -341,7 +343,6 @@ namespace gear {
{
bool isCompulsory = true;
Time deadline = nextStart + DUTY_CYCLE_TOLERANCE;
auto guard = layer2_.requireGroomingTokenHere(); // protect allocation
// place the meta-Job into the timeline...
postChain ({activityLang_.buildMetaJob(planningJob, nextStart, deadline)
.post()
@ -371,11 +372,12 @@ namespace gear {
* The worker-Functor: called by the active Workers from the
* \ref WorkForce to pull / perform the actual render Activities.
*/
activity::Proc getWork();
activity::Proc doWork();
private:
void postChain (ActivationEvent);
void sanityCheck (ActivationEvent const&);
void handleDutyCycle (Time now, bool =false);
void handleWorkerTermination (bool isFailure);
void maybeScaleWorkForce (Time startHorizon);
@ -383,38 +385,6 @@ namespace gear {
void triggerEmergency();
/** send this thread into a targeted short-time wait. */
activity::Proc scatteredDelay (Time now, LoadController::Capacity);
/**
* monad-like step sequence: perform sequence of steps,
* as long as the result remains activity::PASS
*/
struct WorkerInstruction
{
activity::Proc lastResult = activity::PASS;
/** exposes the latest verdict as overall result
* @note returning activity::SKIP from the dispatch
* signals early exit, which is acquitted here. */
operator activity::Proc()
{
return activity::SKIP == lastResult? activity::PASS
: lastResult;
}
template<class FUN>
WorkerInstruction
performStep (FUN step)
{
if (activity::PASS == lastResult)
lastResult = step();
return move(*this);
}
};
/** @internal connect state signals for use by the LoadController */
LoadController::Wiring
connectMonitoring()
@ -426,13 +396,6 @@ namespace gear {
return setup;
}
void
ensureDroppedGroomingToken()
{
if (layer2_.holdsGroomingToken (thisThread()))
layer2_.dropGroomingToken();
}
/** access high-resolution-clock, rounded to µ-Ticks */
Time
getSchedTime()
@ -454,6 +417,8 @@ namespace gear {
/** work-timing event for performance observation */
class WorkTiming
: public EngineEvent
@ -500,9 +465,6 @@ namespace gear {
* of activations and notifications. The concrete implementation
* needs some further contextual information, which is passed
* down here as a nested sub-context.
* @note different than Scheduler::postChain(), this operation here
* always enqueues the \a chain, never dispatches directly.
* This special twist helps to improve parallelisation.
*/
activity::Proc
post (Time when, Time dead, Activity* chain, ExecutionCtx& ctx)
@ -510,7 +472,8 @@ namespace gear {
REQUIRE (chain);
ActivationEvent chainEvent = ctx.rootEvent;
chainEvent.refineTo (chain, when, dead);
return scheduler_.layer2_.instructFollowUp (chainEvent, scheduler_.layer1_);
scheduler_.sanityCheck (chainEvent);
return scheduler_.layer2_.postChain (chainEvent, scheduler_.layer1_);
}
/**
@ -550,6 +513,22 @@ namespace gear {
};
inline activity::Proc
Scheduler::doWork()
{
return layer2_.dispatchCapacity (layer1_
,loadControl_
,[this](ActivationEvent toDispatch)
{
ExecutionCtx ctx{*this, toDispatch};
return ActivityLang::dispatchChain (toDispatch, ctx);
}
,[this]{ return getSchedTime(); }
);
}
/** @note after invoking this terminal operation,
@ -564,7 +543,7 @@ namespace gear {
inline ScheduleSpec
ScheduleSpec::post()
{ // protect allocation
auto guard = theScheduler_->layer2_.requireGroomingTokenHere();
// auto guard = theScheduler_->layer2_.requireGroomingTokenHere();//////////////////////////////////////TODO can we avoid that?
term_ = move(
theScheduler_->activityLang_
.buildCalculationJob (job_, start_,death_));
@ -578,22 +557,39 @@ namespace gear {
inline ScheduleSpec
ScheduleSpec::linkToSuccessor (ScheduleSpec& succSpec)
{ // protect allocation
auto guard = theScheduler_->layer2_.requireGroomingTokenHere();
{
term_->appendNotificationTo (*succSpec.term_);
return move(*this);
}
inline ScheduleSpec
ScheduleSpec::linkToPredecessor (ScheduleSpec& predSpec)
{ // protect allocation
auto guard = theScheduler_->layer2_.requireGroomingTokenHere();
{
predSpec.term_->appendNotificationTo (*term_);
return move(*this);
}
inline void
Scheduler::sanityCheck (ActivationEvent const& event)
{
if (not event)
throw error::Logic ("Empty event passed into Scheduler entrance");
if (event.startTime() == Time::ANYTIME)
throw error::Fatal ("Attempt to schedule an Activity without valid start time");
if (event.deathTime() == Time::NEVER)
throw error::Fatal ("Attempt to schedule an Activity without valid deadline");
Time now{getSchedTime()};
Offset toDeadline{now, event.deathTime()};
if (toDeadline > FUTURE_PLANNING_LIMIT)
throw error::Fatal (util::_Fmt{"Attempt to schedule Activity %s "
"with a deadline by %s into the future"}
% *event.activity
% toDeadline);
}
/**
* Enqueue for time-bound execution, possibly dispatch immediately.
* This is the »main entrance« to get some Activity scheduled.
@ -603,118 +599,9 @@ namespace gear {
inline void
Scheduler::postChain (ActivationEvent actEvent)
{
sanityCheck (actEvent);
maybeScaleWorkForce (actEvent.startTime());
ExecutionCtx ctx{*this, actEvent};
layer2_.postDispatch (actEvent, ctx, layer1_);
}
/**
* @remarks this function is invoked from within the worker thread(s) and will
* - decide if and how the capacity of this worker shall be used right now
* - possibly go into a short targeted wait state to redirect capacity at a better time point
* - and most notably commence with dispatch of render Activities, to calculate media data.
* @return an instruction for the work::Worker how to proceed next:
* - activity::PROC causes the worker to poll again immediately
* - activity::SLEEP induces a sleep state
* - activity::HALT terminates the worker
* @note Under some circumstances, this function depends on acquiring the »grooming-token«,
* which is an atomic lock to ensure only one thread at a time can alter scheduler internals.
* In the regular processing sequence, this token is dropped after dequeuing and processing
* some Activities, yet prior to invoking the actual »Render Job«. Explicitly dropping the
* token at the end of this function is a safeguard against deadlocking the system.
* If some other thread happens to hold the token, SchedulerCommutator::findWork
* will bail out, leading to active spinning wait for the current thread.
*/
inline activity::Proc
Scheduler::getWork()
{
try {
auto res = WorkerInstruction{}
.performStep([&]{
layer2_.maybeFeed(layer1_);
Time now = getSchedTime();
Time head = layer1_.headTime();
return scatteredDelay(now,
loadControl_.markIncomingCapacity (head,now));
})
.performStep([&]{
Time now = getSchedTime();
auto toDispatch = layer2_.findWork (layer1_,now);
ExecutionCtx ctx{*this, toDispatch};
return layer2_.postDispatch (toDispatch, ctx, layer1_);
})
.performStep([&]{
layer2_.maybeFeed(layer1_);
Time now = getSchedTime();
Time head = layer1_.headTime();
return scatteredDelay(now,
loadControl_.markOutgoingCapacity (head,now));
});
// ensure lock clean-up
if (res != activity::PASS)
ensureDroppedGroomingToken();
return res;
}
catch(...)
{
ensureDroppedGroomingToken();
throw;
}
}
/**
* A worker [asking for work](\ref #getWork) constitutes free capacity,
* which can be redirected into a focused zone of the scheduler time axis
* where it is most likely to be useful, unless there is active work to
* be carried out right away.
* @param capacity classification of the capacity to employ this thread
* @return how to proceed further with this worker
* - activity::PASS indicates to proceed or call back immediately
* - activity::SKIP causes to exit this round, yet call back again
* - activity::KICK signals contention (not emitted here)
* - activity::WAIT exits and places the worker into sleep mode
* @note as part of the regular work processing, this function may
* place the current thread into a short-term targeted sleep.
*/
inline activity::Proc
Scheduler::scatteredDelay (Time now, LoadController::Capacity capacity)
{
auto doTargetedSleep = [&]
{ // ensure not to block the Scheduler after management work
ensureDroppedGroomingToken();
// relocate this thread(capacity) to a time where its more useful
Offset targetedDelay = loadControl_.scatteredDelayTime (now, capacity);
std::this_thread::sleep_for (std::chrono::microseconds (_raw(targetedDelay)));
};
auto doTendNextHead = [&]
{
Time head = layer1_.headTime();
if (not loadControl_.tendedNext(head)
and (layer2_.holdsGroomingToken(thisThread())
or layer2_.acquireGoomingToken()))
loadControl_.tendNext(head);
};
switch (capacity) {
case LoadController::DISPATCH:
return activity::PASS;
case LoadController::SPINTIME:
std::this_thread::yield();
return activity::SKIP; // prompts to abort chain but call again immediately
case LoadController::IDLEWAIT:
return activity::WAIT; // prompts to switch this thread into sleep mode
case LoadController::TENDNEXT:
doTendNextHead();
doTargetedSleep(); // let this thread wait until next head time is due
return activity::SKIP;
default:
doTargetedSleep();
return activity::SKIP; // prompts to abort this processing-chain for good
}
layer2_.postChain (actEvent, layer1_);
}
@ -764,8 +651,7 @@ namespace gear {
Time deadline = nextTick + DUTY_CYCLE_TOLERANCE;
Activity& tickActivity = activityLang_.createTick (deadline);
ActivationEvent tickEvent{tickActivity, nextTick, deadline, ManifestationID(), true};
ExecutionCtx ctx{*this, tickEvent};
layer2_.postDispatch (tickEvent, ctx, layer1_);
layer2_.postChain (tickEvent, layer1_);
} // *deliberately* use low-level entrance
} // to avoid ignite() cycles and derailed load-regulation

View file

@ -97,10 +97,9 @@ namespace test {
verify_GroomingToken();
verify_GroomingGuard();
torture_GroomingToken();
verify_DispatchDecision();
verify_findWork();
verify_Significance();
verify_postDispatch();
verify_postChain();
integratedWorkCycle();
}
@ -124,7 +123,7 @@ namespace test {
// prepare scenario: some activity is enqueued
queue.instruct ({activity, when, dead});
sched.postDispatch (sched.findWork(queue,now), detector.executionCtx,queue);
//// sched.postChain (sched.findWork(queue,now), detector.executionCtx,queue);///////////////TODO
CHECK (detector.verifyInvocation("CTX-tick").arg(now));
CHECK (queue.empty());
@ -279,51 +278,51 @@ namespace test {
/** @test verify the logic to decide where and when to perform
* the dispatch of a Scheduler Activity chain.
*/
void
verify_DispatchDecision()
{
SchedulerCommutator sched;
___ensureGroomingTokenReleased(sched);
Time t1{10,0};
Time t2{20,0};
Time t3{30,0};
Time now{t2};
auto myself = std::this_thread::get_id();
CHECK (sched.decideDispatchNow (t1, now)); // time is before now => good to execute
CHECK (sched.holdsGroomingToken (myself)); // Side-Effect: acquired the Grooming-Token
CHECK (sched.decideDispatchNow (t1, now)); // also works if Grooming-Token is already acquired
CHECK (sched.holdsGroomingToken (myself));
CHECK (sched.decideDispatchNow (t2, now)); // Boundary case time == now => good to execute
CHECK (sched.holdsGroomingToken (myself));
CHECK (not sched.decideDispatchNow (t3, now)); // Task in the future shall not be dispatched now
CHECK (sched.holdsGroomingToken (myself)); // ...and this case has no impact on the Grooming-Token
sched.dropGroomingToken();
CHECK (not sched.decideDispatchNow (t3, now));
CHECK (not sched.holdsGroomingToken (myself));
blockGroomingToken(sched);
CHECK (not sched.acquireGoomingToken());
CHECK (not sched.decideDispatchNow (t1, now)); // unable to acquire => can not decide positively
CHECK (not sched.holdsGroomingToken (myself));
CHECK (not sched.decideDispatchNow (t2, now));
CHECK (not sched.holdsGroomingToken (myself));
unblockGroomingToken();
CHECK (sched.decideDispatchNow (t2, now));
CHECK (sched.holdsGroomingToken (myself));
}
// /** @test verify the logic to decide where and when to perform
// * the dispatch of a Scheduler Activity chain.
// */
// void
// verify_DispatchDecision()
// {
// SchedulerCommutator sched;
// ___ensureGroomingTokenReleased(sched);
//
// Time t1{10,0};
// Time t2{20,0};
// Time t3{30,0};
// Time now{t2};
//
// auto myself = std::this_thread::get_id();
// CHECK (sched.decideDispatchNow (t1, now)); // time is before now => good to execute
// CHECK (sched.holdsGroomingToken (myself)); // Side-Effect: acquired the Grooming-Token
//
// CHECK (sched.decideDispatchNow (t1, now)); // also works if Grooming-Token is already acquired
// CHECK (sched.holdsGroomingToken (myself));
//
// CHECK (sched.decideDispatchNow (t2, now)); // Boundary case time == now => good to execute
// CHECK (sched.holdsGroomingToken (myself));
//
// CHECK (not sched.decideDispatchNow (t3, now)); // Task in the future shall not be dispatched now
// CHECK (sched.holdsGroomingToken (myself)); // ...and this case has no impact on the Grooming-Token
// sched.dropGroomingToken();
//
// CHECK (not sched.decideDispatchNow (t3, now));
// CHECK (not sched.holdsGroomingToken (myself));
//
// blockGroomingToken(sched);
// CHECK (not sched.acquireGoomingToken());
//
// CHECK (not sched.decideDispatchNow (t1, now)); // unable to acquire => can not decide positively
// CHECK (not sched.holdsGroomingToken (myself));
//
// CHECK (not sched.decideDispatchNow (t2, now));
// CHECK (not sched.holdsGroomingToken (myself));
//
// unblockGroomingToken();
//
// CHECK (sched.decideDispatchNow (t2, now));
// CHECK (sched.holdsGroomingToken (myself));
// }
@ -468,7 +467,7 @@ namespace test {
/** @test verify entrance point for performing an Activity chain.
*/
void
verify_postDispatch()
verify_postChain()
{
// rigged execution environment to detect activations--------------
ActivityDetector detector;
@ -488,18 +487,18 @@ namespace test {
CHECK (not sched.holdsGroomingToken (myself));
// no effect when empty / no Activity given (usually this can happen due to lock contention)
CHECK (activity::KICK == sched.postDispatch (ActivationEvent(), detector.executionCtx, queue));
CHECK (activity::KICK == sched.postChain (ActivationEvent(), queue));
CHECK (not sched.holdsGroomingToken (myself));
// Activity immediately dispatched when on time and GroomingToken can be acquired
CHECK (activity::PASS == sched.postDispatch (makeEvent(past), detector.executionCtx, queue));
// Activity immediately dispatched when on time and GroomingToken can be acquired ///////////////////////////TODO
CHECK (activity::PASS == sched.postChain (makeEvent(past), queue));
CHECK (detector.verifyInvocation("testActivity").timeArg(now)); // was invoked immediately
CHECK ( sched.holdsGroomingToken (myself));
CHECK ( queue.empty());
detector.incrementSeq(); // Seq-point-1 in the detector log
// future Activity is enqueued by short-circuit directly into the PriorityQueue if possible
CHECK (activity::PASS == sched.postDispatch (makeEvent(future), detector.executionCtx, queue));
CHECK (activity::PASS == sched.postChain (makeEvent(future), queue));
CHECK ( sched.holdsGroomingToken (myself));
CHECK (not queue.empty());
CHECK (isSameObject (activity, *queue.peekHead())); // appears at Head, implying it's in Priority-Queue
@ -510,14 +509,14 @@ namespace test {
CHECK (queue.empty());
// ...but GroomingToken is not acquired explicitly; Activity is just placed into the Instruct-Queue
CHECK (activity::PASS == sched.postDispatch (makeEvent(future), detector.executionCtx, queue));
CHECK (activity::PASS == sched.postChain (makeEvent(future), queue));
CHECK (not sched.holdsGroomingToken (myself));
CHECK (not queue.peekHead()); // not appearing at Head this time,
CHECK (not queue.empty()); // rather waiting in the Instruct-Queue
blockGroomingToken(sched);
CHECK (activity::PASS == sched.postDispatch (makeEvent(now), detector.executionCtx, queue));
CHECK (activity::PASS == sched.postChain (makeEvent(now), queue));
CHECK (not sched.holdsGroomingToken (myself));
CHECK (not queue.peekHead()); // was enqueued, not executed
@ -586,7 +585,7 @@ namespace test {
TimeVar now{Time::ZERO};
// rig the ExecutionCtx to allow manipulating "current scheduler time"
detector.executionCtx.getSchedTime = [&]{ return Time{now}; };
detector.executionCtx.getSchedTime = [&]{ return Time{now}; };///////////////////////////TODO RLY?
// rig the λ-work to verify GroomingToken and to drop it then
detector.executionCtx.work.implementedAs(
[&](Time, size_t)
@ -598,7 +597,7 @@ namespace test {
// ·=================================================================== actual test sequence
// Add the Activity-Term to be scheduled for planned start-Time
sched.postDispatch (ActivationEvent{anchor, start}, detector.executionCtx, queue);
sched.postChain (ActivationEvent{anchor, start}, queue);
CHECK (detector.ensureNoInvocation("testJob"));
CHECK (not sched.holdsGroomingToken (myself));
CHECK (not queue.empty());
@ -612,7 +611,7 @@ namespace test {
CHECK (sched.holdsGroomingToken (myself)); // acquired the GroomingToken
CHECK (isSameObject(*act, anchor)); // "found" the rigged Activity as next piece of work
sched.postDispatch (act, detector.executionCtx, queue);
sched.postChain (act, queue); ////////////////////////TODO must dispatch here
CHECK (queue.empty());
CHECK (not sched.holdsGroomingToken (myself)); // the λ-work was invoked and dropped the GroomingToken

View file

@ -129,8 +129,7 @@ namespace test {
postNewTask (Scheduler& scheduler, Activity& chain, Time start)
{
ActivationEvent actEvent{chain, start, start + Time{50,0}}; // add dummy deadline +50ms
Scheduler::ExecutionCtx ctx{scheduler, actEvent};
scheduler.layer2_.postDispatch (actEvent, ctx, scheduler.layer1_);
scheduler.layer2_.postChain (actEvent, scheduler.layer1_);
}
@ -299,7 +298,7 @@ namespace test {
/** @test verify visible behaviour of the [work-pulling function](\ref Scheduler::getWork)
/** @test verify visible behaviour of the [work-pulling function](\ref Scheduler::doWork)
* - use a rigged Activity probe to capture the schedule time on invocation
* - additionally perform a timing measurement for invoking the work-function
* - invoking the Activity probe itself costs 50...150µs, Scheduler internals < 50µs
@ -308,8 +307,8 @@ namespace test {
* + an Activity already due will be dispatched immediately by post()
* + an Activity due at the point when invoking the work-function is dispatched
* + while queue is empty, the work-function returns immediately, indicating sleep
* + invoking the work-function when there is still some time span up to the next
* planned Activity will enter a targeted sleep, returning shortly after the
* + invoking the work-function, when there is still some time span up to the next
* planned Activity, will cause a targeted sleep, returning shortly after the
* next schedule. Entering then again will cause dispatch of that activity.
* + if the work-function dispatches an Activity while the next entry is planned
* for some time ahead, the work-function will likewise go into a targeted
@ -349,7 +348,7 @@ namespace test {
};
auto pullWork = [&] {
delay_us = lib::test::benchmarkTime([&]{ res = scheduler.getWork(); });
delay_us = lib::test::benchmarkTime([&]{ res = scheduler.doWork(); });
slip_us = _raw(detector.invokeTime(probe)) - _raw(start);
cout << "res:"<<res<<" delay="<<delay_us<<"µs slip="<<slip_us<<"µs"<<endl;
};
@ -528,7 +527,7 @@ namespace test {
detector.incrementSeq(); // mark this point in the detector-log...
// Explicitly invoke the work-Function (normally done by the workers)
CHECK (activity::PASS == scheduler.getWork());
CHECK (activity::PASS == scheduler.doWork());
CHECK (detector.verifySeqIncrement(1)
.beforeInvocation("testJob").arg("7.007", 1337));

File diff suppressed because it is too large Load diff