cc22n/soc-forge

GitHub: cc22n/soc-forge

SOC Forge 是一个自托管的可定制化威胁情报调查引擎,通过并行查询 17 个情报 API 并归一化结果,帮助 SOC 分析师消除手动切换平台的时间浪费。

Stars: 0 | Forks: 0

# SOC Forge **为 SOC 分析师打造的可定制化威胁情报调查引擎** SOC Forge 允许安全分析师构建个性化的调查配置,同时查询 17 个威胁情报 API,并通过社区知识库分享调查结果——从而消除了浪费分析师大量时间的手动切换 API 的繁琐操作。 ## 问题背景 SOC 分析师每次调查都会浪费 30-60 分钟,手动检查 10 多个威胁情报平台,在不同标签页之间复制数据,并重复他们的同事已经做过的查询。企业级 SOAR 工具可以解决这个问题,但每年的成本高达 5 万美元以上。目前在这两者之间没有合适的替代方案。 ## 解决方案 SOC Forge 是一个免费、自托管的调查引擎,具备以下特性: 1. **调查配置** — 定义可复用的查询配置:调用哪些 API、期望哪些字段以及按何种优先级顺序 2. **并行执行** — 粘贴任意 IOC (hash, IP, domain, URL) → 选择配置 → 通过 `ThreadPoolExecutor` 并发查询所有 API,几秒内即可返回结果 3. **字段归一化** — 将 17 个 API 中的 244 个字段映射到统一的分类法,使 `country`、`asn` 和 `malware_family` 无论来源如何都具有相同的含义 4. **DB 级别的结果缓存** — 在每个数据源的 TTL 窗口内复用最近的 `InvestigationResult` 记录,避免在重启和跨 worker 时产生冗余的 API 调用 5. **社区知识库** — 分享调查结果,让团队永远不必重复相同的 API 调用。可投票确认或反驳调查发现 6. **完整的审计追踪** — 每一次查询、每一次登录、每一项操作都通过 SHA-256 加密链进行跟踪(类似区块链的防篡改检测) 7. **LLM 摘要** — 一键生成自然语言摘要和行动建议;支持 Anthropic、OpenAI、Groq、xAI Grok 和 Google Gemini——只需通过 `.env` 变量即可切换提供商 8. **着陆页与注册** — 为新访问者提供公开的着陆页;包含 IP 速率限制、密码强度强制检查和暴力破解防护的安全自注册功能 ## 技术栈 | 层级 | 技术 | 原因 | |-------|-----------|-----| | Backend | Python 3.12 / Django 5.1 | 成熟的 ORM,内置后台管理,经过实战检验的安全性 | | Database | PostgreSQL | JSONB 支持灵活的 API 响应,稳健的索引 | | Frontend | Django Templates + Tailwind CSS (CDN) | 服务端渲染,快速,无 JS 框架开销 | | Task Queue | Celery + Redis | 支持优先级队列的异步调查分发 | | REST API | Django REST Framework | 基于 Token 认证的编程式访问 | | LLM | Anthropic · OpenAI · Groq · xAI · Gemini | 自然语言摘要 — 可通过 `.env` 切换提供商 | | Security | django-axes, CSP headers, audit middleware | 从第一天起就遵循 OWASP 规范 | | APIs | 17 个威胁情报源 | 真实集成,非模拟数据 | ## 支持的情报源 (17) | 来源 | IOC 类型 | 认证方式 | 速率限制 | |--------|----------|------|------------| | **VirusTotal** | Hash, IP, Domain, URL | API Key | 4/min | | **AbuseIPDB** | IP | API Key | 17/min | | **Shodan** | IP | API Key | 1/min | | **GreyNoise** | IP | API Key | 1/min | | **AlienVault OTX** | Hash, IP, Domain, URL | API Key | 167/min | | **Google Safe Browsing** | URL, Domain | API Key | 167/min | | **Hybrid Analysis** | Hash, URL | API Key | 3/min | | **SecurityTrails** | Domain, IP | API Key | 1/min | | **ThreatFox** | Hash, IP, Domain | Free | 10/min | | **URLhaus** | URL, Domain, Hash | Free | 10/min | | **URLScan.io** | URL, Domain | API Key | 2/min | | **Pulsedive** | IP, Domain, Hash, URL | API Key | 1/min | | **Criminal IP** | IP | API Key | 1/min | | **IPQualityScore** | IP, URL | API Key | 3/min | | **Censys** | IP, Domain | Basic Auth | 4/min | | **Malware Bazaar** | Hash | Free | 10/min | | **ipinfo.io** | IP | API Key | 833/min | ## 系统架构 ``` ┌────────────────────────────────────────────────────────────┐ │ Django Templates │ │ (Tailwind CSS dark theme) │ ├────────────────────────────────────────────────────────────┤ │ Django REST Framework (apps/api/) │ │ TokenAuth · Investigations · Community · Status poll │ ├────────────────────────────────────────────────────────────┤ │ Django Views │ │ users · sources · profiles · investigations · community│ ├────────────────────────────────────────────────────────────┤ │ Celery Task Queue (Redis broker) │ │ high_priority (≤3 sources) · full_investigation │ │ ↓ fallback to sync if unavailable │ ├────────────────────────────────────────────────────────────┤ │ Investigation Engine │ │ ┌──────────────┐ ┌──────────┐ ┌──────────────────────┐ │ │ │ Orchestrator │→│ Registry │→│ 17 API Adapters │ │ │ │(ThreadPool) │ └──────────┘ │ VT·AbuseIPDB·Shodan │ │ │ └──────────────┘ │ GreyNoise·OTX·... │ │ │ │ ↑ DB cache check └──────────────────────┘ │ │ ▼ (TTL per source) │ │ ┌──────────┐ ┌──────────┐ │ │ │Transforms│ │Validators│ │ │ └──────────┘ └──────────┘ │ ├────────────────────────────────────────────────────────────┤ │ Django ORM (17 models) │ │ User · Organization · UserReputation · AuditLog · │ │ Source · AvailableField · InvestigationProfile · │ │ ProfileSourceConfig · ExpectedField · Indicator · │ │ Investigation · InvestigationResult · IndicatorTag · │ │ CommunityIndicator · CommunityResult · │ │ CommunityNote · ConfidenceVote │ ├────────────────────────────────────────────────────────────┤ │ PostgreSQL │ │ GIN indexes · B-tree indexes · JSONB · full-text search │ └────────────────────────────────────────────────────────────┘ ``` ## 项目结构 ``` soc-forge/ ├── config/ │ ├── celery.py # Celery app with autodiscover │ ├── settings/ │ │ ├── base.py # Shared settings + security + DRF + Celery │ │ ├── development.py # Debug toolbar, console email │ │ └── production.py # HTTPS, HSTS, secure cookies │ └── urls.py # Root URL conf (includes /api/) ├── apps/ │ ├── users/ # Custom User + AuditLog + middleware │ │ ├── models.py # User, Organization, UserReputation, AuditLog (crypto chain) │ │ ├── middleware.py # Audit trail for state-changing requests │ │ └── security_middleware.py # Rate limiting (Redis/LocMem), sanitization, headers │ ├── sources/ # Threat intel API catalog │ │ ├── models.py # Source (with TTL), AvailableField │ │ └── management/commands/ # seed_sources (17 APIs + 8 profiles) │ ├── profiles/ # Investigation configurations │ │ └── models.py # InvestigationProfile, ProfileSourceConfig, ExpectedField │ ├── investigations/ # Query execution + results │ │ ├── models.py # Indicator, Investigation, InvestigationResult (schema_version) │ │ ├── tasks.py # Celery shared_task + dispatch_investigation() │ │ └── engine/ # ← The core engine │ │ ├── base_adapter.py # Abstract adapter (AdapterResponse with source_slug) │ │ ├── transforms.py # Field normalization functions │ │ ├── registry.py # Slug → Adapter mapping │ │ ├── orchestrator.py # Parallel execution (ThreadPoolExecutor) + DB cache │ │ └── adapters/ # 17 API-specific adapters │ ├── api/ # REST API (DRF) │ │ ├── serializers.py # 6 serializers │ │ ├── views.py # 5 endpoints with TokenAuthentication │ │ └── urls.py # /api/ URL patterns │ ├── community/ # Shared knowledge base │ │ └── models.py # CommunityIndicator, CommunityResult, Note, Vote │ └── core/ # Shared utilities │ ├── enums.py # IOCType, InvestigationStatus, etc. │ ├── validators.py # IOC validation + private IP blocking │ ├── mixins.py # org_investigations_filter, user_can_access_investigation │ └── exceptions.py # Domain exceptions │ │ ├── llm.py # Multi-provider LLM abstraction (Anthropic/OpenAI/Groq/Grok/Gemini) │ │ └── engine/ # ← The core engine ├── templates/ │ ├── home.html # Public landing page │ ├── registration/ │ │ ├── login.html # Sign-in form │ │ └── register.html # Self-registration form │ └── … # Dark SOC-themed UI └── tests/ # 132 tests (pytest-django) ``` ## 快速开始 ### 前置条件 - Python 3.12+ - PostgreSQL 14+ - Redis (可选 — 如果没有,将回退到同步执行) - 至少一个威胁情报 API 密钥 (免费选项: ThreatFox, URLhaus, Malware Bazaar) ### 安装说明 ``` # Clone git clone https://github.com/YOUR_USERNAME/soc-forge.git cd soc-forge # 虚拟环境 python -m venv venv source venv/bin/activate # Linux/Mac # venv\Scripts\activate # Windows # 依赖 pip install -r requirements.txt # 环境 cp .env.example .env # 使用你的 SECRET_KEY、DATABASE_URL 和 API keys 编辑 .env # 数据库 python manage.py migrate python manage.py createsuperuser # Seed 情报源(17 个 APIs + 8 个默认配置文件) python manage.py seed_sources --profiles # 运行 python manage.py runserver ``` ### 可选:用于异步调查的 Celery ``` # 在一个单独的终端中 celery -A config worker -l info -Q high_priority,full_investigation,celery ``` ### 可选:LLM 摘要 在 `.env` 中设置一个提供商(只需匹配对应的密钥): ``` # Anthropic Claude(默认) LLM_PROVIDER=anthropic ANTHROPIC_API_KEY=sk-ant-... # OpenAI ChatGPT LLM_PROVIDER=openai OPENAI_API_KEY=sk-... # Groq — 快速,提供免费套餐 LLM_PROVIDER=groq GROQ_API_KEY=gsk_... # xAI Grok LLM_PROVIDER=grok GROK_API_KEY=xai-... # Google Gemini LLM_PROVIDER=gemini GEMINI_API_KEY=AIza... # 覆盖任意 provider 的模型(可选) LLM_MODEL=llama-3.1-8b-instant ``` ### 首次调查 1. 前往 `http://localhost:8000` — 新访问者的着陆页 2. 点击 **Create Account** 进行注册,如果已有账号则点击 **Sign In** 3. 导航至 **Investigations → New Investigation** 4. 粘贴一个 IP 地址:`185.220.101.50` 5. 选择 **"IP Reputation Check"** 配置 6. 点击 **Execute Investigation** 7. 查看来自 AbuseIPDB、GreyNoise 和 ipinfo.io 的结果 — 每个来源都会显示一个 OK/Partial/Error 标记 8. 点击 **Generar resumen** 获取 AI 驱动的分析(需要在 `.env` 中配置 LLM 密钥) 9. 点击 **Share to Community** 将其添加到知识库 ## REST API 使用 DRF token 进行认证 (`Authorization: Token `)。 | 方法 | 端点 | 描述 | |--------|----------|-------------| | `POST` | `/api/investigations/` | 启动新调查 | | `GET` | `/api/investigations/{id}/` | 获取调查及结果 | | `GET` | `/api/investigations/{id}/status/` | 轮询异步状态 | | `GET` | `/api/community/` | 浏览社区 IOC | | `POST` | `/api/token/` | 获取认证 token | ## 安全特性 | 特性 | 实现方式 | |---------|---------------| | **暴力破解防护** | django-axes:5 次登录失败 → 锁定 1 小时 | | **CSRF 防护** | Django CSRF middleware + HttpOnly cookies — 登出使用 POST 而非 GET | | **输入净化** | 自定义中间件拦截 IOC 输入中的 XSS、SQLi 和路径遍历 | | **私有 IP 拦截** | 验证器拒绝 RFC1918、环回、链路本地、多播地址 (IPv4 + IPv6) | | **速率限制** | 基于 Redis 的原子计数器(回退到 LocMemCache) — 同时覆盖调查和注册接口 | | **注册安全** | IP 速率限制(10 分钟内 5 次尝试)、用户名白名单 `[a-zA-Z0-9_-]`、邮箱唯一性、Django 密码验证器 | | **加密审计追踪** | AuditLog 中的 SHA-256 链 — `verify_chain()` 可检测任何被篡改的条目 | | **安全请求头** | X-Frame-Options, X-Content-Type-Options, Referrer-Policy, Permissions-Policy | | **会话安全** | HttpOnly,8 小时过期,浏览器关闭时失效 | | **密码策略** | 至少 10 个字符,常见密码检查,相似度检查 | | **生产环境 HTTPS** | HSTS,安全 cookies,SSL 重定向 (production.py) | | **多租户隔离** | Organization 模型 + Q 过滤辅助函数,限定所有调查查询的范围 | ## 默认调查配置 | 配置文件 | IOC 类型 | 来源 | 用例 | |---------|----------|---------|----------| | Quick Hash Lookup | Hash | VT, MalwareBazaar | 快速恶意软件检查 | | Full Malware Analysis | Hash | VT, MalwareBazaar, HybridAnalysis, ThreatFox, OTX | 深度 hash 调查 | | IP Reputation Check | IP | AbuseIPDB, GreyNoise, ipinfo | 快速 IP 分流 | | Infrastructure Recon | IP | Shodan, Censys, CriminalIP | 端口/服务/漏洞扫描 | | Full IP Investigation | IP | 9 sources | 完整 IP 分析 | | Domain Investigation | Domain | VT, SecurityTrails, OTX, URLScan, SafeBrowsing | 域名分流 | | Phishing URL Analysis | URL | VT, URLScan, SafeBrowsing, IPQS, OTX | 钓鱼检测 | | Malware Delivery URL | URL | URLhaus, VT, URLScan, HybridAnalysis | 恶意 URL 检查 | ## 改进计划 — 已完成 (18/18 项任务) 该项目在初始构建之后经历了一个结构化的四阶段改进计划: ### 第一阶段 — 立竿见影的影响 - **并行 API 执行** — 编排器中的 `ThreadPoolExecutor(max_workers=8)`;DB 准备和 `bulk_create` 保留在主线程中,以避免 ORM 线程问题 - **`AdapterResponse` 上的 `source_slug`** — 适配器现在可以在并行结果中自我识别 - **私有 IP 验证** — 拒绝 RFC1918、环回、链路本地、多播以及 IPv6 保留范围 - **基于 Redis 的速率限制** — 使用带有原子 `add()`+`incr()` 的 `caches["rate_limit"]`;LocMemCache 作为开发/测试环境的回退方案 - **按来源的状态标记** — 每个来源面板显示 OK / Partial / Error / No data ### 第二阶段 — 数据质量与安全 - **DB 级别的结果缓存** — 在进行任何 API 调用之前,检查数据源 TTL 内的 `InvestigationResult` 记录;命中时复制结果,未命中时调用 API - **`schema_version` 字段** — 为 `InvestigationResult` 上的未来字段迁移做好准备 - **感知 TTL 的社区去重** — 过期的社区条目将被刷新;最近的条目将被跳过 - **加密审计链** — `AuditLog.save()` 对上一个条目的哈希计算 SHA-256;`verify_chain()` 返回第一个断裂的链接 ### 第三阶段 — 可扩展性与多租户支持 - **性能索引** — `idx_inv_results_source_fetched`(编排器缓存)、`idx_investigations_indicator`、`idx_community_results_dedup`、`idx_audit_log_detail_gin` (GIN)、`idx_audit_log_entry_hash` - **Organization 模型** — `User` 上的可空 FK;`org_investigations_filter()` 和 `user_can_access_investigation()` 限定所有查询的范围 - **用户信誉系统** — 带有 `trust_weight` 的 `UserReputation`(新用户为 0.1× → 已认证用户为 1.0×);限制为每小时 30 次分享;争议惩罚 -10 分 - **STIX 2.1 导出** — `GET /investigations/{id}/export/stix/` 下载符合规范的 Bundle JSON,其中包含每个来源的 `indicator` + `observed-data` 对象 ### 第四阶段 — REST API 与自动化 - **REST API** — `apps/api/`,使用 DRF `TokenAuthentication`;5 个用于调查、社区和状态轮询的端点 - **分析仪表盘** — PostgreSQL 聚合(`Avg`、`Count`、`TruncDate`、`F`);IOC 细分柱状图、来源性能表(发现率 + 平均耗时 ms)、热门 IOC - **Celery 异步分发** — 带有重试机制的 `run_investigation_task` 共享任务;`dispatch_investigation()` 选择 `high_priority`(≤3 个来源)或 `full_investigation` 队列;如果 Redis 不可用则回退到同步执行 - **LLM 摘要** — `POST /investigations/{id}/summary/` → JSON `{summary, recommendation, provider}`;与提供商无关的 `apps/investigations/llm.py` 模块根据 `.env` 中的 `LLM_PROVIDER` 分发到 Anthropic、OpenAI、Groq、xAI 或 Gemini;UI 面板显示生成响应的模型 ## 关键设计决策 **无异步 Django 的并行执行** `ThreadPoolExecutor` 将 HTTP 调用移出主线程,同时所有 DB 操作(ORM 查、`bulk_create`)保留在主线程中。这避免了 Django ORM 的线程问题,而无需引入异步视图或 ASGI。 **DB 缓存而非 Redis 缓存** 在每个数据源的 TTL 内复用现有的 `InvestigationResult` 行,避免了为缓存增加 Redis 依赖。它在重启后依然有效,可跨多个 worker 工作,并保持审计追踪的完整性。 **加密审计日志** 每个 `AuditLog` 条目存储前一个条目哈希的 SHA-256 值,形成一条防篡改的链条。`verify_chain()` 可以检测任何历史记录是否被更改。 **带有可空 FK 的 Organization 隔离** 个人用户可以在不属于任何组织的情况下继续使用系统。Q 过滤辅助函数(`org_investigations_filter`)在每个视图中透明地处理这两种情况。 **无外部依赖库的 STIX 2.1** 该规范足够直观,可以针对所需的子集(indicator + observed-data)进行手动实现。不需要 `stix2` 依赖,这意味着减少了一项供应链风险。 **带有同步回退的 Celery** `dispatch_investigation()` 会捕获 Celery/Redis 连接错误并回退到同步执行。该应用在开发环境中无需运行 Redis 即可工作。 **单包实现多提供商 LLM** Groq、xAI (Grok) 和 Gemini 都暴露了兼容 OpenAI 的 REST API,因此 `openai` SDK 只需切换 `base_url` 即可覆盖这三者。只有 Anthropic 需要自己的 SDK。更改提供商只需在 `.env` 中修改一行——无需更改代码。 **根路径着陆页,仪表盘需登录** `/` 提供公开的着陆页;已认证的用户会被重定向到 `/dashboard/`。这样可以在不复制模板的情况下,将营销/引导与需要认证的应用分开。 **为什么选择 Django 而不是 Flask?** 17 个具有复杂关系的互连模型。Django 的 ORM、admin、auth 和 migrations 可以开箱即用地处理这些问题。 **为什么从第一天起就使用 PostgreSQL?** JSONB 字段能高效存储原始 API 响应。GIN 索引实现了快速的 JSON 键搜索。避免了日后从 SQLite 迁移到 PostgreSQL 的痛苦。 **为什么选择服务端渲染模板而不是 React/Vue?** 这是一个安全工具,而不是 SPA。服务端渲染构建速度更快,更容易保护安全(无需 CORS,无需 JWT),并且 Django 模板结合 Tailwind 能以更低的复杂性提供专业的 UI。 ## 贡献 这是一个作品集项目,但也欢迎提交 PR。请先开一个 issue 来讨论变更内容。 ## 许可证 MIT
标签:DNS枚举, GitHub, IOC查询, IP 地址批量处理, IP限流, LLM摘要, Python, SHA-256, SOC分析师, ThreadPoolExecutor, Web安全, 人工智能安全, 免费SOAR替代, 区块链防篡改, 合规性, 多源情报聚合, 威胁情报, 威胁情报API, 安全合规, 库, 应急响应, 开发者工具, 搜索引擎查询, 数据规范化, 无后门, 测试用例, 社区知识库, 网络代理, 网络信息收集, 网络安全工具, 网络测绘, 自动化调查, 自助化平台, 蓝队分析, 逆向工具