Lambda Function Setup Guide for IAM Event Notifications in Slack

Overview

This document provides a step-by-step guide to creating a Lambda function that sends notifications to Slack when:

  • A new IAM user is created.
  • A permission (policy) is attached to an IAM user.

Prerequisites

  1. AWS Account with necessary permissions to create and configure Lambda, IAM, CloudTrail, and CloudWatch Logs.
  2. Slack workspace with permissions to create a new app and generate an incoming webhook URL.

Architecture

 IAM event -> CloudTrail -> CloudWatch Logs -> Lambda Function -> Slack

 

Step 1: Setup CloudTrail

  1. Go to CloudTrail Console:
  • Navigate to the AWS Management Console.
  • Go to the CloudTrail service.

2. Create or Configure a Trail:

  • Create a new trail or use an existing one.
  • Ensure that the trail is configured to log management events.
  • Enable the trail to send logs to CloudWatch Logs.

Step 2: Setup CloudWatch Logs

  1. Create Log Group:
  • Navigate to CloudWatch in the AWS Management Console.
  • Create a new log group or use an existing one to receive CloudTrail logs

Step 3: Create Lambda Function

  1. Create a Lambda Function:
  • Go to the AWS Lambda Console.
  • Create a new Lambda function with a suitable name, runtime (Python 3.8+), and role with the necessary permissions.

2. Add the Following Code:

import json

import os

import base64

import gzip

import urllib3

webhook_url = os.environ.get('WEBHOOK_URL')

def send_slack_message(message):

    http = urllib3.PoolManager()

    payload = {

        "blocks": message

    }

    encoded_payload = json.dumps(payload).encode('utf-8')

    response = http.request(

        'POST',

        webhook_url,

        body=encoded_payload,

        headers={'Content-Type': 'application/json'},

    )

    return response.status

def create_slack_message(event_detail):

    event_name = event_detail['eventName']

    actor_name = event_detail['userIdentity']['userName']

    if event_name == 'CreateUser':

        user_name = event_detail['requestParameters']['userName']

        title = "New IAM User Created"

        message = f"A new IAM user `{user_name}` has been created by `{actor_name}`."

        color = "#36a64f"

    elif event_name in ['AttachUserPolicy', 'PutUserPolicy']:

        user_name = event_detail['requestParameters']['userName']

        policy_name = event_detail['requestParameters']['policyArn']

        title = "Policy Attached to IAM User"

        message = f"Policy `{policy_name}` has been attached to user `{user_name}` by `{actor_name}`."

        color = "#FFA500"

    slack_message = [

        {

            "type": "section",

            "text": {

                "type": "mrkdwn",

                "text": f":rotating_light: *{title}* :rotating_light:"

            }

        },

        {

            "type": "section",

            "fields": [

                {

                    "type": "mrkdwn",

                    "text": f"*Event Name:*\n{event_name}"

                },

                {

                    "type": "mrkdwn",

                    "text": f"*User Name:*\n{user_name}"

                },

                {

                    "type": "mrkdwn",

                    "text": f"*Performed By:*\n{actor_name}"

                }

            ]

        },

        {

            "type": "section",

            "text": {

                "type": "mrkdwn",

                "text": message

            }

        },

        {

            "type": "divider"

        }

    ]

    return slack_message

def parse_event(event):

    if 'awslogs' in event:

        log_data = event['awslogs']['data']

        compressed_payload = base64.b64decode(log_data)

        uncompressed_payload = gzip.decompress(compressed_payload)

        log_events = json.loads(uncompressed_payload)['logEvents']

        for log_event in log_events:

            message = log_event['message']

            log_entry = json.loads(message)

            if log_entry['eventName'] in ['CreateUser', 'AttachUserPolicy', 'PutUserPolicy']:

                slack_message = create_slack_message(log_entry)

                send_slack_message(slack_message)

def lambda_handler(event, context):

    try:

        parse_event(event)

    except Exception as e:

        print(f"Error parsing event: {e}")

    return {

        'statusCode': 200,

        'body': json.dumps('Notification sent!')

    }

# Example test event

if __name__ == "__main__":

    test_event = {

        'awslogs': {

            'data': base64.b64encode(gzip.compress(json.dumps({

                'logEvents': [

                    {

                        'message': json.dumps({

                            'eventName': 'CreateUser',

                            'requestParameters': {

                                'userName': 'NewTestUser'

                            },

                            'userIdentity': {

                                'userName': 'AdminUser',

                            }

                        })

                    }

                ]

            }).encode())).decode()

        }

    }

    lambda_handler(test_event, None)

Set Environment variable

  • Set the WEBHOOK_URL environment variable to the Slack webhook URL you created.

Step 4: Configure CloudWatch Logs Filter Pattern

  1. Create Metric Filter:
  • Go to the CloudWatch Console.
  • Select the log group that receives CloudTrail logs.
  • Create a new metric filter with the following pattern:
  • { ($.eventName = “CreateUser”) || ($.eventName = “AttachUserPolicy”) || ($.eventName = “PutUserPolicy”) }
  • Provide a name for the filter and configure it.

Step 5: Add CloudWatch Logs Trigger to Lambda

  1. Add Trigger:
  • Go to the Lambda Console.
  • Select your Lambda function.
  • In the “Function overview” section, click “Add trigger”.
  • Select “CloudWatch Logs” as the trigger type.
  • Choose the log group that contains the CloudTrail logs.
  • Specify the filter pattern created earlier.
  • Click “Add”.

Step 6: Test the Setup

  1. Generate Test Events:
  • Create a new IAM user or attach a policy to an existing user to generate test events.

2. Verify Slack Notifications:

  • Check your Slack channel to ensure notifications are being received as expected.

Conclusion

By following these steps, you have successfully set up a Lambda function to send notifications to Slack for IAM user creation and policy attachment events. This setup enhances security monitoring and provides real-time alerts for critical IAM changes.

Blog Pundits: Sandeep Rawat
 
Opstree is an End to End DevOps solution provider
 
 

Connect With Us

Author: Ayush Yadav

I’m Avinaw Sharma, a DevOps Engineer at Opstree Solutions, specializing in cloud-native technologies, infrastructure automation, and continuous integration/continuous deployment (CI/CD) pipelines. With hands-on experience in tools like Kubernetes, Docker, Jenkins, and Terraform, I focus on building scalable and secure systems that support rapid development cycles. I am passionate about bridging the gap between development and operations teams, ensuring seamless integration and continuous delivery.

Leave a Reply