Walentino/cloudtrail-detection-pipeline
GitHub: Walentino/cloudtrail-detection-pipeline
基于 AWS CloudTrail 与 Elastic Security 构建的端到端云安全检测管道,涵盖日志采集、自定义 Lambda 转发、SIEM 集成及 MITRE ATT&CK 映射的检测规则编写与验证。
Stars: 0 | Forks: 0
## 架构
```
flowchart TD
A[AWS API Call] --> B[CloudTrail]
B --> C[S3 Bucket: walentino-cloudtrail-logs-2026]
C -->|S3 Event Notification| D[SQS Queue: elastic-cloudtrail-queue]
D -->|Lambda Event Source Mapping| E[Lambda: esf-cloudtrail-forwarder]
E -->|Elasticsearch API| F[Elastic Security Serverless]
F --> G[Index: cloudtrail-logs]
G --> H[KQL Detection Rules]
H --> I[Alerts Dashboard]
subgraph AWS Cloud
A
B
C
D
E
end
subgraph Elastic Cloud
F
G
H
I
end
```
阶段 1:AWS 环境设置
目标:建立用于日志生成和存储的基础 AWS 基础设施。
组件 配置 目的
AWS 账户 免费套餐 (457664479040) 所有资源部署在 us-east-1
CloudTrail 跟踪 project-detection-trail 记录所有区域的管理事件
日志传输 S3 存储桶 walentino-cloudtrail-logs-2026 接收压缩的 CloudTrail .json.gz 日志文件
存储桶策略 允许 cloudtrail.amazonaws.com 执行 s3:PutObject 授予 CloudTrail 传输日志的权限
验证:
CloudTrail 跟踪状态确认为“Logging”
在 S3 存储桶下的 AWSLogs/457664479040/CloudTrail/ 中可见日志文件
下载并解压了一个示例日志文件 — 确认为包含管理事件记录的有效 JSON
## 阶段 1:AWS 环境设置
**目标:** 建立用于日志生成和存储的基础 AWS 基础设施。
| 组件 | 配置 | 目的 |
|-----------|---------------|---------|
| AWS 账户 | 免费套餐 (`457664479040`) | 所有资源部署在 us-east-1 |
| CloudTrail 跟踪 | `project-detection-trail` | 记录所有区域的管理事件 |
| 日志传输 S3 存储桶 | `walentino-cloudtrail-logs-2026` | 接收压缩的 CloudTrail `.json.gz` 日志文件 |
| 存储桶策略 | 允许 `cloudtrail.amazonaws.com` 执行 `s3:PutObject` | 授予 CloudTrail 传输日志的权限 |
**验证:**
- CloudTrail 跟踪状态确认为“Logging”
- 在 S3 存储桶下的 `AWSLogs/457664479040/CloudTrail/` 中可见日志文件
- 下载并解压了一个示例日志文件 — 确认为包含管理事件记录的有效 JSON
**截图:**



## 阶段 2:易受攻击的资源
**目标:** 部署故意配置错误的 AWS 资源,以模拟现实世界中的云安全风险。这些资源将作为阶段 4 规则的检测目标。
### 资源 1:公共 S3 存储桶
| 属性 | 值 |
|----------|-------|
| 存储桶名称 | `target-data-leak-2026` |
| 公共访问 | 阻止所有公共访问:**关闭** |
| 存储桶策略 | 允许 `Principal: *` 执行 `s3:GetObject` |
| 测试文件 | `test-file.txt` ("This is a test file for detection validation") |
这模拟了一种常见的云配置错误:意外暴露在互联网上的 S3 存储桶。在实际攻击中,攻击者可以枚举并窃取敏感数据。
### 资源 2:过度权限的 IAM 用户
| 属性 | 值 |
|----------|-------|
| 用户名 | `project-admin` |
| 内联策略 | `OverPermissivePolicy` |
| 允许的操作 | `iam:AttachUserPolicy`(所有资源),`sts:AssumeRole`(所有资源) |
这模拟了一个具有过多权限的 IAM 用户。攻击者如果获取了这些凭证,可以将权限提升至完全的 AdministratorAccess。
**截图:**




