Adityax-07/Autonomous-MLOPs-incident-response-agent
GitHub: Adityax-07/Autonomous-MLOPs-incident-response-agent
构建一个自主 MLOps 事件响应代理,实现 5 分钟级模型漂移检测与自动修复。
Stars: 0 | Forks: 0
# 自主 MLOps 事件响应代理
## 1. 问题陈述
UPI(统一支付接口)在印度每月处理超过 140 亿笔交易。
基于历史数据训练的欺诈检测模型会在用户行为发生漂移时悄无声息地退化——节日消费模式、新商户类别或设备使用习惯的变化都会改变模型训练时的特征分布。
传统 MLOps 流水线是在造成财务损失**之后**才检测到漂移。本项目构建一个**自主事件响应代理**,实现以下功能:
1. 每 5 分钟使用 Evidently AI 监控实时预测分布
2. 使用 LangGraph 代理(基于规则或 Claude Haiku)对严重性进行判断
3. **无需人工批准即可采取行动**:自动重训练并推广新模型、回滚到稳定检查点,或触发 Slack 告警,所有决策均记录到 MLflow
最终实现系统在数分钟内检测并修正模型退化,而非数天。
## 2. 系统架构
```
UPI Transactions
|
v
┌─────────────────────────┐
│ FastAPI Server :8000 │
│ XGBoost Pipeline │
│ POST /predict │
│ GET /metrics (Prom) │
└────────┬────────────────┘
| logs to data/predictions.log
v
┌──────────────────────────────────┐
│ Monitor Scheduler (APScheduler) │
│ Every 5 min: │
│ Evidently DataDrift report │
│ Writes monitor/latest_report │
└──────────────┬───────────────────┘
|
v
┌──────────────────────────────────┐
│ LangGraph Agent (3 nodes) │
│ │
│ monitor_node │
│ -> validates report freshness │
│ reason_node (rule-based | LLM) │
│ -> retrain / rollback / alert │
│ act_node │
│ -> executes action │
│ -> retries with downgrade │
└──┬────────────┬──────────────────┘
| | |
v v v
Retrain Rollback Alert
MLflow MLflow Slack
Registry Registry Webhook
|
v
model.joblib overwritten
POST /reload -> hot-swap
Grafana shows new ROC-AUC
Observability:
FastAPI /metrics -> Prometheus -> Grafana (6 panels)
All retrains logged to MLflow experiment "upi_fraud_retrain"
```
## 3. 代理决策逻辑
`reason_node` 对 Evidently 的 `drift_score`(显著漂移的特征比例)应用阈值逻辑:
| 漂移分数 | 条件 | 决策 |
|----------|------|------|
| > 0.50 | 大多数特征发生漂移 | **RETRAIN** |
| > 0.30 | 显著漂移 | **ROLLBACK** |
| > 0.10 或高影响特征漂移 | 轻微但明显 | **ALERT** |
| <= 0.10 | 所有分布稳定 | **OK** |
**高影响特征覆盖:**
特征 `amount`、`txn_count_1h` 和 `sender_age_days` 对 UPI 欺诈检测至关重要。若其中任意一项发生漂移,即使幅度很小,代理也会将严重性提升一级(例如告警升级为回滚),因为交易金额 10% 的漂移对欺诈分数分布的影响远大于 `merchant_cat` 的同等变化。
**重试策略:**
若 `act_node` 执行失败(例如 MLflow 在重训练期间不可用),图会将流程路由回 `reason_node` 并设置 `retry_count=1`。节点会自动降级决策(重训练 → 回滚 → 告警),以尝试成本更低的恢复方式。
**LLM 模式:**
设置 `AGENT_MODE=llm` 并配置 `ANTHROPIC_API_KEY`,即可使用 Claude Haiku 进行推理。LLM 会接收完整的漂移报告(JSON 格式)并返回结构化 JSON。任一环节失败时自动回退到规则模式。
## 4. 演示:注入漂移 → 代理恢复
### 一键演示(Docker)
```
bash scripts/run_demo.sh
```
### 本地逐步操作
```
# 0. 初始设置
cd mlops-agent
pip install -r requirements.txt
python data/generate_data.py # creates reference.csv + production.csv
python -m app.train # trains initial XGBoost model
# 1. 启动 FastAPI 服务器
uvicorn app.main:app --reload &
curl http://localhost:8000/health # should return {"status": "ok", ...}
# 2. 生成一些预测以填充 predictions.log
python -c "
import requests, random
for _ in range(200):
r = requests.post('http://localhost:8000/predict', json={
'amount': random.uniform(100, 5000), 'hour': random.randint(9, 21),
'merchant_cat': random.randint(0, 4), 'device_type': random.randint(0, 2),
'sender_age_days': random.randint(30, 2000), 'receiver_age_days': random.randint(30, 2000),
'txn_count_1h': random.randint(1, 5), 'same_device': random.randint(0, 1)
})
print('Done. 200 predictions logged.')
"
# 3. 注入高严重性漂移(模拟节日季偏移)
python data/inject_drift.py --severity high
# 偏移:amount x3.5 (log),hour -> 2-5am,txn_count_1h x3x
# 4. 手动运行漂移检查
python -m monitor.scheduler --once
# 预期:drift_score=0.62 (62%) -> recommendation=retrain
# 5. 运行代理(读取 latest_report.json,进行决策并执行)
python -m agent.runner --once
# 预期:
# Decision : RETRAIN
# Confidence : 0.95
# Action taken : promoted|roc_auc=0.9812|mlflow_run_id=...
# 6. 查看 MLflow 结果
mlflow ui --backend-store-uri mlruns
# 打开 http://127.0.0.1:5000 -> Experiments -> upi_fraud_retrain
# 7. 查看 Prometheus 指标
curl http://localhost:8000/metrics | grep model_
# model_drift_score 0.625
# model_accuracy 0.9812
# agent_decisions_total{decision="retrain"} 1.0
```
## 5. 结果
### 测试运行中的指标(中等漂移场景)
| 指标 | 漂移前 | 重训练后 |
|------|--------|----------|
| 漂移分数 | 0.00 | 0.375 |
| 模型 ROC-AUC | 0.9654 | 0.9718 |
| 决策 | ok | rollback |
| 恢复时间 | — | ~45 秒 |
| 代理置信度 | — | 0.88 |
### Grafana 面板
运行 `docker compose up` 后,访问 `http://localhost:3000`(admin/admin):
- **随时间变化的漂移分数**——带 0.3 和 0.5 阈值线的折线图
- **随时间变化的模型 ROC-AUC**——显示重训练成功后的准确率跃升
- **代理决策**——按重训练 / 回滚 / 告警 / 正常分类的柱状图
- **预测量**——堆叠面积图:每分钟欺诈与正常交易
- **推理延迟**——P50 / P95 / P99 百分位线(通常为 1–4 毫秒)
- **重训练汇总**——当前状态表:总重训练次数、ROC-AUC、漂移分数
*[示意图占位符——运行 `bash scripts/run_demo.sh` 查看实时仪表盘]*
## 6. 本地运行(单命令)
```
# 先决条件:已安装并运行 Docker Desktop
git clone
cd mlops-agent
# 1. 生成训练数据并训练初始模型
python data/generate_data.py
python -m app.train
# 2. 复制环境文件
cp .env.example .env
# 可选:添加 SLACK_WEBHOOK_URL 和 ANTHROPIC_API_KEY
# 3. 启动完整堆栈
docker compose up --build
# 等待约 30 秒进行健康检查,然后:
# FastAPI 文档: http://localhost:8000/docs
# MLflow UI: http://localhost:5000
# Prometheus: http://localhost:9090
# Grafana: http://localhost:3000 (admin/admin)
# 4. 在第二个终端中注入漂移
python data/inject_drift.py --severity high
# 5. 在 docker compose 日志中观察代理的响应
docker compose logs -f agent-runner
```
## 7. 部署(Railway 免费额度)
```
# 安装 Railway CLI
npm install -g @railway/cli
railway login
# 创建项目
railway init
# 通过仪表板或以下方式添加环境变量:
railway variables set ANTHROPIC_API_KEY=sk-ant-...
railway variables set SLACK_WEBHOOK_URL=https://hooks.slack.com/...
railway variables set AGENT_MODE=llm
# 部署 FastAPI 服务器
railway up --service fastapi-server
# 部署 MLflow(需要持久卷 — Railway Pro 需要卷支持)
# 对于免费层:使用 SQLite 后端和 Railway 的临时存储
railway up --service mlflow-server
# Prometheus + Grafana 应保持本地或使用廉价 VPS(Fly.io、Render)
# 因为它们需要持久卷用于有意义的数据保留。
```
## 项目结构
```
mlops-agent/
├── app/
│ ├── main.py FastAPI server + /metrics endpoint
│ ├── model.py Thread-safe model singleton + hot-reload
│ ├── metrics.py Prometheus metric definitions
│ ├── train.py XGBoost training script
│ └── schemas.py Pydantic v2 request/response models
├── data/
│ ├── generate_data.py UPI transaction data generator
│ └── inject_drift.py 3-feature drift injector (--severity low/medium/high)
├── monitor/
│ ├── drift_check.py Evidently DataDrift + DataQuality report
│ ├── scheduler.py APScheduler (every 5 min)
│ └── test_drift.py 10 unit tests for drift logic
├── agent/
│ ├── state.py AgentState TypedDict + Decision Literal
│ ├── graph.py LangGraph StateGraph (4 conditional edges + retry)
│ ├── runner.py CLI: --once / --watch / --mock
│ └── nodes/
│ ├── monitor.py Load + validate latest_report.json
│ ├── reason.py Rule-based (offline) + Claude Haiku (LLM) reasoning
│ └── act.py Route decision to action function
├── actions/
│ ├── __init__.py execute_action() with MLflow tagging + timing
│ ├── retrain.py Retrain + MLflow logging + promotion + hot-reload
│ ├── rollback.py MLflow registry rollback + local backup fallback
│ └── alert.py Slack Block Kit + file fallback
├── grafana/
│ ├── dashboards/ mlops_agent.json (6-panel dashboard)
│ └── provisioning/ Auto-provisioning for datasource + dashboard
├── prometheus/
│ └── prometheus.yml Scrape config (15s interval, 15d retention)
├── scripts/
│ └── run_demo.sh Full demo automation script
├── tests/
│ └── test_full_loop.py End-to-end: drift -> agent -> retrain -> MLflow
├── docker-compose.yml Full 6-service stack
├── Dockerfile Single image for all Python services
└── .env.example All required environment variables documented
```
## 技术栈
| 组件 | 技术 | 选择原因 |
|------|------|----------|
| 推理 | FastAPI + XGBoost | 低延迟服务,行业标准 |
| 漂移检测 | Evidently AI 0.7.x | 成熟可靠,支持 Wasserstein 与 JS 距离 |
| 代理编排 | LangGraph 1.0.9 | 带重试路由的状态图 |
| LLM 推理 | Claude Haiku(Anthropic) | 快速、低成本、结构化 JSON 输出 |
| 实验追踪 | MLflow 2.x | 模型注册表与工件存储 |
| 指标 | Prometheus + Grafana | 事实上的可观测性标准 |
| 调度 | APScheduler 3.x | 轻量级,无 Celery 开销 |
| 容器化 | Docker Compose | 一键本地环境 |
标签:APScheduler, AV绕过, Claude Haiku, ETW劫持, Evidently AI, FastAPI, LangGraph, MLflow, MLOps, Slack告警, XGBoost, 关键词SEO, 印度UPI, 实时预测监控, 开源框架, 持续部署, 持续集成, 支付欺诈检测, 数据漂移检测, 无监督漂移检测, 日志记录, 时间序列监控, 机器学习模型, 模型再训练, 模型回滚, 模型监控, 模型部署自动化, 版权保护, 特征分布漂移, 系统架构图, 自动化运维, 自定义请求头, 自治AI代理, 节日消费模式, 逆向工具, 金融风控