From 48d6f0fae3399ac8c5e2522a386df5c7100ab33e Mon Sep 17 00:00:00 2001 From: Ichthyostega Date: Sat, 30 Sep 2023 03:12:55 +0200 Subject: [PATCH] Library/Application: switch Steam-Dispatcher to new thread-framework TODO: SessionCommandFunction_test deadlocks!! --- src/lib/typed-counter.hpp | 10 +- src/steam/control/steam-dispatcher.cpp | 26 ++-- .../control/session-command-function-test.cpp | 72 ++++++--- wiki/thinkPad.ichthyo.mm | 140 ++++++++++++++++-- 4 files changed, 202 insertions(+), 46 deletions(-) diff --git a/src/lib/typed-counter.hpp b/src/lib/typed-counter.hpp index da79383e0..e2bc74497 100644 --- a/src/lib/typed-counter.hpp +++ b/src/lib/typed-counter.hpp @@ -61,6 +61,7 @@ #include #include +#include namespace util { @@ -84,6 +85,8 @@ namespace lib { * Rather, they are tied to a specific type context, e.g. a class * implementing a custom allocator. These typed contexts are * considered to be orthogonal and independent of each other. + * @remark 2023: allocation of global ID counters are protected by + * double-checked locking with mutex, which is deemed adequate. */ template class TypedContext @@ -202,14 +205,13 @@ namespace lib { const size_t id_; /** member counter shared per template instance */ - static size_t memberCounter; + static std::atomic_size_t memberCounter; /** threadsafe allocation of member ID */ static size_t allocateNextMember() { - ClassLock synchronised; - return memberCounter++; + return memberCounter.fetch_add(+1, std::memory_order_relaxed); } public: @@ -242,7 +244,7 @@ namespace lib { /** allocate storage for the counter per type family */ template - size_t FamilyMember::memberCounter{0}; + std::atomic_size_t FamilyMember::memberCounter{0}; diff --git a/src/steam/control/steam-dispatcher.cpp b/src/steam/control/steam-dispatcher.cpp index edc54f127..79c3f7f0e 100644 --- a/src/steam/control/steam-dispatcher.cpp +++ b/src/steam/control/steam-dispatcher.cpp @@ -88,15 +88,17 @@ #include "steam/control/looper.hpp" #include "steam/control/session-command-service.hpp" #include "steam/mobject/session.hpp" -#include "vault/thread-wrapper.hpp" #include "lib/depend-inject.hpp" +#include "lib/sync-barrier.hpp" +#include "lib/thread.hpp" #include "lib/util.hpp" ///////////////TODO for test command invocation #include using lib::Sync; using lib::RecursiveLock_Waitable; -using vault::Thread; +using lib::SyncBarrier; +using lib::Thread; using std::unique_ptr; namespace steam { @@ -113,8 +115,7 @@ namespace control { * @see DispatcherLooper_test */ class DispatcherLoop - : Thread - , public CommandDispatch + : public CommandDispatch , public Sync { using ServiceHandle = lib::DependInject::ServiceInstance<>; @@ -122,9 +123,10 @@ namespace control { /** manage the primary public Session interface */ ServiceHandle commandService_; + SyncBarrier init_; CommandQueue queue_; Looper looper_; - + Thread thread_; public: /** start the session loop thread @@ -136,15 +138,17 @@ namespace control { * Such might happen indirectly, when something depends on "the Session" */ DispatcherLoop (Subsys::SigTerm notification) - : Thread{"Lumiera Session", bind (&DispatcherLoop::runSessionThread, this, notification)} - , commandService_{ServiceHandle::NOT_YET_STARTED} + : commandService_{ServiceHandle::NOT_YET_STARTED} , queue_{} , looper_([&]() -> bool { return not queue_.empty(); }) + , thread_{"Lumiera Session" + ,&DispatcherLoop::runSessionThread + , this, notification} { - Thread::sync(); // done with setup; loop may run now.... + init_.sync(); // done with setup; loop may run now.... INFO (session, "Steam-Dispatcher running..."); { Lock(this); // open public session interface: @@ -155,7 +159,7 @@ namespace control { ~DispatcherLoop() { try { - commandService_.shutdown(); // redundant call, to ensure session interface is closed reliably + commandService_.shutdown(); // redundant call, to ensure session interface is closed reliably INFO (session, "Steam-Dispatcher stopped."); } ERROR_LOG_AND_IGNORE(session, "Stopping the Steam-Dispatcher"); @@ -231,7 +235,7 @@ namespace control { runSessionThread (Subsys::SigTerm notifyEnd) { string errorMsg; - syncPoint(); + init_.sync(); try { while (looper_.shallLoop()) @@ -279,7 +283,7 @@ namespace control { bool isStateSynched() { - if (this->invokedWithinThread()) + if (thread_.invokedWithinThread()) throw error::Fatal("Possible Deadlock. " "Attempt to synchronise to a command processing check point " "from within the (single) session thread." diff --git a/tests/core/steam/control/session-command-function-test.cpp b/tests/core/steam/control/session-command-function-test.cpp index f2633eda2..0ce533a33 100644 --- a/tests/core/steam/control/session-command-function-test.cpp +++ b/tests/core/steam/control/session-command-function-test.cpp @@ -82,15 +82,19 @@ extern "C" { #include "steam/control/steam-dispatcher.hpp" #include "steam/control/command-def.hpp" #include "include/session-command-facade.h" -#include "vault/thread-wrapper.hpp" #include "lib/typed-counter.hpp" #include "lib/format-string.hpp" +#include "lib/sync-barrier.hpp" +#include "lib/thread.hpp" #include "lib/symbol.hpp" #include "lib/util.hpp" +#include "lib/test/diagnostic-output.hpp"////////////////////TODO #include -#include +#include #include +#include +#include namespace steam { @@ -99,8 +103,11 @@ namespace test { using boost::lexical_cast; - using lib::test::randTime; + using std::this_thread::sleep_for; + using std::chrono::microseconds; + using namespace std::chrono_literals; using steam::control::SessionCommand; + using lib::test::randTime; using lib::diff::GenNode; using lib::diff::Rec; using lib::time::Time; @@ -109,11 +116,13 @@ namespace test { using lib::time::Offset; using lib::time::FSecs; using lib::FamilyMember; + using lib::SyncBarrier; using lib::Symbol; using util::_Fmt; using util::isnil; using std::string; using std::vector; + using std::deque; using std::rand; @@ -163,7 +172,7 @@ namespace test { }//(End) test fixture -#define __DELAY__ usleep(20000); +#define __DELAY__ sleep_for (20ms); @@ -215,8 +224,8 @@ namespace test { lumiera::throwOnError(); startDispatcher(); - perform_simpleInvocation(); - perform_messageInvocation(); +// perform_simpleInvocation(); +// perform_messageInvocation(); perform_massivelyParallel(args_for_stresstest); stopDispatcher(); @@ -323,11 +332,14 @@ namespace test { // we'll run several instances of the following thread.... class InvocationProducer - : vault::ThreadJoinable + : util::NonCopyable { + SyncBarrier& barrier_; FamilyMember id_; vector cmdIDs_; + lib::ThreadJoinable thread_; + Symbol cmdID(uint j) { @@ -337,15 +349,14 @@ namespace test { public: - InvocationProducer() - : ThreadJoinable("test command producer", [&](){ fabricateCommands(); }) - { - this->sync(); - } + InvocationProducer (SyncBarrier& trigger) + : barrier_{trigger} + , thread_{"command producer", [&]{ fabricateCommands(); }} + { } ~InvocationProducer() { - this->join(); // .maybeThrow(); /////////////////////////////////////////OOO should detect exceptions in thread explicitly + thread_.join().maybeThrow(); for (auto& id : cmdIDs_) Command::remove (cStr(id)); } @@ -354,7 +365,7 @@ namespace test { void fabricateCommands() { - syncPoint(); // barrier to ensure initialisation of the object + barrier_.sync(); // barrier to unleash all threads together for (uint j=0; j producerThreads{NUM_THREADS_DEFAULT}; - FSecs expectedOffset{0}; for (uint i=0; i producerThreads; + for (uint i=0; i - - + + @@ -79990,8 +79990,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
...indem man testet, daß eine bestimmte Berechnung stattgefunden hat

- - +
@@ -80015,8 +80014,7 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- - +
@@ -80082,15 +80080,14 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
...das ist interessant, aber nicht kritisch; und zwar weil ich den Test jetzt umgeschrieben habe auf ein Lambda, und gar keine eigenständige Klasse mehr verwende — viel spannender ist, daß der C++ - Compiler überhaupt schafft, solchen Code zu „knacken“

- - +
- + @@ -80489,8 +80486,131 @@ Date:   Thu Apr 20 18:53:17 2023 +0200
- + + + + + + + + + + + + + + + +

+ Theoretisch sollte sie sehr wohl nötig sein, da wir hier einen TypedCounter initialisieren, und das bedingt globales Locking; d.h. die weitere Initialisierung im ctor eines Threads kann durch einen anderen Thread aufgehalten werden. +

+ + +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +

+ ...denn das sollte sich nur während der Konstruktoren auswirken, und die sind ja alle "durch", gemäß Barriere +

+ + +
+
+ + + + + + +

+ dafür braucht man heutzutage nun wirklich kein Lock mehr +

+ + +
+
+ + + +
+ + + + + + + + + + + + + + + +
+
+ + + + + + + + + + + + + + + + + + + + + + + + +