purpleshellsecurity/guardduty-triage-agent

GitHub: purpleshellsecurity/guardduty-triage-agent

一个事件驱动、只读的 AI Agent,利用 Amazon Bedrock 自动分诊 AWS GuardDuty 安全告警,并将结构化判定结果推送给 SIEM。

Stars: 0 | Forks: 0

# GuardDuty 分诊 Agent [![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/b80b30cb1e110555.svg)](https://github.com/purpleshellsecurity/guardduty-triage-agent/actions/workflows/ci.yml) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) 一个只读的 AI agent,可对 IAM、EKS、EC2 等多方面的 AWS GuardDuty 发现进行分诊。它会读取 CloudTrail 和 EKS 审计日志以获取上下文,基于 Amazon Bedrock 进行推理,并将结构化的判定结果放入队列中供您的 SIEM 使用。它只提供建议;由人类执行操作——它绝不会更改您账户中的任何内容。 ## 架构 GuardDuty 发出发现 → EventBridge 将其推送给 agent → agent 进行调查并将判定结果写入 SQS → 您的 SIEM 轮询该队列。完全事件驱动:没有 cron 定时任务,没有轮询器,没有容器,空闲时没有任何资源在运行。 ![GuardDuty 分诊 Agent — 事件驱动的 GuardDuty 警报分诊](https://raw.githubusercontent.com/purpleshellsecurity/guardduty-triage-agent/main/img/architecture.svg) | 跳跃节点 | 发生的操作 | |-----|--------------| | **GuardDuty → EventBridge** | 每个新的发现都会被自动推送(规则范围限定为 `source: aws.guardduty`)。没有水位线,没有抓取循环。 | | **EventBridge → Lambda** | 该规则会针对每个发现调用函数。按发现计算,空闲时成本为 $0。 | | **Lambda(即 agent)** | 根据数据源路由发现,然后在有限的循环中使用**只读**工具收集上下文 —— 针对 IAM 发现使用 CloudTrail,针对 Kubernetes 发现使用 EKS 审计日志 —— 并在 Bedrock 上进行推理。模型通过执行角色运行 —— **无需 API key**。 | | **Lambda → SQS** | 判定信封被发送到持久的 SQS 队列(`sqs:SendMessage`,这是 agent *唯一*的出站操作)。如果 SIEM 宕机,队列会保留判定结果。 | | **SQS → SIEM** | 您的 SIEM 轮询队列,并通过 `finding_id` 将每个判定结果与其对应的发现关联起来。(该连接器不在范围内 —— 构建到队列即止。) | ## 覆盖范围 agent 根据数据源(`service.action.actionType`)路由每个 GuardDuty 发现,并根据其证据允许的深度对每个发现进行调查 —— 完整映射见 [docs/finding-coverage.md](docs/finding-coverage.md): | 领域 (`actionType`) | 调查方式 | 深度 | |---|---|---| | CloudTrail (`AWS_API_CALL`) | 参与者的 CloudTrail 会话 (`get_actor_session`) | 深 | | EKS (`KUBERNETES_API_CALL`) | EKS 控制平面审计日志 (`get_k8s_audit_session`) | 深 | | VPCFLOW (`NETWORK_CONNECTION` · `PORT_PROBE`) | VPC flow logs (`get_vpc_flow_session`) — ACCEPT/REJECT、端口、字节 | 深 | | DNS (`DNS_REQUEST`) | 嵌入的详细信息 + IP 扩展 + VPC-flow 信标检查 (`get_vpc_flow_session`) | 已推理 | | RDS / Runtime (`RDS_LOGIN_ATTEMPT` · `RUNTIME`) | 嵌入的详细信息 + CloudTrail 关联 | 已推理 | **深** = agent 查询第二证据源以进行佐证。其余部分则根据发现本身的详细信息进行推理(无查询源),并在 `missing_context` 中标记无法验证的内容。 ## 部署 该 agent 作为最小权限的 Lambda + EventBridge 规则通过 SAM 发布。一条命令部署, 一条命令销毁。 ``` export AWS_REGION=us-east-1 ./deploy/deploy_lambda.sh # build + deploy the SAM stack (auto-trigger on) ./deploy/teardown_lambda.sh # remove the stack and its CloudWatch log group ``` **前提条件:** `aws` + `sam` CLI,以及在 Bedrock 控制台中为您的区域启用一次性的 **Claude 模型访问权限**。部署过程会打印 `VerdictQueueUrl` —— 将您的 SIEM 指向它。设置、IAM 和调优位于 [deploy/README.md](deploy/README.md) 中。
IAM —— 仅使用此策略运行,绝无其他 只读是核心目的。执行角色可以收集上下文、调用模型并发送判定结果 —— 除此之外什么也做不了。 ``` { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "cloudtrail:LookupEvents", "Resource": "*" }, { "Effect": "Allow", "Action": "logs:FilterLogEvents", "Resource": "arn:aws:logs:*:*:log-group:*" }, { "Effect": "Allow", "Action": "ec2:DescribeFlowLogs", "Resource": "*" }, { "Effect": "Allow", "Action": "bedrock:InvokeModel", "Resource": "*" }, { "Effect": "Allow", "Action": "sqs:SendMessage", "Resource": "" } ] } ``` (`bedrock:InvokeModel` 为 `*`,因为全局推理配置文件会扇出到各个区域的模型 ARN;一旦它们固定下来,请将其范围限定为您的模型 + 推理配置文件 ARN。)
## 判定结果 agent 为每个发现发送一条 SQS 消息:一个 JSON **信封**加上**消息属性**, 以便 SIEM 无需解析正文即可进行路由和关联。 ``` { "verdict": { "summary": "1–2 sentence bottom line", "description": "the fuller account of what happened", "timeline": ["ISO8601 - action", "... (the alert event is marked)"], "severity": "medium", // low | medium | high "risk_score": 45, // 0–100 "mitre_tactics_observed": ["Defense Evasion - T1562.001", "..."], "iocs": ["..."], "recommended_next_steps": ["..."], "analyst_action_required": true }, "source": { "finding_id": "...", "finding_type": "...", "account": "...", "region": "..." }, "agent": { "model_id": "global.anthropic.claude-sonnet-4-6" }, "produced_at": "2026-06-22T20:26:07Z", "request_id": "..." } ``` 消息属性:`finding_id`、`severity`、`risk_score`。一个完整的真实示例(来自实时 Stratus Red Team 引爆)位于 [`examples/sample_verdict_cloudtrail_stop.json`](examples/sample_verdict_cloudtrail_stop.json) 中。 agent 的只读工具:`get_actor_session`(参与者的 CloudTrail 会话 + 完整的会话统计信息配置文件)、`find_events_by_name`、`find_events_for_resource`、 `get_k8s_audit_session`(EKS 控制平面审计日志)、`get_vpc_flow_session`(VPC flow logs),以及 `lookup_ip`(地理/ASN 扩展信息)。 ## 开发 已部署的流水线即产品;其余部分用于对 agent 进行开发。 **测试** —— 纯逻辑单元测试(记录精简、警报/发现解析、seed 组装、判定 JSON 提取)在**无需 API key 且无需 AWS** 的情况下运行,并在每次 推送时通过 [GitHub Actions](.github/workflows/ci.yml) 触发: ``` pip install pytest && pytest -q # 51 tests ``` 要对真实的 GuardDuty 发现执行整个流水线,请部署它并运行 **[端到端冒烟测试](docs/smoke-test.md)**。(`investigate.py --finding ` 会在您提供的发现文件上运行 agent —— 提供的 `examples/` 是经过脱敏的形状 参考,而非可运行的演示。) ## 文档 - **了解** — [规范](docs/SPEC.md) · [发现覆盖范围](docs/finding-coverage.md) - **操作** — [部署指南](deploy/README.md) · [端到端冒烟测试](docs/smoke-test.md) - **了解限制** — [局限性与生产就绪情况](docs/limitations.md) ## 仓库布局 ``` . ├── investigate.py the agent — one GuardDuty finding → read-only CloudTrail tools in a loop → verdict ├── lambda_handler.py Lambda entrypoint: seed from the finding → run the agent → send the verdict to SQS ├── deploy/ SAM template + build / deploy / teardown scripts ├── examples/ a real EventBridge input + the SQS verdict it produces (see examples/README.md) ├── tests/ pure-logic unit tests (no API key, no AWS); run on every push └── docs/ spec · finding coverage · smoke test · limitations ``` `investigate.py` 是 agent(它仅需要 `anthropic` + `boto3`);`lambda_handler.py` + `deploy/` 负责打包并将其作为由 GuardDuty 触发的 Lambda 发布。 ## 局限性 仅限管理事件(**按设计**对 S3 数据平面活动不可见),无跨账户 关联,且仅支持地理/ASN IP 扩展(无威胁情报信誉)。仅提供建议且 **未**经过安全审计。完整范围、已知故障模式以及客户级部署的需求: **[docs/limitations.md](docs/limitations.md)**。 ## 许可证 MIT。详见 [LICENSE](LICENSE)。
标签:AMSI绕过, AWS, DPI, 事件驱动, 云计算, 威胁检测, 安全运营, 扫描框架, 自动化分诊, 规则引擎, 逆向工具