kmathanprasath/cloudguard-mlops

GitHub: kmathanprasath/cloudguard-mlops

一套基于 Kubeflow 和 KServe 的云威胁检测 MLOps 系统,融合多源日志进行无监督异常检测,并自动映射 MITRE ATT&CK 战术和关联 CVE。

Stars: 0 | Forks: 0

# CloudGuard ![Python](https://img.shields.io/badge/Python-3.12-blue) ![许可证](https://img.shields.io/badge/License-MIT-green) ![模型](https://img.shields.io/badge/Model-Isolation%20Forest-purple) ![流水线](https://img.shields.io/badge/Orchestration-Kubeflow%20Pipelines%20v2-orange) ![服务](https://img.shields.io/badge/Serving-KServe%20v0.13-blueviolet) ## 架构概述 ``` ┌─────────────────────────────────────────────────────────────────────┐ │ DATA SOURCES │ │ AWS CloudTrail ──┐ │ │ BETH (K8s) ───┼──► Feature Engineering ──► Fused Log Dataset │ │ Linux Auth ──┘ │ └──────────────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ KUBEFLOW PIPELINE (CI/CD for ML) │ │ │ │ data_prep ──► train ──► evaluate ──► [quality gate: AUROC≥0.80] │ │ │ │ │ ┌─────────┴──────────┐ │ │ ▼ ▼ │ │ push_model (blocked) │ │ │ │ │ ▼ │ │ deploy_kserve │ └──────────────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ INFERENCE SERVICE (KServe) │ │ │ │ POST /predict ──► Isolation Forest ──► MITRE ATT&CK Mapping │ │ └──► NVD CVE Enrichment │ │ └──► Claude SOC Analyst │ └─────────────────────────────────────────────────────────────────────┘ ``` ## MLOps Pipeline 深入解析 该流水线在 `pipeline/cloudguard_pipeline.py` 中使用 **Kubeflow Pipelines v2 SDK** 定义,并编译为 YAML 制品。每个阶段都在一个具有固定依赖项的隔离容器中运行。 ### 阶段 1 -> `data_prep` - 从 S3 读取融合日志 CSV(`fused_csv_uri`) - 构造时间特征:`hour`、`day_of_week`、`is_weekend`、`is_offhours` - 提取行为信号:`is_rare_ip`(IP 出现次数 < 3)、`action_count_1h`(滚动用户操作计数) - 对分类字段进行标签编码:`user`、`action`、`source_type` - 拟合 `MinMaxScaler`,并将其与训练集/测试集划分一起序列化为流水线制品 - 输出:`X_train`、`X_test`、`y_train`、`y_test`、`scaler`(全部作为 KFP `Dataset`/`Model` 制品) ### 阶段 2 -> `train` - 在缩放后的训练集上训练 `IsolationForest` - 超参数为流水线级别的输入(默认:`n_estimators=200`、`contamination=0.005`) - 使用 `joblib` 将拟合后的模型序列化为 KFP `Model` 制品 ### 阶段 3 -> `evaluate` - 使用 `score_samples` 对测试集进行评分(取反 → 值越高越异常) - 在第 97.5 百分位阈值下计算 **AUROC** 和 **F1** - 将这两个指标记录到 KFP 的 `Metrics` 制品中(在 Kubeflow UI 中可见) - 将 `auroc` 和 `f1` 作为命名元组输出返回,供质量门禁消费 ### 阶段 4 -> 质量门禁 (`dsl.Condition`) ``` with dsl.Condition(eval_op.outputs["auroc"] >= min_auroc, name="quality-gate"): push_op = push_model(...) deploy_kserve(...).after(push_op) ``` 如果 AUROC 低于 `min_auroc`(默认 `0.80`),`push_model` 和 `deploy_kserve` 阶段将被**完全跳过**,当前的线上模型继续提供服务。这是 ML 的 CD(持续部署)门禁。 ### 阶段 5 -> `push_model` - 将 `iforest.pkl` 和 `scaler.pkl` 上传到 S3 的版本化前缀下(`s3:///cloudguard/models/v1/`) - 使用带有集群内 IAM 或注入的 AWS 凭证的 `boto3` ### 阶段 6 -> `deploy_kserve` - 通过 Kubernetes Python 客户端(`load_incluster_config`)创建或修补 `KServe InferenceService` - 配置带有自动扩缩容(`minReplicas=1`,`maxReplicas=3`)的无服务器部署模式 - 在 `/models` 挂载模型 PVC,以便 FastAPI 容器在启动时加载制品 - 优雅地处理 `409 Conflict`,修补现有资源而不是失败 ## 推理服务 FastAPI 应用(`app/main.py`)是运行时服务层。 ### 接口端点 | 方法 | 路径 | 描述 | |--------|------|-------------| | `GET` | `/health` | 存活/就绪探针,返回模型加载状态 | | `GET` | `/` | 服务元数据 | | `POST` | `/predict` | 单事件异常评分 + TTP 映射 | | `POST` | `/predict/batch` | 多事件批量评分 | ### 预测流程 ``` LogEvent (JSON) ──► MinMaxScaler.transform ──► IsolationForest.score_samples │ normalise to [0, 1] anomaly score │ threshold check (default 0.5) │ ┌───────────────────┴───────────────────┐ ▼ ▼ MITRE ATT&CK rules is_anomaly flag (4 rule-based detectors) ``` ### MITRE ATT&CK 检测器 | TTP | 名称 | 战术 | 触发条件 | |-----|------|--------|-------------------| | T1110 | 暴力破解 | 凭证访问 | `is_failed=1` 且 `action_count_1h > 20` | | T1485 | 数据破坏 | 影响 | `"delete"` 在 action 中且 `is_offhours=1` | | T1530 | 云存储数据获取 | 收集 | `"list"` 在 action 中且 `is_rare_ip=1` | | T1078 | 有效账户 | 防御规避 | `is_offhours=1` 且 `is_rare_ip=1` 且 `is_failed=0` | ### 输入 Schema ``` { "hour": 3, "day_of_week": 1, "is_weekend": 0, "is_offhours": 1, "is_failed": 0, "is_rare_ip": 1, "user_enc": 5, "action_enc": 12, "source_type_enc": 0, "action_count_1h": 3, "action": "DeleteBucket" } ``` ### 响应 Schema ``` { "is_anomaly": true, "anomaly_score": 0.8312, "threshold": 0.5, "ttp_detections": [ {"ttp": "T1485", "name": "Data Destruction", "tactic": "Impact"} ], "message": "THREAT DETECTED" } ``` ## 容器化 `Dockerfile` 构建了一个最小化的推理镜像: - 基础镜像:`python:3.12-slim`,仅包含 `gcc` 作为系统依赖 - 复制 `requirements.txt`(仅包含推理依赖,无训练库) - 模型制品**不会打包到镜像中**,它们在运行时通过 PVC 挂载到 `/models` - 环境变量控制模型路径和阈值,使镜像可以在不同模型版本间复用 ``` ENV MODEL_PATH=/models/iforest.pkl ENV SCALER_PATH=/models/scaler.pkl ENV ANOMALY_THRESHOLD=0.5 ``` 镜像与模型制品的这种分离是 MLOps 的核心模式,你可以通过交换 PVC 内容来回滚模型,而无需重新构建容器。 ## Kubernetes 配置清单 ### `k8s/model-pvc.yaml` 在 `kubeflow-mlops` 命名空间中配置一个 1Gi 的 `PersistentVolumeClaim`。Kubeflow 流水线将模型制品写入此处(通过 S3 → PVC 同步或直接挂载),推理容器在启动时从中读取。 ### `k8s/inference-service.yaml` 定义 `KServe InferenceService`,包含: - **无服务器部署模式**:空闲时缩放至零 - **自动扩缩容**:基于请求负载在 1–3 个副本之间伸缩 - **资源请求/限制**:`500m CPU / 512Mi` → `2 CPU / 2Gi` - **就绪探针**位于 `GET /health`:KServe 在模型加载完成前不会路由流量 - 将 `cloudguard-model-pvc` 挂载到 `/models` ## 项目结构 ``` CloudGuard.ipynb # Experimentation notebook (training, EDA, threshold tuning) requirements.txt # Full notebook + training dependencies Dockerfile # Inference service container image app/ main.py # FastAPI inference service (predict, batch, health) pipeline/ cloudguard_pipeline.py # Kubeflow Pipeline v2: data_prep→train→evaluate→push→deploy k8s/ inference-service.yaml # KServe InferenceService manifest model-pvc.yaml # PersistentVolumeClaim for model artifacts scripts/ build_push.sh # Build & push Docker image to registry run_pipeline.py # Submit compiled pipeline to Kubeflow ``` ## 数据集 | 来源 | 数据集 | 描述 | |--------|---------|-------------| | AWS CloudTrail | [flaws.cloud logs](http://summitroute.com/downloads/flaws_cloudtrail_logs.tar) | 真实世界配置错误的 AWS 环境日志 | | K8s / Syscalls | [BETH Dataset](https://www.kaggle.com/datasets/katehighnam/beth-dataset) | 带有 `evil` 列的带标签 Linux 内核系统调用日志 | | Linux Auth | [LogHub Linux_2k.log](https://github.com/logpai/loghub/tree/master/Linux) | 带有暴力破解模式的真实 SSH 认证日志 | ## 模型性能 | 模型 | AUROC | 备注 | |-------|-------|-------| | Isolation Forest | **0.8935** | 生产模型 `n_estimators=200`,`contamination=0.005` | | LSTM Autoencoder | ~0.49 | 仅作实验用,未在流水线中使用 | 阈值调优在 notebook 中通过对精确率 / 召回率 / F1 进行网格搜索来完成,以便在模型被提升之前找到最佳的操作点。 ## Notebook 部分 | 单元格 | 作用 | |-------|-------------| | 0 | 安装依赖项 | | 1–5 | 下载数据集 (CloudTrail, BETH, Linux auth) | | 6–8 | 将每个来源解析为统一 schema | | 9–11 | 融合数据源,特征构造,训练/测试集划分 | | 12–20 | 标签修复与数据验证 | | 21–23 | 训练 Isolation Forest,评估,阈值调优 | | 24–28 | LSTM Autoencoder(实验性) | | 29 | 最终确定模型 | | 30 | NVD CVE 富化函数 | | 31 | MITRE ATT&CK TTP 映射规则 | | 32–34 | Claude SOC 分析师设置与端到端测试 | ## 部署手册 ### 前置条件 - Docker 及容器镜像仓库的访问权限 - 一个运行着 Kubernetes 的集群,并安装了 Kubeflow Pipelines 和 KServe - 具有 S3 读/写权限的 AWS 凭证 - 为目标集群配置好的 `kubectl` ### 1. 构建并推送推理镜像 ``` REGISTRY=your-registry TAG=v1 bash scripts/build_push.sh ``` ### 2. 配置 Kubernetes 资源 ``` kubectl apply -f k8s/model-pvc.yaml kubectl apply -f k8s/inference-service.yaml ``` ### 3. 运行训练流水线 ``` python scripts/run_pipeline.py \ --host http://:8080 \ --fused-csv s3://your-bucket/data/fused_logs.csv \ --s3-bucket your-bucket \ --kserve-image your-registry/cloudguard:v1 ``` 该流水线运行 `data_prep → train → evaluate → push_model → deploy_kserve`。 如果 AUROC < 0.80,部署将被自动阻止。 在此处跟踪运行状态:`http://:8080/#/runs/details/` ### 4. 本地编译流水线(可选) ``` python pipeline/cloudguard_pipeline.py # 输出: cloudguard_pipeline.yaml ``` ### 5. 调用推理端点 ``` curl -X POST http:///v1/models/cloudguard:predict \ -H "Content-Type: application/json" \ -d '{ "hour": 3, "day_of_week": 1, "is_weekend": 0, "is_offhours": 1, "is_failed": 0, "is_rare_ip": 1, "user_enc": 5, "action_enc": 12, "source_type_enc": 0, "action_count_1h": 3, "action": "DeleteBucket" }' ``` ## 输出示例 ``` TTP detected: T1485 - Data Destruction CVEs found: 3 === Claude SOC Analysis === Explanation : DeleteBucket was called at 3:14 AM from an IP not previously seen in this account, targeting the production backup bucket. This is a strong indicator of compromised credentials. Attack Tech : T1485 - Data Destruction (MITRE ATT&CK) Action : Immediately revoke the admin IAM credentials, enable S3 versioning and MFA delete on all buckets, and review CloudTrail for prior reconnaissance from the flagged IP. ``` ## Colab 快速启动(实验) 该 notebook 在搭载 T4 GPU 的 Google Colab 上运行。 `运行时 > 更改运行时类型 > T4 GPU` ### 添加 Colab Secrets | Secret 名称 | 获取位置 | |-------------|----------------| | `KAGGLE_USERNAME` | kaggle.com/settings | | `KAGGLE_KEY` | kaggle.com/settings > API > Create New Token | | `ANTHROPIC_API_KEY` | console.anthropic.com > API Keys | 将 `CloudGuard.ipynb` 上传至 [colab.research.google.com](https://colab.research.google.com) 并自上而下运行单元格。 ## 技术栈 | 层级 | 技术 | |-------|-----------| | 语言 | Python 3.12 | | ML | scikit-learn (Isolation Forest),TensorFlow/Keras (LSTM,实验性) | | 服务 | FastAPI + Uvicorn | | 容器化 | Docker | | 流水线编排 | Kubeflow Pipelines v2 | | 模型服务 | KServe v0.13 (无服务器模式) | | 对象存储 | AWS S3(模型制品) | | AI 分析师 | Anthropic Claude API | | 威胁情报 | mitreattack-python,NIST NVD REST API | | 数据 | pandas,numpy | ## 安全 - 本仓库中的任何地方都没有硬编码凭证 - Colab secrets 在运行时通过 `google.colab.userdata` 加载 - 生产环境部署应使用 Kubernetes Secrets 或 secrets 管理器(例如 AWS Secrets Manager、Vault)并作为环境变量注入 - 切勿将 API 密钥或 AWS 凭证提交到源代码管理中 ## 许可证 MIT 许可证,详情请参见 [LICENSE](LICENSE)。 版权所有 (c) 2026 Mathanprasath K
标签:AI安全, Apex, ATT&CK映射, AWS CloudTrail, CCTV/网络接口发现, Chat Copilot, Chrome Headless, Claude, Cloudflare, CVE检测, CVE漏洞富化, DevSecOps, DLL 劫持, Isolation Forest, KServe, Kubeflow Pipelines, Kubernetes安全, Linux认证日志, MITRE ATT&CK, MLOps, Python, SOC分析师, 上游代理, 云威胁检测, 人工智能, 多源日志摄取, 大语言模型, 威胁情报, 子域名变形, 子域名突变, 孤立森林, 安全信息与事件管理, 开发者工具, 异常检测, 异常行为检测, 搜索引擎爬取, 无后门, 无监督学习, 智能安全运营中心, 机器学习, 模型推理服务, 模型训练, 模型部署, 特征工程, 用户模式Hook绕过, 网络安全审计, 自动化防御, 请求拦截, 逆向工具, 速率限制处理