Creating producer and consumer using AWS Lambda for AWS MSK Cluster

Alien
2 min read · Feb 20, 2022

Use case

The official AWS documentation does not provide steps for creating a producer using Lambda, so this article walks through setting up both a producer and a consumer for an AWS MSK cluster.

Steps

1. Create a VPC in Account A with 3 private subnets and 1 public subnet.

2. Create a security group for the VPC with an inbound rule allowing all traffic from 0.0.0.0/0 (acceptable for this demo; tighten it for production).

3. Create an MSK cluster with 2 brokers across 2 of the private subnets, using the security group above, the default MSK configuration, and SASL/SCRAM authentication.

4. Create a secret in AWS Secrets Manager as described in [1].

[1] Username and password authentication with AWS Secrets Manager (Setting up SASL/SCRAM authentication for an Amazon MSK cluster): https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-tutorial
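For MSK to associate the secret with the cluster, the secret's name must begin with the AmazonMSK_ prefix and its value must be a JSON username/password pair. A minimal example (the credential values here are placeholders; the keys match what the producer code later reads as secrets['username'] and secrets['password']):

```json
{
  "username": "msk-user",
  "password": "msk-user-password"
}
```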

5. Create a custom KMS key (required because MSK secrets cannot use the default AWS-managed key) with the following policy:

{
  "Id": "key-consolepolicy-3",
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Enable IAM User Permissions",
      "Effect": "Allow",
      "Principal": {"AWS": "<ACCOUNT_ID>"},
      "Action": "kms:*",
      "Resource": "*"
    },
    {
      "Sid": "Allow access for Key Administrators",
      "Effect": "Allow",
      "Principal": {"AWS": "arn:aws:iam::<ACCOUNT_ID>:role/Administrator"},
      "Action": ["kms:Create*", "kms:Describe*", "kms:Enable*", "kms:List*", "kms:Put*", "kms:Update*", "kms:Revoke*", "kms:Disable*", "kms:Get*", "kms:Delete*", "kms:TagResource", "kms:UntagResource", "kms:ScheduleKeyDeletion", "kms:CancelKeyDeletion"],
      "Resource": "*"
    },
    {
      "Sid": "Allow use of the key",
      "Effect": "Allow",
      "Principal": {"AWS": ["<Lambda-Consumer-Role>"]},
      "Action": ["kms:Encrypt", "kms:Decrypt", "kms:ReEncrypt*", "kms:GenerateDataKey*", "kms:DescribeKey"],
      "Resource": "*"
    },
    {
      "Sid": "Allow attachment of persistent resources",
      "Effect": "Allow",
      "Principal": {"AWS": ["<Lambda-Consumer-Role>"]},
      "Action": ["kms:CreateGrant", "kms:ListGrants", "kms:RevokeGrant"],
      "Resource": "*",
      "Condition": {"Bool": {"kms:GrantIsForAWSResource": "true"}}
    }
  ]
}

6. Create a Lambda function (kafka-Consumer) with the following Node.js code:

exports.handler = async (event) => {
    // Iterate through topic-partition keys
    for (let key in event.records) {
        console.log('Key: ', key)
        // Iterate through records in each batch
        event.records[key].map((record) => {
            console.log('Record: ', record)
            // Decode the base64-encoded message value
            const msg = Buffer.from(record.value, 'base64').toString()
            console.log('Message:', msg)
        })
    }
}
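For reference, the MSK trigger delivers records in batches grouped under "topic-partition" keys, with each record's value base64-encoded. The same decoding logic can be sketched in Python against a hypothetical example payload:

```python
import base64
import json

# Hypothetical example of the event an MSK trigger passes to the consumer:
# records are grouped by "topic-partition" key, values are base64-encoded.
event = {
    "eventSource": "aws:kafka",
    "records": {
        "MSKTutorialTopic-0": [
            {
                "topic": "MSKTutorialTopic",
                "partition": 0,
                "offset": 15,
                "value": base64.b64encode(json.dumps({"number": 0}).encode()).decode(),
            }
        ]
    },
}

for key, records in event["records"].items():
    for record in records:
        # Same decoding step the Node.js consumer performs
        msg = base64.b64decode(record["value"]).decode("utf-8")
        print("Key:", key, "Message:", msg)
```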

7. Create a Lambda function (kafka-Producer) with the following Python code:

import json
import boto3
from kafka import KafkaProducer

region_name = '<AWS-REGION>'
secret_name = '<SECRET-MANAGER-ARN>'

# Fetch the SASL/SCRAM credentials from Secrets Manager
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
secrets = json.loads(get_secret_value_response['SecretString'])

KAFKA_BROKER = '<KAFKA-BROKER>:9096,<KAFKA-BROKER>:9096'
KAFKA_TOPIC = 'MSKTutorialTopic'

def lambda_handler(event, context):
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=secrets['username'],
        sasl_plain_password=secrets['password'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    # Send five test messages to the topic
    for e in range(5):
        data = {'number': e}
        producer.send(KAFKA_TOPIC, data)
    producer.flush()

8. Configure both Lambda functions with the same VPC settings as the MSK cluster (same subnets and security group).

9. Add an MSK trigger with SASL_SCRAM_512_AUTH pointing to the Secrets Manager secret to the kafka-Consumer Lambda function.
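The same trigger can also be created programmatically via Lambda's CreateEventSourceMapping API. A sketch of the request parameters, with placeholder ARNs:

```json
{
  "EventSourceArn": "<MSK-CLUSTER-ARN>",
  "FunctionName": "kafka-Consumer",
  "Topics": ["MSKTutorialTopic"],
  "StartingPosition": "LATEST",
  "SourceAccessConfigurations": [
    { "Type": "SASL_SCRAM_512_AUTH", "URI": "<SECRET-MANAGER-ARN>" }
  ]
}
```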

10. Attach the following managed policies to the Lambda execution role, in addition to its default policy: arn:aws:iam::aws:policy/SecretsManagerReadWrite, arn:aws:iam::aws:policy/AmazonMSKFullAccess, and arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole.

Once the setup is complete, test it: invoke kafka-Producer and check kafka-Consumer's CloudWatch logs for the decoded messages.
