A recent project of mine looked like it might need a WebSocket API, so I wanted to investigate how to set one up in the AWS ecosystem.
Extending the infrastructure from my previous DynamoDB setup to stream updates to a client via WebSockets turned out to be straightforward. I updated the GitHub project from my last post to connect to a WebSocket API. Here’s what I did to set it up.
WebSocket API
To create a WebSocket API in AWS, I needed to utilize three services:
- A DynamoDB table to manage connections
- An API Gateway to create the API
- Lambda functions to handle connection and disconnection events for the API
First, I created a DynamoDB table to manage the connections for the WebSocket API. This table is separate from the table that I’ll be streaming changes from. It’s a simple table with just a ConnectionId column. Here’s the configuration:
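In SDK terms, a table like this could be described roughly as follows. This is only a sketch: the table name `WebSocketManager` comes from the comments in the Lambda code below, while the helper name `buildConnectionsTableParams` and the on-demand billing mode are assumptions for illustration, not the exact console settings.

```javascript
// Hypothetical helper: builds CreateTable parameters for the connections table.
function buildConnectionsTableParams(tableName) {
  return {
    TableName: tableName,
    // ConnectionId is the only key attribute; connection IDs are strings.
    AttributeDefinitions: [{ AttributeName: 'ConnectionId', AttributeType: 'S' }],
    KeySchema: [{ AttributeName: 'ConnectionId', KeyType: 'HASH' }],
    // Assumption: on-demand billing, so no capacity settings are needed.
    BillingMode: 'PAY_PER_REQUEST',
  };
}

const params = buildConnectionsTableParams('WebSocketManager');
console.log(JSON.stringify(params, null, 2));

// With the AWS SDK installed, these parameters could be passed to
// new DynamoDB().createTable(params).promise().
```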
Next, I needed to create the WebSocket API in API Gateway. However, the AWS console was going to prompt me for Lambda functions to handle connect and disconnect actions from clients, so I created those first.
Here’s the connect Lambda code:
// connect.js
const DynamoDB = require('aws-sdk/clients/dynamodb');
exports.handler = async function (event) {
  const db = new DynamoDB.DocumentClient();
  const putParams = {
    TableName: process.env.TABLE_NAME, // In our case, "WebSocketManager"
    Item: {
      ConnectionId: event.requestContext.connectionId,
    },
  };
  try {
    // Store the incoming connection ID in the connections table
    await db.put(putParams).promise();
    return {
      statusCode: 200,
      body: "Connected",
    };
  } catch (e) {
    console.error('error!', e);
    return {
      statusCode: 500, // Internal server error
      body: "Failed to connect: " + JSON.stringify(e),
    };
  }
};
And here’s the disconnect Lambda code:
// disconnect.js
const DynamoDB = require('aws-sdk/clients/dynamodb');
exports.handler = async function (event) {
  const db = new DynamoDB.DocumentClient();
  const deleteParams = {
    TableName: process.env.TABLE_NAME, // In our case, "WebSocketManager"
    Key: {
      ConnectionId: event.requestContext.connectionId,
    },
  };
  try {
    // Remove the connection ID once the client disconnects
    await db.delete(deleteParams).promise();
    return {
      statusCode: 200,
      body: "Disconnected",
    };
  } catch (e) {
    console.error('error!', e);
    return {
      statusCode: 500, // Internal server error
      body: "Failed to disconnect: " + JSON.stringify(e),
    };
  }
};
There’s not much to the connect/disconnect code. I’m just inserting and deleting connection IDs as they come in from events.
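To try that logic without deploying anything, the same insert and delete steps can be run against an in-memory stand-in. This is a minimal sketch, not the deployed code: `createFakeDb` and `makeHandlers` are hypothetical names, the fake only mimics the `put(...).promise()` and `delete(...).promise()` shape used above, and the connection ID is made up.

```javascript
// Hypothetical in-memory stand-in for DynamoDB.DocumentClient (local experiments only).
function createFakeDb() {
  const rows = new Map();
  return {
    rows,
    put: ({ Item }) => ({ promise: async () => { rows.set(Item.ConnectionId, Item); } }),
    delete: ({ Key }) => ({ promise: async () => { rows.delete(Key.ConnectionId); } }),
  };
}

// Same steps as connect.js and disconnect.js, with the client passed in.
function makeHandlers(db, tableName) {
  return {
    connect: async (event) => {
      const Item = { ConnectionId: event.requestContext.connectionId };
      await db.put({ TableName: tableName, Item }).promise();
      return { statusCode: 200, body: 'Connected' };
    },
    disconnect: async (event) => {
      const Key = { ConnectionId: event.requestContext.connectionId };
      await db.delete({ TableName: tableName, Key }).promise();
      return { statusCode: 200, body: 'Disconnected' };
    },
  };
}

// The only field the handlers read is requestContext.connectionId.
const sampleConnectEvent = { requestContext: { connectionId: 'abc123=' } };

async function demo() {
  const db = createFakeDb();
  const handlers = makeHandlers(db, 'WebSocketManager');
  const connected = await handlers.connect(sampleConnectEvent);
  const sizeAfterConnect = db.rows.size;
  const disconnected = await handlers.disconnect(sampleConnectEvent);
  return { connected, sizeAfterConnect, disconnected, sizeAfterDisconnect: db.rows.size };
}

demo().then((result) => console.log(result));
```

Passing the client in as an argument is a design choice made only for this sketch, so the logic can run without AWS credentials.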
Next, I created a WebSocket API in API Gateway. When creating the API, I attached the connect and disconnect Lambda functions to their respective integrations:
WebSocket Client
Now that I have the WebSocket API built, I can connect to it. In my project, I installed wscat as a client. I can run this command to connect to the API:
yarn wscat -c WEBSOCKET_URL_GOES_HERE
The WebSocket URL can be found in the API stage panel in the AWS console:
After getting a successful connection to the API, I hooked my DynamoDB stream to a Lambda function that can send events to my computer via WebSockets.
DynamoDB Stream
I needed to create another Lambda function to act as a trigger for DynamoDB stream events. This trigger function needed to look up all connections to my WebSocket API and send a message to each connection via API Gateway. Here’s what the code looks like:
// trigger.js
const ApiGatewayManagementApi = require('aws-sdk/clients/apigatewaymanagementapi');
const DynamoDB = require('aws-sdk/clients/dynamodb');
exports.handler = async function (event) {
  const db = new DynamoDB.DocumentClient();
  let connections;
  try {
    // Look up every stored connection; the attribute name matches what connect.js writes
    connections = await db.scan({ TableName: process.env.TABLE_NAME, ProjectionExpression: 'ConnectionId' }).promise();
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }
  const api = new ApiGatewayManagementApi({
    endpoint: process.env.ENDPOINT,
  });
  // Forward the raw stream event to each connected client
  const postCalls = connections.Items.map(async ({ ConnectionId }) => {
    await api.postToConnection({ ConnectionId, Data: JSON.stringify(event) }).promise();
  });
  try {
    await Promise.all(postCalls);
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }
  return { statusCode: 200, body: 'Event sent.' };
};
The code is pretty straightforward. In the Lambda, I needed to make sure to set up my environment variables properly. TABLE_NAME is the DynamoDB table storing my connections (not the table I’m streaming changes from). ENDPOINT is the URL I used to connect to the WebSocket API, with wss:// replaced by https://.
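The scheme swap for ENDPOINT can be sketched as a small helper. `toManagementEndpoint` is a hypothetical name, and the URL below is a placeholder with a made-up API ID, region, and stage.

```javascript
// Hypothetical helper: derives the ENDPOINT value from the WebSocket URL
// by swapping the wss:// scheme for https://.
function toManagementEndpoint(webSocketUrl) {
  const prefix = 'wss://';
  if (!webSocketUrl.startsWith(prefix)) {
    throw new Error('Expected a wss:// URL, got: ' + webSocketUrl);
  }
  return 'https://' + webSocketUrl.slice(prefix.length);
}

// Placeholder URL; the API ID, region, and stage are made up.
const endpoint = toManagementEndpoint('wss://abc123.execute-api.us-east-1.amazonaws.com/dev');
console.log(endpoint); // https://abc123.execute-api.us-east-1.amazonaws.com/dev
```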
With the WebSocket API in place, I enabled streaming changes from my DynamoDB table. For my project, I wanted to stream changes for the table that contained user profile data. Navigating to the DynamoDB table in the AWS console, I enabled streaming in the “Overview” tab under the section “DynamoDB Stream Details” by clicking “Manage DynamoDB Stream.”
Next, I created a trigger for the same table by navigating to the “Trigger” tab and clicking “Create Trigger.” After that, I could see changes to the records in that table streaming directly to any client connected to my WebSocket.
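Because the trigger forwards the raw stream event, each client receives the full `Records` array with values in DynamoDB’s attribute-value format. If that is more than a client needs, the event could be trimmed before sending. The following is a rough sketch: `summarizeStreamEvent` is a hypothetical helper, and the `UserId` and `Name` attributes in the sample are made up.

```javascript
// Hypothetical helper: reduces a DynamoDB stream event to one small summary per record.
function summarizeStreamEvent(event) {
  return event.Records.map((record) => ({
    eventName: record.eventName, // 'INSERT', 'MODIFY', or 'REMOVE'
    keys: record.dynamodb.Keys,
    // NewImage is absent on REMOVE events and depends on the stream view type.
    newImage: record.dynamodb.NewImage || null,
  }));
}

// Trimmed-down sample event; attribute names and values are made up.
const sampleStreamEvent = {
  Records: [
    {
      eventName: 'MODIFY',
      dynamodb: {
        Keys: { UserId: { S: 'u-1' } },
        NewImage: { UserId: { S: 'u-1' }, Name: { S: 'Ada' } },
      },
    },
    { eventName: 'REMOVE', dynamodb: { Keys: { UserId: { S: 'u-2' } } } },
  ],
};

const summaries = summarizeStreamEvent(sampleStreamEvent);
console.log(JSON.stringify(summaries, null, 2));
```

In the trigger, the summary would take the place of `event` in the `JSON.stringify` call.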
We’re Done! 🥳
WebSockets are a cool technology with a lot of potential for the projects I work on. While I’m not too interested in streaming DynamoDB changes, this experiment introduced me to WebSockets in AWS. I hope to continue exploring more AWS services in future side projects.
Fantastic article, thanks for putting it together. Gives me a few ideas! This is a great use for DynamoDB streams.
Just a few notes on the excess async/await usage:
```
const postCalls = connections.Items.map(async ({ Id }) => {
  await api.postToConnection({ ConnectionId: Id, Data: JSON.stringify(event) }).promise();
});
```
Since you’re gathering these into a list of promises, async/await is redundant here. Just do:
```
const postCalls = connections.Items.map(item => api.postToConnection({ ConnectionId: item.Id, Data: JSON.stringify(event) }).promise());
```
try/catch usage: if you have only one line in the try block, it’s probably easier and more readable to use `.catch` directly on the promise:
```
try {
  await doSomething();
} catch(e) {
  handleError(e);
}
```
is the same as
```
await doSomething().catch(e => handleError(e))
```
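One caveat on that last point: the two forms only match when the `.catch` version is itself awaited (or returned), otherwise the surrounding function can finish before the error is handled. A small sketch under that assumption, where `doSomething` is a stand-in that always rejects and the wrapper names are hypothetical:

```javascript
// Hypothetical stand-in for doSomething: always rejects.
const doSomething = () => Promise.reject(new Error('boom'));

async function withTryCatch(handleError) {
  try {
    await doSomething();
  } catch (e) {
    handleError(e);
  }
}

async function withCatch(handleError) {
  // Still awaited, so this function does not resolve until the error is handled.
  await doSomething().catch(handleError);
}

async function demo() {
  const handled = [];
  const handleError = (e) => handled.push(e.message);
  await withTryCatch(handleError);
  await withCatch(handleError);
  return handled;
}

demo().then((handled) => console.log(handled)); // logs [ 'boom', 'boom' ]
```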