Lambda Log Filter


Introduction

When designing reliable cloud services you may have a use case where your service needs to react to a failure. AWS CloudWatch has a feature called a subscription filter that lets you match a specific log message, for example “failure”; a Kinesis stream then picks up that message and you can have a Lambda function react to the event. You can also use this for data collection, where you look at a window of logs for a particular event and produce metrics from it.

Demonstration

In this demonstration you will learn how to build a Lambda function, packaged as a Docker container, that publishes log messages. Matching log messages are filtered out and sent to a Kinesis stream, which aggregates the logs into an S3 bucket. You can then do some post-processing after the logs arrive in S3, or while they are streaming.

System Diagram

Setup

CDK comes with scaffolding to generate a project in your desired language. I’m using Python:

cdk init app --language=python

Docker + Lambda

Create a directory called lambdas in the project (the name has to match the directory passed to from_image_asset later). Then create a Dockerfile in it like this:


# Start from the AWS Lambda Python base image; the handler (CMD) is supplied later
# by the CDK DockerImageCode cmd=["app.handler"] option.
FROM amazon/aws-lambda-python:latest
COPY app.py .
COPY requirements.txt .
RUN yum update -y && yum install -y python3 python3-devel python3-pip gcc && rm -Rf /var/cache/yum && \
    pip install -r requirements.txt

Create a requirements.txt file:

boto3

Create a Python Lambda handler function and call it app.py:


def handler(event, context):
    print("my app was triggered ...")
    # This message contains the term that the subscription filter below matches on
    print("failure")
    return

Add this to the main stack
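
The stack code below uses the following imports (assuming aws-cdk-lib v2 module names):

from aws_cdk import (
    Duration,
    Stack,
    aws_iam as iam,
    aws_kinesis as kinesis,
    aws_kinesisfirehose as firehose,
    aws_lambda as lambda_,
    aws_logs as logs,
    aws_logs_destinations as destinations,
    aws_s3 as s3,
)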

Create roles for the Kinesis stream and the Lambda function:


    # Execution role for the Lambda function
    role = iam.Role(
        self,
        "lambda-role",
        assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
    )
    role.add_managed_policy(
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "service-role/AWSLambdaBasicExecutionRole"
        )
    )

    # Role assumed by the Firehose delivery stream to read from Kinesis
    # and write to S3 (broad permissions here for brevity)
    kinesis_stream_role = iam.Role(
        self,
        "kinesis-stream-role",
        assumed_by=iam.ServicePrincipal("firehose.amazonaws.com"),
    )
    kinesis_stream_role.add_to_policy(
        iam.PolicyStatement(
            actions=[
                "kinesis:*",
                "firehose:*",
                "logs:PutLogEvents",
                "kms:*",
                "s3:*",
            ],
            resources=["*"],
        )
    )
    kinesis_stream_role.add_managed_policy(
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "AmazonKinesisReadOnlyAccess"
        )
    )


Create the bucket and the stream, and create the Lambda function:


    stream = kinesis.Stream(self, "pipeline", shard_count=1)
    bucket = s3.Bucket(self, "logs", bucket_name="test12344logs")

    # Lambda function built from the Dockerfile in the lambdas/ directory
    busybox_fn = lambda_.DockerImageFunction(
        self,
        "example_function",
        timeout=Duration.seconds(10),
        memory_size=256,
        role=role,
        tracing=lambda_.Tracing.ACTIVE,
        code=lambda_.DockerImageCode.from_image_asset(
            directory="lambdas", cmd=["app.handler"]
        ),
    )

Create the subscription filter with the pattern that should trigger it:

    busybox_fn.log_group.add_subscription_filter(
        "error-filter",
        destination=destinations.KinesisDestination(stream),
        filter_pattern=logs.FilterPattern.all_terms("failure"),
    )
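
If you want the filter to match on more than one keyword, FilterPattern has other helpers as well; for example, a pattern that fires when any of several terms appears (a sketch, keeping the same Kinesis destination):

    busybox_fn.log_group.add_subscription_filter(
        "error-or-failure-filter",
        destination=destinations.KinesisDestination(stream),
        filter_pattern=logs.FilterPattern.any_term("failure", "error", "exception"),
    )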

Create the Firehose delivery stream:

    firehose.CfnDeliveryStream(
        self,
        "data-stream",
        delivery_stream_type="KinesisStreamAsSource",
        kinesis_stream_source_configuration=firehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
            kinesis_stream_arn=stream.stream_arn,
            role_arn=kinesis_stream_role.role_arn,
        ),
        s3_destination_configuration=firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
            bucket_arn=bucket.bucket_arn,
            role_arn=kinesis_stream_role.role_arn,
            # Buffer up to 60 seconds or 5 MB before writing an object to S3
            buffering_hints=firehose.CfnDeliveryStream.BufferingHintsProperty(
                interval_in_seconds=60,
                size_in_m_bs=5,
            ),
            compression_format="GZIP",
        ),
    )
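
The inline policy attached to kinesis_stream_role above uses "*" resources for brevity. If you prefer to scope it down to just this stack's resources, the bucket and stream constructs can grant what the delivery stream needs directly (a sketch):

    # Tighter alternative to the wildcard inline policy above
    bucket.grant_read_write(kinesis_stream_role)
    stream.grant_read(kinesis_stream_role)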

UI Image

Trigger your Lambda function via the console and you should see logs show up in S3.
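
You can also trigger it programmatically with boto3 (a sketch; the function name below is a placeholder, substitute the one CDK generated for your stack):

import boto3

# Placeholder function name -- look it up in the Lambda console or your stack outputs
client = boto3.client("lambda")
client.invoke(
    FunctionName="CodeStack-examplefunction",
    InvocationType="Event",  # asynchronous invoke
)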

Once the logs arrive in S3 you can download and extract a file to see the content. Note that there are two layers of compression: Firehose compresses each delivery object with GZIP, and the records inside it are themselves gzip payloads produced by the CloudWatch Logs subscription, which is why the commands below decompress twice.

In a more practical case you would probably write some Python code to pull down the whole prefix and process the files in batch.
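
Here is a rough sketch of that kind of batch processing with boto3. The bucket name comes from the stack above; the prefix, the helper name, and the two-layer gzip handling are assumptions based on how Firehose and the CloudWatch Logs subscription deliver data.

import gzip
import io
import json

import boto3

BUCKET = "test12344logs"   # bucket created by the stack above
PREFIX = "2022/09/04/"     # Firehose writes objects under a YYYY/MM/DD/HH/ prefix

s3 = boto3.client("s3")


def log_events(bucket, prefix):
    """Yield CloudWatch log events from every delivery object under a prefix."""
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            # Outer layer: GZIP compression applied by the Firehose delivery stream
            inner = gzip.decompress(body)
            # Inner layer: concatenated gzip payloads from the CloudWatch Logs subscription
            text = gzip.GzipFile(fileobj=io.BytesIO(inner)).read().decode("utf-8")
            # Each payload is a JSON document; several may be concatenated back to back
            decoder = json.JSONDecoder()
            pos = 0
            while pos < len(text):
                if text[pos].isspace():
                    pos += 1
                    continue
                doc, pos = decoder.raw_decode(text, pos)
                for event in doc.get("logEvents", []):
                    yield event


if __name__ == "__main__":
    for event in log_events(BUCKET, PREFIX):
        print(event["timestamp"], event["message"])

For a quick manual check, you can also pull down a single delivery file and inspect it by hand: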

$ aws s3 cp s3://test12344logs/2022/09/04/01/CodeStack-datastream-53vB7U6tbd4U-3-2022-09-04-01-45-38-e6461f25-9f44-48d2-ad8f-a2abeb7af4a3.gz .

$  gzip -d CodeStack-datastream-53vB7U6tbd4U-3-2022-09-04-01-45-38-e6461f25-9f44-48d2-ad8f-a2abeb7af4a3.gz

$ zcat CodeStack-datastream-53vB7U6tbd4U-3-2022-09-04-01-45-38-e6461f25-9f44-48d2-ad8f-a2abeb7af4a3 | jq .

Log Output
