Article summary
Imagine a scenario where you need to create an Observable
sequence that will acquire a resource, do some processing, and release the resource when unsubscribed from. An example of something like this might be an Observable
that acquires a lock and releases it when unsubscribed from. That “lock” signal could be merged with a second signal that does some work that should only be done after the lock has been acquired.
Now what if you needed to chain that first operation with a second operation that could only execute properly after the resource had been released? In other words, you need the first value emitted from the first Observable to be an input to the second Observable (probably using flatMap
)—but the first observable needs to be unsubscribed from (to release the resource) before the emitted value is sent on down the stream.
In this post I’ll put together an example showing an scenario like this, along with a custom Transformer
that can be composed into the stream such that it will unsubscribe from the first Observable
before sending it’s emitted value down the stream.
Acquire the Resource
Here’s an Observable
that acquires a resource and releases it when unsubscribed from. Continuing with the scenario above it will acquire some lock and release the lock.
Observable acquireResource = Observable.defer(() -> {
MyResource resource = MyResource.acquire();
System.out.println("Acquired the resource");
return Observable
.never()
.doOnUnsubscribe(() -> {
MyResource.release(resource)
System.out.println("Released the resource");
});
});
This Observable
defers acquiring the lock until it has a subscription, and then it releases the lock when unsubscribed from. By doing the release
in the doOnUnsubscribe
it ensures that the lock will be released regardless of completion, error, or the subscription being cancelled (unsubscribe()
being called directly),
Do Some Work
Here’s an Observable
that does some work that should only be done when the lock has been acquired.
Observable doWork = Observable.defer(() -> {
System.out.println("Doing work");
return Observable.just(100);
});
To make sure that the work is done only when the resource has been acquired, merge the Observable
with the resource Observable
coming first in the list. You need to use the case
operator since you can’t merge different types of Observables.
Observable doWorkWhileAcquired =
Observable.merge(acquireResource.cast(Integer.class), doWork);
Chaining Operations
Taking the example further, let’s chain another Observable
that does some more work, and takes the output of the previous Observable
. This second operation needs to acquire the resource as well so it too needs to be merged with the “acquire” Observable
, and then chained with the first.
public static final Observable doMoreWork(Integer input) {
return Observable.defer(() -> {
System.out.println("Doing more work with input: " + input);
return Observable.just(input + 50);
});
}
Observable combinedOperations = doWorkWhileAcquired
.take(1)
.flatMap(value -> acquireResource
.cast(Integer.class)
.mergeWith(doMoreWork(value)))
.take(1);
If you were to subscribe to the combinedOperations observable and run it, the output would look like this:
Acquired the resource Doing work Acquired the resource Doing more work, input: 100 Released the resource Released the resource
Notice that it acquires the resource a second time before releasing it, which is not the desired behavior. We need a way to unsubscribe from the doWorkWhileAcquired
Observable before the value is sent into the flatMap
.
Release the Resource
This can be done using a custom Transformer
that gets composed into the stream (see the excellent article Don’t break the chain: use RxJava’s compose() operator for more on compose()
). The Transformer
uses a combination of the takeUntil
operator and a BehaviorSubject
. The first value emitted from the input Observable
is sent on the BehaviorSubject
. The takeUntil
operator will unsubscribe from its Observable
when the other Observable sends a value. By merging that with the subject that will emit the value, the value will be sent on down the stream, but only after the input Observable
has been unsubscribed from.
Here’s the code for the Transformer:
public static Observable.Transformer takeNextAndUnsubscribe() {
return observable -> {
BehaviorSubject subject = BehaviorSubject.create();
Observable source = observable.doOnNext(value -> subject.onNext(value));
return Observable
.merge(source.takeUntil(subject), subject)
.take(1);
};
}
And the updated combinedOperations
code, replacing the take(1)
with a compose()
using our custom Transformer:
Observable combinedOperations = doWorkWhileAcquired
.compose(takeNextAndUnsubscribe())
.flatMap(value -> acquireResource
.cast(Integer.class)
.mergeWith(doMoreWork(value)))
.take(1);
Running this new code will give this output:
Acquired the resource Doing work Released the resource Acquired the resource Doing more work, input: 100 Released the resource
You can see it now releases the resource before advancing and acquiring it again.
And finally, here's a gist with all of the code together in one place.
Isn’t it a cleaner way to use concat instead of merge as merge can interleave the items? Or did I just misunderstand the idea?
Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).
bifri – The problem with using
concat
is that it doesn’t subscribe to the next Observable until the first one completes. In this case the first Observable is the thing that acquires the resource/lock, and it never completes on its own. The second Observable, the one that is actually doing something that requires the resource/lock, needs to be subscribed to after the resource/lock has been acquired – which is what happens with themerge
. If I usedconcat
that second Observable would never get subscribed to.