Stream AWS CloudWatch Logs to OpenSearch

Amazon OpenSearch Service (formerly Amazon Elasticsearch Service) gives you a managed search and analytics engine for centralizing log data. When you combine it with CloudWatch Logs, you get a pipeline that streams application and infrastructure logs directly into OpenSearch for real-time search, filtering, and visualization – no manual export required.

This guide walks through setting up a complete CloudWatch Logs to OpenSearch streaming pipeline using AWS Lambda as the bridge. We cover domain creation, IAM roles, subscription filters, and cost optimization strategies that keep your log analytics bill under control.

Prerequisites

Before starting, confirm you have the following in place:

  • An active AWS account with administrative access (or sufficient IAM permissions for OpenSearch, CloudWatch, Lambda, and IAM)
  • AWS CLI installed and configured with valid credentials
  • An existing CloudWatch Log Group with active log streams (or we will create one in Step 2)
  • A VPC with at least two subnets in different Availability Zones if deploying OpenSearch in VPC mode
  • Basic familiarity with JSON IAM policies and AWS console navigation

Step 1: Create an OpenSearch Domain

The OpenSearch domain is the cluster that receives and indexes your log data. For a production setup, use at least two data nodes across Availability Zones. For testing, a single-node domain works fine.

Create a domain using the AWS CLI. This example creates a small domain suitable for log ingestion testing:

aws opensearch create-domain \
  --domain-name cloudwatch-logs \
  --engine-version OpenSearch_2.13 \
  --cluster-config InstanceType=t3.small.search,InstanceCount=1 \
  --ebs-options EBSEnabled=true,VolumeType=gp3,VolumeSize=20 \
  --access-policies '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {"AWS": "*"},
      "Action": "es:*",
      "Resource": "arn:aws:es:us-east-1:ACCOUNT_ID:domain/cloudwatch-logs/*",
      "Condition": {"IpAddress": {"aws:SourceIp": "YOUR_IP/32"}}
    }]
  }'

Replace ACCOUNT_ID with your 12-digit AWS account ID and YOUR_IP with your public IP address. The access policy restricts direct (unsigned) access to your IP. Note that signed requests from the Lambda execution role are evaluated against this same policy, so once the role exists (Step 3), you may need to add its ARN as an allowed Principal here if streaming requests come back with 403 errors.

Domain creation takes 15-20 minutes. Check the status with:

aws opensearch describe-domain --domain-name cloudwatch-logs --query 'DomainStatus.Processing'

When the domain is ready, the command returns false. Retrieve the domain endpoint – you will need it later:

aws opensearch describe-domain --domain-name cloudwatch-logs --query 'DomainStatus.Endpoint' --output text

The output returns the domain endpoint URL similar to:

search-cloudwatch-logs-abc123xyz.us-east-1.es.amazonaws.com

Save this endpoint – it is the destination for your log data.
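The create-and-wait steps above can be scripted end-to-end. The sketch below is a minimal bash helper of our own (wait_for_domain is not an AWS CLI command) that polls the Processing flag and prints the endpoint once the domain is ready:

```shell
# Poll until the domain finishes processing, then print its endpoint.
# wait_for_domain is a local helper; it assumes the AWS CLI is configured.
wait_for_domain() {
  local domain="$1"
  while [ "$(aws opensearch describe-domain --domain-name "$domain" \
      --query 'DomainStatus.Processing' --output text)" = "True" ]; do
    echo "Domain $domain still processing; retrying in 60s..." >&2
    sleep 60
  done
  aws opensearch describe-domain --domain-name "$domain" \
    --query 'DomainStatus.Endpoint' --output text
}
```

You can then capture the endpoint directly: ENDPOINT=$(wait_for_domain cloudwatch-logs).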

Production Sizing Recommendations

For production workloads, adjust the cluster configuration based on your log volume:

Log Volume        Recommended Config
Under 10 GB/day   2x t3.medium.search, 50 GB gp3 EBS each
10-50 GB/day      3x m6g.large.search, 200 GB gp3 EBS each
50-200 GB/day     3x r6g.xlarge.search, 500 GB gp3 EBS, 3 dedicated masters

Step 2: Create a CloudWatch Log Group

If you already have log groups with active streams (from EC2 instances, Lambda functions, or ECS containers), skip to Step 3. Otherwise, create a log group to test the pipeline.

Create a new log group:

aws logs create-log-group --log-group-name /aws/test/application-logs

Set a retention policy to control storage costs. This keeps logs for 30 days:

aws logs put-retention-policy --log-group-name /aws/test/application-logs --retention-in-days 30

Push a test log event to verify the log group is working:

aws logs create-log-stream --log-group-name /aws/test/application-logs --log-stream-name test-stream

aws logs put-log-events \
  --log-group-name /aws/test/application-logs \
  --log-stream-name test-stream \
  --log-events timestamp=$(date +%s000),message="Test log event from CloudWatch"

Confirm the event was ingested:

aws logs get-log-events --log-group-name /aws/test/application-logs --log-stream-name test-stream

The response should include your test message in the events array, confirming the log group is ready for streaming.

Step 3: Create an IAM Role for the Lambda Function

CloudWatch Logs uses a Lambda function to transform and forward log data to OpenSearch. This function needs an IAM role with permissions to read from CloudWatch Logs and write to your OpenSearch domain.

Create the trust policy file that allows Lambda to assume the role:

cat > /tmp/lambda-trust-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"Service": "lambda.amazonaws.com"},
    "Action": "sts:AssumeRole"
  }]
}
EOF

Create the IAM role:

aws iam create-role \
  --role-name CWLtoOpenSearchRole \
  --assume-role-policy-document file:///tmp/lambda-trust-policy.json

The output confirms the role was created with the ARN you will reference later:

{
    "Role": {
        "RoleName": "CWLtoOpenSearchRole",
        "Arn": "arn:aws:iam::ACCOUNT_ID:role/CWLtoOpenSearchRole",
        ...
    }
}
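Rather than copying the ARN out of that output by hand, you can capture it into a shell variable with a JMESPath query (a small convenience sketch; get_role_arn is our own helper name):

```shell
# Fetch a role's ARN with a JMESPath query instead of copy-pasting it.
get_role_arn() {
  aws iam get-role --role-name "$1" --query 'Role.Arn' --output text
}
```

Usage: ROLE_ARN=$(get_role_arn CWLtoOpenSearchRole).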

Now attach the permissions policy. Create the policy document:

cat > /tmp/lambda-opensearch-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "es:ESHttpGet"
      ],
      "Resource": "arn:aws:es:us-east-1:ACCOUNT_ID:domain/cloudwatch-logs/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}
EOF

Replace ACCOUNT_ID with your AWS account ID, then create and attach the policy:

aws iam put-role-policy \
  --role-name CWLtoOpenSearchRole \
  --policy-name CWLtoOpenSearchPolicy \
  --policy-document file:///tmp/lambda-opensearch-policy.json

If your OpenSearch domain runs inside a VPC, the Lambda function also needs network permissions. Attach the managed VPC execution policy:

aws iam attach-role-policy \
  --role-name CWLtoOpenSearchRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole

Verify the role has the correct policies attached:

aws iam list-role-policies --role-name CWLtoOpenSearchRole
aws iam list-attached-role-policies --role-name CWLtoOpenSearchRole

The first command lists the inline CWLtoOpenSearchPolicy; the second lists the managed VPC policy (if you attached it), confirming the role is properly configured for Lambda to stream logs into OpenSearch.

Step 4: Create a CloudWatch Logs Subscription Filter

The subscription filter is the bridge that connects CloudWatch Logs to OpenSearch. When you create a subscription filter, AWS automatically provisions a Lambda function that decompresses, transforms, and forwards log events to your OpenSearch domain.
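Under the hood, CloudWatch Logs delivers each batch to Lambda as a base64-encoded, gzip-compressed JSON blob in the awslogs.data field. The snippet below builds a sample payload and unpacks it the same way the forwarding function does (decode_cwl_payload is our own helper name, not part of the generated code):

```shell
# CloudWatch Logs wraps each batch as {"awslogs":{"data":"<base64 gzip JSON>"}}.
# Decode stdin the way the forwarding Lambda does: base64-decode, then gunzip.
decode_cwl_payload() {
  base64 -d | gunzip
}

# Build a sample payload and round-trip it through the decoder.
sample=$(printf '{"logGroup":"/aws/test/application-logs","logEvents":[{"message":"hello"}]}' \
  | gzip -c | base64 | tr -d '\n')
printf '%s' "$sample" | decode_cwl_payload
```

The decoded JSON carries the log group, log stream, and an array of log events, which the function reshapes into OpenSearch documents.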

Using the AWS Console

The easiest way to set up the subscription filter with the auto-generated Lambda function:

  • Open the CloudWatch console and navigate to Logs > Log groups
  • Select your log group (e.g., /aws/test/application-logs)
  • Click the Subscription filters tab, then Create > Create Amazon OpenSearch Service subscription filter
  • Select your OpenSearch domain (cloudwatch-logs)
  • Select the Lambda IAM role (CWLtoOpenSearchRole)
  • Set the log format (JSON, space-delimited, or other) and define any filter pattern
  • Click Start streaming

AWS creates a Lambda function named LogsToElasticsearch_DOMAIN-NAME automatically.

Using the AWS CLI

If the Lambda function already exists (from a previous console setup or manual creation), create the subscription filter directly:

aws logs put-subscription-filter \
  --log-group-name /aws/test/application-logs \
  --filter-name "StreamToOpenSearch" \
  --filter-pattern "" \
  --destination-arn arn:aws:lambda:us-east-1:ACCOUNT_ID:function:LogsToElasticsearch_cloudwatch-logs

An empty --filter-pattern streams all log events. To stream only specific events, use a filter pattern like "ERROR" or "{ $.statusCode = 500 }" for JSON logs.

Grant CloudWatch Logs permission to invoke the Lambda function:

aws lambda add-permission \
  --function-name LogsToElasticsearch_cloudwatch-logs \
  --statement-id AllowCloudWatchLogs \
  --principal logs.amazonaws.com \
  --action lambda:InvokeFunction \
  --source-arn arn:aws:logs:us-east-1:ACCOUNT_ID:log-group:/aws/test/application-logs:*

Verify the subscription filter was created:

aws logs describe-subscription-filters --log-group-name /aws/test/application-logs

The response should show your filter with the destination ARN pointing to your Lambda function.

Step 5: Verify Log Data in OpenSearch Dashboards

After creating the subscription filter, new log events from the CloudWatch log group start flowing into OpenSearch within a few minutes. Generate some test traffic to confirm the pipeline works end-to-end.

Push test events into the log group (recent CloudWatch Logs API versions no longer require sequence tokens, but the loop fetches one so it also works with older CLI releases):

for i in $(seq 1 5); do
  aws logs put-log-events \
    --log-group-name /aws/test/application-logs \
    --log-stream-name test-stream \
    --log-events timestamp=$(date +%s000),message="Test event $i - pipeline verification" \
    --sequence-token $(aws logs describe-log-streams \
      --log-group-name /aws/test/application-logs \
      --log-stream-name-prefix test-stream \
      --query 'logStreams[0].uploadSequenceToken' --output text)
done

Wait 2-3 minutes for Lambda to process the events, then query OpenSearch directly to confirm the data arrived:

ENDPOINT=$(aws opensearch describe-domain --domain-name cloudwatch-logs --query 'DomainStatus.Endpoint' --output text)

curl -s "https://$ENDPOINT/_cat/indices?v"

You should see an index named cwl-YYYY.MM.DD with a document count matching your test events. On a single-node domain the health shows yellow rather than green, because the replica shards have no second node to be allocated to:

health status index           uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   cwl-2026.03.22  xYz123AbC...           5   1          5            0     28.5kb         14.2kb

To view the actual log content, query the index:

curl -s "https://$ENDPOINT/cwl-*/_search?pretty&size=3" | python3 -m json.tool

The response shows your log events with metadata including the log group, log stream, timestamp, and message content.
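The forwarding Lambda delivers those documents through the OpenSearch _bulk API, which alternates an action line and a document line in NDJSON. A rough illustration of that wire format (build_bulk_body is our own sketch, not the generated function's actual code):

```shell
# Emit an NDJSON _bulk body: one action line plus one document line per event.
build_bulk_body() {
  local index="$1"; shift
  local msg
  for msg in "$@"; do
    printf '{"index":{"_index":"%s"}}\n' "$index"
    printf '{"@message":"%s","@timestamp":%s}\n' "$msg" "$(( $(date +%s) * 1000 ))"
  done
}

build_bulk_body "cwl-2026.03.22" "event one" "event two"
```

A body like this could be posted manually with curl -s -X POST "https://$ENDPOINT/_bulk" -H 'Content-Type: application/x-ndjson' --data-binary @- if you ever need to backfill documents.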

Step 6: Create Index Patterns and Visualizations

OpenSearch Dashboards (the Kibana-based UI included with every OpenSearch domain) lets you search, filter, and build dashboards from your log data.

Access the Dashboards URL from your domain details:

aws opensearch describe-domain --domain-name cloudwatch-logs --query 'DomainStatus.DashboardEndpoint' --output text

Open the returned URL in your browser. Create an index pattern to start exploring your data:

  • Navigate to Stack Management > Index Patterns
  • Click Create index pattern and enter cwl-* as the pattern
  • Select @timestamp as the time field
  • Click Create index pattern

Now go to Discover in the left sidebar. You should see your log events with fields like @message, @log_group, @log_stream, and @timestamp.

Build a Log Volume Dashboard

Create a visualization to track log volume over time:

  • Go to Visualize > Create visualization
  • Select Vertical bar chart and choose the cwl-* index pattern
  • Set the Y-axis metric to Count
  • Add a bucket on the X-axis with Date histogram using @timestamp
  • Optionally split series by @log_group.keyword to see volume per log group
  • Save and add to a dashboard

For error tracking, create a saved search in Discover with a filter like @message: "ERROR" OR @message: "Exception". This gives you a quick view of problems across all your streamed log groups.

Step 7: Use CloudWatch Logs Insights as an Alternative

If you need quick ad-hoc log queries without the overhead of maintaining an OpenSearch domain, CloudWatch Logs Insights provides a built-in query engine directly in the AWS console.

Run a query against your log group using the CLI:

aws logs start-query \
  --log-group-name /aws/test/application-logs \
  --start-time $(( $(date +%s) - 3600 )) \
  --end-time $(date +%s) \
  --query-string 'fields @timestamp, @message | sort @timestamp desc | limit 20'

This returns a query ID. Retrieve the results with:

aws logs get-query-results --query-id "YOUR_QUERY_ID"
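Because start-query is asynchronous, a script has to poll until the status flips to Complete before the results are trustworthy. A small sketch (poll_query_results is our own naming):

```shell
# Poll get-query-results until CloudWatch Logs Insights reports Complete,
# then print the full result set. poll_query_results is a local helper.
poll_query_results() {
  local query_id="$1" status
  while :; do
    status=$(aws logs get-query-results --query-id "$query_id" \
      --query 'status' --output text)
    [ "$status" = "Complete" ] && break
    sleep 2
  done
  aws logs get-query-results --query-id "$query_id"
}
```

Usage: poll_query_results "YOUR_QUERY_ID".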

Common Insights queries for troubleshooting:

# Count errors per hour
fields @timestamp, @message
| filter @message like /ERROR/
| stats count(*) as error_count by bin(1h)

# Find top error messages
fields @message
| filter @message like /ERROR/
| stats count(*) as cnt by @message
| sort cnt desc
| limit 10

# Latency analysis for API Gateway logs
fields @timestamp, @message
| filter @message like /Integration latency/
| parse @message "Integration latency: *" as latency
| stats avg(latency), max(latency), pct(latency, 99) by bin(5m)

Logs Insights charges per query based on data scanned - roughly $0.005 per GB. For occasional queries, this is cheaper than running an OpenSearch domain. For continuous monitoring with dashboards and alerts, OpenSearch is the better fit.

Step 8: Cost Optimization for CloudWatch to OpenSearch Streaming

Log streaming can get expensive quickly if you send everything without filtering. Here are practical strategies to keep costs in check.

Set Log Retention Policies

CloudWatch charges for log storage. Without a retention policy, logs accumulate forever. Set retention on every log group:

aws logs put-retention-policy --log-group-name /aws/test/application-logs --retention-in-days 14

Common retention periods: 7 days for debug logs, 30 days for application logs, 90 days for audit/compliance logs. Adjust based on your requirements.
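To enforce a retention default everywhere, you can loop over the log groups that have no policy set (a sketch; the JMESPath filter assumes retentionInDays is simply absent on unconfigured groups, which matches describe-log-groups output):

```shell
# Apply a default retention to every log group that has none set.
# The JMESPath filter selects groups where retentionInDays is missing.
set_default_retention() {
  local days="$1" lg
  for lg in $(aws logs describe-log-groups \
      --query 'logGroups[?!retentionInDays].logGroupName' --output text); do
    aws logs put-retention-policy --log-group-name "$lg" --retention-in-days "$days"
    echo "Set ${days}-day retention on $lg"
  done
}
```

Usage: set_default_retention 30.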

Use Subscription Filter Patterns

Instead of streaming all log events, filter at the source. This reduces Lambda invocations and OpenSearch ingestion volume:

# Stream only errors and warnings
aws logs put-subscription-filter \
  --log-group-name /aws/test/application-logs \
  --filter-name "ErrorsOnly" \
  --filter-pattern "?ERROR ?WARN ?CRITICAL" \
  --destination-arn arn:aws:lambda:us-east-1:ACCOUNT_ID:function:LogsToElasticsearch_cloudwatch-logs

The ? prefix matches any of the specified terms. For JSON-structured logs, use metric filter syntax:

# Stream only 5xx errors from JSON logs
aws logs put-subscription-filter \
  --log-group-name /aws/test/application-logs \
  --filter-name "5xxErrors" \
  --filter-pattern '{ $.statusCode >= 500 }' \
  --destination-arn arn:aws:lambda:us-east-1:ACCOUNT_ID:function:LogsToElasticsearch_cloudwatch-logs
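The same errors-only filter can be rolled out across every log group under a prefix with a short loop (a sketch; adjust the prefix and the destination ARN for your region and account):

```shell
# Attach the errors-only subscription filter to all log groups under a prefix.
# The destination ARN is the forwarding Lambda created earlier.
apply_error_filter() {
  local prefix="$1" dest_arn="$2" lg
  for lg in $(aws logs describe-log-groups --log-group-name-prefix "$prefix" \
      --query 'logGroups[].logGroupName' --output text); do
    aws logs put-subscription-filter \
      --log-group-name "$lg" \
      --filter-name "ErrorsOnly" \
      --filter-pattern "?ERROR ?WARN ?CRITICAL" \
      --destination-arn "$dest_arn"
    echo "Subscribed $lg"
  done
}
```

Keep in mind that each log group supports only a small number of subscription filters, so the loop fails on groups that are already at the limit.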

Manage OpenSearch Index Lifecycle

Old indices consume storage and cluster resources. Set up an index lifecycle policy using OpenSearch Index State Management (ISM):

ENDPOINT=$(aws opensearch describe-domain --domain-name cloudwatch-logs --query 'DomainStatus.Endpoint' --output text)

curl -s -X PUT "https://$ENDPOINT/_plugins/_ism/policies/cwl-cleanup" \
  -H 'Content-Type: application/json' \
  -d '{
  "policy": {
    "description": "Delete CWL indices after 30 days",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [],
        "transitions": [{"state_name": "delete", "conditions": {"min_index_age": "30d"}}]
      },
      {
        "name": "delete",
        "actions": [{"delete": {}}],
        "transitions": []
      }
    ],
    "ism_template": [{"index_patterns": ["cwl-*"], "priority": 100}]
  }
}'

This policy automatically deletes cwl-* indices older than 30 days. Adjust the min_index_age based on how long you need searchable log data.
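The ism_template only auto-attaches the policy to indices created after the policy exists. Indices that predate it can be attached retroactively through the _plugins/_ism/add API (a sketch; attach_ism_policy is our own helper name):

```shell
# Attach an existing ISM policy to indices that predate it.
attach_ism_policy() {
  local endpoint="$1" pattern="$2" policy_id="$3"
  curl -s -X POST "https://${endpoint}/_plugins/_ism/add/${pattern}" \
    -H 'Content-Type: application/json' \
    -d "{\"policy_id\": \"${policy_id}\"}"
}
```

Usage: attach_ism_policy "$ENDPOINT" "cwl-*" cwl-cleanup.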

Cost Comparison Overview

Use this table to estimate monthly costs based on your log volume:

Component                      Cost Factor
CloudWatch Logs ingestion      $0.50 per GB ingested
CloudWatch Logs storage        $0.03 per GB/month
Lambda invocations             $0.20 per 1M requests + compute time
OpenSearch t3.small.search     ~$26/month per instance
OpenSearch EBS storage (gp3)   $0.08 per GB/month
CloudWatch Logs Insights       $0.005 per GB scanned

For low-volume environments (under 5 GB/day), the Lambda and OpenSearch costs dominate. Consider using CloudWatch Logs Insights alone for ad-hoc queries and only deploy OpenSearch when you need persistent dashboards or cross-log-group correlation.
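Those line items can be folded into a back-of-the-envelope estimate. The helper below uses the list prices from the table above (US East; prices drift over time, so treat the output as a rough guide only):

```shell
# Rough monthly estimate: 30 days of ingestion, one month of CWL storage,
# one month of OpenSearch gp3 storage, plus a fixed instance cost.
estimate_monthly_cost() {
  local gb_per_day="$1" instance_cost="$2"
  awk -v g="$gb_per_day" -v inst="$instance_cost" 'BEGIN {
    monthly_gb = g * 30
    printf "%.2f\n", monthly_gb * 0.50 + monthly_gb * 0.03 + monthly_gb * 0.08 + inst
  }'
}

estimate_monthly_cost 5 26   # 5 GB/day on a single t3.small.search
```

The estimate deliberately ignores Lambda compute and Insights scan charges, which are usually small at this scale.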

Conclusion

You now have a working pipeline that streams CloudWatch Logs into Amazon OpenSearch for search, analysis, and visualization. The subscription filter handles the heavy lifting - Lambda decompresses and transforms log events automatically, and OpenSearch indexes them for fast querying through Dashboards.

For production environments, enable fine-grained access control on the OpenSearch domain, set up multi-AZ deployment for high availability, configure automated snapshots, and use ISM policies to manage index lifecycle. Monitor your Lambda function's error rate and duration metrics in CloudWatch to catch pipeline issues early.
