StatArb: Rx on DataFlows (Visual Studio Async CTP) – Part 4


In Part 3 of this series I leveraged Rx ToObservable() using the following extension methods (thanks to a colleague at Microsoft):

    /// <summary>
    /// Extension methods that expose a dataflow source block as an Rx observable sequence.
    /// </summary>
    public static class Extensions
    {
        /// <summary>
        /// Wraps <paramref name="source"/> in an observable. On each subscription a new
        /// <see cref="ObserverTargetBlock{T}"/> is linked to the source, and a continuation on
        /// <c>source.CompletionTask</c> signals the observer: <c>OnError</c> with the task's
        /// exception when the task is faulted, <c>OnCompleted</c> otherwise.
        /// </summary>
        /// <typeparam name="T">Type of the values carried by the source block.</typeparam>
        /// <param name="source">The source block whose messages are forwarded to observers.</param>
        /// <returns>
        /// An observable whose subscription handle is a composite of the cancellation
        /// disposable (its token is passed to the completion continuation) and the link
        /// returned by <c>source.LinkTo</c>.
        /// </returns>
        public static IObservable<T> ToObservable<T>(this ISourceBlock<T> source)
        {
            return Observable.CreateWithDisposable<T>(observer =>
            {
                var sink = new ObserverTargetBlock<T>();
                sink.m_observer = observer;

                var cancellation = new CancellationDisposable();

                // Register the completion notification before linking, so the hookup order
                // is: continuation first, message flow second.
                source.CompletionTask.ContinueWith(completion =>
                {
                    if (!completion.IsFaulted)
                    {
                        observer.OnCompleted();
                    }
                    else
                    {
                        observer.OnError(completion.Exception);
                    }
                }, cancellation.Token);

                var link = source.LinkTo(sink);
                return new CompositeDisposable(cancellation, link);
            });
        }

        /// <summary>
        /// Target block that hands every message it is offered to an <see cref="IObserver{T}"/>.
        /// Only <see cref="OfferMessage"/> is implemented; every other member throws
        /// <see cref="NotSupportedException"/>.
        /// </summary>
        internal sealed class ObserverTargetBlock<T> : ITargetBlock<T>
        {
            /// <summary>Observer that receives the value of each accepted message.</summary>
            internal IObserver<T> m_observer;

            /// <summary>
            /// Accepts an offered message and pushes its value to the observer.
            /// </summary>
            /// <param name="message">The message being offered.</param>
            /// <param name="source">The block offering the message.</param>
            /// <param name="consumeToAccept">
            /// When true, the message is first obtained through <c>source.ConsumeMessage</c>.
            /// </param>
            /// <returns>
            /// <c>NotAvailable</c> when the message (after the optional consume step) is null;
            /// otherwise <c>Accepted</c>, after <c>OnNext</c> has been called with its value.
            /// </returns>
            public DataflowMessageStatus OfferMessage(DataflowMessage<T> message, ISourceBlock<T> source, bool consumeToAccept)
            {
                DataflowMessage<T> received = message;
                if (consumeToAccept)
                {
                    received = source.ConsumeMessage(message, this);
                }

                if (received == null)
                {
                    return DataflowMessageStatus.NotAvailable;
                }

                m_observer.OnNext(received.Value);
                return DataflowMessageStatus.Accepted;
            }

            /// <summary>Not supported by this block.</summary>
            public void DeclinePermanently()
            {
                throw new NotSupportedException();
            }

            /// <summary>Not supported by this block.</summary>
            public bool Post(T item)
            {
                throw new NotSupportedException();
            }

            /// <summary>Not supported by this block.</summary>
            public Task CompletionTask
            {
                get { throw new NotSupportedException(); }
            }
        }
    }

But I haven’t used the inverse yet:

/// <summary>
/// Feeds an observable sequence into a new <c>BufferBlock</c> and returns that block
/// as a source: each element is posted to the block, and completion of the sequence
/// calls <c>DeclinePermanently</c> on it.
/// </summary>
/// <typeparam name="T">Type of the elements in the sequence.</typeparam>
/// <param name="source">The observable sequence to subscribe to.</param>
/// <returns>The buffer block that receives the sequence's elements.</returns>
public static ISourceBlock<T> ToSourceBlock<T>(this IObservable<T> source)
{
    var block = new BufferBlock<T>();

    // NOTE(review): only onNext and onCompleted callbacks are supplied; no error
    // callback is passed and the subscription handle is not kept. Confirm how source
    // errors and unsubscription are meant to be handled.
    source.Subscribe(
        item => block.Post(item),
        () => block.DeclinePermanently());

    return block;
}
About these ads

~ by mdavey on November 18, 2010.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

 
Follow

Get every new post delivered to your Inbox.

Join 647 other followers

%d bloggers like this: