salientshub/threat-intel-pipeline
GitHub: salientshub/threat-intel-pipeline
自动化网络威胁情报管道,实现从 OSINT 采集、VirusTotal 富化、STIX 2.1 规范化到 OpenSearch 分发的完整 CTI 生命周期管理。
Stars: 0 | Forks: 0
# 🛡️ 威胁情报 Pipeline
[](.)
[](.)
[](.)
[](.)
[](LICENSE)
## 📋 目录
- [概述](#overview)
- [架构](#architecture)
- [功能特性](#features)
- [前置条件](#prerequisites)
- [安装设置](#setup)
- [使用方法](#usage)
- [测试](#testing)
- [项目结构](#project-structure)
- [扩展 Pipeline](#extending-the-pipeline)
- [故障排除](#troubleshooting)
- [CI/CD 集成](#cicd-integration)
- [许可证](#license)
## 概述
本项目使用 Python 实现了**完整的 CTI 生命周期**:
1. **收集** — 从 [AlienVault OTX](https://otx.alienvault.com/) pulse 订阅中抓取失陷指标。
2. **富化** — 查询 [VirusTotal](https://www.virustotal.com/) 以获取信誉评分(恶意 / 可疑 / 无害)。支持 IP、域名和文件哈希。
3. **规范化** — 将富化后的 IOC 转换为带有动态 MITRE ATT&CK 杀伤链映射的 **STIX 2.1** Indicator 对象。
4. **分发** — 将 STIX bundle 作为安全事件推送到 **OpenSearch**,或在模拟模式下写入本地文件。
## 架构
```
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ AlienVault │────▶│ VirusTotal │────▶│ STIX 2.1 │────▶│ OpenSearch │
│ OTX (OSINT) │ │ Enrichment │ │ Conversion │ │ SIEM Push │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
fetcher.py enricher.py stixifier.py pusher.py
│ │ │ │
└────────────────────┴────────────────────┴────────────────────┘
│
main.py
(CLI Orchestrator)
```
**数据流:** `main.py` 顺序编排每个阶段,并在 VirusTotal 调用之间支持可配置的速率限制。MITRE ATT&CK 技术映射根据富化后的严重程度进行动态分配。
### 动态 MITRE ATT&CK 映射
| 恶意计数 | 技术 | 阶段 |
|---|---|---|
| > 5 | T1071 — Application Layer Protocol | Command & Control |
| 1–5 | T1071.001 — Web Protocols | Command & Control |
| 0 | T1592 — Gather Victim Host Information | Reconnaissance |
### 严重程度分类
| 恶意计数 | 严重程度 |
|---|---|
| > 10 | Critical |
| 6–10 | High |
| 1–5 | Medium |
| 0 | Low |
## 功能特性
| 功能 | 详情 |
|---|---|
| **OSINT 收集** | 带有重试逻辑的 AlienVault OTX pulse 订阅 API |
| **威胁富化** | VirusTotal v3 — IP、域名和文件哈希信誉 |
| **STIX 2.1 合规** | 带有自定义属性的 `stix2` 库 |
| **MITRE ATT&CK 映射** | 基于严重程度的动态技术分配 |
| **SIEM 集成** | 带有指数退避重试的 OpenSearch 事件摄取 |
| **模拟模式** | 无需实时凭证的完整演示(写入文件) |
| **结构化日志** | 控制台 + 轮转文件 (5 MB, 3 个备份) |
| **速率限制** | 可配置的休眠时间 + HTTP 429 处理 |
| **CLI 接口** | 通过 argparse 支持 `--limit` 和 `--no-push` 标志 |
| **安全性** | 通过 `.env` 管理密钥,从不硬编码 |
| **代码质量** | Black + Flake8 + pre-commit hooks |
| **测试套件** | 使用 requests_mock 的 pytest,包含覆盖率报告 |
## 前置条件
- **操作系统:** Ubuntu 22.04 / WSL2 (或任何 Linux 发行版)
- **Python:** 3.8+
- **pip:** 包管理器
- **(可选)** AlienVault OTX 账户 — [免费注册](https://otx.alienvault.com/)
- **(可选)** VirusTotal 账户 — [免费注册](https://www.virustotal.com/)
- **(可选)** OpenSearch 实例
## 安装设置
### 快速开始
```
# 克隆并进入项目
git clone https://github.com/your-org/threat_intel_pipeline.git
cd threat_intel_pipeline
# 一键设置
make install
# 配置 API 密钥
cp .env.example .env
nano .env # Add your API keys (or leave blank for simulation)
# 验证一切是否正常运行
make test
# 运行 pipeline
make run
```
### 手动设置
```
# 创建并激活虚拟环境
python3 -m venv venv
source venv/bin/activate
# 安装依赖
pip install -r requirements.txt
# 配置环境
cp .env.example .env
nano .env
# 可选:在本地启动 OpenSearch
docker compose up -d opensearch
# 运行
python main.py
```
## 使用方法
```
# 使用默认设置运行(3个 pulses,启用 push)
python main.py
# 获取更多 pulses
python main.py --limit 10
# 仅进行 enrich 和 stixify(跳过 OpenSearch push)
python main.py --limit 5 --no-push
```
### 配置
通过 `.env` 或 `config/settings.py` 进行覆盖:
| 变量 | 默认值 | 描述 |
|---|---|---|
| `OTX_API_KEY` | *(空)* | AlienVault OTX API 密钥 |
| `VT_API_KEY` | *(空)* | VirusTotal API 密钥 |
| `OPENSEARCH_URL` | `http://localhost:9200` | OpenSearch endpoint URL |
| `MAX_IOCS_PER_RUN` | `5` | 每次执行处理的 IOC 数量上限 |
| `RATE_LIMIT_SECONDS` | `15` | VT API 调用之间的暂停时间 |
| `LOG_LEVEL` | `INFO` | 日志详细程度 (DEBUG/INFO/WARNING/ERROR) |
## 测试
```
# 运行完整测试套件并附带覆盖率
make test
# 或者手动进行
pytest tests/ -v --tb=short --cov=core --cov=utils --cov-report=term-missing
# Lint 检查
make lint
# 自动格式化
make format
```
## 项目结构
```
threat_intel_pipeline/
├── .env.example # API key template
├── .gitignore # Git exclusions
├── .pre-commit-config.yaml # Pre-commit hooks config
├── Makefile # Corporate convenience targets
├── README.md # This file
├── requirements.txt # Python dependencies
├── setup.py # Installable package config
├── main.py # CLI orchestrator (entry point)
├── config/
│ └── settings.py # Environment + constants loader
├── core/
│ ├── __init__.py
│ ├── fetcher.py # AlienVault OTX collection
│ ├── enricher.py # VirusTotal enrichment
│ ├── stixifier.py # STIX 2.1 conversion + MITRE mapping
│ └── pusher.py # OpenSearch event push
├── utils/
│ ├── __init__.py
│ └── logger.py # Structured logging configuration
└── tests/
├── __init__.py
├── test_fetcher.py # OTX fetcher unit tests
├── test_enricher.py # VT enricher unit tests
└── test_stixifier.py # STIX conversion unit tests
```
## 扩展 Pipeline
### 添加暗网模块
1. 创建包含 `fetch_from_tor()` 函数的 `core/darkweb.py`。
2. 在 `main.py` 中将其注册为额外的抓取源。
3. 在富化前将结果与 OTX IOC 合并。
### 添加更多 SIEM (Splunk, Sentinel 等)
1. 创建 `core/pusher_splunk.py` 并实现相同的接口。
2. 在 `main.py` 中添加 `--siem` CLI 标志以选择目标。
3. 使用工厂模式实例化正确的 pusher。
### 添加更多富化源
1. 创建 `core/enricher_shodan.py` 或类似文件。
2. 在 `main.py` 中链式调用富化逻辑 (VT → Shodan → AbuseIPDB)。
3. 在 STIX 转换前合并富化字典。
## 故障排除
| 问题 | 解决方案 |
|---|---|
| `OTX_API_KEY is not set` | 将您的密钥添加到 `.env` 或在模拟模式下运行 |
| `VT rate limit hit` | Pipeline 会在遇到 HTTP 429 时自动等待 60 秒。增加 `RATE_LIMIT_SECONDS` 以避免触发限制 |
| `ModuleNotFoundError` | 激活您的 venv:`source venv/bin/activate` |
| `stix2` import errors | 确保已成功执行 `pip install stix2` (需要 Python 3.8+) |
| 测试因 import 错误而失败 | 从项目根目录运行:`pytest tests/` |
| `Permission denied` on logs | 确保 `logs/` 目录具有写入权限 |
| OpenSearch 推送失败 | 检查 `OPENSEARCH_URL` 是否已设置且可访问,然后重试 |
## CI/CD 集成
### GitHub Actions (示例)
```
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- run: pip install -r requirements.txt
- run: pytest tests/ -v --cov=core --cov-report=xml
- run: black --check core/ utils/ config/ main.py tests/
- run: flake8 core/ utils/ config/ main.py tests/ --max-line-length=88
```
## 许可证
本项目是开源的,并基于 [MIT License](LICENSE) 发布。
标签:Ask搜索, Cloudflare, ESC4, IOC指标, MITRE ATT&CK, MITRE ATT&CK映射, MIT协议, OSINT, OTX, Python, SecOps, STIX 2.1, VirusTotal, 云安全架构, 威胁情报, 安全事件响应, 安全规则引擎, 安全运营, 实时处理, 开发者工具, 情报收集, 情报规范化, 扫描框架, 数据富化, 无后门, 杀伤链映射, 漏洞研究, 网络安全, 自动化流水线, 请求拦截, 逆向工具, 隐私保护