1212121600/AegisThreat
GitHub: 1212121600/AegisThreat
一个通过检测、追踪、防守三个 AI 代理协作,将海量安全告警自动转化为攻击链并推荐防御策略的多代理安全运营平台。
Stars: 0 | Forks: 0
AegisThreat
多代理 APT 攻击链追踪与自适应防御系统
## 什么是 AegisThreat?
AegisThreat 是一个**原生 AI 安全运营平台**,三个协作代理共同自动化威胁狩猎中最困难的部分:将成千上万的嘈杂告警转化为连贯的攻击故事,预测攻击者的下一步行动,并推荐精确的防御行动。
**它解决的问题**:企业每天生成数万条安全告警。APT 攻击在 48 小时内展开 6-12 个不同的步骤。人工分析师需要 4-8 小时手动拼凑一条攻击链。当他们完成时,攻击者早已转移。
**AegisThreat 的工作方式**:三个专业 AI 代理通过共享的 ThreatBus 进行通信,每个代理处理管道的某个部分:
```
Raw Alerts ──→ [Detection Agent] ──→ AttackFragment
│
[Tracing Agent] ──→ AttackChain (6-12 steps)
│
[Defense Agent] ──→ DecisionScript
```
| 代理 | 角色 | 输入 | 输出 |
|------|------|------|------|
| **检测** | 告警融合 | SIEM/EDR 告警 | `AttackFragment` — 带有可疑 ATT&CK TTP 的聚类告警 |
| **追踪** | 链式推理 | AttackFragment | `AttackChain` — 从初始访问到影响的完整杀伤链,以及预测的下一步行动 |
| **防御** | 自适应响应 | AttackChain | `DecisionScript` — 具有业务影响评估的优先级 SOAR 行动 |
所有代理通过共享的 **ThreatBus**(Kafka 或内存)进行通信,发布标准化的 **威胁事件协议**(TEP)消息。
## 快速开始
### 前置条件
- Python 3.10+
- Windows / Linux / macOS
### 安装并运行演示
```
# Clone
git clone https://github.com/YOUR_USER/AegisThreat.git
cd AegisThreat
# Install dependencies
pip install pydantic fastapi uvicorn numpy pyyaml websockets
# Run the end-to-end demo
python -m aegis.cli demo --scenario phishing-to-exfil
```
**预期输出** — 完整管道在合成攻击数据上运行:
```
Attack Fragments Generated: 9
[frag-xxx] confidence=0.85 TTPs: [T1566, T1059, T1071, T1003, T1048]
Attack Chains Generated: 9
Path: T1566 → T1059 → T1003 → T1021 → T1083 → T1048
Predicted next: T1485 (Data Destruction)
Defense Decisions Generated: 9
1. ISOLATE_HOST → ws-finance-07 (Credential dump)
2. BLOCK_IP → 185.220.101.34 (C2 communication)
3. RESET_CREDENTIAL → jwilson (Compromised account)
Human approval required: Yes
```
### 启动 API 服务器
```
python -m aegis.cli server
# Open http://localhost:8000/docs for Swagger UI
```
### 打开仪表板
```
start dashboard.html
# Or open it directly in your browser
```
仪表板通过 WebSocket 连接到 API 服务器以获取实时更新,并提供:
- 手动告警提交表单
- 一键式场景执行(4 种预设 APT 攻击模式)
- D3.js 攻击链可视化
- 决策批准/拒绝工作流
## 功能
### 已实现
| 功能 | 描述 |
|------|------|
| **双窗口告警聚类** | 关键告警 5 分钟快速通道 + 30 分钟完整窗口 |
| **告警去重** | 基于实体哈希的跨 SIEM/EDR/NGFW 传感器去重 |
| **模式规范化** | 将 Splunk、SentinelOne、Suricata、Zeek 字段映射到规范格式 |
| **基于规则的 TTP 映射** | 30+ 告警模式 → ATT&CK 技术 ID 映射 |
| **基于锚点的 BFS 路径推理** | 通过 ATT&CK 图将观察到的 TTP 扩展为 6 步攻击链 |
| **路径剪枝** | 平台一致性 + 战术排序 + 数据源共享过滤器 |
| **对手模拟** | 三级红队策略(简单/中等/APT),具有概率性 0day 回退 |
| **防御仲裁** | 通过覆盖率、剩余风险和业务影响对防御方案进行定量评估 |
| **SQLite 持久化** | 攻击片段、链和决策在服务器重启后保留 |
| **WebSocket 实时推送** | 实时事件流传输到已连接的仪表板 |
| **API 认证** | API 密钥 / Bearer 令牌,采用常量时间比较 |
| **SOAR 剧本导出** | 人类可读的行动计划和可执行命令 |
| **合成数据生成器** | 4 个用于测试和演示的真实 APT 攻击场景 |
| **单文件仪表板** | React + D3.js HTML 文件,无需构建步骤 |
## 架构
```
┌──────────────────────────┐
│ Command Center (API) │
│ FastAPI + WebSocket │
│ + SQLite Persistence │
└────┬──────┬──────┬─────────┘
│ │ │
┌───────────────┼──────┼──────┼───────────────┐
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐
│ Detection │ │ Tracing │ │ Defense │ │ Command │
│ Agent │─►│ Agent │─►│ Agent │─►│ Center │
│ (fragment) │ │ (chain) │ │ (decision) │ │ │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────┘
│ │ ▲ │ ▲
│ │ │ │ │
└─────────────────┴───┴─────────────┴────┘
ThreatBus (threat.debate)
```
### 威胁事件协议(TEP)
所有代理间通信使用标准化的 JSON 信封:
```
{
"event_id": "evt-a1b2c3d4",
"event_type": "fragment",
"producer": "detection",
"correlation_id": "frag-xxx",
"timestamp": "2026-05-04T10:00:00Z",
"payload": { ... }
}
```
### 攻击场景(内置演示数据)
| 场景 | TTP 链 | 步骤 | 模式 |
|------|--------|------|------|
| **钓鱼 → 数据泄露** | T1566→T1204→T1059→T1071→T1003→T1021→T1083→T1048 | 9 | APT29 风格鱼叉式钓鱼活动 |
| **暴力破解 → 破坏** | T1110→T1078→T1021→T1003→T1485 | 8 | VPN 暴力破解导致勒索软件 |
| **Web 利用 → 数据窃取** | T1190→T1059→T1071→T1003→T1048 | 5 | CVE 利用与反向 shell |
| **供应链 → 数据泄露** | T1195→T1071→T1547→T1083→T1560→T1048 | 6 | 妥协更新与长期潜伏 |
## 项目结构
```
AegisThreat/
├── aegis/
│ ├── agents/ # Three AI agents
│ │ ├── base.py # Abstract agent with lifecycle + bus integration
│ │ ├── detection.py # Alert fusion → AttackFragment
│ │ ├── tracing.py # BFS path reasoning → AttackChain
│ │ └── defense.py # TTP→mitigation mapping → DecisionScript
│ ├── core/
│ │ ├── models.py # Canonical data models (TEP protocol)
│ │ ├── bus.py # InMemoryBus + KafkaBus with DLQ
│ │ ├── alert_dedup.py # Dedup + normalization + severity filter
│ │ ├── persistence.py # SQLite storage layer
│ │ └── security.py # HMAC signing + replay protection
│ ├── inference/
│ │ ├── path_pruner.py # Anchor-based BFS + pruning rules
│ │ ├── alert_cluster.py # DBSCAN clustering (Phase 2 stub)
│ │ ├── path_scorer.py # GraphSAGE scoring (Phase 2 stub)
│ │ └── bayesian.py # Bayesian network (Phase 2 stub)
│ ├── sandbox/
│ │ ├── red_team.py # Independent attacker simulation
│ │ ├── arbitrator.py # Quantitative defense evaluation
│ │ ├── mcts.py # Monte Carlo tree search (Phase 2 stub)
│ │ └── debate.py # Multi-round debate engine (Phase 2 stub)
│ ├── knowledge/
│ │ ├── graph.py # Neo4j interface with mock fallback
│ │ ├── schema.cypher # Full ATT&CK knowledge graph schema
│ │ └── attck_loader.py # MITRE ATT&CK STIX importer
│ ├── connectors/
│ │ ├── siem.py # Splunk/Elastic/SentinelOne/Suricata/Zeek adapters
│ │ └── soar.py # SOAR playbook builder + command generator
│ ├── llm/
│ │ └── client.py # Unified LLM client (OpenAI/vLLM/Ollama) + template fallback
│ ├── api/
│ │ ├── server.py # FastAPI server v2 with WebSocket + persistence
│ │ ├── auth.py # API key / Bearer token authentication
│ │ ├── websocket.py # Real-time event broadcasting
│ │ └── routes/ # Modular REST endpoints
│ └── cli.py # CLI: demo, server, data-gen
├── tools/
│ ├── data_generator.py # Synthetic APT attack data (4 scenarios)
│ └── attck_importer.py # ATT&CK STIX → Neo4j import CLI
├── tests/ # 4 test files, 60+ test cases
├── config/ # YAML configuration + audit logging
├── docker/ # Docker Compose for Kafka + Neo4j
├── docs/ # Architecture, API reference, readiness assessment
├── dashboard.html # Single-file React + D3.js dashboard
├── pyproject.toml # Python package configuration
└── requirements.txt # Dependency list
```
## 使用示例
### 生成合成攻击数据
```
# List available scenarios
python tools/data_generator.py --list
# Generate a scenario as API-ready JSON
python tools/data_generator.py --scenario phishing-to-exfil --format api --output alerts.json
```
### 将各个模块作为库使用
```
# Attack path reasoning (no external deps)
from aegis.inference.path_pruner import anchor_based_bfs
adjacency = {"T1566": ["T1059", "T1204"], "T1059": ["T1003"], ...}
paths = anchor_based_bfs(adjacency, observed_ttps=["T1566"], max_depth=6)
for path in paths:
print(" → ".join(path))
# Adversary simulation
from aegis.sandbox.red_team import RedTeamSimulator, APT_ATTACKER
sim = RedTeamSimulator()
next_move = sim.simulate_response("T1059", APT_ATTACKER)
# Defense evaluation
from aegis.sandbox.arbitrator import Arbitrator
arb = Arbitrator()
verdict = arb.evaluate(1, ["T1566","T1059","T1003"], defense_actions, impact=30)
print(f"Coverage: {verdict.coverage_score:.0%}, Accepted: {verdict.defense_accepted}")
```
### 将 ATT&CK 数据导入 Neo4j
```
# Start Neo4j
docker-compose -f docker/docker-compose.yml up -d neo4j
# Import ATT&CK STIX data
python tools/attck_importer.py --neo4j-uri bolt://localhost:7687 --neo4j-user neo4j --neo4j-password password
```
标签:APT检测, ATT&CK框架, EDR, HTTP/HTTPS抓包, MTTR优化, PyRIT, SOAR, 人工智能安全, 合规性, 多智能体系统, 威胁情报, 安全运营平台, 密钥泄露防护, 库, 应急响应, 开发者工具, 态势感知, 攻击链分析, 机器学习安全, 脆弱性评估, 自动化响应, 逆向工具