StatArb: Rx CombineLatest and JoinBlock (Visual Studio Async CTP) – Part 3


I think I have finally figured out what has been bugging me about JoinBlock. Essentially I want the functionality of CombineLatest (use Rx Sandbox for the marble diagram) from JoinBlock. Hence I’ve adapted the previously blogged code as follows (ignore the IObservable leak):

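        public void AddConstituents(ConstituentsAgent constituentsAgent)
        {
            // Direct link, replaced by the CombineLatest pipeline below:
            // constituentsAgent.OutgoingPriceStream.LinkTo(_calc);

            // Emit whenever either stream produces a value, pairing it with the latest value from the other.
            var o = constituentsAgent.OutgoingPriceStream.ToObservable().
                CombineLatest(_fxAgent.OutgoingFxRates.ToObservable(),
                    (stockPrice, ccyPair) =>
                    {
                        Console.WriteLine("CombineLatest {0} {1} {2} {3}", stockPrice.Stock, stockPrice.Price, ccyPair.Stock, ccyPair.Price);
                        // Convert the stock price using the latest FX rate.
                        return new MarketData(stockPrice.Price * ccyPair.Price, stockPrice.Stock);
                    });

            // The IDisposable returned by Subscribe is discarded here (the leak mentioned above).
            o.Subscribe(md => _calc.Post(md));
        }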
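
To make the difference concrete, here is a minimal sketch of the CombineLatest behaviour I’m after, written without Rx or TPL Dataflow so it stands on its own. The class `CombineLatestSketch`, its `Combine` method and the sample numbers are all hypothetical and purely for illustration; it also assumes a compiler with tuple support (C# 7 or later), which the CTP-era code above does not need. It replays a time-ordered list of events from two streams and emits a value every time either side updates, once both sides have produced at least one value. JoinBlock, as I understand it, instead pairs items one-for-one, so a new stock price would wait for a new FX rate rather than reuse the last one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Rx or TPL Dataflow: replays a merged,
// time-ordered event list and emits the way CombineLatest would.
public static class CombineLatestSketch
{
    // IsLeft == true means the event came from the stock-price stream,
    // false means it came from the FX-rate stream.
    public static List<double> Combine(
        IEnumerable<(bool IsLeft, double Value)> events,
        Func<double, double, double> selector)
    {
        var results = new List<double>();
        double? lastLeft = null;   // latest stock price seen so far
        double? lastRight = null;  // latest FX rate seen so far

        foreach (var e in events)
        {
            if (e.IsLeft) lastLeft = e.Value; else lastRight = e.Value;

            // Nothing is emitted until both sides have produced a value.
            if (lastLeft.HasValue && lastRight.HasValue)
                results.Add(selector(lastLeft.Value, lastRight.Value));
        }
        return results;
    }

    public static void Main()
    {
        var events = new[]
        {
            (true, 10.0),   // stock price; no FX rate yet, so no output
            (true, 11.0),   // stock price; still no FX rate, so no output
            (false, 2.0),   // FX rate    -> 11 * 2
            (true, 12.0),   // stock price -> 12 * 2
            (false, 3.0),   // FX rate    -> 12 * 3
        };

        foreach (var v in Combine(events, (price, rate) => price * rate))
            Console.WriteLine(v);
    }
}
```

Note that the first two stock prices produce nothing, and the stale price of 10 is never used: only the latest value on each side matters, which is exactly what I want when normalising a price with the current FX rate.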

I’ve also now introduced a currency pair agent to allow normalisation of the stock values to USD. Again, ignore the hardcoding :)

    class CurrencyPairAgent
    {
        private readonly string _ccyPair;
        private readonly BroadcastBlock<MarketData> _fx;

        public ISourceBlock<MarketData> OutgoingFxRates { get; private set; }

        public CurrencyPairAgent(string ccyPair)
        {
            _ccyPair = ccyPair;
            _fx = new BroadcastBlock<MarketData>(md => md, new DataflowBlockOptions(TaskScheduler.Default, DataflowBlockOptions.UnboundedDegreeOfParallelism));
            OutgoingFxRates = _fx;

            Run();
        }

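        // Fire-and-forget loop that publishes a random FX rate at random intervals of up to a second.
        private async void Run()
        {
            var rand = new Random();
            while (true)
            {
                await TaskEx.Delay(rand.Next(0, 1000));
                // Hardcoded base rate of 1.6 plus random noise in [0, 1).
                _fx.Post(new MarketData(Math.Round(1.6 + rand.NextDouble(), EftPrecision.DecimalPlaces), _ccyPair));
            }
        }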
    }

Obviously the overall design has a slight flaw: each constituent combines its price with the FX stream independently, so not all the constituents of the ETF will be using the same GBPUSD rate at any one time, but for now we can live with that. I also need to sort out some tests for the ETF soon :) and to give some thought to TaskSchedulers.

~ by mdavey on November 18, 2010.
