mintheinkhinemtk/Log-collection-to-Splunk-Using-Python-

GitHub: mintheinkhinemtk/Log-collection-to-Splunk-Using-Python-

基于 Python 的威胁情报自动化管道,从 AbuseIPDB 收集恶意 IP 并用 VirusTotal 数据进行富化后推送至 Splunk,帮助分析师实现多源情报的自动汇聚与关联分析。

Stars: 1 | Forks: 0

# 威胁情报富化管道 (AbuseIPDB → VirusTotal → Splunk) 一个基于 Python 的威胁情报自动化管道,它从 AbuseIPDB 拉取高可信度恶意 IP,使用 VirusTotal 信誉数据对其进行富化,然后通过 HTTP Event Collector (HEC) 将合并后的情报转发至 Splunk 供分析师使用。 ## 为什么会有这个项目 SOC 分析师通常需要在不同的威胁情报源之间手动切换,以便对可疑 IP 进行鉴定。此管道自动化了富化步骤:给定一组来自 AbuseIPDB 的已知恶意 IP,它会检索 VirusTotal 的判定结果,并将合并后的数据发送到 Splunk,以便在其中进行搜索、制作仪表板以及与内部日志进行关联。 ## 架构 AbuseIPDB API -> ABUSEIPDB_IPs.csv -> main.py -> VirusTotal API -> Splunk HEC -> index=main 之所以选择两阶段设计(收集,然后富化),是为了应对 AbuseIPDB 免费层级 5 次请求的限制——通过缓存到 CSV,可以重复运行富化过程,而不会超出 AbuseIPDB 的配额。 ## 功能 - 从 AbuseIPDB 拉取可信度 ≥ 90% 的 IP - 将 IP 列表缓存到 CSV 中,以遵守 AbuseIPDB 免费层级的限制 - 使用 VirusTotal 的扫描结果(引擎、方法、判定结果)对每个 IP 进行富化 - 在本地文件中跟踪已处理的 IP,以避免重复的 API 调用 - 仅筛选输出恶意判定结果 —— 减少 Splunk 中的噪音 - 将结构化的 JSON 事件转发给 Splunk HEC,并带有可配置的 sourcetype - 在 VirusTotal 调用之间实现了 20 秒的延迟,以遵守 4次/分钟的免费层级限制 ## 技术栈 - Python 3 - AbuseIPDB API v2 - VirusTotal API v3 - Splunk HEC(带有 token 认证的 HTTP POST) - `requests`、`pandas`、`json` ## 文件 - `ABUSEIPDB_IP_COLLECTION.py` — 从 AbuseIPDB 获取 IP 列表,并写入 `ABUSEIPDB_IPs.csv` - `main.py` — 读取 CSV,为每个 IP 查询 VirusTotal,并将恶意结果转发至 Splunk - `ABUSEIPDB_IPs.csv` — 示例 IP 列表(可通过收集脚本重新生成) - `collected_IPs.txt` — 跟踪已处理的 IP(首次运行时自动创建) ## 设置 ### 前置条件 - Python 3.8+ - AbuseIPDB API 密钥 — https://www.abuseipdb.com/api - VirusTotal API 密钥 — https://www.virustotal.com/ - 已启用 HEC 的 Splunk 实例(企业版或免费版) ### 配置 Splunk HEC 1. 设置 → 数据输入 → HTTP Event Collector → 新建 Token 2. 将 sourcetype 设置为 `_json` 或接受脚本定义的 `virustotal` 3. 记下 token 和 HEC 端口(默认为 8088) ### 配置脚本 替换这两个 Python 文件中的占位符字符串: - `ABUSEIPDB_IP_COLLECTION.py` 中的 `'abuseipdb-key'` - `main.py` 中的 `'your-virustotal-api-key'` 和 `'your-Splunk-HEC-token'` ### 运行 ``` python ABUSEIPDB_IP_COLLECTION.py python main.py ``` ## Splunk 中的示例事件 经过富化后,索引到 Splunk 中的每个事件如下所示: ``` { "IP": "x.x.x.x", "score": 12, "Data": [ { "Engine": "Fortinet", "Method": "blacklist", "Result": "malware" }, { "Engine": "Sophos", "Method": "blacklist", "Result": "malicious" } ] } ``` ## 我的收获 - API 速率限制驱动了设计决策。当每个 API 都有不同的限制时,带有磁盘缓存的两阶段管道比链式的实时调用要干净得多。 - Splunk HEC 是一种用于摄取自定义数据的强大模式 —— 发送带有 `sourcetype=virustotal` 的 JSON 可以使字段立即可用于搜索。 - 在简单的文本文件中跟踪已处理的项目,是一种使脚本具备幂等性和可重复运行的低成本方法。 - 在索引之前过滤判定结果(仅限恶意结果)可以降低 Splunk 的成本,并使数据更具可操作性。 ## 限制和已知问题 - API 密钥是硬编码的 —— 应该将其移至环境变量中 - 网络错误没有重试/退避机制 - 单线程;对于更大的 IP 列表,使用异步会带来好处 - Splunk HEC POST 在本地开发时使用了 `verify=False`;在生产环境中应验证证书 ## 未来的改进 - 将密钥移至环境变量或配置文件中 - 添加结构化日志记录以代替 print 语句 - 将文本文文件去重缓存替换为 SQLite,以提高可查询性 - 使用 Docker 进行容器化,以实现便携式部署 - 基于富化数据添加 Sigma 规则映射 ## 许可证 MIT
标签:AbuseIPDB, API集成, Ask搜索, CVE漏洞管理, HEC, HTTP Event Collector, HTTP/HTTPS抓包, IP信誉查询, IP 地址批量处理, Python, SOAR, TI, VirusTotal, 代码示例, 可观测性, 威胁情报, 威胁情报自动化, 安全编排与自动化响应, 安全脚本, 安全运营中心, 密码管理, 开发者工具, 态势感知, 恶意IP检测, 数据分析, 数据富化, 无后门, 日志聚合, 网络信息收集, 网络映射, 网络调试, 自动化