How to Manage Concurrent Tasks with Keyed Queues

Node.js provides a powerful streaming API allowing developers to process large amounts of data in a memory-efficient way. When working with file downloads, streaming can be particularly useful for ensuring you can download large files safely without consuming too much memory. However, managing multiple file downloads can be tricky. That’s especially true when you must ensure multiple requests for the same file do not result in data corruption or memory leaks.

A Promise Queue

One approach to solving this problem is a promise queue. As explained in this post, a promise queue manages tasks that need to be executed in serial order. Building on the promise queue class described there, we can implement a tidy wrapper class that manages a separate queue for each key. This allows unrelated tasks to proceed concurrently and independently while maintaining the serial ordering of related tasks.

Let’s look at the code and then unpack what’s going on.

class SerialTaskQueue<T = void> {
  private queue = Promise.resolve();
  private inFlight = 0;

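  run(operation: () => Promise<T>) {
    return new Promise<T>((resolve, reject) => {
      this.inFlight++;
      this.queue = this.queue
        .then(operation)
        .then(
          (res) => {
            this.inFlight--;
            resolve(res);
          },
          (err) => {
            // Decrement on failure too, so isEmpty() stays accurate
            // and the chain keeps running later tasks.
            this.inFlight--;
            reject(err);
          },
        );
    });
  }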

  isEmpty() {
    return this.inFlight === 0;
  }

  async empty() {
    await this.queue;
  }
}

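class KeyedSerialTaskQueue {
  // A Map avoids clashes with inherited object properties such as "constructor",
  // which the `in` operator would report as existing keys on a plain object.
  private tasks = new Map<string, SerialTaskQueue>();

  private getOrCreateQueue(key: string) {
    let queue = this.tasks.get(key);
    if (!queue) {
      queue = new SerialTaskQueue();
      this.tasks.set(key, queue);
    }
    return queue;
  }

  private deleteQueue(key: string) {
    this.tasks.delete(key);
  }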

  async run(key: string, task: () => Promise<void>) {
    const queue = this.getOrCreateQueue(key);
    try {
      await queue.run(task);
    } finally {
      // Clean up even when the task rejects; the error still reaches the caller.
      if (queue.isEmpty()) this.deleteQueue(key);
    }
  }
}

 

The KeyedSerialTaskQueue class builds on the concept of a promise queue to manage tasks, each associated with a key. Tasks that share a key execute serially, in the order they were added, while tasks with different keys run independently of one another.

The class is built on top of the SerialTaskQueue class, which is responsible for executing tasks in serial order. The SerialTaskQueue maintains a single promise chain. Each task is a function that returns a promise, and run appends it to the chain so that it starts only after every earlier task has settled. The caller receives a new promise that resolves or rejects with the task’s own outcome. Because run is the only way to add work, the chain acts as a single point of entry, which guarantees that tasks execute in the order they were added.
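
The chaining idea can be shown in isolation. The following is a minimal sketch rather than the class itself: tail, enqueue, and demoSerialOrder are hypothetical names introduced only for this illustration, and the simulated work is just a few microtask turns.

```typescript
// Minimal sketch of promise chaining; `tail`, `enqueue`, and
// `demoSerialOrder` are illustrative names, not part of the classes above.
let tail: Promise<void> = Promise.resolve();

function enqueue<T>(operation: () => Promise<T>): Promise<T> {
  // Start the operation only after everything queued before it has settled.
  const result = tail.then(operation);
  // Keep the chain alive even if this operation rejects.
  tail = result.then(
    () => undefined,
    () => undefined,
  );
  return result;
}

async function demoSerialOrder(): Promise<string[]> {
  const log: string[] = [];
  const step = (name: string, ticks: number) => async () => {
    log.push(`start ${name}`);
    // Wait a few microtask turns to simulate asynchronous work.
    for (let i = 0; i < ticks; i++) await Promise.resolve();
    log.push(`end ${name}`);
  };
  // The slow task is queued first, so the fast one cannot start until it ends.
  await Promise.all([enqueue(step("slow", 5)), enqueue(step("fast", 1))]);
  return log;
}
```

Calling demoSerialOrder() yields a log in which the slow task both starts and ends before the fast task starts, even though the fast task needs less time.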

KeyedSerialTaskQueue

The KeyedSerialTaskQueue wraps this class rather than inheriting from it, keeping one SerialTaskQueue instance per key. When a new task arrives, the class looks up the SerialTaskQueue for its key and adds the task to it. If no queue exists for the given key, a new one is created. Once the task completes, the class checks whether the queue is now empty. If so, it deletes the queue so that entries for finished keys do not accumulate and leak memory.
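
To see the per-key behavior without the full listing, here is a condensed stand-in that keeps one promise chain per key. It is a sketch under assumptions, not the classes above: KeyedChains, demoKeyedOrder, and the "a.zip" and "b.zip" keys are all hypothetical.

```typescript
// Compact stand-in for a keyed queue, assuming one promise chain per key.
// `KeyedChains` and `demoKeyedOrder` are hypothetical names for this sketch.
class KeyedChains {
  private tails = new Map<string, Promise<void>>();

  run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    const result = previous.then(task);
    // Ignore the outcome on the chain so a failure does not block later tasks.
    const tail: Promise<void> = result
      .then(
        () => undefined,
        () => undefined,
      )
      .then(() => {
        // Drop the entry unless a newer task was chained behind this one.
        if (this.tails.get(key) === tail) this.tails.delete(key);
      });
    this.tails.set(key, tail);
    return result;
  }

  get size(): number {
    return this.tails.size;
  }
}

async function demoKeyedOrder(): Promise<{ log: string[]; queue: KeyedChains }> {
  const queue = new KeyedChains();
  const log: string[] = [];
  const task = (name: string, ticks: number) => async () => {
    log.push(`start ${name}`);
    // Wait a few microtask turns to simulate asynchronous work.
    for (let i = 0; i < ticks; i++) await Promise.resolve();
    log.push(`end ${name}`);
  };
  await Promise.all([
    queue.run("a.zip", task("a1", 3)), // same key: a2 waits for a1
    queue.run("a.zip", task("a2", 1)),
    queue.run("b.zip", task("b1", 1)), // different key: runs alongside a1
  ]);
  return { log, queue };
}
```

In the resulting log, a2 starts only after a1 has ended, while b1 starts before a1 ends. Shortly after the last task settles, the map is empty again.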

So, how can we use this class to implement safe file downloads? Imagine you have a web application that allows users to download files from a server. When a user requests a file, you might use the KeyedSerialTaskQueue to manage the download process. Each download request would be associated with a key based on the URL of the file or another identifier, so that requests for the same file share a key.

When a new download request is received, the KeyedSerialTaskQueue adds a task to the appropriate queue. This task would use Node’s streaming API to download the file, process the data in chunks, and write it to a file on disk. Because each key has its own SerialTaskQueue, multiple requests for the same file execute one at a time and in order, which prevents the data corruption that concurrent writes to the same file can cause.
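
A download task along these lines might look like the sketch below. Everything beyond Node’s own modules is an assumption made for illustration: the KeyedQueue interface, the downloadTo helper, the openSource callback (a stand-in for whatever HTTP client produces the response stream), the ".part" suffix, and the choice to key on the destination path rather than the URL.

```typescript
import { createWriteStream } from "node:fs";
import { mkdtemp, readFile, rename } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Anything with a keyed `run` method fits, e.g. a KeyedSerialTaskQueue instance.
interface KeyedQueue {
  run(key: string, task: () => Promise<void>): Promise<void>;
}

// Hypothetical helper: `openSource` stands in for whatever produces the
// response stream (an HTTP client, an object storage SDK, and so on).
async function downloadTo(
  queue: KeyedQueue,
  destination: string,
  openSource: () => Readable | Promise<Readable>,
): Promise<void> {
  // Keying on the destination path serializes writers to the same file.
  await queue.run(destination, async () => {
    const partial = `${destination}.part`;
    // pipeline() streams chunk by chunk and rejects if either stream fails.
    await pipeline(await openSource(), createWriteStream(partial));
    // Publish the file only after it has been written completely.
    await rename(partial, destination);
  });
}

async function demoDownload(): Promise<string> {
  const dir = await mkdtemp(join(tmpdir(), "keyed-queue-"));
  const destination = join(dir, "file.txt");
  // Pass-through queue so the sketch runs on its own; swap in the real class.
  const queue: KeyedQueue = { run: (_key, task) => task() };
  await downloadTo(queue, destination, () => Readable.from(["hello, ", "world"]));
  return readFile(destination, "utf8");
}
```

Writing to a temporary name and renaming afterwards is a design choice of this sketch, not something the queue requires. It means a reader never sees a half-written file, while the queue ensures two downloads never write to the same path at once.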

A Powerful Tool for Managing Queues

The KeyedSerialTaskQueue is a powerful tool for managing queues of tasks associated with keys. You can use this class in a wide range of applications, but it’s particularly useful when working with large files or other resources you need to download or process serially. By ensuring tasks for the same key execute in order, this class helps prevent data corruption, memory leaks, and other issues that can arise when multiple requests are made for the same resource.
