kmathanprasath/cloudguard-mlops
GitHub: kmathanprasath/cloudguard-mlops
一套基于 Kubeflow 和 KServe 的云威胁检测 MLOps 系统,融合多源日志进行无监督异常检测,并自动映射 MITRE ATT&CK 战术和关联 CVE。
Stars: 0 | Forks: 0
# CloudGuard





## 架构概述
```
┌─────────────────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ AWS CloudTrail ──┐ │
│ BETH (K8s) ───┼──► Feature Engineering ──► Fused Log Dataset │
│ Linux Auth ──┘ │
└──────────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ KUBEFLOW PIPELINE (CI/CD for ML) │
│ │
│ data_prep ──► train ──► evaluate ──► [quality gate: AUROC≥0.80] │
│ │ │
│ ┌─────────┴──────────┐ │
│ ▼ ▼ │
│ push_model (blocked) │
│ │ │
│ ▼ │
│ deploy_kserve │
└──────────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ INFERENCE SERVICE (KServe) │
│ │
│ POST /predict ──► Isolation Forest ──► MITRE ATT&CK Mapping │
│ └──► NVD CVE Enrichment │
│ └──► Claude SOC Analyst │
└─────────────────────────────────────────────────────────────────────┘
```
## MLOps Pipeline 深入解析
该流水线在 `pipeline/cloudguard_pipeline.py` 中使用 **Kubeflow Pipelines v2 SDK** 定义,并编译为 YAML 制品。每个阶段都在一个具有固定依赖项的隔离容器中运行。
### 阶段 1 -> `data_prep`
- 从 S3 读取融合日志 CSV(`fused_csv_uri`)
- 构造时间特征:`hour`、`day_of_week`、`is_weekend`、`is_offhours`
- 提取行为信号:`is_rare_ip`(IP 出现次数 < 3)、`action_count_1h`(滚动用户操作计数)
- 对分类字段进行标签编码:`user`、`action`、`source_type`
- 拟合 `MinMaxScaler`,并将其与训练集/测试集划分一起序列化为流水线制品
- 输出:`X_train`、`X_test`、`y_train`、`y_test`、`scaler`(全部作为 KFP `Dataset`/`Model` 制品)
### 阶段 2 -> `train`
- 在缩放后的训练集上训练 `IsolationForest`
- 超参数为流水线级别的输入(默认:`n_estimators=200`、`contamination=0.005`)
- 使用 `joblib` 将拟合后的模型序列化为 KFP `Model` 制品
### 阶段 3 -> `evaluate`
- 使用 `score_samples` 对测试集进行评分(取反 → 值越高越异常)
- 在第 97.5 百分位阈值下计算 **AUROC** 和 **F1**
- 将这两个指标记录到 KFP 的 `Metrics` 制品中(在 Kubeflow UI 中可见)
- 将 `auroc` 和 `f1` 作为命名元组输出返回,供质量门禁消费
### 阶段 4 -> 质量门禁 (`dsl.Condition`)
```
with dsl.Condition(eval_op.outputs["auroc"] >= min_auroc, name="quality-gate"):
push_op = push_model(...)
deploy_kserve(...).after(push_op)
```
如果 AUROC 低于 `min_auroc`(默认 `0.80`),`push_model` 和 `deploy_kserve` 阶段将被**完全跳过**,当前的线上模型继续提供服务。这是 ML 的 CD(持续部署)门禁。
### 阶段 5 -> `push_model`
- 将 `iforest.pkl` 和 `scaler.pkl` 上传到 S3 的版本化前缀下(`s3:///cloudguard/models/v1/`)
- 使用带有集群内 IAM 或注入的 AWS 凭证的 `boto3`
### 阶段 6 -> `deploy_kserve`
- 通过 Kubernetes Python 客户端(`load_incluster_config`)创建或修补 `KServe InferenceService`
- 配置带有自动扩缩容(`minReplicas=1`,`maxReplicas=3`)的无服务器部署模式
- 在 `/models` 挂载模型 PVC,以便 FastAPI 容器在启动时加载制品
- 优雅地处理 `409 Conflict`,修补现有资源而不是失败
## 推理服务
FastAPI 应用(`app/main.py`)是运行时服务层。
### 接口端点
| 方法 | 路径 | 描述 |
|--------|------|-------------|
| `GET` | `/health` | 存活/就绪探针,返回模型加载状态 |
| `GET` | `/` | 服务元数据 |
| `POST` | `/predict` | 单事件异常评分 + TTP 映射 |
| `POST` | `/predict/batch` | 多事件批量评分 |
### 预测流程
```
LogEvent (JSON) ──► MinMaxScaler.transform ──► IsolationForest.score_samples
│
normalise to [0, 1] anomaly score
│
threshold check (default 0.5)
│
┌───────────────────┴───────────────────┐
▼ ▼
MITRE ATT&CK rules is_anomaly flag
(4 rule-based detectors)
```
### MITRE ATT&CK 检测器
| TTP | 名称 | 战术 | 触发条件 |
|-----|------|--------|-------------------|
| T1110 | 暴力破解 | 凭证访问 | `is_failed=1` 且 `action_count_1h > 20` |
| T1485 | 数据破坏 | 影响 | `"delete"` 在 action 中且 `is_offhours=1` |
| T1530 | 云存储数据获取 | 收集 | `"list"` 在 action 中且 `is_rare_ip=1` |
| T1078 | 有效账户 | 防御规避 | `is_offhours=1` 且 `is_rare_ip=1` 且 `is_failed=0` |
### 输入 Schema
```
{
"hour": 3,
"day_of_week": 1,
"is_weekend": 0,
"is_offhours": 1,
"is_failed": 0,
"is_rare_ip": 1,
"user_enc": 5,
"action_enc": 12,
"source_type_enc": 0,
"action_count_1h": 3,
"action": "DeleteBucket"
}
```
### 响应 Schema
```
{
"is_anomaly": true,
"anomaly_score": 0.8312,
"threshold": 0.5,
"ttp_detections": [
{"ttp": "T1485", "name": "Data Destruction", "tactic": "Impact"}
],
"message": "THREAT DETECTED"
}
```
## 容器化
`Dockerfile` 构建了一个最小化的推理镜像:
- 基础镜像:`python:3.12-slim`,仅包含 `gcc` 作为系统依赖
- 复制 `requirements.txt`(仅包含推理依赖,无训练库)
- 模型制品**不会打包到镜像中**,它们在运行时通过 PVC 挂载到 `/models`
- 环境变量控制模型路径和阈值,使镜像可以在不同模型版本间复用
```
ENV MODEL_PATH=/models/iforest.pkl
ENV SCALER_PATH=/models/scaler.pkl
ENV ANOMALY_THRESHOLD=0.5
```
镜像与模型制品的这种分离是 MLOps 的核心模式,你可以通过交换 PVC 内容来回滚模型,而无需重新构建容器。
## Kubernetes 配置清单
### `k8s/model-pvc.yaml`
在 `kubeflow-mlops` 命名空间中配置一个 1Gi 的 `PersistentVolumeClaim`。Kubeflow 流水线将模型制品写入此处(通过 S3 → PVC 同步或直接挂载),推理容器在启动时从中读取。
### `k8s/inference-service.yaml`
定义 `KServe InferenceService`,包含:
- **无服务器部署模式**:空闲时缩放至零
- **自动扩缩容**:基于请求负载在 1–3 个副本之间伸缩
- **资源请求/限制**:`500m CPU / 512Mi` → `2 CPU / 2Gi`
- **就绪探针**位于 `GET /health`:KServe 在模型加载完成前不会路由流量
- 将 `cloudguard-model-pvc` 挂载到 `/models`
## 项目结构
```
CloudGuard.ipynb # Experimentation notebook (training, EDA, threshold tuning)
requirements.txt # Full notebook + training dependencies
Dockerfile # Inference service container image
app/
main.py # FastAPI inference service (predict, batch, health)
pipeline/
cloudguard_pipeline.py # Kubeflow Pipeline v2: data_prep→train→evaluate→push→deploy
k8s/
inference-service.yaml # KServe InferenceService manifest
model-pvc.yaml # PersistentVolumeClaim for model artifacts
scripts/
build_push.sh # Build & push Docker image to registry
run_pipeline.py # Submit compiled pipeline to Kubeflow
```
## 数据集
| 来源 | 数据集 | 描述 |
|--------|---------|-------------|
| AWS CloudTrail | [flaws.cloud logs](http://summitroute.com/downloads/flaws_cloudtrail_logs.tar) | 真实世界配置错误的 AWS 环境日志 |
| K8s / Syscalls | [BETH Dataset](https://www.kaggle.com/datasets/katehighnam/beth-dataset) | 带有 `evil` 列的带标签 Linux 内核系统调用日志 |
| Linux Auth | [LogHub Linux_2k.log](https://github.com/logpai/loghub/tree/master/Linux) | 带有暴力破解模式的真实 SSH 认证日志 |
## 模型性能
| 模型 | AUROC | 备注 |
|-------|-------|-------|
| Isolation Forest | **0.8935** | 生产模型 `n_estimators=200`,`contamination=0.005` |
| LSTM Autoencoder | ~0.49 | 仅作实验用,未在流水线中使用 |
阈值调优在 notebook 中通过对精确率 / 召回率 / F1 进行网格搜索来完成,以便在模型被提升之前找到最佳的操作点。
## Notebook 部分
| 单元格 | 作用 |
|-------|-------------|
| 0 | 安装依赖项 |
| 1–5 | 下载数据集 (CloudTrail, BETH, Linux auth) |
| 6–8 | 将每个来源解析为统一 schema |
| 9–11 | 融合数据源,特征构造,训练/测试集划分 |
| 12–20 | 标签修复与数据验证 |
| 21–23 | 训练 Isolation Forest,评估,阈值调优 |
| 24–28 | LSTM Autoencoder(实验性) |
| 29 | 最终确定模型 |
| 30 | NVD CVE 富化函数 |
| 31 | MITRE ATT&CK TTP 映射规则 |
| 32–34 | Claude SOC 分析师设置与端到端测试 |
## 部署手册
### 前置条件
- Docker 及容器镜像仓库的访问权限
- 一个运行着 Kubernetes 的集群,并安装了 Kubeflow Pipelines 和 KServe
- 具有 S3 读/写权限的 AWS 凭证
- 为目标集群配置好的 `kubectl`
### 1. 构建并推送推理镜像
```
REGISTRY=your-registry TAG=v1 bash scripts/build_push.sh
```
### 2. 配置 Kubernetes 资源
```
kubectl apply -f k8s/model-pvc.yaml
kubectl apply -f k8s/inference-service.yaml
```
### 3. 运行训练流水线
```
python scripts/run_pipeline.py \
--host http://:8080 \
--fused-csv s3://your-bucket/data/fused_logs.csv \
--s3-bucket your-bucket \
--kserve-image your-registry/cloudguard:v1
```
该流水线运行 `data_prep → train → evaluate → push_model → deploy_kserve`。
如果 AUROC < 0.80,部署将被自动阻止。
在此处跟踪运行状态:`http://:8080/#/runs/details/`
### 4. 本地编译流水线(可选)
```
python pipeline/cloudguard_pipeline.py
# 输出: cloudguard_pipeline.yaml
```
### 5. 调用推理端点
```
curl -X POST http:///v1/models/cloudguard:predict \
-H "Content-Type: application/json" \
-d '{
"hour": 3, "day_of_week": 1, "is_weekend": 0, "is_offhours": 1,
"is_failed": 0, "is_rare_ip": 1, "user_enc": 5, "action_enc": 12,
"source_type_enc": 0, "action_count_1h": 3, "action": "DeleteBucket"
}'
```
## 输出示例
```
TTP detected: T1485 - Data Destruction
CVEs found: 3
=== Claude SOC Analysis ===
Explanation : DeleteBucket was called at 3:14 AM from an IP not previously
seen in this account, targeting the production backup bucket.
This is a strong indicator of compromised credentials.
Attack Tech : T1485 - Data Destruction (MITRE ATT&CK)
Action : Immediately revoke the admin IAM credentials, enable S3
versioning and MFA delete on all buckets, and review
CloudTrail for prior reconnaissance from the flagged IP.
```
## Colab 快速启动(实验)
该 notebook 在搭载 T4 GPU 的 Google Colab 上运行。
`运行时 > 更改运行时类型 > T4 GPU`
### 添加 Colab Secrets
| Secret 名称 | 获取位置 |
|-------------|----------------|
| `KAGGLE_USERNAME` | kaggle.com/settings |
| `KAGGLE_KEY` | kaggle.com/settings > API > Create New Token |
| `ANTHROPIC_API_KEY` | console.anthropic.com > API Keys |
将 `CloudGuard.ipynb` 上传至 [colab.research.google.com](https://colab.research.google.com) 并自上而下运行单元格。
## 技术栈
| 层级 | 技术 |
|-------|-----------|
| 语言 | Python 3.12 |
| ML | scikit-learn (Isolation Forest),TensorFlow/Keras (LSTM,实验性) |
| 服务 | FastAPI + Uvicorn |
| 容器化 | Docker |
| 流水线编排 | Kubeflow Pipelines v2 |
| 模型服务 | KServe v0.13 (无服务器模式) |
| 对象存储 | AWS S3(模型制品) |
| AI 分析师 | Anthropic Claude API |
| 威胁情报 | mitreattack-python,NIST NVD REST API |
| 数据 | pandas,numpy |
## 安全
- 本仓库中的任何地方都没有硬编码凭证
- Colab secrets 在运行时通过 `google.colab.userdata` 加载
- 生产环境部署应使用 Kubernetes Secrets 或 secrets 管理器(例如 AWS Secrets Manager、Vault)并作为环境变量注入
- 切勿将 API 密钥或 AWS 凭证提交到源代码管理中
## 许可证
MIT 许可证,详情请参见 [LICENSE](LICENSE)。
版权所有 (c) 2026 Mathanprasath K
标签:AI安全, Apex, ATT&CK映射, AWS CloudTrail, CCTV/网络接口发现, Chat Copilot, Chrome Headless, Claude, Cloudflare, CVE检测, CVE漏洞富化, DevSecOps, DLL 劫持, Isolation Forest, KServe, Kubeflow Pipelines, Kubernetes安全, Linux认证日志, MITRE ATT&CK, MLOps, Python, SOC分析师, 上游代理, 云威胁检测, 人工智能, 多源日志摄取, 大语言模型, 威胁情报, 子域名变形, 子域名突变, 孤立森林, 安全信息与事件管理, 开发者工具, 异常检测, 异常行为检测, 搜索引擎爬取, 无后门, 无监督学习, 智能安全运营中心, 机器学习, 模型推理服务, 模型训练, 模型部署, 特征工程, 用户模式Hook绕过, 网络安全审计, 自动化防御, 请求拦截, 逆向工具, 速率限制处理