Unsubscribing from an RxJava Observable Mid-Stream

Imagine a scenario where you need to create an Observable sequence that will acquire a resource, do some processing, and release the resource when unsubscribed from. An example of something like this might be an Observable that acquires a lock and releases it when unsubscribed from. That “lock” signal could be merged with a second signal that does some work that should only be done after the lock has been acquired.

Now what if you needed to chain that first operation with a second operation that could only execute properly after the resource had been released? In other words, you need the first value emitted from the first Observable to be an input to the second Observable (probably using flatMap)—but the first observable needs to be unsubscribed from (to release the resource) before the emitted value is sent on down the stream.

In this post I’ll put together an example showing an scenario like this, along with a custom Transformer that can be composed into the stream such that it will unsubscribe from the first Observable before sending it’s emitted value down the stream.

Acquire the Resource

Here’s an Observable that acquires a resource and releases it when unsubscribed from. Continuing with the scenario above it will acquire some lock and release the lock.


Observable acquireResource = Observable.defer(() -> {
    MyResource resource = MyResource.acquire();
    System.out.println("Acquired the resource");
    return Observable
        .never()
        .doOnUnsubscribe(() -> {
            MyResource.release(resource)
            System.out.println("Released the resource");
        });
    });

This Observable defers acquiring the lock until it has a subscription, and then it releases the lock when unsubscribed from. By doing the release in the doOnUnsubscribe it ensures that the lock will be released regardless of completion, error, or the subscription being cancelled (unsubscribe() being called directly),

Do Some Work

Here’s an Observable that does some work that should only be done when the lock has been acquired.


Observable doWork = Observable.defer(() -> {
    System.out.println("Doing work");
    return Observable.just(100);
});

To make sure that the work is done only when the resource has been acquired, merge the Observable with the resource Observable coming first in the list. You need to use the case operator since you can’t merge different types of Observables.


Observable doWorkWhileAcquired =
    Observable.merge(acquireResource.cast(Integer.class), doWork);

Chaining Operations

Taking the example further, let’s chain another Observable that does some more work, and takes the output of the previous Observable. This second operation needs to acquire the resource as well so it too needs to be merged with the “acquire” Observable, and then chained with the first.


public static final Observable doMoreWork(Integer input) {
    return  Observable.defer(() -> {
        System.out.println("Doing more work with input: " + input);
        return Observable.just(input + 50);
    });
}
      
Observable combinedOperations = doWorkWhileAcquired
    .take(1)
    .flatMap(value -> acquireResource
        .cast(Integer.class)
        .mergeWith(doMoreWork(value)))
    .take(1);

If you were to subscribe to the combinedOperations observable and run it, the output would look like this:

Acquired the resource
Doing work
Acquired the resource
Doing more work, input: 100
Released the resource
Released the resource

Notice that it acquires the resource a second time before releasing it, which is not the desired behavior. We need a way to unsubscribe from the doWorkWhileAcquired Observable before the value is sent into the flatMap.

Release the Resource

This can be done using a custom Transformer that gets composed into the stream (see the excellent article Don’t break the chain: use RxJava’s compose() operator for more on compose()). The Transformer uses a combination of the takeUntil operator and a BehaviorSubject. The first value emitted from the input Observable is sent on the BehaviorSubject. The takeUntil operator will unsubscribe from its Observable when the other Observable sends a value. By merging that with the subject that will emit the value, the value will be sent on down the stream, but only after the input Observable has been unsubscribed from.

Here’s the code for the Transformer:


public static  Observable.Transformer takeNextAndUnsubscribe() {
    return observable -> {
        BehaviorSubject subject = BehaviorSubject.create();
        Observable source = observable.doOnNext(value -> subject.onNext(value));
        return Observable
            .merge(source.takeUntil(subject), subject)
            .take(1);
    };
}

And the updated combinedOperations code, replacing the take(1) with a compose() using our custom Transformer:


Observable combinedOperations = doWorkWhileAcquired
    .compose(takeNextAndUnsubscribe())
    .flatMap(value -> acquireResource
        .cast(Integer.class)
        .mergeWith(doMoreWork(value)))
    .take(1);

Running this new code will give this output:

Acquired the resource
Doing work
Released the resource
Acquired the resource
Doing more work, input: 100
Released the resource

You can see it now releases the resource before advancing and acquiring it again.

And finally, here's a gist with all of the code together in one place.

Conversation
  • bifri says:

    Isn’t it a cleaner way to use concat instead of merge as merge can interleave the items? Or did I just misunderstand the idea?

    Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).

    • Patrick Bacon Patrick Bacon says:

      bifri – The problem with using concat is that it doesn’t subscribe to the next Observable until the first one completes. In this case the first Observable is the thing that acquires the resource/lock, and it never completes on its own. The second Observable, the one that is actually doing something that requires the resource/lock, needs to be subscribed to after the resource/lock has been acquired – which is what happens with the merge. If I used concat that second Observable would never get subscribed to.

  • Comments are closed.