thilak0105/Cyber-Threat-RAG-pipeline
GitHub: thilak0105/Cyber-Threat-RAG-pipeline
基于MITRE ATT&CK的威胁情报RAG系统,通过知识图谱和多策略检索为网络安全分析提供上下文感知的智能问答能力。
Stars: 0 | Forks: 0
# 网络威胁 RAG 流水线
基于知识图谱和 Agentic 的检索增强生成 (RAG) 流水线,用于网络威胁情报 (CTI),基于 MITRE ATT&CK 技术内容构建。
本项目支持:
- 从 MITRE ATT&CK 收集数据
- 基于 LLM 的实体和关系抽取
- 知识图谱构建与 Neo4j 加载
- 多种 RAG 变体(baseline、graph-only、vector-only、hybrid、agentic)
- 包含对比指标的评估框架
- 用于交互式分析的 Streamlit 演示仪表板
## 1. 项目功能
该流水线将非结构化的 CTI 文本转换为:
- 一个**结构化知识图谱**(实体 + 关系)
- 一个**向量检索索引**(语义 TF-IDF 块)
然后使用以下几种检索策略之一来回答 CTI 问题:
- **Baseline**:图谱优先,向量回退
- **Graph-only**:仅使用图谱事实
- **Vector-only**:仅使用语义文档检索
- **Hybrid-parallel**:图谱 + 向量并行
- **Agentic**:查询分析器 + 路由器
- **Agentic + Validator**:带有自纠错循环的 Agentic
## 2. 仓库结构
```
rag_threat_intel/
├── README.md
├── PROJECT_WORKFLOW.md
├── streamlit_app.py
├── requirements.txt
├── data/
│ ├── raw_cti/ # scraped MITRE technique text files
│ ├── labeled_cti/ # LLM-extracted entity/relation JSON
│ ├── processed_cti/ # graph exports (GraphML/JSON/Cypher)
│ └── evaluation/ # benchmark questions
├── docs/
├── notebooks/
├── results/ # evaluation outputs
├── schemas/ # JSON schemas
├── src/
│ ├── scrape_mitre.py
│ ├── label_generation.py
│ ├── build_kg.py
│ ├── load_neo4j.py
│ ├── neo4j_retriever.py
│ ├── vector_retriever.py
│ ├── rag_generator.py
│ ├── evaluator.py
│ ├── run_evaluation.py
│ ├── run_agentic_evaluation.py
│ ├── agentic_rag.py
│ ├── agents/
│ │ ├── query_analyzer.py
│ │ ├── retrieval_router.py
│ │ └── validator.py
│ └── variants/
│ ├── baseline_rag.py
│ ├── graph_only_rag.py
│ ├── vector_only_rag.py
│ └── hybrid_parallel_rag.py
└── visualizations/
```
## 3. 架构概览
### Stage A: CTI 摄取
1. `src/scrape_mitre.py` 下载并清洗 MITRE ATT&CK 技术页面。
2. 将每项技术保存为 `.txt` 文件,存放在 `data/raw_cti/` 中。
### Stage B: 信息抽取
1. `src/label_generation.py` 将 CTI 文本发送到 Groq LLM。
2. 将实体和关系抽取到 `*_labeled.json` 文件中。
### Stage C: 知识图谱构建
1. `src/build_kg.py` 将所有标记文件合并为一个有向多重图。
2. 执行整合和剪枝。
3. 导出:
- `data/processed_cti/threat_intel_kg.graphml`
- `data/processed_cti/threat_intel_kg.json`
- `data/processed_cti/threat_intel_kg.cypher`
### Stage D: 检索与生成
- 图谱检索:`src/neo4j_retriever.py`
- 向量检索:`src/vector_retriever.py`
- 生成:`src/rag_generator.py` / 特定变体生成器
### Stage E: Agentic 编排
- `src/agents/query_analyzer.py`:分类查询类型
- `src/agents/retrieval_router.py`:选择检索策略
- `src/agents/validator.py`:质量检查和可选的自纠错
- `src/agentic_rag.py`:全流程编排器
### Stage F: 评估与仪表板
- `src/run_evaluation.py`:基线指标
- `src/run_agentic_evaluation.py`:跨变体对比
- `streamlit_app.py`:交互式仪表板和结果可视化
## 4. 设置
### 4.1 前置条件
- Python 3.10+
- Neo4j(本地实例,启用 Bolt)
- 有效的 Groq API 密钥
### 4.2 创建并激活虚拟环境
在项目根目录下执行:
```
python3 -m venv .venv
source .venv/bin/activate
```
### 4.3 安装依赖
```
pip install -r requirements.txt
```
### 4.4 环境变量
在运行流水线之前设置所需的凭证:
```
export GROQ_API_KEY="your_groq_api_key"
export NEO4J_URI="bolt://127.0.0.1:7687"
export NEO4J_USER="neo4j"
export NEO4J_PASSWORD="your_neo4j_password"
```
注意:某些脚本当前在代码中定义了 Neo4j 默认值。如果您的本地配置不同,请更新这些脚本或设置匹配的凭证。
## 5. 端到端运行指南
除非另有说明,请从项目根目录运行这些命令。
### Step 1: 抓取 MITRE ATT&CK 文本
```
cd src
python scrape_mitre.py
cd ..
```
输出:CTI 文件位于 `data/raw_cti/`。
### Step 2: 生成标记的 CTI JSON
```
cd src
python label_generation.py
cd ..
```
输出:标记文件位于 `data/labeled_cti/`。
### Step 3: 构建知识图谱
```
cd src
python build_kg.py
cd ..
```
输出:图谱导出文件位于 `data/processed_cti/`。
### Step 4: 将图谱加载到 Neo4j
```
cd src
python load_neo4j.py
cd ..
```
### Step 5: 运行基线评估
```
cd src
python run_evaluation.py
cd ..
```
输出:
- `results/baseline_evaluation.json`
- `results/baseline_summary.json`
### Step 6: 运行完整变体对比
```
cd src
python run_agentic_evaluation.py
cd ..
```
输出:
- `results/variant_comparison.json`
- `results/variant_comparison_summary.json`
### Step 7: 启动仪表板
```
streamlit run streamlit_app.py
```
## 6. 包含的 RAG 变体
- `baseline`:图谱优先,回退到向量
- `graph_only`:仅 Neo4j 事实
- `vector_only`:仅 TF-IDF 语义块
- `hybrid_parallel`:合并图谱和向量上下文
- `agentic`:分析器 + 路由器驱动的策略选择
- `agentic_validated`:带有验证/自纠错的 Agentic
## 7. 评估指标
项目计算如下指标:
- 实体召回率
- 与参考答案的词汇相似度
- 延迟
- 模糊指标
- (可选)LLM 评判的忠实度/相关性
结果以 JSON 格式持久化存储在 `results/` 中,以便复现和可视化。
## 8. 技术栈
- Python
- Groq LLM API
- Neo4j
- NetworkX
- scikit-learn (TF-IDF + 余弦相似度)
- Streamlit + Plotly + Pandas
- BeautifulSoup + Requests
## 9. 已知限制
- 部分脚本仍使用脚本本地路径假设(需要从 `src/` 执行)。
- Neo4j 设置未完全集中在一个配置文件中。
- 向量检索基于 TF-IDF(而非稠密 embedding)。
- 评估质量取决于标记 CTI 数据的覆盖范围和质量。
## 10. 建议改进
- 通过 `.env` + 一个配置模块集中管理配置
- 用基于 embedding 的检索器替换 TF-IDF
- 添加单元/集成测试和 CI
- 为 API 速率限制添加重试/退避逻辑
- 增强图谱实体标准化和去重
## 11. 学术 / 演示背景
该仓库的结构旨在作为比较网络安全领域中以图谱为中心和 Agentic RAG 方法的迷你项目。
如果您要展示此项目,请包括:
- RAG 架构图
- 来自 `results/variant_comparison_summary.json` 的变体对比表
- Streamlit 实时演示运行
## 12. 许可证
当前不存在许可证文件。在公开复用之前请添加许可证(例如 MIT/Apache-2.0)。
标签:Agentic RAG, Cloudflare, DLL 劫持, Kubernetes, LLM, MITRE ATT&CK, Neo4j, NLP, Python, RAG, Splunk, Streamlit, Unmanaged PE, Vector Search, 人工智能, 关系抽取, 向量化检索, 图机器学习, 大语言模型, 威胁分析, 威胁情报, 实体抽取, 开发者工具, 数据管道, 无后门, 检索增强生成, 混合检索, 特权检测, 用户模式Hook绕过, 网络安全, 自动化侦查工具, 访问控制, 软件工程, 逆向工具, 隐私保护