Library: generalise pipeline summation into fold-left

Using the same building blocks, this operation can be generalised even more,
leading to a much cleaner implementation (also with better type deduction).

The feature actually used here, namely summing up all values,
can then be provided as a convenience shortcut, filling in std::plus
as a default reduction operator.
This commit is contained in:
Fischlurch 2023-09-24 02:45:43 +02:00
parent b416a67bb9
commit 35ff53a716
7 changed files with 698 additions and 1164 deletions

View file

@ -483,6 +483,19 @@ namespace lib {
}
/**
* @internal derive suitable result value types when reducing elements into an accumulator.
*/
template<class SRC, class FUN>
struct _ReduceTraits
{
using Result = typename iter_explorer::_FunTraits<FUN,SRC>::Res;
using ResVal = typename lib::meta::RefTraits<Result>::Value;
};
/**
* @internal Base of pipe processing decorator chain.
@ -1602,26 +1615,36 @@ namespace lib {
consumeFun (pipeline);
}
/**
* _terminal builder_ to invoke sum up resulting number values from the pipeline.
* @return accumulation of all results from the pipeline, combined by `std::plus`
* _terminal builder_ to sum up or reduce values from the pipeline.
* In the general case a _fold-left_ operation is performed; default values for the
* joining operation and the initial value however allow to fall back on summation of values.
* @param accessor a functor working on the pipeline result values or the iterator
* @param junctor (optional) binary operation, joining the sum with the next result of the junctor
* @param seedVal (optional) initial value to start accumulation from
* @return accumulation of all results from the pipeline, combined with the junctor
*/
template<class FUN>
auto
resultSum (FUN&& accessor)
template<class FUN
,typename COMB =decltype(std::plus<>())
,typename VAL =typename iter_explorer::_ReduceTraits<SRC,FUN>::ResVal>
VAL
reduce (FUN&& accessor
,COMB junctor =COMB()
,VAL seedVal =VAL())
{
auto accessVal = iter_explorer::_FunTraits<FUN,SRC>::adaptFunctor (forward<FUN> (accessor));
value_type sum{};
SRC& pipeline = *this;
for ( ; pipeline; ++pipeline)
sum += accessVal (pipeline);
VAL sum{move(seedVal)};
IterExplorer::foreach ([&](SRC& srcIter){ sum = junctor (sum, accessVal(srcIter)); });
return sum;
}
/** simplified _terminal builder_ to [reduce](\ref #reduce) by numeric sum. */
auto
resultSum()
{
return IterExplorer::resultSum ([](const reference val){ return val; });
return IterExplorer::reduce ([](const reference val){ return val; });
}

View file

@ -12,7 +12,12 @@ return: 0
END
TEST "Thread-local diagnostic context" DiagnosticContext_test <<END
TEST "N-fold synchronisation barrier" SyncBarrier_test <<END
return: 0
END
PLANNED "Yield-waiting sync performance" SyncBarrierPerformance_test <<END
return: 0
END

View file

@ -53,6 +53,11 @@ return: 0
END
TEST "Thread-local diagnostic context" DiagnosticContext_test <<END
return: 0
END
TEST "Diff: apply list diff to target sequence" DiffListApplication_test <<END
return: 0
END

View file

