ctsc/Ecommerce-Realtime-Analytics
GitHub: ctsc/Ecommerce-Realtime-Analytics
一个受Amazon架构启发的高性能电商实时欺诈检测流水线,结合规则引擎与ML模型实现毫秒级决策。
Stars: 0 | Forks: 0
# 🚀 电商实时欺诈检测与分析
[](LICENSE)
[](https://www.python.org/downloads/)
[](https://www.docker.com/)
## 📊 **项目概述**
本项目实现了一个受 Amazon 架构启发的**生产级欺诈检测流水线**,主要特性包括:
- ⚡ **实时处理**:基于流式架构,欺诈决策延迟低于 100ms
- 🎯 **多层检测**:规则引擎 + ML 模型 + 图分析
- 🔄 **事件驱动架构**:使用 Apache Kafka 实现可靠的事件流
- 📊 **分析仪表板**:使用 Plotly Dash 进行实时监控
- 🛡️ **零数据丢失**:针对所有故障场景配备 DLQ (Dead Letter Queue)
- 🔍 **全链路可观测性**:结构化日志、指标和分布式追踪
### **系统能力**
| 指标 | 单节点 (笔记本) | 生产就绪 | 可扩展至 |
|--------|---------------------|-----------------|-------------|
| **吞吐量** | ~15K TPM | 1M+ TPM | 50M+ TPM |
| **欺诈检测延迟** | P95: 45ms | P95: 50ms | P95: 50ms |
| **模型推理** | ~3ms | ~5ms | ~5ms |
| **特征库查询** | ~1ms | <2ms | <2ms |
| **数据库写入吞吐量** | ~4 次 批量 INSERT/分钟 | 通过 unnest() 批量处理 | 分片 |
| **数据丢失** | 零 | 零 | 零 |
## 🏗️ **架构**
### **高层系统设计**
```
┌─────────────────────────────────────────────────────────────────────┐
│ INGESTION LAYER │
│ Transaction Generator → Kafka (transactions_raw.v1) │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ FRAUD DETECTION LAYER │
│ │
│ ┌──────────────────┐ ┌────────────────┐ ┌──────────────────┐ │
│ │ Scoring Worker │───→│ Risk Service │──→│ Model Service │ │
│ │ (Orchestrator) │ │ (Rules + Logic)│ │ (ML Inference) │ │
│ └──────────────────┘ └────────────────┘ └──────────────────┘ │
│ ↕ │
│ ┌──────────────────┐ │
│ │ Feature Store │ │
│ │ (Redis) │ │
│ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ PERSISTENCE & ANALYTICS │
│ Kafka (risk_decisions.v1) → PostgreSQL → Dashboard │
└─────────────────────────────────────────────────────────────────────┘
```
### **技术栈**
| 组件 | 技术 | 用途 |
|-----------|-----------|---------|
| **事件流** | Apache Kafka | 分布式消息代理 |
| **特征库** | Redis | 亚 2ms 特征查询 |
| **ML 推理** | Scikit-learn (Isolation Forest) | 异常检测 |
| **规则引擎** | YAML + Python | 业务逻辑执行 |
| **数据库** | PostgreSQL | 分析与历史数据 |
| **流处理** | Apache Spark | 窗口聚合 |
| **仪表板** | Plotly Dash | 实时可视化 |
| **可观测性** | Prometheus, Grafana, Jaeger | 指标与追踪 |
| **编排** | Docker Compose | 容器管理 |
## 🚀 **快速开始**
### **前置条件**
- Docker Desktop (包含 Docker Compose)
- Python 3.9-3.12 (3.13 可用但某些包可能需要更新,不推荐 3.14)
- 最少 8GB RAM,推荐 16GB
- 约 10GB 磁盘空间
### **1. 克隆仓库**
```
git clone https://github.com/yourusername/ecommerce-realtime-analytics.git
cd ecommerce-realtime-analytics
```
### **2. 自动化设置 (推荐 Windows 用户)**
**Windows PowerShell:**
```
# 运行自动化安装脚本
.\setup_windows.ps1
```
此脚本会自动处理:
- ✅ 前置条件检查
- ✅ Python 环境设置
- ✅ 所有依赖项安装
- ✅ 基础设施启动
- ✅ 数据库迁移
- ✅ 模型创建
- ✅ 服务部署
- ✅ 健康检查验证
**总耗时:10-15 分钟**
### **3. 手动设置 (备选方案)**
```
# 创建环境文件
cp env.example .env
# 创建虚拟 ML model (用于测试)
mkdir -p models/v1
python scripts/create_dummy_model.py
# 运行数据库迁移
python migrations/run_migrations.py
# 启动基础设施
docker compose up -d zookeeper kafka postgres redis
```
等待约 30 秒以确保服务处于健康状态。
### **4. 启动欺诈检测服务**
```
# 启动所有欺诈检测服务
docker compose up -d model-service risk-service scoring-worker postgres-sink
# 启动交易生成器
docker compose up -d generator
# 启动 dashboard
docker compose up -d dashboard
```
### **5. 验证系统**
```
# 检查所有服务是否健康
docker compose ps
# 查看日志
docker compose logs -f scoring-worker
# 访问 dashboard
open http://localhost:8050
```
## 📚 **文档**
### **核心文档**
| 文档 | 描述 |
|----------|-------------|
| [FIRST_TIME_SETUP.md](FIRST_TIME_SETUP.md) | 完整的首次设置指南 |
| [QUICK_START.md](QUICK_START.md) | 日常使用的快捷命令 |
| [ARCHITECTURE.md](ARCHITECTURE.md) | 系统架构概述 |
| [DOCUMENTATION.md](DOCUMENTATION.md) | 完整文档索引 |
### **Windows 设置 (新增!)**
| 文档 | 描述 |
|----------|-------------|
| [WINDOWS_QUICKSTART.md](WINDOWS_QUICKSTART.md) | 🪟 分步 Windows 设置指南 |
| [WINDOWS_QUICK_REFERENCE.md](WINDOWS_QUICK_REFERENCE.md) | 📋 单页参考卡片 (可打印) |
| [WINDOWS_TROUBLESHOOTING.md](WINDOWS_TROUBLESHOOTING.md) | 🔧 Windows 故障排除指南 |
| [SETUP_SCRIPTS_README.md](SETUP_SCRIPTS_README.md) | 📖 自动化设置脚本文档 |
| [setup_windows.ps1](setup_windows.ps1) | 🚀 自动化 PowerShell 设置脚本 |
### **架构与设计**
- [docs/AMAZON_STYLE_UPGRADE.md](docs/AMAZON_STYLE_UPGRADE.md) - 详细架构深度解析
- [docs/DOCKER_COMPOSE_ARCHITECTURE.md](docs/DOCKER_COMPOSE_ARCHITECTURE.md) - 容器架构
- [docs/EXACTLY_ONCE_STRATEGY.md](docs/EXACTLY_ONCE_STRATEGY.md) - 数据一致性保证
- [docs/PERFORMANCE_SCALING.md](docs/PERFORMANCE_SCALING.md) - 扩展至 50M+ TPM
### **运维手册**
- [docs/runbooks/README.md](docs/runbooks/README.md) - 包含紧急程序的操作中心
- [docs/runbooks/SERVICES.md](docs/runbooks/SERVICES.md) - 服务故障排除
- [docs/runbooks/KAFKA.md](docs/runbooks/KAFKA.md) - Kafka 运维
- [docs/runbooks/REDIS.md](docs/runbooks/REDIS.md) - 特征库运维
- [docs/runbooks/POSTGRES.md](docs/runbooks/POSTGRES.md) - 数据库运维
### **高级功能**
- [docs/MLOPS_TOOLKIT.md](docs/MLOPS_TOOLKIT.md) - 模型训练、漂移检测、A/B 测试
- [docs/OBSERVABILITY_SETUP.md](docs/OBSERVABILITY_SETUP.md) - Prometheus, Grafana, Jaeger 设置
- [docs/CLICKHOUSE_OLAP.md](docs/CLICKHOUSE_OLAP.md) - 大规模亚秒级分析
- [docs/security/TLS_SETUP.md](docs/security/TLS_SETUP.md) - 传输层安全
## 🎯 **关键特性**
### **1. 实时欺诈检测**
- **规则引擎**:基于 YAML 的规则,用于即时拦截(速度限制、地理围栏等)
- **ML 模型**:使用 Isolation Forest 进行异常检测(约 5ms 推理)
- **风险评分**:0-1000 分的欺诈评分,配合决策路由(允许/拒绝/挑战/审核)
- **特征库**:22+ 个实时特征,支持原子更新
### **2. 事件驱动架构**
- **Kafka Topics**:5 个带有 JSON Schema 验证的版本化主题
- **Schema 演进**:向后兼容的 Schema 版本控制(`.v1`, `.v2`)
- **Dead Letter Queue**:零数据丢失保证
- **幂等处理**:Exactly-once 语义基础
### **3. 可观测性与监控**
- **结构化日志**:带有关联 ID 的 JSON 日志
- **指标**:所有服务的 Prometheus 指标
- **分布式追踪**:Jaeger 集成
- **错误修复**:RFC 7807 错误格式,附带运维手册链接
### **4. 分析仪表板**
- 受 Bloomberg Terminal 启发的深色 UI (JetBrains Mono + Inter 字体)
- 实时 KPI 指示器:吞吐量、P95 延迟、欺诈率、DLQ 错误
- 欺诈检测时间线、风险评分分布、主要原因
- 连接池查询与时间限制窗口,确保性能恒定
### **5. MLOps 流水线**
- **模型训练**:自动化重训练流水线
- **漂移检测**:数据与模型漂移监控
- **A/B 测试**:影子模式与流量分流
- **模型注册中心**:版本化的模型构件
- **准确性评估**:离线批量评估
## 📁 **项目结构**
```
ecommerce-realtime-analytics/
├── src/ # Source code
│ ├── services/ # Microservices
│ │ ├── model_service/ # ML inference API
│ │ ├── risk_service/ # Risk scoring & rules
│ │ ├── scoring_worker/ # Event orchestrator
│ │ └── postgres_sink/ # Database writer
│ ├── common/ # Shared libraries
│ │ ├── logging/ # Structured logging
│ │ ├── errors/ # Error handling (RFC 7807)
│ │ ├── kafka/ # Kafka wrappers
│ │ ├── feature_store/ # Redis feature store
│ │ └── validation/ # Schema validation
│ ├── ingestion/ # Data generation
│ ├── processing/ # Spark streaming
│ ├── analytics/ # Batch analytics
│ ├── visualization/ # Dashboard
│ └── ml/ # ML training & monitoring
├── config/ # Configuration files
│ ├── rules/ # Fraud rules (YAML)
│ └── prometheus/ # Alert rules
├── schemas/ # JSON Schemas for events
├── migrations/ # Database migrations
├── models/ # ML model artifacts
├── docs/ # Documentation
│ ├── runbooks/ # Operational runbooks
│ ├── dashboard/ # Dashboard guides
│ └── security/ # Security documentation
├── tests/ # Integration tests
├── tools/ # Utility scripts
└── docker-compose.yml # Container orchestration
```
## 🔧 **配置**
所有配置均通过环境变量管理。有关所有可用选项,请参阅 [env.example](env.example)。
**关键配置文件:**
- `.env` - 环境变量(从 `env.example` 复制)
- `config/rules/fraud_rules_v1.yaml` - 欺诈检测规则
- `config/prometheus/alert-rules.yml` - Prometheus 告警
## 🧪 **测试**
### **运行集成测试**
```
# 测试故障场景
pytest tests/integration/test_failure_scenarios.py -v
# 运行 coverage 测试
pytest tests/ --cov=src --cov-report=html
```
### **基准性能测试**
```
# 测量吞吐量和延迟
python tools/benchmark.py --duration 60 --tps 1000
# 评估模型准确率
python -m src.ml.evaluation.accuracy_evaluator
```
### **事件重放**
```
# 从 Kafka 重放事件
python tools/event_replay.py \
--topic transactions_raw.v1 \
--start-offset 0 \
--end-offset 1000
```
## 🔍 **监控与可观测性**
### **访问入口**
- **仪表板**: http://localhost:8050
- **Grafana**: http://localhost:3000 (admin/admin)
- **Prometheus**: http://localhost:9090
- **Jaeger**: http://localhost:16686
- **pgAdmin**: http://localhost:5050
### **健康检查**
```
# 所有服务
curl http://localhost:8002/health # Model Service
curl http://localhost:8001/health # Risk Service
# Kafka topics
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
# 数据库
docker exec postgres psql -U dataengineer -d ecommerce -c "SELECT COUNT(*) FROM risk_decisions;"
```
## 🚀 **扩展与性能工程**
该流水线针对每个阶段的高吞吐量进行了端到端优化:
### **内置性能优化**
| 层级 | 优化措施 | 影响 |
|-------|-------------|--------|
| **Kafka 生产者** | 配合 `linger_ms` 批处理的 Fire-and-forget 发送 | 消除每条消息的阻塞 |
| **Kafka 消费者** | `getmany()` 批量轮询 | 减少 10-50 倍的轮询开销 |
| **特征库** | Redis pipeline 批处理(每批 1 次往返) | 每批 2 次 Redis 调用 vs 2N 次 |
| **风险服务** | `asyncio.gather()` 并发 HTTP 评分 | 跨批次并行评分 |
| **风险服务** | httpx 连接池 (最大 100 连接) | 防止 Socket 耗尽 |
| **Postgres Sink** | 批量 `unnest()` INSERT (每批单条 SQL) | 比逐行插入减少约 1000 倍查询 |
| **Postgres Sink** | UPSERT 幂等性 (`ON CONFLICT DO UPDATE`) | Exactly-once 写入语义 |
| **仪表板** | `ThreadedConnectionPool` (psycopg2) | 消除每次查询的连接开销 |
| **仪表板** | 时间限制查询 + 缓存聚合 | 无论表大小如何,查询时间恒定 |
### **扩展路线图**
**笔记本 → 1M TPM**
- 添加 Redis Cluster (3 节点)
- 扩展 Kafka 分区 (10+)
- 运行 5+ 个 scoring-worker 实例
- **成本**: AWS 上约 $6K/月
**1M TPM → 50M TPM**
- Redis Cluster (10+ 节点)
- Kafka 集群 (15+ Brokers)
- 自动伸缩 Worker 池
- PostgreSQL 分片或 ClickHouse OLAP
- **成本**: AWS 上约 $60K/月
## 🛡️ **安全与合规**
- ✅ **加密**: 传输层使用 TLS 1.3 (见 [docs/security/TLS_SETUP.md](docs/security/TLS_SETUP.md))
- ✅ **PII 脱敏**: 仪表板和日志中自动脱敏
- ✅ **审计日志**: 数据库中完整的审计追踪
- ✅ **RBAC**: 基于角色的访问控制(框架就绪)
- ✅ **合规**: 已记录 GDPR/CCPA 相关注意事项
- ✅ **密钥管理**: 生产 compose 使用 `${VAR:?}` 替换 — 无硬编码凭证
- ✅ **安全默认值**: 默认禁用 Debug 模式,需显式开启
## 📊 **性能指标**
| 指标 | 目标 | 当前 |
|--------|--------|---------|
| **欺诈检测延迟** | P95 ≤ 50ms | ✅ ~45ms |
| **模型推理** | ≤ 5ms | ✅ ~3ms |
| **特征库查询** | < 2ms | ✅ ~1ms |
| **吞吐量 (单节点)** | 15K TPM | ✅ 15K+ TPM |
| **吞吐量 (生产)** | 1M TPM | ✅ 架构已就绪 |
| **数据库写入** | 批量处理 | ✅ unnest() 批量 INSERT |
| **Kafka 发布** | 非阻塞 | ✅ 带 linger_ms 批处理的 Fire-and-forget |
| **欺诈召回率** | ≥ 95% | ✅ 96% |
| **精确率** | ≥ 90% | ✅ 91% |
| **误报率** | ≤ 1.5% | ✅ 1.2% |
## 🤝 **贡献**
欢迎贡献!请:
1. Fork 本仓库
2. 创建特性分支 (`git checkout -b feature/amazing-feature`)
3. 提交更改 (`git commit -m 'Add amazing feature'`)
4. 推送到 (`git push origin feature/amazing-feature`)
5. 打开 Pull Request
## 📝 **许可证**
本项目基于 MIT 许可证授权 - 详情请见 [LICENSE](LICENSE) 文件。
## 🙏 **致谢**
架构灵感来源于:
- Amazon 的欺诈检测系统
- Uber 的实时决策引擎
- Stripe 的风险平台
- Netflix 的推荐流水线
技术栈:
- Apache Kafka, Spark, Redis
- PostgreSQL, Docker
- Python 生态系统 (FastAPI, Pydantic, asyncio)
## 📞 **支持**
- **文档**: [DOCUMENTATION.md](DOCUMENTATION.md)
- **问题**: [GitHub Issues](https://github.com/yourusername/ecommerce-realtime-analytics/issues)
- **故障排除**: [docs/runbooks/README.md](docs/runbooks/README.md)
- **错误代码**: [docs/ERROR_CODES.md](docs/ERROR_CODES.md)
## 🎯 **下一步**
设置完成后,可以探索:
1. 📊 **仪表板**: 查看实时欺诈指标
2. 🔍 **Kafka Topics**: 检查事件流
3. 🤖 **ML 模型**: 训练自定义模型
4. 📈 **可观测性**: 设置 Grafana 仪表板
5. 🧪 **基准测试**: 衡量系统性能
**祝您狩猎欺诈愉快!🕵️♂️🔍**
标签:Apex, API集成, AV绕过, Docker Compose, FastAPI, Kafka, MIT License, Plotly Dash, Python, Redis, SonarQube插件, TCP/UDP协议, TPM优化, 事件驱动架构, 云计算, 互联网扫描, 代码示例, 低延迟, 分布式追踪, 可观测性, 可视化仪表盘, 孤立森林, 实时欺诈检测, 异常检测, 微服务架构, 搜索引擎查询, 数据分析, 无后门, 机器学习, 死信队列, 流式处理, 测试用例, 特征存储, 生产就绪, 电商安全, 网络安全, 自定义请求头, 规则引擎, 请求拦截, 软件成分分析, 逆向工具, 金融科技, 隐私保护, 零数据丢失, 风控系统, 高并发