
This article takes the reader through a step-by-step process of streaming logs from a CloudWatch log group to an AWS Elasticsearch cluster. It covers creating a Lambda policy and role, creating the Elasticsearch subscription filter, and finally modifying the Lambda function so that multiple log groups can be streamed to a single cluster.

Prerequisites

Before executing the steps in this article, the reader should have:

  • An AWS account.
  • Created a user with permissions to create resources on the AWS account.
  • Created an Elasticsearch cluster on the AWS account and have access to the cluster either via a VPC or internet endpoint.

Create the Lambda Execution Role

We will use a Lambda function to stream logs to Elasticsearch. On the AWS IAM console, click on Policies, then select Create policy.

Create Policy

A new window opens; select the JSON tab.

Create JSON Policy

On the JSON tab, paste the policy below. Make sure you replace the resource ARN with your Elasticsearch cluster's ARN.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "es:*"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:es:eu-west-1:****************:domain/test-es-cluster/*"
        }
    ]
}
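The same policy can also be created programmatically. Below is a sketch using the AWS SDK for JavaScript (aws-sdk v2); the policy name and the account ID in the ARN are placeholders, so substitute your own values.

```javascript
// Sketch: creating the policy with the AWS SDK for JavaScript (aws-sdk v2).
// The policy name and the account ID in the ARN are placeholders.
var policyDocument = {
    Version: '2012-10-17',
    Statement: [
        {
            Action: ['es:*'],
            Effect: 'Allow',
            Resource: 'arn:aws:es:eu-west-1:123456789012:domain/test-es-cluster/*'
        }
    ]
};

var params = {
    PolicyName: 'lambda-es-streaming-policy',        // placeholder name
    PolicyDocument: JSON.stringify(policyDocument),
    Description: 'Allows Lambda to write to the Elasticsearch domain'
};

// With credentials configured, the actual call would be:
// var AWS = require('aws-sdk');
// new AWS.IAM().createPolicy(params, function(err, data) {
//     if (err) console.error(err);
//     else console.log('Created policy:', data.Policy.Arn);
// });
```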

Click on Review policy, fill in the Name and Description of your policy, then click Create policy. Once done, still on the IAM console, go back to Roles and click Create role.

Create Role

Select AWS service as the trusted entity, and choose Lambda as the use case.

Select Service and Use Case

Then click on Permissions, and under Policies select the policy you created earlier.

Attach Policy to Role

Click on Tags and add any tags you would like for your role. Then click Review, enter a name and description for the role, and click Create role. Your Lambda role is now ready.

Edit Trust Relations for Your Lambda Role

On the AWS IAM console, select the Lambda role we created above. Then select the Trust relationships tab and click Edit trust relationship.

Edit Trust Relationship for Lambda Role

In the window that opens, delete everything in the policy document, paste the code below, and click Update trust policy.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
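If you prefer to set the trust policy from code rather than the console, a sketch with the AWS SDK for JavaScript (aws-sdk v2) could look like the following; the role name is a placeholder.

```javascript
// Sketch: updating the role's trust policy with aws-sdk v2.
// 'lambda-es-streaming-role' is a placeholder role name.
var trustPolicy = {
    Version: '2012-10-17',
    Statement: [
        {
            Effect: 'Allow',
            Principal: { Service: 'lambda.amazonaws.com' },
            Action: 'sts:AssumeRole'
        }
    ]
};

var params = {
    RoleName: 'lambda-es-streaming-role',            // placeholder
    PolicyDocument: JSON.stringify(trustPolicy)
};

// With credentials configured, the actual call would be:
// var AWS = require('aws-sdk');
// new AWS.IAM().updateAssumeRolePolicy(params, function(err) {
//     if (err) console.error(err);
// });
```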

Your Lambda role is now ready to stream logs to Elasticsearch Kibana.

Create an Elasticsearch Subscription for your Log Group

On the CloudWatch console, select Log groups and pick the log group for which you want to create the Elasticsearch subscription. On the log group window, select Actions and choose Create Elasticsearch subscription filter from the drop-down menu.

Create ElasticSearch Subscription Filter

In the window that opens, select the account where your ES cluster was created. In our case, it is the same account as the CloudWatch log group, so we chose "This account", then selected the Amazon ES cluster we want to stream our logs to. Next, choose the Lambda execution role you created earlier from the drop-down list under Lambda Function.

Choose Elasticsearch Cluster and Lambda Execution Role

Scrolling down, you will be asked to configure your log format and filters. See below.

Configure Log Format and Filters

Select your preferred log format. You can also test your filter pattern to see how the results will look. If satisfied, click Start streaming. You should now see your logs as indices in Elasticsearch Kibana.
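Under the hood, the console wires the log group to a Lambda function through a subscription filter. If you ever need to recreate that wiring from code, a sketch with aws-sdk v2 might look like this; the log group name, filter name, and Lambda ARN are all placeholders.

```javascript
// Sketch: the equivalent subscription-filter wiring with aws-sdk v2.
// All names and ARNs below are placeholders.
var params = {
    logGroupName: '/aws/lambda/my-app',               // placeholder log group
    filterName: 'es-streaming-filter',                // placeholder filter name
    filterPattern: '',                                // empty pattern = match all events
    destinationArn: 'arn:aws:lambda:eu-west-1:123456789012:function:LogsToElasticsearch'
};

// With credentials configured, the actual call would be:
// var AWS = require('aws-sdk');
// new AWS.CloudWatchLogs().putSubscriptionFilter(params, function(err) {
//     if (err) console.error(err);
// });
```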

Modify Lambda Function to Stream Logs from Multiple Log Groups

To stream logs from multiple CloudWatch log groups to the Elasticsearch cluster, we have to modify the code of the Lambda function created above. Replace your Lambda function code with the code below. The only thing you need to change is the var endpoint (line 5 of the snippet): replace it with your Elasticsearch cluster endpoint. When done, click Save.

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

var endpoint = 'search-test-es-cluster-**************************.eu-west-1.es.amazonaws.com';

// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;

exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = Buffer.from(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }

        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));

        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);

        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }

        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));

            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};

function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        // original index name format: cwl-YYYY.MM.DD
        //var indexName = [
        //    'cwl-' + timestamp.getUTCFullYear(),              // year
        //    ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
        //    ('0' + timestamp.getUTCDate()).slice(-2)          // day
        //].join('.');

        // new index name format: cwl-<log group>-YYYY.MM.DD
        var indexName = [
            'cwl-' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(), // log group + year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}

function buildSource(message, extractedFields) {
    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                var jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    var jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}

function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}

function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}

function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}

function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });

        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;

            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}

function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ] .join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}

function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}

function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}

function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));

        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

You can now stream multiple log groups to your Elasticsearch cluster and view them in Kibana.
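To see what the modified index naming produces, here is a small standalone copy of the indexName logic from the function above, applied to a hypothetical log group and a fixed timestamp:

```javascript
// Standalone copy of the index-name logic from the Lambda function above,
// applied to a hypothetical log group and a fixed timestamp.
function indexNameFor(logGroup, timestampMs) {
    var timestamp = new Date(1 * timestampMs);
    return [
        'cwl-' + logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(), // log group + year
        ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
        ('0' + timestamp.getUTCDate()).slice(-2)          // day
    ].join('.');
}

// A log group starting with '/' yields a double hyphen after the 'cwl' prefix.
console.log(indexNameFor('/aws/lambda/my-app', Date.UTC(2021, 0, 15)));
// -> cwl--aws-lambda-my-app-2021.01.15
```

Each log group therefore gets its own daily index, which is what allows several log groups to share one cluster without their documents colliding.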
