Caching Data Transforms on IObservables

International Observe the Moon Night I’ve been using ReactiveExtensions a lot lately in my C# code, and I am really impressed with the library. Every now and then, however, I want some additional features. Thankfully, due to Extension methods, they are really easy to add. Here’s one feature I added, which I call CachedSelect.

Streaming Data Transformation

Imagine that we have a source of data points, IObservable<Data[]>, and each Data point has a few values, call them Foo and Bar. This data comes in quite frequently from an external data source.

Now imagine several different UI widgets use that data stream to create their displays. For example, one panel might use the average value of Foo from the data points to color code a widget corresponding the status of the object represented by the data source. Another panel might take the average and display it as a string.

Data[]→Foo[]→Average ↗ Color
↘ string

A naive implementation of this pattern might look like so:

var colorObs = commonSource.Select(data => data.Select(d => d.Foo))
                           .Select(foo => Average(foo))
                           .Select(val => ValueToColor(val));


var averageStringObs = commonSource.Select(data => data.Select(d => d.Foo))
                                   .Select(foo => Average(foo))
                                   .Select(val => val.ToString());

There are a couple of problems with this approach. As is obvious from above, we would be extracting the Foo values and calculating the average twice for each new Data[] that comes in.


Unfortunately, even if we attempt a more “enlightened” approach, we have the same problem:

var averageObs = source.Select(data => data.Select(d => d.Foo))
                       .Select(data => Average(data));
var colorObs = averageObs.Select(val => ValueToColor(val));
var averageStringObs = averageObs.Select(val => val.ToString());

Because of how IObservable.Select works, we still end up selecting Foo and calculating the average twice for each data point. This can be fixed by using a “hot” observable:

var averageObs = source.Select(data => data.Select(d => d.Foo))
                       .Select(foo => Average(foo)).Publish().RefCount();
var colorObs = averageObs.Select(val => ValueToColor(val));
var averageStringObs = averageObs.Select(val => val.ToString());

This essentially makes a shareable IObservable that will only have the Average function called once, regardless of how many Selects occur off of the IObservable. The RefCount() call takes the IConnectedObservable and turns it back into an IObservable, with the twist that the IConnectedObservable will be connected (that is, the prior IObservable will be subscribed to) based on whether the RefCount Observable has any subscriptions.


Sounds good, right? Except we might still have a problem. If you want to make sure to reuse the RefCount IObservable that gives you the average, you have to have make it accessible to all the widgets that need it, and you better make sure you don’t recreate it from the IObservable<Data[]> each time. This is particularly problematic when UI widgets are created essentially statically or in different code paths, and both need access to the IObservable. There are a few ways around this, but most of them involve some sort of massive tree of IObservables. To me that sounds like a pain to manage.

So here’s what I did: I made a special implementation of IObservable that caches the Select functions you pass in, using a new method CachedSelect. If you pass in the same Func twice, you will get the same IObservable back. That way, as long as you start with the same common source, you can “re-create” the chain of select commands and end up sharing as much processing as possible.

One last detail: rather than use Publish().RefCount(), I use Replay(1).RefCount(). This is just like publishing, except new subscribers will get the last published value when they subscribe. This is crucial for UI widgets that might attach to a source which received values in the past but might not receive new values. This can happen, for example, if you use a DistinctUntilChanged somewhere in your processing, and you are sharing an observable created from that. You still want to see the last value that came in, and you don’t want to have to remember it yourself.

Bonus: Composability

An interesting side effect of this is that it becomes quite natural to define a class that contains a list of related transform Funcs to use, to guarantee that you reuse the same function (though C# is really good letting a lambda defined with no closed over variables permute into the same Func when invoked as one). This encourages testing your complicated transforms in isolation, and then using a simpler replacement when testing the creation of IObservables that chain together transforms.