@ -286,7 +286,7 @@ namespace test{
verify_FilterChanges();
verify_asIterSource();
verify_IterSource();
verify_resultSum();
verify_reduceVal();
verify_effuse();
verify_depthFirstExploration();
@ -1070,20 +1070,39 @@ namespace test{
/** @test verify _terminal operation_ to sum up all values from the pipeline.
/** @test verify _terminal operation_ to sum or reduce all values from the pipeline.
*/
void
verify_resultSum()
verify_reduceVal()
{
auto accumulated = explore(CountDown{6})
.transform([](int i){ return i-1; })
auto accumulated = explore(CountDown{30})
.transform([](int i){ return i-1; }) // note: implicitly converts uint -> int
.resultSum();
using Res = decltype(accumulated);
CHECK (lib::test::showType<Res>() == "int"_expect);
auto expectedSum = [](auto N){ return N*(N+1) / 2; };
CHECK (accumulated == expectedSum(5));
CHECK (accumulated == expectedSum(29));
// In the general case an accessor and a junctor can be given...
CHECK (explore(CountDown{10})
.reduce([](int i){ return i - 0.5; } // accessor: produce a double
,[](string accu, float val)
{
return accu+">"+util::toString(val); // junctor: convert to String and combine with separator char
}
, string{">-"} // seedVal: starting point for the reduction; also defines result type
)
== ">->9.5>8.5>7.5>6.5>5.5>4.5>3.5>2.5>1.5>0.5"_expect);
// If only the accessor is given, values are combined by std::plus...
CHECK (explore(CountDown{9})
.reduce([](auto it) -> string
{
return _Fmt{"○%s●"} % *it; // accessor: format into a string
})
== "○9●○8●○7●○6●○5●○4●○3●○2●○1●"_expect);
}

View file

@ -0,0 +1,146 @@
/*
SyncBarrierPerformance(Test) - investigate performance of yield-waiting synchronisation
Copyright (C) Lumiera.org
2023, Hermann Vosseler <Ichthyostega@web.de>
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
* *****************************************************/
/** @file sync-barrier-performance-test.cpp
** unit test \ref SyncBarrierPerformance_test
*/
#include "lib/test/run.hpp"
#include "lib/sync-barrier.hpp"
#include "lib/iter-explorer.hpp"
#include "lib/util-foreach.hpp"
#include <chrono>
#include <thread>
#include <atomic>
#include <array>
using test::Test;
using util::and_all;
using lib::explore;
using std::array;
using std::atomic_uint;
using std::this_thread::sleep_for;
using namespace std::chrono_literals;
namespace lib {
namespace test {
namespace {// Test setup for a concurrent calculation with checksum....
const uint NUM_THREADS = 1024;
atomic_uint stage1{0};
atomic_uint stage2{0};
atomic_uint finish{0};
SyncBarrier interThread{NUM_THREADS };
SyncBarrier afterThread{NUM_THREADS+1};
/**
* A test thread to perform a summation protocol including synchronisation points
* - build a compound sum of random numbers in the first stage
* - wait for the compound sum to build up completely
* - book in the compound sum plus a further random number
*/
class TestThread
: std::thread ////////////////////////////////////////////////////////////////////OOO TOD-oh
{
public:
TestThread()
: thread{[&]()
{ //-STAGE-1------------------------------
localSum = rand() % 1000; // generate local value
stage1.fetch_add (localSum); // book in local value
interThread.sync(); // wait for all other threads to have booked in
//-STAGE-2------------------------------
uint sync = stage1; // pick up compounded sum from STAGE-1
localSum += rand() % 1000; // add further local value for STAGE-2
stage2.fetch_add (localSum+sync); // book in both local values and synced sum
afterThread.sync(); // wait for other threads and supervisor
finish.fetch_add(1); // mark completion of this thread
thread::detach(); //////////////////////////////////////////////OOO Wech-oh
}}
{ }
uint localSum; // *deliberately* not initialised to avoid race
bool isRunning() const { return thread::joinable(); } ///////////////////////OOO Wack-oh
};
/** sum up all `localSum` fields from all TestThread instances in a container */
template<class CON>
uint
sumLocals (CON const& threads)
{
return explore (threads)
.reduce ([&](TestThread const& t){ return t.localSum; });
}
}//(End)Test setup
/*******************************************************************//**
* @test investigate performance of N-fold thread synchronisation.
* - start a _huge number_ of TestThread
* - all those pick up the partial sum from stage1
* @remark without coordinated synchronisation, some threads would see
* an incomplete sum and thus the stage2 checksum would be lower
* @see lib::SyncBarrier
* @see steam::control::DispatcherLoop
*/
class SyncBarrierPerformance_test : public Test
{
virtual void
run (Arg)
{
array<TestThread,NUM_THREADS> threads;
CHECK (0 == finish);
CHECK (and_all (threads, [](auto& t){ return t.isRunning(); }));
afterThread.sync();
sleep_for (5ms); // give the threads a chance to terminate
CHECK (NUM_THREADS == finish); // all threads have passed out....
CHECK (0 < stage1);
CHECK (stage1 < stage2);
CHECK (stage2 > sumLocals(threads));
CHECK (stage2 == sumLocals(threads) + NUM_THREADS*stage1); // this holds only if all threads waited to get the complete stage1 sum
}
};
/** Register this test class... */
LAUNCHER (SyncBarrierPerformance_test, "function common");
}} // namespace lib::test

View file

@ -97,10 +97,8 @@ namespace test {
uint
sumLocals (CON const& threads)
{
uint sum{0};
explore (threads)
.foreach ([&](TestThread const& t){ sum += t.localSum; });
return sum;
return explore (threads)
.reduce ([&](TestThread const& t){ return t.localSum; });
}
}//(End)Test setup

File diff suppressed because it is too large Load diff