Stream a DynamoDB table to an ElasticSearch index - AWS CDK

The scalability and rapid read/write speeds of DynamoDB, combined with the full-text search capabilities of AWS ElasticSearch.

DynamoDB is one of the most efficient database services provided by AWS, but it lacks one important feature: fast full-text search and flexible filtering of data.

That’s where ElasticSearch comes into play.

One common approach is to “write” to DynamoDB and “read” from ElasticSearch. What this requires is the “synchronization” of data from DynamoDB to ElasticSearch.

Let’s try to use AWS CDK constructs to build this streaming mechanism.

We’ll start by defining a simple DynamoDB table that holds the data:

import * as dynamodb from '@aws-cdk/aws-dynamodb';

const table = new dynamodb.Table(this, 'Table', {
  partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
  // Activate DynamoDB Streams for this table; NEW_AND_OLD_IMAGES means each
  // stream record carries both the previous and the new version of the item
  stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
});
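
A note on the stream type: the sync Lambda we write later reads each record's NewImage, so the stream must include new images. NEW_AND_OLD_IMAGES also carries the previous version of each item; if you never need that, here is a sketch of the leaner alternative (the construct ID is a made-up placeholder):

import * as dynamodb from '@aws-cdk/aws-dynamodb';

// Hypothetical alternative: NEW_IMAGE streams only the post-write item,
// which is enough for indexing but loses the previous version.
const tableNewImageOnly = new dynamodb.Table(this, 'TableNewImageOnly', {
  partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
  stream: dynamodb.StreamViewType.NEW_IMAGE,
});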

Also, let’s create the ElasticSearch Domain where data from the Table will be streamed:

import * as es from '@aws-cdk/aws-elasticsearch';

const domain = new es.Domain(this, 'elasticsearch', {
  version: es.ElasticsearchVersion.V7_7,
});
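
The defaults are enough for this walkthrough, but the Domain construct also accepts capacity settings. A minimal sketch of a small, development-sized domain; the node count and instance type are assumptions, not recommendations:

import * as es from '@aws-cdk/aws-elasticsearch';

// Hypothetical development-sized domain: a single small data node.
// Size this properly before using anything like it in production.
const devDomain = new es.Domain(this, 'elasticsearch-dev', {
  version: es.ElasticsearchVersion.V7_7,
  capacity: {
    dataNodes: 1,
    dataNodeInstanceType: 't3.small.elasticsearch',
  },
});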

So far so good. Now, how do we get DynamoDB to automatically synchronize to ElasticSearch? Using DynamoDB Streams.

You can’t just create a DynamoDB Stream though. You also need to attach something as a “listener”. In our example, this will be a Lambda function:

import * as path from 'path';
import { NodejsFunction } from '@aws-cdk/aws-lambda-nodejs';

const functionForStreaming = new NodejsFunction(this, 'fn-for-stream', {
  entry: path.resolve(__dirname, './lambda.ts'),
  // Bundle the handler's runtime dependencies with the function.
  // Note: in newer CDK versions this option lives under bundling.nodeModules.
  nodeModules: [
    'aws-lambda', '@elastic/elasticsearch',
  ],
});

We’ll define the body of the Lambda inside lambda.ts later. For now, let’s just “attach” the Lambda to the DynamoDB stream:

import { StartingPosition } from '@aws-cdk/aws-lambda';
import { DynamoEventSource } from '@aws-cdk/aws-lambda-event-sources';

functionForStreaming.addEventSource(new DynamoEventSource(table, {
  startingPosition: StartingPosition.TRIM_HORIZON,
}));

What the above does is this: it creates an event source mapping so that the Lambda defined in functionForStreaming is invoked automatically every time an item in the DynamoDB table is created, updated or deleted. Note that stream records are delivered in batches: a burst of writes (e.g. from BatchWriteItem) can arrive as a single invocation carrying up to 100 records, the default batch size.
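
If you need more control over batching and error handling, the event source accepts additional options. A sketch that would replace the plain attachment above; the values are illustrative assumptions, not recommendations:

import { Duration } from '@aws-cdk/core';
import { StartingPosition } from '@aws-cdk/aws-lambda';
import { DynamoEventSource } from '@aws-cdk/aws-lambda-event-sources';

// Illustrative values only: tune these to your write volume.
functionForStreaming.addEventSource(new DynamoEventSource(table, {
  startingPosition: StartingPosition.TRIM_HORIZON,
  batchSize: 50,                          // max records per invocation
  maxBatchingWindow: Duration.seconds(5), // wait up to 5s to fill a batch
  retryAttempts: 3,                       // retries before giving up on a batch
  bisectBatchOnError: true,               // split a failing batch to isolate bad records
}));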

Let’s also allow the Lambda to read from and write to the ElasticSearch domain. We will need this later.

domain.grantReadWrite(functionForStreaming);

Additionally, I’d like to pass some information to the Lambda that will handle the DynamoDB stream’s events: namely the URL of the ElasticSearch domain and the ElasticSearch index where the table should be replicated. We will use this later in the Lambda handler.

functionForStreaming.addEnvironment('ES_DOMAIN', domain.domainEndpoint);
functionForStreaming.addEnvironment('ES_INDEX', table.tableName);
// Pass info about which field in the table is the "primary key" (always present and unique)
// This information is used as ID of the document in ElasticSearch
functionForStreaming.addEnvironment('PK', table.partitionKey.name);

And to give the Lambda’s IAM role permission to call the ElasticSearch APIs (note that es:* on all resources is very permissive; a tighter variant follows below):

import { PolicyStatement } from '@aws-cdk/aws-iam';

functionForStreaming.addToRolePolicy(new PolicyStatement({
  actions: ["es:*"],
  resources: ["*"],
}));
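
If you want to tighten this up, here is a sketch of a more narrowly scoped policy, assuming the handler only needs HTTP-level document operations against this one domain (the exact action list is an assumption; match it to what your code actually calls):

import { PolicyStatement } from '@aws-cdk/aws-iam';

// Hypothetical least-privilege variant: only HTTP-level document operations,
// and only against this specific domain and its indices.
functionForStreaming.addToRolePolicy(new PolicyStatement({
  actions: ["es:ESHttpGet", "es:ESHttpPut", "es:ESHttpPost", "es:ESHttpDelete"],
  resources: [domain.domainArn, `${domain.domainArn}/*`],
}));

In practice, the domain.grantReadWrite call above should already grant similar HTTP-level permissions scoped to the domain, so you may not need an extra statement at all.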

Now, let’s focus on the Lambda that receives the created, updated and deleted DynamoDB items and synchronizes them to ElasticSearch. This is where most of the logic lives: lambda.ts.

import { DynamoDBStreamEvent } from "aws-lambda";
import { DynamoDB } from "aws-sdk";
import { Client } from "@elastic/elasticsearch";

export const handler = async (event: DynamoDBStreamEvent) => {
  const node = process.env.ES_DOMAIN as string;
  const index = process.env.ES_INDEX as string;
  const pk = process.env.PK as string;

  console.log("DynamoDB to ES synchronize event triggered");
  console.log("Received event object:", event);
  console.log("ES domain to use:", node);
  console.log("ES index to use:", index);

  if (!event.Records) {
    console.log("No records to process. Exiting");
    return;
  }

  const client = new Client({
    node: `https://${node}`,
  });

  for (const record of event.Records.filter((r) => r.dynamodb)) {
    try {
      let result;
      const keys = record.dynamodb!.Keys;
      console.log(JSON.stringify(record));
      const id = keys?.[pk]?.S;
      if (!id) {
        console.log(`Cannot detect the ID of the document to index. Make sure the DynamoDB item has a field called '${pk}'`);
        continue;
      }
      if (record.eventName === "REMOVE") {
        console.log("Deleting document: " + id);
        result = await client.delete({
          index,
          id,
        });
      } else {
        if (!record.dynamodb!.NewImage) {
          console.log("Trying to index a new document but the DynamoDB stream event did not provide the NewImage. Skipping...");
          continue;
        }
        console.log("Indexing document: " + id);
        // Unwrap DynamoDB's typed attribute values into a plain JS object
        const convertedDocument = DynamoDB.Converter.output({ M: record.dynamodb!.NewImage });
        console.log("The full object to store is: ", convertedDocument);
        result = await client.index({
          index,
          id,
          body: convertedDocument,
        });
      }
      console.log(result);
    } catch (e) {
      console.error("Failed to process DynamoDB record");
      console.error(record);
      console.error(e);
    }
  }
};
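
To make the DynamoDB.Converter.output call above concrete: it unwraps DynamoDB’s typed attribute values into a plain JavaScript object that ElasticSearch can index. A small illustration with a made-up item:

import { DynamoDB } from "aws-sdk";

// A hypothetical NewImage as it arrives in a stream record:
const newImage = {
  id: { S: "42" },
  title: { S: "Fancy lamp" },
  price: { N: "19.99" },
  tags: { L: [{ S: "home" }, { S: "lighting" }] },
};

const plain = DynamoDB.Converter.output({ M: newImage });
console.log(plain);
// => { id: '42', title: 'Fancy lamp', price: 19.99, tags: ['home', 'lighting'] }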

We should be good to go in terms of AWS CDK constructs.

Try deploying these constructs and adding an item to the DynamoDB table. If you browse the ElasticSearch Kibana web interface, you should see the item synchronized.
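
If you’d rather test from code than from the console, here is a quick sketch using the AWS SDK; the table name and item are made-up placeholders:

import { DynamoDB } from "aws-sdk";

const documentClient = new DynamoDB.DocumentClient();

// Hypothetical test item; 'Table' stands in for your deployed table's name.
documentClient
  .put({
    TableName: "Table",
    Item: { id: "test-1", title: "Fancy lamp", price: 19.99 },
  })
  .promise()
  .then(() => console.log("Item written; watch the Lambda logs and the ES index"))
  .catch(console.error);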

Happy coding!
