cypher-me/threat-intelligence-ETL-pipeline
GitHub: cypher-me/threat-intelligence-ETL-pipeline
威胁情报ETL管道,将异构安全遥测数据规范化、异常检测和威胁充实后输出至SIEM平台。
Stars: 0 | Forks: 0
# 威胁情报 ETL Pipeline
Threat Intelligence ETL 是一个 Python 3.14 安全分析服务,用于摄取原始遥测数据,将其规范化为通用事件模型,对可疑行为进行评分,使用威胁上下文丰富高风险事件,并将发现结果发布给下游 SIEM 平台。该代码库包含一个用于手动执行和检查的 FastAPI 控制平面,以及一个用于事件驱动 S3 处理的 AWS Lambda 处理程序。
## 项目概述
本项目专为需要紧凑、可审计的威胁情报充实管道且不依赖大型 SIEM 特定代码库的团队而设计。它接受异构安全日志,应用确定性清洗和特征工程,使用 scikit-learn 运行异常检测,将可疑活动归类为威胁模式,利用 GeoIP 和外部威胁源丰富结果,并以适用于 SOC 摄取的格式导出最终发现。
代码库分为清晰的 ETL 阶段:
1. `src/ingestion` 加载 JSON, CSV, syslog, text, Parquet, 和 gzip 压缩文件。
2. `src/transform` 清洗记录,规范化 schema,并导出检测特征。
3. `src/detection` 应用异常检测和威胁分类逻辑。
4. `src/enrichment` 添加 GeoIP 详细信息和外部信誉数据,使用 Redis 进行 feed 缓存。
5. `src/output` 格式化并将发现导出到 HTTP 目标或 S3。
6. `src/api` 公开用于手动上传、pipeline 执行和状态查询的操作端点。
7. `src/lambda_handler.py` 提供 serverless S3 触发的运行时。
## 架构
```
Data Sources -> S3 / API Upload -> Ingestion -> Validation -> Cleaning -> Feature Engineering
-> Anomaly Detection -> Classification
-> GeoIP + Threat Feed Enrichment
-> SIEM Export / S3 Output / API Queries
```
## Python 3.14 设置
该代码库目前在本地开发、CI、Docker 和 SAM 配置中均以 Python 3.14 为目标。
```
python3.14 -m venv .venv
source .venv/bin/activate
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
cp .env.example .env
uvicorn src.api.app:app --reload
```
如果您机器上的 `pip` 解析为损坏的 shim 或其他解释器,请从激活的 Python 3.14 虚拟环境中使用 `python -m pip`,而不是直接调用 `pip`。
## Requirements 文件
`requirements.txt` 文件包含项目的顶层运行时和测试依赖项,而不是特定于机器的 `pip freeze` 快照。它涵盖:
1. ETL 和分析库,例如 pandas, numpy, pyarrow, 和 scikit-learn。
2. 通过 boto3 和 botocore 进行 AWS 访问。
3. 使用 FastAPI, Pydantic, multipart 上传支持 和 Uvicorn 进行 API 服务。
4. 充实和操作工具,包括 requests, Redis, MaxMind, dotenv, 和结构化日志记录。
5. 测试依赖项,包括 pytest, coverage, Moto, 和 httpx。
## 运行服务
### FastAPI
```
uvicorn src.api.app:app --reload --host 0.0.0.0 --port 8000
```
服务运行后,可以在 `/docs` 访问交互式 API 文档,并在 `/openapi.json` 获取 OpenAPI schema。
### Docker Compose
```
docker-compose up --build
```
这将启动:
1. 端口 `8000` 上的 API。
2. 端口 `6379` 上的 Redis,用于威胁 feed 缓存。
3. 端口 `4566` 上的 LocalStack,用于本地 AWS 工作流测试。
### AWS SAM
```
sam build
sam deploy --guided
```
SAM 模板在 Python 3.14 上部署 Lambda 函数,并将其连接到原始日志 S3 bucket。
## API 端点
`src/api/app.py` 中的 FastAPI 应用程序公开了以下操作端点:
| Method | Path | Purpose |
|---|---|---|
| `POST` | `/logs/ingest` | 上传日志文件,解析它,并返回 schema 和验证结果。接受 multipart 字段 `file` 和可选的 `source`。 |
| `POST` | `/logs/validate` | 验证上传的文件而不运行完整 pipeline。返回有效性、错误计数、警告计数和行统计信息。 |
| `POST` | `/pipeline/run` | 上传文件并在后台任务中异步运行 ETL pipeline。返回一个短生命周期的 `job_id`。 |
| `GET` | `/pipeline/status/{job_id}` | 检索先前启动的 pipeline 运行的状态。返回 `running`, `complete`, 或 `failed`。 |
| `GET` | `/anomalies` | 查询当前保存在内存中的发现。支持 `source_ip`, `severity`, 和 `limit` 筛选器。 |
| `GET` | `/threats/top` | 返回按 `risk_score` 排序的最高风险发现。支持 `limit` 查询参数。 |
| `POST` | `/siem/export` | 将内存中的发现导出到 SIEM 目标。Body 字段为 `format` (`json`, `cef`, `leef`) 和 `destination`。 |
| `GET` | `/metrics` | 返回 pipeline 计数器,例如已处理事件、检测到的异常、运行计数和活动作业。 |
| `GET` | `/health` | 用于服务监控和容器健康检查的轻量级就绪检查。 |
## Pipeline 流程
1. 文件通过 S3 或 API 上传到达。
2. 摄取层确定格式并将其加载到 pandas DataFrame 中。
3. 验证检查所需的列、已知事件类型和 IP 地址结构。
4. 清洗和特征工程规范化字段并导出模型就绪的特征。
5. Isolation Forest 从工程特征集中识别异常值。
6. 威胁分类和充实添加严重性、GeoIP 上下文和 feed 匹配。
7. 发现被格式化以供导出,并可选择发送到 SIEM 端点或存储在 S3 中。
## 配置
设置从 `config/settings.py` 中的环境变量加载。
| Variable | Default | Description |
|---|---|---|
| `AWS_REGION` | `us-east-1` | S3 和相关集成使用的 AWS 区域。 |
| `S3_LOG_BUCKET` | `threat-intel-raw-logs` | 原始上传遥测数据的源 bucket。 |
| `AWS_ACCESS_KEY_ID` | `` | 用于本地开发或非 IAM 执行的 AWS 访问密钥。 |
| `AWS_SECRET_ACCESS_KEY` | `` | 用于本地开发或非 IAM 执行的 AWS 密钥。 |
| `SIEM_ENDPOINT` | `` | 将发现导出到外部 SIEM 时使用的 HTTP 端点。 |
| `SIEM_API_KEY` | `` | 添加到出站 SIEM 请求中的 API 密钥。 |
| `REDIS_HOST` | `localhost` | 用于 feed 和 blocklist 缓存的 Redis 主机。 |
| `REDIS_PORT` | `6379` | 充实作业使用的 Redis 端口。 |
| `ANOMALY_CONTAMINATION` | `0.01` | Isolation Forest 污染比率。 |
| `LOG_LEVEL` | `INFO` | structlog 输出的全局日志详细程度。 |
| `GEO_IP_DB_PATH` | `/opt/GeoLite2-City.mmdb` | MaxMind GeoLite2 数据库的文件系统路径。 |
## 测试
```
pytest
```
单元和集成测试套件配置为在 CI 中的 Python 3.14 上运行。
标签:Apex, AV绕过, AWS Lambda, ETL 管道, FastAPI, GeoIP 丰富, JSON/CSV 解析, Python 3.14, S3 触发器, scikit-learn, SIEM 集成, 不可变基础设施, 信安, 威胁情报, 威胁评分, 安全运营中心, 安全遥测, 开发者工具, 异常检测, 搜索引擎查询, 数据清洗, 数据管道, 数据规范化, 无服务器, 日志处理, 机器学习, 特征工程, 网络映射, 自动化扫描, 请求拦截, 软件工程