Building a Knowledge Base with AWS CDK and OpenSearch Serverless

Vipul Munot
6 min read · Jul 20, 2024


In this post, we’ll explore how to create a knowledge base using AWS Cloud Development Kit (CDK) and the OpenSearch Serverless service. The code provided sets up the necessary infrastructure, including an OpenSearch collection, encryption and network policies, and a Lambda function to create the index.

Setting the Stage

The code begins by creating an IAM role (CreateIndexExecutionRole) with the AWSLambdaBasicExecutionRole managed policy attached. This role will be assumed by the Lambda function responsible for creating the knowledge base index.

Provisioning the OpenSearch Collection

Next, an OpenSearch Serverless collection is created using the CfnCollection resource. This collection is of type VECTORSEARCH, which is specifically designed for vector similarity search use cases, making it ideal for knowledge base applications.

Securing the Collection

To secure the OpenSearch collection, the code provisions several security policies:

Encryption Policy: A KMS key is created (OpenSearchKMSKey), and an encryption policy (OpenSearchCollectionSecurityPolicy) is defined. This policy specifies how the collection data will be encrypted using the created KMS key.

Network Policy: A network policy (OpenSearchNetworkPolicy) is created to control access to the collection. This policy can be configured to allow public access or restrict access to specific VPC endpoints.

Access Policy: An access policy (OpenSearchCollectionAccessPolicy) is defined to grant permissions to the specified principal ARNs (including the current account and the Lambda execution role) to perform various operations on the collection and its indexes.

These security policies ensure that the knowledge base data is encrypted at rest, network access is controlled, and only authorized entities can interact with the collection and its indexes.
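To make the shape of these policy documents concrete, here is a sketch of what a rendered data-access policy looks like. The collection name, role ARN, and the abbreviated permission lists are illustrative placeholders; the helper functions that build the real documents appear in the full code below.

```python
import json

# Hypothetical inputs; the real values come from the CDK construct.
collection_name = "my-kb-collection"
principal_arns = ["arn:aws:iam::123456789012:role/CreateIndexExecutionRole"]

# An OpenSearch Serverless data-access policy is a JSON list with one entry
# containing index-level and collection-level rules plus the principals.
policy = [
    {
        "Rules": [
            {
                "ResourceType": "index",
                "Resource": [f"index/{collection_name}/*"],
                "Permission": [
                    "aoss:CreateIndex",
                    "aoss:ReadDocument",
                    "aoss:WriteDocument",
                ],
            },
            {
                "ResourceType": "collection",
                "Resource": [f"collection/{collection_name}"],
                "Permission": ["aoss:DescribeCollectionItems"],
            },
        ],
        "Principal": principal_arns,
    }
]

print(json.dumps(policy, indent=2))
```

The CfnAccessPolicy resource expects this document serialized as a string, which is why the helpers below end with `json.dumps(policy)`.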

Preparing for Index Creation

The construct itself only wires up the CreateIndexFunction; the function's implementation, shown later in this post, is what actually creates the knowledge base index inside the provisioned OpenSearch collection.

The CreateIndexFunction uses the opensearch-py library to call the OpenSearch API and define the index mapping and settings specific to the knowledge base use case.

Deploying the Knowledge Base Infrastructure

To deploy the knowledge base infrastructure, including the OpenSearch collection and the Lambda function for index creation, you would use the AWS CDK to synthesize the CloudFormation template and deploy the stack. The CDK allows you to define and provision cloud resources using familiar programming languages like Python.
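In practice, deployment boils down to instantiating the construct inside a stack and running `cdk deploy`. A minimal sketch of an app entry point follows; the module name `knowledge_base_construct`, the stack name, the `dev` stage, and the application name are illustrative assumptions, not part of the original code.

```python
# app.py - minimal CDK entry point (illustrative sketch; assumes the
# KnowledgeBaseConstruct below lives in knowledge_base_construct.py)
import aws_cdk as cdk
from constructs import Construct
from knowledge_base_construct import KnowledgeBaseConstruct


class KnowledgeBaseStack(cdk.Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        KnowledgeBaseConstruct(
            self,
            "KnowledgeBase",
            stage="dev",  # deployment stage (placeholder)
            application_name="my-genai-app",  # used in resource names (placeholder)
        )


app = cdk.App()
KnowledgeBaseStack(app, "KnowledgeBaseStack")
app.synth()
```

With this in place, `cdk synth` emits the CloudFormation template and `cdk deploy` provisions the stack.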

Conclusion

This blog post explored the process of creating a knowledge base index using AWS Lambda and OpenSearch Serverless. The provided code sets up the necessary infrastructure, including an OpenSearch collection, encryption and network policies, and an IAM role for the Lambda function responsible for index creation.

By leveraging OpenSearch Serverless and its vector similarity search capabilities, you can build powerful knowledge base applications that provide fast and accurate search results. The combination of AWS Lambda and OpenSearch Serverless allows for a serverless and scalable architecture, reducing operational overhead and enabling you to focus on building innovative knowledge base solutions.

CDK Code

import os, re, json
from aws_cdk import (
    aws_lambda as lambda_,
    CustomResource,
    aws_iam as iam,
    aws_kms as kms,
    aws_sns as sns,
    aws_logs as logs,
    aws_stepfunctions as sfn,
    aws_events as ev,
    aws_events_targets as evt,
    Size,
    Duration,
    custom_resources as cr,
    Stack,
    RemovalPolicy,
    BundlingOptions,
    aws_scheduler as sc,
    aws_bedrock as bedrock,
    aws_opensearchserverless as opensearchserverless,
)
from constructs import Construct
from typing import Any, Dict, List, Optional


def getBundlingCommand(package_name):
    # Shell command used when bundling a pip package into a Lambda layer asset.
    return [
        "bash",
        "-c",
        " && ".join(
            [
                "mkdir /asset-output/python",
                "pip install --no-cache -t /asset-output/python -I {}".format(
                    package_name
                ),
                "cp -au . /asset-output/python",
            ]
        ),
    ]



def get_access_policy(collection_name: str, principal_arns: List[str]) -> str:
    policy = [
        {
            "Rules": [
                {
                    "ResourceType": "index",
                    "Resource": [
                        f"index/{collection_name}/*",
                    ],
                    "Permission": [
                        "aoss:UpdateIndex",
                        "aoss:DescribeIndex",
                        "aoss:ReadDocument",
                        "aoss:WriteDocument",
                        "aoss:CreateIndex",
                    ],
                },
                {
                    "ResourceType": "collection",
                    "Resource": [
                        f"collection/{collection_name}",
                    ],
                    "Permission": [
                        "aoss:DescribeCollectionItems",
                        "aoss:CreateCollectionItems",
                        "aoss:UpdateCollectionItems",
                    ],
                },
            ],
            "Principal": principal_arns,
        }
    ]
    return json.dumps(policy)


def get_encryption_policy(
    collection_name: str, kms_key_arn: Optional[str] = None
) -> str:
    policy: Dict[str, Any] = {
        "Rules": [
            {
                "ResourceType": "collection",
                "Resource": [
                    f"collection/{collection_name}",
                ],
            }
        ],
    }
    if kms_key_arn:
        policy["KmsARN"] = kms_key_arn
    else:
        policy["AWSOwnedKey"] = True
    return json.dumps(policy)


def get_network_policy(
    collection_name: str, vpc_endpoints: Optional[List[str]] = None
) -> str:
    policy: List[Dict[str, Any]] = [
        {
            "Rules": [
                {
                    "ResourceType": "dashboard",
                    "Resource": [
                        f"collection/{collection_name}",
                    ],
                },
                {
                    "ResourceType": "collection",
                    "Resource": [
                        f"collection/{collection_name}",
                    ],
                },
            ],
        }
    ]
    if vpc_endpoints:
        policy[0]["SourceVPCEs"] = vpc_endpoints
    else:
        policy[0]["AllowFromPublic"] = True
    return json.dumps(policy)


class KnowledgeBaseConstruct(Construct):

    def __init__(
        self,
        scope: Construct,
        id: str,
        stage: str,
        application_name: str,
        **kwargs,
    ) -> None:
        super().__init__(scope, id, **kwargs)

        # Lambda layer bundling requests-aws4auth
        LayerVersion_awsauth_latest = lambda_.LayerVersion(
            scope=self,
            id="LayerVersion_awsauth_latest",
            layer_version_name=f"{application_name}-awsauth-python-layer",
            description="awsauth-python-layer",
            compatible_architectures=[lambda_.Architecture.X86_64],
            compatible_runtimes=[lambda_.Runtime.PYTHON_3_12],
            code=lambda_.Code.from_asset(
                path=os.path.join(os.path.dirname(__file__), "EmptyBundle"),
                bundling=BundlingOptions(
                    image=lambda_.Runtime.PYTHON_3_12.bundling_image,
                    command=getBundlingCommand("requests-aws4auth"),
                ),
            ),
        )
        # Lambda layer bundling opensearch-py
        LayerVersion_opensearch_py_latest = lambda_.LayerVersion(
            scope=self,
            id="LayerVersion_opensearch-py_latest",
            layer_version_name=f"{application_name}-opensearch-py-python-layer",
            description="opensearch-py-python-layer",
            compatible_architectures=[lambda_.Architecture.X86_64],
            compatible_runtimes=[lambda_.Runtime.PYTHON_3_12],
            code=lambda_.Code.from_asset(
                path=os.path.join(os.path.dirname(__file__), "EmptyBundle"),
                bundling=BundlingOptions(
                    image=lambda_.Runtime.PYTHON_3_12.bundling_image,
                    command=getBundlingCommand("opensearch-py"),
                ),
            ),
        )

        create_index_lambda_execution_role = iam.Role(
            self,
            "CreateIndexExecutionRole",
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
            description="Managed by CDK - {}".format(application_name),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "service-role/AWSLambdaBasicExecutionRole"
                )
            ],
        )

        open_search_collection = opensearchserverless.CfnCollection(
            self,
            "OpenSearchCollection",
            name=f"{application_name}-{stage}-collection".lower(),
            description="Managed by CDK - {}".format(application_name),
            type="VECTORSEARCH",
        )
        key = kms.Key(
            self,
            "OpenSearchKMSKey",
            alias=f"{application_name}-{stage}-opensearch-serverless",
            enable_key_rotation=True,
        )

        cfn_encryption_policy = opensearchserverless.CfnSecurityPolicy(
            self,
            "OpenSearchCollectionSecurityPolicy",
            name=f"{application_name}-{stage}-security-policy".lower(),
            type="encryption",
            description="Managed by CDK - {}".format(application_name),
            policy=get_encryption_policy(
                collection_name=open_search_collection.name,
                kms_key_arn=key.key_arn,
            ),
        )
        cfn_network_policy = opensearchserverless.CfnSecurityPolicy(
            self,
            "OpenSearchNetworkPolicy",
            name=f"{application_name}-{stage}-network-policy".lower(),
            description="Managed by CDK - {}".format(application_name),
            type="network",
            policy=get_network_policy(
                collection_name=open_search_collection.name,
            ),
        )
        # Principals granted data access: the current account and the Lambda
        # execution role. The Bedrock knowledge base role also needs to be
        # granted access here for OpenSearch access.
        principal_arns = [
            iam.AccountPrincipal(Stack.of(self).account).arn,
            create_index_lambda_execution_role.role_arn,
        ]
        cfn_access_policy = opensearchserverless.CfnAccessPolicy(
            self,
            "OpenSearchCollectionAccessPolicy",
            name=f"{application_name}-{stage}-access-policy".lower(),
            description="Managed by CDK - {}".format(application_name),
            type="data",
            policy=get_access_policy(
                collection_name=open_search_collection.name,
                principal_arns=principal_arns,
            ),
        )
        # The collection can only be created once its policies exist
        open_search_collection.add_dependency(cfn_encryption_policy)
        open_search_collection.add_dependency(cfn_network_policy)
        open_search_collection.add_dependency(cfn_access_policy)

        vector_index_name = "bedrock-knowledgebase-index"
        vector_field_name = "bedrock-knowledge-base-default-vector"

        create_index_function = lambda_.Function(
            self,
            "CreateIndexFunction",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler="lambda_function.lambda_handler",
            description="Managed by CDK - {}".format(application_name),
            code=lambda_.Code.from_asset(
                os.path.join(
                    os.path.dirname(__file__),
                    "lambdas/create_index",
                )
            ),
            role=create_index_lambda_execution_role,
            layers=[LayerVersion_awsauth_latest, LayerVersion_opensearch_py_latest],
            environment={
                "REGION_NAME": Stack.of(self).region,
                "COLLECTION_HOST": open_search_collection.attr_collection_endpoint,
                "VECTOR_INDEX_NAME": vector_index_name,
                "VECTOR_FIELD_NAME": vector_field_name,
            },
            timeout=Duration.seconds(900),
            tracing=lambda_.Tracing.ACTIVE,
        )
        # Allow the function to call the OpenSearch Serverless data-plane API
        create_index_lambda_execution_role.add_to_policy(
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=["aoss:APIAccessAll"],
                resources=[
                    f"arn:aws:aoss:{Stack.of(self).region}:{Stack.of(self).account}:collection/{open_search_collection.attr_id}"
                ],
            )
        )
        lambda_provider = cr.Provider(
            self,
            "LambdaCreateIndexCustomProvider",
            on_event_handler=create_index_function,
        )

        lambda_cr = CustomResource(
            self,
            "LambdaCreateIndexCustomResource",
            service_token=lambda_provider.service_token,
        )

        kb_name = "BedrockKnowledgeBase"
        text_field = "AMAZON_BEDROCK_TEXT_CHUNK"
        metadata_field = "AMAZON_BEDROCK_METADATA"

        embedding_model_arn = f"arn:aws:bedrock:{Stack.of(self).region}::foundation-model/amazon.titan-embed-text-v1"
        # Create an execution role for the knowledge base
        model_policy = iam.ManagedPolicy(
            self,
            f"AmazonBedrockFoundationModelPolicyForKnowledgeBase_{kb_name}",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=["bedrock:InvokeModel"],
                    resources=[embedding_model_arn],
                )
            ],
        )

        aoss_policy = iam.ManagedPolicy(
            self,
            f"AmazonBedrockOSSPolicyForKnowledgeBase_{kb_name}",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=["aoss:APIAccessAll"],
                    resources=[
                        f"arn:aws:aoss:{Stack.of(self).region}:{Stack.of(self).account}:collection/{open_search_collection.attr_id}"
                    ],
                )
            ],
        )

        kb_role = iam.Role(
            self,
            "KnowledgeBaseRole",
            assumed_by=iam.ServicePrincipal("bedrock.amazonaws.com"),
            managed_policies=[model_policy, aoss_policy],
        )
        knowledge_base = bedrock.CfnKnowledgeBase(
            self,
            "KnowledgeBase",
            role_arn=kb_role.role_arn,
            name=kb_name,
            description="Managed by CDK - {}".format(application_name),
            knowledge_base_configuration=bedrock.CfnKnowledgeBase.KnowledgeBaseConfigurationProperty(
                type="VECTOR",
                vector_knowledge_base_configuration=bedrock.CfnKnowledgeBase.VectorKnowledgeBaseConfigurationProperty(
                    embedding_model_arn=embedding_model_arn
                ),
            ),
            storage_configuration=bedrock.CfnKnowledgeBase.StorageConfigurationProperty(
                type="OPENSEARCH_SERVERLESS",
                opensearch_serverless_configuration=bedrock.CfnKnowledgeBase.OpenSearchServerlessConfigurationProperty(
                    collection_arn=open_search_collection.attr_arn,
                    field_mapping=bedrock.CfnKnowledgeBase.OpenSearchServerlessFieldMappingProperty(
                        metadata_field=metadata_field,
                        text_field=text_field,
                        vector_field=vector_field_name,
                    ),
                    vector_index_name=vector_index_name,
                ),
            ),
        )
        knowledge_base.add_dependency(open_search_collection)
        # The vector index must exist before the knowledge base is created
        knowledge_base.node.add_dependency(lambda_cr)

        # NOTE: genai_bucket (the name of an existing S3 bucket holding the
        # source documents) is not defined in this snippet; it must be supplied
        # by the caller, e.g. as a constructor parameter.
        s3_data_source = bedrock.CfnDataSource(
            self,
            "MyCfnDataSource",
            knowledge_base_id=knowledge_base.attr_knowledge_base_id,
            name=f"{application_name}_s3_source",
            description="Managed by CDK - {}".format(application_name),
            server_side_encryption_configuration=bedrock.CfnDataSource.ServerSideEncryptionConfigurationProperty(
                kms_key_arn=key.key_arn
            ),
            data_deletion_policy="RETAIN",
            data_source_configuration=bedrock.CfnDataSource.DataSourceConfigurationProperty(
                s3_configuration=bedrock.CfnDataSource.S3DataSourceConfigurationProperty(
                    bucket_arn=f"arn:aws:s3:::{genai_bucket}",
                    # the properties below are optional
                    inclusion_prefixes=["genai_folder/"],
                ),
                type="S3",
            ),
        )
        s3_data_source.add_dependency(knowledge_base)

CreateIndexFunction

This Lambda function consists of two files:

lambda_function.py

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import os
import boto3
import json
import logging
import cfnresponse
import time

HOST = os.environ.get("COLLECTION_HOST")
VECTOR_INDEX_NAME = os.environ.get("VECTOR_INDEX_NAME")
VECTOR_FIELD_NAME = os.environ.get("VECTOR_FIELD_NAME")
REGION_NAME = os.environ.get("REGION_NAME")
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def log(message):
    logger.info(message)


def lambda_handler(event, context):
    """
    Lambda handler to create the OpenSearch Serverless vector index.
    """
    log(f"Event: {json.dumps(event)}")

    session = boto3.Session()

    # Log the caller identity; useful when debugging data-access-policy issues
    sts_client = session.client("sts")
    caller_identity = sts_client.get_caller_identity()
    log(f"Caller Identity: {caller_identity}")
    log(f"ARN: {caller_identity['Arn']}")

    creds = session.get_credentials()

    log(f"HOST: {HOST}")
    host = HOST.split("//")[1]  # strip the "https://" scheme

    region = REGION_NAME
    service = "aoss"
    status = cfnresponse.SUCCESS
    response = {}

    try:
        auth = AWSV4SignerAuth(creds, region, service)

        client = OpenSearch(
            hosts=[{"host": host, "port": 443}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            pool_maxsize=20,
        )
        index_name = VECTOR_INDEX_NAME

        if event["RequestType"] == "Create":
            log(f"Creating index: {index_name}")

            index_body = {
                "settings": {
                    "index.knn": True,
                    "index.knn.algo_param.ef_search": 512,
                },
                "mappings": {
                    "properties": {
                        VECTOR_FIELD_NAME: {
                            "type": "knn_vector",
                            "dimension": 1536,  # Titan Embeddings G1 - Text output size
                            "method": {
                                "space_type": "innerproduct",
                                "engine": "faiss",  # engine names are lowercase
                                "name": "hnsw",
                                "parameters": {
                                    "m": 16,
                                    "ef_construction": 512,
                                },
                            },
                        },
                        "AMAZON_BEDROCK_METADATA": {"type": "text", "index": False},
                        "AMAZON_BEDROCK_TEXT_CHUNK": {"type": "text"},
                        "id": {"type": "text"},
                    }
                },
            }

            response = client.indices.create(index_name, body=index_body)
            log(f"Response: {response}")

            log("Sleeping for 1 minute to let the index create.")
            time.sleep(60)

        elif event["RequestType"] == "Delete":
            log(f"Deleting index: {index_name}")
            response = client.indices.delete(index_name)
            log(f"Response: {response}")
        else:
            log("Continuing without action.")

    except Exception as e:
        logging.error("Exception: %s" % e, exc_info=True)
        status = cfnresponse.FAILED

    finally:
        # When invoked through the CDK custom-resource Provider framework, the
        # framework sends the CloudFormation response itself and the event
        # carries no ResponseURL; only respond directly when one is present.
        if "ResponseURL" in event:
            cfnresponse.send(event, context, status, response)

    if status == cfnresponse.FAILED:
        # Surface the failure to the Provider framework instead of swallowing it
        raise RuntimeError("Index operation failed; see the logs above.")

    return {
        "statusCode": 200,
        "body": json.dumps("Create index lambda ran successfully."),
    }

cfnresponse.py

# © 2023 Amazon Web Services, Inc. or its affiliates. All Rights Reserved.
#
# This AWS Content is provided subject to the terms of the AWS Customer Agreement
# available at http://aws.amazon.com/agreement or other written agreement between
# Customer and either Amazon Web Services, Inc. or Amazon Web Services EMEA SARL or both.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

from __future__ import print_function
import urllib3
import json
import logging

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

SUCCESS = "SUCCESS"
FAILED = "FAILED"

http = urllib3.PoolManager()


def send(
    event,
    context,
    responseStatus,
    responseData,
    physicalResourceId=None,
    noEcho=False,
    reason=None,
):
    responseUrl = event["ResponseURL"]

    logger.info(f"ResponseURL: {responseUrl}")

    responseBody = {
        "Status": responseStatus,
        "Reason": reason
        or "See the details in CloudWatch Log Stream: {}".format(
            context.log_stream_name
        ),
        "PhysicalResourceId": physicalResourceId or context.log_stream_name,
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
        "NoEcho": noEcho,
        "Data": responseData,
    }

    json_responseBody = json.dumps(responseBody)

    logger.info("Response body:")
    logger.info(json_responseBody)

    headers = {"content-type": "", "content-length": str(len(json_responseBody))}

    try:
        response = http.request(
            "PUT", responseUrl, headers=headers, body=json_responseBody
        )
        logger.info(f"Status code: {response.status}")
    except Exception as e:
        logger.info(f"send(..) failed executing http.request(..): {e}")
