rajdeep-sonu/Security-Runbook-Automation

GitHub: rajdeep-sonu/Security-Runbook-Automation

基于 AWS Lambda 的无服务器安全事件自动分诊与响应系统,通过接收监控警报、执行自动化诊断并推送结构化通知来缩短安全事件响应时间。

Stars: 0 | Forks: 0

# 🛡️ 安全事件 Runbook 自动化系统 ![Python](https://img.shields.io/badge/Python-3.11-3776AB?logo=python&logoColor=white) ![AWS Lambda](https://img.shields.io/badge/AWS-Lambda-FF9900?logo=awslambda&logoColor=white) ![Terraform](https://img.shields.io/badge/Terraform-IaC-7B42BC?logo=terraform&logoColor=white) ![DynamoDB](https://img.shields.io/badge/AWS-DynamoDB-4053D6?logo=amazondynamodb&logoColor=white) ![License: MIT](https://img.shields.io/badge/License-MIT-green.svg) ## 📖 概述 **安全事件 Runbook 自动化系统**是一个无服务器、事件驱动的平台,可自动对 AWS 环境中的安全事件进行分诊、诊断和通知。当监控警报触发时(来自通过 Grafana 连接的 Prometheus、CloudWatch 或 GuardDuty),该系统会: 1. 通过 API Gateway webhook **接收**警报。 2. 根据警报名称模式匹配,将警报**分派**到相应的 runbook 处理器。 3. 针对您的 AWS 环境(CloudTrail、IAM、EC2、CloudWatch 指标)**运行自动化诊断**。 4. 使用可配置的时间窗口对事件**去重**,以防止警报疲劳。 5. **发送内容丰富的 Slack 通知**,采用 Block Kit 格式化,包含诊断结果和修复步骤。 6. 将每个事件**记录**到 DynamoDB 中,用于审计跟踪、事后复盘和合规性检查。 该系统消除了初始事件分诊的手动繁杂工作,缩短了平均响应时间 (MTTR),并确保您的安全团队在整个事件处理过程中保持一致性和可重复性。 ## 🏗️ 架构 ``` flowchart LR subgraph Monitoring["Monitoring Sources"] P["Prometheus"] CW["CloudWatch Alarms"] GD["GuardDuty Findings"] end subgraph Grafana["Grafana"] AR["Alert Rules"] CP["Contact Point
(Webhook)"] end subgraph AWS["AWS Serverless"] APIGW["API Gateway
HTTP API"] LF["Lambda Function
(Python 3.11)"] subgraph Diagnostics["Automated Diagnostics"] CT["CloudTrail
Lookup"] IAM["IAM
Audit"] EC2["EC2 Status
Check"] CWM["CloudWatch
Metrics"] end end subgraph Storage["Persistence"] DDB["DynamoDB
(security-incidents)"] end subgraph Notifications["Notifications"] SL["Slack
(Block Kit)"] end P --> AR CW --> AR GD --> AR AR --> CP CP -->|POST /webhook| APIGW APIGW --> LF LF --> CT LF --> IAM LF --> EC2 LF --> CWM LF -->|Log Incident| DDB LF -->|Rich Notification| SL ``` ## ✨ 功能 ### 🔒 三种专用 Runbook 类型 | Runbook | 触发模式 | 执行的诊断 | |---|---|---| | **凭证异常** | `credential*`, `unauthorized*`, `iam*` | CloudTrail 事件查找、IAM 用户/角色枚举、可疑活动检测 | | **API 速率激增** | `api_rate*`, `rate_limit*`, `throttl*` | CloudWatch API 指标、请求速率分析、错误率计算 | | **基础设施故障** | `infra*`, `instance*`, `ec2*`, `health*` | EC2 实例状态检查、AWS Health 事件、系统/实例状态验证 | ### 📊 丰富的 Slack 通知 - 采用 **Block Kit 格式化**,为警报详情、诊断和修复步骤提供结构化的部分 - **基于严重程度的 Emoji 指示器**(🔴 严重,🟠 高,🟡 中,🔵 低) - 指向相关 AWS 控制台页面的**一键链接** - **诊断结果**直接嵌入在通知中 ### 🗄️ DynamoDB 事件记录 - 每个事件都存储了完整的诊断结果 - **基于 alert_name + timestamp 的 GSI**,以便按警报类型进行高效查询 - **基于 TTL 的自动过期**,用于数据生命周期管理 - 启用了**时间点恢复** (Point-in-time recovery) 以满足合规要求 ### 🔁 智能去重 - 可配置的去重时间窗口(默认:1 小时) - 防止抖动警报引起的警报疲劳 - 在事件记录中跟踪重复次数 ## 📁 项目结构 ``` Cyber Security/ ├── README.md # This file ├── requirements.txt # Python dependencies ├── docs/ │ └── grafana_webhook_setup.md # Grafana configuration guide ├── src/ │ ├── __init__.py │ ├── lambda_handler.py # Lambda entry point & webhook parser │ ├── alert_dispatcher.py # Routes alerts to correct runbook │ ├── incident_logger.py # DynamoDB incident persistence │ ├── slack_notifier.py # Slack Block Kit message builder │ ├── diagnostics/ │ │ ├── __init__.py │ │ ├── credential_checks.py # IAM & CloudTrail diagnostics │ │ ├── api_rate_checks.py # API metrics diagnostics │ │ └── infra_checks.py # EC2 & Health diagnostics │ └── runbooks/ │ ├── credential_anomaly.md # Credential anomaly runbook │ ├── api_rate_spike.md # API rate spike runbook │ └── infrastructure_failure.md # Infrastructure failure runbook ├── tests/ │ ├── __init__.py │ ├── conftest.py # Shared fixtures │ ├── test_lambda_handler.py # Handler unit tests │ ├── test_alert_dispatcher.py # Dispatcher unit tests │ ├── test_incident_logger.py # Logger unit tests │ └── test_slack_notifier.py # Notifier unit tests └── terraform/ ├── main.tf # Core infrastructure ├── variables.tf # Input variables └── outputs.tf # Output values ``` ## 📋 前置条件 | 工具 | 版本 | 用途 | |---|---|---| | **Python** | 3.11+ | Lambda runtime 和本地开发 | | **AWS CLI** | v2 | AWS 身份验证和资源管理 | | **Terraform** | >= 1.5.0 | 基础设施即代码部署 | | **Grafana** | 9.0+ | 警报管理和 webhook 集成 | | **pip** | 最新版 | Python 包管理 | 您还需要: - 拥有适当权限的 **AWS 账户** - 配置了 [Incoming Webhook](https://api.slack.com/messaging/webhooks) 的 **Slack 工作区** - 启用了警报功能的 **Grafana** 实例(自托管或 Grafana Cloud) ## 🚀 快速开始 ### 1. 克隆仓库 ``` git clone https://github.com/your-org/security-runbook-automation.git cd security-runbook-automation ``` ### 2. 安装依赖项 ``` # 创建并激活虚拟环境 python -m venv .venv source .venv/bin/activate # Linux/macOS # .venv\Scripts\activate # Windows # 安装依赖 pip install -r requirements.txt pip install -r requirements-dev.txt # For testing ``` ### 3. 配置环境变量 ``` # 复制示例环境文件 cp .env.example .env # 使用你的值进行编辑 export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/T.../B.../xxx" export DYNAMODB_TABLE="security-incidents" export AWS_REGION="us-east-1" export ENVIRONMENT="prod" export LOG_LEVEL="INFO" export DEDUP_WINDOW_MINUTES="60" ``` ### 4. 配置 AWS 凭证 ``` aws configure # 或者使用环境变量: export AWS_ACCESS_KEY_ID="your-access-key" export AWS_SECRET_ACCESS_KEY="your-secret-key" export AWS_DEFAULT_REGION="us-east-1" ``` ### 5. 使用 Terraform 部署 ``` cd terraform # 初始化 Terraform terraform init # 审查执行计划 terraform plan -var="slack_webhook_url=https://hooks.slack.com/services/T.../B.../xxx" # 应用基础设施 terraform apply -var="slack_webhook_url=https://hooks.slack.com/services/T.../B.../xxx" ``` 部署完成后,Terraform 将输出 API Gateway webhook URL。使用此 URL 配置 Grafana(参见 [Grafana Webhook 设置](docs/grafana_webhook_setup.md))。 ### 6. 配置 Grafana 请按照详细的指南操作:**[docs/grafana_webhook_setup.md](docs/grafana_webhook_setup.md)** ## ⚙️ 配置 ### 环境变量 | 变量 | 必需 | 默认值 | 描述 | |---|---|---|---| | `SLACK_WEBHOOK_URL` | ✅ | — | 用于通知的 Slack Incoming Webhook URL | | `DYNAMODB_TABLE` | ✅ | `security-incidents` | 用于记录事件的 DynamoDB 表名 | | `AWS_REGION` | ❌ | `us-east-1` | API 调用的 AWS 区域 | | `ENVIRONMENT` | ❌ | `prod` | 部署环境标识符 | | `LOG_LEVEL` | ❌ | `INFO` | 日志级别(DEBUG, INFO, WARNING, ERROR) | | `DEDUP_WINDOW_MINUTES` | ❌ | `60` | 以分钟为单位的去重时间窗口 | ### Terraform 变量 | 变量 | 默认值 | 描述 | |---|---|---| | `aws_region` | `us-east-1` | 所有资源的 AWS 区域 | | `project_name` | `security-runbook-automation` | 资源名称前缀 | | `lambda_timeout` | `60` | Lambda 超时时间(秒) | | `lambda_memory` | `256` | Lambda 内存(MB) | | `slack_webhook_url` | — | Slack webhook URL(敏感信息) | | `environment` | `prod` | 部署环境 | ## 🔧 添加新的 Runbook 请按照以下步骤向系统添加新的安全 Runbook: ### 步骤 1:创建诊断模块 在 `src/diagnostics/` 中创建一个包含您的诊断检查的新文件: ``` # src/diagnostics/my_new_checks.py """Diagnostics for my new security runbook.""" import logging import boto3 from typing import Any logger = logging.getLogger(__name__) def run_diagnostics(alert_data: dict[str, Any]) -> dict[str, Any]: """Run diagnostic checks for the new alert type. Args: alert_data: Parsed alert payload from Grafana. Returns: Dictionary containing diagnostic results. """ results = {} # Add your AWS API calls and diagnostic logic here # Example: # client = boto3.client("some-service") # response = client.some_api_call() # results["check_name"] = response return results ``` ### 步骤 2:在警报分派器中注册 将您的新 Runbook 模式添加到 `src/alert_dispatcher.py`: ``` # 在 RUNBOOK_PATTERNS 字典中,添加: "my_new*": "my_new_checks", "other_pattern*": "my_new_checks", ``` ### 步骤 3:创建 Runbook 文档 按照现有 Runbook 的格式,在 `src/runbooks/my_new_runbook.md` 中创建一个 Markdown 格式的 Runbook 文档。包含以下内容: - 警报触发条件 - 自动诊断说明 - 手动调查步骤 - 修复步骤 - 升级矩阵 - 事后检查清单 ### 步骤 4:添加测试 在 `tests/test_my_new_checks.py` 中创建测试覆盖: ``` """Tests for my new diagnostic checks.""" import pytest from unittest.mock import patch, MagicMock from src.diagnostics.my_new_checks import run_diagnostics class TestMyNewDiagnostics: """Test suite for the new diagnostic module.""" @patch("src.diagnostics.my_new_checks.boto3.client") def test_run_diagnostics_success(self, mock_client): alert_data = {"alertname": "my_new_alert", "severity": "high"} result = run_diagnostics(alert_data) assert isinstance(result, dict) ``` ### 步骤 5:部署 运行 `terraform apply` 以使用您的新代码重新部署 Lambda 函数。 ## 🧪 测试 ### 运行所有测试 ``` pytest tests/ -v ``` ### 运行测试并检查覆盖率 ``` pytest tests/ -v --cov=src --cov-report=term-missing --cov-report=html ``` ### 运行特定测试模块 ``` pytest tests/test_lambda_handler.py -v pytest tests/test_alert_dispatcher.py -v pytest tests/test_incident_logger.py -v pytest tests/test_slack_notifier.py -v ``` ### 按标记运行测试 ``` pytest tests/ -v -m "unit" # Unit tests only pytest tests/ -v -m "integration" # Integration tests only ``` ### 本地调用测试 您可以使用示例事件在本地测试 Lambda 函数: ``` # 使用 AWS SAM CLI sam local invoke -e tests/events/sample_grafana_alert.json # 或者直接使用 python python -c " from src.lambda_handler import lambda_handler import json with open('tests/events/sample_grafana_alert.json') as f: event = json.load(f) result = lambda_handler(event, None) print(json.dumps(result, indent=2)) " ``` ## ⚠️ 已知局限性 | 局限性 | 详情 | 解决方法 | |---|---|---| | **单区域** | 系统部署在单个 AWS 区域 | 在每个区域部署具有唯一表名的独立 stack | | **冷启动** | 首次调用时,Lambda 冷启动可能会增加 1-3 秒的延迟 | 对关键工作负载使用预置并发 | | **仅限 Grafana** | webhook 格式专为 Grafana 警报 payload 设计 | 为其他警报源调整 `lambda_handler.py` 解析器 | | **无警报确认** | 系统不会向 Grafana 发送确认回执 | 使用 Grafana 内置的静音/免打扰功能 | | **仅限 Slack** | 通知仅发送至 Slack | 扩展 `slack_notifier.py` 以支持 PagerDuty、Teams 等 | | **无密钥轮换** | Slack webhook URL 作为环境变量存储 | 在生产环境中迁移到 AWS Secrets Manager | | **DynamoDB 节流** | 在极端突发负载下,按需计费可能会遇到节流 | 如有需要,切换到带自动扩展的预置容量 | ## 📄 许可证 该项目基于 **MIT License** 授权。 ``` MIT License Copyright (c) 2026 Security Runbook Automation Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ``` ## 🤝 贡献 1. Fork 本仓库 2. 创建功能分支 (`git checkout -b feature/new-runbook`) 3. 为您的更改编写测试 4. 确保所有测试通过 (`pytest tests/ -v`) 5. 提交您的更改 (`git commit -m "feat: add new runbook for X"`) 6. 推送到该分支 (`git push origin feature/new-runbook`) 7. 发起 Pull Request

用 ❤️ 为安全运维打造。

标签:AWS, DPI, Python, 安全运营, 扫描框架, 无后门, 监控告警, 自动化运维, 自定义请求头, 逆向工具