Learn Observables by Building a Stoplight

Recently, I’ve been working on a new codebase that uses observables heavily. Observables are new to me. As you could probably guess, wrapping my head around these things is a humbling experience. So, to better my understanding, I took a step back from the codebase to work through and write about a small isolated example. Here, I’ll walk you through what I learned from building a stoplight using observables.

Beginning

To begin, let’s create a TypeScript project with RxJS and Jest as dependencies. You can view the package.json I use here.

Next, we need to define the operating details of the stoplight. For the sake of an example, this will be a simple stoplight.

It should:

  • start with a default color
  • have a single face to direct traffic in one direction
  • have one active color at a time
  • cycle the active colors between green -> yellow -> red, in that order
  • change the active color every three seconds

Now that we know how this stoplight should work, we need to decide what variant of observable to use from the RxJS library to keep track of the active color.

Hot vs. Cold Observables

You should know two main archetypes of observables: hot and cold observables.

The gist of hot observables is that they produce values whether or not anyone is subscribed, and every subscriber shares that single stream, so a late subscriber misses whatever was emitted earlier. A cold observable, on the other hand, does nothing until it’s subscribed to, and each subscriber triggers its own independent run of the data from the start.
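To make the difference concrete, here is a minimal sketch, assuming RxJS 6 or later. The names `cold$`, `hot$`, `executions`, `early`, and `late` are mine, purely for illustration. The cold observable runs its producer once per subscriber, while the plain Subject (a hot observable) only delivers values emitted after each subscription.

```typescript
import { Observable, Subject } from "rxjs";

// Cold: the producer function runs again for every subscriber.
let executions = 0;
const cold$ = new Observable<number>((subscriber) => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

const coldSeen: number[] = [];
cold$.subscribe((value) => coldSeen.push(value));
cold$.subscribe((value) => coldSeen.push(value));
// coldSeen is [1, 2]: each subscriber got its own run of the producer.

// Hot: values are pushed whether or not anyone is listening.
const hot$ = new Subject<string>();
const early: string[] = [];
const late: string[] = [];

hot$.next("green"); // nobody is subscribed yet, so this value is lost
hot$.subscribe((color) => early.push(color));
hot$.next("yellow");
hot$.subscribe((color) => late.push(color));
hot$.next("red");
// early is ["yellow", "red"], late is ["red"]
```

Both kinds of observable need a subscriber before anyone can see a value. The difference is whether subscribing starts the work (cold) or just tunes in to work already in progress (hot).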

Considering that a stoplight is constantly producing a color regardless of whether a motorist is paying attention, it makes sense to use a hot observable to control the internal state of this stoplight.

There are a few different types of hot observables in the RxJS library that I won’t elaborate on in this post. The one we’ll use here is the BehaviorSubject. It extends the base Observable type (by way of Subject) and immediately emits its latest value to each new subscriber. This is important so that each motorist who watches the stoplight (i.e. subscribes to it) sees the current color right away, and the same color as everyone else.
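Here is a small sketch of that replay behavior, assuming RxJS 6 or later. The variable names are hypothetical and not part of the stoplight code.

```typescript
import { BehaviorSubject } from "rxjs";

// A BehaviorSubject always holds a current value, starting with the one given here.
const color$ = new BehaviorSubject<string>("green");

const firstMotorist: string[] = [];
color$.subscribe((color) => firstMotorist.push(color)); // receives "green" immediately

color$.next("yellow");

const secondMotorist: string[] = [];
color$.subscribe((color) => secondMotorist.push(color)); // receives "yellow" immediately

color$.next("red");
// firstMotorist is ["green", "yellow", "red"]
// secondMotorist is ["yellow", "red"]
```

The second motorist never sees green, but from the moment they subscribe, both motorists see the same colors.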

Now you may be wondering, “What would a cold observable be used for?” As with any good example, we’ll use one of these in the implementation below!

Implementation

Time to start writing code.

First, create a stoplight.ts file and write a class for our implementation of this stoplight.


import { BehaviorSubject, Observable, interval } from "rxjs";
import { map, startWith, takeWhile } from "rxjs/operators";

// The order of this array defines the order in which the colors cycle.
const colors = ["green", "yellow", "red"] as const;
type StoplightColor = typeof colors[number];

export class Stoplight {
  // Holds the active color and replays it to every new subscriber.
  private color$: BehaviorSubject<StoplightColor>;

  constructor(initialColor: StoplightColor, iterations: number = 0) {
    this.color$ = new BehaviorSubject<StoplightColor>(initialColor);
  }

  // Exposes the active color as a read-only stream.
  public getColor$(): Observable<StoplightColor> {
    return this.color$.asObservable();
  }
}

Above, we create:

  • an array constant of the color sequence
  • a union type of the possible color strings for the stoplight to show
  • a private instance variable for the color$ stream
  • a public instance method that returns a read-only stream of the active color
  • a constructor that initializes the active color and takes a number of iterations to avoid infinite loops while testing
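To see why the public method returns `asObservable()` rather than the subject itself, here is a small sketch, assuming RxJS 6 or later. The names `subject$`, `readOnly$`, and `canPush` are hypothetical.

```typescript
import { BehaviorSubject, Observable } from "rxjs";

const subject$ = new BehaviorSubject<string>("green");
const readOnly$: Observable<string> = subject$.asObservable();

const seen: string[] = [];
readOnly$.subscribe((color) => seen.push(color));

// Only the owner of the subject can push a new color.
subject$.next("yellow");
// seen is ["green", "yellow"]

// The read-only view has no next() method, so outside code cannot change the light.
const canPush =
  typeof (readOnly$ as unknown as { next?: unknown }).next === "function";
// canPush is false
```

This keeps control of the active color inside the class while still letting anyone watch it.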

Using a Cold Observable

The above code still doesn’t operate the stoplight the way we need it. We need to have it change its active color every three seconds. For this, we will use a cold observable.

We will use the interval cold observable to emit a value every three seconds. This serves as the clock for changing the active color.

Inside the constructor, we create the following interval and immediately subscribe to it. Remember: subscribing is what makes the observable code run.


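constructor(initialColor: StoplightColor, iterations: number = 0) {
  this.color$ = new BehaviorSubject<StoplightColor>(initialColor);
  let colorCounter = colors.indexOf(initialColor);
  let iterationsCounter = 1;
  interval(3000)
    .pipe(
      // Emit once immediately; the emitted value itself is ignored by map below.
      startWith(colorCounter),
      map(() => {
        // Push the next color in the cycle, then advance the counter.
        this.color$.next(colors[colorCounter++ % colors.length]);
      }),
      // Stop after `iterations` ticks; the default of 0 never matches, so it runs forever.
      takeWhile(() => iterationsCounter++ !== iterations)
    )
    .subscribe();
}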

Above, we create an observable that emits every three seconds and pipes each value through several operators.

Here’s what each operator is doing:

  • startWith emits a value immediately at the start of the stream, so the active color is set without waiting for the first three-second interval.
  • map pushes the next color in the sequence into color$ at each interval. Taking the counter modulo the length of the colors array gives the index of the next color, wrapping back to green after red.
  • takeWhile caps the number of color transitions to prevent an infinite loop from the interval observable. This is mainly used for testing.
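The takeWhile cap can be tried on its own with a synchronous source. This is a minimal sketch, assuming RxJS 6 or later, with a hypothetical array standing in for the interval ticks.

```typescript
import { from } from "rxjs";
import { takeWhile } from "rxjs/operators";

const iterations = 4;
let iterationsCounter = 1;
const passed: number[] = [];
let completed = false;

from([0, 1, 2, 3, 4, 5])
  .pipe(
    // The predicate ignores the value and only counts how many ticks it has seen.
    takeWhile(() => iterationsCounter++ !== iterations)
  )
  .subscribe({
    next: (tick) => passed.push(tick),
    complete: () => {
      completed = true;
    },
  });
// passed is [0, 1, 2] and completed is true: the fourth tick ends the stream.
```

In the stoplight, map runs before takeWhile, so the color for that fourth tick is still pushed into color$ before the stream completes.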

Also worth noting: both counters are incremented by one inside the pipe expression using the postfix ++ operator, which returns the value from before the increment.
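The modulo arithmetic is easy to check without any observables at all. Here is a sketch using a hypothetical helper, `colorSequence`, that is not part of the stoplight class.

```typescript
const colors = ["green", "yellow", "red"] as const;
type StoplightColor = typeof colors[number];

// Returns the first `count` colors shown, starting from `initial`.
function colorSequence(initial: StoplightColor, count: number): StoplightColor[] {
  let colorCounter = colors.indexOf(initial);
  const shown: StoplightColor[] = [];
  for (let i = 0; i < count; i++) {
    // Postfix ++ uses the current value as the index, then increments it.
    shown.push(colors[colorCounter++ % colors.length]);
  }
  return shown;
}

const fromYellow = colorSequence("yellow", 5);
// fromYellow is ["yellow", "red", "green", "yellow", "red"]
```

Because the counter only ever grows, the modulo is what wraps the index back to the start of the array.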

Testing It

Now that we have a functional stoplight, let’s test it. We can run a simple program and log the state of the stoplight to stdout.


import { Stoplight } from "./stoplight";

const light = new Stoplight("green", 4);
light.getColor$().subscribe(console.log);
// green
// yellow
// red
// green

Now, I’ll show you how to write marble tests for the stoplight to ensure it functions per the operating details defined from the start.

Marble Test

Marble testing is essentially a strategy for testing asynchronous observable code synchronously.
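Before testing the stoplight, it may help to see the smallest possible marble test. This is a sketch, assuming RxJS 6 or later, that runs outside of Jest. The hand-written comparison callback and the `assertionsRun` counter are stand-ins of mine for Jest’s expect.

```typescript
import { TestScheduler } from "rxjs/testing";
import { map } from "rxjs/operators";

let assertionsRun = 0;

// The callback receives the actual and expected frames and decides how to compare them.
const scheduler = new TestScheduler((actual, expected) => {
  assertionsRun += 1;
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error("marble mismatch");
  }
});

scheduler.run(({ cold, expectObservable }) => {
  // "a-b-c|" means: a at frame 0, b at frame 2, c at frame 4, complete at frame 5.
  const source$ = cold("a-b-c|", { a: 1, b: 2, c: 3 });
  const tenfold$ = source$.pipe(map((n) => n * 10));

  expectObservable(tenfold$).toBe("x-y-z|", { x: 10, y: 20, z: 30 });
});
```

Everything inside `scheduler.run` happens in virtual time, so the test finishes immediately no matter how long the marble diagram is.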

We’ll want a test that ensures our stoplight is changing its color every three seconds, checks that the sequence of colors is valid, and ensures multiple subscribers see the same color.

Here, we’ll create a stoplight.spec.ts file and put the following contents in it:


import { TestScheduler } from "rxjs/testing";
import { Stoplight } from "./stoplight";

describe("Stoplight", () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it("should cycle green, yellow, red", () => {
    scheduler.run((helpers) => {
      const { expectObservable } = helpers;
      const stoplight = new Stoplight("green", 4);

      // specify the expected colors for each subscription in virtual time
      // note: emitting a value advances time by 1 frame, so subtract 1ms from each time progression
      const [expected1, sub1, expected2, sub2] = [
        "g 2999ms y 2999ms r 2999ms g",
        "^---------------------------",
        "3500ms   y 2499ms r 2999ms g",
        "3500ms   ^------------------",
      ];

      expectObservable(stoplight.getColor$(), sub1).toBe(expected1, {
        g: "green",
        y: "yellow",
        r: "red",
      });

      expectObservable(stoplight.getColor$(), sub2).toBe(expected2, {
        g: "green",
        y: "yellow",
        r: "red",
      });
    });
  });
});

This is a marble test run through Jest. The funky-looking lines below the comments, starting on line 18, are what’s really powerful. They use a notation that represents values emitted in virtual time. Each dash is one frame of time (one virtual millisecond inside scheduler.run). The alphanumeric marbles are the values emitted by the given observable; in this case, each one marks a color change. Spaces mean nothing and are used only for formatting. You can read more about this notation in the awesome docs provided by the RxJS team.
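The “subtract 1ms” rule is easier to trust once you do the arithmetic. Below is a deliberately simplified sketch of how the frames add up. The `emissionFrames` helper is hypothetical and is not RxJS’s real parser: it only understands dashes, single-character marbles, spaces, and `Nms` time progressions.

```typescript
interface MarbleEmission {
  marble: string;
  frame: number;
}

// Walks a marble string and records the virtual frame at which each marble is emitted.
function emissionFrames(marbles: string): MarbleEmission[] {
  const emissions: MarbleEmission[] = [];
  let frame = 0;
  for (const token of marbles.trim().split(/\s+/)) {
    const progression = /^(\d+)ms$/.exec(token);
    if (progression) {
      // A time progression such as "2999ms" skips ahead by that many frames.
      frame += Number(progression[1]);
      continue;
    }
    for (const char of token) {
      if (char !== "-") {
        emissions.push({ marble: char, frame });
      }
      // Dashes and marbles each take up exactly one frame.
      frame += 1;
    }
  }
  return emissions;
}

const firstMotorist = emissionFrames("g 2999ms y 2999ms r 2999ms g");
// g at 0, y at 3000, r at 6000, g at 9000

const secondMotorist = emissionFrames("3500ms   y 2499ms r 2999ms g");
// y at 3500, r at 6000, g at 9000
```

Each marble uses up one frame itself, so 1 + 2999 lands the next color exactly on the three-second mark.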

In this example, `sub1` and `sub2` emulate two motorists who start observing the stoplight at different points in time, verifying they see the same color. You’ll notice that, regardless of when observers subscribe to the stream, they always see the same current and future values at each given point in virtual time.

Closing the Loop

Here is the GitHub repo for the source in this post.

I hope this project serves as a good working example for your journey to learning observables. I leave you with a challenge to change some functionality of the stoplight and update the marble test to make it pass. Green light, go!

Conversation
  • B. K. Oxley (binkley) says:

    The short explanation of hot _vs_ cold subscribers needs improvement. “hot observables … shared amongst their subscribers” and “cold observable … once it’s subscribed to” is confusing: don’t both require subscription?
