diff --git a/src/backend/thread-wrapper.hpp b/src/backend/thread-wrapper.hpp index 2355a6f8a..8a582c73d 100644 --- a/src/backend/thread-wrapper.hpp +++ b/src/backend/thread-wrapper.hpp @@ -27,6 +27,7 @@ #include "include/logging.h" #include "lib/sync.hpp" +#include "lib/error.hpp" extern "C" { #include "backend/threads.h" @@ -41,9 +42,6 @@ namespace backend { using std::tr1::bind; using std::tr1::function; using lib::Literal; - using lib::Sync; - using lib::RecursiveLock_Waitable; - using lib::NonrecursiveLock_Waitable; typedef struct nobug_flag* NoBugFlag; @@ -60,68 +58,52 @@ namespace backend { * goes out of scope altogether. Explanation: this is implemented by locking * the embedded monitor immediately in the ctor. Thus, unless entering the * wait state, the contained mutex remains locked and prevents the thread - * manager from invoking the broadcast() on the condition var. - * - * @see thread-wrapper-join-test.cpp + * manager from invoking the broadcast() on the condition var. + * + * @see thread-wrapper-join-test.cpp */ class JoinHandle - : public Sync - , Sync::Lock // noncopyable, immediately acquire the lock + : boost::noncopyable + { + friend class Thread; + LumieraThread handle; + + void + attach (LumieraThread handle_) { - typedef Sync SyncBase; - - bool isWaiting_; - volatile bool armed_; - - friend class Thread; - - LumieraReccondition - accessLockedCondition() - { - ASSERT (!armed_, "Lifecycle error, JoinHandle used for several threads."); - armed_ = true; - return accessMonitor().accessCond(); - } - - bool - wakeupCheck() - { - if (!armed_) - throw lumiera::error::Logic ("no thread created blocking on this JoinHandle"); - - if (!isWaiting_) - { - isWaiting_ = true; - return false; // causes entering the blocking wait - } - return true; // causes end of blocking wait - } - - - public: + REQUIRE (!handle, "JoinHandle used for several threads."); + handle = handle_; + } + + public: /** Create a promise, that the current thread will or may * wait-blocking on another not-yet existing thread to terminate. * When passed in on creation of the other thread, as long as this * handle lives, the other thread will be on hold after termination. */ - JoinHandle() - : SyncBase::Lock(this) - , isWaiting_(false) - , armed_(false) - { } - - /** put the current thread into a blocking wait until another thread + JoinHandle(LumieraThread handle_ = 0) : handle(handle_) {} + + ~JoinHandle() { if(handle) throw "not joined";} /* TODO decide if we want auto-joining here? (any way, throwing in dtor isnt very nice) */ + + /** put the current thread into a blocking wait until another thread * has terminated. This other thread needs to be created by the Thread * wrapper, passing this JoinHandle as ctor parameter. * @throws error::Logic if no thread has been registered to block on this - */ - void - join() - { - accessMonitor().wait (&handle, *this, &JoinHandle::wakeupCheck); - } - }; - + */ + void + join() + { + if (!handle) + throw lumiera::error::Logic ("no thread created blocking on this JoinHandle"); + + lumiera_err err = lumiera_thread_join (handle); + lumiera_error_set(err, 0); + + handle = 0; + if (err) /* dont throw here when there is already an error pending (ymmv)*/ + lumiera::throwOnError(); + } + }; @@ -134,99 +116,101 @@ namespace backend { * - supports integrating with an existing object monitor based lock (planned) * The new thread starts immediately within the ctor; after returning, the new * thread has already copied the arguments and indeed actively started to run. - * + * * @note this class is \em not a thread handle. Within Lumiera, we do all of * our thread management such as to avoid using global thread handles. * If some cooperation between threads is needed, this should be done - * in a implementation private way, e.g. by sharing a condition var. - * + * in a implementation private way, e.g. by sharing a condition var. + * * @todo Ichthyo started this wrapper 12/08 while our own thread handling * was just being shaped. It may well be possible that such a wrapper * is superfluous in the final application. Re-evaluate this! */ class Thread - : public Sync - , boost::noncopyable + : boost::noncopyable + { + LumieraThread handle; + + typedef function Operation; + Operation const& operation_; + + static void + run (void* arg) { - volatile bool started_; - - typedef function Operation; - Operation const& operation_; - - static void - run (void* arg) - { - REQUIRE (arg); - Thread* startingWrapper = reinterpret_cast(arg); - Operation _doIt_(startingWrapper->operation_); - { - Lock sync(startingWrapper); - startingWrapper->started_ = true; - sync.notify(); // handshake signalling we've gotten the parameter - } - - _doIt_(); // execute the actual operation in the new thread - } - - - void - start_thread (lumiera_thread_class kind, Literal& purpose, NoBugFlag logging_flag, LumieraReccondition joinCond=0) - { - Lock sync(this); - LumieraThread res = - lumiera_thread_run ( kind - , &run // invoking the run helper and.. - , this // passing this start context as parameter + REQUIRE (arg); + Thread* startingWrapper = reinterpret_cast(arg); + Operation _doIt_(startingWrapper->operation_); + + lumiera_thread_sync (); // thread syncronizization point, see lumiera_thread_sync_other() below + + _doIt_(); // execute the actual operation in the new thread + } + + void + start_thread (lumiera_thread_class kind, Literal& purpose, NoBugFlag logging_flag, JoinHandle* jh = 0) + { + handle = + lumiera_thread_run ( kind | (jh?LUMIERA_THREAD_JOINABLE:0) // joinable thread when a join handle is passed + , &run // invoking the run helper and.. + , this // passing this start context as parameter , purpose.c() , logging_flag ); - (void)joinCond; // TODO: this is a temporary fix to match the C API - // we might have to re-write more of this file or even remove it later - - if (!res) - throw lumiera::error::State("failed to create new thread."); - - // make sure the new thread had the opportunity to take the Operation - // prior to leaving and thereby possibly destroying this local context - sync.wait (started_); - } - - public: - /** Create a new thread to execute the given operation. + + if (!handle) /* we dont want to throw here when there was already an error pending */ + lumiera::throwOnError(); + + if (jh) + jh->attach (handle); + + // make sure the new thread had the opportunity to take the Operation + // prior to leaving and thereby possibly destroying this local context + lumiera_thread_sync_other (handle); + } + + public: + /** Create a new thread to execute the given operation. * The new thread starts up synchronously, i.e. when the ctor returns, the new thread * has started running and taken over (copied) the operation functor passed in. The - * thread will be created by lumiera_thread_run (declared in threads.h), it can't + * thread will be created by lumiera_thread_run (declared in threads.h), it can't * be cancelled and it can't be joined. * @param purpose fixed char string used to denote the thread for diagnostics * @param logging_flag NoBug flag to receive diagnostics regarding the new thread * @param operation defining what to execute within the new thread. Any functor * which can be bound to function. Note this functor will be * copied onto the stack of the new thread, thus it can be transient. - * + * */ - Thread (Literal purpose, Operation const& operation, NoBugFlag logging_flag = &NOBUG_FLAG(thread)) - : started_(false), - operation_(operation) - { - start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag); - } - - /** Variant of the standard case, used to register a JoinHandle in addition to starting a thread. - * @param join ref to a JoinHandle, which needs to be created in the thread which plans - * to wait-blocking on the termination of this newly created thread - * - */ - Thread (Literal purpose, Operation const& operation, - JoinHandle& join, NoBugFlag logging_flag = &NOBUG_FLAG(thread)) - : started_(false), - operation_(operation) - { - start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag, - join.accessLockedCondition()); - } - }; - - - - } // namespace backend + Thread (Literal purpose, Operation const& operation, NoBugFlag logging_flag = &NOBUG_FLAG(thread)) + : operation_(operation) + { + start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag); + } + + /** Variant of the standard case, used to register a JoinHandle in addition to starting a thread. + * @param join ref to a JoinHandle, which needs to be created in the thread which plans + * to wait-blocking on the termination of this newly created thread + * + */ + Thread (Literal purpose, Operation const& operation, + JoinHandle& join, NoBugFlag logging_flag = &NOBUG_FLAG(thread)) + : operation_(operation) + { + start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag, + &join); + } + + /** + * syncronization barrier, the user supplied operation must have a matching lumiera_thread_sync() call + */ + void + sync () + { + if (!lumiera_thread_sync_other (handle)) + lumiera::throwOnError(); + } + + }; + +} // namespace backend #endif diff --git a/tests/lib/thread-wrapper-join-test.cpp b/tests/lib/thread-wrapper-join-test.cpp index aff2ffab7..f3c32126f 100644 --- a/tests/lib/thread-wrapper-join-test.cpp +++ b/tests/lib/thread-wrapper-join-test.cpp @@ -40,9 +40,9 @@ namespace backend { /************************************************************************** * @test use the Lumiera backend to create some new threads, additionally - * passing an condition variable for waiting on thread termination. + * passing an condition variable for waiting on thread termination. * Actually this is implemented as creating and passing a JoinHandle. - * + * * @see backend::Thread * @see threads.h */ @@ -57,7 +57,7 @@ namespace backend { } - volatile int aValue_; ///< state to be modified by the other thread + volatile int aValue_; ///< state to be modified by the other thread void theAction (int secretValue) ///< to be run in a new thread... @@ -77,12 +77,11 @@ namespace backend { Thread("test Thread joining", bind (&ThreadWrapperJoin_test::theAction, this, mySecret), - waitingHandle); + waitingHandle); // note binding and thread wrapper already destroyed - waitingHandle.join(); // blocks until theAction() is done - ASSERT (aValue_ == mySecret+42); + CHECK (aValue_ == mySecret+42); } @@ -92,11 +91,11 @@ namespace backend { JoinHandle waitingHandle; Thread("test Thread joining-1", - bind (&ThreadWrapperJoin_test::theAction, this, 111)); + bind (&ThreadWrapperJoin_test::theAction, this, 111)); // note we "forget" to pass the JoinHandle - try - { - waitingHandle.join(); // protocol error: handle wasn't passed for starting a Thread; + try + { + waitingHandle.join(); // protocol error: handle wasn't passed for starting a Thread; NOTREACHED(); } catch (lumiera::error::Logic& logo) @@ -119,7 +118,7 @@ namespace backend { } catch (...) { - ASSERT (lumiera_error() == lumiera::error::LUMIERA_ERROR_ASSERTION); + CHECK (lumiera_error() == lumiera::error::LUMIERA_ERROR_ASSERTION); } #endif #endif @@ -127,6 +126,9 @@ namespace backend { // note: the waitingHandle goes out of scope here, // which unblocks the second thread. The first thread wasn't blocked, // while the third thread wasn't created at all. + + waitingHandle.join(); // just making the above thing pass, JoinHandle thows when not joined and going out of scope + // the semantics herer need to be defined (auto joining? see thread-wrapper.hpp) } public: diff --git a/tests/lib/thread-wrapper-test.cpp b/tests/lib/thread-wrapper-test.cpp index af8a6f149..76cbdfc54 100644 --- a/tests/lib/thread-wrapper-test.cpp +++ b/tests/lib/thread-wrapper-test.cpp @@ -31,8 +31,8 @@ using std::tr1::bind; using test::Test; - - +using lib::Sync; +using lib::NonrecursiveLock_NoWait; namespace backend { namespace test { @@ -54,18 +54,21 @@ namespace backend { } - struct TestThread : Thread + struct TestThread + : Thread + , Sync // note: Thread isnt derived from sync anymore, if we want to lock values this needs to be done at per client base + // an alternative would be to use the threads .sync() and lumiera_thread_sync() where approbiate { TestThread() : Thread("test Thread creation", bind (&TestThread::theOperation, this, createVal(), createVal())) { } // note the binding (functor object) is passed as anonymous temporary - - void + + void theOperation (uint a, uint b) ///< the actual operation running in a separate thread { - Lock sync(this); // *not* a recursive lock, because parent unlocks prior to invoking the operation + Lock(this); sum += (a+b); } }; @@ -98,7 +101,7 @@ namespace backend { sum = checksum = 0; TestThread instances[NUM_THREADS] SIDEEFFECT; - usleep (200000); // pause 200ms for the threads to terminate..... + usleep (200000); // pause 200ms for the threads to terminate..... ASSERT (0 < sum); ASSERT (sum==checksum);