StatArb: Rx on DataFlows (Visual Studio Async CTP) – Part 4
In Part 3 of this series I leveraged Rx through a ToObservable() extension method, implemented as follows (thanks to a colleague at Microsoft):
/// <summary>
/// Extension methods that bridge dataflow source blocks to Rx observables.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Wraps a dataflow source block in an observable sequence.
    /// </summary>
    /// <typeparam name="T">Type of the messages carried by the source block.</typeparam>
    /// <param name="source">The source block whose messages are forwarded to subscribers.</param>
    /// <returns>
    /// An observable that, on each subscription, links a new forwarding target block to
    /// <paramref name="source"/> and relays the outcome of its completion task to the observer.
    /// </returns>
    public static IObservable<T> ToObservable<T>(this ISourceBlock<T> source)
    {
        return Observable.CreateWithDisposable<T>(observer =>
        {
            var cancellation = new CancellationDisposable();
            var sink = new ObserverTargetBlock<T> { m_observer = observer };

            // Relay the end of the source to the observer: a faulted completion task
            // becomes OnError, anything else becomes OnCompleted. The continuation is
            // registered with the token of the disposable returned to the subscriber.
            source.CompletionTask.ContinueWith(finished =>
            {
                if (!finished.IsFaulted)
                {
                    observer.OnCompleted();
                }
                else
                {
                    observer.OnError(finished.Exception);
                }
            }, cancellation.Token);

            // The subscription handle bundles the cancellation disposable with the link
            // (presumably disposing the link detaches the sink from the source - confirm
            // against the Dataflow LinkTo documentation).
            var link = source.LinkTo(sink);
            return new CompositeDisposable(cancellation, link);
        });
    }

    /// <summary>
    /// Target block that pushes every message it is offered to a wrapped observer.
    /// Only <see cref="OfferMessage"/> is implemented; the remaining members throw
    /// <see cref="NotSupportedException"/>.
    /// </summary>
    internal sealed class ObserverTargetBlock<T> : ITargetBlock<T>
    {
        /// <summary>Observer that receives the value of each accepted message.</summary>
        internal IObserver<T> m_observer;

        /// <summary>
        /// Accepts an offered message and forwards its value to the observer.
        /// </summary>
        /// <param name="message">The message being offered.</param>
        /// <param name="source">The block offering the message.</param>
        /// <param name="consumeToAccept">
        /// When true, the message is first obtained via <c>source.ConsumeMessage</c>.
        /// </param>
        /// <returns>
        /// <c>Accepted</c> after the value was passed to the observer, or
        /// <c>NotAvailable</c> when there is no message to forward (null).
        /// </returns>
        public DataflowMessageStatus OfferMessage(DataflowMessage<T> message, ISourceBlock<T> source, bool consumeToAccept)
        {
            if (consumeToAccept)
            {
                message = source.ConsumeMessage(message, this);
            }

            if (message != null)
            {
                m_observer.OnNext(message.Value);
                return DataflowMessageStatus.Accepted;
            }

            return DataflowMessageStatus.NotAvailable;
        }

        /// <summary>Not supported by this adapter.</summary>
        public void DeclinePermanently()
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported by this adapter.</summary>
        public bool Post(T item)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported by this adapter.</summary>
        public Task CompletionTask
        {
            get { throw new NotSupportedException(); }
        }
    }
}
But I haven’t used the inverse yet:
/// <summary>
/// Exposes an observable sequence as a dataflow source block.
/// </summary>
/// <typeparam name="T">Type of the elements produced by the observable.</typeparam>
/// <param name="source">The observable whose elements are posted into the block.</param>
/// <returns>
/// A new <c>BufferBlock&lt;T&gt;</c> that receives every element of <paramref name="source"/>
/// and is told to decline permanently once the observable completes.
/// </returns>
public static ISourceBlock<T> ToSourceBlock<T>(this IObservable<T> source)
{
    var block = new BufferBlock<T>();

    // NOTE(review): no onError handler is passed and the subscription handle returned
    // by Subscribe is discarded - confirm that callers never need error propagation
    // or early unsubscription.
    source.Subscribe(
        item => { block.Post(item); },
        () => { block.DeclinePermanently(); });

    return block;
}
