L4Ng0dd3ssHQ/threat-data-pipeline
GitHub: L4Ng0dd3ssHQ/threat-data-pipeline
这是一个基于Python构建的生产级威胁情报分析管道,能够自动接入并清洗多源威胁数据,进行异常检测并生成可视化的分析报告。
Stars: 0 | Forks: 0
# 威胁数据管道
这是一个用 Python 构建的生产级网络安全分析管道,用于处理现实世界的威胁情报数据。
该项目从 URLHaus 和 CISA Known Exploited Vulnerabilities (KEV) 接入实时数据,处理公共数据源中常见的畸形 CSV 边缘情况,验证并清洗多源数据集,检测趋势和异常,并导出可供分析师使用的 Excel 和 PDF 报告。
## 本项目的亮点
- 使用真实的威胁情报源,而非玩具数据
- 处理混乱的生产级 CSV 问题:latin-1 编码、文件头部注释、畸形行、损坏的引号以及 Excel 导出边缘情况
- 将接入、验证、清洗、分析、可视化和报告分离为模块化层级
- 产生的输出具有实际操作价值,而不仅仅是学术上的正确性
- 包含针对模式推断、缺失值处理和异常值检测的测试
## 展示摘要
该管道的设计类似于轻量级安全分析系统,而非一次性脚本。
- URLHaus 源接入支持通过 `.env` 进行 API 密钥认证
- CISA KEV 接入直接从维护的 GitHub CSV 源拉取
- 多源记录合并时保留了数据源谱系
- 在清洗前后暴露数据质量问题
- 趋势和异常输出被转化为通俗易懂的分析师摘要
- 结果导出为 Excel 和 PDF 交付物,适合审查或演示
## 演示命令
```
python -m threat_data_pipeline.cli --urlhaus --kev --output-dir output
```
示例结果结构:
```
{
"rows": 15965,
"columns": 17,
"executive_summary": "The combined threat-intelligence dataset contains 4,842 records. URLHaus contributed 3,290 cleaned URL records. CISA KEV contributed 1,552 exploited-vulnerability records. The most frequently affected KEV vendor is Microsoft with 362 entries. An activity spike was detected on 2022-03-03 in dateAdded with 95 observations. Observed daily activity in dateAdded has decreased by 43.4% in the most recent period versus the earlier baseline."
}
```
生成的输出:
- `output/threat_intelligence_report.xlsx`
- `output/threat_intelligence_summary.pdf`
- `output/charts/*.png`
- `output/pipeline.log`
## 架构
```
flowchart LR
A[URLHaus API Feed] --> B[Ingestion Layer]
C[CISA KEV CSV] --> B
D[Local CSV Uploads] --> B
B --> E[Schema and Quality Validation]
E --> F[Cleaning and Normalization]
F --> G[Analysis Engine]
G --> H[Visualization Layer]
G --> I[Insight Generation]
E --> J[Data Quality Report]
H --> K[Excel Export]
I --> K
J --> K
H --> L[PDF Summary]
I --> L
```
核心模块:
- `ingestion.py`:本地/API/URL CSV 接入,包含大小检查、编码回退、注释跳过和畸形行恢复
- `validation.py`:模式推断、缺失值检查、重复项、不一致性和异常值检测
- `cleaning.py`:空值处理、分类标准化、日期标准化、URL 标准化和转换日志记录
- `analysis.py`:汇总统计、相关性、细分、趋势检测、异常检测和执行摘要生成
- `visualization.py`:针对严重性、时间线、异常高亮和边缘数据集的图表生成
- `reporting.py`:Excel 和 PDF 导出逻辑,包含 Excel 安全性清理
- `pipeline.py`:编排层,将业务逻辑与任何未来的 UI 或调度器分离开来
## 已解决的工程挑战
这是我在展示、代码讲解或面试中会重点强调的部分:
- URLHaus 编码问题:接入路径现在首先尝试 UTF-8,并在需要时回退到 latin-1
- 带注释前缀的 CSV 源:URLHaus 以 `#` 开头的注释行会被安全忽略
- 损坏的公共源行:解析器从 pandas 的 C 引擎回退到 Python 引擎,然后针对畸形引号回退到字面引号模式
- 源可追溯性:合并的记录保留源元数据,以便下游分析保持可解释性
- Excel 导出失败:非法的工作表字符和时区感知的日期时间会在生成工作簿之前被清理
- 混合源叙述:执行摘要现在区分 URLHaus URL 活动和 KEV 漏洞活动
## 功能特性
- 在同一次运行中接入本地 CSV、URLHaus 和 CISA KEV 的多源数据
- 基于 `.env` 的密钥加载,使用 `python-dotenv`
- 文件类型和文件大小验证
- 编码检测和回退处理
- 畸形行恢复,不会导致管道崩溃
- 针对数值、分类、日期和 URL 列的模式推断
- 重复项检测、缺失值报告和数值异常值检测
- 针对数值和分类数据的可配置缺失值策略
- URL、分类和日期标准化,并记录转换日志
- 汇总统计、相关性分析、细分和时间线异常检测
- 通俗易懂的执行摘要,包含建议的分析师后续步骤
- 导出为 Excel 和 PDF,包含嵌入的图表和支持表
- 按日期范围、威胁类别和状态进行过滤
## 设置
### PowerShell
```
python -m venv .venv
.venv\Scripts\Activate.ps1
pip install -e .[dev]
Copy-Item .env.example .env
```
### Git Bash
```
python -m venv .venv
source .venv/Scripts/activate
pip install -e '.[dev]'
cp .env.example .env
```
将环境变量添加到 `.env`:
```
URLHAUS_API_KEY=your_urlhaus_auth_key
CISA_KEV_URL=https://raw.githubusercontent.com/cisagov/kev-data/develop/known_exploited_vulnerabilities.csv
```
## 仪表板
在本地运行交互式 Streamlit 仪表板:
```
streamlit run dashboard.py
```
仪表板读取与 CLI 相同的设置,并支持 `.env` 值和 Streamlit secrets。
## Streamlit 部署
要发布仪表板并获取可共享的 URL,请将 `dashboard.py` 部署到 Streamlit Community Cloud。
1. 将最新的仓库更改推送到 GitHub。
2. 前往 [Streamlit Community Cloud](https://share.streamlit.io/)。
3. 从 `L4Ng0dd3ssHQ/threat-data-pipeline` 创建一个新应用。
4. 将主文件路径设置为 `dashboard.py`。
5. 在应用设置中,添加来自 `.streamlit/secrets.toml.example` 的密钥。
6. 部署应用并复制生成的公共 URL。
在 Streamlit Cloud 中添加的密钥:
```
URLHAUS_API_KEY = "your-urlhaus-api-key"
THREATFOX_API_KEY = "your-threatfox-api-key"
ALIENVAULT_API_KEY = "your-alienvault-api-key"
```
可选设置:
```
URLHAUS_FEED_URL = "https://urlhaus-api.abuse.ch/files/exports/recent.csv"
CISA_KEV_URL = "https://raw.githubusercontent.com/cisagov/kev-data/main/data/known_exploited_vulnerabilities.csv"
FEODO_TRACKER_URL = "https://feodotracker.abuse.ch/downloads/ipblocklist.csv"
MAX_FILE_SIZE_MB = "512"
CSV_CHUNK_SIZE = "100000"
OUTPUT_DIR = "output"
```
## 使用方法
运行两个实时源:
```
python -m threat_data_pipeline.cli --urlhaus --kev --output-dir output
```
运行实时源并加上本地 CSV 上传:
```
python -m threat_data_pipeline.cli --urlhaus --kev --input-csv ./samples/internal_feed.csv --output-dir output
```
运行过滤分析:
```
python -m threat_data_pipeline.cli --urlhaus --kev --start-date 2026-01-01 --end-date 2026-03-01 --threat-category phishing --status-filter active --output-dir output
```
## 可靠性与边缘情况
- 空数据集返回安全的执行摘要,而不是崩溃
- 单列数据集仍然生成回退分布图
- 超大或非 CSV 输入会因明确的错误消息而失败
- 缺少 URLHaus 凭证会因特定的 `.env` 错误而快速失败
- 稀疏数值数据集跳过无效的相关性和异常值计算
- 仅在所需列存在时才创建可选图表
- 质量报告和导出阶段能容忍畸形的源行
## 测试
```
pytest
```
当前的自动化覆盖范围包括:
- 模式推断
- 缺失值处理
- 异常值检测
- latin-1 回退处理
- 畸形引号行恢复
## 扩展到企业级工作流
该代码库的结构经过精心设计,可以超越本地 CLI 进行扩展:
- 存储:用 AWS S3、Azure Blob Storage 或 Google Cloud Storage 替换本地磁盘
- 计算:将 pandas 操作迁移到 Spark、Dask 或 Polars,以处理超内存工作负载
- 调度:使用 Airflow、Prefect、GitHub Actions 或云原生调度器运行重复作业
- 数仓:将清洗后的输出加载到 BigQuery、Snowflake 或安全数据湖
- 运维:将日志转发到 Datadog、Splunk、CloudWatch 或 Azure Monitor
- 消费者:通过 REST API、内部分析师 UI、Notebook 工作流或 SOAR 集成公开管道
## 如何在展示中呈现此项目
如果你正在向招聘人员、招聘经理或评委演示此项目,最有力的表述方式是:
1. 我使用实时网络安全数据源构建了一个模块化的威胁情报分析管道。
2. 我处理了公共源在生产系统中经常导致的真实接入失败问题。
3. 我将原始 CSV 转化为面向分析师的整洁输出,包含趋势检测、异常暴露和可导出的报告。
简短的作品集推介:
标签:CISA KEV, CSV 处理, ETL, Excel 导出, HTTP/HTTPS抓包, JavaCC, Kubernetes, PDF 导出, Python, URLHaus, 代码示例, 威胁情报, 安全运营, 开发者工具, 异常检测, 态势感知, 扫描框架, 报表生成, 数据分析, 数据清洗, 数据验证, 无后门, 源码分析, 网络安全, 自动化流水线, 逆向工具, 隐私保护