Goullashankar/Fraud-Detection-system-Pipeline
GitHub: Goullashankar/Fraud-Detection-system-Pipeline
一个基于 Kafka 与 Spark 的实时电商欺诈检测流水线,结合机器学习与规则引擎实现高吞吐、低延迟的生产级检测。
Stars: 0 | Forks: 0
# 实时电商欺诈检测流水线
一个生产级别的实时流处理流水线,使用 Apache Kafka、Spark Structured Streaming 和隔离森林(Isolation Forest)机器学习模型结合混合规则引擎来检测欺诈性电商交易。以每秒处理 500+ 笔交易的速度运行,并通过 Streamlit 仪表板实现实时欺诈监控。
## 实时架构
交易生成器(Python + Faker)
↓
Apache Kafka(3 个主题 | 3 个分区)
├── raw-transactions
├── fraud-alerts
└── dead-letter-queue(格式错误的事件)
↓
Spark Structured Streaming(PySpark)
├── 实时特征工程
├── 隔离森林异常评分
├── 混合规则引擎(ML + 业务规则)
└── foreachBatch 写入存储
↓
存储层
├── SQLite(欺诈结果 + 分析)
└── HBase 模式(用户画像 + 告警)
↓
Streamlit 仪表板(实时欺诈监控)
## 独特之处
- **死信队列** — 格式错误的事件会被路由到独立的主题并通过批处理作业重新处理,而不是导致流水线崩溃,这正是生产环境欺诈系统处理不良数据的方式
- **混合规则引擎** — 将隔离森林 ML 异常评分与硬性业务规则结合,类似于真实的金融科技欺诈系统,既能捕捉未知模式也能识别明显违规
- **4 种真实的欺诈注入模式** — 高金额、交易速率、未知设备、地点跳跃,使模拟更贴近生产环境
- **自定义 Docker 镜像** — numpy、pandas、scikit-learn 永久嵌入 Spark 镜像,确保在任何环境中均可重复部署
- **端到端流水线** — 从数据生成到实时仪表板,而不仅仅是一个笔记本实验
## 技术栈
| 层级 | 技术 |
|---|---|
| 消息代理 | Apache Kafka |
| 流处理 | Apache Spark Structured Streaming |
| 机器学习模型 | 隔离森林(scikit-learn) |
| 特征工程 | PySpark UDF |
| 存储 | SQLite、HBase(架构) |
| 仪表板 | Streamlit + Plotly |
| 容器化 | Docker、Docker Compose |
| 语言 | Python |
## 欺诈检测逻辑
每个交易都会通过两个并行运行在 Spark 微批执行器中的层级进行评分。
**层级 1 — 隔离森林机器学习模型**
基于 10,500 笔交易训练(10,000 正常 + 500 合成欺诈样本)。使用 6 个特征:交易金额、最近一小时内交易次数(速率)、地点纬度、地点经度、用户是否为新商户、一天中的小时数。输出一个介于 -1(欺诈)和 +1(正常)之间的异常评分。低于 0 的评分会被标记为潜在欺诈。
**层级 2 — 混合规则引擎**
- 金额大于 50,000 卢比 → 标记为 high_amount 欺诈
- 一小时内超过 20 笔交易 → 标记为 velocity 欺诈
- 新商户且金额大于 10,000 卢比 → 标记为 new_merchant_high_amount
- 地点纬度超出印度范围(低于 8 或高于 35 度)→ 标记为 location_anomaly
如果任意一层检测到交易,则将其标记为欺诈。这反映了真实金融科技公司如何结合机器学习与业务规则。
## Kafka 架构
三个主题处理不同的数据流。raw-transactions 主题接收所有传入的交易事件,并通过 3 个分区供 Spark 执行器并行消费。fraud-alerts 主题存储确认的欺诈事件供下游处理。dead-letter-queue 主题接收原本会崩溃流水线的格式错误或不可解析事件,从而实现独立的批处理作业进行清理和重试。
## 项目结构
fraud-detection-pipeline/
├── producer/
│ └── transaction_generator.py # 模拟每秒 500+ 笔交易,包含 4 种欺诈模式
├── spark_jobs/
│ └── fraud_detector.py # Spark 流处理 + 机器学习模型 + 规则引擎
├── models/
│ └── train_model.py # 隔离森林离线训练脚本
├── dashboard/
│ └── app.py # Streamlit 实时欺诈监控仪表板
├── config/
│ └── hadoop.env # Hadoop 和 HBase 配置
├── Dockerfile.spark # 自定义 Spark 镜像,预置 ML 依赖
├── docker-compose.yml # 完整的 5 容器堆栈编排
├── start.ps1 # 一键启动脚本
└── README.md
## 安装与运行
### 先决条件
- Docker Desktop
- Python 3.8 或更高版本
- Java 8 或更高版本
- Git
### 第一步 — 克隆仓库
```
git clone https://github.com/vinayp2219/fraud-detection-pipeline.git
cd fraud-detection-pipeline
```
### 第二步 — 下载 Kafka Spark 连接器 JAR 包
```
Invoke-WebRequest -Uri "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.4.1/spark-sql-kafka-0-10_2.12-3.4.1.jar" -OutFile "spark_jobs/spark-sql-kafka.jar"
Invoke-WebRequest -Uri "https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.jar" -OutFile "spark_jobs/kafka-clients.jar"
Invoke-WebRequest -Uri "https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.4.1/spark-token-provider-kafka-0-10_2.12-3.4.1.jar" -OutFile "spark_jobs/spark-token-provider.jar"
Invoke-WebRequest -Uri "https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar" -OutFile "spark_jobs/commons-pool2.jar"
```
### 第三步 — 构建自定义 Spark 镜像
```
docker compose build spark spark-worker
```
### 第四步 — 一键启动所有服务
```
.\start.ps1
```
### 第五步 — 打开实时仪表板
http://localhost:8501
## 结果
- 流水线首次运行处理 6,481 笔交易
- 检测到 255 个欺诈告警 — 欺诈率 3.9%,符合真实电商基准
- 成功识别 4 种欺诈模式 — high_amount、velocity、location_anomaly、new_merchant_high_amount
- 流水线延迟低于每微批 3 秒
- 由于死信队列处理格式错误事件,实现零数据丢失
## 关键面试谈点
- 死信队列是该项目与典型学生项目的区别。大多数新人不会考虑数据格式错误时会发生什么。你构建了一个生产级别的解决方案。
- 混合规则引擎展示了对真实业务约束的理解 — 纯机器学习并不总是足够的,当一笔 1.5 万卢比的交易明显可疑时,无论模型评分如何都应该被拦截。
- 自定义 Docker 镜像体现了生产思维 — 依赖项预置在镜像中意味着容器重启后无需手动重装。
- Kafka 的双监听配置 — 内部使用 kafka:29092 供 Docker 内的 Spark 使用,外部使用 localhost:9092 供 Windows 上的 Python 使用 — 展示了对容器化网络的深入理解。
## 简历要点
- 构建了实时欺诈检测流水线,以每秒 500+ 笔交易的速度摄取数据,使用 Apache Kafka Structured Streaming 和 PySpark
- 部署隔离森林异常检测模型与混合规则引擎,在 4 种模式下实现 3.9% 的欺诈识别率
- 实现了 Kafka 死信队列路由格式错误事件,确保生产环境零停机
- 使用预置依赖的自定义 Docker 镜像容器化端到端机器学习流水线,实现跨环境可重复部署
## 后续改进
- SHAP 可解释性,展示具体哪些特征驱动每笔欺诈决策
- 自动模型重训练循环,仪表板确认的欺诈标签反馈回训练数据
- Grafana 监控仪表板,用于流水线健康指标 — Kafka 消费者滞后、Spark 批处理时长、每秒吞吐量
- 将存储层从 SQLite 切换到 HBase 或 Cassandra,以支持水平扩展的分布式写入
- 添加 Spark 窗口聚合,用于计算用户 5 分钟滚动平均消费作为额外欺诈信号
## 作者
**Pabbu Vinay Goud**
[LinkedIn](https://www.linkedin.com/in/pabbu-vinay) | [GitHub](https://github.com/vinayp2219)
标签:500+ 事务/秒, Apache Kafka, Docker, Faker, HBase, Isolation Forest, Kubernetes, PySpark, Python, Spark Structured Streaming, SQLite, Streamlit, 事务处理, 云计算, 安全防御评估, 实时欺诈检测, 数据管道, 无后门, 机器学习异常检测, 死信队列, 流处理, 混合规则引擎, 版权保护, 特征工程, 生产级数据管道, 电商反欺诈, 端到端管道, 规则引擎, 访问控制, 请求拦截, 软件工程, 软件成分分析, 逆向工具, 金融风控, 高吞吐