The following example demonstrates how to use the GPC API to parallelize a reduction algorithm by computing a sum in parallel (for illustration purposes only). The image below depicts the computation flow under the assumption that the number of leaves is not known in advance:
#pragma once #include "tef/Tentity.h" class HelloReduction : public tef::RequestHandler { public: HelloReduction(); virtual ~HelloReduction(); virtual const char* Path() const override; virtual void Process(tef::RequestContext& context) const override; };
#include "stdafx.h"
#include "HelloReduction.h"

// The single handler instance; its constructor registers it with tef.
static HelloReduction Instance;

HelloReduction::HelloReduction()
{
    tef::Register(this);
}

HelloReduction::~HelloReduction()
{
}

// Path string of this handler.
const char* HelloReduction::Path() const
{
    return "/sum";
}

// One level of the reduction tree: join() is called with a node index, and the
// state handed to the joined task carries an int (the partial sum).
using level_t = gpc::qumex<int, int>;

// The whole reduction tree: join() is called with a level number, and the
// state handed to the joined task carries that level's level_t.
using tree_t = gpc::qumex<int, level_t>;

// Task functor of the reduction.
//  - Called with no arguments (forked): acts as a leaf and joins the tree.
//  - Called with a node state (joined): merges its partial sum into that node.
struct dosum
{
    int sum;     // partial sum carried by this task
    int index;   // node index; halved on every join()
    int level;   // tree level this task joins next
    tree_t mtx;  // the reduction tree, stored by value (copied from the ctor argument)

    dosum(int sum, int node_index, int tree_level, tree_t& mtx)
        : sum(sum), index(node_index), level(tree_level), mtx(mtx)
    {
    }

    // the merge/reduction
    void operator()(gpc::mutex<int, int>& state)
    {
        sum += state.getData();
        if (state.counter() == 2) {
            // after two nodes are merged, continue to the next level
            level++;
            join();
        } else {
            // first arrival on this node: park the partial sum for the sibling
            state.setData(sum);
        }
    }

    // the fork/map
    void operator()()
    {
        // here do some parallel work, then
        join();
    }

    // Joins the parent node: two sibling indices (2k, 2k+1) map to the same
    // parent index k on the current level.
    void join()
    {
        index /= 2;
        // Snapshot taken after the index update; the lambda captures it by
        // value and passes a further copy on as the task for the node.
        dosum self(*this);
        mtx.join(level, [self](gpc::mutex<level_t, int>& tree_state) {
            tree_state.getData().join(self.index, dosum(self));
        });
    }
};

// the HTTP entry
//
// Reads the request input "num", forks one leaf task per value 0..num-1 (leaf i
// contributes the value i), pads every level of the tree to an even number of
// nodes with zero-valued tasks, and finally joins node 0 of the level above the
// last reduction level to write the total as a JSON number.
// When no leaf is forked (num <= 0) it writes 0 immediately.
void HelloReduction::Process(tef::RequestContext& context) const
{
    tree_t mtx;
    // NOTE(review): `num` comes straight from the request and bounds the number
    // of forked tasks; consider whether an upper limit is needed.
    int number = context.Input[L"num"];

    int count = 0;
    while (count < number) {
        // the map phase in parallel
        gpc::fork(dosum(count, count, 1, mtx));
        count++;
    }

    if (count == 0) {
        tef::JsonOutput<128> json(context.Output);
        json.writeNumberValue(0);
        json.flush();
        return;
    }

    // Walk the levels bottom-up; `count` is the number of nodes entering the
    // current level.
    int level = 0;
    while (count > 1) {
        level++;
        if (count & 1) {
            // every level of the reduction must have an even number of nodes
            gpc::fork(dosum(0, count, level, mtx));
            count++;
        }
        count /= 2;
    }

    // when this reduction is complete, respond with the result
    level++;
    mtx.join(level, [context](gpc::mutex<level_t, int>& tree_state) {
        // join node 0 on this level
        tree_state.getData().join(0, [context](gpc::mutex<int, int>& node_state) {
            if (node_state.counter() == 1) {
                // rejoin means this callback function must be executed again after the next task on this mutex
                // NOTE(review): presumably counter() == 1 means the reduced
                // value has not been stored on this node yet - confirm against
                // the GPC documentation.
                node_state.rejoin();
            } else {
                // The captured context is const inside the lambda, so work on a copy.
                tef::RequestContext req(context);
                tef::JsonOutput<128> json(req.Output);
                json.writeNumberValue(node_state.getData());
                json.flush();
                req.Output.End();
            }
        });
    });
}