/* ProcDispatcher - Proc-Layer command dispatch and execution Copyright (C) Lumiera.org 2008, Hermann Vosseler This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * *****************************************************/ /** @file proc-dispatcher.cpp ** Implementation details of running commands and the builder. ** The ProcDispatcher is at the heart of the session subsystem and implements a ** (single) session thread to perform commands and trigger builder runs. New commands ** can be enqueued with a dedicated CommandQueue, while the details of operation control ** logic are encapsulated in a [logic component](\ref Looper). ** ** # Operational Semantics ** We need to distinguish between the ProcDispatcher itself, which is a static (singleton) service, ** and the »Session Subsystem« plus the _Session proper._ The subsystem has an application-global lifecycle, ** while the Session itself is a data structure and can be closed, opened or re-loaded. There is a singular ** transactional access point to the Session datastructure, which can be switched to new session contents. ** But external manipulation of the session contents is performed by commands, which are _dispatched_ -- ** to manage this process is the concern of the »Session Subsystem«. ** ** Closing a session blocks further command processing, while the lifecycle of the _Session Subsystem_ is ** actually linked to _running the \ref DispatcherLoop_ -- implementation logic defined in this translation ** unit here. This loop implementation is performed in a dedicated thread, _the Session Loop Thread._ And ** this also entails opening the public SessionCommandService interface. ** ** ## Loop operation control ** The loop starts with a blocking wait state, bound to the condition Looper::requireAction. Here, Looper ** is a helper to encapsulate the control logic, separated from the actual control flow. In the loop body, ** depending on the Looper's decision, either the next command is fetched from the CommandQueue and dispatched, ** or a builder run is triggered, rebuilding the »Low-Level-Model« to reflect the executed command's effects. ** After these working actions, a _"check point"_ is reached in Looper::markStateProcessed, which updates ** the logic and manages a _dirty state_ to control builder runs. After that, the looping control flow ** again enters the possibly blocking condition wait. ** - after a command has been dispatched, the builder is _dirty_ and needs to run ** - yet we continue to dispatch further commands, until the queue is emptied ** - and only after a further small latency wait, the builder run is triggered ** - but we _enforce a builder run_ after some extended timeout period, even ** when the command queue is not yet emptied ** - from the outside, it is possible to deactivate processing and place the ** loop into dormant state. This is used while closing or loading the Session ** - and of course we can request the Session Loop Thread to stop, for shutting ** down the »Session Subsystem« as a whole ** - in both cases the currently performed action (command or builder) is ** finished, without interrupt ** ** ## Locking ** The ProcDispatcher uses an "inner and outer capsule" design, and both layers are locked independently. ** On the outer layer, locking ensures sanity of the control data structures, while locking on the inner ** layer guards the communication with the Session Loop Thread, and coordinates sleep wait and notification. ** As usual with Lumiera's Thread wrapper, the management of the thread's lifecycle itself, hand-over of ** parameters, and starting / joining of the thread operation is protected by separate locking embedded ** into the thread and threadpool handling code. ** @note most of the time, the Session Loop Thread does not hold any lock, most notably while performing ** a command or running the builder. Likewise, evaluation of the control logic in the Looper helper ** is a private detail of the performing thread. The lock is acquired solely for checking or leaving ** the wait state and when fetching next command from queue. ** ** @todo as of 12/2016, implementation has been drafted and is very much WIP ** @todo //////////////////////////////////////////////////////TODO ensure really every state change triggers a wakeup!!!!!!! ** ** @see ProcDispatcher ** @see DispatcherLooper_test ** @see CommandQueue_test ** */ #include "lib/error.hpp" #include "include/logging.h" #include "proc/control/proc-dispatcher.hpp" #include "proc/control/command-dispatch.hpp" #include "proc/control/command-queue.hpp" #include "proc/control/looper.hpp" #include "proc/control/session-command-service.hpp" #include "proc/mobject/session.hpp" #include "backend/thread-wrapper.hpp" #include "lib/format-string.hpp" #include using backend::ThreadJoinable; using util::_Fmt; using lib::Sync; using lib::RecursiveLock_Waitable; using std::unique_ptr; namespace proc { namespace control { namespace error = lumiera::error; /** * PImpl within ProcDispatcher * to implement the _Session Loop Thread._ * During the lifetime of this object * - the SessionCommandService is offered to enqueue commands * - the Session Loop thread dispatches commands and triggers the Builder * @see DispatcherLooper_test */ class DispatcherLoop : ThreadJoinable , public CommandDispatch , public Sync { bool canDispatch_{false}; /** manage the primary public Session interface */ unique_ptr commandService_; CommandQueue queue_; Looper looper_; public: DispatcherLoop (Subsys::SigTerm notification) : ThreadJoinable("Lumiera Session" , bind (&DispatcherLoop::runSessionThread, this, notification)) , commandService_{new SessionCommandService(*this)} , queue_() , looper_([&]() -> bool { return not queue_.empty(); }) { Thread::sync(); // done with init; loop may run now.... INFO (session, "Proc-Dispatcher running..."); } ~DispatcherLoop() { try { Lock sync(this); commandService_.reset(); // redundant call, to ensure session interface is closed reliably this->join(); // block until the loop thread terminates and is reaped INFO (session, "Proc-Dispatcher stopped."); } ERROR_LOG_AND_IGNORE(session, "Stopping the Proc-Dispatcher"); } void activateCommandProecssing() { Lock sync(this); canDispatch_ = true; INFO (command, "Session command processing activated."); TODO ("implement command processing queue"); } void deactivateCommandProecssing() { Lock sync(this); canDispatch_ = false; INFO (command, "Session command interface closed."); TODO ("implement command processing queue"); } /* === CommandDispatch interface === */ //////////////////////////////////////////TODO notify!!!! on!! every!! state!! changing!! operation!! void clear() override { Lock sync(this); UNIMPLEMENTED ("clear the queue"); //////////////////////////////////////////TODO notify!!!! } void enqueue (Command cmd) override { if (not cmd.canExec()) throw error::Logic(_Fmt("Reject '%s'. Not suitably prepared for invocation: %s") % cmd.getID() % cmd , LUMIERA_ERROR_UNBOUND_ARGUMENTS); UNIMPLEMENTED ("enqueue command"); //////////////////////////////////////////TODO notify!!!! } size_t size() const { TODO ("implement command processing queue"); return 0; } void requestStop() noexcept { Lock sync(this); commandService_.reset(); // closes Session interface looper_.triggerShutdown(); UNIMPLEMENTED("*must* notify loop thread"); /////////////////TODO really? YES!!! //////////////////////////////////////////TODO notify!!!! } void awaitStateProcessed() { Lock blockWaiting(this, &DispatcherLoop::stateIsSynched); //////////////////////////////////////////TODO find out who will notify us!!!! } private: /** * any operation running in the Session thread * is started from here. When this loop terminates, * the "Session subsystem" shuts down. */ void runSessionThread (Subsys::SigTerm sigTerm) { string errorMsg; syncPoint(); try { while (looper_.shallLoop()) { awaitAction(); if (looper_.isDying()) break; if (looper_.runBuild()) startBuilder(); else if (looper_.isWorking()) processCommands(); updateState(); } } catch (lumiera::Error& problem) { errorMsg = problem.what(); lumiera_error(); // clear error flag } catch (...) { errorMsg = string{lumiera_error()}; } // now leave the Session thread... // send notification of subsystem shutdown sigTerm (&errorMsg); } void awaitAction() ///< at begin of loop body... { Lock(this).wait(looper_, &Looper::requireAction, looper_.getTimeout()); } void updateState() ///< at end of loop body... { looper_.markStateProcessed(); if (looper_.isDisabled()) Lock(this).notifyAll(); } bool stateIsSynched() { if (looper_.hasPendingChanges() and calledFromWithinSessionThread()) throw error::Fatal("Possible Deadlock. " "Attempt to synchronise to a command processing check point " "from within the (single) session thread." , error::LUMIERA_ERROR_LIFECYCLE); return not looper_.hasPendingChanges(); } void processCommands() { UNIMPLEMENTED ("pull commands from the queue and dispatch them"); } void startBuilder() { UNIMPLEMENTED ("start the Proc-Builder to recalculate render nodes network"); } bool calledFromWithinSessionThread() { UNIMPLEMENTED ("how to find out when the session thread attempts to catch its own tail...???"); ////////////////////////////////////////////////////////////////TODO any idea how to achieve that? The lock does not help us, since it is recursive and //////////////////////////////////////////////////////////////// ... since command/builder execution itself is not performed in a locked section. //////////////////////////////////////////////////////////////// ... Possibly we'll just have to plant a ThreadLocal to mark this dangerous situation. ///////////////////////////////////////////////////////////////////////////////TICKET #1054 : can be done by relying on some internals of our thread handling framework } }; /** storage for Singleton access */ lib::Depend ProcDispatcher::instance; /** */ bool ProcDispatcher::start (Subsys::SigTerm termNotification) { Lock sync(this); if (runningLoop_) return false; runningLoop_.reset ( new DispatcherLoop ( [=] (string* problemMessage) { runningLoop_.reset(); termNotification(problemMessage); })); if (active_) runningLoop_->activateCommandProecssing(); return true; } /** */ bool ProcDispatcher::isRunning() { Lock sync(this); return bool(runningLoop_); } /** signal to the loop thread that it needs to terminate. * @warning dangerous operation; must not block nor throw * * @todo need to re-check the logic, once the loop is fully implemented; ensure there is nothing on this call path that can block or throw!!! */ void ProcDispatcher::requestStop() noexcept { Lock sync(this); if (runningLoop_) runningLoop_->requestStop(); } /** activate processing of enqueued session commands. * @remarks command processing serves as public external interface * to the session. This call is used by the session lifecycle (SessManagerImpl) * when the session is brought up; any other invocation runs danger to mess up * the session lifecycle state and process commands on a deconfigured session. * In case the dispatcher loop is not actually running, the activation state * is stored and applied accordingly later, when the loop is fired up. */ void ProcDispatcher::activate() { Lock sync(this); active_ = true; if (runningLoop_) runningLoop_->activateCommandProecssing(); } void ProcDispatcher::deactivate() { Lock sync(this); active_ = false; if (runningLoop_) runningLoop_->deactivateCommandProecssing(); } /** block until the dispatcher has actually reached disabled state. * @warning beware of invoking this function from within the session thread, * since the waiting relies on the very lock also used to coordinate * command processing and builder runs within that thread. * @throw error::Fatal when a deadlock due to such a recursive call can be detected */ void ProcDispatcher::awaitDeactivation() { Lock sync(this); if (runningLoop_) runningLoop_->awaitStateProcessed(); } void ProcDispatcher::clear() { Lock sync(this); if (not empty()) { WARN (command, "DISCARDING pending Session commands."); REQUIRE (runningLoop_); runningLoop_->clear(); } } bool ProcDispatcher::empty() const { Lock sync(this); return not runningLoop_ or 0 == runningLoop_->size(); } }} // namespace proc::control