ctsc/Ecommerce-Realtime-Analytics

GitHub: ctsc/Ecommerce-Realtime-Analytics

一个受Amazon架构启发的高性能电商实时欺诈检测流水线,结合规则引擎与ML模型实现毫秒级决策。

Stars: 0 | Forks: 0

# 🚀 电商实时欺诈检测与分析 [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) [![Python 3.9+](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/downloads/) [![Docker](https://img.shields.io/badge/docker-required-blue.svg)](https://www.docker.com/) ## 📊 **项目概述** 本项目实现了一个受 Amazon 架构启发的**生产级欺诈检测流水线**,主要特性包括: - ⚡ **实时处理**:基于流式架构,欺诈决策延迟低于 100ms - 🎯 **多层检测**:规则引擎 + ML 模型 + 图分析 - 🔄 **事件驱动架构**:使用 Apache Kafka 实现可靠的事件流 - 📊 **分析仪表板**:使用 Plotly Dash 进行实时监控 - 🛡️ **零数据丢失**:针对所有故障场景配备 DLQ (Dead Letter Queue) - 🔍 **全链路可观测性**:结构化日志、指标和分布式追踪 ### **系统能力** | 指标 | 单节点 (笔记本) | 生产就绪 | 可扩展至 | |--------|---------------------|-----------------|-------------| | **吞吐量** | ~15K TPM | 1M+ TPM | 50M+ TPM | | **欺诈检测延迟** | P95: 45ms | P95: 50ms | P95: 50ms | | **模型推理** | ~3ms | ~5ms | ~5ms | | **特征库查询** | ~1ms | <2ms | <2ms | | **数据库写入吞吐量** | ~4 次 批量 INSERT/分钟 | 通过 unnest() 批量处理 | 分片 | | **数据丢失** | 零 | 零 | 零 | ## 🏗️ **架构** ### **高层系统设计** ``` ┌─────────────────────────────────────────────────────────────────────┐ │ INGESTION LAYER │ │ Transaction Generator → Kafka (transactions_raw.v1) │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ FRAUD DETECTION LAYER │ │ │ │ ┌──────────────────┐ ┌────────────────┐ ┌──────────────────┐ │ │ │ Scoring Worker │───→│ Risk Service │──→│ Model Service │ │ │ │ (Orchestrator) │ │ (Rules + Logic)│ │ (ML Inference) │ │ │ └──────────────────┘ └────────────────┘ └──────────────────┘ │ │ ↕ │ │ ┌──────────────────┐ │ │ │ Feature Store │ │ │ │ (Redis) │ │ │ └──────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ PERSISTENCE & ANALYTICS │ │ Kafka (risk_decisions.v1) → PostgreSQL → Dashboard │ └─────────────────────────────────────────────────────────────────────┘ ``` ### **技术栈** | 组件 | 技术 | 用途 | |-----------|-----------|---------| | **事件流** | Apache Kafka | 分布式消息代理 | | **特征库** | Redis | 亚 2ms 特征查询 | | **ML 推理** | Scikit-learn (Isolation Forest) | 异常检测 | | **规则引擎** | YAML + Python | 业务逻辑执行 | | **数据库** | PostgreSQL | 分析与历史数据 | | **流处理** | Apache Spark | 窗口聚合 | | **仪表板** | Plotly Dash | 实时可视化 | | **可观测性** | Prometheus, Grafana, Jaeger | 指标与追踪 | | **编排** | Docker Compose | 容器管理 | ## 🚀 **快速开始** ### **前置条件** - Docker Desktop (包含 Docker Compose) - Python 3.9-3.12 (3.13 可用但某些包可能需要更新,不推荐 3.14) - 最少 8GB RAM,推荐 16GB - 约 10GB 磁盘空间 ### **1. 克隆仓库** ``` git clone https://github.com/yourusername/ecommerce-realtime-analytics.git cd ecommerce-realtime-analytics ``` ### **2. 自动化设置 (推荐 Windows 用户)** **Windows PowerShell:** ``` # 运行自动化安装脚本 .\setup_windows.ps1 ``` 此脚本会自动处理: - ✅ 前置条件检查 - ✅ Python 环境设置 - ✅ 所有依赖项安装 - ✅ 基础设施启动 - ✅ 数据库迁移 - ✅ 模型创建 - ✅ 服务部署 - ✅ 健康检查验证 **总耗时:10-15 分钟** ### **3. 手动设置 (备选方案)** ``` # 创建环境文件 cp env.example .env # 创建虚拟 ML model (用于测试) mkdir -p models/v1 python scripts/create_dummy_model.py # 运行数据库迁移 python migrations/run_migrations.py # 启动基础设施 docker compose up -d zookeeper kafka postgres redis ``` 等待约 30 秒以确保服务处于健康状态。 ### **4. 启动欺诈检测服务** ``` # 启动所有欺诈检测服务 docker compose up -d model-service risk-service scoring-worker postgres-sink # 启动交易生成器 docker compose up -d generator # 启动 dashboard docker compose up -d dashboard ``` ### **5. 验证系统** ``` # 检查所有服务是否健康 docker compose ps # 查看日志 docker compose logs -f scoring-worker # 访问 dashboard open http://localhost:8050 ``` ## 📚 **文档** ### **核心文档** | 文档 | 描述 | |----------|-------------| | [FIRST_TIME_SETUP.md](FIRST_TIME_SETUP.md) | 完整的首次设置指南 | | [QUICK_START.md](QUICK_START.md) | 日常使用的快捷命令 | | [ARCHITECTURE.md](ARCHITECTURE.md) | 系统架构概述 | | [DOCUMENTATION.md](DOCUMENTATION.md) | 完整文档索引 | ### **Windows 设置 (新增!)** | 文档 | 描述 | |----------|-------------| | [WINDOWS_QUICKSTART.md](WINDOWS_QUICKSTART.md) | 🪟 分步 Windows 设置指南 | | [WINDOWS_QUICK_REFERENCE.md](WINDOWS_QUICK_REFERENCE.md) | 📋 单页参考卡片 (可打印) | | [WINDOWS_TROUBLESHOOTING.md](WINDOWS_TROUBLESHOOTING.md) | 🔧 Windows 故障排除指南 | | [SETUP_SCRIPTS_README.md](SETUP_SCRIPTS_README.md) | 📖 自动化设置脚本文档 | | [setup_windows.ps1](setup_windows.ps1) | 🚀 自动化 PowerShell 设置脚本 | ### **架构与设计** - [docs/AMAZON_STYLE_UPGRADE.md](docs/AMAZON_STYLE_UPGRADE.md) - 详细架构深度解析 - [docs/DOCKER_COMPOSE_ARCHITECTURE.md](docs/DOCKER_COMPOSE_ARCHITECTURE.md) - 容器架构 - [docs/EXACTLY_ONCE_STRATEGY.md](docs/EXACTLY_ONCE_STRATEGY.md) - 数据一致性保证 - [docs/PERFORMANCE_SCALING.md](docs/PERFORMANCE_SCALING.md) - 扩展至 50M+ TPM ### **运维手册** - [docs/runbooks/README.md](docs/runbooks/README.md) - 包含紧急程序的操作中心 - [docs/runbooks/SERVICES.md](docs/runbooks/SERVICES.md) - 服务故障排除 - [docs/runbooks/KAFKA.md](docs/runbooks/KAFKA.md) - Kafka 运维 - [docs/runbooks/REDIS.md](docs/runbooks/REDIS.md) - 特征库运维 - [docs/runbooks/POSTGRES.md](docs/runbooks/POSTGRES.md) - 数据库运维 ### **高级功能** - [docs/MLOPS_TOOLKIT.md](docs/MLOPS_TOOLKIT.md) - 模型训练、漂移检测、A/B 测试 - [docs/OBSERVABILITY_SETUP.md](docs/OBSERVABILITY_SETUP.md) - Prometheus, Grafana, Jaeger 设置 - [docs/CLICKHOUSE_OLAP.md](docs/CLICKHOUSE_OLAP.md) - 大规模亚秒级分析 - [docs/security/TLS_SETUP.md](docs/security/TLS_SETUP.md) - 传输层安全 ## 🎯 **关键特性** ### **1. 实时欺诈检测** - **规则引擎**:基于 YAML 的规则,用于即时拦截(速度限制、地理围栏等) - **ML 模型**:使用 Isolation Forest 进行异常检测(约 5ms 推理) - **风险评分**:0-1000 分的欺诈评分,配合决策路由(允许/拒绝/挑战/审核) - **特征库**:22+ 个实时特征,支持原子更新 ### **2. 事件驱动架构** - **Kafka Topics**:5 个带有 JSON Schema 验证的版本化主题 - **Schema 演进**:向后兼容的 Schema 版本控制(`.v1`, `.v2`) - **Dead Letter Queue**:零数据丢失保证 - **幂等处理**:Exactly-once 语义基础 ### **3. 可观测性与监控** - **结构化日志**:带有关联 ID 的 JSON 日志 - **指标**:所有服务的 Prometheus 指标 - **分布式追踪**:Jaeger 集成 - **错误修复**:RFC 7807 错误格式,附带运维手册链接 ### **4. 分析仪表板** - 受 Bloomberg Terminal 启发的深色 UI (JetBrains Mono + Inter 字体) - 实时 KPI 指示器:吞吐量、P95 延迟、欺诈率、DLQ 错误 - 欺诈检测时间线、风险评分分布、主要原因 - 连接池查询与时间限制窗口,确保性能恒定 ### **5. MLOps 流水线** - **模型训练**:自动化重训练流水线 - **漂移检测**:数据与模型漂移监控 - **A/B 测试**:影子模式与流量分流 - **模型注册中心**:版本化的模型构件 - **准确性评估**:离线批量评估 ## 📁 **项目结构** ``` ecommerce-realtime-analytics/ ├── src/ # Source code │ ├── services/ # Microservices │ │ ├── model_service/ # ML inference API │ │ ├── risk_service/ # Risk scoring & rules │ │ ├── scoring_worker/ # Event orchestrator │ │ └── postgres_sink/ # Database writer │ ├── common/ # Shared libraries │ │ ├── logging/ # Structured logging │ │ ├── errors/ # Error handling (RFC 7807) │ │ ├── kafka/ # Kafka wrappers │ │ ├── feature_store/ # Redis feature store │ │ └── validation/ # Schema validation │ ├── ingestion/ # Data generation │ ├── processing/ # Spark streaming │ ├── analytics/ # Batch analytics │ ├── visualization/ # Dashboard │ └── ml/ # ML training & monitoring ├── config/ # Configuration files │ ├── rules/ # Fraud rules (YAML) │ └── prometheus/ # Alert rules ├── schemas/ # JSON Schemas for events ├── migrations/ # Database migrations ├── models/ # ML model artifacts ├── docs/ # Documentation │ ├── runbooks/ # Operational runbooks │ ├── dashboard/ # Dashboard guides │ └── security/ # Security documentation ├── tests/ # Integration tests ├── tools/ # Utility scripts └── docker-compose.yml # Container orchestration ``` ## 🔧 **配置** 所有配置均通过环境变量管理。有关所有可用选项,请参阅 [env.example](env.example)。 **关键配置文件:** - `.env` - 环境变量(从 `env.example` 复制) - `config/rules/fraud_rules_v1.yaml` - 欺诈检测规则 - `config/prometheus/alert-rules.yml` - Prometheus 告警 ## 🧪 **测试** ### **运行集成测试** ``` # 测试故障场景 pytest tests/integration/test_failure_scenarios.py -v # 运行 coverage 测试 pytest tests/ --cov=src --cov-report=html ``` ### **基准性能测试** ``` # 测量吞吐量和延迟 python tools/benchmark.py --duration 60 --tps 1000 # 评估模型准确率 python -m src.ml.evaluation.accuracy_evaluator ``` ### **事件重放** ``` # 从 Kafka 重放事件 python tools/event_replay.py \ --topic transactions_raw.v1 \ --start-offset 0 \ --end-offset 1000 ``` ## 🔍 **监控与可观测性** ### **访问入口** - **仪表板**: http://localhost:8050 - **Grafana**: http://localhost:3000 (admin/admin) - **Prometheus**: http://localhost:9090 - **Jaeger**: http://localhost:16686 - **pgAdmin**: http://localhost:5050 ### **健康检查** ``` # 所有服务 curl http://localhost:8002/health # Model Service curl http://localhost:8001/health # Risk Service # Kafka topics docker exec kafka kafka-topics --list --bootstrap-server localhost:9092 # 数据库 docker exec postgres psql -U dataengineer -d ecommerce -c "SELECT COUNT(*) FROM risk_decisions;" ``` ## 🚀 **扩展与性能工程** 该流水线针对每个阶段的高吞吐量进行了端到端优化: ### **内置性能优化** | 层级 | 优化措施 | 影响 | |-------|-------------|--------| | **Kafka 生产者** | 配合 `linger_ms` 批处理的 Fire-and-forget 发送 | 消除每条消息的阻塞 | | **Kafka 消费者** | `getmany()` 批量轮询 | 减少 10-50 倍的轮询开销 | | **特征库** | Redis pipeline 批处理(每批 1 次往返) | 每批 2 次 Redis 调用 vs 2N 次 | | **风险服务** | `asyncio.gather()` 并发 HTTP 评分 | 跨批次并行评分 | | **风险服务** | httpx 连接池 (最大 100 连接) | 防止 Socket 耗尽 | | **Postgres Sink** | 批量 `unnest()` INSERT (每批单条 SQL) | 比逐行插入减少约 1000 倍查询 | | **Postgres Sink** | UPSERT 幂等性 (`ON CONFLICT DO UPDATE`) | Exactly-once 写入语义 | | **仪表板** | `ThreadedConnectionPool` (psycopg2) | 消除每次查询的连接开销 | | **仪表板** | 时间限制查询 + 缓存聚合 | 无论表大小如何,查询时间恒定 | ### **扩展路线图** **笔记本 → 1M TPM** - 添加 Redis Cluster (3 节点) - 扩展 Kafka 分区 (10+) - 运行 5+ 个 scoring-worker 实例 - **成本**: AWS 上约 $6K/月 **1M TPM → 50M TPM** - Redis Cluster (10+ 节点) - Kafka 集群 (15+ Brokers) - 自动伸缩 Worker 池 - PostgreSQL 分片或 ClickHouse OLAP - **成本**: AWS 上约 $60K/月 ## 🛡️ **安全与合规** - ✅ **加密**: 传输层使用 TLS 1.3 (见 [docs/security/TLS_SETUP.md](docs/security/TLS_SETUP.md)) - ✅ **PII 脱敏**: 仪表板和日志中自动脱敏 - ✅ **审计日志**: 数据库中完整的审计追踪 - ✅ **RBAC**: 基于角色的访问控制(框架就绪) - ✅ **合规**: 已记录 GDPR/CCPA 相关注意事项 - ✅ **密钥管理**: 生产 compose 使用 `${VAR:?}` 替换 — 无硬编码凭证 - ✅ **安全默认值**: 默认禁用 Debug 模式,需显式开启 ## 📊 **性能指标** | 指标 | 目标 | 当前 | |--------|--------|---------| | **欺诈检测延迟** | P95 ≤ 50ms | ✅ ~45ms | | **模型推理** | ≤ 5ms | ✅ ~3ms | | **特征库查询** | < 2ms | ✅ ~1ms | | **吞吐量 (单节点)** | 15K TPM | ✅ 15K+ TPM | | **吞吐量 (生产)** | 1M TPM | ✅ 架构已就绪 | | **数据库写入** | 批量处理 | ✅ unnest() 批量 INSERT | | **Kafka 发布** | 非阻塞 | ✅ 带 linger_ms 批处理的 Fire-and-forget | | **欺诈召回率** | ≥ 95% | ✅ 96% | | **精确率** | ≥ 90% | ✅ 91% | | **误报率** | ≤ 1.5% | ✅ 1.2% | ## 🤝 **贡献** 欢迎贡献!请: 1. Fork 本仓库 2. 创建特性分支 (`git checkout -b feature/amazing-feature`) 3. 提交更改 (`git commit -m 'Add amazing feature'`) 4. 推送到 (`git push origin feature/amazing-feature`) 5. 打开 Pull Request ## 📝 **许可证** 本项目基于 MIT 许可证授权 - 详情请见 [LICENSE](LICENSE) 文件。 ## 🙏 **致谢** 架构灵感来源于: - Amazon 的欺诈检测系统 - Uber 的实时决策引擎 - Stripe 的风险平台 - Netflix 的推荐流水线 技术栈: - Apache Kafka, Spark, Redis - PostgreSQL, Docker - Python 生态系统 (FastAPI, Pydantic, asyncio) ## 📞 **支持** - **文档**: [DOCUMENTATION.md](DOCUMENTATION.md) - **问题**: [GitHub Issues](https://github.com/yourusername/ecommerce-realtime-analytics/issues) - **故障排除**: [docs/runbooks/README.md](docs/runbooks/README.md) - **错误代码**: [docs/ERROR_CODES.md](docs/ERROR_CODES.md) ## 🎯 **下一步** 设置完成后,可以探索: 1. 📊 **仪表板**: 查看实时欺诈指标 2. 🔍 **Kafka Topics**: 检查事件流 3. 🤖 **ML 模型**: 训练自定义模型 4. 📈 **可观测性**: 设置 Grafana 仪表板 5. 🧪 **基准测试**: 衡量系统性能 **祝您狩猎欺诈愉快!🕵️‍♂️🔍**
标签:Apex, API集成, AV绕过, Docker Compose, FastAPI, Kafka, MIT License, Plotly Dash, Python, Redis, SonarQube插件, TCP/UDP协议, TPM优化, 事件驱动架构, 云计算, 互联网扫描, 代码示例, 低延迟, 分布式追踪, 可观测性, 可视化仪表盘, 孤立森林, 实时欺诈检测, 异常检测, 微服务架构, 搜索引擎查询, 数据分析, 无后门, 机器学习, 死信队列, 流式处理, 测试用例, 特征存储, 生产就绪, 电商安全, 网络安全, 自定义请求头, 规则引擎, 请求拦截, 软件成分分析, 逆向工具, 金融科技, 隐私保护, 零数据丢失, 风控系统, 高并发