purpleshellsecurity/guardduty-triage-agent
GitHub: purpleshellsecurity/guardduty-triage-agent
一个事件驱动、只读的 AI Agent,利用 Amazon Bedrock 自动分诊 AWS GuardDuty 安全告警,并将结构化判定结果推送给 SIEM。
Stars: 0 | Forks: 0
# GuardDuty 分诊 Agent
[](https://github.com/purpleshellsecurity/guardduty-triage-agent/actions/workflows/ci.yml)
[](LICENSE)
一个只读的 AI agent,可对 IAM、EKS、EC2 等多方面的 AWS GuardDuty 发现进行分诊。它会读取 CloudTrail 和 EKS 审计日志以获取上下文,基于 Amazon Bedrock 进行推理,并将结构化的判定结果放入队列中供您的 SIEM 使用。它只提供建议;由人类执行操作——它绝不会更改您账户中的任何内容。
## 架构
GuardDuty 发出发现 → EventBridge 将其推送给 agent → agent 进行调查并将判定结果写入 SQS → 您的 SIEM 轮询该队列。完全事件驱动:没有 cron 定时任务,没有轮询器,没有容器,空闲时没有任何资源在运行。

| 跳跃节点 | 发生的操作 |
|-----|--------------|
| **GuardDuty → EventBridge** | 每个新的发现都会被自动推送(规则范围限定为 `source: aws.guardduty`)。没有水位线,没有抓取循环。 |
| **EventBridge → Lambda** | 该规则会针对每个发现调用函数。按发现计算,空闲时成本为 $0。 |
| **Lambda(即 agent)** | 根据数据源路由发现,然后在有限的循环中使用**只读**工具收集上下文 —— 针对 IAM 发现使用 CloudTrail,针对 Kubernetes 发现使用 EKS 审计日志 —— 并在 Bedrock 上进行推理。模型通过执行角色运行 —— **无需 API key**。 |
| **Lambda → SQS** | 判定信封被发送到持久的 SQS 队列(`sqs:SendMessage`,这是 agent *唯一*的出站操作)。如果 SIEM 宕机,队列会保留判定结果。 |
| **SQS → SIEM** | 您的 SIEM 轮询队列,并通过 `finding_id` 将每个判定结果与其对应的发现关联起来。(该连接器不在范围内 —— 构建到队列即止。) |
## 覆盖范围
agent 根据数据源(`service.action.actionType`)路由每个 GuardDuty 发现,并根据其证据允许的深度对每个发现进行调查 —— 完整映射见
[docs/finding-coverage.md](docs/finding-coverage.md):
| 领域 (`actionType`) | 调查方式 | 深度 |
|---|---|---|
| CloudTrail (`AWS_API_CALL`) | 参与者的 CloudTrail 会话 (`get_actor_session`) | 深 |
| EKS (`KUBERNETES_API_CALL`) | EKS 控制平面审计日志 (`get_k8s_audit_session`) | 深 |
| VPCFLOW (`NETWORK_CONNECTION` · `PORT_PROBE`) | VPC flow logs (`get_vpc_flow_session`) — ACCEPT/REJECT、端口、字节 | 深 |
| DNS (`DNS_REQUEST`) | 嵌入的详细信息 + IP 扩展 + VPC-flow 信标检查 (`get_vpc_flow_session`) | 已推理 |
| RDS / Runtime (`RDS_LOGIN_ATTEMPT` · `RUNTIME`) | 嵌入的详细信息 + CloudTrail 关联 | 已推理 |
**深** = agent 查询第二证据源以进行佐证。其余部分则根据发现本身的详细信息进行推理(无查询源),并在 `missing_context` 中标记无法验证的内容。
## 部署
该 agent 作为最小权限的 Lambda + EventBridge 规则通过 SAM 发布。一条命令部署,
一条命令销毁。
```
export AWS_REGION=us-east-1
./deploy/deploy_lambda.sh # build + deploy the SAM stack (auto-trigger on)
./deploy/teardown_lambda.sh # remove the stack and its CloudWatch log group
```
**前提条件:** `aws` + `sam` CLI,以及在 Bedrock 控制台中为您的区域启用一次性的 **Claude 模型访问权限**。部署过程会打印 `VerdictQueueUrl` —— 将您的 SIEM 指向它。设置、IAM 和调优位于 [deploy/README.md](deploy/README.md) 中。
" }
]
}
```
(`bedrock:InvokeModel` 为 `*`,因为全局推理配置文件会扇出到各个区域的模型 ARN;一旦它们固定下来,请将其范围限定为您的模型 + 推理配置文件 ARN。)
## 判定结果
agent 为每个发现发送一条 SQS 消息:一个 JSON **信封**加上**消息属性**,
以便 SIEM 无需解析正文即可进行路由和关联。
```
{
"verdict": {
"summary": "1–2 sentence bottom line",
"description": "the fuller account of what happened",
"timeline": ["ISO8601 - action", "... (the alert event is marked)"],
"severity": "medium", // low | medium | high
"risk_score": 45, // 0–100
"mitre_tactics_observed": ["Defense Evasion - T1562.001", "..."],
"iocs": ["..."], "recommended_next_steps": ["..."], "analyst_action_required": true
},
"source": { "finding_id": "...", "finding_type": "...", "account": "...", "region": "..." },
"agent": { "model_id": "global.anthropic.claude-sonnet-4-6" },
"produced_at": "2026-06-22T20:26:07Z",
"request_id": "..."
}
```
消息属性:`finding_id`、`severity`、`risk_score`。一个完整的真实示例(来自实时
Stratus Red Team 引爆)位于
[`examples/sample_verdict_cloudtrail_stop.json`](examples/sample_verdict_cloudtrail_stop.json) 中。
agent 的只读工具:`get_actor_session`(参与者的 CloudTrail 会话 +
完整的会话统计信息配置文件)、`find_events_by_name`、`find_events_for_resource`、
`get_k8s_audit_session`(EKS 控制平面审计日志)、`get_vpc_flow_session`(VPC
flow logs),以及 `lookup_ip`(地理/ASN 扩展信息)。
## 开发
已部署的流水线即产品;其余部分用于对 agent 进行开发。
**测试** —— 纯逻辑单元测试(记录精简、警报/发现解析、seed
组装、判定 JSON 提取)在**无需 API key 且无需 AWS** 的情况下运行,并在每次
推送时通过 [GitHub Actions](.github/workflows/ci.yml) 触发:
```
pip install pytest && pytest -q # 51 tests
```
要对真实的 GuardDuty 发现执行整个流水线,请部署它并运行
**[端到端冒烟测试](docs/smoke-test.md)**。(`investigate.py --finding `
会在您提供的发现文件上运行 agent —— 提供的 `examples/` 是经过脱敏的形状
参考,而非可运行的演示。)
## 文档
- **了解** — [规范](docs/SPEC.md) · [发现覆盖范围](docs/finding-coverage.md)
- **操作** — [部署指南](deploy/README.md) · [端到端冒烟测试](docs/smoke-test.md)
- **了解限制** — [局限性与生产就绪情况](docs/limitations.md)
## 仓库布局
```
.
├── investigate.py the agent — one GuardDuty finding → read-only CloudTrail tools in a loop → verdict
├── lambda_handler.py Lambda entrypoint: seed from the finding → run the agent → send the verdict to SQS
├── deploy/ SAM template + build / deploy / teardown scripts
├── examples/ a real EventBridge input + the SQS verdict it produces (see examples/README.md)
├── tests/ pure-logic unit tests (no API key, no AWS); run on every push
└── docs/ spec · finding coverage · smoke test · limitations
```
`investigate.py` 是 agent(它仅需要 `anthropic` + `boto3`);`lambda_handler.py`
+ `deploy/` 负责打包并将其作为由 GuardDuty 触发的 Lambda 发布。
## 局限性
仅限管理事件(**按设计**对 S3 数据平面活动不可见),无跨账户
关联,且仅支持地理/ASN IP 扩展(无威胁情报信誉)。仅提供建议且
**未**经过安全审计。完整范围、已知故障模式以及客户级部署的需求:
**[docs/limitations.md](docs/limitations.md)**。
## 许可证
MIT。详见 [LICENSE](LICENSE)。
IAM —— 仅使用此策略运行,绝无其他
只读是核心目的。执行角色可以收集上下文、调用模型并发送判定结果 —— 除此之外什么也做不了。 ``` { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "cloudtrail:LookupEvents", "Resource": "*" }, { "Effect": "Allow", "Action": "logs:FilterLogEvents", "Resource": "arn:aws:logs:*:*:log-group:*" }, { "Effect": "Allow", "Action": "ec2:DescribeFlowLogs", "Resource": "*" }, { "Effect": "Allow", "Action": "bedrock:InvokeModel", "Resource": "*" }, { "Effect": "Allow", "Action": "sqs:SendMessage", "Resource": "标签:AMSI绕过, AWS, DPI, 事件驱动, 云计算, 威胁检测, 安全运营, 扫描框架, 自动化分诊, 规则引擎, 逆向工具