purpleshellsecurity/aws-triage-agent
GitHub: purpleshellsecurity/aws-triage-agent
一个事件驱动的只读 AI 代理,用于自动分诊 AWS GuardDuty 告警,通过分析 CloudTrail 日志并借助 Bedrock 大模型生成结构化安全裁决。
Stars: 0 | Forks: 0
# AWS Triage Agent
[](https://github.com/purpleshellsecurity/aws-triage-agent/actions/workflows/ci.yml)
[](LICENSE)
## 架构
GuardDuty 触发 finding → EventBridge 将其推送到 agent → agent 进行调查并将裁决写入 SQS → 你的 SIEM 轮询该队列。完全事件驱动:没有 cron,没有轮询器,没有容器,空闲时没有任何运行。
```
┌─────────────────────────────────────┐
│ Lambda — the agent │
GuardDuty EventBridge │ 1. normalize the alert │ SQS queue your
finding ───▶ rule ───▶ │ 2. read CloudTrail (read-only) │ ───▶ aws-triage- ───▶ SIEM
(new alert) aws.guardduty │ 3. reason on Amazon Bedrock │ agent-verdicts (polls)
auto-trigger │ 4. emit a structured JSON verdict │ (durable, 14d)
└─────────────────────────────────────┘
```
| 跳转 | 发生了什么 |
|-----|--------------|
| **GuardDuty → EventBridge** | 每一个新的 finding 都会被自动推送(规则范围限定为 `source: aws.guardduty`)。没有水位线,没有抓取循环。 |
| **EventBridge → Lambda** | 该规则会针对每个 finding 调用一次函数。单次 finding 计算,空闲时成本为 $0。 |
| **Lambda(agent)** | 规范化告警,在受限循环中使用 **只读** 工具从 CloudTrail 收集上下文,并在 Bedrock 上进行推理。模型通过执行角色运行 —— **无需 API key**。 |
| **Lambda → SQS** | 裁决信封被发送到持久的 SQS 队列(`sqs:SendMessage`,这是 agent 的*唯一*出站操作)。如果 SIEM 宕机,队列会保留裁决。 |
| **SQS → SIEM** | 你的 SIEM 轮询该队列,并通过 `finding_id` 将每个裁决与其对应的 finding 关联起来。(该连接器不在范围内 —— 构建过程到队列为止。) |
CloudTrail 事件是由 **分析师主动发起的**,而非自动分诊 —— 按需调用即可:
```
./scripts/SOC/triage_finding.sh --finding # or --event
```
## 部署
该 agent 通过 SAM 以最小权限的 Lambda + EventBridge 规则形式提供。一条命令部署,
一条命令销毁。
```
export AWS_REGION=us-east-1
./deploy/deploy_lambda.sh # build + deploy the SAM stack (auto-trigger on)
./deploy/teardown_lambda.sh # remove the stack and its CloudWatch log group
```
**前提条件:** 需要安装 `aws` + `sam` CLIs,并在 Bedrock 控制台中为你的区域启用一次性的 **Claude 模型访问权限**。部署过程会打印出 `VerdictQueueUrl` —— 将你的 SIEM 指向它。有关设置、IAM 和调优请参阅 [deploy/README.md](deploy/README.md)。
" }
]
}
```
(`bedrock:InvokeModel` 设置为 `*`,是因为全局推理配置文件会分散到各个区域的模型 ARN 中;一旦模型和推理配置文件 ARN 确定下来,请将其范围限定为你的模型和推理配置文件 ARN。)
## 裁决
agent 会针对每个 finding 发送一条 SQS 消息:一个 JSON **信封** 加上 **消息属性**,以便 SIEM 可以在不解析消息体的情况下进行路由和关联。
```
{
"verdict": {
"summary": "1–2 sentence bottom line",
"description": "the fuller account of what happened",
"timeline": ["ISO8601 - action", "... (the alert event is marked)"],
"severity": "medium", // INFORMATIONAL | LOW | MEDIUM | HIGH | CRITICAL
"risk_score": 45, // 0–100
"mitre_tactics_observed": ["Defense Evasion - T1562.001", "..."],
"iocs": ["..."], "recommended_next_steps": ["..."], "analyst_action_required": true
},
"source": { "finding_id": "...", "finding_type": "...", "account": "...", "region": "..." },
"agent": { "model_id": "global.anthropic.claude-sonnet-4-6" },
"produced_at": "2026-06-22T20:26:07Z",
"request_id": "..."
}
```
消息属性包括:`finding_id`、`severity`、`risk_score`。完整的真实案例(来自一次实时的 Stratus Red Team 触发)位于
[`examples/sample_verdict_cloudtrail_stop.json`](examples/sample_verdict_cloudtrail_stop.json)。
agent 的只读工具包括:`get_actor_session`(获取参与者的 CloudTrail 会话 + 完整的会话统计信息配置)、`find_events_by_name`、`find_events_for_resource` 以及
`lookup_ip`(地理位置/ASN 扩展信息)。
## 开发
已部署的流水线即为最终产品;其余部分用于对 agent 进行开发。
`
/ `--event ` 的 CLI 用于指向你自己的告警文件;`--dry-run` 会显示 prompt + 种子数据,且不会发起任何 API/AWS 调用。
**测试** —— 纯逻辑单元测试(记录精简、告警/finding 解析、种子
数据组装、裁决 JSON 提取)在 **无需 API key 且无需 AWS** 的环境下运行,并在每次
push 时通过 [GitHub Actions](.github/workflows/ci.yml) 执行:
```
pip install pytest && pytest -q # 37 tests
```
## 文档
- **了解** — [规范](docs/SPEC.md)
- **运维** — [部署指南](deploy/README.md) · [部署演练](docs/deploying_as_a_lambda.md) · [端到端冒烟测试](docs/smoke-test.md)
- **了解限制** — [局限性与生产就绪情况](docs/limitations.md)
## 仓库布局
```
.
├── investigate.py the agent — one alert → read-only CloudTrail tools in a loop → verdict
├── lambda_handler.py Lambda entrypoint: route the event → run the agent → send the verdict to SQS
├── deploy/ SAM template + build / deploy / teardown scripts
├── scripts/SOC/ analyst helper — triage one alert on demand by id
├── examples/ a real EventBridge input + the SQS verdict it produces (see examples/README.md)
├── tests/ pure-logic unit tests (no API key, no AWS); run on every push
└── docs/ spec · deploy guide · deploy walkthrough · smoke test · limitations
```
`investigate.py` 即为该 agent(它只需要 `anthropic` + `boto3`);`lambda_handler.py`
+ `deploy/` 负责将其打包并发布为由 GuardDuty 触发的 Lambda。
## 局限性
仅支持管理事件(**设计上**对 S3 数据面活动不可见),不支持跨账户
关联,且 IP 扩展信息仅限于地理位置/ASN(无威胁情报信誉)。仅供参考,且**未经过**
安全审计。完整的范围说明、已知的故障模式以及客户级部署所需的要求:**[docs/limitations.md](docs/limitations.md)**。
## 许可证
MIT。详见 [LICENSE](LICENSE)。
IAM —— 仅使用此策略运行,别无其他
只读是核心目的。执行角色可以收集上下文、调用模型并发送裁决 —— 除此以外什么都不能做。 ``` { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "cloudtrail:LookupEvents", "Resource": "*" }, { "Effect": "Allow", "Action": "bedrock:InvokeModel", "Resource": "*" }, { "Effect": "Allow", "Action": "sqs:SendMessage", "Resource": "在本地针对单个告警运行 agent(无需部署)
本地运行的处理程序与发布版本在字节层面完全一致,因此这是你无需搭建整个技术栈即可测试更改 —— 或手动分诊单个告警 —— 的方法。它会使用你的 AWS 凭证。 ``` python3 -m venv .venv && source .venv/bin/activate # Python 3.10+ pip install -r requirements.txt aws sts get-caller-identity # confirm the RIGHT (sandbox) account # 对示例的 EventBridge finding 运行该 agent(真实的触发结构): python3 -c "import json, lambda_handler as h; \ print(json.dumps(h.handler(json.load(open('examples/sample_eventbridge_finding.json')), None), indent=2))" ``` 步骤轨迹输出到 stderr,裁决输出到 stdout。`investigate.py` 还提供了一个 `--finding标签:AWS, DPI, GuardDuty, 告警分诊, 安全运营, 扫描框架, 逆向工具