refactor the C++ thread-wrapper, remove JoinHandle
This commit is contained in:
parent
8607a4006a
commit
93af4ed017
4 changed files with 255 additions and 275 deletions
|
|
@ -2,7 +2,8 @@
|
|||
THREADWRAPPER.hpp - thin convenience wrapper for starting lumiera threads
|
||||
|
||||
Copyright (C) Lumiera.org
|
||||
2008, Hermann Vosseler <Ichthyostega@web.de>
|
||||
2008 - 2010 Hermann Vosseler <Ichthyostega@web.de>
|
||||
Christian Thaeter <ct@pipapo.org>
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License as
|
||||
|
|
@ -25,9 +26,9 @@
|
|||
#define LIB_THREADWRAPPER_H
|
||||
|
||||
|
||||
#include "include/logging.h"
|
||||
#include "lib/sync.hpp"
|
||||
#include "lib/error.hpp"
|
||||
#include "include/logging.h"
|
||||
#include "lib/bool-checkable.hpp"
|
||||
|
||||
extern "C" {
|
||||
#include "backend/threads.h"
|
||||
|
|
@ -45,172 +46,166 @@ namespace backend {
|
|||
|
||||
typedef struct nobug_flag* NoBugFlag;
|
||||
|
||||
class Thread;
|
||||
|
||||
/**
|
||||
* Brainstorming-in-code: how I would like to shape the API for joining threads.
|
||||
* Intended use: This non-copyable handle has to be created within the thread which
|
||||
* wants to wait-blocking on the termination of another thread. You then pass it
|
||||
* into the ctor of the Thread starting wrapper class (see below), which causes
|
||||
* the embedded lock/condition var to be used to sync on the end of the newly
|
||||
* created thread. Note, after ending the execution, the newly created thread
|
||||
* will be on hold until either the #join() function is called or this handle
|
||||
* goes out of scope altogether. Explanation: this is implemented by locking
|
||||
* the embedded monitor immediately in the ctor. Thus, unless entering the
|
||||
* wait state, the contained mutex remains locked and prevents the thread
|
||||
* manager from invoking the broadcast() on the condition var.
|
||||
*
|
||||
* @see thread-wrapper-join-test.cpp
|
||||
*/
|
||||
class JoinHandle
|
||||
: boost::noncopyable
|
||||
{
|
||||
friend class Thread;
|
||||
LumieraThread handle;
|
||||
|
||||
void
|
||||
attach (LumieraThread handle_)
|
||||
{
|
||||
REQUIRE (!handle, "JoinHandle used for several threads.");
|
||||
handle = handle_;
|
||||
}
|
||||
|
||||
public:
|
||||
/** Create a promise, that the current thread will or may
|
||||
* wait-blocking on another not-yet existing thread to terminate.
|
||||
* When passed in on creation of the other thread, as long as this
|
||||
* handle lives, the other thread will be on hold after termination.
|
||||
*/
|
||||
JoinHandle(LumieraThread handle_ = 0) : handle(handle_) {}
|
||||
|
||||
~JoinHandle() { if(handle) throw "not joined";} /* TODO decide if we want auto-joining here? (any way, throwing in dtor isnt very nice) */
|
||||
|
||||
/** put the current thread into a blocking wait until another thread
|
||||
* has terminated. This other thread needs to be created by the Thread
|
||||
* wrapper, passing this JoinHandle as ctor parameter.
|
||||
* @throws error::Logic if no thread has been registered to block on this
|
||||
*/
|
||||
void
|
||||
join()
|
||||
{
|
||||
if (!handle)
|
||||
throw lumiera::error::Logic ("no thread created blocking on this JoinHandle");
|
||||
|
||||
lumiera_err err = lumiera_thread_join (handle);
|
||||
lumiera_error_set(err, 0);
|
||||
|
||||
handle = 0;
|
||||
if (err) /* dont throw here when there is already an error pending (ymmv)*/
|
||||
lumiera::throwOnError();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
/****************************************************************************
|
||||
* A thin convenience wrapper for dealing with threads,
|
||||
* as implemented by the backend (on top of pthread).
|
||||
* as implemented by the threadpool in the backend (based on pthread).
|
||||
* Using this wrapper...
|
||||
* - helps with passing data to the function executed in the new thread
|
||||
* - allows to bind to various kinds of functions including member functions
|
||||
* - supports integrating with an existing object monitor based lock (planned)
|
||||
* The new thread starts immediately within the ctor; after returning, the new
|
||||
* thread has already copied the arguments and indeed actively started to run.
|
||||
*
|
||||
* @note this class is \em not a thread handle. Within Lumiera, we do all of
|
||||
* our thread management such as to avoid using global thread handles.
|
||||
* If some cooperation between threads is needed, this should be done
|
||||
* in a implementation private way, e.g. by sharing a condition var.
|
||||
*
|
||||
* @todo Ichthyo started this wrapper 12/08 while our own thread handling
|
||||
* was just being shaped. It may well be possible that such a wrapper
|
||||
* is superfluous in the final application. Re-evaluate this!
|
||||
*/
|
||||
class Thread
|
||||
: boost::noncopyable
|
||||
: lib::BoolCheckable< Thread
|
||||
, boost::noncopyable> //////TODO: do we want Thread instances to be copyable?
|
||||
{
|
||||
LumieraThread handle;
|
||||
|
||||
|
||||
protected:
|
||||
typedef function<void(void)> Operation;
|
||||
Operation const& operation_;
|
||||
|
||||
static void
|
||||
run (void* arg)
|
||||
{
|
||||
REQUIRE (arg);
|
||||
Thread* startingWrapper = reinterpret_cast<Thread*>(arg);
|
||||
Operation _doIt_(startingWrapper->operation_);
|
||||
|
||||
lumiera_thread_sync (); // thread syncronizization point, see lumiera_thread_sync_other() below
|
||||
|
||||
_doIt_(); // execute the actual operation in the new thread
|
||||
}
|
||||
|
||||
void
|
||||
start_thread (lumiera_thread_class kind, Literal& purpose, NoBugFlag logging_flag, JoinHandle* jh = 0)
|
||||
{
|
||||
handle =
|
||||
lumiera_thread_run ( kind | (jh?LUMIERA_THREAD_JOINABLE:0) // joinable thread when a join handle is passed
|
||||
, &run // invoking the run helper and..
|
||||
, this // passing this start context as parameter
|
||||
, purpose.c()
|
||||
, logging_flag
|
||||
);
|
||||
|
||||
if (!handle) /* we dont want to throw here when there was already an error pending */
|
||||
lumiera::throwOnError();
|
||||
|
||||
if (jh)
|
||||
jh->attach (handle);
|
||||
|
||||
// make sure the new thread had the opportunity to take the Operation
|
||||
// prior to leaving and thereby possibly destroying this local context
|
||||
lumiera_thread_sync_other (handle);
|
||||
}
|
||||
|
||||
|
||||
|
||||
struct ThreadStartContext
|
||||
: boost::noncopyable
|
||||
{
|
||||
|
||||
Operation const& operation_;
|
||||
|
||||
static void
|
||||
run (void* arg)
|
||||
{
|
||||
REQUIRE (arg);
|
||||
ThreadStartContext* ctx = reinterpret_cast<ThreadStartContext*>(arg);
|
||||
Operation _doIt_(ctx->operation_);
|
||||
|
||||
lumiera_thread_sync (); // sync point: arguments handed over
|
||||
|
||||
_doIt_(); // execute the actual operation in the new thread
|
||||
}
|
||||
|
||||
public:
|
||||
ThreadStartContext (LumieraThread& handle
|
||||
,Operation const& operation_to_execute
|
||||
,Literal& purpose
|
||||
,NoBugFlag logging_flag
|
||||
,uint additionalFlags =0
|
||||
)
|
||||
: operation_(operation_to_execute)
|
||||
{
|
||||
REQUIRE (!lumiera_error(), "Error pending at thread start") ;
|
||||
handle =
|
||||
lumiera_thread_run ( LUMIERA_THREADCLASS_INTERACTIVE | additionalFlags
|
||||
, &run // invoking the run helper and..
|
||||
, this // passing this start context as parameter
|
||||
, purpose.c()
|
||||
, logging_flag
|
||||
);
|
||||
if (!handle)
|
||||
lumiera::throwOnError();
|
||||
|
||||
// make sure the new thread had the opportunity to take the Operation
|
||||
// prior to leaving and thereby possibly destroying this local context
|
||||
lumiera_thread_sync_other (handle);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
LumieraThread thread_;
|
||||
|
||||
Thread() : thread_(0) { }
|
||||
|
||||
|
||||
public:
|
||||
/** Create a new thread to execute the given operation.
|
||||
* The new thread starts up synchronously, i.e. when the ctor returns, the new thread
|
||||
* has started running and taken over (copied) the operation functor passed in. The
|
||||
* thread will be created by lumiera_thread_run (declared in threads.h), it can't
|
||||
* be cancelled and it can't be joined.
|
||||
* @param purpose fixed char string used to denote the thread for diagnostics
|
||||
* @param logging_flag NoBug flag to receive diagnostics regarding the new thread
|
||||
* @param operation defining what to execute within the new thread. Any functor
|
||||
* which can be bound to function<void(void)>. Note this functor will be
|
||||
* copied onto the stack of the new thread, thus it can be transient.
|
||||
*
|
||||
*/
|
||||
Thread (Literal purpose, Operation const& operation, NoBugFlag logging_flag = &NOBUG_FLAG(thread))
|
||||
: operation_(operation)
|
||||
{
|
||||
start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag);
|
||||
}
|
||||
|
||||
/** Variant of the standard case, used to register a JoinHandle in addition to starting a thread.
|
||||
* @param join ref to a JoinHandle, which needs to be created in the thread which plans
|
||||
* to wait-blocking on the termination of this newly created thread
|
||||
*
|
||||
* The new thread starts up synchronously, it can't
|
||||
* be cancelled and it can't be joined.
|
||||
* @param purpose fixed char string used to denote the thread for diagnostics
|
||||
* @param logging_flag NoBug flag to receive diagnostics regarding the new thread
|
||||
* @param operation defining what to execute within the new thread. Any functor
|
||||
* which can be bound to function<void(void)>. Note this functor will be
|
||||
* copied onto the stack of the new thread, thus it can be transient.
|
||||
*/
|
||||
Thread (Literal purpose, Operation const& operation,
|
||||
JoinHandle& join, NoBugFlag logging_flag = &NOBUG_FLAG(thread))
|
||||
: operation_(operation)
|
||||
{
|
||||
start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag,
|
||||
&join);
|
||||
}
|
||||
|
||||
/**
|
||||
* syncronization barrier, the user supplied operation must have a matching lumiera_thread_sync() call
|
||||
Thread (Literal purpose, Operation const& operation, NoBugFlag logging_flag = &NOBUG_FLAG(thread))
|
||||
: thread_(0)
|
||||
{
|
||||
ThreadStartContext (thread_, operation, purpose, logging_flag);
|
||||
}
|
||||
|
||||
/** automatic conversion: Thread instance can stand-in for a handle
|
||||
* @throws lumiera::error::State when thread isn't running */
|
||||
operator LumieraThread() const
|
||||
{
|
||||
if (!isValid())
|
||||
throw lumiera::error::State("thread not executing (anymore)");
|
||||
|
||||
return thread_;
|
||||
}
|
||||
|
||||
bool
|
||||
isValid() const
|
||||
{
|
||||
return thread_
|
||||
&& true ////////////TODO: how to determine that the thread is still running?
|
||||
;
|
||||
}
|
||||
|
||||
|
||||
/** Synchronisation barrier. In the function executing in this thread
|
||||
* needs to be a corresponding lumiera_thread_sync() call. Blocking
|
||||
* until both the caller and the thread have reached the barrier.
|
||||
*/
|
||||
void
|
||||
sync ()
|
||||
{
|
||||
if (!lumiera_thread_sync_other (handle))
|
||||
lumiera::throwOnError();
|
||||
}
|
||||
|
||||
{
|
||||
REQUIRE (isValid(), "Thread terminated");
|
||||
if (!lumiera_thread_sync_other (thread_))
|
||||
lumiera::throwOnError();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Variant of the standard case, allowing additionally
|
||||
* to join on the termination of this thread.
|
||||
*/
|
||||
class ThreadJoinable
|
||||
: public Thread
|
||||
{
|
||||
public:
|
||||
ThreadJoinable (Literal purpose, Operation const& operation,
|
||||
NoBugFlag logging_flag = &NOBUG_FLAG(thread))
|
||||
: Thread()
|
||||
{
|
||||
ThreadStartContext (thread_, operation, purpose, logging_flag,
|
||||
LUMIERA_THREAD_JOINABLE);
|
||||
}
|
||||
|
||||
|
||||
/** put the caller into a blocking wait until this thread has terminated.
|
||||
* @throws error::Logic if this thread has already terminated
|
||||
*/
|
||||
void join()
|
||||
{
|
||||
if (!isValid())
|
||||
throw lumiera::error::Logic ("joining on an already terminated thread");
|
||||
|
||||
lumiera_err errorInOtherThread =
|
||||
lumiera_thread_join (thread_);
|
||||
thread_ = 0;
|
||||
|
||||
if (errorInOtherThread)
|
||||
throw lumiera::error::State ("Thread terminated with error:");
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
} // namespace backend
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -57,8 +57,7 @@ namespace proc {
|
|||
* with adjustable frequency. Quick'n dirty implementation!
|
||||
*/
|
||||
class TickService
|
||||
: backend::JoinHandle,
|
||||
backend::Thread
|
||||
: backend::ThreadJoinable
|
||||
{
|
||||
typedef function<void(void)> Tick;
|
||||
volatile uint timespan_;
|
||||
|
|
@ -68,10 +67,9 @@ namespace proc {
|
|||
|
||||
public:
|
||||
TickService (Tick callback)
|
||||
: Thread("Tick generator (dummy)",
|
||||
bind (&TickService::timerLoop, this, callback),
|
||||
(backend::JoinHandle&)*this
|
||||
)
|
||||
: ThreadJoinable("Tick generator (dummy)"
|
||||
, bind (&TickService::timerLoop, this, callback)
|
||||
)
|
||||
{
|
||||
INFO (proc, "TickService started.");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@
|
|||
|
||||
|
||||
#include "lib/test/run.hpp"
|
||||
#include "lib/test/test-helper.hpp"
|
||||
|
||||
#include "lib/symbol.hpp"
|
||||
#include "backend/thread-wrapper.hpp"
|
||||
|
|
@ -36,117 +37,106 @@ using test::Test;
|
|||
|
||||
|
||||
namespace backend {
|
||||
namespace test {
|
||||
namespace test {
|
||||
|
||||
/**************************************************************************
|
||||
* @test use the Lumiera backend to create some new threads, additionally
|
||||
* passing an condition variable for waiting on thread termination.
|
||||
* Actually this is implemented as creating and passing a JoinHandle.
|
||||
*
|
||||
* @see backend::Thread
|
||||
* @see threads.h
|
||||
*/
|
||||
class ThreadWrapperJoin_test : public Test
|
||||
{
|
||||
|
||||
virtual void
|
||||
run (Arg)
|
||||
{
|
||||
simpleUse ();
|
||||
wrongUse ();
|
||||
}
|
||||
|
||||
|
||||
volatile int aValue_; ///< state to be modified by the other thread
|
||||
|
||||
void
|
||||
theAction (int secretValue) ///< to be run in a new thread...
|
||||
{
|
||||
usleep (100000); // pause 100ms prior to modifying
|
||||
using lumiera::error::LUMIERA_ERROR_LOGIC;
|
||||
namespace {
|
||||
|
||||
const uint DESTRUCTION_CODE = 23;
|
||||
|
||||
LUMIERA_ERROR_DEFINE(SPECIAL, "grandiose exception");
|
||||
}
|
||||
|
||||
|
||||
/***************************************************************************
|
||||
* @test use the Lumiera backend to create some new threads, additionally
|
||||
* synchronising with these child threads and waiting for termination.
|
||||
*
|
||||
* @see backend::Thread
|
||||
* @see threads.h
|
||||
*/
|
||||
class ThreadWrapperJoin_test : public Test
|
||||
{
|
||||
|
||||
virtual void
|
||||
run (Arg)
|
||||
{
|
||||
simpleUse ();
|
||||
wrongUse ();
|
||||
getError ();
|
||||
}
|
||||
|
||||
|
||||
volatile int aValue_; ///< state to be modified by the other thread
|
||||
|
||||
void
|
||||
theAction (int secretValue) ///< to be run in a new thread...
|
||||
{
|
||||
usleep (100000); // pause 100ms prior to modifying
|
||||
|
||||
if (DESTRUCTION_CODE == secretValue)
|
||||
lumiera_error_set(LUMIERA_ERROR_SPECIAL, 0);
|
||||
else
|
||||
aValue_ = secretValue+42;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
simpleUse ()
|
||||
{
|
||||
aValue_=0;
|
||||
int mySecret = (rand() % 1000) - 500;
|
||||
|
||||
JoinHandle waitingHandle;
|
||||
|
||||
Thread("test Thread joining",
|
||||
bind (&ThreadWrapperJoin_test::theAction, this, mySecret),
|
||||
waitingHandle);
|
||||
// note binding and thread wrapper already destroyed
|
||||
waitingHandle.join(); // blocks until theAction() is done
|
||||
|
||||
CHECK (aValue_ == mySecret+42);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
wrongUse ()
|
||||
{
|
||||
JoinHandle waitingHandle;
|
||||
|
||||
Thread("test Thread joining-1",
|
||||
bind (&ThreadWrapperJoin_test::theAction, this, 111));
|
||||
// note we "forget" to pass the JoinHandle
|
||||
try
|
||||
{
|
||||
waitingHandle.join(); // protocol error: handle wasn't passed for starting a Thread;
|
||||
NOTREACHED();
|
||||
}
|
||||
catch (lumiera::error::Logic& logo)
|
||||
{ lumiera_error(); }
|
||||
|
||||
|
||||
Thread("test Thread joining-2",
|
||||
bind (&ThreadWrapperJoin_test::theAction, this, 222),
|
||||
waitingHandle); // this time we pass it....
|
||||
|
||||
#ifdef DEBUG
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////TODO: better way of detecting debug builds
|
||||
#if false /////////////////////////////////////////////////////////////////////////////////////////////TODO: re-enable assertions to throw, and make this configurable
|
||||
try
|
||||
{
|
||||
Thread("test Thread joining-3",
|
||||
bind (&ThreadWrapperJoin_test::theAction, this, 333),
|
||||
waitingHandle); // but then pass it again for another thread....
|
||||
NOTREACHED();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
CHECK (lumiera_error() == lumiera::error::LUMIERA_ERROR_ASSERTION);
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
// note: the waitingHandle goes out of scope here,
|
||||
// which unblocks the second thread. The first thread wasn't blocked,
|
||||
// while the third thread wasn't created at all.
|
||||
|
||||
waitingHandle.join(); // just making the above thing pass, JoinHandle thows when not joined and going out of scope
|
||||
// the semantics herer need to be defined (auto joining? see thread-wrapper.hpp)
|
||||
}
|
||||
|
||||
public:
|
||||
ThreadWrapperJoin_test()
|
||||
{ lumiera_threadpool_init(); }
|
||||
|
||||
~ThreadWrapperJoin_test()
|
||||
{ lumiera_threadpool_destroy(); }
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
||||
/** Register this test class... */
|
||||
LAUNCHER (ThreadWrapperJoin_test, "function common");
|
||||
|
||||
|
||||
|
||||
} // namespace test
|
||||
|
||||
} // namespace backend
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
simpleUse ()
|
||||
{
|
||||
aValue_=0;
|
||||
int mySecret = (rand() % 1000) - 500;
|
||||
|
||||
ThreadJoinable newThread("test Thread joining-1"
|
||||
, bind (&ThreadWrapperJoin_test::theAction, this, mySecret)
|
||||
);
|
||||
newThread.join(); // blocks until theAction() is done
|
||||
|
||||
CHECK (aValue_ == mySecret+42);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
wrongUse ()
|
||||
{
|
||||
ThreadJoinable newThread("test Thread joining-2"
|
||||
, bind (&ThreadWrapperJoin_test::theAction, this, 1234)
|
||||
);
|
||||
newThread.join(); // blocks until theAction() is done
|
||||
|
||||
VERIFY_ERROR(LOGIC, newThread.join() );
|
||||
VERIFY_ERROR(LOGIC, newThread.join() );
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
getError()
|
||||
{
|
||||
ThreadJoinable newThread("test Thread joining-2"
|
||||
, bind (&ThreadWrapperJoin_test::theAction, this, DESTRUCTION_CODE)
|
||||
);
|
||||
|
||||
VERIFY_ERROR(SPECIAL, newThread.join() );
|
||||
}
|
||||
|
||||
|
||||
|
||||
public:
|
||||
ThreadWrapperJoin_test()
|
||||
{ lumiera_threadpool_init(); }
|
||||
|
||||
~ThreadWrapperJoin_test()
|
||||
{ lumiera_threadpool_destroy(); }
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
||||
/** Register this test class... */
|
||||
LAUNCHER (ThreadWrapperJoin_test, "function common");
|
||||
|
||||
|
||||
|
||||
}} // namespace backend::test
|
||||
|
|
|
|||
|
|
@ -58,8 +58,7 @@ namespace lib {
|
|||
namespace test{
|
||||
|
||||
|
||||
using backend::Thread;
|
||||
using backend::JoinHandle;
|
||||
using backend::ThreadJoinable;
|
||||
using util::for_each;
|
||||
using util::isnil;
|
||||
using std::tr1::placeholders::_1;
|
||||
|
|
@ -234,15 +233,13 @@ namespace test{
|
|||
* and decrements on random targets.
|
||||
*/
|
||||
class SingleCheck
|
||||
: JoinHandle,
|
||||
Thread
|
||||
: ThreadJoinable
|
||||
{
|
||||
public:
|
||||
SingleCheck (TypedCounter& counter_to_use)
|
||||
: Thread("TypedCounter_test worker Thread"
|
||||
, bind (&SingleCheck::runCheckSequence, this, ref(counter_to_use), (rand() % MAX_ITERATIONS))
|
||||
, (backend::JoinHandle&)*this
|
||||
)
|
||||
: ThreadJoinable("TypedCounter_test worker Thread"
|
||||
, bind (&SingleCheck::runCheckSequence, this, ref(counter_to_use), (rand() % MAX_ITERATIONS))
|
||||
)
|
||||
{ }
|
||||
|
||||
~SingleCheck () { this->join(); }
|
||||
|
|
|
|||
Loading…
Reference in a new issue