The following demonstrates using the GPC API to parallelize a reduction algorithm by computing a sum in parallel (for illustration purposes only). The image below depicts the computation flow, assuming that the number of leaves is not known in advance:
#pragma once
#include "tef/Tentity.h"
/// HTTP request handler demonstrating a parallel sum (reduction) built on
/// the GPC fork/join primitives. An instance registers itself with the tef
/// framework during static initialization (see the .cpp part below).
class HelloReduction : public tef::RequestHandler
{
public:
/// Registers this handler via tef::Register().
HelloReduction();
virtual ~HelloReduction();
/// URL path this handler serves ("/sum").
virtual const char* Path() const override;
/// Framework entry point, invoked once per incoming request.
virtual void Process(tef::RequestContext& context) const override;
};
#include "stdafx.h"
#include "HelloReduction.h"
// File-scope instance: its constructor runs during static initialization,
// which is what registers the handler with the framework.
static HelloReduction Instance;
HelloReduction::HelloReduction()
{
// Self-registration; presumably the framework routes requests to this
// handler by matching Path() -- confirm against the tef::Register docs.
tef::Register(this);
}
// Defaulted out-of-line: equivalent to the empty body, but states intent
// explicitly and lets the compiler generate the (virtual) destructor.
HelloReduction::~HelloReduction() = default;
// Route served by this handler.
const char* HelloReduction::Path() const
{
static constexpr char kPath[] = "/sum";
return kPath;
}
// One reduction level: node index -> partial sum.
// (C++11 alias declarations replace the old typedef spelling; the file
// already relies on C++11 features such as lambdas and `override`.)
using level_t = gpc::qumex<int, int>;
// The whole reduction tree: level number -> that level's node map.
using tree_t = gpc::qumex<int, level_t>;
// Work item for one node of the reduction tree. A dosum is forked once per
// leaf (the map phase) and then re-enqueued level by level as pairs of
// partial sums are merged (the reduce phase).
//
// NOTE(review): the mutex tree is held BY VALUE here even though the ctor
// receives it by reference; this is only correct if gpc::qumex has
// handle/reference semantics -- confirm against the GPC API.
struct dosum
{
int sum;     // partial sum carried by this node
int index;   // node index within the current tree level
int level;   // current tree level (leaves are forked with level 1)
tree_t mtx;  // the reduction tree of per-level qumex maps

dosum(int sum, int node_index, int tree_level, tree_t& mtx)
: sum(sum), index(node_index), level(tree_level), mtx(mtx)
{
}

// The merge/reduction step, run with exclusive access to one per-node
// mutex: the first arrival stores its partial sum; the second arrival
// (counter() == 2) merges both and promotes the result to the next level.
void operator()(gpc::mutex<int,int>& state)
{
sum += state.getData();
if( state.counter() == 2 ) {
// after two nodes are merged, continue to the next level
level++;
join();
} else {
state.setData(sum);
}
}

// The fork/map step: leaf-level work happens here, then the node enters
// the reduction.
void operator()()
{
// here do some parallel work, then
join();
}

// Enqueue a merge into this node's parent slot on the current level.
void join()
{
index /= 2;
// Renamed from `$this`: a `$` in an identifier is a non-portable
// compiler extension, not standard C++ ([lex.name]).
dosum self(*this);
mtx.join(level, [self](gpc::mutex<level_t, int>& state) {
state.getData().join(self.index, dosum(self));
});
}
};
// the HTTP entry
void HelloReduction::Process(tef::RequestContext& context) const
{
tree_t mtx;
int number = context.Input[L"num"];
int count = 0;
while( count < number ) {
// the map phase in parallel
gpc::fork(dosum(count, count, 1, mtx));
count++;
}
if( count == 0 ) {
tef::JsonOutput<128> json(context.Output);
json.writeNumberValue(0);
json.flush();
return;
}
int level = 0;
while( count > 1 ) {
level++;
if( count & 1 ) {
// every level of the reduction must have an even number of nodes
gpc::fork(dosum(0, count, level, mtx));
count++;
}
count /= 2;
}
// when this reduction is complete, respond with the result
level++;
mtx.join(level, [context](gpc::mutex<level_t, int>& state) {
// join node 0 on this level
state.getData().join(0, [context](gpc::mutex<int, int>& state) {
if( state.counter() == 1 ) {
// rejoin means this callback function must be executed again after the next task on this mutex
state.rejoin();
} else {
tef::RequestContext req(context);
tef::JsonOutput<128> json(req.Output);
json.writeNumberValue(state.getData());
json.flush();
req.Output.End();
}
});
});
}