ibfavas/threat-intel-pipeline

GitHub: ibfavas/threat-intel-pipeline

一套自动化 CTI 流水线,从开源威胁订阅源获取、评分、富化 IOC,并导出到 Wazuh 以实现实时检测与告警。

Stars: 0 | Forks: 0

# 威胁情报聚合与 IOC 富化流水线 个人项目 | 蓝队 / CTI | 2026 自动化威胁情报流水线,可从多个开源 CTI 订阅源获取 IOC,根据置信度进行评分,通过外部 API 进行富化,并将可执行的情报作为实时检测列表推送到 Wazuh SIEM 中。 详细技术文档:[WRITEUP.md](WRITEUP.md) ## 🔦 核心亮点 - 从 OTX AlienVault、abuse.ch URLhaus 和 Feodo Tracker 获取 IOC - 置信度评分引擎综合考量时效性、来源信誉和跨源交叉验证 - 通过 AbuseIPDB 和 VirusTotal API 进行自动化富化,并具备速率限制处理能力 - 将高置信度 IOC 直接导出到 Wazuh CDB 列表中,以进行实时日志关联和告警 - 通过 systemd timer 实现完全自动化 — 在 Arch Linux 上每 6 小时运行一次 - 提供用于手动控制的 CLI 界面:ingest (获取)、score (评分)、enrich (富化)、export (导出)、report (报告) ## 🛠️ 使用技能 `Python` `SQLite` `Wazuh` `CTI Feed 分析` `IOC 富化` `AbuseIPDB API` `VirusTotal API` `OTX AlienVault` `abuse.ch` `置信度评分` `SIEM 集成` `systemd` `Arch Linux` `威胁情报` `检测工程` `蓝队` ## 📸 截图 ### 流水线架构 ![流水线架构](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/5a995dba52122234.png) 可编辑源文件:[`assets/diagrams/architecture.drawio`](assets/diagrams/architecture.drawio) 此图展示了完整的 IOC 生命周期。特定于订阅源的摄取器从 URLhaus、Feodo Tracker 和 OTX 拉取原始 CTI,将每条记录标准化为共享 schema,然后将其存储在 SQLite 中以进行去重和评分。将摄取、评分、富化和导出保持为独立的阶段,使流水线更易于调试和扩展。在蓝队工作流中,这种分离在操作上也很重要:嘈杂的订阅源获取不应直接转换为 SIEM 检测逻辑,直到经过置信度评分和富化对其进行过滤。 ### 1️⃣ 订阅源摄取 ![订阅源摄取输出](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/1436e809bf122235.png) 此截图证明流水线可以摄取所有三个配置的 CTI 源。URLhaus 提供恶意 URL 及派生的域名/IP,Feodo 提供精选的 C2 IP,而 OTX 从 pulses 中提供更广泛的社区来源指标。重要的工程细节在于,每个订阅源都有不同的原始字段和格式,但摄取器在存储之前将所有内容标准化为相同的 IOC 模型。这使得后续阶段与具体订阅源无关。 ### 2️⃣ SQLite 存储与去重 **A. 完整摄取后的数据库统计** ![SQLite 统计和订阅源健康状况](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/3bd8b4ae19122237.png) 统计输出验证了摄取过程在 SQLite 中生成了可查询的数据。它显示了总 IOC 数量、按 IOC 类型的细分、按来源的细分以及最新的订阅源运行结果。这对运维非常有用,因为它提供了快速的订阅源健康可见性:如果某个来源开始失败,流水线会记录该失败,而不是默默地生成过时的检测。 **B. 去重实践** ![SQLite 去重查询](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/a54a8d0d80122238.png) 此查询展示了去重和来源/标签合并。数据库使用 `UNIQUE(value, type)`,因此重复的指标不会创建重复的行。相反,重复或相互印证的观察结果会更新 `last_seen`、合并标签、合并来源并更新 `hit_count`。这很重要,因为在多个订阅源中看到的单个 IOC 是比在一个嘈杂的社区订阅源中看到一次的 IOC 更强的信号。 ### 3️⃣ 置信度评分引擎 **分数分布** ![评分报告](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/35b20522c1122239.png) 评分截图展示了原始 IOC 如何转化为优先级情报。分数结合了时效性、来源信誉和跨源交叉验证。最近的 URLhaus 或 Feodo 指标自然会比过时的指标得分更高,而多源目击会增加置信度。这可以防止流水线将每个订阅源条目视为具有同等的可操作性,当输出成为实时 SIEM 检测内容时,这一点至关重要。 ### 4️⃣ API 富化 **富化输出和数据库验证** ![API 富化和数据库查询](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/62b909d0b0122240.png) 富化步骤为高置信度 IOC 增加了外部上下文。对于 IP,AbuseIPDB 提供滥用置信度、国家、ISP 和使用类型,而 VirusTotal 提供恶意/可疑引擎计数。SQLite 查询证明富化数据以 JSON 的形式持久化在 `enrichment` 列中。将其存储为 JSON 保持了 schema 的简洁,同时保留了完整的 API 细节以用于报告或未来的检测逻辑。 ### 5️⃣ Wazuh CDB 导出与实时检测 **A. CDB 列表文件内容** ![Wazuh CDB 导出](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/1d790a49b5122242.png) 导出器将高置信度的 IP 和域名转换为 Wazuh CDB 列表格式:`indicator:label`。此截图显示了用于基于 Docker 的 Wazuh 部署的本地导出路径。其目的是从被动情报存储转变为主动检测内容。一旦被 Wazuh 加载,这些列表就可以在实时日志分析期间进行查询。 **B. Wazuh 自定义规则** ![Wazuh 自定义规则](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/692317b006122243.png) 自定义 Wazuh 规则将解码后的日志字段与导出的 CDB 列表连接起来。对于 SSH 事件,在内置的 SSH 失败规则解码源 IP 之后,该规则会针对 `etc/lists/threat-intel-ips` 检查 `srcip`。规则 ID、严重级别、MITRE 映射和描述使得告警在 SIEM 中具有可操作性。这就是 CTI 转化为检测工程的关键节点。 **C. Docker 部署至 Wazuh Manager** ![Docker Wazuh 部署](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/6337bb7592122244.png) 因为此 Wazuh 环境运行在 Docker 中,所以必须将 CDB 列表复制到 manager 容器中,而不是写入宿主机的 `/var/ossec`。此截图展示了操作部署路径:将列表和规则复制到 manager 中,修复所有权和权限,在 Wazuh 配置中注册列表,然后重启 manager。这验证了流水线在真实的容器化 Wazuh 环境中的可用性。 **D. Wazuh Logtest 在匹配 IOC 时触发告警** ![Wazuh logtest 告警](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/025e67b1d0122245.png) `wazuh-logtest` 无需等待真实终端生成日志即可验证规则行为。测试日志包含来自导出 IP CDB 列表的一个 IOC,Wazuh 将其解码为 `srcip`,针对威胁情报列表进行检查,并以级别 `12` 触发规则 `100500`。这证明了检测逻辑的端到端工作:订阅源 IOC -> SQLite -> 评分 -> 导出 -> Wazuh CDB -> 告警。 **E. Wazuh 仪表盘告警** ![Wazuh 仪表盘告警](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/a77fd1ae88122246.png) 仪表盘截图显示了在分析师界面中呈现的相同检测结果。它包括匹配的 IOC、规则 ID、严重性、规则描述、manager 名称以及解码后的事件字段。这是最终的操作证明:流水线不仅生成文件,它还产生了安全分析师可以在 Wazuh 中进行分类的可见安全事件。 ### 6️⃣ 自动化 — systemd 定时器 **A. 定时器状态** ![systemd 定时器状态](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/c330eafca0122248.png) 定时器状态证明流水线无需手动执行即可持续运行。用户级 systemd 定时器每六小时调度一次服务,这对于分析师工作站或实验室传感器上的轻量级 CTI 刷新非常合适。`Persistent=true` 也意味着错过的运行可以在系统重新上线时进行补偿执行。 **B. 自动化运行的 Journal 日志** ![systemd 服务日志](https://static.pigsec.cn/wp-content/uploads/repos/2026/05/8f51ea3074122249.png) Journal 输出显示了完整的自动化服务执行过程:摄取、评分、样本富化和导出。这很重要,因为自动化是将项目从一次性脚本转变为可维护的守护进程式流水线的关键。日志为排查订阅源失败、API 问题和导出问题提供了操作证据。 ## 🗂️ 仓库结构 ``` threat-intel-pipeline/ ├── README.md ├── WRITEUP.md ├── requirements.txt ├── config.py # API keys, thresholds, paths ├── cli.py # Entry point ├── pipeline/ │ ├── ingestors/ │ │ ├── base.py # Abstract base ingestor │ │ ├── urlhaus.py │ │ ├── feodo.py │ │ └── otx.py │ ├── storage.py # SQLite interface + deduplication │ ├── scorer.py # Confidence scoring engine │ ├── enricher.py # AbuseIPDB + VirusTotal enrichment │ └── exporter.py # Wazuh CDB writer ├── wazuh/ │ └── threat_intel_rules.xml # Custom Wazuh detection rules ├── assets/ │ ├── diagrams/ # Architecture diagram sources │ └── screenshots/ # README screenshots ├── systemd/ │ ├── threat-intel.service │ └── threat-intel.timer └── data/ # gitignored — DB lives here ``` ## ⚙️ 设置 ``` # 1. Clone 和设置环境 git clone https://github.com/ibfavas/threat-intel-pipeline cd threat-intel-pipeline python -m venv .venv && source .venv/bin/activate pip install -r requirements.txt # 2. 设置 API keys export OTX_API_KEY="your_key" export ABUSEIPDB_API_KEY="your_key" export VIRUSTOTAL_API_KEY="your_key" # 3. 运行 pipeline python cli.py ingest python cli.py score python cli.py enrich python cli.py export --wazuh python cli.py report --top 20 ``` Fish shell 用户应使用以下命令激活虚拟环境: ``` source .venv/bin/activate.fish ``` **免费 API 密钥:** - OTX AlienVault → otx.alienvault.com - AbuseIPDB → abuseipdb.com/api - VirusTotal → virustotal.com/gui/my-apikey ### OTX 摄取量 OTX 可能比 URLhaus 和 Feodo 慢,因为它需要先检索 pulses,然后再将其中包含的每个指标进行标准化。默认的 OTX 设置为: ``` TIP_OTX_LIMIT=100 TIP_OTX_MAX_PAGE=5 ``` 用于截图的快速运行: ``` python cli.py ingest --otx-limit 10 --otx-max-page 3 ``` 更大的 OTX 拉取量: ``` python cli.py ingest --otx-limit 250 --otx-max-page 10 ``` 测试时跳过 OTX: ``` python cli.py ingest --feeds urlhaus feodo ``` ### Wazuh 导出路径 默认情况下,`python cli.py export --wazuh` 会写入真实的 Wazuh 路径: ``` /var/ossec/etc/lists/threat-intel-ips /var/ossec/etc/lists/threat-intel-domains ``` 这些路径需要 Wazuh/root 权限。用于截图或本地测试时,请导出到项目本地文件: ``` python cli.py export --wazuh \ --ip-list exports/threat-intel-ips \ --domain-list exports/threat-intel-domains ``` ### 基于 Docker 的 Wazuh 如果 Wazuh 运行在 Docker 中,`/var/ossec/etc/lists` 位于 Wazuh manager 容器内部,而不是宿主机上。请先在本地导出: ``` python cli.py export --wazuh \ --ip-list exports/threat-intel-ips \ --domain-list exports/threat-intel-domains ``` 查找 manager 容器: ``` docker ps --format '{{.Names}}\t{{.Image}}\t{{.Status}}' ``` 对于单节点 Wazuh Docker 部署,manager 容器的名称通常类似于 `single-node-wazuh.manager-1`。将生成的 CDB 列表和自定义规则复制到容器中: ``` docker cp exports/threat-intel-ips single-node-wazuh.manager-1:/var/ossec/etc/lists/threat-intel-ips docker cp exports/threat-intel-domains single-node-wazuh.manager-1:/var/ossec/etc/lists/threat-intel-domains docker cp wazuh/threat_intel_rules.xml single-node-wazuh.manager-1:/var/ossec/etc/rules/threat_intel_rules.xml ``` 在 Wazuh manager 配置中注册自定义 CDB 列表。这是必须的,以便 Wazuh 在分析期间加载这些列表: ``` docker exec single-node-wazuh.manager-1 sh -lc "cp /var/ossec/etc/ossec.conf /var/ossec/etc/ossec.conf.bak-threat-intel && awk '1; /etc\\/lists\\/malicious-ioc\\/malware-hashes<\\/list>/ { print \" etc/lists/threat-intel-ips\"; print \" etc/lists/threat-intel-domains\" }' /var/ossec/etc/ossec.conf > /tmp/ossec.conf && mv /tmp/ossec.conf /var/ossec/etc/ossec.conf" ``` 修复权限并重启 Wazuh manager: ``` docker exec single-node-wazuh.manager-1 chown wazuh:wazuh /var/ossec/etc/lists/threat-intel-ips /var/ossec/etc/lists/threat-intel-domains docker exec single-node-wazuh.manager-1 chmod 660 /var/ossec/etc/lists/threat-intel-ips /var/ossec/etc/lists/threat-intel-domains docker exec single-node-wazuh.manager-1 /var/ossec/bin/wazuh-control restart ``` 截图检查: ``` docker exec single-node-wazuh.manager-1 head -15 /var/ossec/etc/lists/threat-intel-ips docker exec single-node-wazuh.manager-1 head -15 /var/ossec/etc/lists/threat-intel-domains docker exec single-node-wazuh.manager-1 cat /var/ossec/etc/rules/threat_intel_rules.xml ``` 直接 logtest 验证: ``` IOC=$(cut -d: -f1 exports/threat-intel-ips | head -1) printf "May 1 09:30:00 arch sshd[12345]: Failed password for invalid user admin from %s port 55222 ssh2\n" "$IOC" | docker exec -i single-node-wazuh.manager-1 /var/ossec/bin/wazuh-logtest ``` 预期结果:规则 `100500`,级别 `12`,描述为 `Source IP matched threat intelligence CDB list`。 ## 🧠 评分机制 | 因素 | 最高分 | 逻辑 | |---|---|---| | 时效性 | 40 | 30天内线性衰减 | | 来源信誉 | 35 | Feodo 35 > URLhaus 30 > OTX 20 | | 跨源交叉验证 | 25 | 每增加一个来源 +5,上限为 25 | | **总计** | **100** | 得分 ≥ 60 的 IOC 将进入富化和导出阶段 | ## 🔗 数据流 ``` OTX / URLhaus / Feodo ↓ [Ingestors] — normalize to unified schema ↓ [SQLite DB] — deduplicate, merge tags, increment hit_count ↓ [Scoring Engine] — recency + source weight + corroboration ↓ [Enricher] — AbuseIPDB + VirusTotal (score ≥ 60 only) ↓ [Wazuh Exporter] — CDB lists + ossec-control reload ↓ [Wazuh Rules] — real-time log correlation → alerts ``` ## 📋 CLI 参考 | 命令 | 描述 | |---|---| | `python cli.py ingest` | 拉取所有订阅源,标准化,存储 | | `python cli.py ingest --feeds urlhaus feodo` | 仅拉取 URLhaus 和 Feodo | | `python cli.py ingest --otx-limit 100 --otx-max-page 5` | 拉取所有订阅源,带有显式的 OTX 卷控制 | | `python cli.py score` | 对所有 IOC 运行评分引擎 | | `python cli.py score --verbose --top 5` | 显示评分组件细分 | | `python cli.py enrich` | 通过 API 富化高分数 IOC | | `python cli.py enrich --limit 5 --verbose` | 富化少量样本并打印富化摘要 | | `python cli.py enrich --type ip --limit 5 --verbose` | 为 AbuseIPDB/VirusTotal 截图证据富化 IP | | `python cli.py export --wazuh` | 推送到 Wazuh CDB 列表 | | `python cli.py export --wazuh --ip-list exports/threat-intel-ips --domain-list exports/threat-intel-domains` | 将 Wazuh CDB 文件在本地导出以进行截图 | | `python cli.py report --top N` | 按得分打印前 N 个 IOC | | `python cli.py stats` | 数据库记录计数和订阅源健康状况 | 详细的富化输出会打印简洁、适合截图的摘要: ``` [+] Enriched 1.2.3.4: abuse_score=85 vt_malicious=12 country=US ``` ## 🔑 核心要点 并非所有的 IOC 都是等同的 — 一个没有佐证且带有 30 天前时间戳的原始订阅源条目,其信号强度远远弱于本周在三个订阅源中出现的同一个 IP。评分引擎使这种区分变得明确,而不是将 IOC 视为二元的(非黑即白)。 富化应当是有选择性的 — 在低置信度的 IOC 上浪费 API 速率限制会消耗配额并增加噪音。通过置信度阈值来控制富化,可以使流水线在免费层级的 API 密钥上保持高效。 流水线的真正价值在于 Wazuh 集成 — 存放在数据库中的 IOC 只是情报。而作为 CDB 查找列表加载到实时 SIEM 中的 IOC 则成为了检测手段。
标签:abuse.ch, AbuseIPDB, AMSI绕过, API集成, Arch Linux, Ask搜索, IOC失陷标示, IP 地址批量处理, Python, SQLite, systemd, VirusTotal, Wazuh, 个人项目, 可观测性, 威胁情报, 威胁检测, 威胁评分, 安全数据管道, 安全运营, 实时处理, 开发者工具, 情报溯源, 情报聚合, 扫描框架, 数字取证, 无后门, 日志关联分析, 网络信息收集, 网络安全, 自动化脚本, 请求拦截, 逆向工具, 隐私保护