Parallel Reduction and MapReduce

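The following example demonstrates how to use the GPC API to parallelize a reduction algorithm by computing a sum in parallel (for illustration purposes only). Each leaf task contributes its own index as its value, and pairs of nodes are merged level by level until a single result remains; whenever a level has an odd number of nodes, it is padded with a zero-valued node so that every node has a partner. The image below depicts the computation flow, assuming that the number of leaves is not known in advance: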

  1. Add a header file (HelloReduction.h) containing the following code:
    #pragma once
    #include "tef/Tentity.h"
    
    // HTTP request handler for the parallel-sum (reduction) demo.
    class HelloReduction : public tef::RequestHandler
    {
    public:
    	HelloReduction();
    	virtual ~HelloReduction();

    	// Path this handler is served under.
    	const char* Path() const override;

    	// Handles one request; the response is written as JSON.
    	void Process(tef::RequestContext& context) const override;
    };
                    
  2. Add a source file (for example, HelloReduction.cpp) containing the following code:
    #include "stdafx.h"
    #include "HelloReduction.h"
    
    // File-local handler object; constructing it registers the handler with tef.
    namespace { HelloReduction Instance; }
    
    // Registration happens on construction, so defining an instance with static
    // storage duration is enough to make the handler available.
    HelloReduction::HelloReduction() { tef::Register(this); }
    
    // Nothing to release.
    HelloReduction::~HelloReduction() = default;
    
    // Route handled by this request handler.
    const char* HelloReduction::Path() const { return "/sum"; }
    
    // Mutexes of one tree level, addressed by node index; each node's data is
    // its int partial sum (NOTE(review): qumex parameter order inferred from usage).
    using level_t = gpc::qumex<int, int>;
    // The whole reduction tree: per-level mutexes addressed by level number.
    using tree_t = gpc::qumex<int, level_t>;
    
    // One node of the reduction tree.
    // It is first run as a forked leaf task (the map step). After that, a copy
    // of it is queued on the node mutex of its parent slot. When it meets its
    // sibling there, the two partial sums are combined and the node moves one
    // level up (the reduce step).
    struct dosum
    {
    	int sum;    // partial sum carried by this node
    	int index;  // position of this node within its current level
    	int level;  // tree level this node joins on
    	tree_t mtx; // copy of the tree; the algorithm relies on copies referring
    	            // to the same tree (NOTE(review): confirm gpc::qumex copy semantics)

    	dosum(int initial_sum, int node_index, int tree_level, tree_t& tree)
    		: sum(initial_sum), index(node_index), level(tree_level), mtx(tree)
    	{
    	}

    	// The merge/reduction step. It runs on the node mutex of this node's
    	// parent slot on the current level.
    	void operator()(gpc::mutex<int,int>& state)
    	{
    		sum += state.getData();
    		if( state.counter() == 2 ) {
    			// Second arrival: both siblings are now folded into sum,
    			// so continue on the next level.
    			level++;
    			join();
    		} else {
    			// First arrival: store the partial sum for the sibling to pick up.
    			state.setData(sum);
    		}
    	}

    	// The fork/map step (presumably invoked by gpc::fork).
    	void operator()()
    	{
    		// here do some parallel work, then
    		join();
    	}

    	// Moves to the parent slot (index / 2) and queues a copy of this node on
    	// that slot's mutex at the current level.
    	void join()
    	{
    		index /= 2;
    		// A named copy is captured by value rather than capturing `this`,
    		// because the callback is presumably run later by gpc.
    		dosum self(*this);
    		mtx.join(level, [self](gpc::mutex<level_t, int>& state) {
    			state.getData().join(self.index, dosum(self));
    		});
    	}
    };
    
     // the HTTP entry
    // Reads "num" and sums 0 .. num-1 with a parallel reduction tree.
    // A non-positive num is answered immediately with 0. Otherwise the JSON
    // result is written from the callback that joins the root slot.
    void HelloReduction::Process(tef::RequestContext& context) const
    {
    	tree_t mtx;

    	const int number = context.Input[L"num"];

    	// Nothing to reduce: answer synchronously.
    	if( number <= 0 ) {
    		tef::JsonOutput<128> json(context.Output);
    		json.writeNumberValue(0);
    		json.flush();
    		return;
    	}

    	// Map phase: one leaf per value, each carrying its own index, all on level 1.
    	for( int leaf = 0; leaf < number; leaf++ ) {
    		gpc::fork(dosum(leaf, leaf, 1, mtx));
    	}

    	// Walk the levels bottom-up. A level with an odd node count gets one
    	// extra zero-valued node, so every node has a sibling to merge with.
    	int level = 0;
    	for( int width = number; width > 1; width /= 2 ) {
    		level++;
    		if( width & 1 ) {
    			gpc::fork(dosum(0, width, level, mtx));
    			width++;
    		}
    	}

    	// The fully reduced value arrives at node 0 one level above the last merge.
    	int rootLevel = level + 1;
    	mtx.join(rootLevel, [context](gpc::mutex<level_t, int>& levelState) {
    		levelState.getData().join(0, [context](gpc::mutex<int, int>& nodeState) {
    			if( nodeState.counter() == 1 ) {
    				// First on this node: rejoin so that this callback runs again
    				// after the next task on this mutex.
    				nodeState.rejoin();
    				return;
    			}
    			tef::RequestContext req(context);
    			tef::JsonOutput<128> json(req.Output);
    			json.writeNumberValue(nodeState.getData());
    			json.flush();
    			req.Output.End();
    		});
    	});
    }
                    
  3. Compile (F7) and run the project (F5).
  4. In a browser, enter http://yourWebSqlServer/sum.jsx?num=4.
  5. The browser will render "6", which is the sum 0 + 1 + 2 + 3 of the four leaf values (each leaf contributes its own index).

See Also:

Getting Started with C++

C++ GPC Reference

Table of Contents