ACE, Theron and Concurrency Runtime

I’ve spent a little time looking at ACE and ACE and Theron recently. Primarily due to the Disruptor noise :) . Theron offers some idea of performance here, and hence maybe an interesting comparison with regards to the disruptor performance tests – for example the diamond test (produce an event replicated to two consumer and fold back to a single third consumer) from a viewpoint of leveraging Theron’s agents as the consumer’s. I haven’t figured out how beneficial ACE would be in a disruptor like mode – curious if anyone has some pointers on this?

Curious to see in the Microsoft September build conference what will be offered in the parallel computing arena around improved throughput of data – especially to aid any Disruptor C++ implementation. Just found Don McCrady – Parallelism in C++ Using the Concurrency Runtime presentation on Channel 9 which might be worth watching.

So to some Asynchronous Agents Library code :) . Essentially the below is very bare bones, but attempts (possibly badly as I haven’t read or applied Best Practices in the Asynchronous Agents Library) to leverage agents to satisfy event producer, replicated to two consumer and fold back to a single third consumer.

#include <agents.h>
#include <functional>

using namespace Concurrency;
using namespace std;

class action_block : public agent
{
public:
	action_block(overwrite_buffer<long>* inbuffer, unbounded_buffer<bool>* outbuffer, function<bool(long)>& f, long iterations)
		: m_inbuffer(inbuffer), m_outbuffer(outbuffer), m_f(f), m_iterations(iterations) {}

protected:
    void run()
    {
		for (int i=0; i < m_iterations; i++) {
			long val = receive(m_inbuffer);
			bool newVal = m_f(val);
			send(m_outbuffer, newVal);
		}

        done();
    }
private:
	overwrite_buffer<long>* m_inbuffer;
	unbounded_buffer<bool>* m_outbuffer;
	function<bool(long)>& m_f;
	long m_iterations;
};

#include <agents.h>

using namespace Concurrency;
using namespace std;

class join_block : public agent
{
public:
	join_block(transformer<long, bool> * buffer, transformer<long, bool> * buffer2, long iterations)
		: m_buffer(buffer), m_buffer1(buffer2), m_iterations(iterations) {}

	long messageCount;
protected:
    void run()
    {
		Concurrency::join<bool,Concurrency::greedy> j(2);
		m_buffer->link_target(&j); 
		m_buffer1->link_target(&j);

		messageCount=0;
		for (int i=0; i < m_iterations; i++) {
			auto result = receive(j);
			if (result[0] && result[1])
				++messageCount;
		}

		done();
    }
private:
	transformer<long, bool> * m_buffer;
	transformer<long, bool> * m_buffer1;
	long m_iterations;
};

#include "stdafx.h"
#include <agents.h>
#include <string>
#include <iostream>
#include <sstream>
#include <windows.h>
#include <assert.h>
#include "agents_extras.h"
#include "action_block.h"
#include "join_block.h"

using namespace std;
using namespace Concurrency;
using namespace samples;

const long Iterations = 10;

int _tmain(int argc, _TCHAR* argv[])
{
	overwrite_buffer<long> ob1;

	function<bool(long)> f = [](long x) {return (x % 3L) == 0;};
	function<bool(long)> f2 = [](long x) {return (x % 5L) == 0;};

	transformer<long, bool> t1(f);
	transformer<long, bool> t2(f2);
	ob1.link_target(&t1);
	ob1.link_target(&t2);

	join_block sum(&t1, &t2, Iterations);
	sum.start();

	for (long i=0; i < Iterations; i++) {
		asend(ob1, i);
	}

	agent::wait(&sum);
}

As I hinted at above, the code is ‘basic’ and hence its not optimized as appropriate :( . Possibly in the future I will try to optimize the code. Its also not worth running this code and doing a comparison with the disruptor – basic means, basic :)

Advertisement

~ by mdavey on July 15, 2011.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Connecting to %s

 
Follow

Get every new post delivered to your Inbox.

Join 273 other followers