update the thread-wrapper to use the new threading API

This removes the explicit Sync things from the thread wraper since
the functionality is almost exactly provided by the backend.

Thread encapsulates a lumiera thread handle now, but this is strictly
optional and might be dropped on the floor (using a temporary as thread).

Thread has a sync() function which allows user controlled syncronous
startup:

 Thread("foo", myoperation).sync();

will startup myoperation and only return from the ctor after myoperation
called a matching lumiera_thread_sync().

The related tests need to initialize/destroy the threadpool accordingly
This commit is contained in:
Christian Thaeter 2010-01-20 18:45:48 +01:00
parent 9fc68c9d32
commit 8607a4006a
3 changed files with 135 additions and 146 deletions

View file

@ -27,6 +27,7 @@
#include "include/logging.h"
#include "lib/sync.hpp"
#include "lib/error.hpp"
extern "C" {
#include "backend/threads.h"
@ -41,9 +42,6 @@ namespace backend {
using std::tr1::bind;
using std::tr1::function;
using lib::Literal;
using lib::Sync;
using lib::RecursiveLock_Waitable;
using lib::NonrecursiveLock_Waitable;
typedef struct nobug_flag* NoBugFlag;
@ -60,68 +58,52 @@ namespace backend {
* goes out of scope altogether. Explanation: this is implemented by locking
* the embedded monitor immediately in the ctor. Thus, unless entering the
* wait state, the contained mutex remains locked and prevents the thread
* manager from invoking the broadcast() on the condition var.
*
* @see thread-wrapper-join-test.cpp
* manager from invoking the broadcast() on the condition var.
*
* @see thread-wrapper-join-test.cpp
*/
class JoinHandle
: public Sync<RecursiveLock_Waitable>
, Sync<RecursiveLock_Waitable>::Lock // noncopyable, immediately acquire the lock
: boost::noncopyable
{
friend class Thread;
LumieraThread handle;
void
attach (LumieraThread handle_)
{
typedef Sync<RecursiveLock_Waitable> SyncBase;
bool isWaiting_;
volatile bool armed_;
friend class Thread;
LumieraReccondition
accessLockedCondition()
{
ASSERT (!armed_, "Lifecycle error, JoinHandle used for several threads.");
armed_ = true;
return accessMonitor().accessCond();
}
bool
wakeupCheck()
{
if (!armed_)
throw lumiera::error::Logic ("no thread created blocking on this JoinHandle");
if (!isWaiting_)
{
isWaiting_ = true;
return false; // causes entering the blocking wait
}
return true; // causes end of blocking wait
}
public:
REQUIRE (!handle, "JoinHandle used for several threads.");
handle = handle_;
}
public:
/** Create a promise, that the current thread will or may
* wait-blocking on another not-yet existing thread to terminate.
* When passed in on creation of the other thread, as long as this
* handle lives, the other thread will be on hold after termination.
*/
JoinHandle()
: SyncBase::Lock(this)
, isWaiting_(false)
, armed_(false)
{ }
/** put the current thread into a blocking wait until another thread
JoinHandle(LumieraThread handle_ = 0) : handle(handle_) {}
~JoinHandle() { if(handle) throw "not joined";} /* TODO decide if we want auto-joining here? (any way, throwing in dtor isnt very nice) */
/** put the current thread into a blocking wait until another thread
* has terminated. This other thread needs to be created by the Thread
* wrapper, passing this JoinHandle as ctor parameter.
* @throws error::Logic if no thread has been registered to block on this
*/
void
join()
{
accessMonitor().wait (&handle, *this, &JoinHandle::wakeupCheck);
}
};
*/
void
join()
{
if (!handle)
throw lumiera::error::Logic ("no thread created blocking on this JoinHandle");
lumiera_err err = lumiera_thread_join (handle);
lumiera_error_set(err, 0);
handle = 0;
if (err) /* dont throw here when there is already an error pending (ymmv)*/
lumiera::throwOnError();
}
};
@ -134,99 +116,101 @@ namespace backend {
* - supports integrating with an existing object monitor based lock (planned)
* The new thread starts immediately within the ctor; after returning, the new
* thread has already copied the arguments and indeed actively started to run.
*
*
* @note this class is \em not a thread handle. Within Lumiera, we do all of
* our thread management such as to avoid using global thread handles.
* If some cooperation between threads is needed, this should be done
* in a implementation private way, e.g. by sharing a condition var.
*
* in a implementation private way, e.g. by sharing a condition var.
*
* @todo Ichthyo started this wrapper 12/08 while our own thread handling
* was just being shaped. It may well be possible that such a wrapper
* is superfluous in the final application. Re-evaluate this!
*/
class Thread
: public Sync<NonrecursiveLock_Waitable>
, boost::noncopyable
: boost::noncopyable
{
LumieraThread handle;
typedef function<void(void)> Operation;
Operation const& operation_;
static void
run (void* arg)
{
volatile bool started_;
typedef function<void(void)> Operation;
Operation const& operation_;
static void
run (void* arg)
{
REQUIRE (arg);
Thread* startingWrapper = reinterpret_cast<Thread*>(arg);
Operation _doIt_(startingWrapper->operation_);
{
Lock sync(startingWrapper);
startingWrapper->started_ = true;
sync.notify(); // handshake signalling we've gotten the parameter
}
_doIt_(); // execute the actual operation in the new thread
}
void
start_thread (lumiera_thread_class kind, Literal& purpose, NoBugFlag logging_flag, LumieraReccondition joinCond=0)
{
Lock sync(this);
LumieraThread res =
lumiera_thread_run ( kind
, &run // invoking the run helper and..
, this // passing this start context as parameter
REQUIRE (arg);
Thread* startingWrapper = reinterpret_cast<Thread*>(arg);
Operation _doIt_(startingWrapper->operation_);
lumiera_thread_sync (); // thread syncronizization point, see lumiera_thread_sync_other() below
_doIt_(); // execute the actual operation in the new thread
}
void
start_thread (lumiera_thread_class kind, Literal& purpose, NoBugFlag logging_flag, JoinHandle* jh = 0)
{
handle =
lumiera_thread_run ( kind | (jh?LUMIERA_THREAD_JOINABLE:0) // joinable thread when a join handle is passed
, &run // invoking the run helper and..
, this // passing this start context as parameter
, purpose.c()
, logging_flag
);
(void)joinCond; // TODO: this is a temporary fix to match the C API
// we might have to re-write more of this file or even remove it later
if (!res)
throw lumiera::error::State("failed to create new thread.");
// make sure the new thread had the opportunity to take the Operation
// prior to leaving and thereby possibly destroying this local context
sync.wait (started_);
}
public:
/** Create a new thread to execute the given operation.
if (!handle) /* we dont want to throw here when there was already an error pending */
lumiera::throwOnError();
if (jh)
jh->attach (handle);
// make sure the new thread had the opportunity to take the Operation
// prior to leaving and thereby possibly destroying this local context
lumiera_thread_sync_other (handle);
}
public:
/** Create a new thread to execute the given operation.
* The new thread starts up synchronously, i.e. when the ctor returns, the new thread
* has started running and taken over (copied) the operation functor passed in. The
* thread will be created by lumiera_thread_run (declared in threads.h), it can't
* thread will be created by lumiera_thread_run (declared in threads.h), it can't
* be cancelled and it can't be joined.
* @param purpose fixed char string used to denote the thread for diagnostics
* @param logging_flag NoBug flag to receive diagnostics regarding the new thread
* @param operation defining what to execute within the new thread. Any functor
* which can be bound to function<void(void)>. Note this functor will be
* copied onto the stack of the new thread, thus it can be transient.
*
*
*/
Thread (Literal purpose, Operation const& operation, NoBugFlag logging_flag = &NOBUG_FLAG(thread))
: started_(false),
operation_(operation)
{
start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag);
}
/** Variant of the standard case, used to register a JoinHandle in addition to starting a thread.
* @param join ref to a JoinHandle, which needs to be created in the thread which plans
* to wait-blocking on the termination of this newly created thread
*
*/
Thread (Literal purpose, Operation const& operation,
JoinHandle& join, NoBugFlag logging_flag = &NOBUG_FLAG(thread))
: started_(false),
operation_(operation)
{
start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag,
join.accessLockedCondition());
}
};
} // namespace backend
Thread (Literal purpose, Operation const& operation, NoBugFlag logging_flag = &NOBUG_FLAG(thread))
: operation_(operation)
{
start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag);
}
/** Variant of the standard case, used to register a JoinHandle in addition to starting a thread.
* @param join ref to a JoinHandle, which needs to be created in the thread which plans
* to wait-blocking on the termination of this newly created thread
*
*/
Thread (Literal purpose, Operation const& operation,
JoinHandle& join, NoBugFlag logging_flag = &NOBUG_FLAG(thread))
: operation_(operation)
{
start_thread (LUMIERA_THREADCLASS_INTERACTIVE, purpose, logging_flag,
&join);
}
/**
* syncronization barrier, the user supplied operation must have a matching lumiera_thread_sync() call
*/
void
sync ()
{
if (!lumiera_thread_sync_other (handle))
lumiera::throwOnError();
}
};
} // namespace backend
#endif

