Overview
This document provides a step-by-step guide to creating a Lambda function that sends notifications to Slack when a security group rule is modified in AWS.
Prerequisites
- An AWS account with permissions to create and configure Lambda, IAM, CloudTrail, and CloudWatch Logs.
- A Slack workspace where you can create a new app and generate an incoming webhook URL.
Architecture
Security Group Rule Event -> CloudTrail -> CloudWatch Logs -> Lambda Function -> Slack
Step 1: Setup CloudTrail
- Go to CloudTrail Console:
  - Navigate to the AWS Management Console.
  - Go to the CloudTrail service.
- Create or Configure a Trail:
  - Create a new trail or use an existing one.
  - Ensure that the trail logs management events, including write events (security group changes are write API calls).
  - Enable the trail to send logs to CloudWatch Logs.
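The two trail requirements above (management events logged, delivery to CloudWatch Logs enabled) can also be checked from code. The following is a minimal sketch, not a complete audit: the helper names `logs_management_writes`, `trail_is_ready`, and `check_trails` are made up for illustration, and the checks only look at the fields shown. The pure helpers work on the dictionaries returned by the CloudTrail `describe_trails` and `get_event_selectors` calls, so they can be tried without AWS access.

```python
def logs_management_writes(selectors: dict) -> bool:
    """Return True if a get_event_selectors response covers management write events."""
    # Basic event selectors carry an explicit flag plus a read/write setting.
    for selector in selectors.get("EventSelectors") or []:
        if selector.get("IncludeManagementEvents") and selector.get("ReadWriteType", "All") != "ReadOnly":
            return True
    # Advanced selectors express the same idea as eventCategory = Management.
    # Simplification: a readOnly restriction in an advanced selector is not inspected.
    for selector in selectors.get("AdvancedEventSelectors") or []:
        for field in selector.get("FieldSelectors", []):
            if field.get("Field") == "eventCategory" and "Management" in field.get("Equals", []):
                return True
    return False


def trail_is_ready(trail: dict, selectors: dict) -> bool:
    """A trail fits this guide if it feeds a log group and logs management writes."""
    return bool(trail.get("CloudWatchLogsLogGroupArn")) and logs_management_writes(selectors)


def check_trails() -> dict:
    """Map each trail name to True/False. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the helpers above run without it

    cloudtrail = boto3.client("cloudtrail")
    results = {}
    for trail in cloudtrail.describe_trails()["trailList"]:
        selectors = cloudtrail.get_event_selectors(TrailName=trail["TrailARN"])
        results[trail["Name"]] = trail_is_ready(trail, selectors)
    return results
```

Calling `check_trails()` from a shell with suitable credentials returns a dictionary such as one entry per trail; a `False` value means one of the two settings above still needs attention.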
Step 2: Setup CloudWatch Logs
- Create Log Group:
  - Navigate to CloudWatch in the AWS Management Console.
  - Create a new log group or use an existing one to receive CloudTrail logs.
Step 3: Create Lambda Function
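- Create a Lambda Function:
  - Go to the AWS Lambda Console.
  - Create a new Lambda function with a suitable name, a Python runtime (3.8 or later; pick a version Lambda still supports), and an execution role with the necessary permissions (at minimum, permission to write the function's own logs to CloudWatch Logs).
  - Add the following code: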
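import boto3
import json
import os
import base64
import gzip
import subprocess

# EC2 client; the functions below do not call it, and the region is hard-coded.
ec2_client = boto3.client('ec2', region_name='us-east-1')
# Slack incoming webhook URL, read from the Lambda environment (None if unset).
webhook_url = os.environ.get('WEBHOOK_URL')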

def slack_message(sg_id, username, port, cidr, action):
    # The message lines are flush-left on purpose: any indentation inside the
    # triple-quoted string would appear as leading whitespace in Slack.
    message = f"""
:siren: *Security Group Rules Alert* :siren:
<!channel> Hey team,
A security group rule has been modified. Here are the details:
*Action:* `{action}`
*Created By:* `{username}`
*Security Group Id:* `{sg_id}`
*Port Number:* `{port}`
*CIDR:* `{cidr}`
*Potential Impact:* Access rule change detected.
*Actions Required:* Review the security group rules to ensure they meet your security requirements.
Thank you for your attention to this matter! :hammer_and_spanner:
"""
    return message
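
def send_alert(sg_id, username, port, cidr, action):
    if not webhook_url:
        raise RuntimeError("WEBHOOK_URL environment variable is not set")
    message = slack_message(sg_id, username, port, cidr, action)
    payload = {
        "channel": "",
        "username": "Bot",
        "text": message,
        "icon_emoji": ":alert:"
    }
    json_payload = json.dumps(payload)
    # --fail makes curl exit non-zero on HTTP errors, so check=True raises on them too.
    curl_command = ['curl', '--fail', '-X', 'POST', '-H', 'Content-Type: application/json', '-d', json_payload, webhook_url]
    response = subprocess.run(curl_command, check=True, capture_output=True, text=True)
    # Print Slack's reply so it shows up in the function's logs.
    print(response.stdout)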

def alert_for_record(record):
    # Send one Slack alert per rule in a single CloudTrail record.
    action = record['eventName']
    username = record['userIdentity']['principalId'].split(':')[-1]
    sg_rule_sets = record['responseElements']['securityGroupRuleSet']
    for sg_rule in sg_rule_sets['items']:
        sg_id = sg_rule['groupId']
        from_port = int(sg_rule['fromPort'])
        to_port = int(sg_rule['toPort'])
        # Report a range when the rule spans more than one port.
        port = to_port if from_port == to_port else f"{from_port}-{to_port}"
        cidr = sg_rule.get('cidrIpv4', sg_rule.get('cidrIpv6', 'N/A'))
        send_alert(sg_id, username, port, cidr, action)


def parse_event(event):
    if 'detail' in event:
        # EventBridge-style event: the CloudTrail record sits under 'detail'.
        alert_for_record(event['detail'])
    else:
        # CloudWatch Logs subscription payload: base64-encoded, gzip-compressed JSON.
        log_data = event['awslogs']['data']
        compressed_payload = base64.b64decode(log_data)
        uncompressed_payload = gzip.decompress(compressed_payload)
        log_events = json.loads(uncompressed_payload)['logEvents']
        for log_event in log_events:
            # Each log event's message is one CloudTrail record as a JSON string.
            alert_for_record(json.loads(log_event['message']))
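
def lambda_handler(event, context):
    try:
        parse_event(event)
    except (KeyError, TypeError) as e:
        # Records without a securityGroupRuleSet (for example CreateSecurityGroup,
        # or a failed call whose responseElements is null) end up here and are only logged.
        print(f"Skipping event that could not be parsed: {e!r}")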

# Example event structure for testing.
# Running this locally requires WEBHOOK_URL to be set and curl to be installed.
if __name__ == "__main__":
    test_event = {
        'awslogs': {
            'data': base64.b64encode(gzip.compress(json.dumps({
                'logEvents': [
                    {
                        'message': json.dumps({
                            'eventName': 'AuthorizeSecurityGroupIngress',
                            'userIdentity': {
                                'principalId': 'AWS:TestUser',
                            },
                            'responseElements': {
                                'securityGroupRuleSet': {
                                    'items': [
                                        {
                                            'groupId': 'sg-12345678',
                                            'fromPort': 22,
                                            'toPort': 22,
                                            'cidrIpv4': '0.0.0.0/0',
                                        }
                                    ]
                                }
                            }
                        })
                    }
                ]
            }).encode())).decode()
        }
    }
    lambda_handler(test_event, None)
- Set Environment Variable:
  - In the function's configuration, set the WEBHOOK_URL environment variable to the Slack incoming webhook URL from the prerequisites.
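The function above posts to Slack by shelling out to curl, which depends on the curl binary being present in the runtime image. A possible alternative is to send the request with Python's standard library. The sketch below is one way to do that, not the original author's method; `build_slack_request` and `post_to_slack` are hypothetical names, and the payload is reduced to the `text` field.

```python
import json
import urllib.request


def build_slack_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build (but do not send) a JSON POST for a Slack incoming webhook."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_to_slack(webhook_url: str, text: str, timeout: float = 5.0) -> int:
    """Send the message and return the HTTP status code."""
    request = build_slack_request(webhook_url, text)
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Splitting request construction from sending keeps the first function free of network access, so the payload and headers can be inspected in a unit test before any message reaches a channel.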
Step 4: Configure CloudWatch Logs Filter Pattern
- Create Metric Filter:
  - Go to the CloudWatch Console.
  - Select the log group that receives CloudTrail logs.
  - Create a new metric filter with the following pattern:
    - { ($.eventName = AuthorizeSecurityGroupIngress) || ($.eventName = AuthorizeSecurityGroupEgress) || ($.eventName = RevokeSecurityGroupIngress) || ($.eventName = RevokeSecurityGroupEgress) || ($.eventName = CreateSecurityGroup) || ($.eventName = DeleteSecurityGroup) }
  - Provide a name for the filter and configure it. A metric filter does not invoke Lambda by itself; the same pattern is entered again for the trigger in Step 5.
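Because the pattern is a long OR of event names, it is easy to mistype. As an illustration, it can be generated from a list and mirrored by a small local check. In this sketch `SECURITY_GROUP_EVENTS`, `build_filter_pattern`, and `matches` are hypothetical names, and `matches` only imitates this particular pattern, not the CloudWatch Logs filter syntax in general.

```python
SECURITY_GROUP_EVENTS = [
    "AuthorizeSecurityGroupIngress",
    "AuthorizeSecurityGroupEgress",
    "RevokeSecurityGroupIngress",
    "RevokeSecurityGroupEgress",
    "CreateSecurityGroup",
    "DeleteSecurityGroup",
]


def build_filter_pattern(event_names) -> str:
    """Join one ($.eventName = X) clause per event name with ||."""
    clauses = [f"($.eventName = {name})" for name in event_names]
    return "{ " + " || ".join(clauses) + " }"


def matches(record: dict, event_names=SECURITY_GROUP_EVENTS) -> bool:
    """Local stand-in for the pattern: does this CloudTrail record qualify?"""
    return record.get("eventName") in event_names


if __name__ == "__main__":
    # Prints the pattern to paste into the console.
    print(build_filter_pattern(SECURITY_GROUP_EVENTS))
```

Keeping the event names in one list means the console pattern and any local test fixtures stay in sync when an event name is added or removed.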
Step 5: Add CloudWatch Logs Trigger to Lambda
- Add Trigger:
  - Go to the Lambda Console.
  - Select your Lambda function.
  - In the “Function overview” section, click “Add trigger”.
  - Select “CloudWatch Logs” as the trigger type.
  - Choose the log group that contains the CloudTrail logs.
  - Enter a filter name and the filter pattern from Step 4.
  - Click “Add”.
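Behind the console steps above, the trigger amounts to a CloudWatch Logs subscription filter plus a resource-based permission that lets CloudWatch Logs invoke the function. The sketch below shows roughly how the same setup might look with boto3. The filter name, statement ID, and function names are placeholders chosen for illustration, and the log group ARN is assumed to be in the `...:log-group:NAME:*` form.

```python
def invoke_permission_args(function_arn: str, log_group_arn: str) -> dict:
    """Arguments for lambda add_permission: let CloudWatch Logs invoke the function."""
    return {
        "FunctionName": function_arn,
        "StatementId": "allow-cloudwatch-logs",  # placeholder statement ID
        "Action": "lambda:InvokeFunction",
        "Principal": "logs.amazonaws.com",
        "SourceArn": log_group_arn,
    }


def subscription_filter_args(log_group: str, function_arn: str, pattern: str) -> dict:
    """Arguments for logs put_subscription_filter targeting the function."""
    return {
        "logGroupName": log_group,
        "filterName": "security-group-changes",  # placeholder filter name
        "filterPattern": pattern,
        "destinationArn": function_arn,
    }


def attach_trigger(log_group: str, log_group_arn: str, function_arn: str, pattern: str) -> None:
    """Create the permission, then the subscription. Needs boto3 and credentials."""
    import boto3  # imported lazily so the helpers above run without it

    # The permission must exist before the subscription filter is created.
    boto3.client("lambda").add_permission(**invoke_permission_args(function_arn, log_group_arn))
    boto3.client("logs").put_subscription_filter(
        **subscription_filter_args(log_group, function_arn, pattern)
    )
```

The argument builders are kept separate from the API calls so the exact values can be reviewed or unit-tested before anything is changed in the account.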
Step 6: Test the Setup
- Generate Test Events:
  - Modify a security group rule (e.g., authorize or revoke an ingress or egress rule) to generate test events.
- Verify Slack Notifications:
  - Check your Slack channel to ensure notifications are being received as expected. Allow a few minutes, since CloudTrail does not deliver events to CloudWatch Logs instantly.
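When testing, keep in mind that the Step 4 pattern also matches CreateSecurityGroup and DeleteSecurityGroup, and that failed API calls are logged too. Those records do not carry a securityGroupRuleSet, which the rule-parsing code in Step 3 expects. A more tolerant parser might look like the following sketch; `port_label` and `summarize_record` are hypothetical helpers, and the field names are the ones the Step 3 code already reads.

```python
def port_label(rule: dict) -> str:
    """Render a rule's port or port range; 'N/A' when the rule has no ports."""
    from_port, to_port = rule.get("fromPort"), rule.get("toPort")
    if from_port is None or to_port is None:
        return "N/A"
    if int(from_port) == int(to_port):
        return str(int(from_port))
    return f"{int(from_port)}-{int(to_port)}"


def summarize_record(record: dict) -> list:
    """Return one summary per rule, or a single rule-less summary."""
    user_identity = record.get("userIdentity") or {}
    username = user_identity.get("principalId", "unknown").split(":")[-1]
    base = {"action": record.get("eventName", "unknown"), "username": username}
    # responseElements can be null (failed call) or lack a rule set entirely.
    response = record.get("responseElements") or {}
    rules = (response.get("securityGroupRuleSet") or {}).get("items") or []
    if not rules:
        return [base]
    return [
        {
            **base,
            "sg_id": rule.get("groupId", "N/A"),
            "port": port_label(rule),
            "cidr": rule.get("cidrIpv4", rule.get("cidrIpv6", "N/A")),
        }
        for rule in rules
    ]
```

Each returned dictionary holds the values a Slack message needs, so a record without rules can still produce a short alert instead of being dropped.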
Conclusion
By following these steps, you have set up a Lambda function that sends Slack notifications when security group rules change. This setup strengthens security monitoring by providing near-real-time alerts for critical changes to your AWS security groups.