A big Rx Release

13 Feb

Looks like the Rx team have been super busy.  After our Christmas release, we have another massive build to try out. 

Some key features:

  1. Publish, Prune and Replay have been reverted to their previous signatures.
  2. Expand operator that recursively expands an initial observable sequence using a selector function.
  3. A set of FastSubjects!

Full Release notes:

Various fixes:

  • Added generic variance annotations on ISubject<in T1, out T2> and IObserver<in TValue, out TResult>.
  • Enabling generic variance annotations for Silverlight 4 binaries.
  • No longer installing Silverlight assemblies to the GAC; fixes build issues with Expression Blend.
  • Made Unit, TimeStamped<T>, TimeInterval<T>, Notification<T> serializable.
  • Fixes to System.Linq.Async’s Single and SingleOrDefault operators.

· Scheduler improvements:

  • ControlScheduler and NewThreadScheduler now correctly use relative time.
  • CurrentThreadScheduler, EventLoopScheduler, NewThreadScheduler and VirtualScheduler now normalize time properly.
  • ImmediateScheduler, CurrentThreadScheduler and NewThreadScheduler no longer cause a thread sleep if the specified TimeSpan is zero.
  • DispatcherScheduler, ThreadPoolScheduler and SynchronizationContextScheduler avoid using a timer if a scheduled action’s specified TimeSpan is zero.
  • CurrentThreadScheduler on Windows Phone 7 no longer depends on ThreadStaticAttribute which is not implemented on Windows Phone 7.

· Changes to observable operators:

  • Multicast, Publish, Prune and Replay binding operators:
    • Changed return type of Multicast to IConnectableObservable<T>.
    • Changed Multicast overload with an observable sequence selector function to take in a subject selector function, allowing deferred construction of the subject being used.
    • Bringing back Publish, Prune and Replay operators, each of them calling into Multicast to fix the subject parameterization ([Behavior]Subject, AsyncSubject and ReplaySubject respectively).
    • Removed Publish overload that simply returns an IObservable<T>.
  • Buffering and windowing operators:
    • BufferWithTime, BufferWithCount and BufferWithTimeOrCount now return IObservable<IList<T>> as they used to do before.
    • Added WindowWithTime, WindowWithCount and WindowWithTimeOrCount which return IObservable<IObservable<T>>, as special cases of the general purpose Window operator.
    • Fixed issue with duplicate or missing values due to timer errors.
  • Added ToList, ToArray, ToDictionary and ToLookup aggregation standard query operators, returning an observable sequence with the (single) aggregated value.
  • Added SequenceEqual operator which checks that the values in a stream are equal (but not the timing).
  • Added Expand operator that recursively expands an initial observable sequence using a selector function.
  • Added Concat overload that takes an observable sequence of observable sequences and produces a single observable sequence which exhausts each inner observable sequence before subscribing to the next observable sequence.
  • Added Merge overload that allows specifying the maximum number of active sequences being merged at any point in time, queuing the rest in a way similar to Concat.
  • Added ManySelect operator which is the comonadic bind operator and is similar to ContinueWith on tasks but can be used for streams.
  • Added IgnoreValues operator which behaves as Where(_ => false).
  • Added GroupByUntil operator which is similar to GroupBy except that there’s a duration selector function to control the lifetime of each generated group.
  • Removed Drain. The same behavior can be restored using the (updated) more general purpose Concat and Merge(n) operators.
  • Fixed memory leaks in GroupJoin and Join where these operators held onto window disposables until the end of the (potentially infinite) outer source.

· Subjects changes:

  • Added FastSubject, FastBehaviorSubject, FastAsyncSubject and FastReplaySubject which are much faster than regular subjects but:
    • don’t decouple producer and consumer by an IScheduler (effectively limiting them to ImmediateScheduler);
    • don’t protect against stack overflow;
    • don’t synchronize input messages.
  • Fast subjects are used by Publish and Prune operators if no scheduler is specified.

· Qbservable changes:

o Operators that take multiple observable sequences now check whether the supplied sequences implement IQbservable<T>. If so, their expression tree gets stitched in; otherwise, a ConstantExpression is used to wrap the sequence.

