JethroPhuah/CTI-Agent-Project

GitHub: JethroPhuah/CTI-Agent-Project

基于LangGraph多智能体编排和MCP工具协议的威胁情报自动化分析平台,将报告阅读、IOC富化、知识图谱查询和报告生成整合为可交互、可反馈的智能工作流。

Stars: 0 | Forks: 0

# CTI Agent — 多智能体威胁情报平台 ## 为什么做这个项目 CTI 分析师每天花费数小时处理相同的几项任务: - 阅读最新发布的威胁报告,并提取出相关的工具 / TTPs / IOCs - 交叉关联多份报告以回答诸如 *"APT41 使用了哪些工具,过去 90 天内是否观测到相关活动?"* 的问题 - 在分诊之前,使用 VirusTotal / AbuseIPDB / Recorded Future 充实 IOCs - 以统一的格式编写最终报告 本系统将上述工作浓缩为一个由 **四个专用智能体** 驱动的聊天界面,这些智能体通过 LangGraph 连接在一起,并通过 **MCP (Model Context Protocol) 服务器** 配备了各种工具。用户可以看到智能体采取的每一步操作,可以限制它们允许使用的工具,还可以对回答给出 👍 / 👎 评价,从而积累反馈数据集,用于离线 RLHF / DPO 训练。 ## 演示 ``` User: "What tools does APT41 use?" → Orchestrator decides intent=threat_actor_profile, routes to [graph_query, vector_search, keyword_search] → Retrieval (ReACT) calls graph_query("APT41", "uses") → tools list calls vector_search("APT41 tools") → 3 chunks calls keyword_search("APT41") → 2 hits → Writer renders one-shot "threat_actor_profile" template with the gathered evidence → Validator checks answer is grounded in evidence → valid → User receives Markdown profile, can 👍 / 👎 ``` 上述每一个步骤都显示在聊天气泡下方可折叠的时间线中。 ## 架构 ``` flowchart LR User([User]) -->|HTTP/SSE| FE[Next.js Chat UI] FE -->|POST /chat| API[FastAPI + LangGraph] subgraph Agents [LangGraph StateGraph] ORCH[Orchestrator
intent + plan] --> RET[Retrieval
ReACT loop] RET --> WRT[Writer
one-shot templates] WRT --> VAL[Validator] VAL -.invalid + retries WRT end API --> Agents RET -->|MCP/SSE| MCP1[retrieval-mcp] RET -->|MCP/SSE| MCP2[search-mcp] RET -->|MCP/SSE| MCP3[enrichment-mcp] MCP1 --> Milvus[(Milvus
Vector DB)] MCP1 --> ES[(Elasticsearch)] MCP1 --> Neo[(Neo4j
STIX KG)] MCP1 --> PG[(Postgres
IOC DB)] MCP2 --> Tavily[Tavily / mock] MCP3 --> VT[VirusTotal] MCP3 --> Abuse[AbuseIPDB] MCP3 --> RF[Recorded Future] API --> FB[(Postgres
feedback + traces)] FE -.👍/👎.-> FB ``` ### 四个智能体 | 智能体 | 职责 | 提示词 | LLM 控制 | |---|---|---|---| | **Orchestrator** | 分类意图,选择 writer 模板,决定检索智能体可使用的 MCP 工具 | 单样本 JSON | `temperature=0`,严格 schema | | **Retrieval** | 循环调用 MCP 集群中的工具以收集证据 | ReACT (function-calling) | `temperature=0`,最多 5 次工具调用 | | **Writer** | 从五个精选的单样本模板中选择一个来渲染最终答案 | 单样本 Markdown | `temperature=0.3` | | **Validator** | 判断答案是否有确凿证据支撑;若没有,将反馈发回 Writer 并要求重试 | 单样本 JSON | `temperature=0`,≤2 次重试 | ### 知识库 | 存储 | 内容 | 功能 | |---|---|---| | **Milvus** | 分块 CTI 报告及元数据的句子转换器嵌入 | `vector_search` | | **Elasticsearch** | 完整报告文本,BM25 索引 | `keyword_search` (CVE ID、哈希值) | | **Neo4j** | MITRE ATT&CK STIX 包及作为属性图摄取的情报 | `graph_query` ("APT41 使用了什么工具?") | | **Postgres** | IOCs (IP、域名、哈希、URL、CVE),智能体运行轨迹,反馈 | `ioc_lookup`、历史记录、RLHF 数据集 | ## 功能映射至 "Agentic AI" 需求 这是一个快速参考,说明了已实现的功能及其位置——非常适合面试时的代码讲解。 ### 多智能体编排 (LangGraph) `agents/graph.py` 将四个节点连接成一个 `StateGraph`,并带有一条从 `validator` 在失败时返回到 `writer` 的条件边。每个节点返回一个合并到共享状态的增量字典;`graph.astream(stream_mode="updates")` 为流式 UI 提供逐节点事件。 ### 模块化 MCP 工具 `mcp_servers/{retrieval,search,enrichment}_mcp/server.py` — 三个 FastMCP 服务器,**每个工具类别一个**。每个都使用一个 `ToolRegistry`,因此**添加新工具只需一个装饰器**: ``` @registry.register() def my_new_tool(arg1: str, arg2: int = 10) -> dict: """Tool description shown to the agent.""" ... ``` 对于 enrichment 服务器来说更简洁——添加一个新的 `EnrichmentAdapter` 子类,工具函数就会自动生成: ``` class GreyNoiseAdapter(EnrichmentAdapter): name = "greynoise" def available(self): return bool(os.getenv("GREYNOISE_API_KEY")) def enrich(self, value, ioc_type): ... ADAPTERS.append(GreyNoiseAdapter()) # done -- tool auto-registers ``` 智能体在每次运行时重新发现工具,因此无需重启。 ### 提示词技术 | 技术 | 位置 | 原因 | |---|---|---| | **ReACT** (function-calling) | `prompts/retrieval_react.txt` | 驱动检索智能体的工具使用循环 | | **单样本** | 所有 Writer / Orchestrator / Validator 模板 | 为本地 LLM 固定输出结构(JSON 形状或 Markdown 章节) | 五个精选的 Writer 模板——`summary`、`threat_actor_profile`、`ioc_report`、`correlation`、`general`——每个都包含一个完整的输入/输出示例。Orchestrator 为每个查询选择一个模板。 ### 验证循环 Validator 返回 `{valid, issues, feedback}`。当 `valid=false` 时,LangGraph 的条件边路由回 Writer,**将验证器的反馈传入提示词中**,从而使第二次尝试更具针对性,而非随机重试。 ### 人在回路反馈 - 每次完成的运行都会获得一个 `run_id`。 - UI 中的 👍 / 👎 按钮会带着运行 ID 和评分发起 `POST /feedback` 请求。 - `databases/postgres/init.sql` 中的存储模式: ``` agent_runs (id, user_query, selected_tools, final_answer, steps JSONB, ...) feedback (id, run_id, rating IN (-1, 1), comment, user_email, created_at) ``` `steps JSONB` 列存储了完整的智能体轨迹,因此反馈数据集不仅包含最终答案,还包含每一个推理步骤——可直接用于离线 DPO / RLHF 训练。 ### 用户工具选择 UI 有一个侧边栏,显示每个已发现的 MCP 工具,按类别分组,每个工具都带有一个复选框。用户的选择随查询一起发送,并在 `agents/orchestrator.py` 中**与编排器的计划取交集**——因此用户的选择具有最终决定权。这允许分析师在“仅限内部”模式下运行查询(不使用 Tavily,不使用 VT),以处理敏感案例。 ### 步骤级透明度 每个智能体将 `TraceEvent` 记录到共享状态中。API 通过 Server-Sent Events 将它们流式传输;UI 在每个答案下方渲染一个可折叠的时间线: ``` ✓ Orchestrator intent=threat_actor_profile, rationale="..." ✓ Retrieval action: graph_query → 5 records action: vector_search → 3 chunks ✓ Writer template=threat_actor_profile, 1.8s, preview "..." ✓ Validator ✓ Validated ``` ## 技术栈 **后端:** Python 3.11, FastAPI, sse-starlette, Pydantic v2, LangChain ≥0.3.18, LangGraph ≥0.2.50 **MCP:** `mcp` Python SDK (FastMCP, SSE 传输) —— *不使用 `langchain-mcp-adapters`*,替换为 70 行代码的直连桥接以实现跨版本稳定性 (参见 [生产经验](#production-lessons)) **LLM:** vLLM 通过兼容 OpenAI 的端点提供 Qwen3 服务 (任何兼容的本地服务器均可——vLLM、Ollama、TGI) **嵌入:** `sentence-transformers/multi-qa-mpnet-base-dot-v1` **数据库:** Milvus 2.4, Elasticsearch 8, Neo4j 5 (+APOC), Postgres 16 **前端:** Next.js 14 (App Router), React 18, Tailwind CSS, lucide-react **基础设施:** Docker Compose (单命令堆栈),无需外部依赖 ## 仓库结构 ``` cti-agent/ ├── docker-compose.yml # one-shot stack: 4 DBs + 3 MCPs + API + UI ├── .env.example # all configuration ├── agents/ # LangGraph multi-agent system │ ├── orchestrator.py │ ├── retrieval.py # ReACT loop │ ├── writer.py # one-shot templates │ ├── validator.py │ ├── graph.py # StateGraph wiring + conditional retry edge │ ├── prompts/ # 8 prompt templates │ ├── tools/mcp_loader.py # direct MCP→LangChain bridge (70 lines) │ ├── llm.py # vLLM-tolerant ChatOpenAI factory │ ├── state.py │ └── config.py ├── ingestion/ # RAG pipeline │ ├── rss_ingestor.py # The Hacker News, Bleeping, Krebs, ... │ ├── chunker.py # token-aware sentence chunking + overlap │ ├── extractors.py # regex IOC + actor/malware dictionaries │ ├── embedder.py │ ├── writers.py # Milvus + Elasticsearch │ └── pipeline.py # end-to-end runner ├── databases/ │ ├── milvus/client.py │ ├── elasticsearch/client.py │ ├── neo4j/ │ │ ├── stix_loader.py # MITRE ATT&CK → Neo4j │ │ ├── queries.py # Cypher helpers │ │ └── schema.cypher │ └── postgres/ │ ├── init.sql # iocs + agent_runs + feedback schemas │ └── client.py ├── mcp_servers/ # one server per tool category │ ├── retrieval_mcp/ # vector_search, keyword_search, graph_query, ioc_lookup │ ├── search_mcp/ # tavily_search (hybrid live/mock) │ └── enrichment_mcp/ # VT, AbuseIPDB, Recorded Future (hybrid) ├── api/main.py # FastAPI + SSE streaming ├── frontend/ # Next.js chatbot │ ├── app/page.tsx # chat layout + tool sidebar + step timeline │ ├── components/ # AgentTimeline, ToolSelector, FeedbackButtons, ... │ └── lib/api.ts # SSE client (CRLF-safe, robust JSON parsing) └── scripts/ └── generate_synthetic_reports.py ``` ## 快速开始 ### 前置条件 - Docker Desktop (Windows / Mac / Linux) - 一个可访问的 vLLM 风格的 OpenAI 兼容端点,提供任意 Qwen / Mistral / Llama 聊天模型服务。默认配置假定宿主机上运行着 vLLM。 ### 1. 配置 ``` cp .env.example .env # 编辑 LLM_BASE_URL(指向你的 vLLM 端点)和 LLM_MODEL。 # 可选:设置 TAVILY_API_KEY / VIRUSTOTAL_API_KEY / ABUSEIPDB_API_KEY # 以启用实时模式;否则系统将回退到确定性 # mocks,以便演示始终能正常运行。 ``` ### 2. 启动技术栈 ``` docker compose up -d ``` 启动 Milvus, Elasticsearch, Neo4j, Postgres,三个 MCP 服务器,FastAPI 后端,以及运行在 `http://localhost:3000` 的 Next.js UI。 ### 3. 填充知识库 ``` # 将 RSS 拉取的 CTI 报告嵌入到 Milvus + Elasticsearch 中 docker compose exec api python -m ingestion.pipeline # 将 MITRE ATT&CK STIX bundle 加载到 Neo4j 中 docker compose exec api python -m databases.neo4j.stix_loader ``` ### 4. 开始使用 打开 `http://localhost:3000`,输入查询,观察智能体的时间线。 ### 建议的首次查询 ``` What tools does APT41 use? Summarize the latest LockBit campaign Is 185.12.45.78 malicious? Are FIN7 and LockBit connected? Give me an IOC report on CVE-2024-3400 ``` ## 生产经验 很大一部分工作是在真实基础设施上让本地 LLM 智能体栈能够可靠运行。这些是我处理过的较难、较少在搜索引擎上找到答案的问题——将它们保留在这里,因为它们代表了 Agentic AI 工程所涉及的那种调试过程。 | 问题 | 根本原因 | 本仓库中的修复方案 | |---|---|---| | 每次 chat 调用时 vLLM 都返回 400 错误 | `langchain-openai` 在 *非流式* 调用中附加了 `stream_options={"include_usage": True}`;严格的 vLLM 版本会拒绝它 | `agents/llm.py` 中的 httpx 请求钩子会在请求发出去之前从每个 body 中剥离 `stream_options` / `parallel_tool_calls` / 冗余的 `n=1` | | `"在消息中找不到用户查询。"` | vLLM/Qwen3 聊天模板需要一个 `user` 轮次;LangChain 智能体发送的是仅包含 `SystemMessage` 的有效载荷 | 现在每个智能体节点都发送 `[SystemMessage, HumanMessage]` | | `langchain-mcp-adapters` 版本动荡 | 该库的 `langchain-core` 最低版本要求每周都在变;固定其中一方会破坏另一方 | 替换为 70 行的 MCP→LangChain 直连桥接 (`agents/tools/mcp_loader.py`) —— 仅依赖于 `mcp` (已包含) 和 `pydantic` | | FastMCP `issubclass(annotation, Context)` 在工具注册时崩溃 | `from __future__ import annotations` 将工具参数类型变成了字符串;旧版 FastMCP 无法处理非类注解 | 从所有 MCP 服务器模块中移除 `__future__ annotations`,仅使用裸类参数类型,并添加了猴子补丁作为后备方案 | | 检索智能体完全跳过了工具调用 | 提示词将基于文本的 ReACT (`Thought: ... Action: ...`) 与 tool-calling 接口混用,导致 Qwen3 产生混淆 | 重写了 `retrieval_react.txt`,删除了文本格式的指令,并明确要求使用 function-call | 关于每个问题及其修复方案的更详细版本位于 [`docs/PROMPTS.md`](docs/PROMPTS.md) 和代码行内注释中。单单是 `httpx` 的 body 剥离器,就已经在三个不同的 vLLM 版本中发挥了巨大价值。 ## 扩展系统 ### 添加新的检索源 在 `mcp_servers/retrieval_mcp/server.py` 中添加一个函数: ``` @registry.register() def shodan_lookup(ip: str, top_k: int = 10) -> list: """Look up an IP in Shodan.""" return shodan_client.host(ip) ``` 检索 MCP 服务器会自动注册它。智能体在下一次运行时会发现它。 ### 添加新的富化提供商 在 `mcp_servers/enrichment_mcp/adapters.py` 中添加一个适配器类: ``` class GreyNoiseAdapter(EnrichmentAdapter): name = "greynoise" def available(self): return bool(os.getenv("GREYNOISE_API_KEY")) def enrich(self, value, ioc_type): ... ADAPTERS.append(GreyNoiseAdapter()) ``` 工具函数 (`greynoise_lookup`) 会被自动生成。UI 侧边栏会自动识别它。 ### 添加新的 Writer 模板 1. 创建 `agents/prompts/writer_.txt` 并提供一个单样本示例。 2. 在 `agents/writer.py` 的 `_TEMPLATE_MAP` 中注册它。 3. 在 `agents/prompts/orchestrator.txt` 中提及新的 `writer_template` 值,以便路由模型能够识别它。 ## 路线图 | 项目 | 原因 | |---|---| | **缓存层** | 相同的 IOC 查询每次都会命中外部 API。在 `enrichment_mcp` 前面加一个小的 Redis (或 Postgres 支持的) 缓存,能将热指标检测的延迟从约 1 秒降低到个位数毫秒,并节省配额。会话内的 `vector_search` 结果也同理适用。 | | **后台摄取 Worker** | 目前摄取是手动运行的。一个 `cron` 容器 (或生产环境中的 Airflow / k8s CronJob) 将每小时拉取 RSS,每天拉取 STIX/TAXII 订阅源。 | | **LangSmith / OpenTelemetry 追踪** | 钩子已经在 `.env` 中 (`LANGSMITH_TRACING=true`) —— 需要配套的可观测性仪表盘。 | | **身份认证** | 目前是单租户。增加 SSO + 基于用户的 RBAC,以控制哪些工具可供使用。 | | **DPO 训练循环** | `agent_runs` + `feedback` 表已经构成了一组可用于 DPO 的配对数据集。一个每周发出新 Qwen 适配器的计划微调作业将形成闭环。 | | **威胁行为者实体消歧** | "APT41"、"Wicked Panda" 和 "Barium" 都应解析为同一个节点;目前 `extractors.py` 中的字典是扁平的。将此折叠到 Neo4j 的 `aliases` 字段中。 | ## 诚实的范围说明 为了给审查者设定正确的期望: - **CTI 报告来源于公开的 RSS 订阅源和合成数据。** 真实的生产系统从付费订阅源、内部传感器以及我无法分享的客户特定来源进行摄取。 - **IOC 数据库仅填充了约 10 个演示指标。** 生产环境拥有从威胁情报订阅源源源不断回填的数万个指标。 - **STIX KG 仅包含 MITRE ATT&CK。** 生产还包括由我们的分析师维护的自定义入侵集 / 攻击活动节点。 - **外部富化 API 默认在模拟模式下运行。** 设置相关的 `*_API_KEY` 环境变量即可访问真实的提供商。 - **前端被有意保持极简。** 它足以清晰地展示每一个智能体能力;它并不是一个打磨过的 SOC 控制台。 架构、智能体设计、MCP 服务器模式、提示词技术、验证循环和 HITL 反馈存储与生产系统**完全相同**——只是剥离了客户特定的部分。 ## 致谢 - MITRE ATT&CK STIX 包 (CC BY 4.0) - `iocextract` 提供的 IOC 正则表达式模式 - MCP 工作组制定的 Model Context Protocol 规范 - 灵感来源于——并旨在取代——内部非智能体化的 CTI 机器人
标签:AI安全, APT组织画像, AV绕过, Chat Copilot, DLL 劫持, DPO, FastAPI, Go语言工具, IOC提取, IP 地址批量处理, LangGraph, MCP, OpenCanary, PyRIT, RAG, ReACT, RLHF, STIX, TTP提取, 人机协同, 多智能体系统, 大语言模型, 威胁情报分析, 版权保护, 知识检索, 网络信息收集, 网络威胁情报, 网络安全, 自动化分析, 跨站脚本, 逆向工具, 隐私保护