#
## 云检测工程师作品集
### 日期:2026 年 5 月 3 日
## 概述
阶段 3 建立了将 AWS CloudTrail 连接到 Elastic Security SIEM 的数据摄取管道。这是所有检测规则(阶段 4)运行的基础。该管道近乎实时地摄取 CloudTrail 管理事件,从而能够检测 S3 数据窃取、IAM 权限提升和可疑的角色代入。
## 架构
```
AWS CloudTrail (us-east-1)
│
▼
S3 Bucket: walentino-cloudtrail-logs-2026
│
▼ (S3 Event Notification: All object create events)
│
SQS Queue: elastic-cloudtrail-queue
│
▼ (Lambda Event Source Mapping)
│
AWS Lambda: esf-cloudtrail-forwarder (Python 3.12)
│
▼ (Elasticsearch API - ApiKey authentication)
│
Elastic Security Serverless
│
▼
Index: cloudtrail-logs → Kibana Discover
```
## 已部署组件
### Elastic Security Serverless
| 属性 | 值 |
| --- | --- |
| 提供商 | Elastic Cloud (GCP us-east4) |
| 连接别名 | `my-security-project-b85324` |
| Elasticsearch Endpoint | `https://my-security-project-b85324.es.us-east4.gcp.elastic.cloud` |
| 身份验证 | API Key(只写) |
| 目标索引 | `cloudtrail-logs` |
### 2. SQS 队列
| 属性 | 值 |
| --- | --- |
| 队列名称 | `elastic-cloudtrail-queue` |
| 队列 ARN | `arn:aws:sqs:us-east-1:457664479040:elastic-cloudtrail-queue` |
| 队列类型 | Standard |
| 可见性超时 | 300 秒 |
| 访问策略 | 允许来自存储桶 `walentino-cloudtrail-logs-2026` 的 `s3.amazonaws.com` 执行 `SQS:SendMessage` |
### 3. S3 事件通知
| 属性 | 值 |
| --- | --- |
| 存储桶 | `walentino-cloudtrail-logs-2026` |
| 事件名称 | `elastic-forwarder` |
| 事件类型 | 所有对象创建事件 (`s3:ObjectCreated:*`) |
| 目标 | SQS 队列:`elastic-cloudtrail-queue` |
### 4. Lambda 转发器
| 属性 | 值 |
| --- | --- |
| 函数名称 | `esf-cloudtrail-forwarder` |
| 运行时 | Python 3.12 |
| 内存 | 256 MB |
| 超时 | 120 秒 |
| IAM 角色 | `esf-lambda-role` |
| 附加策略 | `AWSLambdaBasicExecutionRole`,`AmazonSQSFullAccess`,`AmazonS3ReadOnlyAccess` |
| 触发器 | SQS 事件源映射 (`elastic-cloudtrail-queue`) |
| 批处理大小 | 10 |
| 依赖项 | `requests`,`urllib3`,`charset_normalizer`,`certifi`,`idna`(打包在部署包中) |
## Lambda 代码
```
import json
import boto3
import requests
import os
import gzip
from datetime import datetime
ELASTIC_URL = os.environ['ELASTIC_URL']
ELASTIC_API_KEY = os.environ['ELASTIC_API_KEY']
INDEX_NAME = 'cloudtrail-logs'
def lambda_handler(event, context):
headers = {
'Authorization': f'ApiKey {ELASTIC_API_KEY}',
'Content-Type': 'application/json'
}
successful = 0
failed = 0
for record in event.get('Records', []):
try:
body = json.loads(record['body'])
s3_bucket = body['Records'][0]['s3']['bucket']['name']
s3_key = body['Records'][0]['s3']['object']['key']
s3_client = boto3.client('s3')
response = s3_client.get_object(Bucket=s3_bucket, Key=s3_key)
content = response['Body'].read()
decompressed = gzip.decompress(content)
events = json.loads(decompressed)
for cloudtrail_event in events.get('Records', []):
event_time = cloudtrail_event.get('eventTime', datetime.utcnow().isoformat())
doc = {
'@timestamp': event_time,
'event.action': cloudtrail_event.get('eventName'),
'event.provider': cloudtrail_event.get('eventSource'),
'source.ip': cloudtrail_event.get('sourceIPAddress'),
'user_agent.original': cloudtrail_event.get('userAgent'),
'cloud.region': cloudtrail_event.get('awsRegion'),
'aws.cloudtrail.user_identity.arn': cloudtrail_event.get('userIdentity', {}).get('arn'),
'aws.cloudtrail.user_identity.type': cloudtrail_event.get('userIdentity', {}).get('type'),
'aws.cloudtrail.event_version': cloudtrail_event.get('eventVersion'),
'aws.cloudtrail.flattened': cloudtrail_event
}
url = f"{ELASTIC_URL}/{INDEX_NAME}/_doc"
resp = requests.post(url, headers=headers, json=doc, timeout=10)
if resp.status_code in [200, 201]:
successful += 1
else:
failed += 1
print(f"Elastic error: {resp.status_code} - {resp.text}")
except Exception as e:
failed += 1
print(f"Error processing record: {e}")
print(f"Processed: {successful} successful, {failed} failed")
return {'statusCode': 200, 'body': json.dumps({'successful': successful, 'failed': failed})}
```
## 已摄取文档结构
每个 CloudTrail 事件都会被转换为以下 Elasticsearch 文档结构:
| 字段 | 来源 | 示例值 |
| --- | --- | --- |
| `@timestamp` | `eventTime` | `2026-05-03T18:03:09Z` |
| `event.action` | `eventName` | `GetBucketVersioning` |
| `event.provider` | `eventSource` | `s3.amazonaws.com` |
| `source.ip` | `sourceIPAddress` | `142.198.170.64` |
| `user_agent.original` | `userAgent` | `Mozilla/5.0... Chrome/147.0.0.0` |
| `cloud.region` | `awsRegion` | `us-east-1` |
| `aws.cloudtrail.user_identity.arn` | `userIdentity.arn` | `arn:aws:iam::457***9040:user/Nino` |
| `aws.cloudtrail.user_identity.type` | `userIdentity.type` | `IAMUser` |
| `aws.cloudtrail.event_version` | `eventVersion` | `1.11` |
| `aws.cloudtrail.flattened` | 完整事件 | 完整的原始 CloudTrail JSON |
## 遇到的问题与解决方案
### 问题 1:Elastic Serverless 托管集成 SQS 字段
**问题:** Elastic Serverless AWS CloudTrail 集成界面缺少 SQS 队列 URL 配置字段。
**根本原因:** Serverless 托管集成抽象了底层转发器配置。
**解决方案:** 部署自定义 Lambda 转发器 (`esf-cloudtrail-forwarder`),而不是依赖托管集成。
### 问题 2:SAR 部署权限被拒绝
**问题:** IAM 用户 `Nino` 未被授权执行 `serverlessrepo:CreateCloudFormationTemplate`。
**根本原因:** Elastic Serverless Forwarder SAR 应用程序具有基于资源的策略,阻止了来自此账户的部署。
**解决方案:** 通过 AWS CLI 和 CloudFormation 手动部署 Lambda 函数,完全绕过了 SAR。
### 问题 3:SQS 可见性超时不匹配
**问题:** `Queue visibility timeout: 30 seconds is less than Function timeout: 120 seconds`。
**解决方案:** 通过 `aws sqs set-queue-attributes` 将 SQS 队列的可见性超时增加到了 300 秒。
### 问题 4:缺少 `requests` 模块
**问题:** `Runtime.ImportModuleError: Unable to import module 'lambda_function': No module named 'requests'`。
**解决方案:** 使用 `pip install requests -t .` 将 `requests` 及其依赖项 (`urllib3`,`charset_normalizer`,`certifi`,`idna`) 打包到 Lambda 部署包中。
### 问题 5:API 密钥权限 — `auto_create` 被拒绝 (403)
**问题:** 对于数据流和标准索引,Elastic 针对只写 API 密钥返回了 `action [indices:admin/auto_create] is unauthorized`。
**根本原因:** Elastic Serverless“只写”API 密钥缺乏首次摄取时自动创建索引所需的 `auto_configure` 和 `create_index` 权限。
**解决方案:** 在 Elastic Dev Tools 控制台中通过 `PUT /cloudtrail-logs` 手动预创建了 `cloudtrail-logs` 索引。一旦索引存在,只写密钥的 `create_doc` 和 `write` 权限就足够了。
### 问题 6:数据流与索引混淆
**问题:** 初始的 Lambda 代码将目标设为 `logs-aws.cloudtrail-default`(一个数据流)。预创建它之后,403 错误依然存在。
**解决方案:** 切换到简单索引 (`cloudtrail-logs`) 并手动预创建它。标准索引不需要数据流所要求的 `auto_configure` 权限。
## 验证
### Lambda 执行日志(成功运行)
```
START RequestId: a4d173a5-cb43-5323-b15b-11d414064a1d
Processed: 38 successful, 0 failed
END RequestId: a4d173a5-cb43-5323-b15b-11d414064a1d
REPORT Duration: 15279.37 ms Billed Duration: 15280 ms Memory Size: 256 MB Max Memory Used: 103 MB
```
### 确认
- Lambda 正在消费 SQS 消息。
- Lambda 成功下载、解压并转换 CloudTrail `.json.gz` 文件。
- 文档已在 Elastic Security Serverless 的 `cloudtrail-logs` 中被索引。
- 数据可在 Kibana Discover 中查询。
## 下一阶段
**阶段 4:Sigma 检测规则** — 在 Elastic Security 中编写三条自定义检测规则以检测:
1. 来自外部 IP 地址的 `s3:GetObject` (T1530)
2. 针对 AdministratorAccess 的 `iam:AttachUserPolicy` (T1098)
3. 来自非标准 User-Agent 的 `sts:AssumeRole` (T1078.001)
## 成本摘要
| 资源 | 成本 |
| --- | --- |
| AWS CloudTrail(管理事件) | 免费套餐 |
| S3 存储桶(日志存储) | < $0.01 |
| SQS 队列 | 免费套餐 |
| Lambda(调用 + 持续时间) | 免费套餐 |
| Elastic Cloud(14 天试用) | $0.00 |
| **总计** | **< $0.01** |
## 概述
阶段 4 在阶段 3 构建的 CloudTrail 摄取管道之上实现了检测层。在 Elastic Security 的规则引擎中编写了三条自定义检测规则,每条规则都针对映射到 MITRE ATT&CK 框架的特定云攻击技术。每条规则都经过刻意触发,以验证端到端的检测:从 AWS API 调用 → CloudTrail 日志 → S3 → SQS → Lambda → Elastic → 警报。
## 检测规则
### 规则 1:来自外部 IP 的 S3 GetObject
| 属性 | 值 |
| --- | --- |
| **规则名称** | S3 GetObject from External IP |
| **规则 ID** | 自定义查询规则 |
| **严重性** | 中 |
| **计划** | 每 5 分钟 |
| **数据源 | `cloudtrail-logs` 索引 |
| **MITRE ATT&CK** | T1530 - Data from Cloud Storage |
| **战术** | 窃取 |
**KQL 查询:**
```
event.action : "GetObject" AND event.provider : "s3.amazonaws.com"
```
**检测内容:**
检测通过 CloudTrail 数据事件发起的 `s3:GetObject` API 调用。任何访问受监控 S3 存储桶中对象的实体都会生成此警报。在生产环境中,过滤器会排除受信任的 IP 范围(企业 VPN、办公网络出口、CI/CD 运行器)。对于本项目,该规则会对所有 `GetObject` 调用触发以验证管道。
**触发方法:**
```
curl
```
访问是从 AWS 控制台标准范围之外的 IP 发起的,从而生成了警报。
**警报数量:** 15
**误报分析:**
- 从 S3 读取数据的合法第三方 SaaS 集成
- 访问共享存储的内部应用程序和微服务
- CI/CD 管道制品检索
- 执行自动化任务的 AWS 服务账户
- 安全扫描工具
**生产环境加固:**
- 添加受信任 IP 范围的排除列表
- 与已知的 User-Agent(AWS SDK、控制台)相关联
- 在维护窗口期间抑制警报
- 如果与异常数据量相结合,则生成更高的严重性级别
### 规则 2:将 AdministratorAccess 策略附加到 IAM 用户
| 属性 | 值 |
| --- | --- |
| **规则名称** | AdministratorAccess Policy Attached to IAM User |
| **规则 ID** | 自定义查询规则 |
| **严重性** | 高 |
| **计划** | 每 5 分钟 |
| **数据源** | `cloudtrail-logs` 索引 |
| **MITRE ATT&CK** | T1098 - Account Manipulation |
| **战术** | 持久化,权限提升 |
**KQL 查询:**
```
event.action : "AttachUserPolicy" AND event.provider : "iam.amazonaws.com" AND aws.cloudtrail.flattened.requestParameters.policyArn : "arn:aws:iam::aws:policy/AdministratorAccess"
```
**检测内容:**
检测 AWS 托管的 `AdministratorAccess` 策略何时被附加到任何 IAM 用户。此策略授予对所有 AWS 资源的不受限制的访问权限。未经授权的附加是关键的权限提升指标。攻破凭证的攻击者通常会附加此策略以维持持久性。
**触发方法:**
1. 导航到 IAM → 用户 → `project-admin`
2. 添加权限 → 直接附加策略
3. 搜索 `AdministratorAccess`,勾选并附加
**警报数量:** 1
**误报分析:**
- 工作时间内合法的管理员入职操作
- 自动化 IAM 配置脚本(CI/CD 管理员账户创建)
- 授权安全人员的紧急访问程序
- 破窗账户配置
**生产环境加固:**
- 与变更管理工单相关联
- 排除已知的管理员配置角色/用户 ARN
- 在批准的变更窗口期间抑制警报
- 如果在工作时间之外执行且没有相应工单,则升级为严重级别
### 规则 3:来自可疑 User-Agent 的 STS AssumeRole
| 属性 | 值 |
| --- | --- |
| **规则名称** | STS AssumeRole from Suspicious User Agent |
| **规则 ID** | 自定义查询规则 |
| **严重性** | 中 |
| **计划** | 每 5 分钟 |
| **数据源** | `cloudtrail-logs` 索引 |
| **MITRE ATT&CK** | T1078.001 - Valid Accounts: Default Accounts |
| **战术** | 凭证访问,防御规避 |
**KQL 查询:**
```
event.action : "AssumeRole" AND event.provider : "sts.amazonaws.com" AND NOT user_agent.original : *aws-cli* AND NOT user_agent.original : *boto3* AND NOT user_agent.original : *console.amazonaws.com* AND NOT user_agent.original : *signin.amazonaws.com* AND NOT user_agent.original : *aws-sdk* AND NOT user_agent.original : *Lambda* AND NOT user_agent.original : *CloudFormation*
```
**检测内容:**
检测 `sts:AssumeRole` API 调用中 User-Agent 字符串与任何已知合法 AWS 工具(CLI、SDK、控制台、CloudFormation、Lambda)不匹配的情况。自定义脚本、恶意软件植入程序和攻击者工具通常使用非标准或完全缺失的 User-Agent 字符串。
**触发方法:**
```
python3 -c "import boto3; from botocore.config import Config; config = Config(user_agent='CustomMalware/1.0'); client = boto3.client('sts', config=config); print(client.get_caller_identity())"
```
这会使用不在排除列表中的 User-Agent 字符串 `CustomMalware/1.0` 调用 `sts:GetCallerIdentity`(该操作内部调用类似 AssumeRole 的机制)。
**警报数量:** 2
**误报分析:**
- 内部安全测试工具(例如 ScoutSuite、Prowler、Pacu)
- 带有硬编码 User-Agent 的自定义监控脚本
- 与 AWS API 集成的第三方安全产品
- 使用非标准 SDK 包装器的开发人员
- 事件响应工具
**生产环境加固:**
- 维护已知内部工具 User-Agent 子字符串的允许列表
- 与角色 ARN 相关联 — 高价值角色(Admin、OrgManagement)应生成严重级别警报
- 在抑制之前检查源 IP 是否在企业网范围内
- 调查任何包含 "custom"、"malware"、"hack" 或类似指标的 User-Agent
## 警报摘要
| 规则 | 严重性 | 警报数量 | MITRE ATT&CK |
| --- | --- | --- | --- |
| 来自外部 IP 的 S3 GetObject | 中 | 15 | T1530 |
| 将 AdministratorAccess 策略附加到 IAM 用户 | 高 | 1 | T1098 |
| 来自可疑 User-Agent 的 STS AssumeRole | 中 | 2 | T1078.001 |
| **总计** | | **18** | |
## 验证方法
每条规则均使用以下过程进行了验证:
1. 在 Elastic Security 中**启用规则**。
2. 在 AWS(CLI、控制台或 SDK)中**执行恶意操作**。
3. **等待 5-10 分钟**,经历 CloudTrail 传输 → S3 通知 → SQS → Lambda → Elastic。
4. 在 Elastic Security → 警报中**验证警报**。
5. **截取屏幕截图**,包含规则名称、严重性、时间戳和计数。
6. **记录**触发方法、KQL 查询和误报分析。
## 架构(端到端)
```
AWS API Call (malicious action)
│
▼
AWS CloudTrail (data & management events)
│
▼
S3 Bucket: walentino-cloudtrail-logs-2026
│
▼ (S3 Event Notification)
SQS Queue: elastic-cloudtrail-queue
│
▼ (Lambda Event Source Mapping)
AWS Lambda: esf-cloudtrail-forwarder
│
▼ (Elasticsearch API)
Elastic Security Serverless
│
▼
Index: cloudtrail-logs
│
▼
Elastic Security Rules (KQL queries)
│
▼
Alerts Dashboard
```
## 展现的技能
- **检测工程:** 编写针对云攻击技术的基于 KQL 的检测规则
- **MITRE ATT&CK 映射:** 将检测逻辑与行业标准框架对齐
- **误报分析:** 预测合法场景并提出排除策略
- **警报验证:** 触发真实的 AWS API 调用以生成和验证警报
- **SIEM 操作:** 在生产 SIEM 中管理规则、计划和警报分类
- **文档编写:** 适用于 SOC 运行手册的全面规则文档
## 下一阶段
**项目 2:自动响应器** — 启用 Amazon GuardDuty,触发真实的威胁发现,并构建一个 Lambda 函数,通过附加隔离策略、撤销控制台会话和 SNS 通知来自动控制受损的 IAM 凭证。
## 成本摘要
| 资源 | 成本 |
| --- | --- |
| CloudTrail 数据事件(测试访问) | < $0.01 |
| 所有其他资源(免费套餐) | $0.00 |
| Elastic Cloud(14 天试用) | $0.00 |
| **总计** | **< $0.01** |
标签:AMSI绕过, AWS, CloudTrail, CSPM, DevSecOps, DPI, Elasticsearch, Elastic Security, KQL, Lambda, OISF, S3, Serverless, Sigma规则, SQS, TinkerPop, 上游代理, 云安全态势管理, 前端应用, 威胁检测, 子域名暴力破解, 安全告警, 安全运营, 开源安全项目, 扫描框架, 日志管理, 漏洞探索, 目标导入, 端到端管道, 自动化响应, 自动化检测