From 481e35a59701e9e3e9df5d020bb21509c982a1da Mon Sep 17 00:00:00 2001 From: Ichthyostega Date: Tue, 5 Dec 2023 23:53:42 +0100 Subject: [PATCH] Chain-Load: implement translation into Scheduler invocations ... so this (finally) is the missing cornerstone ... traverse the calculation graph and generate render jobs ... provide a chunk-wise pre-planning of the next batch ... use a future to block the (test) thread until completed --- src/lib/uninitialised-storage.hpp | 109 ++++++++++++++ src/vault/gear/scheduler.hpp | 14 ++ tests/vault/gear/test-chain-load.hpp | 212 ++++++++++++++++++++++++++- wiki/thinkPad.ichthyo.mm | 123 +++++++++++++++- 4 files changed, 451 insertions(+), 7 deletions(-) diff --git a/src/lib/uninitialised-storage.hpp b/src/lib/uninitialised-storage.hpp index c4a3fbb51..3e6970d2e 100644 --- a/src/lib/uninitialised-storage.hpp +++ b/src/lib/uninitialised-storage.hpp @@ -126,5 +126,114 @@ namespace lib { }; + + + /** + * Managed uninitialised Heap-allocated storage with array like access. + * @tparam T the nominal type assumed to sit in each »slot« + */ + template + class UninitialisedDynBlock + { + using _Arr = T[]; + + T* buff_{nullptr}; + size_t size_{0}; + + public: + T* + allocate(size_t cnt) + { + if (buff_) discard(); + size_ = cnt; + buff_ = cnt? std::aligned_alloc (std::alignment_of(), cnt * sizeof(T)) + : nullptr; + return buff_; + } + + void + discard() + { + std::free (buff_); + buff_ = nullptr; + size_ = 0; + } + + + UninitialisedDynBlock() =default; + ~UninitialisedDynBlock() + { + if (buff_) + discard(); + } + explicit + UninitialisedDynBlock (size_t cnt) + { + if (cnt) + allocate(cnt); + } + + UninitialisedDynBlock (UninitialisedDynBlock && rr) + { + if (this != &rr) + swap (*this, rr); + } + + UninitialisedDynBlock (UninitialisedDynBlock const&) =delete; + UninitialisedDynBlock& operator= (UninitialisedDynBlock &&) =delete; + UninitialisedDynBlock& operator= (UninitialisedDynBlock const&) =delete; + + friend void + swap (UninitialisedDynBlock& u1, UninitialisedDynBlock& u2) + { + std::swap (u1.size_, u2.size_); + std::swap (u1.buff_, u2.buff_); + } + + explicit + operator bool() const + { + return bool(buff_); + } + + size_t + size() const + { + return size_; + } + + + _Arr& + array() + { + return * std::launder (reinterpret_cast<_Arr* > (buff_)); + } + + _Arr const& + array() const + { + return * std::launder (reinterpret_cast<_Arr const*> (buff_)); + } + + + T & operator[] (size_t idx) { return array()[idx]; } + T const& operator[] (size_t idx) const { return array()[idx]; } + + + template + T& + createAt (size_t idx, Args&& ...args) + { + return *new(&operator[](idx)) T{std::forward(args)...}; + } + + void + destroyAt (size_t idx) + { + operator[](idx).~T(); + } + }; + + } // namespace lib #endif /*LIB_UNINITIALISED_STORAGE_H*/ diff --git a/src/vault/gear/scheduler.hpp b/src/vault/gear/scheduler.hpp index a0c03a343..da61a4b2b 100644 --- a/src/vault/gear/scheduler.hpp +++ b/src/vault/gear/scheduler.hpp @@ -190,6 +190,20 @@ namespace gear { /** build Activity chain and hand-over to the Scheduler. */ ScheduleSpec post(); + + ScheduleSpec + linkToSuccessor (ScheduleSpec& succSpec) + { + term_->appendNotificationTo (*succSpec.term_); + return move(*this); + } + + ScheduleSpec + linkToPredecessor (ScheduleSpec& predSpec) + { + predSpec.term_->appendNotificationTo (*term_); + return move(*this); + } }; diff --git a/tests/vault/gear/test-chain-load.hpp b/tests/vault/gear/test-chain-load.hpp index ed4ac2e67..1d7051b80 100644 --- a/tests/vault/gear/test-chain-load.hpp +++ b/tests/vault/gear/test-chain-load.hpp @@ -85,8 +85,9 @@ //#include "lib/hash-value.h" #include "vault/gear/job.h" -//#include "vault/gear/activity.hpp" +#include "vault/gear/scheduler.hpp" //#include "vault/gear/nop-job-functor.hpp" +#include "lib/uninitialised-storage.hpp" #include "lib/time/timevalue.hpp" //#include "lib/meta/variadic-helper.hpp" //#include "lib/meta/function.hpp" @@ -102,10 +103,10 @@ #include #include #include -#include -#include +#include #include #include +#include #include #include @@ -119,6 +120,7 @@ namespace test { using lib::time::Time; using lib::time::TimeValue; using lib::time::FrameRate; + using lib::time::Duration; // using lib::time::FSecs; // using lib::time::Offset; // using lib::meta::RebindVariadic; @@ -138,12 +140,18 @@ namespace test { using std::swap; using std::move; // using boost::hash_combine; + using std::chrono_literals::operator ""s; + namespace err = lumiera::error; namespace dot = lib::dot_gen; namespace { // Default definitions for topology generation const size_t DEFAULT_FAN = 16; const size_t DEFAULT_SIZ = 256; + + const auto SAFETY_TIMEOUT = 5s; + const auto STANDARD_DEADLINE = 10ms; + const size_t DEFAULT_CHUNKSIZE = 64; } struct Statistic; @@ -606,7 +614,10 @@ namespace test { Statistic computeGraphStatistics(); TestChainLoad&& printTopologyStatistics(); - private: + class ScheduleCtx; + friend class ScheduleCtx; // accesses raw storage array + + ScheduleCtx setupSchedule (Scheduler& scheduler); }; @@ -1186,5 +1197,198 @@ namespace test { + + /** + * Setup and wiring for a test run to schedule a computation structure + * as defined by this TestChainLoad instance. This context is linked to + * a concrete TestChainLoad and Scheduler instance and holds a memory block + * with actual schedules, which are dispatched in batches into the Scheduler. + * It is *crucial* to keep this object *alive during the complete test run*, + * which is achieved by a blocking wait on the callback triggered after + * dispatching the last batch of calculation jobs. This process itself + * is meant for test usage and not thread-safe (while obviously the + * actual scheduling and processing happens in the worker threads). + * Yet the instance can be re-used to dispatch further test runs. + */ + template + class TestChainLoad::ScheduleCtx + : util::MoveOnly + { + TestChainLoad& chainLoad_; + Scheduler& scheduler_; + + lib::UninitialisedDynBlock schedule_; + + FrameRate levelSpeed_{1, Duration{_uTicks(1ms)}}; + uint loadFactor_{2}; + size_t chunkSize_{DEFAULT_CHUNKSIZE}; + TimeVar startTime_{Time::ANYTIME}; + Duration preRoll_{_uTicks(200us)}; + microseconds deadline_{STANDARD_DEADLINE}; + ManifestationID manID_{}; + + std::promise signalDone_{}; + + RandomChainCalcFunctor calcFunctor_; + RandomChainPlanFunctor planFunctor_; + + + /* ==== Callbacks from job planning ==== */ + + /** Callback: place a single job into the scheduler */ + void + disposeStep (size_t idx, size_t level) + { + schedule_[idx] = scheduler_.defineSchedule(calcJob (idx,level)) + .manifestation(manID_) + .startTime (calcStartTime(level)) + .lifeWindow (deadline_) + .post(); + } + + /** Callback: define a dependency between scheduled jobs */ + void + setDependency (Node* pred, Node* succ) + { + size_t predIdx = chainLoad_.nodeID (pred); + size_t succIdx = chainLoad_.nodeID (succ); + schedule_[predIdx].linkToSuccessor (schedule_[succIdx]); + } + + /** continue planning: schedule follow-up planning job */ + void + continuation (size_t levelDone, bool work_left) + { + if (work_left) + { + size_t nextChunkLevel = calcNextLevel (levelDone); + scheduler_.continueMetaJob (calcPlanScheduleTime (nextChunkLevel) + ,planningJob (nextChunkLevel) + ,manID_); + } + else + signalDone_.set_value(); + } + + std::future + performRun() + { + size_t numNodes = chainLoad_.size(); + schedule_.allocate (numNodes); + startTime_ = anchorStartTime(); + scheduler_.seedCalcStream (planningJob(0) + ,manID_ + ,calcLoadHint()); + return attachNewCompletionSignal(); + } + + public: + ScheduleCtx (TestChainLoad& mother, Scheduler& scheduler) + : chainLoad_{mother} + , scheduler_{scheduler} + , calcFunctor_{chainLoad_.nodes_[0]} + , planFunctor_{chainLoad_.nodes_[0], chainLoad_.numNodes_ + ,[this](size_t i, size_t l){ disposeStep(i,l); } + ,[this](auto* p, auto* s) { setDependency(p,s);} + ,[this](size_t l, bool w) { continuation(l,w); } + } + { } + + ScheduleCtx + launch_and_wait() + { + awaitBlocking( + performRun()); + return move(*this); + } + + + private: + /** push away any existing wait state and attach new clean state */ + std::future + attachNewCompletionSignal() + { + signalDone_.set_exception (std::make_exception_ptr(std::future_error (std::future_errc::broken_promise))); + std::promise notYetTriggered; + signalDone_.swap (notYetTriggered); + return signalDone_.get_future(); + } + + void + awaitBlocking(std::future signal) + { + if (std::future_status::timeout == signal.wait_for (SAFETY_TIMEOUT)) + throw err::Fatal("Timeout on Scheduler test exceeded."); + } + + Job + calcJob (size_t idx, size_t level) + { + return Job{calcFunctor_ + ,calcFunctor_.encodeNodeID(idx) + ,calcFunctor_.encodeLevel(level) + }; + } + + Job + planningJob (size_t level) + { + return Job{planFunctor_ + ,InvocationInstanceID() + ,planFunctor_.encodeLevel(level) + }; + } + + Time + anchorStartTime() + { + return RealClock::now() + preRoll_; + } + + FrameRate + calcLoadHint() + { + return FrameRate{levelSpeed_ * loadFactor_}; + } + + size_t + calcNextLevel (size_t levelDone) + { + return levelDone + chunkSize_; + } + + Time + calcStartTime(size_t level) + { + return startTime_ + level / levelSpeed_; + } + + Time + calcPlanScheduleTime (size_t nextChunkLevel) + {/* must be at least 1 level ahead, + because dependencies are defined backwards; + the chain-load graph only defines dependencies over one level + thus the first level in the next chunk must still be able to attach + dependencies to the last row of the preceding chunk, implying that + those still need to be ahead of schedule. + */ + nextChunkLevel = nextChunkLevel>2? nextChunkLevel-2 : 0; + return calcStartTime(nextChunkLevel) - preRoll_; + } + }; + + + /** + * establish and configure the context used for scheduling computations. + */ + template + typename TestChainLoad::ScheduleCtx + TestChainLoad::setupSchedule(Scheduler& scheduler) + { + return ScheduleCtx{*this, scheduler}; + } + + + }}} // namespace vault::gear::test #endif /*VAULT_GEAR_TEST_TEST_CHAIN_LOAD_H*/ diff --git a/wiki/thinkPad.ichthyo.mm b/wiki/thinkPad.ichthyo.mm index 9a23f9eba..b42eb6929 100644 --- a/wiki/thinkPad.ichthyo.mm +++ b/wiki/thinkPad.ichthyo.mm @@ -100597,13 +100597,17 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + + + + + @@ -100644,8 +100648,40 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + + + + + + + + +

+ Gut daß wir darüber geredet haben. +

+

+ Da offensichtlich die Länge des Test-Graphen keine Eigenschaft  der Teststruktur ist, landen wir bei einer rein dynamischen Allokation. Trotzdem war es eine gute Idee, das mit der uninitialised storage  endlich mal zu codifizierren... +

+ + +
+ +
+
+ + + + + + + + + + + + + @@ -100974,8 +101010,89 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +