mintheinkhinemtk/Log-collection-to-Splunk-Using-Python-
GitHub: mintheinkhinemtk/Log-collection-to-Splunk-Using-Python-
基于 Python 的威胁情报自动化管道,从 AbuseIPDB 收集恶意 IP 并用 VirusTotal 数据进行富化后推送至 Splunk,帮助分析师实现多源情报的自动汇聚与关联分析。
Stars: 1 | Forks: 0
# 威胁情报富化管道 (AbuseIPDB → VirusTotal → Splunk)
一个基于 Python 的威胁情报自动化管道,它从 AbuseIPDB 拉取高可信度恶意 IP,使用 VirusTotal 信誉数据对其进行富化,然后通过 HTTP Event Collector (HEC) 将合并后的情报转发至 Splunk 供分析师使用。
## 为什么会有这个项目
SOC 分析师通常需要在不同的威胁情报源之间手动切换,以便对可疑 IP 进行鉴定。此管道自动化了富化步骤:给定一组来自 AbuseIPDB 的已知恶意 IP,它会检索 VirusTotal 的判定结果,并将合并后的数据发送到 Splunk,以便在其中进行搜索、制作仪表板以及与内部日志进行关联。
## 架构
AbuseIPDB API -> ABUSEIPDB_IPs.csv -> main.py -> VirusTotal API -> Splunk HEC -> index=main
之所以选择两阶段设计(收集,然后富化),是为了应对 AbuseIPDB 免费层级 5 次请求的限制——通过缓存到 CSV,可以重复运行富化过程,而不会超出 AbuseIPDB 的配额。
## 功能
- 从 AbuseIPDB 拉取可信度 ≥ 90% 的 IP
- 将 IP 列表缓存到 CSV 中,以遵守 AbuseIPDB 免费层级的限制
- 使用 VirusTotal 的扫描结果(引擎、方法、判定结果)对每个 IP 进行富化
- 在本地文件中跟踪已处理的 IP,以避免重复的 API 调用
- 仅筛选输出恶意判定结果 —— 减少 Splunk 中的噪音
- 将结构化的 JSON 事件转发给 Splunk HEC,并带有可配置的 sourcetype
- 在 VirusTotal 调用之间实现了 20 秒的延迟,以遵守 4次/分钟的免费层级限制
## 技术栈
- Python 3
- AbuseIPDB API v2
- VirusTotal API v3
- Splunk HEC(带有 token 认证的 HTTP POST)
- `requests`、`pandas`、`json`
## 文件
- `ABUSEIPDB_IP_COLLECTION.py` — 从 AbuseIPDB 获取 IP 列表,并写入 `ABUSEIPDB_IPs.csv`
- `main.py` — 读取 CSV,为每个 IP 查询 VirusTotal,并将恶意结果转发至 Splunk
- `ABUSEIPDB_IPs.csv` — 示例 IP 列表(可通过收集脚本重新生成)
- `collected_IPs.txt` — 跟踪已处理的 IP(首次运行时自动创建)
## 设置
### 前置条件
- Python 3.8+
- AbuseIPDB API 密钥 — https://www.abuseipdb.com/api
- VirusTotal API 密钥 — https://www.virustotal.com/
- 已启用 HEC 的 Splunk 实例(企业版或免费版)
### 配置 Splunk HEC
1. 设置 → 数据输入 → HTTP Event Collector → 新建 Token
2. 将 sourcetype 设置为 `_json` 或接受脚本定义的 `virustotal`
3. 记下 token 和 HEC 端口(默认为 8088)
### 配置脚本
替换这两个 Python 文件中的占位符字符串:
- `ABUSEIPDB_IP_COLLECTION.py` 中的 `'abuseipdb-key'`
- `main.py` 中的 `'your-virustotal-api-key'` 和 `'your-Splunk-HEC-token'`
### 运行
```
python ABUSEIPDB_IP_COLLECTION.py
python main.py
```
## Splunk 中的示例事件
经过富化后,索引到 Splunk 中的每个事件如下所示:
```
{
"IP": "x.x.x.x",
"score": 12,
"Data": [
{ "Engine": "Fortinet", "Method": "blacklist", "Result": "malware" },
{ "Engine": "Sophos", "Method": "blacklist", "Result": "malicious" }
]
}
```
## 我的收获
- API 速率限制驱动了设计决策。当每个 API 都有不同的限制时,带有磁盘缓存的两阶段管道比链式的实时调用要干净得多。
- Splunk HEC 是一种用于摄取自定义数据的强大模式 —— 发送带有 `sourcetype=virustotal` 的 JSON 可以使字段立即可用于搜索。
- 在简单的文本文件中跟踪已处理的项目,是一种使脚本具备幂等性和可重复运行的低成本方法。
- 在索引之前过滤判定结果(仅限恶意结果)可以降低 Splunk 的成本,并使数据更具可操作性。
## 限制和已知问题
- API 密钥是硬编码的 —— 应该将其移至环境变量中
- 网络错误没有重试/退避机制
- 单线程;对于更大的 IP 列表,使用异步会带来好处
- Splunk HEC POST 在本地开发时使用了 `verify=False`;在生产环境中应验证证书
## 未来的改进
- 将密钥移至环境变量或配置文件中
- 添加结构化日志记录以代替 print 语句
- 将文本文文件去重缓存替换为 SQLite,以提高可查询性
- 使用 Docker 进行容器化,以实现便携式部署
- 基于富化数据添加 Sigma 规则映射
## 许可证
MIT
标签:AbuseIPDB, API集成, Ask搜索, CVE漏洞管理, HEC, HTTP Event Collector, HTTP/HTTPS抓包, IP信誉查询, IP 地址批量处理, Python, SOAR, TI, VirusTotal, 代码示例, 可观测性, 威胁情报, 威胁情报自动化, 安全编排与自动化响应, 安全脚本, 安全运营中心, 密码管理, 开发者工具, 态势感知, 恶意IP检测, 数据分析, 数据富化, 无后门, 日志聚合, 网络信息收集, 网络映射, 网络调试, 自动化