ACE, Theron and Concurrency Runtime
I’ve spent a little time looking at ACE and ACE and Theron recently. Primarily due to the Disruptor noise
. Theron offers some idea of performance here, and hence maybe an interesting comparison with regards to the disruptor performance tests – for example the diamond test (produce an event replicated to two consumer and fold back to a single third consumer) from a viewpoint of leveraging Theron’s agents as the consumer’s. I haven’t figured out how beneficial ACE would be in a disruptor like mode – curious if anyone has some pointers on this?
Curious to see in the Microsoft September build conference what will be offered in the parallel computing arena around improved throughput of data – especially to aid any Disruptor C++ implementation. Just found Don McCrady – Parallelism in C++ Using the Concurrency Runtime presentation on Channel 9 which might be worth watching.
So to some Asynchronous Agents Library code
. Essentially the below is very bare bones, but attempts (possibly badly as I haven’t read or applied Best Practices in the Asynchronous Agents Library) to leverage agents to satisfy event producer, replicated to two consumer and fold back to a single third consumer.
#include <agents.h>
#include <functional>
using namespace Concurrency;
using namespace std;
class action_block : public agent
{
public:
action_block(overwrite_buffer<long>* inbuffer, unbounded_buffer<bool>* outbuffer, function<bool(long)>& f, long iterations)
: m_inbuffer(inbuffer), m_outbuffer(outbuffer), m_f(f), m_iterations(iterations) {}
protected:
void run()
{
for (int i=0; i < m_iterations; i++) {
long val = receive(m_inbuffer);
bool newVal = m_f(val);
send(m_outbuffer, newVal);
}
done();
}
private:
overwrite_buffer<long>* m_inbuffer;
unbounded_buffer<bool>* m_outbuffer;
function<bool(long)>& m_f;
long m_iterations;
};
#include <agents.h>
using namespace Concurrency;
using namespace std;
class join_block : public agent
{
public:
join_block(transformer<long, bool> * buffer, transformer<long, bool> * buffer2, long iterations)
: m_buffer(buffer), m_buffer1(buffer2), m_iterations(iterations) {}
long messageCount;
protected:
void run()
{
Concurrency::join<bool,Concurrency::greedy> j(2);
m_buffer->link_target(&j);
m_buffer1->link_target(&j);
messageCount=0;
for (int i=0; i < m_iterations; i++) {
auto result = receive(j);
if (result[0] && result[1])
++messageCount;
}
done();
}
private:
transformer<long, bool> * m_buffer;
transformer<long, bool> * m_buffer1;
long m_iterations;
};
#include "stdafx.h"
#include <agents.h>
#include <string>
#include <iostream>
#include <sstream>
#include <windows.h>
#include <assert.h>
#include "agents_extras.h"
#include "action_block.h"
#include "join_block.h"
using namespace std;
using namespace Concurrency;
using namespace samples;
const long Iterations = 10;
int _tmain(int argc, _TCHAR* argv[])
{
overwrite_buffer<long> ob1;
function<bool(long)> f = [](long x) {return (x % 3L) == 0;};
function<bool(long)> f2 = [](long x) {return (x % 5L) == 0;};
transformer<long, bool> t1(f);
transformer<long, bool> t2(f2);
ob1.link_target(&t1);
ob1.link_target(&t2);
join_block sum(&t1, &t2, Iterations);
sum.start();
for (long i=0; i < Iterations; i++) {
asend(ob1, i);
}
agent::wait(&sum);
}
As I hinted at above, the code is ‘basic’ and hence its not optimized as appropriate
. Possibly in the future I will try to optimize the code. Its also not worth running this code and doing a comparison with the disruptor – basic means, basic
