Mubrix2/ai-fraud-detection-pipeline
GitHub: Mubrix2/ai-fraud-detection-pipeline
面向金融科技的事件驱动型实时欺诈检测与 AML 合规流水线,通过双 ML 模型、规则引擎与 SHAP 可解释性在 500ms 内输出合规决策。
Stars: 0 | Forks: 0
# 实时欺诈检测与风险管理流水线
一个用于金融交易的生产级、事件驱动型欺诈检测系统。每笔交易都会流经 Kafka 流式处理流水线、两个独立的 ML 模型、AML 模式检测、可配置的规则引擎以及 SHAP 可解释性分析——在 500 毫秒内生成符合合规要求的决策。
**在线演示:** [ai-fraud-detection-pipeline.vercel.app](https://ai-fraud-detection-pipeline.vercel.app)
**API 文档:** [ai-fraud-detection-pipeline.onrender.com/docs](https://ai-fraud-detection-pipeline.onrender.com/docs)
## 演示

## 流水线
```
Transaction submitted via REST API
│
▼
Kafka Stream ─── raw-transactions topic (3 partitions)
│
▼
Feature Engineering ─── 14 features from transaction fields
│
├── Velocity Layer (two parallel engines):
├── Sender velocity → "Is THIS customer unusually active?"
└── Destination velocity → "Are MANY customers hitting THIS destination?"
│
▼
Dual ML Scoring
XGBoost Classifier → fraud probability 0–1 (supervised)
Isolation Forest → anomaly score (unsupervised)
│
▼
AML Detection
Structuring / Smurfing → CTR/SAR generation
Rapid Layering → NFIU reporting flag
│
▼
Rules Engine ─── configurable business rules overlay
│
├── Circuit Breaker → Rule Fallback (if ML unavailable)
│
▼
3-Tier Decision
APPROVE (fraud_prob < 0.60)
REVIEW (fraud_prob ≥ 0.60 OR AML flag OR rules escalate)
BLOCK (fraud_prob ≥ 0.85)
│
▼
SHAP Explanation ─── exact TreeExplainer attributions (compliance)
│
▼
Immutable Audit Log ─── SQLite append-only
│
▼
React Dashboard + AI Investigation Agent (LangGraph)
```
## 架构
```
┌──────────────────────────────────────────────────────────────┐
│ FastAPI Service (single process) │
│ │
│ Main Thread (uvicorn) Consumer Thread (daemon) │
│ ┌────────────────────┐ ┌─────────────────────────┐ │
│ │ POST /submit │ │ polls Kafka every 1s │ │
│ │ → assess_txn() │ │ → assess_txn() │ │
│ │ → store result │ │ → store result │ │
│ │ → publish Kafka │ │ → publish fraud-results │ │
│ │ GET /recent │ └─────────────────────────┘ │
│ │ GET /stats │ │
│ │ POST /investigate │ shared: _results_store (locked) │
│ └────────────────────┘ │
└──────────────────────────────┬───────────────────────────────┘
│
Apache Kafka (KRaft)
├── raw-transactions (3 partitions)
└── fraud-results (3 partitions)
```
消费者线程和 HTTP 端点共享 `_results_store`——一个受锁保护的线程安全内存字典。HTTP `/submit` 端点进行同步评分,因此结果立即可用。消费者线程提供了 Kafka 流式架构。
## 技术栈
| 层级 | 技术 | 原因 |
|---|---|---|
| 流处理 | Apache Kafka 4.x (KRaft) | 持久化、可重放、可水平扩展 |
| ML — 监督学习 | XGBoost | 针对表格型欺诈数据的同类最佳选择 |
| ML — 无监督学习 | Isolation Forest | 无需标签的新颖模式检测 |
| 可解释性 | SHAP TreeExplainer | 精确归因——合规要求 |
| 特征上下文 | Velocity Engine | 针对每个客户的行为基线 |
| AML | 自定义规则引擎 | 结构化、分层、NFIU CTR/SAR |
| 规则引擎 | Python 规则 | 在 ML 输出上叠加业务策略 |
| 弹性设计 | Circuit Breaker | ML 不可用时的规则降级 |
| 审计 | SQLite append-only | 不可变的合规追踪 |
| 后端 | FastAPI + Pydantic v2 | 快速、经过验证、异步 |
| 前端 | React + Vite + Recharts | 实时调查仪表板 |
| AI Agent | LangGraph + Groq | 对话式欺诈调查 |
| 容器 | Docker + Docker Compose | 全系统编排 |
## 决策逻辑
```
# 干净、可审计、标准化
if fraud_prob >= 0.85:
decision = "BLOCK"
elif fraud_prob >= 0.60:
decision = "REVIEW"
else:
decision = "APPROVE"
# Rules 和 AML 可以将 APPROVE 升级为 REVIEW
# Anomaly 可以将 APPROVE 升级为 REVIEW
# 没有任何操作可以降级(只会增加风险)
```
## 模型性能
在 554,082 笔预留的测试交易(真实世界的欺诈率为 0.297%)上进行评估:
| 指标 | XGBoost | Isolation Forest |
|---|---|---|
| Fraud Recall | 98.7% | 71.4% |
| Fraud Precision | 63.5% | 8.3% |
| F1 Score | 0.773 | — |
| ROC-AUC | 0.998 | — |
**组合策略(如果任一模型标记则 REVIEW/BLOCK):召回率达 98.9%。**
数据集:PaySim(合成移动支付数据,630 万笔交易)。
注意:Isolation Forest 的精确度按预期设定得较低——无论该模式是否被标记为欺诈,它都会标记异常交易。其价值在于捕获 XGBoost 从未训练过的新颖模式。
## 本地运行
### 前置条件
- Python 3.12.3
- Node.js 20+
- Java 17+(用于 Kafka)
- Kafka 4.x 安装在 WSL2 或 Linux 中
### 快速开始 — Docker Compose(推荐)
```
# 1. 首先训练模型(一次性,约 10 分钟)
python scripts/prepare_data.py
python scripts/train_fraud_model.py
python scripts/train_anomaly_model.py
# 2. 启动所有服务
docker compose up --build
```
| 服务 | URL |
|---|---|
| API + 文档 | http://localhost:8000/docs |
| 仪表板 | http://localhost:80 |
| Kafka | localhost:9092 (内部) / 29092 (主机) |
### 手动启动 — 三个终端
```
# 终端 1 — Kafka
kafka-server-start.sh ~/kafka/config/kraft/server.properties
# 终端 2 — API(consumer 线程自动启动)
source venv-linux/bin/activate
uvicorn app.main:app --reload --port 8000
# 终端 3 — React dashboard
cd frontend && npm run dev
# Dashboard:http://localhost:5173
```
### 生成演示流量
```
# 每秒 10 笔交易,持续 60 秒(2% 欺诈注入)
python scripts/simulate_traffic.py --rate 10 --duration 60
# 连续流
python scripts/simulate_traffic.py --rate 5 --duration 0
```
## API 参考
| 方法 | 端点 | 描述 |
|---|---|---|
| `POST` | `/api/v1/transactions/submit` | 提交交易进行筛查 |
| `GET` | `/api/v1/transactions/results/{id}` | 获取交易结果 |
| `GET` | `/api/v1/transactions/recent` | 用于仪表板的近期交易 |
| `GET` | `/api/v1/transactions/stats` | 系统统计信息 |
| `POST` | `/api/v1/transactions/investigate` | AI 调查 Agent |
| `GET` | `/health` | 服务健康检查 |
**提交一笔可疑交易:**
```
curl -X POST http://localhost:8000/api/v1/transactions/submit \
-H "Content-Type: application/json" \
-d '{
"transaction_id": "TXN-001",
"step": 3, "type": "TRANSFER", "amount": 750000,
"name_orig": "C1234567890",
"oldbalance_org": 750000, "newbalance_orig": 0,
"name_dest": "C9876543210",
"oldbalance_dest": 0, "newbalance_dest": 0
}'
```
**询问调查 Agent:**
```
curl -X POST http://localhost:8000/api/v1/transactions/investigate \
-H "Content-Type: application/json" \
-d '{"question": "Why was TXN-001 flagged?"}'
```
## 项目结构
```
realtime-fraud-detection-pipeline/
├── app/
│ ├── main.py # FastAPI + consumer daemon thread
│ ├── config.py # All configuration from env vars
│ ├── api/
│ │ ├── schemas.py # Pydantic v2 models (extra=forbid)
│ │ └── routes/
│ │ ├── transactions.py # Core fraud detection endpoints
│ │ └── health.py
│ ├── core/
│ │ ├── feature_engineer.py # 14 engineered features
│ │ ├── fraud_scorer.py # XGBoost inference + 3-tier decision
│ │ ├── anomaly_detector.py # Isolation Forest inference
│ │ ├── explainer.py # SHAP TreeExplainer
│ │ ├── velocity_engine.py # Per-customer sliding window features
│ │ ├── aml_detector.py # Structuring/layering + SAR/CTR
│ │ ├── rules_engine.py # Configurable business rules
│ │ ├── circuit_breaker.py # ML fallback resilience
│ │ ├── audit_logger.py # Immutable SQLite audit trail
│ │ └── investigation_agent.py # LangGraph analyst agent
│ ├── services/
│ │ └── detection_service.py # 9-step pipeline orchestration
│ └── streaming/
│ ├── producer.py # Kafka producer
│ └── consumer.py # Daemon thread consumer
├── scripts/
│ ├── prepare_data.py # SMOTE, scaling, train/test split
│ ├── train_fraud_model.py # XGBoost training + threshold search
│ ├── train_anomaly_model.py # Isolation Forest training
│ ├── evaluate_models.py # Combined model evaluation
│ ├── simulate_traffic.py # Demo traffic generator
│ └── test_kafka_connection.py # Kafka health check
├── tests/ # Unit tests for every module
├── notebooks/
│ └── 01_data_exploration.ipynb # 7-cell analysis driving design
├── frontend/ # React investigation dashboard
├── Dockerfile # API image
├── Dockerfile.frontend # React multi-stage image
├── docker-compose.yml # Full system orchestration
└── nginx.conf # SPA routing config
```
## 部署
```
Render (API) Vercel (Frontend)
↓ ↓
FastAPI + consumer React dashboard
synchronous scoring polls Render API
↑
No Kafka in cloud — synchronous scoring path handles all functionality.
Kafka streaming architecture demonstrated via Docker Compose locally.
```
| 服务 | 平台 | 成本 |
|---|---|---|
| API | Render Web Service (免费) | $0 |
| 前端 | Vercel (免费) | $0 |
## AML 合规
该流水线可检测三种反洗钱(AML)类型:
**结构化:** 24 小时内发生多笔 4,500,000 奈拉及以上的交易——旨在规避 5,000,000 奈拉的 CTR 阈值。
**快速分层:** 1 小时内发生 5 次以上的对外转账——资金快速流经账户以掩盖来源。
**CTR 阈值:** 单笔交易达到或超过 5,000,000 奈拉——根据尼日利亚《2022 年禁止洗钱法》,需要自动提交现金交易报告(CTR)。
系统会针对结构化和分层行为生成可疑活动报告(SAR),并标记在 24 小时内向 NFIU 履行的申报义务。
## 目标 Velocity
目标 Velocity 追踪现在与每个客户的 Velocity 并行运行。
该系统可同时检测:
- **每个客户的模式:** 一个被盗账户快速发送多笔交易
- **跨客户(网络)模式:** 许多不同的客户在短时间内访问同一个目标(撞库测试 / 分布式微小欺诈)
## 工程决策
**为什么将消费者作为线程运行?**
保持部署简单——单进程、单容器。展示了相同的架构分离(生产者/消费者),同时避免了第二个服务带来的运维复杂性。
**为什么是三级(APPROVE/REVIEW/BLOCK)而不是四级?**
直接映射到欺诈运营团队的工作方式。REVIEW 路由到分析师队列。BLOCK 拒绝交易。简单、可审计、在成熟的金融科技公司中作为标准实践。
**为什么只在训练数据上应用 SMOTE?**
测试集保留了真实的欺诈率(0.297%),以提供客观的评估指标。在测试数据上应用 SMOTE 会人为夸大召回率——你相当于在合成的简单样本上进行测试。
**为什么使用 PaySim?**
这是唯一一个免费可用且具有适当许可的移动支付欺诈数据集。IEEE-CIS Kaggle 数据集更接近生产数据(包含真实特征:设备指纹、邮箱域),对于构建更贴近现实的模型而言将是下一步的选择。
## 扩展路线图
本系统旨在处理作品集演示和早期阶段的生产流量(每天数百到数千笔交易)。以下是随着交易量增长需要做出的调整及其原因。
### 当前容量(按现有构建)
| 组件 | 当前限制 | 瓶颈 |
|---|---|---|
| API | ~50-100 req/s | 单进程,单核 |
| 消费者 | 1 个线程,3 个分区 | 仅使用了 3 个分区中的 1 个 |
| 结果存储 | 1,000 条记录,内存中 | 重启时丢失,不共享 |
| Velocity/AML 历史记录 | 内存字典 | 重启时丢失,单进程 |
| 审计日志 | SQLite | 文件级锁 |
| Kafka | 单节点 | 无副本,单点故障 |
| 目标 Velocity | 检测一个被盗账户快速发送多笔交易|
## 阶段 1:基于图的商家聚类
在高交易量下,目标 Velocity 能捕获直接信号,但会漏掉那些聚合点在多个目标间轮换的慢速移动欺诈团伙。生产环境的解决方案是引入图分析层:
- 节点:客户账户、目标账户、IP 地址、设备
- 边:交易(包含金额、时间戳、类型)
- 算法:社区发现(Louvain / 谱聚类)
- 信号:包含许多首次连接的紧密聚类子图
= 欺诈团伙特征
需要:Neo4j 或 Apache GraphX,每 15 分钟运行一次批处理作业,将高风险子图成员作为黑名单反馈给规则引擎。实时图分析属于独立的流处理范畴(Apache Flink + 图分区)。
### 阶段 2:1,000 → 50,000 笔交易/天
**添加 Redis 用于共享状态**
用 Redis 替换内存中的 `_results_store`、Velocity 历史和 AML 历史。这允许多个 API 实例共享状态,并在重启后不会丢失客户的行为历史。
**在网关层添加限流**
Nginx 的 `limit_req_zone` 可防止单个客户端使 API 过载——同时防范滥用和失控的集成 Bug。
**水平扩展 API**
在负载均衡器后运行 2-3 个 API 实例。每个实例都读写共享的 Redis 状态。Render/Railway 通过“scale”设置支持此功能——如果已接入 Redis,则无需修改代码。
### 阶段 3:50,000 → 1,000,000 笔交易/天
**将消费者拆分为独立服务**
运行 3 个消费者实例——每个 Kafka 分区对应一个——作为独立的部署。每个实例独占消费一个分区(Kafka 在消费者组内保证了这一点)。
**将审计日志迁移至 PostgreSQL**
当写入量超过约 1,000 次写/秒时,SQLite 的文件锁会成为瓶颈。带有连接池(PgBouncer)的 PostgreSQL 可以轻松应对。
**添加特征存储**
Velocity 和 AML 历史记录从 Redis 字典转移到专业的特征存储(如 Feast,或带有结构化 TTL 键的 Redis)中——支持在欺诈、AML 以及未来的信用风险模型之间复用特征。
**多节点 Kafka(3 个 broker,副本因子为 3)**
单个 broker = 单点故障。3 个 broker 且 RF=3 意味着系统能在任何单个 broker 宕机时继续运行,且数据零丢失。
### 阶段 4:1,000,000+ 笔交易/天
**死信队列(DLQ)**
处理失败 3 次的消息将被发送到 `fraud-dlq`,而不是被丢弃或无限重试。由独立的进程审查 DLQ 消息——通常是格式错误的 payload 或模型超时的边缘情况。
**幂等层**
在这种流量下,网络重试必定会导致重复提交。基于 Redis 的幂等键(transaction_id 的哈希值)可防止对同一笔交易进行重复评分。
**可观测性堆栈**
使用 Prometheus + Grafana 监控指标(请求率、延迟百分位、模型推理时间、Kafka 消费者滞后)。没有这些工具,在客户投诉之前,你将无法察觉系统性能的下降。
**模型服务分离**
ML 推理转移到专用服务(或使用 ONNX Runtime 实现低于 10ms 的推理),这样模型更新就不需要重新部署整个 API。
### 什么保持不变
核心检测流水线——特征工程、14 项特征、9 步 `assess_transaction()` 流程、SHAP 可解释性、AML 类型以及三级决策逻辑——在任何规模下都保持一致。扩展改变的是*基础设施*,而不是*欺诈逻辑*。这种分离(业务逻辑与基础设施)正是该架构能够无需重写即可发展的原因。
## 作者
**Mubarak Olalekan Oladipo**
AI 工程师 — 欺诈检测与风险管理
尼日利亚伊巴丹 · 开放远程工作
[GitHub](https://github.com/Mubrix2) · [LinkedIn](https://www.linkedin.com/in/mubarak-oladipo/) · +234 814 353 0951
标签:Apex, Kafka, SonarQube插件, XGBoost, 反欺诈系统, 反洗钱, 机器学习, 请求拦截, 软件成分分析, 逆向工具, 金融科技