tsang1234/novacrm-churn-demo

GitHub: tsang1234/novacrm-churn-demo

基于Databricks构建的端到端客户流失检测与解决系统演示,整合Medallion数据架构、LightGBM机器学习模型和Claude多智能体实现从预警到留存行动的完整闭环。

Stars: 0 | Forks: 0

# NovaCRM — 多智能体流失检测与解决 这是一个基于虚构公司 **NovaCRM Solutions**(法国 B2B SaaS CRM 发布商)构建的、完整的 Databricks 数据管道端到端演示,用于客户流失检测与解决。 ## 概述 ``` Bronze (raw) → Silver (clean) → Gold (features/aggregats) ↓ 05_ml : LightGBM + MLflow ↓ 06_agents : Claude AI Agents ↓ Databricks App (Human-in-the-loop) ``` 该管道覆盖了整个价值链: | 阶段 | 内容 | |-------|---------| | **合成数据** | 使用纯 Spark SQL 生成 7 张 Bronze 表(87k+ 事件,16k+ 工单,2k 联系人) | | **Medallion 架构** | Bronze → Silver(清洗、合规、FKs) → Gold(聚合、Feature Store) | | **ML Feature Store** | 45+ 特征,用于流失模型(LightGBM + MLflow) | | **AI 智能体** | 3 个 Claude API 智能体(检测、留存行动、人工验证) | | **可移植性** | 1 个 `config.yml` 文件即可部署到任何 Databricks 租户 | ## 数据架构 ### Unity Catalog 目录:`novacrm_demo` ``` novacrm_demo ├── bronze ← Données brutes telles quelles (_ingested_at, _source) ├── silver ← Nettoyées, typées, FKs validées, dédupliquées ├── gold ← Agrégats business, feature store ML, schemas agents └── ml ← Réservé aux modèles, expériences MLflow ``` ### 表(共 19 张) #### Bronze — 合成数据 | 表 | 行数 | 描述 | |-------|-------:|-------------| | `raw_cs_agents` | 15 | 客户成功代理(15 个区域画像) | | `raw_companies` | 500 | 法国客户企业(TPE/PME/ETI/GE),包含 `churn_profile` | | `raw_contacts` | 2 000 | 每家企业 4 个联系人,具有真实的决策者角色 | | `raw_subscriptions` | 600 | 合同历史(当前 + 历史) | | `raw_nps_surveys` | 2 000 | 每家企业 4 次 NPS 调查(按季度) | | `raw_support_tickets` | ~16 500 | 工单分布与流失画像相关 | | `raw_product_events` | ~87 000 | 产品使用事件(15 种操作类型) | #### Silver — 清洗后的数据 | 表 | 描述 | |-------|-------------| | `dim_companies` | 不含 `churn_profile`(排除模拟元数据),计算了 `is_active` | | `dim_contacts` | 邮箱标准化(小写/去空格),验证了公司 FK | | `dim_cs_agents` | 计算了 `years_of_service` | | `fact_subscriptions` | 计算了 `duration_days` + `is_current_contract` | | `fact_support_tickets` | `sentiment_category`(正面/中性/负面) + `is_escalated` | | `fact_product_events` | 提取了 `event_date` 和 `event_hour` | | `fact_nps_scores` | 重新计算了 `nps_category` 以确保一致性 | #### Gold — 聚合与 ML | 表 | 行数 | 描述 | |-------|-------:|-------------| | `agg_company_health_score` | 500 | 健康评分 0-100,`risk_tier` High/Medium/Low | | `feature_store_churn` | 500 | **45+ ML 特征** + 目标 `is_churned` | | `agg_churn_predictions` | 0 | 准备好用于 ML 预测的 Schema | | `agg_retention_actions` | 0 | 准备好用于 AI 智能体的 Schema | | `agg_agent_performance` | 15 | CS 代理 KPI(投资组合、流失率、NPS) | ### 模拟流失画像 合成数据重现了 4 种真实行为: | 画像 | 占比 | 使用行为 | 工单行为 | |--------|----------:|-------------------|---------------------| | `slow_decline` | 30% | 活动逐渐减少(100% → 20%) | 频率增加,情绪逐月恶化 | | `sudden_stop` | 10% | 活动正常,但最后 2 个月几乎为零 | 生命周期末期出现大量非常负面的 P1/P2 工单 | | `contract_expiry` | 15% | 活动适中、稳定 | 工单适中,情绪中性至略微负面 | | `healthy` | 45% | 活动稳定或略有增长 | 工单稀少,情绪正面 | ## 项目结构 ``` novacrm_demo_pipeline/ ├── config.yml ← Configuration tenant (à modifier) ├── run_pipeline.py ← Orchestrateur Python SDK │ ├── 00_setup/ │ └── 01_catalog_schemas.sql ← CREATE CATALOG + 4 schemas │ ├── 01_bronze/ ← Génération des données synthétiques │ ├── 01_raw_cs_agents.sql │ ├── 02_raw_companies.sql │ ├── 03_raw_contacts.sql │ ├── 04_raw_subscriptions.sql │ ├── 05_raw_nps_surveys.sql │ ├── 06_raw_support_tickets.sql │ └── 07_raw_product_events.sql │ ├── 02_silver/ ← Nettoyage et conformité │ ├── 01_dim_cs_agents.sql │ ├── 02_dim_companies.sql │ ├── 03_dim_contacts.sql │ ├── 04_fact_subscriptions.sql │ ├── 05_fact_support_tickets.sql │ ├── 06_fact_product_events.sql │ └── 07_fact_nps_scores.sql │ ├── 03_gold/ ← Agrégats business et feature store │ ├── 01_agg_company_health_score.sql │ ├── 02_feature_store_churn.sql │ ├── 03_agg_churn_predictions_schema.sql │ ├── 04_agg_retention_actions_schema.sql │ └── 05_agg_agent_performance.sql │ ├── 04_quality/ │ └── 01_row_count_check.sql ← Row counts + data quality checks │ ├── 05_ml/ ← Pipeline ML (stubs à compléter) │ ├── 01_train_churn_model.py ← Entraînement LightGBM + MLflow │ ├── 02_register_model.py ← Promotion Model Registry │ └── 03_batch_inference.py ← Scoring batch + SHAP │ └── 06_agents/ ← Agents AI (stubs à compléter) ├── 01_risk_detection_agent.py ← Détection et triage des High Risk ├── 02_retention_action_agent.py ← Génération email/call/discount └── 03_approval_workflow.py ← Human-in-the-loop validation ``` ## 先决条件 ### Databricks - 启用了 **Unity Catalog** 的 Databricks Workspace - 至少一个 **SQL Warehouse**(Serverless 或 Classic) - 拥有 `CREATE CATALOG` 权限或拥有带 `CREATE SCHEMA` 权限的现有 catalog ### 本地环境 - Python 3.9+ - 已配置 Databricks CLI (`~/.databrickscfg`) **或** 环境变量 ``` # 选项 A — Databricks CLI databricks configure # 选项 B — 环境变量 export DATABRICKS_HOST=https://your-workspace.azuredatabricks.net export DATABRICKS_TOKEN=dapi... ``` ## 安装 ``` # 1. 克隆 / 复制项目 git clone novacrm_demo_pipeline cd novacrm_demo_pipeline # 2. 安装 Python 依赖 pip install databricks-sdk pyyaml # 3. (可选) 用于 ML 步骤 pip install lightgbm scikit-learn shap mlflow # 4. (可选) 用于 AI agents pip install anthropic mlflow # 5. 配置 Anthropic API 密钥 export ANTHROPIC_API_KEY=sk-ant-... ``` ## 配置 根据您的租户修改 `config.yml`: ``` # 目标 Unity Catalog catalog 名称 catalog: novacrm_demo # SQL Warehouse ID ('auto' 用于自动选择) # Databricks UI > SQL Warehouses > Connection details > HTTP path warehouse_id: auto # Databricks CLI 配置文件 (若未配置则为默认) databricks_profile: DEFAULT # 每个 SQL 请求的超时时间(秒) sql_timeout: 600 # 要执行的 stages (移除某个 stage 以进行部分 re-run) stages: - 00_setup - 01_bronze - 02_silver - 03_gold - 04_quality ``` ## 使用 ### 完整管道 ``` python run_pipeline.py ``` ### 可用选项 ``` # 覆盖 catalog (无需修改 config.yml) python run_pipeline.py --catalog mon_autre_catalog # 重新运行 (Re-run) 单个 stage python run_pipeline.py --stage 02_silver python run_pipeline.py --stage 03_gold # 显示 SQL 但不执行 python run_pipeline.py --dry-run # 禁用最终 row counts 显示 python run_pipeline.py --skip-counts ``` ### ML 管道(第 5 步) ``` # 训练模型并记录到 MLflow python 05_ml/01_train_churn_model.py # 将最佳模型提升 (Promote) 为 champion python 05_ml/02_register_model.py # 或使用特定的 run: python 05_ml/02_register_model.py --run-id abc123def456 # 500 家公司的 Batch Scoring → gold.agg_churn_predictions python 05_ml/03_batch_inference.py ``` ### AI 智能体(第 6 步) ``` # 1. 通过 Claude 分析和分类 High Risk 公司 (ReAct + tool_use) export ANTHROPIC_API_KEY=sk-ant-... python 06_agents/01_risk_detection_agent.py python 06_agents/01_risk_detection_agent.py --tier High --mrr-min 2000 python 06_agents/01_risk_detection_agent.py --dry-run # analyse sans générer # 2. 手动为公司生成操作 python 06_agents/02_retention_action_agent.py --company-id C001 --action-type email python 06_agents/02_retention_action_agent.py --company-id C042 --action-type discount # 3. 操作的人工验证 (交互式 CLI 界面) python 06_agents/03_approval_workflow.py python 06_agents/03_approval_workflow.py --tier High --limit 5 python 06_agents/03_approval_workflow.py --agent-id AG001 # portefeuille d'un agent CS python 06_agents/03_approval_workflow.py --non-interactive # approuve tout (tests) # 执行后更新 outcome python 06_agents/03_approval_workflow.py \ --update-outcome --outcome saved ``` ### 输出示例 ``` ════════════════════════════════════════════════════════════ NovaCRM Demo Pipeline Catalog : novacrm_demo Mode : EXECUTE ════════════════════════════════════════════════════════════ Warehouse sélectionné: Shared Serverless (id=abc123) [00_setup] → 01_catalog_schemas.sql (5 statements) ✓ Stage terminé (1 fichier(s)) [01_bronze] → 01_raw_cs_agents.sql (1 statement) → 02_raw_companies.sql (1 statement) ... ✓ Stage terminé (7 fichier(s)) ... Pipeline terminé en 142.3s (20 fichier(s) exécutés) ──────────────────────────────────────────────────────────── Row counts — novacrm_demo ──────────────────────────────────────────────────────────── BRONZE ✓ raw_cs_agents 15 rows ✓ raw_companies 500 rows ✓ raw_contacts 2,000 rows ✓ raw_subscriptions 600 rows ✓ raw_nps_surveys 2,000 rows ✓ raw_support_tickets 16,459 rows ✓ raw_product_events 87,250 rows SILVER ✓ dim_companies 500 rows ... GOLD ✓ feature_store_churn 500 rows ○ agg_churn_predictions (schema vide - ready for ML/agents) ○ agg_retention_actions (schema vide - ready for ML/agents) ``` ## 部署到其他租户 管道通过 `config.yml` 实现 **100% 可参数化**。迁移步骤: 1. 修改 `config.yml` 中的 `catalog` 2. 确保 CLI 配置文件指向正确的 workspace 3. 运行 `python run_pipeline.py` SQL 中所有对 catalog 的引用都是占位符 `{catalog}`,由编排器动态替换 —— 无需修改任何 SQL 文件。 ## 技术选型 ### 纯 Spark SQL 数据生成 合成数据完全通过 SQL 生成(无 Python 或 Faker),这允许仅通过 **SQL Warehouse** 执行管道,无需 Databricks 集群。 - `EXPLODE(SEQUENCE(1, N))` 用于生成 N 行 - `pmod(hash(id * seed), N)` 用于确定性伪随机值 - `LATERAL VIEW EXPLODE(SEQUENCE(1, count))` 用于基数可变的表(工单、事件) 数据是**可复现的**:相同的种子总是产生相同的数据。 ### 复合健康评分 健康评分(`agg_company_health_score`)是 5 个维度的加权平均值: ``` health_score = usage × 30% + support × 25% + NPS × 20% + payment × 15% + engagement × 10% ``` | 评分 | 风险等级 | |-------|-----------| | < 40 | 🔴 High | | 40-65 | 🟡 Medium | | > 65 | 🟢 Low | ### Feature Store(45+ 特征) `gold.feature_store_churn` 表是 ML 模型的唯一数据源: | 分组 | 关键特征 | |--------|---------------| | 使用 (8) | `dau_last_30d`, `usage_decline_pct_30d`, `feature_adoption_rate`, `days_since_last_login`, `active_users_ratio` | | 支持工单 (10) | `ticket_count_30d`, `p1_ticket_count_30d`, `avg_ticket_sentiment_30d`, `sentiment_trend`, `escalation_count_90d` | | NPS (4) | `nps_latest`, `nps_delta_vs_prev`, `is_detractor` | | 合同 (7) | `mrr_current`, `mrr_trend_90d`, `days_to_renewal`, `has_downgraded`, `upgrade_count` | | 通用 (4) | `company_size_encoded`, `customer_tenure_days`, `num_contacts`, `num_decision_makers` | | **目标** | `is_churned`(如果最后一份合同 = `churned` 则为 1) | ## 路线图 ### 第 5 步 — ML 管道 (`05_ml/`) ✅ - [x] `01_train_churn_model.py` — LightGBM + 5-fold CV + MLflow(scale_pos_weight=24 针对 4% 流失率) - [x] `02_register_model.py` — 提升 UC Registry,包含 champion/challenger 别名 + 治理标签 - [x] `03_batch_inference.py` — 500 家企业的批量评分 + SHAP top-5 → MERGE 到 `gold.agg_churn_predictions` ### 第 6 步 — AI 智能体 (`06_agents/`) ✅ - [x] `01_risk_detection_agent.py` — Claude 智能体(ReAct + tool_use):高风险分流,决定 email/call/discount/escalation,MLflow tracing - [x] `02_retention_action_agent.py` — Claude 智能体:生成个性化内容(邮件、通话脚本、优惠、升级简报) → MERGE 到 `gold.agg_retention_actions` - [x] `03_approval_workflow.py` — CLI Human-in-the-loop:批准 / 拒绝 / 忽略,统计仪表板,更新结果 ### 第 7 步 — Databricks App - [x] 面向 CS 代理的 Streamlit 仪表板 - [x] 按 risk_tier 过滤的投资组合视图 - [x] 行动验证界面(批准 / 修改 / 拒绝) - [x] AI 智能体的性能指标(挽回率、ROI) ## 数据质量 脚本 `04_quality/01_row_count_check.sql` 检查: 1. 各表的 **行数** 是否达到预期最小阈值 2. 所有 Silver 表的 **PKs 非 NULL** 3. **risk_tier 分布** 以验证健康评分的一致性 仅重新运行检查: ``` python run_pipeline.py --stage 04_quality ``` ## 约束与约定 | 约定 | 详情 | |-----------|--------| | 存储格式 | 全部使用 Delta | | 时间戳 | UTC | | 金额 | EUR,整数(无分) | | `_前缀列` | 仅限 Bronze(`_ingested_at`, `_source`) | | `churn_profile` | 仅限 Bronze — Silver/Gold 中排除 | | 可复现种子 | `pmod(hash(id * seed), N)` — 确定性 | | UC 注释 | Unity Catalog 中的关键表和列 | ## 贡献 / 扩展 ### 添加新的 Bronze 表 1. 创建 `01_bronze/08_ma_nouvelle_table.sql`,使用 `{catalog}` 作为占位符 2. 重新运行:`python run_pipeline.py --stage 01_bronze` ### 修改目标 catalog ``` python run_pipeline.py --catalog novacrm_prod ``` ### 测试而不执行 ``` python run_pipeline.py --dry-run ``` ## 许可证 内部演示项目 —— 数据完全虚构。 NovaCRM Solutions 不存在。任何与真实企业的相似之处纯属巧合。
标签:AI代理, Apex, B2B, Bronze-Silver-Gold, Claude API, CRM系统, Databricks, Feature Store, HITL, Human-in-the-loop, Kubernetes, LightGBM, Medallion架构, MLflow, NovaCRM, PyRIT, Python, SaaS运营, Spark SQL, Streamlit, Unity Catalog, 合成数据, 商业智能, 多智能体系统, 大语言模型应用, 实时分析, 客户成功, 客户流失检测, 客户留存, 数据工程, 数据清洗, 数据管道, 无后门, 机器学习, 流失预测, 演示项目, 特征工程, 端到端方案, 访问控制, 软件工程, 逆向工具, 风险控制