cypher-me/threat-intelligence-ETL-pipeline

GitHub: cypher-me/threat-intelligence-ETL-pipeline

威胁情报ETL管道,将异构安全遥测数据规范化、异常检测和威胁充实后输出至SIEM平台。

Stars: 0 | Forks: 0

# 威胁情报 ETL Pipeline Threat Intelligence ETL 是一个 Python 3.14 安全分析服务,用于摄取原始遥测数据,将其规范化为通用事件模型,对可疑行为进行评分,使用威胁上下文丰富高风险事件,并将发现结果发布给下游 SIEM 平台。该代码库包含一个用于手动执行和检查的 FastAPI 控制平面,以及一个用于事件驱动 S3 处理的 AWS Lambda 处理程序。 ## 项目概述 本项目专为需要紧凑、可审计的威胁情报充实管道且不依赖大型 SIEM 特定代码库的团队而设计。它接受异构安全日志,应用确定性清洗和特征工程,使用 scikit-learn 运行异常检测,将可疑活动归类为威胁模式,利用 GeoIP 和外部威胁源丰富结果,并以适用于 SOC 摄取的格式导出最终发现。 代码库分为清晰的 ETL 阶段: 1. `src/ingestion` 加载 JSON, CSV, syslog, text, Parquet, 和 gzip 压缩文件。 2. `src/transform` 清洗记录,规范化 schema,并导出检测特征。 3. `src/detection` 应用异常检测和威胁分类逻辑。 4. `src/enrichment` 添加 GeoIP 详细信息和外部信誉数据,使用 Redis 进行 feed 缓存。 5. `src/output` 格式化并将发现导出到 HTTP 目标或 S3。 6. `src/api` 公开用于手动上传、pipeline 执行和状态查询的操作端点。 7. `src/lambda_handler.py` 提供 serverless S3 触发的运行时。 ## 架构 ``` Data Sources -> S3 / API Upload -> Ingestion -> Validation -> Cleaning -> Feature Engineering -> Anomaly Detection -> Classification -> GeoIP + Threat Feed Enrichment -> SIEM Export / S3 Output / API Queries ``` ## Python 3.14 设置 该代码库目前在本地开发、CI、Docker 和 SAM 配置中均以 Python 3.14 为目标。 ``` python3.14 -m venv .venv source .venv/bin/activate python -m pip install --upgrade pip python -m pip install -r requirements.txt cp .env.example .env uvicorn src.api.app:app --reload ``` 如果您机器上的 `pip` 解析为损坏的 shim 或其他解释器,请从激活的 Python 3.14 虚拟环境中使用 `python -m pip`,而不是直接调用 `pip`。 ## Requirements 文件 `requirements.txt` 文件包含项目的顶层运行时和测试依赖项,而不是特定于机器的 `pip freeze` 快照。它涵盖: 1. ETL 和分析库,例如 pandas, numpy, pyarrow, 和 scikit-learn。 2. 通过 boto3 和 botocore 进行 AWS 访问。 3. 使用 FastAPI, Pydantic, multipart 上传支持 和 Uvicorn 进行 API 服务。 4. 充实和操作工具,包括 requests, Redis, MaxMind, dotenv, 和结构化日志记录。 5. 测试依赖项,包括 pytest, coverage, Moto, 和 httpx。 ## 运行服务 ### FastAPI ``` uvicorn src.api.app:app --reload --host 0.0.0.0 --port 8000 ``` 服务运行后,可以在 `/docs` 访问交互式 API 文档,并在 `/openapi.json` 获取 OpenAPI schema。 ### Docker Compose ``` docker-compose up --build ``` 这将启动: 1. 端口 `8000` 上的 API。 2. 端口 `6379` 上的 Redis,用于威胁 feed 缓存。 3. 端口 `4566` 上的 LocalStack,用于本地 AWS 工作流测试。 ### AWS SAM ``` sam build sam deploy --guided ``` SAM 模板在 Python 3.14 上部署 Lambda 函数,并将其连接到原始日志 S3 bucket。 ## API 端点 `src/api/app.py` 中的 FastAPI 应用程序公开了以下操作端点: | Method | Path | Purpose | |---|---|---| | `POST` | `/logs/ingest` | 上传日志文件,解析它,并返回 schema 和验证结果。接受 multipart 字段 `file` 和可选的 `source`。 | | `POST` | `/logs/validate` | 验证上传的文件而不运行完整 pipeline。返回有效性、错误计数、警告计数和行统计信息。 | | `POST` | `/pipeline/run` | 上传文件并在后台任务中异步运行 ETL pipeline。返回一个短生命周期的 `job_id`。 | | `GET` | `/pipeline/status/{job_id}` | 检索先前启动的 pipeline 运行的状态。返回 `running`, `complete`, 或 `failed`。 | | `GET` | `/anomalies` | 查询当前保存在内存中的发现。支持 `source_ip`, `severity`, 和 `limit` 筛选器。 | | `GET` | `/threats/top` | 返回按 `risk_score` 排序的最高风险发现。支持 `limit` 查询参数。 | | `POST` | `/siem/export` | 将内存中的发现导出到 SIEM 目标。Body 字段为 `format` (`json`, `cef`, `leef`) 和 `destination`。 | | `GET` | `/metrics` | 返回 pipeline 计数器,例如已处理事件、检测到的异常、运行计数和活动作业。 | | `GET` | `/health` | 用于服务监控和容器健康检查的轻量级就绪检查。 | ## Pipeline 流程 1. 文件通过 S3 或 API 上传到达。 2. 摄取层确定格式并将其加载到 pandas DataFrame 中。 3. 验证检查所需的列、已知事件类型和 IP 地址结构。 4. 清洗和特征工程规范化字段并导出模型就绪的特征。 5. Isolation Forest 从工程特征集中识别异常值。 6. 威胁分类和充实添加严重性、GeoIP 上下文和 feed 匹配。 7. 发现被格式化以供导出,并可选择发送到 SIEM 端点或存储在 S3 中。 ## 配置 设置从 `config/settings.py` 中的环境变量加载。 | Variable | Default | Description | |---|---|---| | `AWS_REGION` | `us-east-1` | S3 和相关集成使用的 AWS 区域。 | | `S3_LOG_BUCKET` | `threat-intel-raw-logs` | 原始上传遥测数据的源 bucket。 | | `AWS_ACCESS_KEY_ID` | `` | 用于本地开发或非 IAM 执行的 AWS 访问密钥。 | | `AWS_SECRET_ACCESS_KEY` | `` | 用于本地开发或非 IAM 执行的 AWS 密钥。 | | `SIEM_ENDPOINT` | `` | 将发现导出到外部 SIEM 时使用的 HTTP 端点。 | | `SIEM_API_KEY` | `` | 添加到出站 SIEM 请求中的 API 密钥。 | | `REDIS_HOST` | `localhost` | 用于 feed 和 blocklist 缓存的 Redis 主机。 | | `REDIS_PORT` | `6379` | 充实作业使用的 Redis 端口。 | | `ANOMALY_CONTAMINATION` | `0.01` | Isolation Forest 污染比率。 | | `LOG_LEVEL` | `INFO` | structlog 输出的全局日志详细程度。 | | `GEO_IP_DB_PATH` | `/opt/GeoLite2-City.mmdb` | MaxMind GeoLite2 数据库的文件系统路径。 | ## 测试 ``` pytest ``` 单元和集成测试套件配置为在 CI 中的 Python 3.14 上运行。
标签:Apex, AV绕过, AWS Lambda, ETL 管道, FastAPI, GeoIP 丰富, JSON/CSV 解析, Python 3.14, S3 触发器, scikit-learn, SIEM 集成, 不可变基础设施, 信安, 威胁情报, 威胁评分, 安全运营中心, 安全遥测, 开发者工具, 异常检测, 搜索引擎查询, 数据清洗, 数据管道, 数据规范化, 无服务器, 日志处理, 机器学习, 特征工程, 网络映射, 自动化扫描, 请求拦截, 软件工程