Walentino/cloudtrail-detection-pipeline

GitHub: Walentino/cloudtrail-detection-pipeline

基于 AWS CloudTrail 与 Elastic Security 构建的端到端云安全检测管道,涵盖日志采集、自定义 Lambda 转发、SIEM 集成及 MITRE ATT&CK 映射的检测规则编写与验证。

Stars: 0 | Forks: 0

## 架构 ``` flowchart TD A[AWS API Call] --> B[CloudTrail] B --> C[S3 Bucket: walentino-cloudtrail-logs-2026] C -->|S3 Event Notification| D[SQS Queue: elastic-cloudtrail-queue] D -->|Lambda Event Source Mapping| E[Lambda: esf-cloudtrail-forwarder] E -->|Elasticsearch API| F[Elastic Security Serverless] F --> G[Index: cloudtrail-logs] G --> H[KQL Detection Rules] H --> I[Alerts Dashboard] subgraph AWS Cloud A B C D E end subgraph Elastic Cloud F G H I end ``` 阶段 1:AWS 环境设置 目标:建立用于日志生成和存储的基础 AWS 基础设施。 组件 配置 目的 AWS 账户 免费套餐 (457664479040) 所有资源部署在 us-east-1 CloudTrail 跟踪 project-detection-trail 记录所有区域的管理事件 日志传输 S3 存储桶 walentino-cloudtrail-logs-2026 接收压缩的 CloudTrail .json.gz 日志文件 存储桶策略 允许 cloudtrail.amazonaws.com 执行 s3:PutObject 授予 CloudTrail 传输日志的权限 验证: CloudTrail 跟踪状态确认为“Logging” 在 S3 存储桶下的 AWSLogs/457664479040/CloudTrail/ 中可见日志文件 下载并解压了一个示例日志文件 — 确认为包含管理事件记录的有效 JSON ## 阶段 1:AWS 环境设置 **目标:** 建立用于日志生成和存储的基础 AWS 基础设施。 | 组件 | 配置 | 目的 | |-----------|---------------|---------| | AWS 账户 | 免费套餐 (`457664479040`) | 所有资源部署在 us-east-1 | | CloudTrail 跟踪 | `project-detection-trail` | 记录所有区域的管理事件 | | 日志传输 S3 存储桶 | `walentino-cloudtrail-logs-2026` | 接收压缩的 CloudTrail `.json.gz` 日志文件 | | 存储桶策略 | 允许 `cloudtrail.amazonaws.com` 执行 `s3:PutObject` | 授予 CloudTrail 传输日志的权限 | **验证:** - CloudTrail 跟踪状态确认为“Logging” - 在 S3 存储桶下的 `AWSLogs/457664479040/CloudTrail/` 中可见日志文件 - 下载并解压了一个示例日志文件 — 确认为包含管理事件记录的有效 JSON **截图:** ![CloudTrail Trail](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/50c997ecee185539.png) ![S3 Bucket](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/e779b3ed22185545.png) ![Bucket Policy](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/d9298dbbfd185553.png) ## 阶段 2:易受攻击的资源 **目标:** 部署故意配置错误的 AWS 资源,以模拟现实世界中的云安全风险。这些资源将作为阶段 4 规则的检测目标。 ### 资源 1:公共 S3 存储桶 | 属性 | 值 | |----------|-------| | 存储桶名称 | `target-data-leak-2026` | | 公共访问 | 阻止所有公共访问:**关闭** | | 存储桶策略 | 允许 `Principal: *` 执行 `s3:GetObject` | | 测试文件 | `test-file.txt` ("This is a test file for detection validation") | 这模拟了一种常见的云配置错误:意外暴露在互联网上的 S3 存储桶。在实际攻击中,攻击者可以枚举并窃取敏感数据。 ### 资源 2:过度权限的 IAM 用户 | 属性 | 值 | |----------|-------| | 用户名 | `project-admin` | | 内联策略 | `OverPermissivePolicy` | | 允许的操作 | `iam:AttachUserPolicy`(所有资源),`sts:AssumeRole`(所有资源) | 这模拟了一个具有过多权限的 IAM 用户。攻击者如果获取了这些凭证,可以将权限提升至完全的 AdministratorAccess。 **截图:** ![S3 Public Access Off](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/0e211d5dda185559.png) ![S3 Bucket Policy](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/88dde3580f185604.png) ![IAM User](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/90bb8e66f5185609.png) ![Inline Policy](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/4d151358d5185614.png) # ## 云检测工程师作品集 ### 日期:2026 年 5 月 3 日 ## 概述 阶段 3 建立了将 AWS CloudTrail 连接到 Elastic Security SIEM 的数据摄取管道。这是所有检测规则(阶段 4)运行的基础。该管道近乎实时地摄取 CloudTrail 管理事件,从而能够检测 S3 数据窃取、IAM 权限提升和可疑的角色代入。 ## 架构 ``` AWS CloudTrail (us-east-1) │ ▼ S3 Bucket: walentino-cloudtrail-logs-2026 │ ▼ (S3 Event Notification: All object create events) │ SQS Queue: elastic-cloudtrail-queue │ ▼ (Lambda Event Source Mapping) │ AWS Lambda: esf-cloudtrail-forwarder (Python 3.12) │ ▼ (Elasticsearch API - ApiKey authentication) │ Elastic Security Serverless │ ▼ Index: cloudtrail-logs → Kibana Discover ``` ## 已部署组件 ### Elastic Security Serverless | 属性 | 值 | | --- | --- | | 提供商 | Elastic Cloud (GCP us-east4) | | 连接别名 | `my-security-project-b85324` | | Elasticsearch Endpoint | `https://my-security-project-b85324.es.us-east4.gcp.elastic.cloud` | | 身份验证 | API Key(只写) | | 目标索引 | `cloudtrail-logs` | ### 2. SQS 队列 | 属性 | 值 | | --- | --- | | 队列名称 | `elastic-cloudtrail-queue` | | 队列 ARN | `arn:aws:sqs:us-east-1:457664479040:elastic-cloudtrail-queue` | | 队列类型 | Standard | | 可见性超时 | 300 秒 | | 访问策略 | 允许来自存储桶 `walentino-cloudtrail-logs-2026` 的 `s3.amazonaws.com` 执行 `SQS:SendMessage` | ### 3. S3 事件通知 | 属性 | 值 | | --- | --- | | 存储桶 | `walentino-cloudtrail-logs-2026` | | 事件名称 | `elastic-forwarder` | | 事件类型 | 所有对象创建事件 (`s3:ObjectCreated:*`) | | 目标 | SQS 队列:`elastic-cloudtrail-queue` | ### 4. Lambda 转发器 | 属性 | 值 | | --- | --- | | 函数名称 | `esf-cloudtrail-forwarder` | | 运行时 | Python 3.12 | | 内存 | 256 MB | | 超时 | 120 秒 | | IAM 角色 | `esf-lambda-role` | | 附加策略 | `AWSLambdaBasicExecutionRole`,`AmazonSQSFullAccess`,`AmazonS3ReadOnlyAccess` | | 触发器 | SQS 事件源映射 (`elastic-cloudtrail-queue`) | | 批处理大小 | 10 | | 依赖项 | `requests`,`urllib3`,`charset_normalizer`,`certifi`,`idna`(打包在部署包中) | ## Lambda 代码 ``` import json import boto3 import requests import os import gzip from datetime import datetime ELASTIC_URL = os.environ['ELASTIC_URL'] ELASTIC_API_KEY = os.environ['ELASTIC_API_KEY'] INDEX_NAME = 'cloudtrail-logs' def lambda_handler(event, context): headers = { 'Authorization': f'ApiKey {ELASTIC_API_KEY}', 'Content-Type': 'application/json' } successful = 0 failed = 0 for record in event.get('Records', []): try: body = json.loads(record['body']) s3_bucket = body['Records'][0]['s3']['bucket']['name'] s3_key = body['Records'][0]['s3']['object']['key'] s3_client = boto3.client('s3') response = s3_client.get_object(Bucket=s3_bucket, Key=s3_key) content = response['Body'].read() decompressed = gzip.decompress(content) events = json.loads(decompressed) for cloudtrail_event in events.get('Records', []): event_time = cloudtrail_event.get('eventTime', datetime.utcnow().isoformat()) doc = { '@timestamp': event_time, 'event.action': cloudtrail_event.get('eventName'), 'event.provider': cloudtrail_event.get('eventSource'), 'source.ip': cloudtrail_event.get('sourceIPAddress'), 'user_agent.original': cloudtrail_event.get('userAgent'), 'cloud.region': cloudtrail_event.get('awsRegion'), 'aws.cloudtrail.user_identity.arn': cloudtrail_event.get('userIdentity', {}).get('arn'), 'aws.cloudtrail.user_identity.type': cloudtrail_event.get('userIdentity', {}).get('type'), 'aws.cloudtrail.event_version': cloudtrail_event.get('eventVersion'), 'aws.cloudtrail.flattened': cloudtrail_event } url = f"{ELASTIC_URL}/{INDEX_NAME}/_doc" resp = requests.post(url, headers=headers, json=doc, timeout=10) if resp.status_code in [200, 201]: successful += 1 else: failed += 1 print(f"Elastic error: {resp.status_code} - {resp.text}") except Exception as e: failed += 1 print(f"Error processing record: {e}") print(f"Processed: {successful} successful, {failed} failed") return {'statusCode': 200, 'body': json.dumps({'successful': successful, 'failed': failed})} ``` ## 已摄取文档结构 每个 CloudTrail 事件都会被转换为以下 Elasticsearch 文档结构: | 字段 | 来源 | 示例值 | | --- | --- | --- | | `@timestamp` | `eventTime` | `2026-05-03T18:03:09Z` | | `event.action` | `eventName` | `GetBucketVersioning` | | `event.provider` | `eventSource` | `s3.amazonaws.com` | | `source.ip` | `sourceIPAddress` | `142.198.170.64` | | `user_agent.original` | `userAgent` | `Mozilla/5.0... Chrome/147.0.0.0` | | `cloud.region` | `awsRegion` | `us-east-1` | | `aws.cloudtrail.user_identity.arn` | `userIdentity.arn` | `arn:aws:iam::457***9040:user/Nino` | | `aws.cloudtrail.user_identity.type` | `userIdentity.type` | `IAMUser` | | `aws.cloudtrail.event_version` | `eventVersion` | `1.11` | | `aws.cloudtrail.flattened` | 完整事件 | 完整的原始 CloudTrail JSON | ## 遇到的问题与解决方案 ### 问题 1:Elastic Serverless 托管集成 SQS 字段 **问题:** Elastic Serverless AWS CloudTrail 集成界面缺少 SQS 队列 URL 配置字段。 **根本原因:** Serverless 托管集成抽象了底层转发器配置。 **解决方案:** 部署自定义 Lambda 转发器 (`esf-cloudtrail-forwarder`),而不是依赖托管集成。 ### 问题 2:SAR 部署权限被拒绝 **问题:** IAM 用户 `Nino` 未被授权执行 `serverlessrepo:CreateCloudFormationTemplate`。 **根本原因:** Elastic Serverless Forwarder SAR 应用程序具有基于资源的策略,阻止了来自此账户的部署。 **解决方案:** 通过 AWS CLI 和 CloudFormation 手动部署 Lambda 函数,完全绕过了 SAR。 ### 问题 3:SQS 可见性超时不匹配 **问题:** `Queue visibility timeout: 30 seconds is less than Function timeout: 120 seconds`。 **解决方案:** 通过 `aws sqs set-queue-attributes` 将 SQS 队列的可见性超时增加到了 300 秒。 ### 问题 4:缺少 `requests` 模块 **问题:** `Runtime.ImportModuleError: Unable to import module 'lambda_function': No module named 'requests'`。 **解决方案:** 使用 `pip install requests -t .` 将 `requests` 及其依赖项 (`urllib3`,`charset_normalizer`,`certifi`,`idna`) 打包到 Lambda 部署包中。 ### 问题 5:API 密钥权限 — `auto_create` 被拒绝 (403) **问题:** 对于数据流和标准索引,Elastic 针对只写 API 密钥返回了 `action [indices:admin/auto_create] is unauthorized`。 **根本原因:** Elastic Serverless“只写”API 密钥缺乏首次摄取时自动创建索引所需的 `auto_configure` 和 `create_index` 权限。 **解决方案:** 在 Elastic Dev Tools 控制台中通过 `PUT /cloudtrail-logs` 手动预创建了 `cloudtrail-logs` 索引。一旦索引存在,只写密钥的 `create_doc` 和 `write` 权限就足够了。 ### 问题 6:数据流与索引混淆 **问题:** 初始的 Lambda 代码将目标设为 `logs-aws.cloudtrail-default`(一个数据流)。预创建它之后,403 错误依然存在。 **解决方案:** 切换到简单索引 (`cloudtrail-logs`) 并手动预创建它。标准索引不需要数据流所要求的 `auto_configure` 权限。 ## 验证 ### Lambda 执行日志(成功运行) ``` START RequestId: a4d173a5-cb43-5323-b15b-11d414064a1d Processed: 38 successful, 0 failed END RequestId: a4d173a5-cb43-5323-b15b-11d414064a1d REPORT Duration: 15279.37 ms Billed Duration: 15280 ms Memory Size: 256 MB Max Memory Used: 103 MB ``` ### 确认 - Lambda 正在消费 SQS 消息。 - Lambda 成功下载、解压并转换 CloudTrail `.json.gz` 文件。 - 文档已在 Elastic Security Serverless 的 `cloudtrail-logs` 中被索引。 - 数据可在 Kibana Discover 中查询。 ## 下一阶段 **阶段 4:Sigma 检测规则** — 在 Elastic Security 中编写三条自定义检测规则以检测: 1. 来自外部 IP 地址的 `s3:GetObject` (T1530) 2. 针对 AdministratorAccess 的 `iam:AttachUserPolicy` (T1098) 3. 来自非标准 User-Agent 的 `sts:AssumeRole` (T1078.001) ## 成本摘要 | 资源 | 成本 | | --- | --- | | AWS CloudTrail(管理事件) | 免费套餐 | | S3 存储桶(日志存储) | < $0.01 | | SQS 队列 | 免费套餐 | | Lambda(调用 + 持续时间) | 免费套餐 | | Elastic Cloud(14 天试用) | $0.00 | | **总计** | **< $0.01** | ## 概述 阶段 4 在阶段 3 构建的 CloudTrail 摄取管道之上实现了检测层。在 Elastic Security 的规则引擎中编写了三条自定义检测规则,每条规则都针对映射到 MITRE ATT&CK 框架的特定云攻击技术。每条规则都经过刻意触发,以验证端到端的检测:从 AWS API 调用 → CloudTrail 日志 → S3 → SQS → Lambda → Elastic → 警报。 ## 检测规则 ### 规则 1:来自外部 IP 的 S3 GetObject | 属性 | 值 | | --- | --- | | **规则名称** | S3 GetObject from External IP | | **规则 ID** | 自定义查询规则 | | **严重性** | 中 | | **计划** | 每 5 分钟 | | **数据源 | `cloudtrail-logs` 索引 | | **MITRE ATT&CK** | T1530 - Data from Cloud Storage | | **战术** | 窃取 | **KQL 查询:** ``` event.action : "GetObject" AND event.provider : "s3.amazonaws.com" ``` **检测内容:** 检测通过 CloudTrail 数据事件发起的 `s3:GetObject` API 调用。任何访问受监控 S3 存储桶中对象的实体都会生成此警报。在生产环境中,过滤器会排除受信任的 IP 范围(企业 VPN、办公网络出口、CI/CD 运行器)。对于本项目,该规则会对所有 `GetObject` 调用触发以验证管道。 **触发方法:** ``` curl ``` 访问是从 AWS 控制台标准范围之外的 IP 发起的,从而生成了警报。 **警报数量:** 15 **误报分析:** - 从 S3 读取数据的合法第三方 SaaS 集成 - 访问共享存储的内部应用程序和微服务 - CI/CD 管道制品检索 - 执行自动化任务的 AWS 服务账户 - 安全扫描工具 **生产环境加固:** - 添加受信任 IP 范围的排除列表 - 与已知的 User-Agent(AWS SDK、控制台)相关联 - 在维护窗口期间抑制警报 - 如果与异常数据量相结合,则生成更高的严重性级别 ### 规则 2:将 AdministratorAccess 策略附加到 IAM 用户 | 属性 | 值 | | --- | --- | | **规则名称** | AdministratorAccess Policy Attached to IAM User | | **规则 ID** | 自定义查询规则 | | **严重性** | 高 | | **计划** | 每 5 分钟 | | **数据源** | `cloudtrail-logs` 索引 | | **MITRE ATT&CK** | T1098 - Account Manipulation | | **战术** | 持久化,权限提升 | **KQL 查询:** ``` event.action : "AttachUserPolicy" AND event.provider : "iam.amazonaws.com" AND aws.cloudtrail.flattened.requestParameters.policyArn : "arn:aws:iam::aws:policy/AdministratorAccess" ``` **检测内容:** 检测 AWS 托管的 `AdministratorAccess` 策略何时被附加到任何 IAM 用户。此策略授予对所有 AWS 资源的不受限制的访问权限。未经授权的附加是关键的权限提升指标。攻破凭证的攻击者通常会附加此策略以维持持久性。 **触发方法:** 1. 导航到 IAM → 用户 → `project-admin` 2. 添加权限 → 直接附加策略 3. 搜索 `AdministratorAccess`,勾选并附加 **警报数量:** 1 **误报分析:** - 工作时间内合法的管理员入职操作 - 自动化 IAM 配置脚本(CI/CD 管理员账户创建) - 授权安全人员的紧急访问程序 - 破窗账户配置 **生产环境加固:** - 与变更管理工单相关联 - 排除已知的管理员配置角色/用户 ARN - 在批准的变更窗口期间抑制警报 - 如果在工作时间之外执行且没有相应工单,则升级为严重级别 ### 规则 3:来自可疑 User-Agent 的 STS AssumeRole | 属性 | 值 | | --- | --- | | **规则名称** | STS AssumeRole from Suspicious User Agent | | **规则 ID** | 自定义查询规则 | | **严重性** | 中 | | **计划** | 每 5 分钟 | | **数据源** | `cloudtrail-logs` 索引 | | **MITRE ATT&CK** | T1078.001 - Valid Accounts: Default Accounts | | **战术** | 凭证访问,防御规避 | **KQL 查询:** ``` event.action : "AssumeRole" AND event.provider : "sts.amazonaws.com" AND NOT user_agent.original : *aws-cli* AND NOT user_agent.original : *boto3* AND NOT user_agent.original : *console.amazonaws.com* AND NOT user_agent.original : *signin.amazonaws.com* AND NOT user_agent.original : *aws-sdk* AND NOT user_agent.original : *Lambda* AND NOT user_agent.original : *CloudFormation* ``` **检测内容:** 检测 `sts:AssumeRole` API 调用中 User-Agent 字符串与任何已知合法 AWS 工具(CLI、SDK、控制台、CloudFormation、Lambda)不匹配的情况。自定义脚本、恶意软件植入程序和攻击者工具通常使用非标准或完全缺失的 User-Agent 字符串。 **触发方法:** ``` python3 -c "import boto3; from botocore.config import Config; config = Config(user_agent='CustomMalware/1.0'); client = boto3.client('sts', config=config); print(client.get_caller_identity())" ``` 这会使用不在排除列表中的 User-Agent 字符串 `CustomMalware/1.0` 调用 `sts:GetCallerIdentity`(该操作内部调用类似 AssumeRole 的机制)。 **警报数量:** 2 **误报分析:** - 内部安全测试工具(例如 ScoutSuite、Prowler、Pacu) - 带有硬编码 User-Agent 的自定义监控脚本 - 与 AWS API 集成的第三方安全产品 - 使用非标准 SDK 包装器的开发人员 - 事件响应工具 **生产环境加固:** - 维护已知内部工具 User-Agent 子字符串的允许列表 - 与角色 ARN 相关联 — 高价值角色(Admin、OrgManagement)应生成严重级别警报 - 在抑制之前检查源 IP 是否在企业网范围内 - 调查任何包含 "custom"、"malware"、"hack" 或类似指标的 User-Agent ## 警报摘要 | 规则 | 严重性 | 警报数量 | MITRE ATT&CK | | --- | --- | --- | --- | | 来自外部 IP 的 S3 GetObject | 中 | 15 | T1530 | | 将 AdministratorAccess 策略附加到 IAM 用户 | 高 | 1 | T1098 | | 来自可疑 User-Agent 的 STS AssumeRole | 中 | 2 | T1078.001 | | **总计** | | **18** | | ## 验证方法 每条规则均使用以下过程进行了验证: 1. 在 Elastic Security 中**启用规则**。 2. 在 AWS(CLI、控制台或 SDK)中**执行恶意操作**。 3. **等待 5-10 分钟**,经历 CloudTrail 传输 → S3 通知 → SQS → Lambda → Elastic。 4. 在 Elastic Security → 警报中**验证警报**。 5. **截取屏幕截图**,包含规则名称、严重性、时间戳和计数。 6. **记录**触发方法、KQL 查询和误报分析。 ## 架构(端到端) ``` AWS API Call (malicious action) │ ▼ AWS CloudTrail (data & management events) │ ▼ S3 Bucket: walentino-cloudtrail-logs-2026 │ ▼ (S3 Event Notification) SQS Queue: elastic-cloudtrail-queue │ ▼ (Lambda Event Source Mapping) AWS Lambda: esf-cloudtrail-forwarder │ ▼ (Elasticsearch API) Elastic Security Serverless │ ▼ Index: cloudtrail-logs │ ▼ Elastic Security Rules (KQL queries) │ ▼ Alerts Dashboard ``` ## 展现的技能 - **检测工程:** 编写针对云攻击技术的基于 KQL 的检测规则 - **MITRE ATT&CK 映射:** 将检测逻辑与行业标准框架对齐 - **误报分析:** 预测合法场景并提出排除策略 - **警报验证:** 触发真实的 AWS API 调用以生成和验证警报 - **SIEM 操作:** 在生产 SIEM 中管理规则、计划和警报分类 - **文档编写:** 适用于 SOC 运行手册的全面规则文档 ## 下一阶段 **项目 2:自动响应器** — 启用 Amazon GuardDuty,触发真实的威胁发现,并构建一个 Lambda 函数,通过附加隔离策略、撤销控制台会话和 SNS 通知来自动控制受损的 IAM 凭证。 ## 成本摘要 | 资源 | 成本 | | --- | --- | | CloudTrail 数据事件(测试访问) | < $0.01 | | 所有其他资源(免费套餐) | $0.00 | | Elastic Cloud(14 天试用) | $0.00 | | **总计** | **< $0.01** |
标签:AMSI绕过, AWS, CloudTrail, CSPM, DevSecOps, DPI, Elasticsearch, Elastic Security, KQL, Lambda, OISF, S3, Serverless, Sigma规则, SQS, TinkerPop, 上游代理, 云安全态势管理, 前端应用, 威胁检测, 子域名暴力破解, 安全告警, 安全运营, 开源安全项目, 扫描框架, 日志管理, 漏洞探索, 目标导入, 端到端管道, 自动化响应, 自动化检测