Extend Keyed Serial Queues to Support Web Concurrency

In a previous post, I discussed how to implement “keyed serial queues”: a simple API for ordering related tasks serially while allowing unrelated tasks to proceed concurrently. This kind of abstraction is useful for coordinating access to shared, slow, I/O-bound resources like the disk across many concurrent requests. Here, I’ll extend that abstraction to work across multiple processes.

Background

In my current project, we used the keyed serial queue to coordinate syncing our server’s set of application files with the currently published manifest. The long-running downloads use Node.js’s streaming APIs to decrease memory pressure, but that opens up the possibility of two concurrent downloads mangling a file. The keyed serial queue dealt with the issue succinctly: it cleanly separated the logic of managing concurrency from the business logic of fetching and deleting the appropriate files.

Unfortunately, once we had deployed the app to AWS, we ran into exactly the kind of mangled file downloads the queue was designed to prevent. So what gives? Well, the queue was designed to manage concurrency on a single Node process. That’s the way our development config was set up, and quite often the way Node servers are set up anyway. In fact, one of Node’s claims to fame is that it can handle many concurrent connections on a single process.

This superpower is derived from the fact that most Node.js applications simply correlate incoming HTTP requests with requests to databases and other APIs, and then apply a bit of downstream processing. In other words, most Node applications are heavily I/O-bound, spending most of their time waiting for a response to come back from a database or HTTP request. Our application isn’t powered by a traditional RDBMS, though. We’re using SQLite instead, which runs in-process. This makes our application far more CPU-bound than most Node applications, since the work of querying the database is actually performed inside our process.

Problem

In sum, the assumption that we were dealing with a single Node process was false. We use Throng to specify the number of workers to spin up. This was trouble for our keyed serial queue because, under the hood, our queue was backed by Node’s internal promise (microtask) queue. We used the Promise API to hook into this queue, but at the end of the day, Node was doing all of the heavy lifting. Even though we had many concurrent requests, they all shared a single promise queue that we could use to coordinate them.

Once we deployed and Throng spun up a bunch of web workers, though, we no longer had that guarantee. We had lots of promise queues sitting around, pretty well trashing the queue semantics that had been protecting our files. So, what could we do to reimplement our queue interface and keep our business logic clean and tidy, now that we had multiple Node processes to coordinate?

Solution

We needed to get back to a world with a single, shared resource on which we could hang the logic of coordinating our tasks. To do that, we needed to reorganize the internals of our class quite a bit. What if we split out an interface whose job is to request permission for a task with a particular key to execute? Then our keyed serial queue could simply await permission for a task with that key to proceed, run the task, and notify the world when the task was complete.

Example

Here’s how the code could look:

interface KeyedSerialTaskQueue {
  run(key: string, task: () => Promise<void>): Promise<void>;
}

interface QueuePermissionProvider {
  permissionFor(key: string, taskId: number): Promise<void>;
  done(key: string, taskId: number): Promise<void>;
}

export class PermissionKeyedSerialTaskQueue implements KeyedSerialTaskQueue {
  private nextTaskId = 0;
  private permission: QueuePermissionProvider;

  constructor(permission: QueuePermissionProvider) {
    this.permission = permission;
  }

  async run(key: string, task: () => Promise<void>): Promise<void> {
    const taskId = this.nextTaskId++;
    return this.permission
      .permissionFor(key, taskId)
      .then(task)
      .finally(() => this.permission.done(key, taskId));
  }
}
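To make the contract concrete, here’s a minimal sketch of what a single-process QueuePermissionProvider might look like, useful as a test double in local development. The class and variable names below are mine, not from our production code:

```typescript
// Repeated here so the example is self-contained.
interface QueuePermissionProvider {
  permissionFor(key: string, taskId: number): Promise<void>;
  done(key: string, taskId: number): Promise<void>;
}

// Hypothetical in-process provider: a FIFO gate per key.
class InMemoryQueuePermission implements QueuePermissionProvider {
  // Per-key state: whether a task currently holds the key, plus waiters.
  private gates = new Map<string, { busy: boolean; waiters: (() => void)[] }>();

  async permissionFor(key: string, _taskId: number): Promise<void> {
    const gate = this.gates.get(key) ?? { busy: false, waiters: [] as (() => void)[] };
    this.gates.set(key, gate);
    if (!gate.busy) {
      gate.busy = true; // nothing running under this key: proceed immediately
      return;
    }
    // Otherwise park until the current holder calls done().
    await new Promise<void>((resolve) => gate.waiters.push(resolve));
  }

  async done(key: string, _taskId: number): Promise<void> {
    const gate = this.gates.get(key);
    if (!gate) return;
    const next = gate.waiters.shift();
    if (next) next(); // hand the key directly to the next waiter
    else this.gates.delete(key);
  }
}

// Tasks under the same key serialize; different keys interleave freely.
const events: string[] = [];
const perm = new InMemoryQueuePermission();
const run = async (key: string, id: string, ms: number) => {
  await perm.permissionFor(key, 0);
  events.push(`start ${id}`);
  await new Promise((r) => setTimeout(r, ms));
  events.push(`end ${id}`);
  await perm.done(key, 0);
};
await Promise.all([run("a", "a1", 20), run("a", "a2", 0), run("b", "b1", 0)]);
console.log(events.join(" → "));
```

Here a2 starts only after a1 finishes, because they share a key, while b1 runs without waiting on either. A cross-process provider has to deliver exactly these semantics with a shared source of truth instead of a local Map.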

Now, any of our Node processes can instantiate a PermissionKeyedSerialTaskQueue, and as long as the QueuePermissionProvider coordinates with the other processes, our tasks will be executed correctly. So how do we implement our permission mechanism? The interface is simple enough that we actually have a lot of options. We could use Redis, or the file system and Node’s file-watching APIs; anything that eventually ties back to a single, shared source of truth will do. For our app, we went with Node’s built-in cluster IPC API and used the primary process as the single shared resource.

import cluster, { Worker } from "cluster";

type Message = ({ type: "request" } | { type: "done" }) & {
  key: string;
  taskId: number;
};

class QueuePermission implements QueuePermissionProvider {
  async permissionFor(key: string, taskId: number) {
    // Register the listener before sending the request so the grant
    // message can't slip past us.
    const promise = new Promise<void>((resolve) => {
      const handler = (msg: { key: string; taskId: number }) => {
        if (key === msg.key && taskId === msg.taskId) {
          process.off("message", handler);
          resolve();
        }
      };
      process.on("message", handler);
    });
    process.send?.({ type: "request", key, taskId });
    return promise;
  }

  async done(key: string, taskId: number) {
    process.send?.({ type: "done", key, taskId });
  }

  static async clusterPrimary() {
    const keyMap = {} as Record<string, { taskId: number; worker: Worker }[]>;
    cluster.on("fork", (worker) => {
      worker.on("message", ({ type, taskId, key }: Message) => {
        if (type === "request") {
          if (!(key in keyMap)) {
            keyMap[key] = [];
          }
          keyMap[key].push({ taskId, worker });
          // First in line for this key proceeds immediately.
          if (keyMap[key].length === 1) {
            worker.send({ key, taskId });
          }
        }
        if (type === "done") {
          keyMap[key].shift();
          if (keyMap[key].length > 0) {
            // Grant permission to the next waiting task for this key.
            const { taskId, worker } = keyMap[key][0];
            worker.send({ key, taskId });
          } else {
            delete keyMap[key];
          }
        }
      });
    });
  }
}

The clusterPrimary static method is passed into Throng to set up the primary process. This gives the primary something akin to a little Redux store that tracks queues of task IDs associated with logical keys. The primary then sends a message to the child process that requested permission for a key when its task can proceed. The permission class itself is responsible for sending the permission requests.
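To see the primary’s bookkeeping in isolation, the same grant logic can be exercised without forking any processes, using stand-in worker objects. The names below (FakeWorker, handle) are mine, condensed from clusterPrimary above:

```typescript
// A stand-in for a cluster Worker that just records the grants it receives.
type FakeWorker = { id: number; granted: { key: string; taskId: number }[] };
type Msg = { type: "request" | "done"; key: string; taskId: number };

// The same per-key FIFO bookkeeping clusterPrimary maintains.
const keyMap: Record<string, { taskId: number; worker: FakeWorker }[]> = {};

function handle(worker: FakeWorker, { type, key, taskId }: Msg) {
  if (type === "request") {
    (keyMap[key] ??= []).push({ taskId, worker });
    // First in line for the key is granted permission immediately.
    if (keyMap[key].length === 1) worker.granted.push({ key, taskId });
  } else {
    keyMap[key].shift();
    const next = keyMap[key][0];
    if (next) next.worker.granted.push({ key, taskId: next.taskId });
    else delete keyMap[key];
  }
}

const a: FakeWorker = { id: 1, granted: [] };
const b: FakeWorker = { id: 2, granted: [] };

handle(a, { type: "request", key: "manifest", taskId: 1 }); // a granted at once
handle(b, { type: "request", key: "manifest", taskId: 2 }); // b queued behind a
handle(b, { type: "request", key: "other", taskId: 3 });    // different key: granted at once
handle(a, { type: "done", key: "manifest", taskId: 1 });    // a finishes; b is granted
console.log(a.granted.length, b.granted.length); // 1 2
```

The grant for “other” arrives before the grant for “manifest” reaches worker b, which is exactly the behavior we wanted: unrelated keys never wait on each other, while requests for the same key are handed out strictly one at a time.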

Keyed Serial Queues

If you need to reimplement a queue interface while keeping your business logic clean and tidy, keep in mind that you might have multiple Node processes to coordinate. By delegating to a shared permission provider, keyed serial queues hang the logic of coordinating tasks on a single, shared resource, no matter how many processes are running.
