AWS Compute Blog
Using Amazon MSK as an event source for AWS Lambda
Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available service that uses Apache Kafka to process real-time streaming data. Many producers can send messages to Kafka, which can then be routed to and processed by multiple consumers. Lambda now supports Amazon MSK as an event source, so it can consume messages and integrate with downstream serverless workflows.
Apache Kafka is a distributed streaming platform that is similar to Amazon Kinesis. Amazon MSK simplifies the setup, scaling, and management of clusters running Kafka. It makes it easier to configure the application for multiple Availability Zones and to secure it with IAM. It’s fully compatible with Kafka and supports familiar community-built tools such as MirrorMaker, Apache Flink, and Prometheus.
In this blog post, I explain how to set up a test Amazon MSK cluster and configure key elements in the networking configuration. I also show how to create a Lambda function that is invoked by messages in Amazon MSK topics.
Overview
Using Amazon MSK as an event source operates in a similar way to using Amazon SQS or Amazon Kinesis. In all cases, the Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.
Lambda is a consumer application for your Kafka topic. It processes records from one or more partitions and sends the payload to the target function. Lambda continues to process batches until there are no more messages in the topic.
The Lambda function’s event payload contains an array of records. Each array item contains details of the topic and Kafka partition identifier, together with a timestamp and base64 encoded message:
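As a rough sketch of the payload shape described above, the following uses a hand-written sample event. The ARN, offset, and timestamp values are placeholders rather than output captured from a real cluster, and summarizeRecords is a hypothetical helper name used only for this illustration.

```javascript
// Hand-written sample event for illustration; all values are placeholders.
const sampleEvent = {
  eventSource: 'aws:kafka',
  eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1',
  records: {
    // Each key combines the topic name and the partition number.
    'mytopic-0': [
      {
        topic: 'mytopic',
        partition: 0,
        offset: 15,
        timestamp: 1596480920837,
        timestampType: 'CREATE_TIME',
        value: 'TWVzc2FnZSAjMQ==' // base64 for "Message #1"
      }
    ]
  }
};

// Flatten the records map into a list of { topic, partition, timestamp, message }.
function summarizeRecords(event) {
  const summaries = [];
  for (const key of Object.keys(event.records)) {
    for (const record of event.records[key]) {
      summaries.push({
        topic: record.topic,
        partition: record.partition,
        timestamp: record.timestamp,
        message: Buffer.from(record.value, 'base64').toString('utf8')
      });
    }
  }
  return summaries;
}

console.log(summarizeRecords(sampleEvent));
```

Because the records are grouped by topic and partition, a function that needs a flat list has to walk both levels, as the helper does here.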
Network configuration overview
Amazon MSK is a highly available service, so it must be configured to run in a minimum of two Availability Zones in your preferred Region. To comply with security best practice, the brokers are usually configured in private subnets in each Availability Zone.
For Lambda to poll the Amazon MSK cluster and invoke your function, you must ensure that there is a NAT Gateway running in a public subnet of the VPC. It’s possible to route the traffic to a single NAT Gateway in one AZ for test and development workloads. For redundancy in production workloads, it’s recommended that there is one NAT Gateway available in each Availability Zone.
The Lambda function target in the event source mapping does not need to be running in a VPC to receive messages from Amazon MSK. To learn more about configuring the private subnet table to use a NAT Gateway, see this Premium Support response. For an example of how to configure the infrastructure with AWS CloudFormation, see this template.
Required Lambda function permissions
The Lambda function must have permission to describe VPCs and security groups, and manage elastic network interfaces. The required execution role permissions are:
- ec2:CreateNetworkInterface
- ec2:DescribeNetworkInterfaces
- ec2:DescribeVpcs
- ec2:DeleteNetworkInterface
- ec2:DescribeSubnets
- ec2:DescribeSecurityGroups
To access the Amazon MSK data stream, the Lambda function also needs two Kafka permissions: kafka:DescribeCluster and kafka:GetBootstrapBrokers. The policy template AWSLambdaMSKExecutionRole includes these permissions.
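For readers who prefer to see the permissions as a policy document, here is a minimal sketch that assembles the actions listed above into an IAM policy statement. The single-statement layout and the Resource value of '*' are assumptions made for illustration; this is not a copy of the managed AWSLambdaMSKExecutionRole policy, which may contain additional permissions.

```javascript
// Actions named in this post.
const requiredActions = [
  'ec2:CreateNetworkInterface',
  'ec2:DescribeNetworkInterfaces',
  'ec2:DescribeVpcs',
  'ec2:DeleteNetworkInterface',
  'ec2:DescribeSubnets',
  'ec2:DescribeSecurityGroups',
  'kafka:DescribeCluster',
  'kafka:GetBootstrapBrokers'
];

// Wrap a list of actions in a single Allow statement.
// Assumption: Resource '*' is used for brevity; scope it down where possible.
function buildPolicyDocument(actions) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: actions,
        Resource: '*'
      }
    ]
  };
}

console.log(JSON.stringify(buildPolicyDocument(requiredActions), null, 2));
```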
In AWS SAM templates, you can configure a Lambda function with an Amazon MSK event source mapping and the necessary permissions. For example:
Resources:
  ProcessMSKfunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: code/
      Timeout: 3
      Handler: app.handler
      Runtime: nodejs12.x
      Policies:
        - AWSLambdaMSKExecutionRole
      Events:
        MSKEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1
            Topics:
              - MyTopic
Setting up an Amazon MSK cluster
Before starting, configure a new VPC with public and private subnets in two Availability Zones using this AWS CloudFormation template. In this configuration, the private subnets are set up to use a NAT Gateway.
To create the Amazon MSK cluster:
- From the Amazon MSK console, choose Create Cluster.
- Select Create cluster with custom settings, apply the name “my-MSK-cluster”, and keep the recommended Apache Kafka version.
- In the Configuration panel, keep the “Use the MSK default configuration” option selected.
- In Networking, select the new VPC and choose “2” for Number of Availability Zones. From the dropdowns, select the two Availability Zones in the VPC, and choose the private subnets for each.
- In the Brokers panel, choose kafka.t3.small for Broker instance type, and enter “1” for Number of brokers per Availability Zone.
- For Storage, enter “1” for EBS storage volume per broker.
- Keep the existing defaults and choose Create cluster. It takes up to 15 minutes to create the cluster and the status is displayed in the Cluster Summary panel.
Configuring the Lambda event source mapping
You can create the Lambda event source mapping using the AWS CLI or AWS SDK, which provide the CreateEventSourceMapping API. In this walkthrough, you use the AWS Management Console to create the event source mapping.
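For reference, a call to CreateEventSourceMapping with the AWS SDK for JavaScript (v2) might look like the following sketch. It assumes the aws-sdk package and valid credentials are available. The function name, Region, and cluster ARN are placeholders, and buildMappingParams and createMapping are hypothetical helper names. The actual API call is left commented out so that the snippet runs without touching an account.

```javascript
// Build the request parameters for CreateEventSourceMapping.
function buildMappingParams(clusterArn, functionName, topic) {
  return {
    EventSourceArn: clusterArn,
    FunctionName: functionName,
    Topics: [topic],
    StartingPosition: 'LATEST'
  };
}

// Create the mapping. Requires the aws-sdk (v2) package and credentials.
async function createMapping(params) {
  // Loaded lazily so the parameter helper can run without the SDK installed.
  const AWS = require('aws-sdk');
  const lambda = new AWS.Lambda({ region: 'us-east-1' }); // placeholder Region
  return lambda.createEventSourceMapping(params).promise();
}

const mappingParams = buildMappingParams(
  'arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/12abcd12-1234-1234-1234-1234abcd1234-1',
  'ProcessMSKfunction', // placeholder function name
  'mytopic'
);
console.log(mappingParams);

// Uncomment to call the API:
// createMapping(mappingParams).then(console.log).catch(console.error);
```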
First, you must create an Amazon MSK topic:
- Launch an EC2 instance, selecting t2.micro as the instance size. Connect to the instance once it is available.
- Follow these instructions in the Amazon MSK documentation to create a topic. Use the name “mytopic” and a replication-factor of 2, since there are only two Availability Zones in this test.
Now create a Lambda function that uses the Amazon MSK cluster and topic as an event source:
- From the Lambda console, select Create function.
- Enter a function name, and select Node.js 12.x as the runtime.
- Select the Permissions tab, and select the role name in the Execution role panel to open the IAM console.
- Choose Attach policies.
- In the filter box, enter “MSK”, then check the AWSLambdaMSKExecutionRole policy name.
- Choose Attach policy.
- Back in the Lambda function, select the Configuration tab. In the Designer panel, choose Add trigger.
- In the dropdown, select MSK. In the MSK cluster box, select the cluster you configured earlier. Enter “mytopic” for Topic name and choose “Latest” for Starting Position. Choose Add.
Note: it takes several minutes for the trigger status to update from Creating to Enabled.
- In the Function code panel, replace the contents of index.js with:
exports.handler = async (event) => {
  // Iterate through keys
  for (let key in event.records) {
    console.log('Key: ', key)
    // Iterate through records
    event.records[key].map((record) => {
      console.log('Record: ', record)
      // Decode base64
      const msg = Buffer.from(record.value, 'base64').toString()
      console.log('Message:', msg)
    })
  }
}
- Choose Save.
Testing the event source mapping
At this point, you have created a VPC with two public and two private subnets and a NAT Gateway. You have placed a new Amazon MSK cluster in the two private subnets. You set up a target Lambda function with the necessary IAM permissions and added the cluster as its trigger. Next, you publish messages to the Amazon MSK topic and see the resulting invocation in the Lambda function’s logs.
- From the EC2 instance you created earlier, follow these instructions to produce messages. In the Kafka console producer script running in the terminal, enter “Message #1”.
- Back in the Lambda function, select the Monitoring tab and choose View logs in CloudWatch. Select the first log stream.
- In the Log events panel, expand the entries to see the message sent from the Amazon MSK topic. Note that the value attribute containing the message is base64 encoded.
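If you want to check the decoding logic without publishing to the cluster, one option is to build a simulated event locally. The sketch below assumes that only the records map and the value field matter to the decoding step. buildTestEvent and decodeMessages are hypothetical helper names, and the offset and timestamp values are made up for the test.

```javascript
// Build a simulated event for one topic partition from plain-text messages.
function buildTestEvent(topic, partition, messages) {
  const key = `${topic}-${partition}`;
  return {
    records: {
      [key]: messages.map((text, index) => ({
        topic,
        partition,
        offset: index, // made-up offsets for the test
        timestamp: Date.now(),
        // Kafka message values arrive base64 encoded in the Lambda event.
        value: Buffer.from(text, 'utf8').toString('base64')
      }))
    }
  };
}

// Same decoding step as the walkthrough's handler, returning the
// messages instead of logging them so they can be checked.
function decodeMessages(event) {
  const decoded = [];
  for (const key in event.records) {
    for (const record of event.records[key]) {
      decoded.push(Buffer.from(record.value, 'base64').toString());
    }
  }
  return decoded;
}

const testEvent = buildTestEvent('mytopic', 0, ['Message #1']);
console.log(testEvent.records['mytopic-0'][0].value);
console.log(decodeMessages(testEvent));
```

The first log line shows the encoded form that appears in the value attribute, and the second shows the text recovered by the decoding step.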
Conclusion
Amazon MSK provides a fully managed, highly available service that uses Kafka to process real-time streaming data. Now that Lambda supports Amazon MSK as an event source, you can invoke Lambda functions from messages in Kafka topics to integrate into your downstream serverless workflows.
In this post, I give an overview of how the integration compares with other event source mappings. I show how to create a test Amazon MSK cluster, configure the networking, and create the event source mapping with Lambda. I also show how to set up the Lambda function in the AWS Management Console, and refer to the equivalent AWS SAM syntax to simplify deployment.
To learn more about how to use this feature, read the documentation.