meghnadh7/aegis-graph

GitHub: meghnadh7/aegis-graph

一个基于LangGraph的多租户自主SOC告警分诊副驾驶,实现从IOC富化、ATT&CK映射、调查时间线到DFIR-IRIS自动建案的端到端自动化流程。

Stars: 0 | Forks: 0

# aegis-graph 这是我在一家 MSSP 面试应用 AI 工程师职位时做的一个小项目。其初衷是端到端地模拟一个 Tier-1 SOC 分诊副驾驶的工作流程:接收告警、IOC 扩充、MITRE ATT&CK 映射、调查时间线、可强制进行修正的自我批评机制,以及在 DFIR-IRIS 中生成最终案例。 它默认完全在离线环境下运行(每个外部 API 都有确定性的模拟),因此你可以直接克隆并运行 `make demo` 而无需提供任何密钥。 ## 它的实际功能 一个 Wazuh 格式的告警命中 FastAPI webhook 后,会被推送到 Redis 流,并进入一个包含五个节点的 LangGraph 状态机: 1. **triage** — 从告警中提取 IOC,并行向 5 个威胁情报源(VirusTotal、Shodan、AbuseIPDB、URLhaus、GreyNoise)发起查询,每个都封装为 MCP 风格的服务器。所有内容在与 LLM 交互前,都会经过正则表达式/启发式注入防护的检测。 2. **attack_mapper** — 使用 HyDE:LLM 会编写一个假设的检测叙述,然后将其(而不是简短的原始告警)进行向量化,并在每个租户专属的 Pinecone 命名空间中与 ATT&CK + Sigma 进行匹配。LLM 只从检索到的候选结果中挑选最佳匹配,因此它无法凭空捏造技术 ID。 3. **investigator** — 根据主机/IP/用户在近期事件历史中进行关联查询,从租户资产清单中查找资产关键性,并构建时间线。 4. **reflector** — 作为 LLM 评委,从四个维度(证据质量、推理连贯性、幻觉风险、调查深度)对当前案例进行评分。如果综合得分低于 `CONFIDENCE_THRESHOLD` 且我们尚未进行过两次修订,它将循环回 triage 节点。 5. **reporter** — 将裁定、摘要和建议措施写入 DFIR-IRIS(包括案例、IOC、资产、时间线、ATT&CK 属性和备注)。 项目中内置了两个合成租户(`tenant_a` = 一家金融科技公司,`tenant_b` = 一家医疗服务提供商)。每个租户都有自己独立的 Pinecone 命名空间、运行手册语料库和系统提示词上下文,因此相同的告警模式会根据不同的关键资产清单进行权衡。 ## 运行方式 ``` git clone https://github.com/meghnadh7/aegis-graph.git cd aegis-graph python3.11 -m venv .venv && source .venv/bin/activate pip install -r requirements.txt cp .env.example .env # MOCK_MODE=true is the default make demo # one alert through the full pipeline (rich panels) make test # pytest make eval # 200-alert golden run + metrics make eval-summary # pretty-print dataset stats + last eval run ``` 要使用真实 API 运行:请将你的密钥填入 `.env` 文件,并设置 `MOCK_MODE=false`。每个外部集成(LLM、Pinecone、5 个 TI 数据源、DFIR-IRIS)都受同一标志控制——任何缺少密钥的组件都会自动回退到模拟模式。 ## 结果 测试: ``` $ pytest tests/ -q ................ [100%] 16 passed in 1.56s ``` 针对内置的 200 条告警黄金数据集进行的评估(每种技术 40 条 × 5 种技术,在两个租户中平均分配,TP/FP/升级 比例约为 40/35/25): ``` $ python -m evals.run_evals --concurrency 16 === AegisGraph Eval Summary === verdict_accuracy: {'mean': 0.41, 'median': 0.0, 'n': 200} attack_mapping_accuracy: {'mean': 0.58, 'median': 0.5, 'n': 200} hallucination_resistance: {'mean': 1.0, 'median': 1.0, 'n': 200} ioc_f1: {'mean': 0.86, 'median': 1.0, 'n': 200} cost_per_alert_usd: {'mean': 0.015, 'median': 0.01, 'n': 200} wall_time_seconds: 4.44 alerts: 200 ``` 几点诚实的说明: - 这些数据是在 **mock LLM** 下得出的,而不是真实模型。该模拟是刻意通过关键词驱动的——如果遇到 powershell-encoded 提示词则判定为 TP,否则大致随机选择一个裁定。因此,在带有小幅识别加成的 3 分类系统中,0.41 的 verdict_accuracy 是符合预期的。这样运行的目的是证明 pipeline 和评估器在所有三个类别(TP / FP / Escalate)中都正确连接。使用真实的 Claude 时,这个数字会有所提升;而真正有意义的工作是确保评估基础设施能够捕捉到性能退化。 - 数据集进行了正确的分层:每种技术 16 个 TP / 14 个 FP / 10 个 Escalate × 5 种技术 × 2 个租户 = 200 条告警。 - `ioc_f1 = 0.86`(中位数 1.0)是这里真正的有效信号——IOC 提取器和由富化数据驱动的恶意标签确实发挥了作用。在我修复了一个 bug(当时 mock LLM 会将富化数据负载(包括 TI 工具 URL)当作告警进行扫描)之前,这个数字大约是 0.63。 - `attack_mapping_accuracy = 0.58` 是因为 HyDE 检索找到了正确的技术家族,但模拟程序通常会选择一个同级的子技术(例如,真实值是 `T1003` 时选择了 `T1003.001`),从而只获得了部分得分。 - `hallucination_resistance` 是一种启发式方法:它会扣减摘要中如“definitely”或“confirmed exfiltration”等毫无保留的绝对词汇的分数。这并非真正的依据评估——这是一个 TODO。 - $0.015/alert 是根据 mock LLM 返回的 token 数量模拟计算得出的。实际成本取决于模型和 prompt-cache 命中率。 运行 `make eval-summary` 可以查看数据集细分及上次评估运行结果的结构化视图。 `evals/last_results.json` 包含了每条告警的细分结果。 ## 架构 ``` Wazuh / synthetic alert │ ▼ FastAPI /webhook ──► Redis stream ──► LangGraph │ ┌─────────────────┘ ▼ triage (IOC extract + 5 MCP fan-out) │ ▼ attack_mapper (HyDE → Pinecone) │ ▼ investigator (timeline, criticality) │ ▼ reflector (LLM-as-judge) │ conf<0.75? ─┴─► (revise, max 2 loops) ─► back to triage │ ▼ reporter ──► DFIR-IRIS ``` ## 技术栈 - 使用 LangGraph 构建监督图——具有显式状态、条件边,并通过 `interrupt_after` 提供原生 HITL(人在环)钩子。 - Pinecone + HyDE 检索用于 ATT&CK / Sigma / 运行手册查找。每个租户的命名空间是其隔离边界。 - 5 个 MCP 风格的 Python 服务器用于威胁情报,每个服务器都具备重试、TTL 缓存和确定性模拟功能。 - 使用 DFIR-IRIS REST API 作为案例后端。 - Wazuh 格式的合成告警(无需接入活跃的 SIEM)。 - 如果你提供密钥,可以使用 LangSmith 进行追踪。 - 在所有工具输出到达 LLM 上下文之前,使用一个小型的自定义提示注入防护对其进行过滤。 ## 仓库布局 ``` agents/ LangGraph nodes, prompts, schemas, state mcp_servers/ VT / Shodan / AbuseIPDB / URLhaus / GreyNoise knowledge_base/ MITRE loader, Sigma rules, tenant runbooks, HyDE retriever guardrails/ PromptInjectionGuard case_management/ DFIR-IRIS REST client ingestion/ FastAPI webhook + Redis stream + synthetic generator evals/ golden dataset, evaluators, runner tenants/ per-tenant config (FinTech, Healthcare) scripts/ check_deps, setup_kb, generate_alerts, run_demo tests/ pytest docker-compose.yml Redis + Postgres + DFIR-IRIS + the API kubernetes/ manifests ``` ## 多租户 项目中内置了两个合成租户:**tenant_a / FinTech Corp**(AWS,PCI-DSS,支付处理器 + 认证服务器是其核心资产)和 **tenant_b / Healthcare LLC**(本地 VMware + Azure,HIPAA,EHR + 放射科 PACS 是其核心资产)。 隔离边界如下: - **Pinecone** — 每个租户都有自己独立的 `{tenant_id}_attack`、`{tenant_id}_sigma`、`{tenant_id}_runbooks` 命名空间。检索器调用在构建时便已限定范围,因此一个租户不会意外地将另一个租户的运行手册拉入其上下文。 - **系统提示词** — 每个节点都会将租户的环境描述、关键资产清单和抑制规则注入到其系统提示词中。相同的告警模式在针对金融科技公司与医院进行评估时,会产生不同的解读。 - **Redis Streams** — 告警发布到 `alerts:{tenant_id}`,因此背压和吵闹邻居效应会被限制在单个租户范围内。 - **DFIR-IRIS** — `tenant_id` 映射到一条客户记录,从而确保案例落在正确的账户下。 ## 安全与防护机制 - **提示注入防护** 会在 LLM 处理每一个告警字段和 TI 工具响应之前对其进行检查。正则表达式可捕捉明显的“ignore previous instructions”类指令及 chat-template token;字段截断则限制了负载大小。 - **不允许用户提交文档至知识库** — 数据摄取是离线进行的,并通过代码提交,而非通过 API 暴露。RAG 投毒需要进行代码更改,而不仅仅是上传恶意文件。 - **威胁模型** 位于 [`THREAT_MODEL.md`](THREAT_MODEL.md) 中,其中每个场景都映射到了一项 MITRE ATLAS 技术。 ## 路线图 / 接下来我会做什么 - 将合成的 Wazuh 数据源替换为真实的 Stellar Cyber XDR 适配器——这才是 MSSP 实际的摄取路径。 - 针对每个租户,基于历史分析师的决策进行微调,以减少修订循环次数。 - 在 reporter 节点上配置 Torq webhook,用于在人工批准裁定后实现响应自动化(如主机隔离、关闭工单)。 - 将 `interrupt_after` 接入一个小型审查 UI,以便分析师在写入 IRIS 之前进行批准或编辑。 - 设置 LangSmith CI 门禁,如果黄金数据集上的裁定准确率退化超过阈值,则使 PR 失败。
标签:AbuseIPDB, AI安全, Ask搜索, AV绕过, Chat Copilot, Cloudflare, DFIR-IRIS, DLL 劫持, FastAPI, HyDE检索, IOC富集, IP 地址批量处理, LangGraph, LangSmith, LLM评估, MCP服务器, MITRE ATT&CK, MSSP, Ollama, Pinecone, Python, RAG, Redis, SOAR, SOC分诊助手, VirusTotal, Wazuh, 医疗安全, 反思机制, 向量数据库, 大语言模型, 威胁情报, 子域名突变, 安全运营中心, 密码管理, 工程师面试项目, 开发者工具, 快速傅里叶变换, 提示注入防御, 插件系统, 搜索引擎查询, 数字取证与事件响应, 无后门, 无线安全, 案件管理, 源代码安全, 网络信息收集, 网络安全, 网络安全审计, 网络映射, 自主SOC, 自动化响应, 自我批评, 请求拦截, 逆向工具, 金融安全, 隐私保护