Introduction to Rx – Reactive Extensions

Thirty years ago, life was much simpler for desktop developers. Processors had only one core; GUIs, mice and event-driven programming didn’t exist outside universities; and UIs were simple affairs using cursor positioning on a text screen. Most software was developed procedurally at that time, as object-oriented programming hadn’t yet made the jump to the mainstream.

Now it’s all become much more complex. Windows, mice, text and graphics are all manipulated by event-driven, object-oriented software. Technologies like LINQ help to simplify the code, but some problems are still not easy to deal with.

One such problem is pushing real-time updates to client software each time a database receives new data from a feed.

Traditionally these updates are processed in one of two ways:

1. Polling for new data, checking every so often to see if there’s been an update.
2. Handling it via some low-level interrupt mechanism.

The second method is preferable since it only fires when data is available and is quicker, but it’s usually only possible with hardware. A serial port receiving data, for instance, could fire an interrupt that reads the byte value and stores the data in a buffer.

A newer approach is asynchronous programming with the async and await keywords, which were introduced to C# and the .NET Framework a year or two back. Using these keywords, it’s possible to write code that attempts to address the problem, but that code tends to be complicated. A better way, though, is to use Reactive Extensions (Rx), another Microsoft technology for .NET, which allows high-level software to subscribe to a data feed and receive notifications when new data becomes available.
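
To make the subscribe-and-notify idea concrete, here is a minimal sketch, assuming the Rx package described in the next section is already installed. Subject<T> stands in for the real data feed; the priceFeed name and the values pushed into it are made up for illustration.

```csharp
using System;
using System.Reactive.Subjects; // ships with the Rx package

class PushFeedSketch
{
    public static void Main()
    {
        // Hypothetical feed of price updates. A Subject<T> is both an
        // observable (clients subscribe to it) and an observer (the feed
        // pushes values into it).
        var priceFeed = new Subject<decimal>();

        // The client subscribes once and is called back for every update.
        using (priceFeed.Subscribe(
            price => Console.WriteLine("New price: {0}", price),
            error => Console.WriteLine("Feed failed: {0}", error.Message),
            () => Console.WriteLine("Feed closed.")))
        {
            // Simulate the data feed pushing updates. No polling loop is involved.
            priceFeed.OnNext(101.5m);
            priceFeed.OnNext(102.0m);
            priceFeed.OnCompleted();
        }
    }
}
```

The subscriber never asks whether new data is available; it simply reacts when the feed calls OnNext. In a real system the OnNext calls would come from whatever code receives the database updates.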

Implementing Reactive Extensions

If you have Visual Studio, you can install Rx into an existing, open solution with the NuGet Package Manager Console, or use the SDK installer for Visual Studio Express. This adds four DLLs to your project’s references.

Then you just need to add the following line to the start of your program to get it to compile and run:

using System.Reactive.Linq;

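Here’s an example from the Rx Design Guidelines PDF that shows how to asynchronously read a 4 GB file in 64 KB chunks and write it out to an encrypted file in just three statements. Note that AsyncRead, Encrypt and WriteToStream appear to be helper methods assumed by the sample rather than operators that ship with Rx itself.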

// open the 4GB file for asynchronous reading in blocks of 64K
var inFile = new FileStream(@"d:\temp\4GBfile.txt",
        FileMode.Open, FileAccess.Read, FileShare.Read, 2 << 15, true);
// open a file for asynchronous writing in blocks of 64K
var outFile = new FileStream(@"d:\temp\Encrypted.txt",
        FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 2 << 15, true);

inFile.AsyncRead(2 << 15)
        .Select(Encrypt)
        .WriteToStream(outFile)
        .Subscribe(
        _ => Console.WriteLine("Successfully encrypted the file."),
        error => Console.WriteLine(
                "An error occurred while encrypting the file: {0}", error.Message));

Reactive Extensions is a library that’s added to an existing solution. Asynchronous data streams are represented by the IObservable<T> interface (comparable to IEnumerable<T> in LINQ), and the static Observable class provides a number of methods for creating and querying them. You create a subscribing object that implements the IObserver<T> interface to receive notifications when the observable has new data.
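
As a rough sketch of that relationship, the following program implements IObserver<T> by hand and subscribes it to a short sequence. ConsoleObserver is a hypothetical class written for this illustration, and Observable.Range is used only as a convenient stand-in for a real data source.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical observer that prints every notification it receives.
class ConsoleObserver : IObserver<int>
{
    // Called once for each new value.
    public void OnNext(int value) { Console.WriteLine("Received {0}", value); }

    // Called at most once if the sequence fails.
    public void OnError(Exception error) { Console.WriteLine("Failed: {0}", error.Message); }

    // Called at most once when the sequence ends normally.
    public void OnCompleted() { Console.WriteLine("Done"); }
}

class ObserverSketch
{
    public static void Main()
    {
        // Stand-in data source that produces 1, 2, 3 and then completes.
        IObservable<int> numbers = Observable.Range(1, 3);

        // Subscribe returns an IDisposable that represents the subscription.
        using (numbers.Subscribe(new ConsoleObserver()))
        {
        }
    }
}
```

In everyday code you rarely write an observer class like this; the Subscribe overloads that take lambdas build the observer for you.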

Lambda syntax and query comprehension syntax are commonplace with Rx, so some familiarity with LINQ and extension methods is useful. Keep in mind that with LINQ to Objects the data already exists, but with an observable it may not have arrived yet.
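
The difference can be sketched by running the same filter over an array and over an observable. The class and variable names here are made up for the illustration, and the 200 ms interval is an arbitrary choice to keep the run short.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

class PullVersusPush
{
    public static void Main()
    {
        // LINQ to Objects: the array is already in memory, so foreach
        // pulls the filtered values straight away.
        int[] stored = { 0, 1, 2, 3, 4, 5 };
        var evenStored = from n in stored
                         where n % 2 == 0
                         select n;
        foreach (var n in evenStored)
            Console.WriteLine("Pulled {0}", n);

        // Rx: the same filter in lambda syntax, but each value is pushed
        // to the subscriber only when the interval fires.
        var ticks = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(6);
        var evenTicks = ticks.Where(n => n % 2 == 0);

        var finished = new ManualResetEventSlim();
        evenTicks.Subscribe(
            n => Console.WriteLine("Pushed {0}", n),
            () => finished.Set());   // Take(6) completes the sequence

        finished.Wait();             // block until the sequence has completed
    }
}
```

The "Pulled" lines appear at once, while the "Pushed" lines trickle in as the values are produced.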

Here’s an example from the 101 Rx Samples wiki that demonstrates a simple number feed. Observable.Interval(TimeSpan.FromSeconds(1)) generates an increasing number each second, starting at 0. It’s a little like a timer event adding a new number to the end of a list. The lowNums query uses a where clause to keep only the numbers below 5 (0 to 4). A subscription to this query calls Console.WriteLine for each value received.

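using System;
using System.Reactive.Linq;

class Where_Simple
{
    static void Main()
    {
        // Emits 0, 1, 2, ... at one-second intervals
        var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));

        // Pass on only the values below 5
        var lowNums = from n in oneNumberPerSecond
                      where n < 5
                      select n;

        Console.WriteLine("Numbers < 5:");

        lowNums.Subscribe(lowNum =>
        {
            Console.WriteLine(lowNum);
        });

        // Keep the process alive while the values arrive
        Console.ReadKey();
    }
}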

This outputs the following numbers, each after a second-long gap, even though the program contains no explicit timer code!

Numbers < 5:
0
1
2
3
4
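
One thing the sample does not show is how a subscription ends. Interval never completes on its own, and a where clause only discards the later values, so the program keeps listening until a key is pressed. The sketch below is one possible variation, not part of the original sample: it swaps the filter for TakeWhile so that the sequence completes after 4, and keeps the IDisposable returned by Subscribe so the subscription can be released explicitly.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class StopSubscriptionSketch
{
    public static void Main()
    {
        var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));

        // TakeWhile completes the sequence at the first value that fails
        // the test, whereas Where would keep listening indefinitely.
        var lowNums = oneNumberPerSecond.TakeWhile(n => n < 5);

        var finished = new ManualResetEventSlim();

        IDisposable subscription = lowNums.Subscribe(
            n => Console.WriteLine(n),
            () =>
            {
                Console.WriteLine("Completed");
                finished.Set();
            });

        finished.Wait();          // returns once the sequence has completed

        // Harmless after completion; calling Dispose earlier would stop
        // the notifications before the sequence ends.
        subscription.Dispose();
    }
}
```

This prints 0 to 4 followed by "Completed" and then exits without waiting for a key press.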

In a future article, I’ll dig deeper into Rx to show how it can be used with mouse movement, to subscribe to the mouse cursor positions.
