StatArb: Rx CombineLatest and JoinBlock (Visual Studio Async CTP) – Part 3
I think I have finally figured out why has been bugging me about JoinBlock. Essentially I want the functionality of CombineLatest (use Rx Sandbox for the marble diagram) from JoinBlock. Hence’s I’ve adapted the previously blogged code as follows (ignore the IObservable leak):
public void AddConstituents(ConstituentsAgent constituentsAgent)
{
// constituentsAgent.OutgoingPriceStream.LinkTo(_calc);
var o = constituentsAgent.OutgoingPriceStream.ToObservable().
CombineLatest(_fxAgent.OutgoingFxRates.ToObservable(),
(stockPrice, ccyPair) =>
{
Console.WriteLine("CombineLatest {0} {1} {2} {3}", stockPrice.Stock, stockPrice.Price, ccyPair.Stock, ccyPair.Price);
return new MarketData(stockPrice.Price * ccyPair.Price, stockPrice.Stock);
});
o.Subscribe(md => _calc.Post(md));
}
I’ve also now introduced a currency pair agent to allow normalisation of the stock values to USD. Again ignore the hardcoding
class CurrencyPairAgent
{
private readonly string _ccyPair;
private readonly BroadcastBlock<MarketData> _fx;
public ISourceBlock<MarketData> OutgoingFxRates { get; private set; }
public CurrencyPairAgent(string ccyPair)
{
_ccyPair = ccyPair;
_fx = new BroadcastBlock<MarketData>(md => md, new DataflowBlockOptions(TaskScheduler.Default, DataflowBlockOptions.UnboundedDegreeOfParallelism));
OutgoingFxRates = _fx;
Run();
}
private async void Run()
{
var rand = new Random();
while (true)
{
await TaskEx.Delay(rand.Next(0, 1000));
_fx.Post(new MarketData(Math.Round(1.6 + rand.NextDouble(), EftPrecision.DecimalPlaces), _ccyPair));
}
}
}
Obviously the overall design has a slight flaw – not all the constituents of the ETF will be using the same GBPUSD rate at any one time, but for now we can live with that. I also need to sort out some tests for the ETF soon
I also need to consider TaskScheduler’s soon as well.
