purpleshellsecurity/aws-triage-agent

GitHub: purpleshellsecurity/aws-triage-agent

一个事件驱动的只读 AI 代理,用于自动分诊 AWS GuardDuty 告警,通过分析 CloudTrail 日志并借助 Bedrock 大模型生成结构化安全裁决。

Stars: 0 | Forks: 0

# AWS Triage Agent [![CI](https://github.com/purpleshellsecurity/aws-triage-agent/actions/workflows/ci.yml/badge.svg)](https://github.com/purpleshellsecurity/aws-triage-agent/actions/workflows/ci.yml) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) ## 架构 GuardDuty 触发 finding → EventBridge 将其推送到 agent → agent 进行调查并将裁决写入 SQS → 你的 SIEM 轮询该队列。完全事件驱动:没有 cron,没有轮询器,没有容器,空闲时没有任何运行。 ``` ┌─────────────────────────────────────┐ │ Lambda — the agent │ GuardDuty EventBridge │ 1. normalize the alert │ SQS queue your finding ───▶ rule ───▶ │ 2. read CloudTrail (read-only) │ ───▶ aws-triage- ───▶ SIEM (new alert) aws.guardduty │ 3. reason on Amazon Bedrock │ agent-verdicts (polls) auto-trigger │ 4. emit a structured JSON verdict │ (durable, 14d) └─────────────────────────────────────┘ ``` | 跳转 | 发生了什么 | |-----|--------------| | **GuardDuty → EventBridge** | 每一个新的 finding 都会被自动推送(规则范围限定为 `source: aws.guardduty`)。没有水位线,没有抓取循环。 | | **EventBridge → Lambda** | 该规则会针对每个 finding 调用一次函数。单次 finding 计算,空闲时成本为 $0。 | | **Lambda(agent)** | 规范化告警,在受限循环中使用 **只读** 工具从 CloudTrail 收集上下文,并在 Bedrock 上进行推理。模型通过执行角色运行 —— **无需 API key**。 | | **Lambda → SQS** | 裁决信封被发送到持久的 SQS 队列(`sqs:SendMessage`,这是 agent 的*唯一*出站操作)。如果 SIEM 宕机,队列会保留裁决。 | | **SQS → SIEM** | 你的 SIEM 轮询该队列,并通过 `finding_id` 将每个裁决与其对应的 finding 关联起来。(该连接器不在范围内 —— 构建过程到队列为止。) | CloudTrail 事件是由 **分析师主动发起的**,而非自动分诊 —— 按需调用即可: ``` ./scripts/SOC/triage_finding.sh --finding # or --event ``` ## 部署 该 agent 通过 SAM 以最小权限的 Lambda + EventBridge 规则形式提供。一条命令部署, 一条命令销毁。 ``` export AWS_REGION=us-east-1 ./deploy/deploy_lambda.sh # build + deploy the SAM stack (auto-trigger on) ./deploy/teardown_lambda.sh # remove the stack and its CloudWatch log group ``` **前提条件:** 需要安装 `aws` + `sam` CLIs,并在 Bedrock 控制台中为你的区域启用一次性的 **Claude 模型访问权限**。部署过程会打印出 `VerdictQueueUrl` —— 将你的 SIEM 指向它。有关设置、IAM 和调优请参阅 [deploy/README.md](deploy/README.md)。
IAM —— 仅使用此策略运行,别无其他 只读是核心目的。执行角色可以收集上下文、调用模型并发送裁决 —— 除此以外什么都不能做。 ``` { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "cloudtrail:LookupEvents", "Resource": "*" }, { "Effect": "Allow", "Action": "bedrock:InvokeModel", "Resource": "*" }, { "Effect": "Allow", "Action": "sqs:SendMessage", "Resource": "" } ] } ``` (`bedrock:InvokeModel` 设置为 `*`,是因为全局推理配置文件会分散到各个区域的模型 ARN 中;一旦模型和推理配置文件 ARN 确定下来,请将其范围限定为你的模型和推理配置文件 ARN。)
## 裁决 agent 会针对每个 finding 发送一条 SQS 消息:一个 JSON **信封** 加上 **消息属性**,以便 SIEM 可以在不解析消息体的情况下进行路由和关联。 ``` { "verdict": { "summary": "1–2 sentence bottom line", "description": "the fuller account of what happened", "timeline": ["ISO8601 - action", "... (the alert event is marked)"], "severity": "medium", // INFORMATIONAL | LOW | MEDIUM | HIGH | CRITICAL "risk_score": 45, // 0–100 "mitre_tactics_observed": ["Defense Evasion - T1562.001", "..."], "iocs": ["..."], "recommended_next_steps": ["..."], "analyst_action_required": true }, "source": { "finding_id": "...", "finding_type": "...", "account": "...", "region": "..." }, "agent": { "model_id": "global.anthropic.claude-sonnet-4-6" }, "produced_at": "2026-06-22T20:26:07Z", "request_id": "..." } ``` 消息属性包括:`finding_id`、`severity`、`risk_score`。完整的真实案例(来自一次实时的 Stratus Red Team 触发)位于 [`examples/sample_verdict_cloudtrail_stop.json`](examples/sample_verdict_cloudtrail_stop.json)。 agent 的只读工具包括:`get_actor_session`(获取参与者的 CloudTrail 会话 + 完整的会话统计信息配置)、`find_events_by_name`、`find_events_for_resource` 以及 `lookup_ip`(地理位置/ASN 扩展信息)。 ## 开发 已部署的流水线即为最终产品;其余部分用于对 agent 进行开发。
在本地针对单个告警运行 agent(无需部署) 本地运行的处理程序与发布版本在字节层面完全一致,因此这是你无需搭建整个技术栈即可测试更改 —— 或手动分诊单个告警 —— 的方法。它会使用你的 AWS 凭证。 ``` python3 -m venv .venv && source .venv/bin/activate # Python 3.10+ pip install -r requirements.txt aws sts get-caller-identity # confirm the RIGHT (sandbox) account # 对示例的 EventBridge finding 运行该 agent(真实的触发结构): python3 -c "import json, lambda_handler as h; \ print(json.dumps(h.handler(json.load(open('examples/sample_eventbridge_finding.json')), None), indent=2))" ``` 步骤轨迹输出到 stderr,裁决输出到 stdout。`investigate.py` 还提供了一个 `--finding ` / `--event ` 的 CLI 用于指向你自己的告警文件;`--dry-run` 会显示 prompt + 种子数据,且不会发起任何 API/AWS 调用。
**测试** —— 纯逻辑单元测试(记录精简、告警/finding 解析、种子 数据组装、裁决 JSON 提取)在 **无需 API key 且无需 AWS** 的环境下运行,并在每次 push 时通过 [GitHub Actions](.github/workflows/ci.yml) 执行: ``` pip install pytest && pytest -q # 37 tests ``` ## 文档 - **了解** — [规范](docs/SPEC.md) - **运维** — [部署指南](deploy/README.md) · [部署演练](docs/deploying_as_a_lambda.md) · [端到端冒烟测试](docs/smoke-test.md) - **了解限制** — [局限性与生产就绪情况](docs/limitations.md) ## 仓库布局 ``` . ├── investigate.py the agent — one alert → read-only CloudTrail tools in a loop → verdict ├── lambda_handler.py Lambda entrypoint: route the event → run the agent → send the verdict to SQS ├── deploy/ SAM template + build / deploy / teardown scripts ├── scripts/SOC/ analyst helper — triage one alert on demand by id ├── examples/ a real EventBridge input + the SQS verdict it produces (see examples/README.md) ├── tests/ pure-logic unit tests (no API key, no AWS); run on every push └── docs/ spec · deploy guide · deploy walkthrough · smoke test · limitations ``` `investigate.py` 即为该 agent(它只需要 `anthropic` + `boto3`);`lambda_handler.py` + `deploy/` 负责将其打包并发布为由 GuardDuty 触发的 Lambda。 ## 局限性 仅支持管理事件(**设计上**对 S3 数据面活动不可见),不支持跨账户 关联,且 IP 扩展信息仅限于地理位置/ASN(无威胁情报信誉)。仅供参考,且**未经过** 安全审计。完整的范围说明、已知的故障模式以及客户级部署所需的要求:**[docs/limitations.md](docs/limitations.md)**。 ## 许可证 MIT。详见 [LICENSE](LICENSE)。
标签:AWS, DPI, GuardDuty, 告警分诊, 安全运营, 扫描框架, 逆向工具