LUMIERA.clone/src/proc/control/proc-dispatcher.cpp

450 lines
16 KiB
C++
Raw Normal View History

2009-06-08 04:46:07 +02:00
/*
ProcDispatcher - Proc-Layer command dispatch and execution
2010-12-17 23:28:49 +01:00
2009-06-08 04:46:07 +02:00
Copyright (C) Lumiera.org
2008, Hermann Vosseler <Ichthyostega@web.de>
2010-12-17 23:28:49 +01:00
2009-06-08 04:46:07 +02:00
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
2010-12-17 23:28:49 +01:00
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
2009-06-08 04:46:07 +02:00
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
2010-12-17 23:28:49 +01:00
2009-06-08 04:46:07 +02:00
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
2010-12-17 23:28:49 +01:00
2009-06-08 04:46:07 +02:00
* *****************************************************/
/** @file proc-dispatcher.cpp
** Implementation details of running commands and the builder.
2016-12-22 19:35:42 +01:00
** The ProcDispatcher is at the heart of the session subsystem and implements a
** (single) session thread to perform commands and trigger builder runs. New commands
** can be enqueued with a dedicated CommandQueue, while the details of operation control
** logic are encapsulated in a [logic component](\ref Looper).
**
** # Operational Semantics
** We need to distinguish between the ProcDispatcher itself, which is a static (singleton) service,
** and the »Session Subsystem« plus the _Session proper._ The subsystem has an application-global lifecycle,
** while the Session itself is a data structure and can be closed, opened or re-loaded. There is a singular
** transactional access point to the Session datastructure, which can be switched to new session contents.
** But external manipulation of the session contents is performed by commands, which are _dispatched_ --
** to manage this process is the concern of the »Session Subsystem«.
**
** Closing a session blocks further command processing, while the lifecycle of the _Session Subsystem_ is
** actually linked to _running the \ref DispatcherLoop_ -- implementation logic defined in this translation
** unit here. This loop implementation is performed in a dedicated thread, _the Session Loop Thread._ And
** this also entails opening the public SessionCommandService interface.
**
** ## Loop operation control
** The loop starts with a blocking wait state, bound to the condition Looper::requireAction. Here, Looper
** is a helper to encapsulate the control logic, separated from the actual control flow. In the loop body,
** depending on the Looper's decision, either the next command is fetched from the CommandQueue and dispatched,
** or a builder run is triggered, rebuilding the »Low-Level-Model« to reflect the executed command's effects.
** After these working actions, a _"check point"_ is reached in Looper::markStateProcessed, which updates
** the logic and manages a _dirty state_ to control builder runs. After that, the looping control flow
** again enters the possibly blocking condition wait.
** - after a command has been dispatched, the builder is _dirty_ and needs to run
** - yet we continue to dispatch further commands, until the queue is emptied
** - and only after a further small latency wait, the builder run is triggered
** - but we _enforce a builder run_ after some extended timeout period, even
** when the command queue is not yet emptied
** - from the outside, it is possible to deactivate processing and place the
** loop into dormant state. This is used while closing or loading the Session
** - and of course we can request the Session Loop Thread to stop, for shutting
** down the »Session Subsystem« as a whole
** - in both cases the currently performed action (command or builder) is
** finished, without interrupt
2016-12-22 19:35:42 +01:00
**
** ## Locking
** The ProcDispatcher uses an "inner and outer capsule" design, and both layers are locked independently.
** On the outer layer, locking ensures sanity of the control data structures, while locking on the inner
** layer guards the communication with the Session Loop Thread, and coordinates sleep wait and notification.
** As usual with Lumiera's Thread wrapper, the management of the thread's lifecycle itself, hand-over of
** parameters, and starting / joining of the thread operation is protected by separate locking embedded
** into the thread and threadpool handling code.
** @note most of the time, the Session Loop Thread does not hold any lock, most notably while performing
** a command or running the builder. Likewise, evaluation of the control logic in the Looper helper
** is a private detail of the performing thread. The lock is acquired solely for checking or leaving
** the wait state and when fetching next command from queue.
**
** @todo as of 12/2016, implementation has been drafted and is very much WIP
** @todo //////////////////////////////////////////////////////TODO ensure really every state change triggers a wakeup!!!!!!!
**
** @see ProcDispatcher
** @see DispatcherLooper_test
** @see CommandQueue_test
**
*/
2009-06-08 04:46:07 +02:00
#include "lib/error.hpp"
#include "include/logging.h"
2009-06-08 04:46:07 +02:00
#include "proc/control/proc-dispatcher.hpp"
#include "proc/control/command-dispatch.hpp"
#include "proc/control/command-queue.hpp"
#include "proc/control/looper.hpp"
#include "proc/control/session-command-service.hpp"
#include "proc/mobject/session.hpp"
#include "backend/thread-wrapper.hpp"
2009-06-08 04:46:07 +02:00
#include <memory>
using backend::ThreadJoinable;
using lib::Sync;
using lib::RecursiveLock_Waitable;
using std::unique_ptr;
2009-06-08 04:46:07 +02:00
namespace proc {
2009-06-08 04:46:07 +02:00
namespace control {
namespace error = lumiera::error;
/**
* PImpl within ProcDispatcher
* to implement the _Session Loop Thread._
* During the lifetime of this object
* - the SessionCommandService is offered to enqueue commands
* - the Session Loop thread dispatches commands and triggers the Builder
* @see DispatcherLooper_test
*/
class DispatcherLoop
: ThreadJoinable
, public CommandDispatch
, public Sync<RecursiveLock_Waitable>
{
bool canDispatch_{false};
2016-12-22 19:35:42 +01:00
/** manage the primary public Session interface */
unique_ptr<SessionCommandService> commandService_;
CommandQueue queue_;
Looper looper_;
public:
/** start the session loop thread
* @param notification callback to invoke on thread termination
* @remark _in theory_ this ctor could block, since it waits for the thread
* actually to get operational and it waits for the SessionCommand interface
* to be opened. The latter _should not_ run into any obstacles, because
* in case it does, the main application thread is deadlocked on startup.
* Such might happen indirectly, when something depends on "the Session"
*/
DispatcherLoop (Subsys::SigTerm notification)
: ThreadJoinable("Lumiera Session"
, bind (&DispatcherLoop::runSessionThread, this, notification))
2016-12-22 19:35:42 +01:00
, commandService_{new SessionCommandService(*this)}
, queue_()
, looper_([&]() -> bool
{
return not queue_.empty();
})
{
Thread::sync(); // done with init; loop may run now....
INFO (session, "Proc-Dispatcher running...");
}
~DispatcherLoop()
{
try {
Lock sync(this);
2016-12-22 19:35:42 +01:00
commandService_.reset(); // redundant call, to ensure session interface is closed reliably
this->join(); // block until the loop thread terminates and is reaped
INFO (session, "Proc-Dispatcher stopped.");
}
ERROR_LOG_AND_IGNORE(session, "Stopping the Proc-Dispatcher");
}
void
activateCommandProecssing()
{
Lock sync(this);
canDispatch_ = true;
INFO (command, "Session command processing activated.");
TODO ("implement command processing queue");
}
void
deactivateCommandProecssing()
{
Lock sync(this);
canDispatch_ = false;
INFO (command, "Session command interface closed.");
TODO ("implement command processing queue");
}
/* === CommandDispatch interface === */
//////////////////////////////////////////TODO notify!!!! on!! every!! state!! changing!! operation!!
void
clear() override
{
Lock sync(this);
queue_.clear();
//////////////////////////////////////////TODO notify!!!!
}
void
enqueue (Command cmd) override
{
Lock sync(this);
queue_.feed (cmd);
//////////////////////////////////////////TODO notify!!!!
}
size_t
size() const
{
Lock sync(this);
return queue_.size();
}
void
requestStop() noexcept
{
Lock sync(this);
commandService_.reset(); // closes Session interface
looper_.triggerShutdown();
UNIMPLEMENTED("*must* notify loop thread"); /////////////////TODO really? YES!!!
//////////////////////////////////////////TODO notify!!!!
}
void
awaitStateProcessed()
{
Lock blockWaiting(this, &DispatcherLoop::stateIsSynched);
//////////////////////////////////////////TODO eternal sleep.... find out who will wake us!!!!
}
private:
/**
* any operation running in the Session thread
* is started from here. When this loop terminates,
* the "Session subsystem" shuts down.
*/
void
runSessionThread (Subsys::SigTerm sigTerm)
{
string errorMsg;
syncPoint();
try
{
while (looper_.shallLoop())
{
awaitAction();
if (looper_.isDying())
break;
if (looper_.runBuild())
startBuilder();
else
if (looper_.isWorking())
processCommands();
updateState();
}
}
catch (lumiera::Error& problem)
{
errorMsg = problem.what();
lumiera_error(); // clear error flag
}
catch (...)
{
errorMsg = string{lumiera_error()};
}
// now leave the Session thread...
// send notification of subsystem shutdown
sigTerm (&errorMsg);
}
void
awaitAction() ///< at begin of loop body...
{
Lock(this).wait(looper_, &Looper::requireAction,
looper_.getTimeout());
}
void
updateState() ///< at end of loop body...
{
looper_.markStateProcessed();
if (looper_.isDisabled())
Lock(this).notifyAll();
}
bool
stateIsSynched()
{
if (looper_.hasPendingChanges() and calledFromWithinSessionThread())
throw error::Fatal("Possible Deadlock. "
"Attempt to synchronise to a command processing check point "
"from within the (single) session thread."
, error::LUMIERA_ERROR_LIFECYCLE);
return not looper_.hasPendingChanges();
}
void
processCommands()
{
UNIMPLEMENTED ("pull commands from the queue and dispatch them");
}
void
startBuilder()
{
UNIMPLEMENTED ("start the Proc-Builder to recalculate render nodes network");
}
bool
calledFromWithinSessionThread()
{
UNIMPLEMENTED ("how to find out when the session thread attempts to catch its own tail...???");
////////////////////////////////////////////////////////////////TODO any idea how to achieve that? The lock does not help us, since it is recursive and
//////////////////////////////////////////////////////////////// ... since command/builder execution itself is not performed in a locked section.
//////////////////////////////////////////////////////////////// ... Possibly we'll just have to plant a ThreadLocal to mark this dangerous situation.
///////////////////////////////////////////////////////////////////////////////TICKET #1054 : can be done by relying on some internals of our thread handling framework
}
};
/** storage for Singleton access */
lib::Depend<ProcDispatcher> ProcDispatcher::instance;
/** starting the ProcDispatcher means to start the session subsystem.
* @return `false` when _starting_ failed since it is already running...
* @remark this function implements the start operation for the »session subsystem«.
* More specifically, this operation starts a new thread to perform the
* _session loop,_ which means to perform commands and trigger the builder.
* It might block temporarily for synchronisation with this new thread and
* while opening the SessionCommand facade.
*/
bool
ProcDispatcher::start (Subsys::SigTerm termNotification)
{
Lock sync(this);
if (runningLoop_) return false;
runningLoop_.reset (
new DispatcherLoop (
[=] (string* problemMessage)
{
runningLoop_.reset(); //////////////////////TODO: (1) Race in ProcDispatcher (2) Deadlock because the Thread attempts to reap itself
termNotification(problemMessage);
}));
if (active_)
runningLoop_->activateCommandProecssing();
return true;
}
/** whether the »session subsystem« is operational.
* @return `true` if the session loop thread has been fully
* started and is not (yet) completely terminated.
*/
bool
ProcDispatcher::isRunning()
{
Lock sync(this);
return bool(runningLoop_);
}
/** signal to the loop thread that it needs to terminate.
* @warning dangerous operation; must not block nor throw
*
* @todo need to re-check the logic, once the loop is fully implemented; ensure there is nothing on this call path that can block or throw!!!
*/
void
ProcDispatcher::requestStop() noexcept
{
Lock sync(this);
if (runningLoop_)
runningLoop_->requestStop();
}
/** activate processing of enqueued session commands.
* @remarks command processing serves as public external interface
* to the session. This call is used by the session lifecycle (SessManagerImpl)
* when the session is brought up; any other invocation runs danger to mess up
* the session lifecycle state and process commands on a deconfigured session.
* In case the dispatcher loop is not actually running, the activation state
* is stored and applied accordingly later, when the loop is fired up.
*/
void
ProcDispatcher::activate()
{
Lock sync(this);
active_ = true;
if (runningLoop_)
runningLoop_->activateCommandProecssing();
}
void
ProcDispatcher::deactivate()
{
Lock sync(this);
active_ = false;
if (runningLoop_)
runningLoop_->deactivateCommandProecssing();
}
/** block until the dispatcher has actually reached disabled state.
* @warning beware of invoking this function from within the session thread,
* since the waiting relies on the very lock also used to coordinate
* command processing and builder runs within that thread.
* @throw error::Fatal when a deadlock due to such a recursive call can be detected
*/
void
ProcDispatcher::awaitDeactivation()
{
Lock sync(this);
if (runningLoop_)
runningLoop_->awaitStateProcessed();
}
void
ProcDispatcher::clear()
{
Lock sync(this);
if (not empty())
{
WARN (command, "DISCARDING pending Session commands.");
REQUIRE (runningLoop_);
runningLoop_->clear();
}
}
bool
ProcDispatcher::empty() const
{
Lock sync(this);
return not runningLoop_
or 0 == runningLoop_->size();
}
2009-06-08 04:46:07 +02:00
}} // namespace proc::control