ElkinStas/cti-multi-agent
GitHub: ElkinStas/cti-multi-agent
基于LangGraph的多智能体网络威胁情报系统,自动从开放网络发现安全事件并生成结构化威胁报告。
Stars: 0 | Forks: 0
# 网络威胁智能体系统
一个多智能体系统,用于从开放网络中发现网络安全事件,提取结构化情报,通过 NVD API 丰富引用的 CVE 信息,并通过 REST API 提供整合后的威胁报告。

## 架构
```
flowchart LR
A["POST /search"] --> B["Run Store"]
A --> C["LangGraph Orchestrator"]
C --> D["SearchAgent"]
D --> E["ExtractionAgent"]
E --> F["PostProcessingAgent"]
F --> G["CVEEnrichmentAgent"]
G --> H["ReportAgent"]
H --> I["Report Builder"]
I --> B
B --> J["GET /search/{run_id}/status"]
B --> K["GET /search/{run_id}/report"]
```
该流水线被建模为一个 **LangGraph `StateGraph`** —— 一个在智能体之间具有显式状态转换的有向无环图。每个智能体都是一个单独的类,具有一个 `run()` 方法,作为图节点被调用。编排器通过该图传递一个共享的类型化状态字典(`AgentState`);每个节点都读取并写入定义良好的键。

