2 Comments

Unsubscribing from an RxJava Observable Mid-Stream

Imagine a scenario where you need to create an Observable sequence that will acquire a resource, do some processing, and release the resource when unsubscribed from. An example of something like this might be an Observable that acquires a lock and releases it when unsubscribed from. That “lock” signal could be merged with a second signal that does some work that should only be done after the lock has been acquired.

Now what if you needed to chain that first operation with a second operation that could only execute properly after the resource had been released? In other words, you need the first value emitted from the first Observable to be an input to the second Observable (probably using flatMap)—but the first observable needs to be unsubscribed from (to release the resource) before the emitted value is sent on down the stream.

In this post I’ll put together an example showing an scenario like this, along with a custom Transformer that can be composed into the stream such that it will unsubscribe from the first Observable before sending it’s emitted value down the stream.

Acquire the Resource

Here’s an Observable that acquires a resource and releases it when unsubscribed from. Continuing with the scenario above it will acquire some lock and release the lock.


Observable acquireResource = Observable.defer(() -> {
    MyResource resource = MyResource.acquire();
    System.out.println("Acquired the resource");
    return Observable
        .never()
        .doOnUnsubscribe(() -> {
            MyResource.release(resource)
            System.out.println("Released the resource");
        });
    });

This Observable defers acquiring the lock until it has a subscription, and then it releases the lock when unsubscribed from. By doing the release in the doOnUnsubscribe it ensures that the lock will be released regardless of completion, error, or the subscription being cancelled (unsubscribe() being called directly),

Do Some Work

Here’s an Observable that does some work that should only be done when the lock has been acquired.


Observable doWork = Observable.defer(() -> {
    System.out.println("Doing work");
    return Observable.just(100);
});

To make sure that the work is done only when the resource has been acquired, merge the Observable with the resource Observable coming first in the list. You need to use the case operator since you can’t merge different types of Observables.


Observable doWorkWhileAcquired =
    Observable.merge(acquireResource.cast(Integer.class), doWork);

Chaining Operations

Taking the example further, let’s chain another Observable that does some more work, and takes the output of the previous Observable. This second operation needs to acquire the resource as well so it too needs to be merged with the “acquire” Observable, and then chained with the first.


public static final Observable doMoreWork(Integer input) {
    return  Observable.defer(() -> {
        System.out.println("Doing more work with input: " + input);
        return Observable.just(input + 50);
    });
}
      
Observable combinedOperations = doWorkWhileAcquired
    .take(1)
    .flatMap(value -> acquireResource
        .cast(Integer.class)
        .mergeWith(doMoreWork(value)))
    .take(1);

If you were to subscribe to the combinedOperations observable and run it, the output would look like this:

Acquired the resource
Doing work
Acquired the resource
Doing more work, input: 100
Released the resource
Released the resource

Notice that it acquires the resource a second time before releasing it, which is not the desired behavior. We need a way to unsubscribe from the doWorkWhileAcquired Observable before the value is sent into the flatMap.

Release the Resource

This can be done using a custom Transformer that gets composed into the stream (see the excellent article Don’t break the chain: use RxJava’s compose() operator for more on compose()). The Transformer uses a combination of the takeUntil operator and a BehaviorSubject. The first value emitted from the input Observable is sent on the BehaviorSubject. The takeUntil operator will unsubscribe from its Observable when the other Observable sends a value. By merging that with the subject that will emit the value, the value will be sent on down the stream, but only after the input Observable has been unsubscribed from.

Here’s the code for the Transformer:


public static  Observable.Transformer takeNextAndUnsubscribe() {
    return observable -> {
        BehaviorSubject subject = BehaviorSubject.create();
        Observable source = observable.doOnNext(value -> subject.onNext(value));
        return Observable
            .merge(source.takeUntil(subject), subject)
            .take(1);
    };
}

And the updated combinedOperations code, replacing the take(1) with a compose() using our custom Transformer:


Observable combinedOperations = doWorkWhileAcquired
    .compose(takeNextAndUnsubscribe())
    .flatMap(value -> acquireResource
        .cast(Integer.class)
        .mergeWith(doMoreWork(value)))
    .take(1);

Running this new code will give this output:

Acquired the resource
Doing work
Released the resource
Acquired the resource
Doing more work, input: 100
Released the resource

You can see it now releases the resource before advancing and acquiring it again.

And finally, here's a gist with all of the code together in one place.