cc22n/soc-forge
GitHub: cc22n/soc-forge
SOC Forge 是一个自托管的可定制化威胁情报调查引擎,通过并行查询 17 个情报 API 并归一化结果,帮助 SOC 分析师消除手动切换平台的时间浪费。
Stars: 0 | Forks: 0
# SOC Forge
**为 SOC 分析师打造的可定制化威胁情报调查引擎**
SOC Forge 允许安全分析师构建个性化的调查配置,同时查询 17 个威胁情报 API,并通过社区知识库分享调查结果——从而消除了浪费分析师大量时间的手动切换 API 的繁琐操作。
## 问题背景
SOC 分析师每次调查都会浪费 30-60 分钟,手动检查 10 多个威胁情报平台,在不同标签页之间复制数据,并重复他们的同事已经做过的查询。企业级 SOAR 工具可以解决这个问题,但每年的成本高达 5 万美元以上。目前在这两者之间没有合适的替代方案。
## 解决方案
SOC Forge 是一个免费、自托管的调查引擎,具备以下特性:
1. **调查配置** — 定义可复用的查询配置:调用哪些 API、期望哪些字段以及按何种优先级顺序
2. **并行执行** — 粘贴任意 IOC (hash, IP, domain, URL) → 选择配置 → 通过 `ThreadPoolExecutor` 并发查询所有 API,几秒内即可返回结果
3. **字段归一化** — 将 17 个 API 中的 244 个字段映射到统一的分类法,使 `country`、`asn` 和 `malware_family` 无论来源如何都具有相同的含义
4. **DB 级别的结果缓存** — 在每个数据源的 TTL 窗口内复用最近的 `InvestigationResult` 记录,避免在重启和跨 worker 时产生冗余的 API 调用
5. **社区知识库** — 分享调查结果,让团队永远不必重复相同的 API 调用。可投票确认或反驳调查发现
6. **完整的审计追踪** — 每一次查询、每一次登录、每一项操作都通过 SHA-256 加密链进行跟踪(类似区块链的防篡改检测)
7. **LLM 摘要** — 一键生成自然语言摘要和行动建议;支持 Anthropic、OpenAI、Groq、xAI Grok 和 Google Gemini——只需通过 `.env` 变量即可切换提供商
8. **着陆页与注册** — 为新访问者提供公开的着陆页;包含 IP 速率限制、密码强度强制检查和暴力破解防护的安全自注册功能
## 技术栈
| 层级 | 技术 | 原因 |
|-------|-----------|-----|
| Backend | Python 3.12 / Django 5.1 | 成熟的 ORM,内置后台管理,经过实战检验的安全性 |
| Database | PostgreSQL | JSONB 支持灵活的 API 响应,稳健的索引 |
| Frontend | Django Templates + Tailwind CSS (CDN) | 服务端渲染,快速,无 JS 框架开销 |
| Task Queue | Celery + Redis | 支持优先级队列的异步调查分发 |
| REST API | Django REST Framework | 基于 Token 认证的编程式访问 |
| LLM | Anthropic · OpenAI · Groq · xAI · Gemini | 自然语言摘要 — 可通过 `.env` 切换提供商 |
| Security | django-axes, CSP headers, audit middleware | 从第一天起就遵循 OWASP 规范 |
| APIs | 17 个威胁情报源 | 真实集成,非模拟数据 |
## 支持的情报源 (17)
| 来源 | IOC 类型 | 认证方式 | 速率限制 |
|--------|----------|------|------------|
| **VirusTotal** | Hash, IP, Domain, URL | API Key | 4/min |
| **AbuseIPDB** | IP | API Key | 17/min |
| **Shodan** | IP | API Key | 1/min |
| **GreyNoise** | IP | API Key | 1/min |
| **AlienVault OTX** | Hash, IP, Domain, URL | API Key | 167/min |
| **Google Safe Browsing** | URL, Domain | API Key | 167/min |
| **Hybrid Analysis** | Hash, URL | API Key | 3/min |
| **SecurityTrails** | Domain, IP | API Key | 1/min |
| **ThreatFox** | Hash, IP, Domain | Free | 10/min |
| **URLhaus** | URL, Domain, Hash | Free | 10/min |
| **URLScan.io** | URL, Domain | API Key | 2/min |
| **Pulsedive** | IP, Domain, Hash, URL | API Key | 1/min |
| **Criminal IP** | IP | API Key | 1/min |
| **IPQualityScore** | IP, URL | API Key | 3/min |
| **Censys** | IP, Domain | Basic Auth | 4/min |
| **Malware Bazaar** | Hash | Free | 10/min |
| **ipinfo.io** | IP | API Key | 833/min |
## 系统架构
```
┌────────────────────────────────────────────────────────────┐
│ Django Templates │
│ (Tailwind CSS dark theme) │
├────────────────────────────────────────────────────────────┤
│ Django REST Framework (apps/api/) │
│ TokenAuth · Investigations · Community · Status poll │
├────────────────────────────────────────────────────────────┤
│ Django Views │
│ users · sources · profiles · investigations · community│
├────────────────────────────────────────────────────────────┤
│ Celery Task Queue (Redis broker) │
│ high_priority (≤3 sources) · full_investigation │
│ ↓ fallback to sync if unavailable │
├────────────────────────────────────────────────────────────┤
│ Investigation Engine │
│ ┌──────────────┐ ┌──────────┐ ┌──────────────────────┐ │
│ │ Orchestrator │→│ Registry │→│ 17 API Adapters │ │
│ │(ThreadPool) │ └──────────┘ │ VT·AbuseIPDB·Shodan │ │
│ └──────────────┘ │ GreyNoise·OTX·... │ │
│ │ ↑ DB cache check └──────────────────────┘ │
│ ▼ (TTL per source) │
│ ┌──────────┐ ┌──────────┐ │
│ │Transforms│ │Validators│ │
│ └──────────┘ └──────────┘ │
├────────────────────────────────────────────────────────────┤
│ Django ORM (17 models) │
│ User · Organization · UserReputation · AuditLog · │
│ Source · AvailableField · InvestigationProfile · │
│ ProfileSourceConfig · ExpectedField · Indicator · │
│ Investigation · InvestigationResult · IndicatorTag · │
│ CommunityIndicator · CommunityResult · │
│ CommunityNote · ConfidenceVote │
├────────────────────────────────────────────────────────────┤
│ PostgreSQL │
│ GIN indexes · B-tree indexes · JSONB · full-text search │
└────────────────────────────────────────────────────────────┘
```
## 项目结构
```
soc-forge/
├── config/
│ ├── celery.py # Celery app with autodiscover
│ ├── settings/
│ │ ├── base.py # Shared settings + security + DRF + Celery
│ │ ├── development.py # Debug toolbar, console email
│ │ └── production.py # HTTPS, HSTS, secure cookies
│ └── urls.py # Root URL conf (includes /api/)
├── apps/
│ ├── users/ # Custom User + AuditLog + middleware
│ │ ├── models.py # User, Organization, UserReputation, AuditLog (crypto chain)
│ │ ├── middleware.py # Audit trail for state-changing requests
│ │ └── security_middleware.py # Rate limiting (Redis/LocMem), sanitization, headers
│ ├── sources/ # Threat intel API catalog
│ │ ├── models.py # Source (with TTL), AvailableField
│ │ └── management/commands/ # seed_sources (17 APIs + 8 profiles)
│ ├── profiles/ # Investigation configurations
│ │ └── models.py # InvestigationProfile, ProfileSourceConfig, ExpectedField
│ ├── investigations/ # Query execution + results
│ │ ├── models.py # Indicator, Investigation, InvestigationResult (schema_version)
│ │ ├── tasks.py # Celery shared_task + dispatch_investigation()
│ │ └── engine/ # ← The core engine
│ │ ├── base_adapter.py # Abstract adapter (AdapterResponse with source_slug)
│ │ ├── transforms.py # Field normalization functions
│ │ ├── registry.py # Slug → Adapter mapping
│ │ ├── orchestrator.py # Parallel execution (ThreadPoolExecutor) + DB cache
│ │ └── adapters/ # 17 API-specific adapters
│ ├── api/ # REST API (DRF)
│ │ ├── serializers.py # 6 serializers
│ │ ├── views.py # 5 endpoints with TokenAuthentication
│ │ └── urls.py # /api/ URL patterns
│ ├── community/ # Shared knowledge base
│ │ └── models.py # CommunityIndicator, CommunityResult, Note, Vote
│ └── core/ # Shared utilities
│ ├── enums.py # IOCType, InvestigationStatus, etc.
│ ├── validators.py # IOC validation + private IP blocking
│ ├── mixins.py # org_investigations_filter, user_can_access_investigation
│ └── exceptions.py # Domain exceptions
│ │ ├── llm.py # Multi-provider LLM abstraction (Anthropic/OpenAI/Groq/Grok/Gemini)
│ │ └── engine/ # ← The core engine
├── templates/
│ ├── home.html # Public landing page
│ ├── registration/
│ │ ├── login.html # Sign-in form
│ │ └── register.html # Self-registration form
│ └── … # Dark SOC-themed UI
└── tests/ # 132 tests (pytest-django)
```
## 快速开始
### 前置条件
- Python 3.12+
- PostgreSQL 14+
- Redis (可选 — 如果没有,将回退到同步执行)
- 至少一个威胁情报 API 密钥 (免费选项: ThreatFox, URLhaus, Malware Bazaar)
### 安装说明
```
# Clone
git clone https://github.com/YOUR_USERNAME/soc-forge.git
cd soc-forge
# 虚拟环境
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 依赖
pip install -r requirements.txt
# 环境
cp .env.example .env
# 使用你的 SECRET_KEY、DATABASE_URL 和 API keys 编辑 .env
# 数据库
python manage.py migrate
python manage.py createsuperuser
# Seed 情报源(17 个 APIs + 8 个默认配置文件)
python manage.py seed_sources --profiles
# 运行
python manage.py runserver
```
### 可选:用于异步调查的 Celery
```
# 在一个单独的终端中
celery -A config worker -l info -Q high_priority,full_investigation,celery
```
### 可选:LLM 摘要
在 `.env` 中设置一个提供商(只需匹配对应的密钥):
```
# Anthropic Claude(默认)
LLM_PROVIDER=anthropic
ANTHROPIC_API_KEY=sk-ant-...
# OpenAI ChatGPT
LLM_PROVIDER=openai
OPENAI_API_KEY=sk-...
# Groq — 快速,提供免费套餐
LLM_PROVIDER=groq
GROQ_API_KEY=gsk_...
# xAI Grok
LLM_PROVIDER=grok
GROK_API_KEY=xai-...
# Google Gemini
LLM_PROVIDER=gemini
GEMINI_API_KEY=AIza...
# 覆盖任意 provider 的模型(可选)
LLM_MODEL=llama-3.1-8b-instant
```
### 首次调查
1. 前往 `http://localhost:8000` — 新访问者的着陆页
2. 点击 **Create Account** 进行注册,如果已有账号则点击 **Sign In**
3. 导航至 **Investigations → New Investigation**
4. 粘贴一个 IP 地址:`185.220.101.50`
5. 选择 **"IP Reputation Check"** 配置
6. 点击 **Execute Investigation**
7. 查看来自 AbuseIPDB、GreyNoise 和 ipinfo.io 的结果 — 每个来源都会显示一个 OK/Partial/Error 标记
8. 点击 **Generar resumen** 获取 AI 驱动的分析(需要在 `.env` 中配置 LLM 密钥)
9. 点击 **Share to Community** 将其添加到知识库
## REST API
使用 DRF token 进行认证 (`Authorization: Token `)。
| 方法 | 端点 | 描述 |
|--------|----------|-------------|
| `POST` | `/api/investigations/` | 启动新调查 |
| `GET` | `/api/investigations/{id}/` | 获取调查及结果 |
| `GET` | `/api/investigations/{id}/status/` | 轮询异步状态 |
| `GET` | `/api/community/` | 浏览社区 IOC |
| `POST` | `/api/token/` | 获取认证 token |
## 安全特性
| 特性 | 实现方式 |
|---------|---------------|
| **暴力破解防护** | django-axes:5 次登录失败 → 锁定 1 小时 |
| **CSRF 防护** | Django CSRF middleware + HttpOnly cookies — 登出使用 POST 而非 GET |
| **输入净化** | 自定义中间件拦截 IOC 输入中的 XSS、SQLi 和路径遍历 |
| **私有 IP 拦截** | 验证器拒绝 RFC1918、环回、链路本地、多播地址 (IPv4 + IPv6) |
| **速率限制** | 基于 Redis 的原子计数器(回退到 LocMemCache) — 同时覆盖调查和注册接口 |
| **注册安全** | IP 速率限制(10 分钟内 5 次尝试)、用户名白名单 `[a-zA-Z0-9_-]`、邮箱唯一性、Django 密码验证器 |
| **加密审计追踪** | AuditLog 中的 SHA-256 链 — `verify_chain()` 可检测任何被篡改的条目 |
| **安全请求头** | X-Frame-Options, X-Content-Type-Options, Referrer-Policy, Permissions-Policy |
| **会话安全** | HttpOnly,8 小时过期,浏览器关闭时失效 |
| **密码策略** | 至少 10 个字符,常见密码检查,相似度检查 |
| **生产环境 HTTPS** | HSTS,安全 cookies,SSL 重定向 (production.py) |
| **多租户隔离** | Organization 模型 + Q 过滤辅助函数,限定所有调查查询的范围 |
## 默认调查配置
| 配置文件 | IOC 类型 | 来源 | 用例 |
|---------|----------|---------|----------|
| Quick Hash Lookup | Hash | VT, MalwareBazaar | 快速恶意软件检查 |
| Full Malware Analysis | Hash | VT, MalwareBazaar, HybridAnalysis, ThreatFox, OTX | 深度 hash 调查 |
| IP Reputation Check | IP | AbuseIPDB, GreyNoise, ipinfo | 快速 IP 分流 |
| Infrastructure Recon | IP | Shodan, Censys, CriminalIP | 端口/服务/漏洞扫描 |
| Full IP Investigation | IP | 9 sources | 完整 IP 分析 |
| Domain Investigation | Domain | VT, SecurityTrails, OTX, URLScan, SafeBrowsing | 域名分流 |
| Phishing URL Analysis | URL | VT, URLScan, SafeBrowsing, IPQS, OTX | 钓鱼检测 |
| Malware Delivery URL | URL | URLhaus, VT, URLScan, HybridAnalysis | 恶意 URL 检查 |
## 改进计划 — 已完成 (18/18 项任务)
该项目在初始构建之后经历了一个结构化的四阶段改进计划:
### 第一阶段 — 立竿见影的影响
- **并行 API 执行** — 编排器中的 `ThreadPoolExecutor(max_workers=8)`;DB 准备和 `bulk_create` 保留在主线程中,以避免 ORM 线程问题
- **`AdapterResponse` 上的 `source_slug`** — 适配器现在可以在并行结果中自我识别
- **私有 IP 验证** — 拒绝 RFC1918、环回、链路本地、多播以及 IPv6 保留范围
- **基于 Redis 的速率限制** — 使用带有原子 `add()`+`incr()` 的 `caches["rate_limit"]`;LocMemCache 作为开发/测试环境的回退方案
- **按来源的状态标记** — 每个来源面板显示 OK / Partial / Error / No data
### 第二阶段 — 数据质量与安全
- **DB 级别的结果缓存** — 在进行任何 API 调用之前,检查数据源 TTL 内的 `InvestigationResult` 记录;命中时复制结果,未命中时调用 API
- **`schema_version` 字段** — 为 `InvestigationResult` 上的未来字段迁移做好准备
- **感知 TTL 的社区去重** — 过期的社区条目将被刷新;最近的条目将被跳过
- **加密审计链** — `AuditLog.save()` 对上一个条目的哈希计算 SHA-256;`verify_chain()` 返回第一个断裂的链接
### 第三阶段 — 可扩展性与多租户支持
- **性能索引** — `idx_inv_results_source_fetched`(编排器缓存)、`idx_investigations_indicator`、`idx_community_results_dedup`、`idx_audit_log_detail_gin` (GIN)、`idx_audit_log_entry_hash`
- **Organization 模型** — `User` 上的可空 FK;`org_investigations_filter()` 和 `user_can_access_investigation()` 限定所有查询的范围
- **用户信誉系统** — 带有 `trust_weight` 的 `UserReputation`(新用户为 0.1× → 已认证用户为 1.0×);限制为每小时 30 次分享;争议惩罚 -10 分
- **STIX 2.1 导出** — `GET /investigations/{id}/export/stix/` 下载符合规范的 Bundle JSON,其中包含每个来源的 `indicator` + `observed-data` 对象
### 第四阶段 — REST API 与自动化
- **REST API** — `apps/api/`,使用 DRF `TokenAuthentication`;5 个用于调查、社区和状态轮询的端点
- **分析仪表盘** — PostgreSQL 聚合(`Avg`、`Count`、`TruncDate`、`F`);IOC 细分柱状图、来源性能表(发现率 + 平均耗时 ms)、热门 IOC
- **Celery 异步分发** — 带有重试机制的 `run_investigation_task` 共享任务;`dispatch_investigation()` 选择 `high_priority`(≤3 个来源)或 `full_investigation` 队列;如果 Redis 不可用则回退到同步执行
- **LLM 摘要** — `POST /investigations/{id}/summary/` → JSON `{summary, recommendation, provider}`;与提供商无关的 `apps/investigations/llm.py` 模块根据 `.env` 中的 `LLM_PROVIDER` 分发到 Anthropic、OpenAI、Groq、xAI 或 Gemini;UI 面板显示生成响应的模型
## 关键设计决策
**无异步 Django 的并行执行**
`ThreadPoolExecutor` 将 HTTP 调用移出主线程,同时所有 DB 操作(ORM 查、`bulk_create`)保留在主线程中。这避免了 Django ORM 的线程问题,而无需引入异步视图或 ASGI。
**DB 缓存而非 Redis 缓存**
在每个数据源的 TTL 内复用现有的 `InvestigationResult` 行,避免了为缓存增加 Redis 依赖。它在重启后依然有效,可跨多个 worker 工作,并保持审计追踪的完整性。
**加密审计日志**
每个 `AuditLog` 条目存储前一个条目哈希的 SHA-256 值,形成一条防篡改的链条。`verify_chain()` 可以检测任何历史记录是否被更改。
**带有可空 FK 的 Organization 隔离**
个人用户可以在不属于任何组织的情况下继续使用系统。Q 过滤辅助函数(`org_investigations_filter`)在每个视图中透明地处理这两种情况。
**无外部依赖库的 STIX 2.1**
该规范足够直观,可以针对所需的子集(indicator + observed-data)进行手动实现。不需要 `stix2` 依赖,这意味着减少了一项供应链风险。
**带有同步回退的 Celery**
`dispatch_investigation()` 会捕获 Celery/Redis 连接错误并回退到同步执行。该应用在开发环境中无需运行 Redis 即可工作。
**单包实现多提供商 LLM**
Groq、xAI (Grok) 和 Gemini 都暴露了兼容 OpenAI 的 REST API,因此 `openai` SDK 只需切换 `base_url` 即可覆盖这三者。只有 Anthropic 需要自己的 SDK。更改提供商只需在 `.env` 中修改一行——无需更改代码。
**根路径着陆页,仪表盘需登录**
`/` 提供公开的着陆页;已认证的用户会被重定向到 `/dashboard/`。这样可以在不复制模板的情况下,将营销/引导与需要认证的应用分开。
**为什么选择 Django 而不是 Flask?**
17 个具有复杂关系的互连模型。Django 的 ORM、admin、auth 和 migrations 可以开箱即用地处理这些问题。
**为什么从第一天起就使用 PostgreSQL?**
JSONB 字段能高效存储原始 API 响应。GIN 索引实现了快速的 JSON 键搜索。避免了日后从 SQLite 迁移到 PostgreSQL 的痛苦。
**为什么选择服务端渲染模板而不是 React/Vue?**
这是一个安全工具,而不是 SPA。服务端渲染构建速度更快,更容易保护安全(无需 CORS,无需 JWT),并且 Django 模板结合 Tailwind 能以更低的复杂性提供专业的 UI。
## 贡献
这是一个作品集项目,但也欢迎提交 PR。请先开一个 issue 来讨论变更内容。
## 许可证
MIT
标签:DNS枚举, GitHub, IOC查询, IP 地址批量处理, IP限流, LLM摘要, Python, SHA-256, SOC分析师, ThreadPoolExecutor, Web安全, 人工智能安全, 免费SOAR替代, 区块链防篡改, 合规性, 多源情报聚合, 威胁情报, 威胁情报API, 安全合规, 库, 应急响应, 开发者工具, 搜索引擎查询, 数据规范化, 无后门, 测试用例, 社区知识库, 网络代理, 网络信息收集, 网络安全工具, 网络测绘, 自动化调查, 自助化平台, 蓝队分析, 逆向工具