Correctly Managing Concurrency in JavaScript

More times than I can count, I’ve found myself needing better concurrency primitives in JavaScript (or TypeScript) applications. Almost anything you might need is possible with the Promise API, but promises are complex, and there are many subtleties that easily turn into bugs. (Don’t forget to return that new promise you made inside your promise handler, or else!)

Of course, JavaScript doesn’t have real concurrency. There’s only one thread. Despite this, it’s still possible to have race conditions and all the other kinds of bugs that come with concurrency because of evented I/O, timers, and so on.
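To see how, here’s a minimal sketch (task names made up for illustration): two async functions interleave at their await points, even though only one runs at any instant.

```typescript
const log: string[] = [];

async function task(name: string) {
  log.push(`${name} start`);
  await Promise.resolve(); // yields back to the event loop; the other task can run here
  log.push(`${name} end`);
}

// Both tasks start before either finishes: execution interleaves at the await.
Promise.all([task("A"), task("B")]).then(() => {
  console.log(log); // ["A start", "B start", "A end", "B end"]
});
```

Any shared state touched on both sides of an await is exposed to exactly this kind of interleaving.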

Problem A

Let’s imagine that you have a synchronize() function that pulls some data (asynchronously, of course) out of a database, does some processing, and then (asynchronously) uploads it to somewhere in “the cloud” for safekeeping.

It’s easy to imagine why having multiple synchronize() processes in flight could be bad. Perhaps some data changes in between their invocations, and then there is a race to see which is uploaded first. In a more complicated scenario, the process might first check for and resolve conflicts with the remote store. Then there’s a lot more to go wrong.

What we’re after here is the ability to restrict this process so that only one can be in flight at a time. There are multiple ways to deal with a request to start the process when it’s already running, but I often find it sufficient and convenient to latch on to (get a promise for) the run that’s already in flight.

The Solution

Using TypeScript + async/await, I’ve arrived at and tested the following
higher-order function. You provide an async function, and it returns a wrapped
version of that function, preventing simultaneous execution.


export const notConcurrent = <T>(proc: () => PromiseLike<T>) => {
  // Holds the promise of the in-flight call, or false when idle.
  let inFlight: Promise<T> | false = false;

  return () => {
    if (!inFlight) {
      inFlight = (async () => {
        try {
          return await proc();
        } finally {
          // Reset whether proc resolved or rejected.
          inFlight = false;
        }
      })();
    }
    return inFlight;
  };
};

Here’s how you might use it:


export class Synchronizer {
  // ... other code elided ...
  synchronize = notConcurrent(async () => {
    let data = await this.gatherData();
    // more processing
    await this.sendDataToServer(data);
  });
}

// time A:
synchronizer.synchronize() // => a promise

// a few seconds later, but before the sync has finished:
synchronizer.synchronize() // => the same promise as before, no additional sync triggered

How it works

Inside notConcurrent, I create a variable to hold the promise of the result of an in-flight process and then return the wrapped copy of the function passed in. Because JavaScript doesn’t have real concurrency, simple checks and mutations of that variable are sufficient to safely control the concurrency.

Async/await syntax (standardized in ES2017) makes it easy to safely wrap the passed-in function proc. Whether proc succeeds or fails, the inFlight variable is reset when it is done. Importantly, any failure coming out of proc won’t be swallowed: anyone awaiting the promise will correctly receive the rejection.
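To make the latching behavior concrete, here’s a small sketch (the slow task and its timing are invented for illustration; notConcurrent is re-declared so the snippet runs standalone):

```typescript
const notConcurrent = <T>(proc: () => PromiseLike<T>) => {
  let inFlight: Promise<T> | false = false;
  return () => {
    if (!inFlight) {
      inFlight = (async () => {
        try {
          return await proc();
        } finally {
          inFlight = false;
        }
      })();
    }
    return inFlight;
  };
};

let runs = 0;
const sync = notConcurrent(async () => {
  runs++; // counts how many times the body actually started
  await new Promise(res => setTimeout(res, 10)); // pretend this is slow I/O
});

const p1 = sync();
const p2 = sync(); // called while the first run is still in flight

console.log(p1 === p2); // true: the second call latched onto the first run
console.log(runs); // 1: the body only started once
```

Once the first run settles, a later call starts a fresh run rather than returning the stale promise.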

Problem B

Let’s consider a contrived example. Please be generous to your author, and resist the critique that this would be a weird way to solve this specific problem:


async function set(collection: string, key: string, val: string):
    Promise<{[key: string]: string}> {
  const data = await fetchCollection(collection);
  data[key] = val;
  await sendCollection(collection, data);
  return data;
}

Can you spot the bug?

What if you ran some code like this:


set("userInfo", "name", "Chris");
set("userInfo", "email", "[email protected]");

If you haven’t figured it out: both calls to fetchCollection() are likely to return the same data, and then there’s a race to see which write finishes…last. Whichever write lands second wins, and the first operation’s update is silently lost.
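The race can be demonstrated with in-memory stand-ins for fetchCollection and sendCollection (the store shape and delays here are hypothetical, chosen only to force the interleaving):

```typescript
const delay = (ms: number) => new Promise<void>(res => setTimeout(res, ms));

// Pretend remote store, plus fetch/send stand-ins with artificial latency.
const remote: { [name: string]: { [key: string]: string } } = { userInfo: {} };

async function fetchCollection(name: string) {
  await delay(5);
  return { ...remote[name] }; // each caller gets its own snapshot
}

async function sendCollection(name: string, data: { [key: string]: string }) {
  await delay(5);
  remote[name] = data; // blindly replaces the whole collection
}

async function set(collection: string, key: string, val: string) {
  const data = await fetchCollection(collection);
  data[key] = val;
  await sendCollection(collection, data);
  return data;
}

// Both calls snapshot the same (empty) state, so the later write clobbers
// the earlier one: only one of the two keys survives.
const done = Promise.all([
  set("userInfo", "name", "Chris"),
  set("userInfo", "email", "chris@example.com"),
]).then(() => {
  console.log(Object.keys(remote.userInfo)); // one key, not both
});
```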

The solution

What we need here is basically a mutex: a way to say that the critical section of reading the collection, updating it, and writing it back cannot be happening simultaneously. Let’s imagine we had such a thing:


const collectionMutex = new Mutex();

async function set(collection: string, key: string, val: string):
    Promise<{[key: string]: string}> {
  const unlock = await collectionMutex.lock();

  const data = await fetchCollection(collection);
  data[key] = val;
  await sendCollection(collection, data);

  unlock();
  return data;
}

Or, perhaps, less error-prone:


const collectionMutex = new Mutex();

async function set(collection: string, key: string, val: string):
    Promise<{[key: string]: string}> {
  return await collectionMutex.dispatch(async () => {
    const data = await fetchCollection(collection);
    data[key] = val;
    await sendCollection(collection, data);
    return data;
  });
}

Implementing this mutex requires a bit of promise-trampoline-ing, but it’s still
relatively straightforward:


export class Mutex {
  private mutex = Promise.resolve();

  lock(): PromiseLike<() => void> {
    // Placeholder; replaced below with the returned promise's resolve function.
    let begin: (unlock: () => void) => void = () => {};

    this.mutex = this.mutex.then(() => {
      // By the time this runs, begin has been reassigned below.
      return new Promise<void>(begin);
    });

    return new Promise(res => {
      begin = res;
    });
  }

  async dispatch<T>(fn: (() => T) | (() => PromiseLike<T>)): Promise<T> {
    const unlock = await this.lock();
    try {
      return await Promise.resolve(fn());
    } finally {
      unlock();
    }
  }
}

Overall, it’s pretty similar to the previous problem. There’s some additional complexity because we need to chain promises in order for subsequent operations to be triggered.

In the lock() method, we wait for the lock by using this.mutex.then(). We pass in a function that will create a new promise from the begin function. Right now, begin is just a function that does nothing and returns nothing, but, by the time it is actually invoked, it will be a different function. Finally, we replace our mutex instance variable with the resulting promise, so that any subsequent invocations wait for us.

After this, we return a new promise. In its executor, we replace begin with that promise’s resolve() function. The promise returned by lock() will resolve (with the unlock function, which is really the resolve function of the inner promise chained onto the mutex) once every previous holder of the lock has finished and called its own unlock function.

Phew. It works, I promise.
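As a sanity check, here’s a self-contained sketch of the same Mutex (with a slightly more strictly-typed lock(), behaviorally identical) showing that two overlapping dispatch() calls never interleave their critical sections; the delay value is arbitrary:

```typescript
class Mutex {
  private mutex: Promise<void> = Promise.resolve();

  lock(): Promise<() => void> {
    let begin: (unlock: () => void) => void = () => {};
    this.mutex = this.mutex.then(
      // Hand the inner promise's resolver out as the unlock function.
      () => new Promise<void>(resolve => begin(() => resolve()))
    );
    return new Promise<() => void>(res => {
      begin = res;
    });
  }

  async dispatch<T>(fn: () => T | PromiseLike<T>): Promise<T> {
    const unlock = await this.lock();
    try {
      return await fn();
    } finally {
      unlock();
    }
  }
}

const delay = (ms: number) => new Promise<void>(res => setTimeout(res, ms));
const m = new Mutex();
const order: string[] = [];

const critical = (name: string) =>
  m.dispatch(async () => {
    order.push(`${name} enter`);
    await delay(10); // the other dispatch is queued while we hold the lock
    order.push(`${name} exit`);
  });

const done = Promise.all([critical("A"), critical("B")]).then(() => {
  console.log(order); // ["A enter", "A exit", "B enter", "B exit"]
});
```

Without the mutex, the log would read enter, enter, exit, exit; with it, each section runs to completion before the next begins.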

Conclusion

I think there are already some libraries out there that may implement these functions, but I’d honestly rather just copy and paste them into a file in my project where I can assume ownership. They’re small and unlikely to change, and the world doesn’t need yet another Node module. More importantly, my project doesn’t need yet another dependency to introduce conflicts with Babel or who knows what.

Trying to implement this kind of concurrency control ad-hoc, where needed in your codebase, is basically asking for trouble. Just imagine all the confusingly-named instance variables you might find, all containing promises…

It’s much better to separately abstract and test your concurrency control. Hopefully, the next time you find yourself needing it, you can just copy these functions.

Conversation
  • Matt Bishop says:

    Thanks for writing Mutex! It would be great if you published it to NPM. Several TS alternatives exist, but they use a queue or don’t have the dispatch() option.