Download the different release flavors here:

Tags: ,

Renewed!

11 Jan

Looks like I’ve been renewed as a Client App Dev MVP for 2011.  I was pretty happy about this and I look forward to the coming year with the UK tech community.

Looks like DDD Scotland will be my first outing and hope to see you there!

Tags:

New Version of Rx – Join Support

31 Dec

Looks like the Rx team released a lovely new version of the framework.  Highlights are Join and GroupJoin support and better naming (in my opinion) for the Publish, Prune and Replay operators.

Full release notes
Downloads:

Tags: , ,

Silverlight and the UI Thread – An Update

5 Dec

So it looks like Silverlight 5 will solve the UI thread issue.  They mention a low-latency networking mode, however, if there weren’t caveats, why do they make it sound like something you have to turn on?  Wouldn’t you always want a low-latency mode?

What is sounds like to me is that this new low-latency mode has some other hidden costs involved.  It will be interesting to see what these are, and what the threading model is for the new stack.  I’m truly hoping it doesn’t mean a single thread for networking and will allow us developers to choose.

Tags: , ,

New Addition

4 Dec

Welcome to Emma! Our beautiful new baby girl was born this morning after only 3 hours of labour. What a champion I have for a wife!

Looking forward to meeting her properly over the next few days.

Tags:

Silverlight and Networking – The UI Thread

23 Nov

The devil is in the detail as they say.  As Silverlight is essentially a client-based runtime, a large amount of resource will be invested in downloading data, posting data and generally interacting with resources external to the client.  This means frequent use of the WebClient or the associated WebRequest classes.

All of Silverlight’s APIs for interacting with resources on the web are async by nature.  This means that it looks by the design of the API that we can easily  do all our interaction on a separate thread and let the UI thread do it’s important rendering work.  However, as is always with Silverlight, all is not as it seems.  This is because even though Silverlight lets you do all the interaction on a thread other than the UI thread, the networking is ALWAYS marshalled back to the UI thread at some point in the networking stack.

What does this mean for me as a developer?
It means, that even though you have invested time and effort to keep the UI thread as empty as possible, the Silverlight runtime will execute networking operations back onto the thread that is in most applications, starved of resources.

Can we prove this?
Definitely.  Below I’ve included a solution to download which proves the point.  It allows the user to schedule (using Rx of course :) ), every second, a web request to a local long running http handler.  The picture below shows this:

Clicking either the webclient or webrequest buttons will in an infinite loop, on the thread pool, download the result from a local HttpHandler.  Clicking the 3rd button will simply do a Thread.Sleep() on the UI thread.  If you hook up Fiddler, as required by the solution, you will notice that the web requests stop, and wait until the UI thread is released.  Once the UI thread has been released, all the queued web requests will all start executing and catch up.

What does this mean for me?
This should, in my opinion, be considered a bug in the runtime.  In my current role, we stream pricing data very quickly to our controls with the data being incredibly time sensitive.  Our UI is also very busy rendering some quite complex controls, animations and UI interactions.  If our UI is busy rendering frames, it means it cannot process network calls.  If it is busy processing network calls, it cannot process rendering requests. For busy, complex LOB Silverlight application this poses a very large problem for the developer.  How can we justify networking shuddering to a halt, because a large control is being rendered? The answer is that unfortunately, there is no answer.  We either have to slow down our networking calls, potentially caching if possible, or render less to the UI.

This bug exists in both Silverlight 3.0 and 4.0, and I definitely think the runtime requires a fix.  Is this a problem where the browser is imposing this restriction?  Is it a simple restriction that the runtime places on us as developers for some esoteric reason?

Thanks must go to the guys in the office, who came up with this simple test to prove the UI interaction in the networking stack.

Source code download

Tags: , ,

ObservableCollection to IObservable

9 Nov

Ever wanted your ObservableCollection<T>’s CollectionChanged event wrapped up in an IObservable stream? Try this extension method:

