AWS CDK

Stream DynamoDB table to an ElasticSearch index

The scalability and rapid read/write speeds of DynamoDB, combined with the full-text search capabilities of AWS ElasticSearch.

DynamoDB is one of the most efficient database services provided by AWS, but it lacks one important feature: fast search and filtering of data.

That’s where ElasticSearch comes into play.

One common approach is to “write” to DynamoDB and “read” from ElasticSearch. What is needed in this case is the “synchronization” of data from DynamoDB to ElasticSearch.

Let’s try to use AWS CDK constructs to build this streaming mechanism.

We’ll start by defining a simple DynamoDB table that holds the data:

import * as dynamodb from '@aws-cdk/aws-dynamodb';  
  
const table = new dynamodb.Table(this, 'Table', {
    partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },

    // Activate a DynamoDB Stream for this table; NEW_AND_OLD_IMAGES delivers
    // both the previous and the new item state in each stream record
    stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
});

Also, let’s create the ElasticSearch Domain where data from the Table will be streamed:

import * as es from '@aws-cdk/aws-elasticsearch';  
  
const domain = new es.Domain(this, 'elasticsearch', {  
    version: es.ElasticsearchVersion.V7_7,  
});  

So far so good. Now, how do we get DynamoDB to automatically synchronize to ElasticSearch? Using DynamoDB Streams.

You can’t just create a DynamoDB Stream, though. You also need to attach something as a “listener”. In our example, this will be a Lambda function:

import * as path from 'path';
import {NodejsFunction} from "@aws-cdk/aws-lambda-nodejs";

const functionForStreaming = new NodejsFunction(this, 'fn-for-stream', {
    entry: path.resolve(__dirname, './lambda.ts'),
    // Bundle these dependencies into the Lambda package instead of resolving them at runtime
    nodeModules: [
        'aws-lambda', '@elastic/elasticsearch'
    ],
});

We’ll define the body of the Lambda inside lambda.ts later. For now, let’s just “attach” the Lambda to the DynamoDB stream:

import {DynamoEventSource} from "@aws-cdk/aws-lambda-event-sources";
import {StartingPosition} from "@aws-cdk/aws-lambda";

functionForStreaming.addEventSource(new DynamoEventSource(table, {
    startingPosition: StartingPosition.TRIM_HORIZON,
}));

What the above does is this: it instructs the DynamoDB service to automatically invoke the Lambda defined in functionForStreaming every time an item in the DynamoDB table is created, updated or deleted. Note that stream records are delivered in batches – a single invocation can carry multiple records (by default up to 100), for example when items are written with BatchWriteItem.
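For reference, here is roughly the shape of a single stream record the Lambda will receive. This is a simplified sketch – real records carry extra metadata (eventID, awsRegion, eventSourceARN, and so on), and the title attribute is just a made-up example:

// A simplified sketch of one DynamoDB stream record (an INSERT event)
const sampleRecord = {
    eventName: "INSERT", // or "MODIFY" / "REMOVE"
    dynamodb: {
        Keys: {
            id: { S: "42" },
        },
        // Present because the table uses StreamViewType.NEW_AND_OLD_IMAGES
        NewImage: {
            id: { S: "42" },
            title: { S: "Hello world" }, // hypothetical attribute, for illustration only
        },
    },
};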

Let’s also allow the Lambda to read from and write to the ElasticSearch domain. We will need this later.

domain.grantReadWrite(functionForStreaming);

Additionally, I’d like to pass some information to the Lambda that will handle the DynamoDB stream’s events: namely the URL of the ElasticSearch domain and the ElasticSearch index where the table should be replicated. We will use this later in the Lambda handler.

// Information about where data should be streamed
functionForStreaming.addEnvironment('ES_DOMAIN', domain.domainEndpoint);
functionForStreaming.addEnvironment('ES_INDEX', table.tableName);
// Pass info about which field in the table is the "primary key" (always present and unique)
// This information is used as ID of the document in ElasticSearch
functionForStreaming.addEnvironment('PK', table.partitionKey.name);

And to make it possible for the Lambda to read/write from ElasticSearch securely:

import {PolicyStatement} from "@aws-cdk/aws-iam";

functionForStreaming.addToRolePolicy(new PolicyStatement({
    actions: ["es:*"],
    resources: ["*"],
}));
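Note that es:* on all resources is quite broad. If you want something tighter, you could scope the statement to the domain’s ARN and the specific HTTP actions the handler needs – a sketch, not required for the demo to work:

// A narrower alternative: only the ES HTTP verbs the handler uses, only on this domain
functionForStreaming.addToRolePolicy(new PolicyStatement({
    actions: ["es:ESHttpGet", "es:ESHttpPut", "es:ESHttpPost", "es:ESHttpDelete"],
    resources: [domain.domainArn + "/*"],
}));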

Now, let’s focus on the Lambda that will receive the created, updated and deleted DynamoDB rows and synchronize them to ElasticSearch. This is where most of the logic lives: lambda.ts.

import {DynamoDBStreamEvent} from "aws-lambda";
import {DynamoDB} from "aws-sdk";

import {Client} from '@elastic/elasticsearch';

export const handler = async (event: DynamoDBStreamEvent) => {
    const node = process.env.ES_DOMAIN as string;
    const index = process.env.ES_INDEX as string;

    console.log("DynamoDB to ES synchronize event triggered");
    console.log("Received event object:", event);
    console.log("ES domain to use:", node);
    console.log("ES index to use:", index);

    if (!event["Records"]) {
        console.log("No records to process. Exiting");
        return;
    }

    const client = new Client({
        node: `https://${node}`,
    });

    // Process only records that actually carry DynamoDB data
    for (const record of event.Records.filter((record) => record.dynamodb)) {
        try {
            let result;

            const keys = record.dynamodb!.Keys;

            console.log(JSON.stringify(record));

            // The partition key value is used as the ElasticSearch document ID
            const id = keys?.[process.env.PK!]?.S;

            if (!id) {
                console.log(`Cannot detect the ID of the document to index. Make sure the DynamoDB document has a field called '${process.env.PK}'`);
                continue;
            }

            if (record.eventName === "REMOVE") {
                // The item was deleted from the table – remove the matching document from the index
                console.log("Deleting document: " + id);
                result = await client.delete({
                    index,
                    id,
                });
            } else {
                if (!record.dynamodb!.NewImage) {
                    console.log("Trying to index a new document but the DynamoDB stream event did not provide the NewImage. Skipping...");
                    continue;
                }

                console.log("Indexing document: " + id);
                // Convert the stream image from DynamoDB's AttributeValue format into a plain object
                const convertedDocument = DynamoDB.Converter.output({"M": record.dynamodb!.NewImage});
                console.log("The full object to store is: ", convertedDocument);
                // index() creates the document, or overwrites it if one with the same ID already exists
                result = await client.index({
                    index,
                    id,
                    body: convertedDocument,
                });
            }

            console.log(result);
        } catch (e) {
            console.error("Failed to process DynamoDB row");
            console.error(record);
            console.error(e);
        }

    }
};
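One detail worth highlighting: stream images arrive in DynamoDB’s AttributeValue format ({ S: "..." }, { N: "..." }, etc.), and DynamoDB.Converter.output is what turns them into plain JavaScript objects before indexing. A quick illustration with made-up sample values:

import {DynamoDB} from "aws-sdk";

// A NewImage in DynamoDB's AttributeValue format (sample values)
const image = {
    id: { S: "42" },
    views: { N: "1000" },
    published: { BOOL: true },
};

// Wrapping the image in {"M": ...} tells the converter to treat it as a map
const plain = DynamoDB.Converter.output({ M: image });
// => { id: '42', views: 1000, published: true }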

We should be good to go in terms of AWS CDK constructs.

Try deploying these constructs and adding an item to the DynamoDB table. If you browse the ElasticSearch Kibana web interface, you should see the item synchronized.
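If you prefer to verify from code rather than Kibana, a quick match_all search against the index should return the item. A minimal sketch, assuming you fill in the placeholders with your domain endpoint and table name, and that your credentials or the domain’s access policy allow the request:

import {Client} from '@elastic/elasticsearch';

// Placeholders – replace with your ElasticSearch domain endpoint
const client = new Client({node: 'https://<your-domain-endpoint>'});

async function verify() {
    const result = await client.search({
        index: '<your-table-name>', // the Lambda uses the table name as the index name
        body: {
            query: {match_all: {}},
        },
    });
    console.log(result.body.hits.hits);
}

verify().catch(console.error);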

Happy coding!


Comments

  • I followed every step but I’m getting errors on this step, “functionForStreaming.addEnvironment(‘PK’, table.partitionKey.name);”
    Error: TS2339: Property ‘partitionKey’ does not exist on type ‘Table’.

    • Hi, Nesim.

      What is the TypeScript type of the `table` variable? In other words, where is it imported from at the top of your file?

      • I imported it the same way, which is -> (import * as dynamodb from ‘@aws-cdk/aws-dynamodb’;)

  • Any reason why I am getting timeouts? I know the Lambda has internet connectivity because I was able to query another external API, but I can’t figure out why connecting to the ElasticSearch domain times out after 10. Perhaps auth related? I attached the same policy as you do.
