Library/Application: switch Steam-Dispatcher to new thread-framework

TODO: SessionCommandFunction_test deadlocks!!
This commit is contained in:
Fischlurch 2023-09-30 03:12:55 +02:00
parent d79e33f797
commit 48d6f0fae3
4 changed files with 202 additions and 46 deletions

View file

@ -61,6 +61,7 @@
#include <vector>
#include <string>
#include <atomic>
namespace util {
@ -84,6 +85,8 @@ namespace lib {
* Rather, they are tied to a specific type context, e.g. a class
* implementing a custom allocator. These typed contexts are
* considered to be orthogonal and independent of each other.
* @remark 2023: allocation of global ID counters are protected by
* double-checked locking with mutex, which is deemed adequate.
*/
template<class CX>
class TypedContext
@ -202,14 +205,13 @@ namespace lib {
const size_t id_;
/** member counter shared per template instance */
static size_t memberCounter;
static std::atomic_size_t memberCounter;
/** threadsafe allocation of member ID */
static size_t
allocateNextMember()
{
ClassLock<FamilyMember> synchronised;
return memberCounter++;
return memberCounter.fetch_add(+1, std::memory_order_relaxed);
}
public:
@ -242,7 +244,7 @@ namespace lib {
/** allocate storage for the counter per type family */
template<typename TY>
size_t FamilyMember<TY>::memberCounter{0};
std::atomic_size_t FamilyMember<TY>::memberCounter{0};

View file

@ -88,15 +88,17 @@
#include "steam/control/looper.hpp"
#include "steam/control/session-command-service.hpp"
#include "steam/mobject/session.hpp"
#include "vault/thread-wrapper.hpp"
#include "lib/depend-inject.hpp"
#include "lib/sync-barrier.hpp"
#include "lib/thread.hpp"
#include "lib/util.hpp" ///////////////TODO for test command invocation
#include <memory>
using lib::Sync;
using lib::RecursiveLock_Waitable;
using vault::Thread;
using lib::SyncBarrier;
using lib::Thread;
using std::unique_ptr;
namespace steam {
@ -113,8 +115,7 @@ namespace control {
* @see DispatcherLooper_test
*/
class DispatcherLoop
: Thread
, public CommandDispatch
: public CommandDispatch
, public Sync<RecursiveLock_Waitable>
{
using ServiceHandle = lib::DependInject<SessionCommandService>::ServiceInstance<>;
@ -122,9 +123,10 @@ namespace control {
/** manage the primary public Session interface */
ServiceHandle commandService_;
SyncBarrier init_;
CommandQueue queue_;
Looper looper_;
Thread thread_;
public:
/** start the session loop thread
@ -136,15 +138,17 @@ namespace control {
* Such might happen indirectly, when something depends on "the Session"
*/
DispatcherLoop (Subsys::SigTerm notification)
: Thread{"Lumiera Session", bind (&DispatcherLoop::runSessionThread, this, notification)}
, commandService_{ServiceHandle::NOT_YET_STARTED}
: commandService_{ServiceHandle::NOT_YET_STARTED}
, queue_{}
, looper_([&]() -> bool
{
return not queue_.empty();
})
, thread_{"Lumiera Session"
,&DispatcherLoop::runSessionThread
, this, notification}
{
Thread::sync(); // done with setup; loop may run now....
init_.sync(); // done with setup; loop may run now....
INFO (session, "Steam-Dispatcher running...");
{
Lock(this); // open public session interface:
@ -155,7 +159,7 @@ namespace control {
~DispatcherLoop()
{
try {
commandService_.shutdown(); // redundant call, to ensure session interface is closed reliably
commandService_.shutdown(); // redundant call, to ensure session interface is closed reliably
INFO (session, "Steam-Dispatcher stopped.");
}
ERROR_LOG_AND_IGNORE(session, "Stopping the Steam-Dispatcher");
@ -231,7 +235,7 @@ namespace control {
runSessionThread (Subsys::SigTerm notifyEnd)
{
string errorMsg;
syncPoint();
init_.sync();
try
{
while (looper_.shallLoop())
@ -279,7 +283,7 @@ namespace control {
bool
isStateSynched()
{
if (this->invokedWithinThread())
if (thread_.invokedWithinThread())
throw error::Fatal("Possible Deadlock. "
"Attempt to synchronise to a command processing check point "
"from within the (single) session thread."

View file

@ -82,15 +82,19 @@ extern "C" {
#include "steam/control/steam-dispatcher.hpp"
#include "steam/control/command-def.hpp"
#include "include/session-command-facade.h"
#include "vault/thread-wrapper.hpp"
#include "lib/typed-counter.hpp"
#include "lib/format-string.hpp"
#include "lib/sync-barrier.hpp"
#include "lib/thread.hpp"
#include "lib/symbol.hpp"
#include "lib/util.hpp"
#include "lib/test/diagnostic-output.hpp"////////////////////TODO
#include <boost/lexical_cast.hpp>
#include <vector>
#include <chrono>
#include <string>
#include <vector>
#include <deque>
namespace steam {
@ -99,8 +103,11 @@ namespace test {
using boost::lexical_cast;
using lib::test::randTime;
using std::this_thread::sleep_for;
using std::chrono::microseconds;
using namespace std::chrono_literals;
using steam::control::SessionCommand;
using lib::test::randTime;
using lib::diff::GenNode;
using lib::diff::Rec;
using lib::time::Time;
@ -109,11 +116,13 @@ namespace test {
using lib::time::Offset;
using lib::time::FSecs;
using lib::FamilyMember;
using lib::SyncBarrier;
using lib::Symbol;
using util::_Fmt;
using util::isnil;
using std::string;
using std::vector;
using std::deque;
using std::rand;
@ -163,7 +172,7 @@ namespace test {
}//(End) test fixture
#define __DELAY__ usleep(20000);
#define __DELAY__ sleep_for (20ms);
@ -215,8 +224,8 @@ namespace test {
lumiera::throwOnError();
startDispatcher();
perform_simpleInvocation();
perform_messageInvocation();
// perform_simpleInvocation();
// perform_messageInvocation();
perform_massivelyParallel(args_for_stresstest);
stopDispatcher();
@ -323,11 +332,14 @@ namespace test {
// we'll run several instances of the following thread....
class InvocationProducer
: vault::ThreadJoinable
: util::NonCopyable
{
SyncBarrier& barrier_;
FamilyMember<InvocationProducer> id_;
vector<string> cmdIDs_;
lib::ThreadJoinable<void> thread_;
Symbol
cmdID(uint j)
{
@ -337,15 +349,14 @@ namespace test {
public:
InvocationProducer()
: ThreadJoinable("test command producer", [&](){ fabricateCommands(); })
{
this->sync();
}
InvocationProducer (SyncBarrier& trigger)
: barrier_{trigger}
, thread_{"command producer", [&]{ fabricateCommands(); }}
{ }
~InvocationProducer()
{
this->join(); // .maybeThrow(); /////////////////////////////////////////OOO should detect exceptions in thread explicitly
thread_.join().maybeThrow();
for (auto& id : cmdIDs_)
Command::remove (cStr(id));
}
@ -354,7 +365,7 @@ namespace test {
void
fabricateCommands()
{
syncPoint(); // barrier to ensure initialisation of the object
barrier_.sync(); // barrier to unleash all threads together
for (uint j=0; j<NUM_INVOC_PER_THRED; ++j)
{
@ -375,24 +386,31 @@ namespace test {
__randomDelay()
{
if (not MAX_RAND_DELAY_us) return;
usleep (1 + rand() % MAX_RAND_DELAY_us); // random delay varying in steps of 1µs
sleep_for (microseconds (1 + rand() % MAX_RAND_DELAY_us)); // random delay varying in steps of 1µs
}
};
/* == controlling code in main thread == */
try{
Time prevState = testCommandState;
// fire up several threads to issue commands in parallel...
vector<InvocationProducer> producerThreads{NUM_THREADS_DEFAULT};
FSecs expectedOffset{0};
for (uint i=0; i<NUM_THREADS_DEFAULT; ++i)
for (uint j=0; j<NUM_INVOC_PER_THRED; ++j)
expectedOffset += FSecs(i*7,2) - FSecs(j,2);
// fire up several threads to issue commands in parallel...
SyncBarrier trigger{NUM_THREADS_DEFAULT + 1};
deque<InvocationProducer> producerThreads;
for (uint i=0; i<NUM_THREADS_DEFAULT; ++i)
producerThreads.emplace_back (trigger);
// start concurrent execution
trigger.sync();
// give the producer threads some head start...
usleep(MAX_RAND_DELAY_us * NUM_INVOC_PER_THRED / 2);
sleep_for (microseconds (MAX_RAND_DELAY_us * NUM_INVOC_PER_THRED / 2));
__DELAY__
// stop the dispatching to cause the queue to build up...
SteamDispatcher::instance().deactivate();
@ -406,7 +424,19 @@ namespace test {
__DELAY__
CHECK (testCommandState - prevState == Time(expectedOffset));
}
catch(lumiera::Error& ex)
{
cout << "##### Lumix-Ex: "<<ex.what()<<endl;
}
catch(std::exception& jaleck)
{
cout << "##### Standard-Ex: "<<jaleck.what()<<endl;
}
catch(...)
{
cout << "WOOT??"<<endl;
}
}// Note: leaving this scope blocks for joining all producer threads
};

View file

@ -79014,8 +79014,8 @@ Date:&#160;&#160;&#160;Thu Apr 20 18:53:17 2023 +0200<br/>
</html></richcontent>
<icon BUILTIN="idea"/>
</node>
<node BACKGROUND_COLOR="#eee5c3" COLOR="#990000" CREATED="1695313755664" HGAP="-12" ID="ID_1075854169" MODIFIED="1695313767920" TEXT="Umstellung der Codebasis" VSHIFT="21">
<icon BUILTIN="flag-yellow"/>
<node BACKGROUND_COLOR="#eef0c5" COLOR="#990000" CREATED="1695313755664" HGAP="-12" ID="ID_1075854169" MODIFIED="1696029339723" TEXT="Umstellung der Codebasis" VSHIFT="21">
<icon BUILTIN="pencil"/>
<node COLOR="#338800" CREATED="1695313774171" FOLDED="true" ID="ID_1529952681" MODIFIED="1696025089421" TEXT="spezielle Abh&#xe4;ngigkeiten">
<icon BUILTIN="help"/>
<node CREATED="1695313807671" ID="ID_1031643072" MODIFIED="1695313818061" TEXT="vault::Thread vs vault::ThreadJoinable">
@ -79990,8 +79990,7 @@ Date:&#160;&#160;&#160;Thu Apr 20 18:53:17 2023 +0200<br/>
...indem man testet, da&#223; eine bestimmte Berechnung stattgefunden hat
</p>
</body>
</html>
</richcontent>
</html></richcontent>
<icon BUILTIN="button_ok"/>
</node>
<node COLOR="#338800" CREATED="1696007745882" ID="ID_523093343" MODIFIED="1696007928650" TEXT="Zeitmessung &#x27f9; Beweis der Parallelisierung">
@ -80015,8 +80014,7 @@ Date:&#160;&#160;&#160;Thu Apr 20 18:53:17 2023 +0200<br/>
</li>
</ul>
</body>
</html>
</richcontent>
</html></richcontent>
<icon BUILTIN="button_ok"/>
</node>
</node>
@ -80082,15 +80080,14 @@ Date:&#160;&#160;&#160;Thu Apr 20 18:53:17 2023 +0200<br/>
...das ist <i>interessant,</i>&#160;aber nicht kritisch; und zwar weil ich den Test jetzt umgeschrieben habe auf ein Lambda, und gar keine eigenst&#228;ndige Klasse mehr verwende &#8212; viel spannender ist, da&#223; der C++ - Compiler &#252;berhaupt schafft, solchen Code zu &#8222;knacken&#8220;
</p>
</body>
</html>
</richcontent>
</html></richcontent>
</node>
<node COLOR="#338800" CREATED="1696025030399" ID="ID_838223706" MODIFIED="1696025042055" TEXT="auch gepr&#xfc;ft da&#xdf; es funktioniert (mal die Logik umgedreht)">
<icon BUILTIN="button_ok"/>
</node>
</node>
</node>
<node COLOR="#338800" CREATED="1695394753483" FOLDED="true" ID="ID_1338410455" MODIFIED="1695584550744" TEXT="SyncBarrier_test">
<node COLOR="#338800" CREATED="1695394753483" FOLDED="true" ID="ID_1338410455" MODIFIED="1696030309964" TEXT="SyncBarrier_test">
<icon BUILTIN="button_ok"/>
<node COLOR="#435e98" CREATED="1695394763010" ID="ID_1220273122" MODIFIED="1695484703977" TEXT="neuer Test f&#xfc;r neue (interims-) Implementierung">
<linktarget COLOR="#2b3fa9" DESTINATION="ID_1220273122" ENDARROW="Default" ENDINCLINATION="169;-7;" ID="Arrow_ID_1739726561" SOURCE="ID_281891239" STARTARROW="None" STARTINCLINATION="15;204;"/>
@ -80489,8 +80486,131 @@ Date:&#160;&#160;&#160;Thu Apr 20 18:53:17 2023 +0200<br/>
</node>
</node>
</node>
<node BACKGROUND_COLOR="#eee5c3" COLOR="#990000" CREATED="1695394237210" ID="ID_11553358" MODIFIED="1695394251486" TEXT="Applikation umstellen">
<node BACKGROUND_COLOR="#eef0c5" COLOR="#990000" CREATED="1695394237210" ID="ID_11553358" MODIFIED="1696029630733" TEXT="Applikation umstellen">
<icon BUILTIN="flag-yellow"/>
<icon BUILTIN="pencil"/>
<node CREATED="1696029414385" ID="ID_140265798" MODIFIED="1696029417676" TEXT="weitere Tests...">
<node BACKGROUND_COLOR="#eef0c5" COLOR="#990000" CREATED="1696029465122" ID="ID_1060376805" MODIFIED="1696036148318" TEXT="SessionCommandFunction_test">
<icon BUILTIN="pencil"/>
<node CREATED="1696036153400" ID="ID_1572079642" MODIFIED="1696036156883" TEXT="Umstellung">
<node CREATED="1696036158076" ID="ID_429764463" MODIFIED="1696036170175" TEXT="puh... der Test ist komplex">
<icon BUILTIN="smiley-oh"/>
</node>
<node CREATED="1696036172041" ID="ID_1669094555" MODIFIED="1696036317600" TEXT="nicht klar ob ich die Barriere wirklich brauche">
<richcontent TYPE="NOTE"><html>
<head>
</head>
<body>
<p>
Theoretisch sollte sie sehr wohl n&#246;tig sein, da wir hier einen TypedCounter initialisieren, und das bedingt globales Locking; d.h. die weitere Initialisierung im ctor eines Threads kann durch einen anderen Thread aufgehalten werden.
</p>
</body>
</html>
</richcontent>
</node>
<node CREATED="1696036369976" ID="ID_1754677356" MODIFIED="1696036379688" TEXT="lieber sauber...">
<node CREATED="1696036380543" ID="ID_1634940367" MODIFIED="1696036392151" TEXT="der Thread wird ein member-Feld"/>
<node CREATED="1696036392852" ID="ID_558233481" MODIFIED="1696036404375" TEXT="verwende eine gemeinsame Barriere"/>
<node CREATED="1696036405723" ID="ID_464812584" MODIFIED="1696036427052" TEXT="Threads kommen in eine deque (&#x27fc; NonCopyable)"/>
<node CREATED="1696036429232" ID="ID_1367189942" MODIFIED="1696036446410" TEXT="Threads werden explizit erzeugt und mit Barriere versorgt"/>
<node CREATED="1696036451549" ID="ID_351344035" MODIFIED="1696036465487" TEXT="der Test-Thread partizipiert an der Barriere"/>
</node>
</node>
<node BACKGROUND_COLOR="#fdfdcf" COLOR="#ff0000" CREATED="1696036468258" ID="ID_604722459" MODIFIED="1696036477574" TEXT="umgestellter Code h&#xe4;ngt">
<icon BUILTIN="broken-line"/>
<node CREATED="1696036872517" ID="ID_747545275" MODIFIED="1696036880792" TEXT="Beobachtung">
<node CREATED="1696036882068" ID="ID_670148447" MODIFIED="1696036973668" TEXT="die Threads laufen"/>
<node CREATED="1696036974896" ID="ID_737751853" MODIFIED="1696036992529" TEXT="der Dispatcher-Thread schl&#xe4;ft in awaitAction()"/>
<node CREATED="1696037005899" ID="ID_1623850589" MODIFIED="1696037015918" TEXT="alle Producer stecken daher im Lock fest"/>
<node CREATED="1696037017218" ID="ID_166804637" MODIFIED="1696037028972" TEXT="main-Thread blockt im Destruktor"/>
<node CREATED="1696037163527" ID="ID_1110298475" MODIFIED="1696037175948" TEXT="ich sehe zuletzt: deactivateCommandProecssing: Session command interface closed."/>
</node>
<node CREATED="1696037187379" ID="ID_1472154127" MODIFIED="1696037197430" TEXT="Feststellungen">
<node CREATED="1696037199267" ID="ID_791125343" MODIFIED="1696037224734" TEXT="Mischzustand: der Dispatcher-Thread l&#xe4;uft auf dem alten Framework">
<icon BUILTIN="messagebox_warning"/>
<node COLOR="#338800" CREATED="1696039674199" ID="ID_1718267866" MODIFIED="1696039763561" TEXT="sicherheitshalber auch gleich umstellen">
<arrowlink COLOR="#49b261" DESTINATION="ID_556208204" ENDARROW="Default" ENDINCLINATION="1224;154;" ID="Arrow_ID_1257115855" STARTARROW="None" STARTINCLINATION="602;40;"/>
<icon BUILTIN="button_ok"/>
</node>
<node COLOR="#497db2" CREATED="1696039767003" ID="ID_1053399266" MODIFIED="1696039787013" TEXT="Tja.... das war&apos;s nicht">
<icon BUILTIN="smily_bad"/>
</node>
</node>
<node COLOR="#435e98" CREATED="1696038355031" FOLDED="true" ID="ID_1193356091" MODIFIED="1696038505579" TEXT="(hatte nebenbei typed-counter FamilyMember auf Atomics umgestellt)">
<font NAME="SansSerif" SIZE="11"/>
<node CREATED="1696038385875" ID="ID_1012862043" MODIFIED="1696038434994" TEXT="w&#xe4;re aber unwahrscheinlich">
<richcontent TYPE="NOTE"><html>
<head>
</head>
<body>
<p>
...denn das sollte sich nur w&#228;hrend der Konstruktoren auswirken, und die sind ja alle &quot;durch&quot;, gem&#228;&#223; Barriere
</p>
</body>
</html>
</richcontent>
</node>
<node CREATED="1696038437552" ID="ID_1876729224" MODIFIED="1696038477225" TEXT="au&#xdf;erdem geht&apos;s da nur um Inkrementieren">
<richcontent TYPE="NOTE"><html>
<head>
</head>
<body>
<p>
daf&#252;r braucht man heutzutage nun wirklich kein Lock mehr
</p>
</body>
</html>
</richcontent>
</node>
<node COLOR="#338800" CREATED="1696038478167" ID="ID_268185715" MODIFIED="1696038501250" TEXT="gepr&#xfc;ft: die alte Variante mit Lock h&#xe4;ngt genauso">
<icon BUILTIN="button_ok"/>
</node>
</node>
<node BACKGROUND_COLOR="#e0ceaa" COLOR="#690f14" CREATED="1696037316298" ID="ID_804937793" MODIFIED="1696037339615" TEXT="unklar: wer sollte den Dispatcher-Thread aufwecken?">
<icon BUILTIN="forward"/>
<node CREATED="1696039648897" ID="ID_1751198515" MODIFIED="1696039665811" TEXT="normalerweise sollten das doch die ankommenden Commands bewirken"/>
</node>
<node BACKGROUND_COLOR="#f0d5c5" COLOR="#990033" CREATED="1696037228668" ID="ID_1851104736" MODIFIED="1696037273970" TEXT="warum sehe ich das wieder-&#xd6;ffnen des Command-Interfaces nicht?">
<font NAME="SansSerif" SIZE="12"/>
<icon BUILTIN="help"/>
<node CREATED="1696037275093" ID="ID_1726964186" MODIFIED="1696037285735" TEXT="fliegt hier eine Exception??">
<icon BUILTIN="idea"/>
</node>
<node COLOR="#5b280f" CREATED="1696039593569" ID="ID_843115246" MODIFIED="1696039620815" TEXT="nein (try-catch eingebaut)">
<icon BUILTIN="button_cancel"/>
</node>
</node>
</node>
</node>
</node>
<node CREATED="1696029465122" ID="ID_1515483274" MODIFIED="1696029516437" TEXT="subsystem-runner-test.cpp (/zLumi/tests/core/application)"/>
<node CREATED="1696029465124" ID="ID_1556068052" MODIFIED="1696029514685" TEXT="typed-counter-test.cpp (/zLumi/tests/basics)"/>
<node CREATED="1696029465120" ID="ID_1705540772" MODIFIED="1696029510893" TEXT="call-queue-test.cpp (/zLumi/tests/basics)"/>
<node CREATED="1696029465120" ID="ID_1782746519" MODIFIED="1696029465120" TEXT="bus-term-test.cpp (/zLumi/tests/stage)"/>
<node BACKGROUND_COLOR="#fdfdcf" COLOR="#ff0000" CREATED="1696030280182" ID="ID_244374373" MODIFIED="1696030328243" TEXT="SyncBarrier_test">
<icon BUILTIN="flag-pink"/>
</node>
</node>
<node CREATED="1696029465119" ID="ID_820648476" MODIFIED="1696029572456" TEXT="eigentliche Verwendungen">
<node CREATED="1696029465121" ID="ID_1756135675" MODIFIED="1696029465121" TEXT="diagnostic-context-test.cpp (/zLumi/tests/basics)"/>
<node CREATED="1696029465121" ID="ID_484763089" MODIFIED="1696029589510" TEXT="dummy-tick.hpp">
<node CREATED="1696029465123" ID="ID_944980561" MODIFIED="1696029607971" TEXT="tick-service.hpp">
<node CREATED="1696029465123" ID="ID_221141346" MODIFIED="1696029617413" TEXT="dummy-player-service.cpp"/>
</node>
</node>
<node CREATED="1696029465121" ID="ID_786350998" MODIFIED="1696029583638" TEXT="gtk-lumiera.cpp"/>
<node CREATED="1696029465122" ID="ID_871343805" MODIFIED="1696029593893" TEXT="output-director.cpp"/>
<node CREATED="1696029465122" ID="ID_556208204" MODIFIED="1696039758193" TEXT="steam-dispatcher.cpp">
<linktarget COLOR="#49b261" DESTINATION="ID_556208204" ENDARROW="Default" ENDINCLINATION="1224;154;" ID="Arrow_ID_1257115855" SOURCE="ID_1718267866" STARTARROW="None" STARTINCLINATION="602;40;"/>
<node CREATED="1696039703074" ID="ID_796238065" MODIFIED="1696039722180" TEXT="Refactoring wie &#xfc;blich">
<node CREATED="1696039723008" ID="ID_932291373" MODIFIED="1696039728651" TEXT="Thread wird ein Member-Feld"/>
<node CREATED="1696039729255" ID="ID_249325323" MODIFIED="1696039735587" TEXT="Sync-Barrier kommt hinzu"/>
</node>
</node>
</node>
</node>
</node>
</node>