Article summary
I’ve been using Reactive Extensions (Rx) a lot lately in my C# code, and I am really impressed with the library. Every now and then, however, I want some additional features. Thankfully, extension methods make them really easy to add. Here’s one feature I added, which I call CachedSelect.
Streaming Data Transformation
Imagine that we have a source of data points, IObservable<Data[]>, and each Data point has a few values, call them Foo and Bar. This data comes in quite frequently from an external data source.
Now imagine several different UI widgets use that data stream to create their displays. For example, one panel might use the average value of Foo from the data points to color code a widget according to the status of the object represented by the data source. Another panel might take the same average and display it as a string.
Data[] → Foo[] → Average, which then branches into Average → Color for one panel and Average → string for the other.
A naive implementation of this pattern might look like so:
var colorObs = commonSource.Select(data => data.Select(d => d.Foo))
.Select(foo => Average(foo))
.Select(val => ValueToColor(val));
And elsewhere:
var averageStringObs = commonSource.Select(data => data.Select(d => d.Foo))
.Select(foo => Average(foo))
.Select(val => val.ToString());
There are a couple of problems with this approach. The most obvious one is that we extract the Foo values and calculate the average twice for each new Data[] that comes in.
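To make the duplicated work concrete, here is a small sketch that counts how often the averaging step runs when both pipelines are built from one source. It assumes the System.Reactive NuGet package and uses a Subject<Data[]> as a stand-in for the external commonSource; the Data record, the counting Average helper, the ValueToColor stand-in, and the NaivePipelineDemo class are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical data point type with the two values from the example.
public record Data(double Foo, double Bar);

public static class NaivePipelineDemo
{
    // Counts how often the averaging step actually runs.
    public static int AverageCalls;

    // Hypothetical stand-in for the article's Average helper.
    static double Average(IEnumerable<double> foo)
    {
        AverageCalls++;
        return foo.Average();
    }

    // Hypothetical stand-in for the article's ValueToColor.
    static string ValueToColor(double val) => val >= 0.5 ? "Green" : "Red";

    // Builds both naive pipelines from one source, pushes `batches` arrays,
    // and returns how many times Average was invoked.
    public static int Run(int batches)
    {
        AverageCalls = 0;
        var commonSource = new Subject<Data[]>();

        var colorObs = commonSource.Select(data => data.Select(d => d.Foo))
                                   .Select(foo => Average(foo))
                                   .Select(val => ValueToColor(val));
        var averageStringObs = commonSource.Select(data => data.Select(d => d.Foo))
                                           .Select(foo => Average(foo))
                                           .Select(val => val.ToString());

        using (colorObs.Subscribe(_ => { }))
        using (averageStringObs.Subscribe(_ => { }))
        {
            for (int i = 0; i < batches; i++)
                commonSource.OnNext(new[] { new Data(i, 0), new Data(i + 1, 0) });
        }
        return AverageCalls;
    }
}
```

With three incoming arrays, NaivePipelineDemo.Run(3) reports six Average calls: one per pipeline per array.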
Workarounds
Unfortunately, even if we attempt a more “enlightened” approach, we have the same problem:
var averageObs = commonSource.Select(data => data.Select(d => d.Foo))
.Select(foo => Average(foo));
var colorObs = averageObs.Select(val => ValueToColor(val));
var averageStringObs = averageObs.Select(val => val.ToString());
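Because every subscription to a Select observable creates its own subscription to the observable upstream of it, colorObs and averageStringObs each subscribe to averageObs separately, and each of those subscriptions runs its own copy of the pipeline. So we still end up selecting Foo and calculating the average twice for each data point. This can be fixed by using a “hot” observable: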
var averageObs = source.Select(data => data.Select(d => d.Foo))
.Select(foo => Average(foo)).Publish().RefCount();
var colorObs = averageObs.Select(val => ValueToColor(val));
var averageStringObs = averageObs.Select(val => val.ToString());
This essentially makes a shareable IObservable that calls the Average function only once per data point, regardless of how many Selects hang off of it. Publish() returns an IConnectableObservable, and the RefCount() call turns it back into an IObservable, with the twist that the IConnectableObservable is connected (that is, the prior IObservable is subscribed to) when the RefCount observable gets its first subscriber, and disconnected again when its last subscriber unsubscribes.
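The following sketch compares the “enlightened” version with the Publish().RefCount() version, again assuming the System.Reactive package. SharedAverageDemo and its counters are hypothetical; for brevity the source emits the Foo values directly as double[], and Observable.Defer is used only to count how many times the pipeline subscribes to that source.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedAverageDemo
{
    // Hypothetical counters for the demo.
    public static int AverageCalls;
    public static int SourceSubscriptions;

    static double Average(double[] foo)
    {
        AverageCalls++;
        return foo.Average();
    }

    // Builds averageObs with or without Publish().RefCount(), attaches two
    // subscribers, pushes `batches` arrays, and reports both counters.
    public static (int averageCalls, int sourceSubscriptions) Run(bool share, int batches)
    {
        AverageCalls = 0;
        SourceSubscriptions = 0;
        var subject = new Subject<double[]>();

        // Defer runs its factory once per subscription, so this counts
        // how often something subscribes to the underlying source.
        var source = Observable.Defer<double[]>(() =>
        {
            SourceSubscriptions++;
            return subject;
        });

        var averageObs = source.Select(foo => Average(foo));
        if (share)
            averageObs = averageObs.Publish().RefCount();

        var colorObs = averageObs.Select(val => val >= 1.5 ? "Green" : "Red");
        var averageStringObs = averageObs.Select(val => val.ToString());

        using (colorObs.Subscribe(_ => { }))
        using (averageStringObs.Subscribe(_ => { }))
        {
            for (int i = 0; i < batches; i++)
                subject.OnNext(new[] { 1.0, 2.0 });
        }
        return (AverageCalls, SourceSubscriptions);
    }
}
```

With share set to false, three arrays produce six Average calls and two source subscriptions; with share set to true, they produce three calls and a single subscription.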
CachedSelect
Sounds good, right? Except we might still have a problem. If you want to make sure to reuse the RefCount IObservable that gives you the average, you have to make it accessible to all the widgets that need it, and you had better make sure you don’t recreate it from the IObservable<Data[]> each time. This is particularly problematic when UI widgets are created essentially statically or in different code paths, and both need access to the IObservable. There are a few ways around this, but most of them involve some sort of massive tree of IObservables. To me that sounds like a pain to manage.
So here’s what I did: I made a special implementation of IObservable that caches the Select functions you pass in, through a new method, CachedSelect. If you pass in the same Func twice, you get the same IObservable back. That way, as long as you start with the same common source, you can “re-create” the chain of Select calls and still share as much processing as possible.
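The article doesn’t show its CachedSelect implementation, so the following is only a minimal sketch of the idea, not the author’s code. CachingObservable<T>, CachedSelectDemo, and AverageFoo are hypothetical names; the sketch assumes the System.Reactive package, keys the cache on the selector delegate, and shares every cached stage with Replay(1).RefCount(), the variant described in the next paragraph.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical sketch (not the article's code): an IObservable<T> that
// memoizes the observables returned by CachedSelect, keyed by the selector.
public sealed class CachingObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _shared;
    private readonly Dictionary<Delegate, object> _cache = new Dictionary<Delegate, object>();

    public CachingObservable(IObservable<T> source)
    {
        // One shared subscription to `source`; late subscribers get the latest value.
        _shared = source.Replay(1).RefCount();
    }

    public IDisposable Subscribe(IObserver<T> observer) => _shared.Subscribe(observer);

    // Same selector delegate in, same CachingObservable out, so every caller
    // shares one Select stage and the selector runs once per value.
    public CachingObservable<TResult> CachedSelect<TResult>(Func<T, TResult> selector)
    {
        lock (_cache)
        {
            if (_cache.TryGetValue(selector, out var existing))
                return (CachingObservable<TResult>)existing;

            var created = new CachingObservable<TResult>(_shared.Select(selector));
            _cache.Add(selector, created);
            return created;
        }
    }
}

// Hypothetical demo: two code paths rebuild the same chain from one root.
public static class CachedSelectDemo
{
    public static int AverageCalls;

    // A single selector instance that every caller reuses (hypothetical helper).
    static readonly Func<double[], double> AverageFoo = foo =>
    {
        AverageCalls++;
        return foo.Average();
    };

    public static (bool sameInstance, int averageCalls) Run(int batches)
    {
        AverageCalls = 0;
        var source = new Subject<double[]>();
        var root = new CachingObservable<double[]>(source);

        // Built independently, as two separate widgets might do.
        var colorObs = root.CachedSelect(AverageFoo).Select(val => val >= 1.5 ? "Green" : "Red");
        var averageStringObs = root.CachedSelect(AverageFoo).Select(val => val.ToString());
        bool same = ReferenceEquals(root.CachedSelect(AverageFoo), root.CachedSelect(AverageFoo));

        using (colorObs.Subscribe(_ => { }))
        using (averageStringObs.Subscribe(_ => { }))
        {
            for (int i = 0; i < batches; i++)
                source.OnNext(new[] { 1.0, 2.0 });
        }
        return (same, AverageCalls);
    }
}
```

Two design choices in this sketch are worth flagging: the cache holds strong references and never evicts entries, and a lookup only hits when callers pass the same (or an equal) delegate, so two separately written lambdas create two separate stages.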
One last detail: rather than Publish().RefCount(), I use Replay(1).RefCount(). This is just like publishing, except that new subscribers receive the last published value when they subscribe. This is crucial for UI widgets that might attach to a source which received values in the past but might not receive new ones. This can happen, for example, if you use DistinctUntilChanged somewhere in your processing and share an observable created from that. You still want to see the last value that came in, and you don’t want to have to remember it yourself.
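The difference for a late subscriber can be seen in a short sketch, again assuming the System.Reactive package. LateSubscriberDemo is a hypothetical name, and a Subject<int> stands in for the processed data feeding DistinctUntilChanged.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo of a widget that subscribes after values have flowed.
public static class LateSubscriberDemo
{
    // Shares a DistinctUntilChanged stream with either Publish() or Replay(1)
    // and returns the values a late subscriber receives on subscription.
    public static List<int> Run(bool replay)
    {
        var subject = new Subject<int>();
        var distinct = subject.DistinctUntilChanged();
        var shared = replay ? distinct.Replay(1).RefCount()
                            : distinct.Publish().RefCount();

        // An early subscriber keeps the shared connection alive.
        using var early = shared.Subscribe(_ => { });
        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(2); // suppressed by DistinctUntilChanged

        // A widget created later attaches after the values have arrived.
        var lateSeen = new List<int>();
        using var late = shared.Subscribe(lateSeen.Add);
        return lateSeen;
    }
}
```

Run(replay: true) hands the late subscriber the last distinct value, 2; Run(replay: false) hands it nothing, because Publish() only forwards values that arrive after subscription.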
Bonus: Composability
An interesting side effect of this is that it becomes quite natural to define a class that holds a list of related transform Funcs, which guarantees that you reuse the same function instances. (C# does help here: the compiler typically caches the delegate for a lambda that captures no variables, so evaluating the same lambda expression repeatedly yields the same Func; two identical-looking lambdas written in different places, however, are still different delegates.) This encourages testing your complicated transforms in isolation, and then using a simpler replacement when testing the code that chains transforms together into IObservables.
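A minimal sketch of such a class follows. DataTransforms and its members are hypothetical, as is the Data record; the point is that every caller reads the same static readonly field, so every call to CachedSelect receives the same Func instance, and each transform can be unit-tested as a plain function with no observables involved.

```csharp
using System;
using System.Linq;

// Hypothetical data point type from the example.
public record Data(double Foo, double Bar);

// Hypothetical central home for transforms shared between widgets.
// Each field is initialized once, so every caller gets the same delegate.
public static class DataTransforms
{
    public static readonly Func<Data[], double[]> SelectFoo =
        data => data.Select(d => d.Foo).ToArray();

    // Returns 0 for an empty batch instead of throwing.
    public static readonly Func<double[], double> Average =
        foo => foo.Length == 0 ? 0.0 : foo.Average();

    public static readonly Func<double, string> ValueToColor =
        val => val >= 0.5 ? "Green" : "Red";
}
```

Because these are ordinary functions, a test can call DataTransforms.Average directly, and a test of the observable wiring can substitute a trivial Func in its place.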