View file

@ -40,9 +40,9 @@ namespace backend {
/**************************************************************************
* @test use the Lumiera backend to create some new threads, additionally
* passing an condition variable for waiting on thread termination.
* passing an condition variable for waiting on thread termination.
* Actually this is implemented as creating and passing a JoinHandle.
*
*
* @see backend::Thread
* @see threads.h
*/
@ -57,7 +57,7 @@ namespace backend {
}
volatile int aValue_; ///< state to be modified by the other thread
volatile int aValue_; ///< state to be modified by the other thread
void
theAction (int secretValue) ///< to be run in a new thread...
@ -77,12 +77,11 @@ namespace backend {
Thread("test Thread joining",
bind (&ThreadWrapperJoin_test::theAction, this, mySecret),
waitingHandle);
waitingHandle);
// note binding and thread wrapper already destroyed
waitingHandle.join(); // blocks until theAction() is done
ASSERT (aValue_ == mySecret+42);
CHECK (aValue_ == mySecret+42);
}
@ -92,11 +91,11 @@ namespace backend {
JoinHandle waitingHandle;
Thread("test Thread joining-1",
bind (&ThreadWrapperJoin_test::theAction, this, 111));
bind (&ThreadWrapperJoin_test::theAction, this, 111));
// note we "forget" to pass the JoinHandle
try
{
waitingHandle.join(); // protocol error: handle wasn't passed for starting a Thread;
try
{
waitingHandle.join(); // protocol error: handle wasn't passed for starting a Thread;
NOTREACHED();
}
catch (lumiera::error::Logic& logo)
@ -119,7 +118,7 @@ namespace backend {
}
catch (...)
{
ASSERT (lumiera_error() == lumiera::error::LUMIERA_ERROR_ASSERTION);
CHECK (lumiera_error() == lumiera::error::LUMIERA_ERROR_ASSERTION);
}
#endif
#endif
@ -127,6 +126,9 @@ namespace backend {
// note: the waitingHandle goes out of scope here,
// which unblocks the second thread. The first thread wasn't blocked,
// while the third thread wasn't created at all.
waitingHandle.join(); // just making the above thing pass, JoinHandle thows when not joined and going out of scope
// the semantics herer need to be defined (auto joining? see thread-wrapper.hpp)
}
public:

View file

@ -31,8 +31,8 @@
using std::tr1::bind;
using test::Test;
using lib::Sync;
using lib::NonrecursiveLock_NoWait;
namespace backend {
namespace test {
@ -54,18 +54,21 @@ namespace backend {
}
struct TestThread : Thread
struct TestThread
: Thread
, Sync<NonrecursiveLock_NoWait> // note: Thread isnt derived from sync anymore, if we want to lock values this needs to be done at per client base
// an alternative would be to use the threads .sync() and lumiera_thread_sync() where approbiate
{
TestThread()
: Thread("test Thread creation",
bind (&TestThread::theOperation, this, createVal(), createVal()))
{ } // note the binding (functor object) is passed as anonymous temporary
void
void
theOperation (uint a, uint b) ///< the actual operation running in a separate thread
{
Lock sync(this); // *not* a recursive lock, because parent unlocks prior to invoking the operation
Lock(this);
sum += (a+b);
}
};
@ -98,7 +101,7 @@ namespace backend {
sum = checksum = 0;
TestThread instances[NUM_THREADS] SIDEEFFECT;
usleep (200000); // pause 200ms for the threads to terminate.....
usleep (200000); // pause 200ms for the threads to terminate.....
ASSERT (0 < sum);
ASSERT (sum==checksum);