diff --git a/src/backend/thread-wrapper.hpp b/src/backend/thread-wrapper.hpp index 8a582c73d..81e8cc83a 100644 --- a/src/backend/thread-wrapper.hpp +++ b/src/backend/thread-wrapper.hpp @@ -2,7 +2,8 @@ THREADWRAPPER.hpp - thin convenience wrapper for starting lumiera threads Copyright (C) Lumiera.org - 2008, Hermann Vosseler + 2008 - 2010 Hermann Vosseler + Christian Thaeter This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -25,9 +26,9 @@ #define LIB_THREADWRAPPER_H -#include "include/logging.h" -#include "lib/sync.hpp" #include "lib/error.hpp" +#include "include/logging.h" +#include "lib/bool-checkable.hpp" extern "C" { #include "backend/threads.h" @@ -45,172 +46,166 @@ namespace backend { typedef struct nobug_flag* NoBugFlag; - class Thread; - - /** - * Brainstorming-in-code: how I would like to shape the API for joining threads. - * Intended use: This non-copyable handle has to be created within the thread which - * wants to wait-blocking on the termination of another thread. You then pass it - * into the ctor of the Thread starting wrapper class (see below), which causes - * the embedded lock/condition var to be used to sync on the end of the newly - * created thread. Note, after ending the execution, the newly created thread - * will be on hold until either the #join() function is called or this handle - * goes out of scope altogether. Explanation: this is implemented by locking - * the embedded monitor immediately in the ctor. Thus, unless entering the - * wait state, the contained mutex remains locked and prevents the thread - * manager from invoking the broadcast() on the condition var. - * - * @see thread-wrapper-join-test.cpp - */ - class JoinHandle - : boost::noncopyable - { - friend class Thread; - LumieraThread handle; - - void - attach (LumieraThread handle_) - { - REQUIRE (!handle, "JoinHandle used for several threads."); - handle = handle_; - } - - public: - /** Create a promise, that the current thread will or may - * wait-blocking on another not-yet existing thread to terminate. - * When passed in on creation of the other thread, as long as this - * handle lives, the other thread will be on hold after termination. - */ - JoinHandle(LumieraThread handle_ = 0) : handle(handle_) {} - - ~JoinHandle() { if(handle) throw "not joined";} /* TODO decide if we want auto-joining here? (any way, throwing in dtor isnt very nice) */ - - /** put the current thread into a blocking wait until another thread - * has terminated. This other thread needs to be created by the Thread - * wrapper, passing this JoinHandle as ctor parameter. - * @throws error::Logic if no thread has been registered to block on this - */ - void - join() - { - if (!handle) - throw lumiera::error::Logic ("no thread created blocking on this JoinHandle"); - - lumiera_err err = lumiera_thread_join (handle); - lumiera_error_set(err, 0); - - handle = 0; - if (err) /* dont throw here when there is already an error pending (ymmv)*/ - lumiera::throwOnError(); - } - }; - /**************************************************************************** * A thin convenience wrapper for dealing with threads, - * as implemented by the backend (on top of pthread). + * as implemented by the threadpool in the backend (based on pthread). * Using this wrapper... * - helps with passing data to the function executed in the new thread * - allows to bind to various kinds of functions including member functions - * - supports integrating with an existing object monitor based lock (planned) * The new thread starts immediately within the ctor; after returning, the new * thread has already copied the arguments and indeed actively started to run. * - * @note this class is \em not a thread handle. Within Lumiera, we do all of - * our thread management such as to avoid using global thread handles. - * If some cooperation between threads is needed, this should be done - * in a implementation private way, e.g. by sharing a condition var. - * - * @todo Ichthyo started this wrapper 12/08 while our own thread handling - * was just being shaped. It may well be possible that such a wrapper - * is superfluous in the final application. Re-evaluate this! */ class Thread - : boost::noncopyable + : lib::BoolCheckable< Thread + , boost::noncopyable> //////TODO: do we want Thread instances to be copyable? { - LumieraThread handle; - + + protected: typedef function Operation; - Operation const& operation_; - - static void - run (void* arg) - { - REQUIRE (arg); - Thread* startingWrapper = reinterpret_cast(arg); - Operation _doIt_(startingWrapper->operation_); - - lumiera_thread_sync (); // thread syncronizization point, see lumiera_thread_sync_other() below - - _doIt_(); // execute the actual operation in the new thread - } - - void - start_thread (lumiera_thread_class kind, Literal& purpose, NoBugFlag logging_flag, JoinHandle* jh = 0) - { - handle = - lumiera_thread_run ( kind | (jh?LUMIERA_THREAD_JOINABLE:0) // joinable thread when a join handle is passed - , &run // invoking the run helper and.. - , this // passing this start context as parameter - , purpose.c() - , logging_flag - ); - - if (!handle) /* we dont want to throw here when there was already an error pending */ - lumiera::throwOnError(); - - if (jh) - jh->attach (handle); - - // make sure the new thread had the opportunity to take the Operation - // prior to leaving and thereby possibly destroying this local context - lumiera_thread_sync_other (handle); - } - + + + struct ThreadStartContext + : boost::noncopyable + { + + Operation const& operation_; + + static void + run (void* arg) + { + REQUIRE (arg); + ThreadStartContext* ctx = reinterpret_cast(arg); + Operation _doIt_(ctx->operation_); + + lumiera_thread_sync (); // sync point: arguments handed over + + _doIt_(); // execute the actual operation in the new thread + } + + public: + ThreadStartContext (LumieraThread& handle + ,Operation const& operation_to_execute + ,Literal& purpose + ,NoBugFlag logging_flag + ,uint additionalFlags =0 + ) + : operation_(operation_to_execute) + { + REQUIRE (!lumiera_error(), "Error pending at thread start") ; + handle = + lumiera_thread_run ( LUMIERA_THREADCLASS_INTERACTIVE | additionalFlags + , &run // invoking the run helper and.. + , this // passing this start context as parameter + , purpose.c() + , logging_flag + ); + if (!handle) + lumiera::throwOnError(); + + // make sure the new thread had the opportunity to take the Operation + // prior to leaving and thereby possibly destroying this local context + lumiera_thread_sync_other (handle); + } + }; + + + + LumieraThread thread_; + + Thread() : thread_(0) { } + + public: /** Create a new thread to execute the given operation. - * The new thread starts up synchronously, i.e. when the ctor returns, the new thread - * has started running and taken over (copied) the operation functor passed in. The - * thread will be created by lumiera_thread_run (declared in threads.h), it can't - * be cancelled and it can't be joined. - * @param purpose fixed char string used to denote the thread for diagnostics - * @param logging_flag NoBug flag to receive diagnostics regarding the new thread - * @param operation defining what to execute within the new thread. Any functor - * which can be bound to function. Note this functor will be - * copied onto the stack of the new thread, thus it can be transient. - * - */ - Thread (Literal purpose, Operation const& operation, NoBugFlag logging_flag = &NOBUG_FLAG(thread)) - : operation_(operation) - { - start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag); - } - - /** Variant of the standard case, used to register a JoinHandle in addition to starting a thread. - * @param join ref to a JoinHandle, which needs to be created in the thread which plans - * to wait-blocking on the termination of this newly created thread - * + * The new thread starts up synchronously, it can't + * be cancelled and it can't be joined. + * @param purpose fixed char string used to denote the thread for diagnostics + * @param logging_flag NoBug flag to receive diagnostics regarding the new thread + * @param operation defining what to execute within the new thread. Any functor + * which can be bound to function. Note this functor will be + * copied onto the stack of the new thread, thus it can be transient. */ - Thread (Literal purpose, Operation const& operation, - JoinHandle& join, NoBugFlag logging_flag = &NOBUG_FLAG(thread)) - : operation_(operation) - { - start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag, - &join); - } - - /** - * syncronization barrier, the user supplied operation must have a matching lumiera_thread_sync() call + Thread (Literal purpose, Operation const& operation, NoBugFlag logging_flag = &NOBUG_FLAG(thread)) + : thread_(0) + { + ThreadStartContext (thread_, operation, purpose, logging_flag); + } + + /** automatic conversion: Thread instance can stand-in for a handle + * @throws lumiera::error::State when thread isn't running */ + operator LumieraThread() const + { + if (!isValid()) + throw lumiera::error::State("thread not executing (anymore)"); + + return thread_; + } + + bool + isValid() const + { + return thread_ + && true ////////////TODO: how to determine that the thread is still running? + ; + } + + + /** Synchronisation barrier. In the function executing in this thread + * needs to be a corresponding lumiera_thread_sync() call. Blocking + * until both the caller and the thread have reached the barrier. */ void sync () - { - if (!lumiera_thread_sync_other (handle)) - lumiera::throwOnError(); - } - + { + REQUIRE (isValid(), "Thread terminated"); + if (!lumiera_thread_sync_other (thread_)) + lumiera::throwOnError(); + } }; - + + + + + + + /** + * Variant of the standard case, allowing additionally + * to join on the termination of this thread. + */ + class ThreadJoinable + : public Thread + { + public: + ThreadJoinable (Literal purpose, Operation const& operation, + NoBugFlag logging_flag = &NOBUG_FLAG(thread)) + : Thread() + { + ThreadStartContext (thread_, operation, purpose, logging_flag, + LUMIERA_THREAD_JOINABLE); + } + + + /** put the caller into a blocking wait until this thread has terminated. + * @throws error::Logic if this thread has already terminated + */ + void join() + { + if (!isValid()) + throw lumiera::error::Logic ("joining on an already terminated thread"); + + lumiera_err errorInOtherThread = + lumiera_thread_join (thread_); + thread_ = 0; + + if (errorInOtherThread) + throw lumiera::error::State ("Thread terminated with error:"); + } + }; + + + } // namespace backend #endif diff --git a/src/proc/play/tick-service.hpp b/src/proc/play/tick-service.hpp index c1df386c6..a81335913 100644 --- a/src/proc/play/tick-service.hpp +++ b/src/proc/play/tick-service.hpp @@ -57,8 +57,7 @@ namespace proc { * with adjustable frequency. Quick'n dirty implementation! */ class TickService - : backend::JoinHandle, - backend::Thread + : backend::ThreadJoinable { typedef function Tick; volatile uint timespan_; @@ -68,10 +67,9 @@ namespace proc { public: TickService (Tick callback) - : Thread("Tick generator (dummy)", - bind (&TickService::timerLoop, this, callback), - (backend::JoinHandle&)*this - ) + : ThreadJoinable("Tick generator (dummy)" + , bind (&TickService::timerLoop, this, callback) + ) { INFO (proc, "TickService started."); } diff --git a/tests/lib/thread-wrapper-join-test.cpp b/tests/lib/thread-wrapper-join-test.cpp index f3c32126f..6e5b4a65e 100644 --- a/tests/lib/thread-wrapper-join-test.cpp +++ b/tests/lib/thread-wrapper-join-test.cpp @@ -22,6 +22,7 @@ #include "lib/test/run.hpp" +#include "lib/test/test-helper.hpp" #include "lib/symbol.hpp" #include "backend/thread-wrapper.hpp" @@ -36,117 +37,106 @@ using test::Test; namespace backend { - namespace test { +namespace test { - /************************************************************************** - * @test use the Lumiera backend to create some new threads, additionally - * passing an condition variable for waiting on thread termination. - * Actually this is implemented as creating and passing a JoinHandle. - * - * @see backend::Thread - * @see threads.h - */ - class ThreadWrapperJoin_test : public Test - { - - virtual void - run (Arg) - { - simpleUse (); - wrongUse (); - } - - - volatile int aValue_; ///< state to be modified by the other thread - - void - theAction (int secretValue) ///< to be run in a new thread... - { - usleep (100000); // pause 100ms prior to modifying + using lumiera::error::LUMIERA_ERROR_LOGIC; + namespace { + + const uint DESTRUCTION_CODE = 23; + + LUMIERA_ERROR_DEFINE(SPECIAL, "grandiose exception"); + } + + + /*************************************************************************** + * @test use the Lumiera backend to create some new threads, additionally + * synchronising with these child threads and waiting for termination. + * + * @see backend::Thread + * @see threads.h + */ + class ThreadWrapperJoin_test : public Test + { + + virtual void + run (Arg) + { + simpleUse (); + wrongUse (); + getError (); + } + + + volatile int aValue_; ///< state to be modified by the other thread + + void + theAction (int secretValue) ///< to be run in a new thread... + { + usleep (100000); // pause 100ms prior to modifying + + if (DESTRUCTION_CODE == secretValue) + lumiera_error_set(LUMIERA_ERROR_SPECIAL, 0); + else aValue_ = secretValue+42; - } - - - void - simpleUse () - { - aValue_=0; - int mySecret = (rand() % 1000) - 500; - - JoinHandle waitingHandle; - - Thread("test Thread joining", - bind (&ThreadWrapperJoin_test::theAction, this, mySecret), - waitingHandle); - // note binding and thread wrapper already destroyed - waitingHandle.join(); // blocks until theAction() is done - - CHECK (aValue_ == mySecret+42); - } - - - void - wrongUse () - { - JoinHandle waitingHandle; - - Thread("test Thread joining-1", - bind (&ThreadWrapperJoin_test::theAction, this, 111)); - // note we "forget" to pass the JoinHandle - try - { - waitingHandle.join(); // protocol error: handle wasn't passed for starting a Thread; - NOTREACHED(); - } - catch (lumiera::error::Logic& logo) - { lumiera_error(); } - - - Thread("test Thread joining-2", - bind (&ThreadWrapperJoin_test::theAction, this, 222), - waitingHandle); // this time we pass it.... - -#ifdef DEBUG - /////////////////////////////////////////////////////////////////////////////////////////////TODO: better way of detecting debug builds -#if false /////////////////////////////////////////////////////////////////////////////////////////////TODO: re-enable assertions to throw, and make this configurable - try - { - Thread("test Thread joining-3", - bind (&ThreadWrapperJoin_test::theAction, this, 333), - waitingHandle); // but then pass it again for another thread.... - NOTREACHED(); - } - catch (...) - { - CHECK (lumiera_error() == lumiera::error::LUMIERA_ERROR_ASSERTION); - } -#endif -#endif - - // note: the waitingHandle goes out of scope here, - // which unblocks the second thread. The first thread wasn't blocked, - // while the third thread wasn't created at all. - - waitingHandle.join(); // just making the above thing pass, JoinHandle thows when not joined and going out of scope - // the semantics herer need to be defined (auto joining? see thread-wrapper.hpp) - } - - public: - ThreadWrapperJoin_test() - { lumiera_threadpool_init(); } - - ~ThreadWrapperJoin_test() - { lumiera_threadpool_destroy(); } - - }; - - - - /** Register this test class... */ - LAUNCHER (ThreadWrapperJoin_test, "function common"); - - - - } // namespace test - -} // namespace backend + + + } + + + void + simpleUse () + { + aValue_=0; + int mySecret = (rand() % 1000) - 500; + + ThreadJoinable newThread("test Thread joining-1" + , bind (&ThreadWrapperJoin_test::theAction, this, mySecret) + ); + newThread.join(); // blocks until theAction() is done + + CHECK (aValue_ == mySecret+42); + } + + + void + wrongUse () + { + ThreadJoinable newThread("test Thread joining-2" + , bind (&ThreadWrapperJoin_test::theAction, this, 1234) + ); + newThread.join(); // blocks until theAction() is done + + VERIFY_ERROR(LOGIC, newThread.join() ); + VERIFY_ERROR(LOGIC, newThread.join() ); + } + + + void + getError() + { + ThreadJoinable newThread("test Thread joining-2" + , bind (&ThreadWrapperJoin_test::theAction, this, DESTRUCTION_CODE) + ); + + VERIFY_ERROR(SPECIAL, newThread.join() ); + } + + + + public: + ThreadWrapperJoin_test() + { lumiera_threadpool_init(); } + + ~ThreadWrapperJoin_test() + { lumiera_threadpool_destroy(); } + + }; + + + + /** Register this test class... */ + LAUNCHER (ThreadWrapperJoin_test, "function common"); + + + +}} // namespace backend::test diff --git a/tests/lib/typed-counter-test.cpp b/tests/lib/typed-counter-test.cpp index 68eefc988..2fbb1ba40 100644 --- a/tests/lib/typed-counter-test.cpp +++ b/tests/lib/typed-counter-test.cpp @@ -58,8 +58,7 @@ namespace lib { namespace test{ - using backend::Thread; - using backend::JoinHandle; + using backend::ThreadJoinable; using util::for_each; using util::isnil; using std::tr1::placeholders::_1; @@ -234,15 +233,13 @@ namespace test{ * and decrements on random targets. */ class SingleCheck - : JoinHandle, - Thread + : ThreadJoinable { public: SingleCheck (TypedCounter& counter_to_use) - : Thread("TypedCounter_test worker Thread" - , bind (&SingleCheck::runCheckSequence, this, ref(counter_to_use), (rand() % MAX_ITERATIONS)) - , (backend::JoinHandle&)*this - ) + : ThreadJoinable("TypedCounter_test worker Thread" + , bind (&SingleCheck::runCheckSequence, this, ref(counter_to_use), (rand() % MAX_ITERATIONS)) + ) { } ~SingleCheck () { this->join(); }