### 智能体职责
| 智能体 | 输入 | 输出 | 角色 |
|---|---|---|---|
| **SearchAgent** | 用户查询 + 时间范围 | `list[SearchResult]` | 通过 DuckDuckGo HTML 搜索和 Google News RSS 发现候选文章。并行(`ThreadPoolExecutor`)补充文章内容,对相关性进行评分,并通过 `TimeWindow` 进行后置过滤。 |
| **ExtractionAgent** | 原始搜索结果 | `list[Incident]` | 将非结构化文本转换为结构化的事件记录。LLM 路径(Claude 结合单次提示)或基于正则表达式的启发式后备方案。通过线程池并行处理。 |
| **PostProcessingAgent** | 原始事件 + 查询 | `list[Incident]`(已清理) | 标题规范化,CVE 噪音过滤,去重,查询词元相关性门控。 |
| **CVEEnrichmentAgent** | 包含 CVE ID 的事件 | `list[Vulnerability]` | 查询 NVD API 以获取每个唯一的 CVE,具有速率限制和响应缓存。 |
| **ReportAgent** | 事件 + 漏洞 | 分析字典 | 综合生成执行摘要、主要发现、建议。LLM 或启发式生成。 |
最后的 **Report Builder** 步骤负责组装 JSON/Markdown 报告,补充事件级别的 CVE 引用,计算质量指标,运行评估工具,并附加流水线遥测数据。
### 异步执行模型
FastAPI 端点是异步的。编排器流水线通过 `asyncio.create_task` 作为后台任务运行。智能体使用同步 HTTP(`httpx.Client`)。这是安全的,因为 LangGraph 的 `ainvoke()` 将同步节点函数调度到**线程池执行器**中,因此阻塞 I/O 不会停滞事件循环。
这是一个经过深思熟虑的权衡:完全异步的智能体在架构上会更清晰,但同步智能体更易于测试、调试,以及与阶段内的 `ThreadPoolExecutor` 扇出组合。线程池调度消除了实际风险。
### 重试与容错
两级重试策略:
1. **HTTP 客户端**(`app/clients.py`):在发生瞬时错误(429/5xx、超时、连接失败)时最多重试 3 次,并带有指数退避和抖动。
2. **编排器节点**(`app/orchestrator.py`):每个智能体节点额外进行 1 次尝试,**仅针对瞬时/传输错误**(`httpx.TimeoutException`、`ConnectError`、`OSError` 等)。确定性失败(解析错误、模式错误、错误数据)会立即降级,而不会浪费重试。降级的节点返回安全的后备结果并记录警告。
### 每节点遥测
每个智能体节点都会记录:
| 字段 | 描述 |
|---|---|
| `started_at` / `finished_at` | 挂钟时间戳 |
| `duration_ms` | 执行时间(以毫秒为单位) |
| `items_in` / `items_out` | 吞吐量 |
| `attempt` | 第几次尝试成功 |
| `degraded` | 节点是否降级 |
| `error` | 如果降级时的错误消息 |
遥测数据会附加到报告 JSON 中,并呈现在 Markdown 摘要和 Streamlit UI 中。
### 缓存
内存中的 TTL 缓存可减少冗余的外部调用:
- **NVD 缓存**(`24h TTL`):避免在不同运行或具有重叠事件的同一运行中重新获取相同的 CVE。
- **LLM 缓存**(`1h TTL`):以 `sha256(model + system_prompt + user_prompt)` 为键,避免为相同的提取或综合提示付费。
两种缓存都使用带有每键过期的线程安全 `SimpleCache`。生产环境中将替换为 Redis;该接口被有意设计得很窄。
### 事件 ↔ CVE 关联
全局漏洞注册表(`Report.vulnerabilities`)被保留用于去重报告。此外,每个事件都带有 `related_vulnerabilities: list[IncidentCVERef]` —— 直接嵌入在事件内部的轻量级丰富引用。这与作业预期的输出格式相匹配,并使得从事件到漏洞的检查变得直接,而无需交叉引用。
## 设计决策
**为什么选择 LangGraph。** 该流水线目前是一个线性 DAG,但 LangGraph 支持条件边、并行扇出和人机交互检查点,只需最少的重构。`StateGraph` 提供了类型化状态、显式的边定义以及带有线程池分发的异步执行。
**为什么是五个智能体而不是三个。** PostProcessingAgent 和 ReportAgent 解决了实际的故障模式(嘈杂的 CVE、重复的事件、标题中的网站名称后缀、需要独立于机械组装的叙述综合)。移除它们会明显降低报告质量。
**独立的 LLM 模型。** 提取使用 `claude-haiku-4-20250414`(便宜、快速,足以胜任结构化提取)。报告综合使用 `claude-sonnet-4-20250514`(更强的推理能力,以提升叙述质量)。可通过环境变量配置。
**时间范围处理。** 解析为 `TimeWindow(start, end)` —— 一个半开区间。`"2023"` 变为 `[2023-01-01, 2024-01-01)`,而不是 `[2023-01-01, ∞)`。`"last 2 years"` 变为 `[now-2y, None)`。既作为查询级别的提示(附加到搜索词)应用,也作为对 `published_at` 的后置过滤器应用,并带有规范化的时区(将简单日期时间视为 UTC)。
**内存运行存储。** 追求简单性的 MVP。在进程重启后不具备持久性。不支持多工作器部署。生产环境将使用 Redis 或 Postgres。异步接口(`InMemoryRunStore`)使得替换只需更改单个文件。
**单一 HTTP 客户端。** 所有出站 HTTP 均通过 `app/clients.py` —— 一个带有重试/退避机制的 `httpx.Client` 包装器。没有混合使用 urllib/httpx。
## 项目结构
```
├── app/
│ ├── agents/
│ │ ├── search.py # Web discovery, parallel hydration, TimeWindow
│ │ ├── extraction.py # LLM / heuristic extraction, parallel
│ │ ├── postprocess.py # Normalization, dedup, relevance
│ │ ├── enrichment.py # NVD enrichment with cache + rate pacing
│ │ └── report.py # LLM / heuristic synthesis
│ ├── cache.py # In-memory TTL cache (NVD + LLM)
│ ├── clients.py # httpx HTTP client with retry/backoff
│ ├── config.py # Settings from env (dotenv support)
│ ├── evaluation.py # Gold-query evaluation harness
│ ├── llm.py # LLM abstraction with lazy import + cache
│ ├── logging_utils.py # Structured JSON logging
│ ├── main.py # FastAPI app and async run lifecycle
│ ├── models.py # Pydantic models (incl. NodeTelemetry, IncidentCVERef)
│ ├── orchestrator.py # LangGraph DAG, telemetry, transient-only retry
│ ├── reporting.py # Report builder, CVE hydration, Markdown
│ └── store.py # In-memory run store
├── ui/
│ └── app.py # Streamlit user interface
├── tests/ # 42 tests, all run without network/API keys
├── examples/
│ ├── eval_dataset.json # Gold-query evaluation profiles
│ ├── sample_report.json # Example JSON output
│ └── sample_report.md # Example Markdown output
├── Dockerfile
├── docker-compose.yml # API + UI services
├── requirements.txt
├── pytest.ini
├── .env.example
└── README.md
```
## API
### `POST /search`
```
{
"query": "ransomware attacks healthcare sector",
"time_range": "last 2 years",
"max_articles": 5
}
```
响应(`202 Accepted`):
```
{ "run_id": "uuid", "status": "PENDING" }
```
### `GET /search/{run_id}/status`
返回 `PENDING`、`RUNNING`、`COMPLETED` 或 `FAILED`。
### `GET /search/{run_id}/report`
包含 `incidents`(带有 `related_vulnerabilities`)、`vulnerabilities`、`pipeline_telemetry` 和 `warnings` 的最终 JSON 报告。在适当时返回 `404` / `409`。
### `GET /health`
存活探针。
## 设置
### 本地
```
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env
# 将你的 ANTHROPIC_API_KEY 添加到 .env(可选 — 启发式回退在没有它的情况下也能工作)
uvicorn app.main:app --reload
```
在第二个终端中:
```
streamlit run ui/app.py
```
### Docker
```
cp .env.example .env
# 如果需要,使用你的 API key 编辑 .env
docker compose up --build
```
API: `http://localhost:8000` · UI: `http://localhost:8501`
### 测试
```
pytest
```
所有测试均在无网络访问或 API 密钥的情况下运行。
### 使用 curl 进行快速 API 测试
```
# 提交 search
curl -s -X POST http://localhost:8000/search \
-H 'Content-Type: application/json' \
-d '{"query": "MOVEit vulnerability exploitation", "max_articles": 3}'
# → {"run_id": "...", "status": "PENDING"}
# 轮询状态(替换 RUN_ID)
curl -s http://localhost:8000/search/RUN_ID/status
# → {"run_id": "...", "status": "COMPLETED", ...}
# 检索 report
curl -s http://localhost:8000/search/RUN_ID/report | python3 -m json.tool
```
## 配置
| 变量 | 默认值 | 用途 |
|---|---|---|
| `ANTHROPIC_API_KEY` | — | 启用 LLM 提取和综合 |
| `ANTHROPIC_EXTRACTION_MODEL` | `claude-haiku-4-20250414` | 用于结构化提取的模型(便宜) |
| `ANTHROPIC_REPORT_MODEL` | `claude-sonnet-4-20250514` | 用于报告综合的模型(更强大) |
| `ANTHROPIC_BASE_URL` | — | 自定义 API 端点 |
| `OLLAMA_BASE_URL` | `http://127.0.0.1:11434` | Ollama 服务器 URL |
| `OLLAMA_MODEL` | — | 启用 Ollama |
| `DEFAULT_MAX_ARTICLES` | `5` | 默认文章限制 |
| `MAX_LLM_CHARS_PER_ARTICLE` | `8000` | 每篇文章的提示预算 |
| `REQUEST_TIMEOUT_SECONDS` | `20` | HTTP 超时 |
## 已知限制与权衡
- **网络发现。** DuckDuckGo HTML 抓取和 Google News RSS 是免费的但很脆弱。通过双源故障隔离和相关性过滤进行补偿。
- **文章解析。** 基于正则表达式的 HTML 提取会遗漏 JS 渲染或付费墙内的内容。
- **启发式提取。** 较为保守;组织提取是最弱的字段。LLM 路径可大幅改善所有字段。
- **NVD 速率限制。** 请求之间有 1 秒延迟(NVD:在没有 key 的情况下为 30 秒内 5 次请求)。支持 NVD API key 可以减少此延迟。
- **内存存储。** 重启后不具持久性,不支持多工作器安全。
- **内存缓存。** 同样的警告;生产环境将替换为 Redis。
## 如果有更多时间我会改进的地方
1. **CISA KEV 集成** —— 标记已知被利用漏洞目录中的 CVE。
2. **完全异步的智能体** —— 使用 `httpx.AsyncClient` 在节点内进行并发获取。
3. **Redis 支持的存储与缓存** —— 持久性和多工作器支持。
4. **OpenTelemetry 追踪** —— 每个智能体的跨度;当前的 `pipeline_telemetry` 是一个轻量级的替代方案。
5. **LLM 调用门控** —— 当启发式提取了足够字段时跳过 LLM,从而降低成本。
6. **更丰富的评估** —— 更大的黄金数据集,CI 回归运行。
## 示例输出
示例产物:`examples/sample_report.json` 和 `examples/sample_report.md`。
标签:Anthropic Claude, AV绕过, CVE漏洞 enrichment, DLL 劫持, DuckDuckGo搜索, ESC4, FastAPI, Google News RSS, Kubernetes, LangGraph, LLM, NVD API, OSINT, PyRIT, Python, Unmanaged PE, 图编排, 多智能体系统, 大语言模型, 威胁情报, 安全报告生成, 实时处理, 开发者工具, 异步REST API, 无后门, 智能体工作流, 有向无环图, 状态图, 结构化数据提取, 网络威胁狩猎, 网络安全, 自动化情报收集, 请求拦截, 运行时操纵, 逆向工具, 隐私保护