tsang1234/novacrm-churn-demo
GitHub: tsang1234/novacrm-churn-demo
基于Databricks构建的端到端客户流失检测与解决系统演示,整合Medallion数据架构、LightGBM机器学习模型和Claude多智能体实现从预警到留存行动的完整闭环。
Stars: 0 | Forks: 0
# NovaCRM — 多智能体流失检测与解决
这是一个基于虚构公司 **NovaCRM Solutions**(法国 B2B SaaS CRM 发布商)构建的、完整的 Databricks 数据管道端到端演示,用于客户流失检测与解决。
## 概述
```
Bronze (raw) → Silver (clean) → Gold (features/aggregats)
↓
05_ml : LightGBM + MLflow
↓
06_agents : Claude AI Agents
↓
Databricks App (Human-in-the-loop)
```
该管道覆盖了整个价值链:
| 阶段 | 内容 |
|-------|---------|
| **合成数据** | 使用纯 Spark SQL 生成 7 张 Bronze 表(87k+ 事件,16k+ 工单,2k 联系人) |
| **Medallion 架构** | Bronze → Silver(清洗、合规、FKs) → Gold(聚合、Feature Store) |
| **ML Feature Store** | 45+ 特征,用于流失模型(LightGBM + MLflow) |
| **AI 智能体** | 3 个 Claude API 智能体(检测、留存行动、人工验证) |
| **可移植性** | 1 个 `config.yml` 文件即可部署到任何 Databricks 租户 |
## 数据架构
### Unity Catalog 目录:`novacrm_demo`
```
novacrm_demo
├── bronze ← Données brutes telles quelles (_ingested_at, _source)
├── silver ← Nettoyées, typées, FKs validées, dédupliquées
├── gold ← Agrégats business, feature store ML, schemas agents
└── ml ← Réservé aux modèles, expériences MLflow
```
### 表(共 19 张)
#### Bronze — 合成数据
| 表 | 行数 | 描述 |
|-------|-------:|-------------|
| `raw_cs_agents` | 15 | 客户成功代理(15 个区域画像) |
| `raw_companies` | 500 | 法国客户企业(TPE/PME/ETI/GE),包含 `churn_profile` |
| `raw_contacts` | 2 000 | 每家企业 4 个联系人,具有真实的决策者角色 |
| `raw_subscriptions` | 600 | 合同历史(当前 + 历史) |
| `raw_nps_surveys` | 2 000 | 每家企业 4 次 NPS 调查(按季度) |
| `raw_support_tickets` | ~16 500 | 工单分布与流失画像相关 |
| `raw_product_events` | ~87 000 | 产品使用事件(15 种操作类型) |
#### Silver — 清洗后的数据
| 表 | 描述 |
|-------|-------------|
| `dim_companies` | 不含 `churn_profile`(排除模拟元数据),计算了 `is_active` |
| `dim_contacts` | 邮箱标准化(小写/去空格),验证了公司 FK |
| `dim_cs_agents` | 计算了 `years_of_service` |
| `fact_subscriptions` | 计算了 `duration_days` + `is_current_contract` |
| `fact_support_tickets` | `sentiment_category`(正面/中性/负面) + `is_escalated` |
| `fact_product_events` | 提取了 `event_date` 和 `event_hour` |
| `fact_nps_scores` | 重新计算了 `nps_category` 以确保一致性 |
#### Gold — 聚合与 ML
| 表 | 行数 | 描述 |
|-------|-------:|-------------|
| `agg_company_health_score` | 500 | 健康评分 0-100,`risk_tier` High/Medium/Low |
| `feature_store_churn` | 500 | **45+ ML 特征** + 目标 `is_churned` |
| `agg_churn_predictions` | 0 | 准备好用于 ML 预测的 Schema |
| `agg_retention_actions` | 0 | 准备好用于 AI 智能体的 Schema |
| `agg_agent_performance` | 15 | CS 代理 KPI(投资组合、流失率、NPS) |
### 模拟流失画像
合成数据重现了 4 种真实行为:
| 画像 | 占比 | 使用行为 | 工单行为 |
|--------|----------:|-------------------|---------------------|
| `slow_decline` | 30% | 活动逐渐减少(100% → 20%) | 频率增加,情绪逐月恶化 |
| `sudden_stop` | 10% | 活动正常,但最后 2 个月几乎为零 | 生命周期末期出现大量非常负面的 P1/P2 工单 |
| `contract_expiry` | 15% | 活动适中、稳定 | 工单适中,情绪中性至略微负面 |
| `healthy` | 45% | 活动稳定或略有增长 | 工单稀少,情绪正面 |
## 项目结构
```
novacrm_demo_pipeline/
├── config.yml ← Configuration tenant (à modifier)
├── run_pipeline.py ← Orchestrateur Python SDK
│
├── 00_setup/
│ └── 01_catalog_schemas.sql ← CREATE CATALOG + 4 schemas
│
├── 01_bronze/ ← Génération des données synthétiques
│ ├── 01_raw_cs_agents.sql
│ ├── 02_raw_companies.sql
│ ├── 03_raw_contacts.sql
│ ├── 04_raw_subscriptions.sql
│ ├── 05_raw_nps_surveys.sql
│ ├── 06_raw_support_tickets.sql
│ └── 07_raw_product_events.sql
│
├── 02_silver/ ← Nettoyage et conformité
│ ├── 01_dim_cs_agents.sql
│ ├── 02_dim_companies.sql
│ ├── 03_dim_contacts.sql
│ ├── 04_fact_subscriptions.sql
│ ├── 05_fact_support_tickets.sql
│ ├── 06_fact_product_events.sql
│ └── 07_fact_nps_scores.sql
│
├── 03_gold/ ← Agrégats business et feature store
│ ├── 01_agg_company_health_score.sql
│ ├── 02_feature_store_churn.sql
│ ├── 03_agg_churn_predictions_schema.sql
│ ├── 04_agg_retention_actions_schema.sql
│ └── 05_agg_agent_performance.sql
│
├── 04_quality/
│ └── 01_row_count_check.sql ← Row counts + data quality checks
│
├── 05_ml/ ← Pipeline ML (stubs à compléter)
│ ├── 01_train_churn_model.py ← Entraînement LightGBM + MLflow
│ ├── 02_register_model.py ← Promotion Model Registry
│ └── 03_batch_inference.py ← Scoring batch + SHAP
│
└── 06_agents/ ← Agents AI (stubs à compléter)
├── 01_risk_detection_agent.py ← Détection et triage des High Risk
├── 02_retention_action_agent.py ← Génération email/call/discount
└── 03_approval_workflow.py ← Human-in-the-loop validation
```
## 先决条件
### Databricks
- 启用了 **Unity Catalog** 的 Databricks Workspace
- 至少一个 **SQL Warehouse**(Serverless 或 Classic)
- 拥有 `CREATE CATALOG` 权限或拥有带 `CREATE SCHEMA` 权限的现有 catalog
### 本地环境
- Python 3.9+
- 已配置 Databricks CLI (`~/.databrickscfg`) **或** 环境变量
```
# 选项 A — Databricks CLI
databricks configure
# 选项 B — 环境变量
export DATABRICKS_HOST=https://your-workspace.azuredatabricks.net
export DATABRICKS_TOKEN=dapi...
```
## 安装
```
# 1. 克隆 / 复制项目
git clone novacrm_demo_pipeline
cd novacrm_demo_pipeline
# 2. 安装 Python 依赖
pip install databricks-sdk pyyaml
# 3. (可选) 用于 ML 步骤
pip install lightgbm scikit-learn shap mlflow
# 4. (可选) 用于 AI agents
pip install anthropic mlflow
# 5. 配置 Anthropic API 密钥
export ANTHROPIC_API_KEY=sk-ant-...
```
## 配置
根据您的租户修改 `config.yml`:
```
# 目标 Unity Catalog catalog 名称
catalog: novacrm_demo
# SQL Warehouse ID ('auto' 用于自动选择)
# Databricks UI > SQL Warehouses > Connection details > HTTP path
warehouse_id: auto
# Databricks CLI 配置文件 (若未配置则为默认)
databricks_profile: DEFAULT
# 每个 SQL 请求的超时时间(秒)
sql_timeout: 600
# 要执行的 stages (移除某个 stage 以进行部分 re-run)
stages:
- 00_setup
- 01_bronze
- 02_silver
- 03_gold
- 04_quality
```
## 使用
### 完整管道
```
python run_pipeline.py
```
### 可用选项
```
# 覆盖 catalog (无需修改 config.yml)
python run_pipeline.py --catalog mon_autre_catalog
# 重新运行 (Re-run) 单个 stage
python run_pipeline.py --stage 02_silver
python run_pipeline.py --stage 03_gold
# 显示 SQL 但不执行
python run_pipeline.py --dry-run
# 禁用最终 row counts 显示
python run_pipeline.py --skip-counts
```
### ML 管道(第 5 步)
```
# 训练模型并记录到 MLflow
python 05_ml/01_train_churn_model.py
# 将最佳模型提升 (Promote) 为 champion
python 05_ml/02_register_model.py
# 或使用特定的 run:
python 05_ml/02_register_model.py --run-id abc123def456
# 500 家公司的 Batch Scoring → gold.agg_churn_predictions
python 05_ml/03_batch_inference.py
```
### AI 智能体(第 6 步)
```
# 1. 通过 Claude 分析和分类 High Risk 公司 (ReAct + tool_use)
export ANTHROPIC_API_KEY=sk-ant-...
python 06_agents/01_risk_detection_agent.py
python 06_agents/01_risk_detection_agent.py --tier High --mrr-min 2000
python 06_agents/01_risk_detection_agent.py --dry-run # analyse sans générer
# 2. 手动为公司生成操作
python 06_agents/02_retention_action_agent.py --company-id C001 --action-type email
python 06_agents/02_retention_action_agent.py --company-id C042 --action-type discount
# 3. 操作的人工验证 (交互式 CLI 界面)
python 06_agents/03_approval_workflow.py
python 06_agents/03_approval_workflow.py --tier High --limit 5
python 06_agents/03_approval_workflow.py --agent-id AG001 # portefeuille d'un agent CS
python 06_agents/03_approval_workflow.py --non-interactive # approuve tout (tests)
# 执行后更新 outcome
python 06_agents/03_approval_workflow.py \
--update-outcome --outcome saved
```
### 输出示例
```
════════════════════════════════════════════════════════════
NovaCRM Demo Pipeline
Catalog : novacrm_demo
Mode : EXECUTE
════════════════════════════════════════════════════════════
Warehouse sélectionné: Shared Serverless (id=abc123)
[00_setup]
→ 01_catalog_schemas.sql (5 statements)
✓ Stage terminé (1 fichier(s))
[01_bronze]
→ 01_raw_cs_agents.sql (1 statement)
→ 02_raw_companies.sql (1 statement)
...
✓ Stage terminé (7 fichier(s))
...
Pipeline terminé en 142.3s (20 fichier(s) exécutés)
────────────────────────────────────────────────────────────
Row counts — novacrm_demo
────────────────────────────────────────────────────────────
BRONZE
✓ raw_cs_agents 15 rows
✓ raw_companies 500 rows
✓ raw_contacts 2,000 rows
✓ raw_subscriptions 600 rows
✓ raw_nps_surveys 2,000 rows
✓ raw_support_tickets 16,459 rows
✓ raw_product_events 87,250 rows
SILVER
✓ dim_companies 500 rows
...
GOLD
✓ feature_store_churn 500 rows
○ agg_churn_predictions (schema vide - ready for ML/agents)
○ agg_retention_actions (schema vide - ready for ML/agents)
```
## 部署到其他租户
管道通过 `config.yml` 实现 **100% 可参数化**。迁移步骤:
1. 修改 `config.yml` 中的 `catalog`
2. 确保 CLI 配置文件指向正确的 workspace
3. 运行 `python run_pipeline.py`
SQL 中所有对 catalog 的引用都是占位符 `{catalog}`,由编排器动态替换 —— 无需修改任何 SQL 文件。
## 技术选型
### 纯 Spark SQL 数据生成
合成数据完全通过 SQL 生成(无 Python 或 Faker),这允许仅通过 **SQL Warehouse** 执行管道,无需 Databricks 集群。
- `EXPLODE(SEQUENCE(1, N))` 用于生成 N 行
- `pmod(hash(id * seed), N)` 用于确定性伪随机值
- `LATERAL VIEW EXPLODE(SEQUENCE(1, count))` 用于基数可变的表(工单、事件)
数据是**可复现的**:相同的种子总是产生相同的数据。
### 复合健康评分
健康评分(`agg_company_health_score`)是 5 个维度的加权平均值:
```
health_score = usage × 30%
+ support × 25%
+ NPS × 20%
+ payment × 15%
+ engagement × 10%
```
| 评分 | 风险等级 |
|-------|-----------|
| < 40 | 🔴 High |
| 40-65 | 🟡 Medium |
| > 65 | 🟢 Low |
### Feature Store(45+ 特征)
`gold.feature_store_churn` 表是 ML 模型的唯一数据源:
| 分组 | 关键特征 |
|--------|---------------|
| 使用 (8) | `dau_last_30d`, `usage_decline_pct_30d`, `feature_adoption_rate`, `days_since_last_login`, `active_users_ratio` |
| 支持工单 (10) | `ticket_count_30d`, `p1_ticket_count_30d`, `avg_ticket_sentiment_30d`, `sentiment_trend`, `escalation_count_90d` |
| NPS (4) | `nps_latest`, `nps_delta_vs_prev`, `is_detractor` |
| 合同 (7) | `mrr_current`, `mrr_trend_90d`, `days_to_renewal`, `has_downgraded`, `upgrade_count` |
| 通用 (4) | `company_size_encoded`, `customer_tenure_days`, `num_contacts`, `num_decision_makers` |
| **目标** | `is_churned`(如果最后一份合同 = `churned` 则为 1) |
## 路线图
### 第 5 步 — ML 管道 (`05_ml/`) ✅
- [x] `01_train_churn_model.py` — LightGBM + 5-fold CV + MLflow(scale_pos_weight=24 针对 4% 流失率)
- [x] `02_register_model.py` — 提升 UC Registry,包含 champion/challenger 别名 + 治理标签
- [x] `03_batch_inference.py` — 500 家企业的批量评分 + SHAP top-5 → MERGE 到 `gold.agg_churn_predictions`
### 第 6 步 — AI 智能体 (`06_agents/`) ✅
- [x] `01_risk_detection_agent.py` — Claude 智能体(ReAct + tool_use):高风险分流,决定 email/call/discount/escalation,MLflow tracing
- [x] `02_retention_action_agent.py` — Claude 智能体:生成个性化内容(邮件、通话脚本、优惠、升级简报) → MERGE 到 `gold.agg_retention_actions`
- [x] `03_approval_workflow.py` — CLI Human-in-the-loop:批准 / 拒绝 / 忽略,统计仪表板,更新结果
### 第 7 步 — Databricks App
- [x] 面向 CS 代理的 Streamlit 仪表板
- [x] 按 risk_tier 过滤的投资组合视图
- [x] 行动验证界面(批准 / 修改 / 拒绝)
- [x] AI 智能体的性能指标(挽回率、ROI)
## 数据质量
脚本 `04_quality/01_row_count_check.sql` 检查:
1. 各表的 **行数** 是否达到预期最小阈值
2. 所有 Silver 表的 **PKs 非 NULL**
3. **risk_tier 分布** 以验证健康评分的一致性
仅重新运行检查:
```
python run_pipeline.py --stage 04_quality
```
## 约束与约定
| 约定 | 详情 |
|-----------|--------|
| 存储格式 | 全部使用 Delta |
| 时间戳 | UTC |
| 金额 | EUR,整数(无分) |
| `_前缀列` | 仅限 Bronze(`_ingested_at`, `_source`) |
| `churn_profile` | 仅限 Bronze — Silver/Gold 中排除 |
| 可复现种子 | `pmod(hash(id * seed), N)` — 确定性 |
| UC 注释 | Unity Catalog 中的关键表和列 |
## 贡献 / 扩展
### 添加新的 Bronze 表
1. 创建 `01_bronze/08_ma_nouvelle_table.sql`,使用 `{catalog}` 作为占位符
2. 重新运行:`python run_pipeline.py --stage 01_bronze`
### 修改目标 catalog
```
python run_pipeline.py --catalog novacrm_prod
```
### 测试而不执行
```
python run_pipeline.py --dry-run
```
## 许可证
内部演示项目 —— 数据完全虚构。
NovaCRM Solutions 不存在。任何与真实企业的相似之处纯属巧合。
标签:AI代理, Apex, B2B, Bronze-Silver-Gold, Claude API, CRM系统, Databricks, Feature Store, HITL, Human-in-the-loop, Kubernetes, LightGBM, Medallion架构, MLflow, NovaCRM, PyRIT, Python, SaaS运营, Spark SQL, Streamlit, Unity Catalog, 合成数据, 商业智能, 多智能体系统, 大语言模型应用, 实时分析, 客户成功, 客户流失检测, 客户留存, 数据工程, 数据清洗, 数据管道, 无后门, 机器学习, 流失预测, 演示项目, 特征工程, 端到端方案, 访问控制, 软件工程, 逆向工具, 风险控制