StatArb: Rx on DataFlows (Visual Studio Async CTP) – Part 4

In Part 3 of this series I leveraged Rx via ToObservable(), using the following extension methods (thanks to a colleague at Microsoft):

    public static class Extensions
    {
        public static IObservable<T> ToObservable<T>(this ISourceBlock<T> source)
        {
            return Observable.CreateWithDisposable<T>(observer =>
            {
                var target = new ObserverTargetBlock<T> { m_observer = observer };
                var ctsd = new CancellationDisposable();
                // Relay the block's completion (or fault) to the observer.
                source.CompletionTask.ContinueWith(t =>
                {
                    if (t.IsFaulted) observer.OnError(t.Exception);
                    else observer.OnCompleted();
                }, ctsd.Token);
                // Disposing the subscription cancels the continuation and unlinks the target.
                return new CompositeDisposable(ctsd, source.LinkTo(target));
            });
        }

        internal sealed class ObserverTargetBlock<T> : ITargetBlock<T>
        {
            internal IObserver<T> m_observer;

            public DataflowMessageStatus OfferMessage(DataflowMessage<T> message, ISourceBlock<T> source, bool consumeToAccept)
            {
                if (consumeToAccept) message = source.ConsumeMessage(message, this);
                if (message == null) return DataflowMessageStatus.NotAvailable;
                // Forward the payload to the observer. This call is absent from the listing as
                // posted; message.Value is assumed to be the CTP accessor for the payload.
                m_observer.OnNext(message.Value);
                return DataflowMessageStatus.Accepted;
            }

            // This target only relays offered messages, so the remaining members are unsupported.
            public void DeclinePermanently() { throw new NotSupportedException(); }
            public bool Post(T item) { throw new NotSupportedException(); }
            public Task CompletionTask { get { throw new NotSupportedException(); } }
        }
    }

But I haven’t used the inverse yet:

    public static ISourceBlock<T> ToSourceBlock<T>(this IObservable<T> source)
    {
        var buffer = new BufferBlock<T>();
        // Post each item into the buffer; stop accepting input once the observable completes.
        source.Subscribe(i => buffer.Post(i), () => buffer.DeclinePermanently());
        return buffer;
    }
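
For comparison, the TPL Dataflow library as later released (System.Threading.Tasks.Dataflow) ships its own bridges in both directions, DataflowBlock.AsObservable() and DataflowBlock.AsObserver(). The sketch below is not the CTP code above: it assumes the released API, where Complete() and Completion take the place of DeclinePermanently() and CompletionTask. ListObserver and RoundTripDemo are hypothetical names made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: records every item and signals when the stream ends.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public TaskCompletionSource<bool> Done { get; } = new TaskCompletionSource<bool>();

    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { Done.TrySetException(error); }
    public void OnCompleted() { Done.TrySetResult(true); }
}

public static class RoundTripDemo
{
    // Pushes two items in through an observer and reads them back through an observable.
    public static async Task<List<int>> RunAsync()
    {
        var buffer = new BufferBlock<int>();

        // Dataflow -> Rx: plays the role of ToObservable().
        IObservable<int> output = buffer.AsObservable();
        var observer = new ListObserver<int>();

        using (output.Subscribe(observer))
        {
            // Rx -> Dataflow: the block is driven through IObserver<T>.
            IObserver<int> input = buffer.AsObserver();
            input.OnNext(1);
            input.OnNext(2);
            input.OnCompleted(); // calls Complete() on the block

            // Wait until the observable side has reported completion.
            await observer.Done.Task;
        }

        return observer.Items;
    }

    public static async Task Main()
    {
        var items = await RunAsync();
        Console.WriteLine(string.Join(",", items));
    }
}
```

Subscribing before any item is pushed matters here: AsObservable() only links the block to its observers once the first subscription arrives, so items consumed elsewhere before that point would not be replayed.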

~ by mdavey on November 18, 2010.
