Using WebSockets in AWS to Stream DynamoDB Updates

A recent project of mine looked like it might need a WebSocket API, so I wanted to investigate how to set one up in the AWS ecosystem.

Extending the infrastructure from my previous DynamoDB setup to stream updates to a client via WebSockets was a straightforward problem to solve. I updated the GitHub project from my last post to connect to a WebSocket API. Here’s what I did to set it up.

WebSocket API

To create a WebSocket API in AWS, I needed to utilize three services:

  • A DynamoDB table to manage connections
  • An API Gateway to create the API
  • Lambda functions to handle connection and disconnection events for the API

First, I created a DynamoDB table to manage the connections for the WebSocket API. This table is separate from the table that I’ll be streaming changes from. It’s a simple table with just a ConnectionId column. Here’s the configuration:
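For readers who prefer code to console settings, here is a minimal sketch of what an equivalent table definition might look like as parameters for DynamoDB's CreateTable operation. The helper name `buildConnectionsTableParams`, the string key type, and the on-demand billing mode are assumptions for illustration; only the `ConnectionId` attribute and the "WebSocketManager" table name come from this post.

```javascript
// Sketch only: parameters for DynamoDB's CreateTable operation.
// The helper name, key type 'S', and billing mode are assumptions.
function buildConnectionsTableParams(tableName) {
  return {
    TableName: tableName,
    // Only key attributes are declared up front; other attributes are schemaless.
    AttributeDefinitions: [
      { AttributeName: 'ConnectionId', AttributeType: 'S' },
    ],
    KeySchema: [
      { AttributeName: 'ConnectionId', KeyType: 'HASH' }, // partition key
    ],
    BillingMode: 'PAY_PER_REQUEST', // assumption: on-demand capacity
  };
}

const params = buildConnectionsTableParams('WebSocketManager');
console.log(JSON.stringify(params, null, 2));
```

With the AWS SDK for JavaScript v2, an object like this could be passed to the low-level client, for example `new DynamoDB().createTable(params).promise()`.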

Next, I needed to create the WebSocket API in API Gateway. However, the AWS console was going to prompt me for Lambda functions to handle connect and disconnect actions from clients, so I created those first.

Here’s the connect Lambda code:


// connect.js
const DynamoDB = require('aws-sdk/clients/dynamodb');

exports.handler = async function(event, context, callback) {
  const db = new DynamoDB.DocumentClient();
  var putParams = {
    TableName: process.env.TABLE_NAME, // In our case, "WebSocketManager"
    Item: {
      ConnectionId: event.requestContext.connectionId,
    }
  };

  try {
    // Store the incoming connection ID in the connections table
    await db.put(putParams).promise();

    return {
      statusCode: 200,
      body: "Connected"
    };
  } catch (e) {
    console.error('error!', e);
    return {
      statusCode: 500,
      body: "Failed to connect: " + JSON.stringify(e),
    };
  }
};

And here’s the disconnect Lambda code:


// disconnect.js
const DynamoDB = require('aws-sdk/clients/dynamodb');

exports.handler = async function(event, context, callback) {
  const db = new DynamoDB.DocumentClient();
  var deleteParams = {
    TableName: process.env.TABLE_NAME, // In our case, "WebSocketManager"
    Key: {
      ConnectionId: event.requestContext.connectionId,
    }
  };

  try {
    // Remove the connection ID from the connections table when the client disconnects
    await db.delete(deleteParams).promise();
    return {
      statusCode: 200,
      body: "Disconnected"
    };
  } catch (e) {
    console.error('error!', e);
    return {
      statusCode: 500,
      body: "Failed to disconnect: " + JSON.stringify(e),
    };
  }
};

There’s not much to the connect/disconnect code. I’m just inserting and deleting connection IDs as they come in from events.

Next, I created a WebSocket API in API Gateway. When creating the API, I attached the connect and disconnect Lambda functions to their respective integrations:

WebSocket Client

Now that the WebSocket API is built, I can connect to it. In my project, I installed wscat, a command-line WebSocket client, and ran this command to connect to the API:


yarn wscat -c WEBSOCKET_URL_GOES_HERE

The WebSocket URL can be found in the API stage panel in the AWS console:

After getting a successful connection to the API, I hooked my DynamoDB stream to a Lambda function that can send events to my computer via WebSockets.

DynamoDB Stream

I needed to create another Lambda function to act as a trigger for DynamoDB stream events. This trigger function needed to look up all connections to my WebSocket API and send a message to each connection via API Gateway. Here’s what the code looks like:


// trigger.js
const ApiGatewayManagementApi = require('aws-sdk/clients/apigatewaymanagementapi');
const DynamoDB = require('aws-sdk/clients/dynamodb');

exports.handler = async function(event, context, callback) {
  const db = new DynamoDB.DocumentClient();
  let connections;

  try {
    // Fetch every stored connection ID (the attribute written by connect.js)
    connections = await db.scan({ TableName: process.env.TABLE_NAME, ProjectionExpression: 'ConnectionId' }).promise();
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }

  const api = new ApiGatewayManagementApi({
    endpoint: process.env.ENDPOINT,
  });

  // Forward the raw stream event to each connected client
  const postCalls = connections.Items.map(async ({ ConnectionId }) => {
    await api.postToConnection({ ConnectionId, Data: JSON.stringify(event) }).promise();
  });

  try {
    await Promise.all(postCalls);
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }

  return { statusCode: 200, body: 'Event sent.' };
};

The code is pretty straightforward. In the Lambda, I needed to make sure to set up my environment variables properly. TABLE_NAME is the DynamoDB table storing my connections (not the table I’m streaming changes from). ENDPOINT is the URL I used to connect to the WebSocket API, with wss:// replaced by https://.
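To make the ENDPOINT value less error-prone, the conversion can be sketched as a small helper. `toManagementEndpoint` is a hypothetical name, and the sample URL (API ID, region, and stage) is a made-up placeholder rather than a value from this project.

```javascript
// Hypothetical helper: derive the ENDPOINT value from the wss:// URL
// used by WebSocket clients.
function toManagementEndpoint(webSocketUrl) {
  const url = new URL(webSocketUrl);
  if (url.protocol !== 'wss:') {
    throw new Error(`Expected a wss:// URL, got ${url.protocol}//`);
  }
  // Keep the host and stage path; drop trailing slashes and any query string.
  const path = url.pathname.replace(/\/+$/, '');
  return `https://${url.host}${path}`;
}

// Placeholder URL: the API ID, region, and stage name are invented.
console.log(toManagementEndpoint('wss://abc123.execute-api.us-east-1.amazonaws.com/dev'));
```

The stage name stays in the path because the management API is addressed per stage.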

With the WebSocket API in place, I enabled streaming changes from my DynamoDB table. For my project, I wanted to stream changes for the table that contained user profile data. Navigating to the DynamoDB table in the AWS console, I enabled streaming in the “Overview” tab under the section “DynamoDB Stream Details” by clicking “Manage DynamoDB Stream.”
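Once a stream is enabled, a Lambda subscribed to it receives batches of change records. The trigger shown earlier forwards the whole event as-is; as a rough sketch, the event could instead be trimmed to the fields a client is likely to care about. `summarizeStreamEvent` and the sample event (including the `UserId` and `Name` attributes) are hypothetical, and `NewImage` is only present when the stream's view type includes new images.

```javascript
// Sketch: reduce a DynamoDB Streams event to a compact list of changes.
function summarizeStreamEvent(event) {
  return (event.Records || []).map((record) => ({
    type: record.eventName, // 'INSERT', 'MODIFY', or 'REMOVE'
    keys: record.dynamodb.Keys,
    // Absent for REMOVE events and for view types without new images.
    newImage: record.dynamodb.NewImage || null,
  }));
}

// Hand-written sample event; attribute names are invented for illustration.
// Values use DynamoDB's attribute-value format, e.g. { S: 'text' }.
const sampleEvent = {
  Records: [
    {
      eventName: 'INSERT',
      dynamodb: {
        Keys: { UserId: { S: 'u1' } },
        NewImage: { UserId: { S: 'u1' }, Name: { S: 'Ada' } },
      },
    },
    {
      eventName: 'REMOVE',
      dynamodb: { Keys: { UserId: { S: 'u2' } } },
    },
  ],
};

console.log(JSON.stringify(summarizeStreamEvent(sampleEvent), null, 2));
```

Sending `JSON.stringify(summarizeStreamEvent(event))` instead of the raw event keeps messages smaller, at the cost of dropping metadata such as sequence numbers.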

Next, I created a trigger for the same table by navigating to the “Trigger” tab and clicking “Create Trigger.” After that, I could see changes to the records in that table streaming directly to any client connected to my WebSocket.
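One gap worth noting in the trigger as written: if a client goes away without its disconnect being recorded, posting to that connection fails, and a single failure rejects the whole `Promise.all`. Below is a minimal sketch of a more forgiving fan-out, assuming the connections table is keyed on `ConnectionId` as in the connect handler and that the SDK v2 error for a gone connection carries `statusCode` 410. The `broadcast` function and its injected `db` and `api` parameters are hypothetical, and the scan reads only a single page of results.

```javascript
// Sketch: send a payload to every stored connection and prune dead ones.
// `db` is expected to behave like DynamoDB.DocumentClient and `api` like
// ApiGatewayManagementApi (AWS SDK v2); both are injected so fakes work too.
async function broadcast({ db, api, tableName, payload }) {
  // Assumption: the table is small enough to fit in one scan page.
  const { Items = [] } = await db.scan({
    TableName: tableName,
    ProjectionExpression: 'ConnectionId',
  }).promise();

  // allSettled waits for every send instead of failing fast on the first error.
  const results = await Promise.allSettled(
    Items.map(({ ConnectionId }) =>
      api.postToConnection({ ConnectionId, Data: payload }).promise()
    )
  );

  const stale = [];
  const failed = [];
  results.forEach((result, i) => {
    if (result.status === 'fulfilled') return;
    const id = Items[i].ConnectionId;
    // 410 Gone: API Gateway no longer knows this connection.
    if (result.reason && result.reason.statusCode === 410) stale.push(id);
    else failed.push(id);
  });

  // Remove connection IDs that can never receive messages again.
  await Promise.all(
    stale.map((ConnectionId) =>
      db.delete({ TableName: tableName, Key: { ConnectionId } }).promise()
    )
  );

  return { sent: Items.length - stale.length - failed.length, stale, failed };
}
```

Inside the trigger, this could be called as `await broadcast({ db, api, tableName: process.env.TABLE_NAME, payload: JSON.stringify(event) })`, with the returned counts logged for debugging.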

We’re Done! 🥳

WebSockets are a cool technology with a lot of potential for the projects I work on. While I’m not too interested in streaming DynamoDB changes, this experiment introduced me to WebSockets in AWS. I hope to continue exploring more AWS services in future side projects.

Conversation
  • Fantastic article, thanks for putting it together. Gives me a few ideas! This is a great use for DynamoDB streams.

  • Akaha says:

    Just a few notes on the excess async/await usage:

    ```
    const postCalls = connections.Items.map(async ({ Id }) => {
      await api.postToConnection({ ConnectionId: Id, Data: JSON.stringify(event) }).promise();
    });
    ```
    Since you’re collecting the results into a list of promises, async/await is redundant here. Just do:
    ```
    const postCalls = connections.Items.map(item => api.postToConnection({ ConnectionId: item.Id, Data: JSON.stringify(event) }).promise());
    ```

    try/catch usage: if you have only one line in the try block, it’s probably easier and more readable to just use `.catch` directly on the promise:

    ```
    try {
      await doSomething();
    } catch (e) {
      handleError(e);
    }
    ```
    is roughly the same as
    ```
    await doSomething().catch(e => handleError(e));
    ```
