I’ve been using Reactive Extensions (Rx) a lot lately in my C# code, and I am really impressed with the library. Every now and then, however, I want some additional features. Thankfully, extension methods make them really easy to add. Here’s one feature I added, which I call:
Streaming Data Transformation
Imagine that we have a source of data points, IObservable<Data>, and each Data point has a few values; call them Foo and Bar. This data comes in quite frequently from an external data source.
Now imagine several different UI widgets use that data stream to create their displays. For example, one panel might use the average value of Foo from the data points to color-code a widget according to the status of the object the data source represents. Another panel might take the same average and display it as a string.
A naive implementation of this pattern might look like so:
var colorObs = commonSource
    .Select(data => data.Select(d => d.Foo))
    .Select(foo => Average(foo))
    .Select(val => ValueToColor(val));
var averageStringObs = commonSource
    .Select(data => data.Select(d => d.Foo))
    .Select(foo => Average(foo))
    .Select(val => val.ToString());
There are a couple of problems with this approach. The most obvious is that we extract the Foo values and calculate the average twice for each new Data that comes in.
Unfortunately, even if we attempt a more “enlightened” approach, we have the same problem:
var averageObs = source
    .Select(data => data.Select(d => d.Foo))
    .Select(data => Average(data));
var colorObs = averageObs.Select(val => ValueToColor(val));
var averageStringObs = averageObs.Select(val => val.ToString());
Because of how Observable.Select works (each subscriber to a cold observable gets its own copy of the whole pipeline), we still end up selecting Foo and calculating the average twice for each data point. This can be fixed by using a “hot” observable:
var averageObs = source
    .Select(data => data.Select(d => d.Foo))
    .Select(foo => Average(foo))
    .Publish()
    .RefCount();
var colorObs = averageObs.Select(val => ValueToColor(val));
var averageStringObs = averageObs.Select(val => val.ToString());
This essentially makes a shareable IObservable that calls the Average function only once per incoming data point, regardless of how many Selects hang off of it. Publish() returns an IConnectableObservable, and the RefCount() call turns that IConnectableObservable back into an IObservable, with the twist that the IConnectableObservable is connected (that is, the prior IObservable is subscribed to) only while the RefCount observable has at least one subscription.
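To make the cold/hot difference concrete, here is a small sketch. It assumes the System.Reactive NuGet package, models each notification as a batch of hypothetical (Foo, Bar) tuples (matching the data.Select(d => d.Foo) calls above), and swaps in a hypothetical Average helper that counts its invocations. It also checks that RefCount disconnects from the source once the last subscriber is disposed.

```csharp
// Sketch: cold vs. hot sharing of an average. Requires the System.Reactive NuGet package.
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

int averageCalls = 0;

// Hypothetical stand-in for the post's Average helper; it counts its invocations.
double Average(double[] values)
{
    averageCalls++;
    return values.Average();
}

var commonSource = new Subject<(double Foo, double Bar)[]>();

// Cold: every subscriber to coldAverage gets its own Select/Average pipeline.
var coldAverage = commonSource
    .Select(data => data.Select(d => d.Foo).ToArray())
    .Select(foo => Average(foo));
using (coldAverage.Subscribe(_ => { }))
using (coldAverage.Select(v => v.ToString()).Subscribe(_ => { }))
{
    commonSource.OnNext(new[] { (1.0, 0.0), (3.0, 0.0) });
}
int coldCalls = averageCalls; // 2: one pipeline per subscriber

averageCalls = 0;

// Hot: Publish().RefCount() shares one upstream subscription among all subscribers.
var hotAverage = commonSource
    .Select(data => data.Select(d => d.Foo).ToArray())
    .Select(foo => Average(foo))
    .Publish()
    .RefCount();
var sub1 = hotAverage.Subscribe(_ => { });                      // first subscriber connects
var sub2 = hotAverage.Select(v => v.ToString()).Subscribe(_ => { });
commonSource.OnNext(new[] { (1.0, 0.0), (3.0, 0.0) });
int hotCalls = averageCalls; // 1: the pipeline is shared

// When the last subscriber leaves, RefCount disconnects from the source.
sub1.Dispose();
sub2.Dispose();
commonSource.OnNext(new[] { (5.0, 0.0) });
int callsAfterDisconnect = averageCalls; // still 1: nothing is listening upstream

Console.WriteLine($"cold: {coldCalls}, hot: {hotCalls}, after disconnect: {callsAfterDisconnect}");
```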
Sounds good, right? Except we might still have a problem. If you want to make sure to reuse the RefCount IObservable that gives you the average, you have to make it accessible to all the widgets that need it, and you had better make sure you don’t recreate it from the IObservable<Data> each time. This is particularly problematic when UI widgets are created essentially statically or in different code paths, and both need access to the IObservable. There are a few ways around this, but most of them involve some sort of massive tree of IObservables. To me that sounds like a pain to manage.
So here’s what I did: I made a special implementation of IObservable that caches the Select functions you pass in, using a new method, CachedSelect. If you pass in the same Func twice, you will get the same IObservable back. That way, as long as you start with the same common source, you can “re-create” the chain of Select calls and end up sharing as much processing as possible.
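The CachedSelect implementation isn't shown here, so the following is only a sketch of the idea under stated assumptions: the System.Reactive NuGet package, a cache keyed on the (source, selector) pair, and Replay(1).RefCount() as the sharing operator (the choice described just below). CachedSelect is written as a hypothetical local function rather than an extension method, purely so the whole sketch fits in one top-level program; a library version would live in a static class so you could write commonSource.CachedSelect(average).

```csharp
// Sketch of a CachedSelect-style helper. Requires the System.Reactive NuGet package.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Cache keyed on (source, selector). ValueTuple equality compares the source by
// reference and the selector with Delegate.Equals (same target and same method).
var cache = new ConcurrentDictionary<(object Source, Delegate Selector), object>();

// Hypothetical helper: the same (source, selector) pair always yields the same
// shared observable, so identical chains share their processing.
IObservable<TOut> CachedSelect<TIn, TOut>(IObservable<TIn> source, Func<TIn, TOut> selector) =>
    (IObservable<TOut>)cache.GetOrAdd(
        (source, selector),
        _ => source.Select(selector).Replay(1).RefCount());

int averageCalls = 0;
var commonSource = new Subject<double[]>();

// Transforms held in variables so every caller passes the same Func instance.
Func<double[], double> average = foo => { averageCalls++; return foo.Average(); };
Func<double, string> format = val => val.ToString("F1", CultureInfo.InvariantCulture);

// Two widgets build "their own" chains independently...
var chainA = CachedSelect(CachedSelect(commonSource, average), format);
var chainB = CachedSelect(CachedSelect(commonSource, average), format);

// ...but get back the very same observable.
bool sameObservable = ReferenceEquals(chainA, chainB);

var received = new List<string>();
using var subA = chainA.Subscribe(received.Add);
using var subB = chainB.Subscribe(received.Add);
commonSource.OnNext(new[] { 1.0, 3.0 }); // average runs once; both widgets get "2.0"

Console.WriteLine($"same: {sameObservable}, Average calls: {averageCalls}, values: {string.Join(" ", received)}");
```

One design caveat of this sketch: the cache keeps strong references to every source and observable it has seen, so a long-lived application might prefer something like ConditionalWeakTable keyed on the source.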
One last detail: rather than use Publish().RefCount(), I use Replay(1).RefCount(). This is just like publishing, except new subscribers get the last published value as soon as they subscribe. This is crucial for UI widgets that might attach to a source that received values in the past but may not receive new ones for a while. This can happen, for example, if you use a DistinctUntilChanged somewhere in your processing and you are sharing an observable created from it. You still want to see the last value that came in, and you don’t want to have to remember it yourself.
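Here is a small sketch of that difference, assuming the System.Reactive NuGet package and a hypothetical Subject<int> status feed. A widget that attaches after the DistinctUntilChanged stage has gone quiet sees nothing from the Publish() version, but immediately receives the last value from the Replay(1) version.

```csharp
// Sketch: late subscribers with Publish() vs. Replay(1). Requires System.Reactive.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var status = new Subject<int>(); // hypothetical status feed

var published = status.DistinctUntilChanged().Publish().RefCount();
var replayed = status.DistinctUntilChanged().Replay(1).RefCount();

// A first widget keeps each shared pipeline connected.
using var firstPublished = published.Subscribe(_ => { });
using var firstReplayed = replayed.Subscribe(_ => { });

status.OnNext(7);
status.OnNext(7); // filtered out by DistinctUntilChanged: nothing new flows downstream

// A second widget attaches later, after the stream has gone quiet.
var latePublished = new List<int>();
var lateReplayed = new List<int>();
using var secondPublished = published.Subscribe(latePublished.Add); // receives nothing yet
using var secondReplayed = replayed.Subscribe(lateReplayed.Add);    // receives the buffered 7

Console.WriteLine($"Publish: {latePublished.Count} values, Replay(1): {string.Join(",", lateReplayed)}");
```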
An interesting side effect of this is that it becomes quite natural to define a class that contains a list of related transform Funcs, to guarantee that you reuse the same function instances (though the C# compiler does cache the delegate for a lambda that captures no variables, so evaluating the same lambda expression repeatedly yields the same Func; this is compiler behavior rather than a language guarantee). This encourages testing your complicated transforms in isolation, and then using simpler replacements when testing the code that chains transforms together into IObservables.
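The compiler detail in the parenthetical is easy to check. This plain C# sketch (no Rx needed) shows that one non-capturing lambda expression evaluated twice gives the same delegate under the current C# compiler, while two identical-looking lambdas written in different places produce unequal delegates, which a cache keyed on the selector, like CachedSelect, would treat as two different transforms.

```csharp
// Sketch: when do lambdas turn into the "same" Func? Plain C#, no extra packages.
using System;
using System.Collections.Generic;

// The same lambda expression evaluated twice (here, on two loop iterations).
var fromLoop = new List<Func<double, string>>();
for (int i = 0; i < 2; i++)
{
    // Captures nothing, so the compiler caches a single delegate instance for it.
    fromLoop.Add(val => val.ToString());
}

// Two textually identical lambdas written in two different places,
// as two independently written widgets might do.
Func<double, string> widgetA = val => val.ToString();
Func<double, string> widgetB = val => val.ToString();

// Relies on current C# compiler (Roslyn) behavior, not on a language guarantee.
bool sameExpressionShared = ReferenceEquals(fromLoop[0], fromLoop[1]);

// Each lambda compiles to its own method, so Delegate.Equals is false.
bool identicalTextEqual = widgetA.Equals(widgetB);

Console.WriteLine($"same expression: {sameExpressionShared}, identical text: {identicalTextEqual}");
```

In practice this suggests exposing each shared transform exactly once, for example as a static readonly Func field on a class of related transforms, and having every widget refer to that field instead of writing its own lambda.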