astafford8488/phantom
GitHub: astafford8488/phantom
AI 驱动的检测工程与自主威胁狩猎平台,解决告警疲劳与检测覆盖不足的问题。
Stars: 0 | Forks: 0
# 👻 PHANTOM
### AI-Powered Detection Engineering & Autonomous Threat Hunting Platform
[](https://www.python.org/downloads/)
[](LICENSE)
[](https://github.com/SigmaHQ/sigma)
[](https://attack.mitre.org/)
[](https://github.com/astafford8488/phantom/actions)
*一个生产级平台,结合了 Sigma 规则执行、机器学习异常检测、自主 LLM 驱动的威胁狩猎以及基于图形的告警关联,形成统一的检测流水线 — 提供完整的 MITRE ATT&CK 覆盖分析。*
[功能](#features) · [架构](#architecture) · [快速开始](#quick-start) · [检测流水线](#detection-pipeline) · [API 参考](#api-reference)
## 问题
安全团队面临着无法扩展的挑战:
| 挑战 | 现实 |
|------|------|
| **告警疲劳** | SOC 每天收到 10,000+ 条告警,其中 80% 是误报 |
| **检测缺口** | 大多数组织覆盖不到 20% 的 MITRE ATT&CK 技术 |
| **手动狩猎** | 威胁狩猎需要难以扩展的专业知识 |
| **工具孤岛** | Sigma 规则、异常检测和关联存在于独立系统中 |
| **规则编写缓慢** | 编写经过验证的 Sigma 规则需要数小时 |
**PHANTOM 解决了这个问题**,通过将基于规则的检测、机器学习异常分析和自主 AI 驱动的狩猎统一到单个流水线中 — 智能告警关联将数千条告警转化为可操作的事件。
## PHANTOM 对比
| 能力 | 传统 SIEM | PHANTOM |
|------|-----------|--------|
| Sigma 规则执行 | 部分支持 | 完整引擎(含修饰符) |
| 机器学习异常检测 | 基础阈值 | 孤立森林 + UEBA |
| 自主狩猎 | 仅手动 | LLM 驱动假设 |
| 告警关联 | 仅时间窗口 | 基于图形 + 杀伤链 |
| MITRE 覆盖分析 | 仅仪表盘 | 缺口分析 + 建议 |
| 规则生成 | 手动 YAML | 自然语言 → Sigma |
| 检测流水线 | 分钟级 | 秒级 |
## 功能
### Sigma 检测引擎
- 完整的 Sigma 规范支持(字段修饰符、通配符、逻辑运算符)
- `contains`、`startswith`、`endswith`、`re` 字段修饰符
- `and`、`or`、`not` 逻辑条件,支持 `N of X*` 聚合
- 日志源过滤与多模式匹配
- 规则验证与 LLM 驱动的规则生成
### 机器学习异常检测
- **孤立森林** — 基于事件特征的多变量异常检测
- **统计基线** — 每项特征的 Z 分数异常检测
- **UEBA 画像** — 每个实体的行为基线(每小时活动、已知进程、登录来源)
- 可配置阈值的加权集成评分
### 自主威胁狩猎
- **6 个内置剧本** — 横向移动、持久化、凭证访问、渗出、防御规避、发现
- **模式狩猎** — 编码的 PowerShell、可疑父子进程、暴力破解检测
- **时序分析** — 活动突发检测与下班时间异常
- **LLM 假设** — Claude 驱动的假设生成与自动化调查
### 基于图形的告警关联
- 实体重叠检测(共享用户、主机、IP、进程)
- MITRE ATT&CK 杀伤链序列识别
- 可配置窗口的时间邻近性关联
- 严重性升级模式检测
- 连接组件事件分组
### MITRE ATT&CK 覆盖
- 14 个战术类别中的 70+ 技术目录
- 按战术和优先级的覆盖细分
- 缺口分析与优先建议
- 可视化热图数据生成
## 架构
```
PHANTOM Detection Pipeline
┌─────────────────────────────────────────────────────────────────────┐
│ │
│ ┌──────────┐ ┌───────────┐ ┌──────────────────────────┐ │
│ │ Events │───▶│ Normalize │───▶│ Detection Engines │ │
│ │ (JSON, │ │ (Pipeline)│ │ │ │
│ │ CEF, │ └───────────┘ │ ┌────────────────────┐ │ │
│ │ Syslog) │ │ │ Sigma Engine │ │ │
│ └──────────┘ │ │ (Rule Evaluation) │ │ │
│ │ └────────────────────┘ │ │
│ │ ┌────────────────────┐ │ │
│ │ │ Anomaly Detector │ │ │
│ │ │ (IF + UEBA + Stat) │ │ │
│ │ └────────────────────┘ │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ┌──────────▼───────────────┐ │
│ │ Threat Hunter (LLM) │ │
│ │ (Playbooks + Hypotheses) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ┌──────────────┐ ┌───────────────────────▼──────────────┐ │
│ │ MITRE │◀───│ Alert Correlator (Graph) │ │
│ │ Coverage │ │ (Entity + Temporal + Kill-Chain) │ │
│ │ Analysis │ └──────────────────────────────────────┘ │
│ └──────────────┘ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Incidents │ │
│ │ (Grouped Alerts) │ │
│ └──────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────┘
```
## 快速开始
### 安装
```
git clone https://github.com/astafford8488/phantom.git
cd phantom
pip install -e ".[dev]"
```
### 运行检测流水线
```
import asyncio
from phantom import PhantomEngine
async def main():
engine = PhantomEngine()
# Load Sigma rules
engine.load_rules("rules/")
# Security events (from SIEM, EDR, or log files)
events = [
{
"CommandLine": "powershell -enc ZQBjAGgAbwA=",
"process_name": "powershell.exe",
"username": "jsmith",
"hostname": "WS-001",
"category": "process_creation",
"product": "windows",
},
{
"CommandLine": "mimikatz sekurlsa::logonpasswords",
"process_name": "mimikatz.exe",
"username": "admin",
"hostname": "DC-01",
"category": "process_creation",
"product": "windows",
},
]
# Run full pipeline
report = await engine.analyze(events, run_hunting=True)
print(f"Detections: {report.total_detections}")
print(f"Incidents: {len(report.incidents)}")
print(f"Coverage: {report.coverage.get('percentage', 0):.1f}%")
print(report.summary())
asyncio.run(main())
```
### CLI 用法
```
# 分析日志文件
phantom analyze events.json --rules rules/ --hunt
# 验证 Sigma 规则
phantom rules validate rules/suspicious_powershell.yml
# 根据描述生成 Sigma 规则
phantom rules generate "Detect certutil downloading files from external URLs"
# 查看 MITRE ATT&CK 覆盖范围
phantom coverage --rules rules/
# 启动 REST API 服务器
phantom serve --port 8000 --rules rules/
```
## 检测流水线
### 阶段 1:日志规范化
```
from phantom.ingestion.pipeline import LogPipeline
pipeline = LogPipeline()
events = pipeline.normalize([
{"EventID": "4688", "Computer": "DC-01", "CommandLine": "whoami /all"},
{"raw": "CEF:0|Vendor|Product|1.0|100|Alert|5|src=10.0.0.1"},
])
# → 规范化为通用事件架构并丰富
```
### 阶段 2:Sigma 规则检测
```
from phantom.detection.sigma_engine import SigmaEngine
sigma = SigmaEngine()
sigma.add_rule_yaml("""
title: Encoded PowerShell
id: ps-001
logsource:
category: process_creation
detection:
selection:
CommandLine|contains: '-enc'
condition: selection
level: high
tags:
- attack.t1059.001
""")
results = sigma.evaluate(events)
```
### 阶段 3:异常检测
```
from phantom.detection.anomaly import AnomalyDetector
detector = AnomalyDetector(contamination=0.05)
anomalies = detector.detect(events)
# → 隔离森林 + 统计 + UEBA 集成
```
### 阶段 4:威胁狩猎
```
from phantom.hunting.hunter import ThreatHunter
hunter = ThreatHunter()
findings = await hunter.hunt(events)
# → 剧本匹配 + 模式狩猎 + 时间分析
```
### 阶段 5:告警关联
```
from phantom.correlation.graph import AlertCorrelator
correlator = AlertCorrelator()
incidents = correlator.correlate(all_alerts)
# → 基于图的分组与杀伤链识别
```
## API 参考
### REST 端点
| 方法 | 端点 | 描述 |
|------|------|------|
| `POST` | `/v1/analyze` | 运行完整检测流水线 |
| `POST` | `/v1/sigma/evaluate` | 在事件上评估 Sigma 规则 |
| `POST` | `/v1/sigma/validate` | 验证 Sigma 规则 YAML |
| `POST` | `/v1/sigma/generate` | 从自然语言生成规则 |
| `POST` | `/v1/anomaly/detect` | 运行异常检测 |
| `GET` | `/v1/rules` | 列出已加载的 Sigma 规则 |
| `GET` | `/v1/coverage` | MITRE ATT&CK 覆盖报告 |
| `GET` | `/health` | 健康检查 |
```
# 分析事件
curl -X POST http://localhost:8000/v1/analyze \
-H "Content-Type: application/json" \
-d '{"events": [...], "run_hunting": true}'
# 生成 Sigma 规则
curl -X POST http://localhost:8000/v1/sigma/generate \
-H "Content-Type: application/json" \
-d '{"description": "Detect ransomware file encryption patterns"}'
```
## 项目结构
```
phantom/
├── src/phantom/
│ ├── __init__.py # Package exports
│ ├── engine.py # PhantomEngine orchestrator
│ ├── cli.py # Click CLI interface
│ ├── detection/
│ │ ├── sigma_engine.py # Sigma rule parser & evaluator
│ │ └── anomaly.py # ML anomaly detection (IF + UEBA)
│ ├── hunting/
│ │ └── hunter.py # Autonomous threat hunter
│ ├── correlation/
│ │ └── graph.py # Graph-based alert correlator
│ ├── mitre/
│ │ └── mapper.py # MITRE ATT&CK coverage engine
│ ├── ingestion/
│ │ └── pipeline.py # Log normalization pipeline
│ ├── api/
│ │ └── server.py # FastAPI REST server
│ └── utils/
│ └── logging.py # Structured logging
├── tests/ # 80+ unit tests
│ ├── test_sigma.py
│ ├── test_anomaly.py
│ ├── test_correlation.py
│ ├── test_mitre.py
│ ├── test_hunting.py
│ └── test_pipeline.py
├── examples/ # Usage examples
│ ├── detection_pipeline.py
│ ├── sigma_rules.py
│ └── threat_hunting.py
├── rules/ # Sample Sigma rules
│ ├── suspicious_powershell.yml
│ ├── credential_dumping.yml
│ └── lateral_movement.yml
├── configs/ # Configuration profiles
│ ├── default.yaml
│ └── strict.yaml
└── pyproject.toml
```
## 路线图
- [x] 支持完整修饰符的 Sigma 规则引擎
- [x] 孤立森林 + UEBA 异常检测
- [x] 6 个内置狩猎剧本
- [x] 基于图形的杀伤链识别告警关联
- [x] MITRE ATT&CK 覆盖分析与缺口建议
- [x] LLM 驱动的 Sigma 规则生成
- [x] REST API 与 CLI
- [ ] 实时事件流(Kafka/Redis)
- [ ] Elasticsearch/Splunk 后端集成
- [ ] YARA 规则支持(文件级检测)
- [ ] 基于 Transformer 的异常检测
- [ ] 交互式 MITRE ATT&CK 热图仪表盘
- [ ] 从误报反馈自动调节规则
- [ ] 多租户部署与 RBAC
## 许可证
MIT 许可证 — 详细信息请参见 [LICENSE](LICENSE)。