public static IObservable<NotifyCollectionChangedEventArgs> ToStream(this ObservableCollection<T> source)
{
	var stream =  Observable.FromEvent(
			(EventHandler ev)
			   => new NotifyCollectionChangedEventHandler(ev),
			ev => source.CollectionChanged += ev,
			ev => source.CollectionChanged -= ev)
		.Select(e => e.EventArgs);
	return stream;
}

Tags: , , ,

Rx and Scheduling – A Misconception

2 Nov

A common misconception I see during work with the reactive framework is how scheduling is achieved.  Let us look at the following example.

var stream = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(i => Console.WriteLine("New Value" + i));

What this simple example is doing, is every 1 second, a value of type long is yielded, starting at zero and incrementing forever. Our ObserveOn call marshals the observation to a thread in the threadpool and we output the result to the Console.

A concern was raised a while back in that as we’re observing on a thread pool thread, we’re holding on to this threadpool thread for the duration of the stream. As we know, thread pool threads should not be held on for any long duration of time so this could potentially be a bad thing.

However, the truth of Rx is much simpler. The ObserveOn call simply tells Rx to use the specified scheduler when a new value is OnNexted. Therefore, every second, Rx pulls a thread from the thread pool and marshals the call to that thread. On that basis, we can see we’re using the thread for only the period of time that the code in the OnNext function is being run.

Don’t believe me? Pop this code into Linqpad or a console app, and you can see the results for yourself:

var stream = Observable.Interval(TimeSpan.FromSeconds(1));
stream
	.ObserveOn(Scheduler.NewThread)
	.Subscribe(i => String.Format("New Value on Thread {0}", Thread.CurrentThread.ManagedThreadId).Dump());

Tags: , , ,

Rx and the Async CTP

1 Nov

With the news about C# 5.0 and the new asynchronous keywords (async and await), it comes as no surprise that the Reactive Framework team is in on this.

I was a bit dubious to the future of Rx after seeing Anders’ video, but on the day of the PDC, the Rx team released a new version of the library that allows us to use the new async compiler magic with Rx.

Since this new async CTP doesn’t rely on framework features, a new runtime or anything other than convention, all that the Rx team needed to do to have the framework work, allow the conversion of an IObservable<T> to a class that conforms to the Awaiter convention.  As the spec says:

The expression t of an await-expression await t is called the task of the await expression. The task t is required to be awaitable, which means that all of the following must hold:

  • (t).GetAwaiter() is a valid expression of type A.
  • Given an expression a of type A and an expression r of type System.Action, (a).BeginAwait(r) is a valid boolean-expression.
  • Given an expression a of type A, (a).EndAwait() is a valid expression.

A is called the awaiter type for the await expression. The GetAwaiter method is used to obtain an awaiter for the task.

The BeginAwait method is used to sign up a continuation on the awaited task. The continuation passed is the resumption delegate, which is further explained below.

The EndAwait method is used to obtain the outcome of the task once it is complete.

The method calls will be resolved syntactically, so all of GetAwaiter, BeginAwait and EndAwait can be either instance members or extension methods, or even bound dynamically, as long as the calls are valid in the context where the await expression appears. All of them are intended to be “non-blocking”; that is, not cause the calling thread to wait for a significant amount of time, e.g. for an operation to complete.


What this means to us Rx’ers, is that we can leverage the GetAwaiter() extension method provided in the Rx framework to convert between an IObservable<T> into something that works with the new async and await keywords.  This method can be found in System.Reactive.dll:

public static ObservableAwaiter<TSource> GetAwaiter<TSource>(this IObservable<TSource> source);

Tags: , , ,

A C# Wish – Static Extension Methods

1 Nov

Something I’d love to have in C# – static extension methods.  While extension methods are extremely powerful – look what LINQ and Rx have done with them, I’d love this extra little bit of help.

As an example, the Rx framework have quite a few static helper classes.  For example, Observable.Interval(Timespan) will return a stream of data that yields a value on every interval of the timespan you specify.  The Observable static class is full of these methods.

To make any methods you write that fit under the scope of this Observable class discoverable, you would ideally write an extension method on the Observable class.  However, in the current C# spec, static extension methods only can be called on an instance of a class and not on the the type.

Added to wish list. :)

Tags: , ,

Follow

Get every new post delivered to your Inbox.