alisterrodrigues/soc-alert-triage-engine

GitHub: alisterrodrigues/soc-alert-triage-engine

专为安全运营中心设计的告警分诊引擎,支持多源数据摄取、威胁情报丰富及 MITRE ATT&CK 关联分析。

Stars: 0 | Forks: 0

# SOC 告警分诊引擎 [![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/0e13c28c3a160333.svg)](https://github.com/alisterrodrigues/soc-alert-triage-engine/actions/workflows/ci.yml) [![Python 3.11+](https://img.shields.io/badge/python-3.11%2B-blue.svg)](https://www.python.org/downloads/) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](LICENSE) [![MITRE ATT&CK](https://img.shields.io/badge/MITRE-ATT%26CK-red.svg)](https://attack.mitre.org/) [![Sources](https://img.shields.io/badge/sources-File%20%7C%20Splunk%20%7C%20Elastic-orange.svg)](#sources) 一个精心设计的 Python CLI 工具,可从任意源摄取安全告警,利用 VirusTotal 和 Shodan 威胁情报进行丰富,通过 6 因子加权模型对优先级进行评分,检测相关联的事件和杀伤链,并生成自包含的 HTML 分析师报告。 ## 功能概述 安全运营中心(SOC)面临告警疲劳的困扰。典型的 SIEM 每个班次会产生数百条告警,其中大部分都是噪音。分析师每次都会问一个问题:**这些告警中哪些真正需要我立即关注?** 本工具系统地回答了这个问题。它从文件导出、Splunk 或 Elasticsearch 拉取告警;针对每个公网源 IP 使用 VirusTotal 和 Shodan 进行丰富;使用可配置的加权模型进行评分;将相关告警按时间窗口归组为事件;并标记多阶段杀伤链。输出结果是一份已评分、已排序、可筛选的 HTML 分析师报告。 ## 截图 ### 终端输出 — 试运行 ![Terminal dry-run output showing pipeline stages and triage summary](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/42da5a23c2160334.png) ### HTML 报告 — 关联事件与优先级概览 ![HTML report showing incident panel with kill chain detection and priority summary cards](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/7f9683eb2c160335.png) ### HTML 报告 — 实时丰富、ATT&CK 标签与评分明细 ![HTML report showing score distribution histogram and filtered INVESTIGATE_SOON alerts](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/21033f1d28160336.png) ### HTML 报告 — “尽快调查”筛选器与评分分布 ![HTML report showing alert table with MITRE tactic column, VT enrichment data, and expanded score breakdown](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/2a4ff1e0dd160338.png) ## 处理流程 ``` flowchart LR A(["CSV / JSON\nSplunk / Elastic"]) --> B[Ingestion] B --> C[Enrichment\nVT + Shodan] C --> D[MITRE Tagging] D --> E[Scoring\n6-factor model] E --> F[Correlation\nKill chain detection] F --> G[(SQLite Store)] F --> H([HTML Report]) ``` | 阶段 | 处理内容 | |---|---| | **摄取** | CSV、JSON、Splunk REST API 或 Elasticsearch 查询 → 标准化为 `Alert` 对象。处理 UTF-8 BOM。JSON null 安全。 | | **丰富** | 针对每个公网源 IP 进行 VirusTotal IP 信誉 + Shodan 端口/CVE 查询。基于文件的缓存(1 小时 TTL)。跳过私有/保留 IP。 | | **MITRE 标记** | 每条告警通过规则 ID 前缀 → 类别回退查找映射到 ATT&CK 战术与技术。 | | **评分** | 6 因子加权模型。缺失的丰富数据在可用因子间重新归一化 —— 而非置零。 | | **关联** | 告警在 15 分钟窗口内按源 IP 分组。当战术链跨越 ≥ 2 个 ATT&CK 阶段时触发杀伤链。 | | **存储** | 所有结果持久化到 SQLite,包含评分明细、丰富字段和运行元数据。 | | **报告** | 自包含 HTML —— 无 CDN,可离线工作。包含评分直方图、可筛选表格、MITRE 徽章、可展开明细。 | ## 快速开始 **要求:** Python 3.11+ ``` # Clone 和设置 git clone https://github.com/alisterrodrigues/soc-alert-triage-engine.git cd soc-alert-triage-engine python -m venv .venv && source .venv/bin/activate pip install -r requirements.txt # Dry-run 对示例数据 — 无需 API keys python -m cli.main \ --input sample_data/alerts_sample.csv \ --config config/config.yaml \ --dry-run --report # Live enrichment 运行 export VT_API_KEY=your_virustotal_key export SHODAN_API_KEY=your_shodan_key python -m cli.main \ --input sample_data/alerts_sample.csv \ --config config/config.yaml \ --report ``` ### Splunk 源 ``` export SPLUNK_USERNAME=admin export SPLUNK_PASSWORD=your_password python -m cli.main --source splunk --config config/config.yaml --report ``` ### Elasticsearch 源 ``` export ELASTIC_API_KEY=your_api_key python -m cli.main --source elastic --config config/config.yaml --report ``` ## 评分模型 每条告警获得一个 `[0.0, 1.0]` 区间的分数,由六个因子的加权和计算得出: | 因子 | 权重 | 描述 | |---|---|---| | 严重性 | 0.25 | `critical`=1.0, `high`=0.75, `medium`=0.45, `low`=0.20 | | VT 恶意率 | 0.20 | 将源 IP 标记为恶意的 VirusTotal 引擎占比 | | Shodan 暴露度 | 0.15 | 加权端口得分 + CVE 数量(RDP/SMB 得分高于 HTTP) | | 资产关键性 | 0.15 | `dc`/`server`=1.0, `cloud`=0.5, `workstation`/other=0.2 | | 时效性 | 0.15 | 指数衰减,6 小时半衰期,下限 0.10 | | 历史出现 | 0.10 | `1 - 0.7ⁿ`,其中 n 为源 IP 在过去 7 天内的出现次数 | **缺失的丰富数据会被重新归一化,而非置零。** 当 VT 和 Shodan 不可用时,剩余权重按比例放大,从而使结构信号(严重性、资产关键性、时效性)保持其完整权重。结果会被标记为 `confidence: low` —— 而非被抑制。完整公式请参阅 [docs/scoring_model.md](docs/scoring_model.md)。 分数映射到优先级标签: | 分数 | 标签 | |---|---| | ≥ 0.80 | `INVESTIGATE_NOW` | | ≥ 0.55 | `INVESTIGATE_SOON` | | ≥ 0.30 | `MONITOR` | | < 0.30 | `LOW_PRIORITY` | ## 事件关联 在可配置的时间窗口(默认:15 分钟)内共享源 IP 的告警会被归组为一个 `CorrelatedIncident`: ``` combined_score = peak * 0.6 + mean * 0.3 + min(alert_count / 10, 1.0) * 0.1 ``` 当事件的战术链跨越两个或更多不同的 ATT&CK 阶段时,将触发杀伤链检测 —— 例如 `RECONNAISSANCE → LATERAL_MOVEMENT → CREDENTIAL_ACCESS → EXFILTRATION`。私有/RFC1918 IP 完全包含在关联中;内部横向移动正是值得检测的模式。 ## MITRE ATT&CK 标记 每条告警都标记有一个战术和技术。当规则 ID 前缀与类别同时匹配时,规则 ID 前缀优先。映射关系位于 `correlation/mitre_mappings.yaml` 中,无需更改代码即可扩展。 | 规则 ID 前缀 / 类别 | 战术 | 技术 | |---|---|---| | `RDP-*` | LATERAL_MOVEMENT | T1021.001 — Remote Desktop Protocol | | `PERSIST-*` | PERSISTENCE | T1547 — Boot or Logon Autostart Execution | | `CRED-*` | CREDENTIAL_ACCESS | T1003 — OS Credential Dumping | | `EXFIL-*` | EXFILTRATION | T1041 — Exfiltration Over C2 Channel | | category: `c2` | COMMAND_AND_CONTROL | T1071 — Application Layer Protocol | | category: `phishing` | INITIAL_ACCESS | T1566 — Phishing | ## 告警输入格式 对于基于文件的摄取,CSV 或 JSON 必须包含以下字段: | 字段 | 必填 | 备注 | |---|---|---| | `alert_id` | 是 | 唯一告警标识符 | | `timestamp` | 是 | ISO 8601,例如 `2026-04-06T05:02:11Z` | | `source_ip` | 是 | IPv4 地址 | | `alert_name` | 是 | 人类可读的描述 | | `severity` | 否 | `critical`、`high`、`medium`、`low`(默认为 `low`) | | `category` | 否 | `malware`、`phishing`、`lateral_movement`、`c2`、`data_exfil`、`recon`、`other` | | `destination_ip` | 否 | | | `destination_port` | 否 | 整数 | | `rule_id` | 否 | 驱动 MITRE 前缀映射 | | `asset_tags` | 否 | 逗号分隔:`dc`、`server`、`endpoint`、`cloud`、`workstation` | | `raw_payload` | 否 | 原始 SIEM 事件的 JSON 字符串 | | `analyst_notes` | 否 | 预先存在的注释 | `sample_data/` 包含 40 条跨越四种真实攻击场景的告警 —— APT 入侵链、内部威胁、勒索软件准备和 C2 信标 —— 旨在触发杀伤链检测并展示评分分布。 ## 配置 所有参数位于 `config/config.yaml`。库代码中没有任何硬编码。 ``` scoring: weights: # Must sum to 1.0 severity: 0.25 vt_malicious_ratio: 0.20 shodan_exposure: 0.15 asset_criticality: 0.15 recency: 0.15 prior_sightings: 0.10 baseline_lookback_days: 7 # Prior sightings history window recency: half_life_hours: 6.0 # Recency decay rate floor: 0.10 # Minimum recency score correlation: window_minutes: 15 # Time window for incident grouping enrichment: virustotal: rate_limit_per_min: 4 # Free tier: 4 req/min cache: ttl_seconds: 3600 # Cache TTL per IP ``` ## 导出 ``` # 导出到 JSON python -m cli.main --input alerts.csv --config config/config.yaml --export json # 导出到 CSV 和 JSON python -m cli.main --input alerts.csv --config config/config.yaml --export both # Custom output 目录 python -m cli.main --input alerts.csv --config config/config.yaml --output-dir /tmp/triage-out --report ``` ## 开发 ``` pip install -r requirements-dev.txt pytest tests/ -v pytest tests/ -v --cov=. --cov-report=term-missing ``` 五个模块共 111 个测试。任何测试中均不进行网络调用。 ## 项目结构 ``` soc-alert-triage-engine/ ├── cli/ │ └── main.py # Entry point and pipeline orchestration ├── ingestion/ │ ├── __init__.py # Alert dataclass, _validate_and_build, _opt_str │ ├── csv_ingestor.py │ └── json_ingestor.py ├── sources/ │ ├── base.py # AlertSource ABC │ ├── file_source.py │ ├── splunk_source.py │ └── elastic_source.py ├── enrichment/ │ ├── virustotal.py │ ├── shodan_lookup.py │ └── cache.py ├── correlation/ │ ├── engine.py # Time-windowed incident grouping │ ├── tagger.py # MITRE ATT&CK tagging │ └── mitre_mappings.yaml # Tactic/technique lookup table ├── scoring/ │ ├── scorer.py # 6-factor weighted model │ └── constants.py # Shared priority label thresholds ├── store/ │ └── alert_db.py # SQLite persistence ├── reporting/ │ └── html_report.py # Self-contained HTML report generator ├── tests/ │ ├── test_ingestion.py │ ├── test_enrichment.py │ ├── test_scoring.py │ ├── test_correlation.py │ └── test_report.py ├── sample_data/ │ ├── alerts_sample.csv # 40 alerts across 4 attack scenarios │ └── alerts_sample.json ├── docs/ │ ├── architecture.md │ ├── scoring_model.md │ └── screenshots/ ├── config/ │ └── config.yaml # All tunables ├── .env.example ├── .github/workflows/ci.yml ├── pyproject.toml ├── requirements.txt └── requirements-dev.txt ``` 模块级文档和设计决策请参阅 [docs/architecture.md](docs/architecture.md)。 ## 设计决策 **缺乏丰富数据 ≠ 缺乏威胁。** 缺失的 VT/Shodan 数据会对模型进行重新归一化,而不是静默压低分数。标记为 `confidence: low` 的 `INVESTIGATE_NOW` 意味着结构信号令人担忧,但外部情报层无法确认 —— 这比被抑制的分数更诚实、更具可操作性。 **私有 IP 不排除在关联之外。** RFC1918 地址会绕过外部丰富 API(无有用数据,浪费配额)。但它们完全包含在事件分组中 —— 内部横向移动正是值得检测的模式。 **无硬编码值。** 权重、阈值、严重性映射、速率限制、缓存 TTL、时间窗口 —— 所有内容都位于 `config/config.yaml` 中。针对特定环境进行调优无需更改任何代码。 **自包含报告。** HTML 报告内联了所有 CSS 和 JavaScript。无 CDN、无外部字体、无运行时依赖。它在在线和离线状态下渲染效果一致,可作为单个文件通过电子邮件发送,且不会将告警数据泄露到外部服务器。 **源提供者抽象。** 添加新的告警源 —— 如 AWS Security Hub、Microsoft Sentinel 或自定义 webhook —— 仅需一个实现 `fetch()` 的类和一个字段映射函数。丰富、评分和报告逻辑无需改动。 ## 已知限制 - 丰富以源 IP 为中心。对于网络钓鱼和数据渗出告警,目标实体通常比源实体更相关。 - VT 速率限制器是进程本地的,并非线程安全。并发运行不会协调 API 调用时机。 - 历史出现不会在单次运行内去重。高频爆发可能会夸大未来的出现计数。 - Splunk 和 Elastic 适配器未实现用于持续轮询的基于检查点的去重。 - 评分权重反映的是运营判断,而非从历史真实情况中学习到的参数。
标签:Ask搜索, Cloudflare, CSV 处理, Elasticsearch, HTML 报告, IP 富化, Kill Chain, MITRE ATT&CK, Python CLI, VirusTotal, 事件关联, 优先级评分, 告警分类, 告警降噪, 威胁情报, 安全运营, 开发者工具, 扫描框架, 文档结构分析, 杀伤链, 网络安全, 自动化报告, 逆向工具, 隐私保护, 风险评分