Mubrix2/ai-fraud-detection-pipeline

GitHub: Mubrix2/ai-fraud-detection-pipeline

面向金融科技的事件驱动型实时欺诈检测与 AML 合规流水线,通过双 ML 模型、规则引擎与 SHAP 可解释性在 500ms 内输出合规决策。

Stars: 0 | Forks: 0

# 实时欺诈检测与风险管理流水线 一个用于金融交易的生产级、事件驱动型欺诈检测系统。每笔交易都会流经 Kafka 流式处理流水线、两个独立的 ML 模型、AML 模式检测、可配置的规则引擎以及 SHAP 可解释性分析——在 500 毫秒内生成符合合规要求的决策。 **在线演示:** [ai-fraud-detection-pipeline.vercel.app](https://ai-fraud-detection-pipeline.vercel.app) **API 文档:** [ai-fraud-detection-pipeline.onrender.com/docs](https://ai-fraud-detection-pipeline.onrender.com/docs) ## 演示 ![Demo](https://raw.githubusercontent.com/Mubrix2/ai-fraud-detection-pipeline/main/demo.gif) ## 流水线 ``` Transaction submitted via REST API │ ▼ Kafka Stream ─── raw-transactions topic (3 partitions) │ ▼ Feature Engineering ─── 14 features from transaction fields │ ├── Velocity Layer (two parallel engines): ├── Sender velocity → "Is THIS customer unusually active?" └── Destination velocity → "Are MANY customers hitting THIS destination?" │ ▼ Dual ML Scoring XGBoost Classifier → fraud probability 0–1 (supervised) Isolation Forest → anomaly score (unsupervised) │ ▼ AML Detection Structuring / Smurfing → CTR/SAR generation Rapid Layering → NFIU reporting flag │ ▼ Rules Engine ─── configurable business rules overlay │ ├── Circuit Breaker → Rule Fallback (if ML unavailable) │ ▼ 3-Tier Decision APPROVE (fraud_prob < 0.60) REVIEW (fraud_prob ≥ 0.60 OR AML flag OR rules escalate) BLOCK (fraud_prob ≥ 0.85) │ ▼ SHAP Explanation ─── exact TreeExplainer attributions (compliance) │ ▼ Immutable Audit Log ─── SQLite append-only │ ▼ React Dashboard + AI Investigation Agent (LangGraph) ``` ## 架构 ``` ┌──────────────────────────────────────────────────────────────┐ │ FastAPI Service (single process) │ │ │ │ Main Thread (uvicorn) Consumer Thread (daemon) │ │ ┌────────────────────┐ ┌─────────────────────────┐ │ │ │ POST /submit │ │ polls Kafka every 1s │ │ │ │ → assess_txn() │ │ → assess_txn() │ │ │ │ → store result │ │ → store result │ │ │ │ → publish Kafka │ │ → publish fraud-results │ │ │ │ GET /recent │ └─────────────────────────┘ │ │ │ GET /stats │ │ │ │ POST /investigate │ shared: _results_store (locked) │ │ └────────────────────┘ │ └──────────────────────────────┬───────────────────────────────┘ │ Apache Kafka (KRaft) ├── raw-transactions (3 partitions) └── fraud-results (3 partitions) ``` 消费者线程和 HTTP 端点共享 `_results_store`——一个受锁保护的线程安全内存字典。HTTP `/submit` 端点进行同步评分,因此结果立即可用。消费者线程提供了 Kafka 流式架构。 ## 技术栈 | 层级 | 技术 | 原因 | |---|---|---| | 流处理 | Apache Kafka 4.x (KRaft) | 持久化、可重放、可水平扩展 | | ML — 监督学习 | XGBoost | 针对表格型欺诈数据的同类最佳选择 | | ML — 无监督学习 | Isolation Forest | 无需标签的新颖模式检测 | | 可解释性 | SHAP TreeExplainer | 精确归因——合规要求 | | 特征上下文 | Velocity Engine | 针对每个客户的行为基线 | | AML | 自定义规则引擎 | 结构化、分层、NFIU CTR/SAR | | 规则引擎 | Python 规则 | 在 ML 输出上叠加业务策略 | | 弹性设计 | Circuit Breaker | ML 不可用时的规则降级 | | 审计 | SQLite append-only | 不可变的合规追踪 | | 后端 | FastAPI + Pydantic v2 | 快速、经过验证、异步 | | 前端 | React + Vite + Recharts | 实时调查仪表板 | | AI Agent | LangGraph + Groq | 对话式欺诈调查 | | 容器 | Docker + Docker Compose | 全系统编排 | ## 决策逻辑 ``` # 干净、可审计、标准化 if fraud_prob >= 0.85: decision = "BLOCK" elif fraud_prob >= 0.60: decision = "REVIEW" else: decision = "APPROVE" # Rules 和 AML 可以将 APPROVE 升级为 REVIEW # Anomaly 可以将 APPROVE 升级为 REVIEW # 没有任何操作可以降级(只会增加风险) ``` ## 模型性能 在 554,082 笔预留的测试交易(真实世界的欺诈率为 0.297%)上进行评估: | 指标 | XGBoost | Isolation Forest | |---|---|---| | Fraud Recall | 98.7% | 71.4% | | Fraud Precision | 63.5% | 8.3% | | F1 Score | 0.773 | — | | ROC-AUC | 0.998 | — | **组合策略(如果任一模型标记则 REVIEW/BLOCK):召回率达 98.9%。** 数据集:PaySim(合成移动支付数据,630 万笔交易)。 注意:Isolation Forest 的精确度按预期设定得较低——无论该模式是否被标记为欺诈,它都会标记异常交易。其价值在于捕获 XGBoost 从未训练过的新颖模式。 ## 本地运行 ### 前置条件 - Python 3.12.3 - Node.js 20+ - Java 17+(用于 Kafka) - Kafka 4.x 安装在 WSL2 或 Linux 中 ### 快速开始 — Docker Compose(推荐) ``` # 1. 首先训练模型(一次性,约 10 分钟) python scripts/prepare_data.py python scripts/train_fraud_model.py python scripts/train_anomaly_model.py # 2. 启动所有服务 docker compose up --build ``` | 服务 | URL | |---|---| | API + 文档 | http://localhost:8000/docs | | 仪表板 | http://localhost:80 | | Kafka | localhost:9092 (内部) / 29092 (主机) | ### 手动启动 — 三个终端 ``` # 终端 1 — Kafka kafka-server-start.sh ~/kafka/config/kraft/server.properties # 终端 2 — API(consumer 线程自动启动) source venv-linux/bin/activate uvicorn app.main:app --reload --port 8000 # 终端 3 — React dashboard cd frontend && npm run dev # Dashboard:http://localhost:5173 ``` ### 生成演示流量 ``` # 每秒 10 笔交易,持续 60 秒(2% 欺诈注入) python scripts/simulate_traffic.py --rate 10 --duration 60 # 连续流 python scripts/simulate_traffic.py --rate 5 --duration 0 ``` ## API 参考 | 方法 | 端点 | 描述 | |---|---|---| | `POST` | `/api/v1/transactions/submit` | 提交交易进行筛查 | | `GET` | `/api/v1/transactions/results/{id}` | 获取交易结果 | | `GET` | `/api/v1/transactions/recent` | 用于仪表板的近期交易 | | `GET` | `/api/v1/transactions/stats` | 系统统计信息 | | `POST` | `/api/v1/transactions/investigate` | AI 调查 Agent | | `GET` | `/health` | 服务健康检查 | **提交一笔可疑交易:** ``` curl -X POST http://localhost:8000/api/v1/transactions/submit \ -H "Content-Type: application/json" \ -d '{ "transaction_id": "TXN-001", "step": 3, "type": "TRANSFER", "amount": 750000, "name_orig": "C1234567890", "oldbalance_org": 750000, "newbalance_orig": 0, "name_dest": "C9876543210", "oldbalance_dest": 0, "newbalance_dest": 0 }' ``` **询问调查 Agent:** ``` curl -X POST http://localhost:8000/api/v1/transactions/investigate \ -H "Content-Type: application/json" \ -d '{"question": "Why was TXN-001 flagged?"}' ``` ## 项目结构 ``` realtime-fraud-detection-pipeline/ ├── app/ │ ├── main.py # FastAPI + consumer daemon thread │ ├── config.py # All configuration from env vars │ ├── api/ │ │ ├── schemas.py # Pydantic v2 models (extra=forbid) │ │ └── routes/ │ │ ├── transactions.py # Core fraud detection endpoints │ │ └── health.py │ ├── core/ │ │ ├── feature_engineer.py # 14 engineered features │ │ ├── fraud_scorer.py # XGBoost inference + 3-tier decision │ │ ├── anomaly_detector.py # Isolation Forest inference │ │ ├── explainer.py # SHAP TreeExplainer │ │ ├── velocity_engine.py # Per-customer sliding window features │ │ ├── aml_detector.py # Structuring/layering + SAR/CTR │ │ ├── rules_engine.py # Configurable business rules │ │ ├── circuit_breaker.py # ML fallback resilience │ │ ├── audit_logger.py # Immutable SQLite audit trail │ │ └── investigation_agent.py # LangGraph analyst agent │ ├── services/ │ │ └── detection_service.py # 9-step pipeline orchestration │ └── streaming/ │ ├── producer.py # Kafka producer │ └── consumer.py # Daemon thread consumer ├── scripts/ │ ├── prepare_data.py # SMOTE, scaling, train/test split │ ├── train_fraud_model.py # XGBoost training + threshold search │ ├── train_anomaly_model.py # Isolation Forest training │ ├── evaluate_models.py # Combined model evaluation │ ├── simulate_traffic.py # Demo traffic generator │ └── test_kafka_connection.py # Kafka health check ├── tests/ # Unit tests for every module ├── notebooks/ │ └── 01_data_exploration.ipynb # 7-cell analysis driving design ├── frontend/ # React investigation dashboard ├── Dockerfile # API image ├── Dockerfile.frontend # React multi-stage image ├── docker-compose.yml # Full system orchestration └── nginx.conf # SPA routing config ``` ## 部署 ``` Render (API) Vercel (Frontend) ↓ ↓ FastAPI + consumer React dashboard synchronous scoring polls Render API ↑ No Kafka in cloud — synchronous scoring path handles all functionality. Kafka streaming architecture demonstrated via Docker Compose locally. ``` | 服务 | 平台 | 成本 | |---|---|---| | API | Render Web Service (免费) | $0 | | 前端 | Vercel (免费) | $0 | ## AML 合规 该流水线可检测三种反洗钱(AML)类型: **结构化:** 24 小时内发生多笔 4,500,000 奈拉及以上的交易——旨在规避 5,000,000 奈拉的 CTR 阈值。 **快速分层:** 1 小时内发生 5 次以上的对外转账——资金快速流经账户以掩盖来源。 **CTR 阈值:** 单笔交易达到或超过 5,000,000 奈拉——根据尼日利亚《2022 年禁止洗钱法》,需要自动提交现金交易报告(CTR)。 系统会针对结构化和分层行为生成可疑活动报告(SAR),并标记在 24 小时内向 NFIU 履行的申报义务。 ## 目标 Velocity 目标 Velocity 追踪现在与每个客户的 Velocity 并行运行。 该系统可同时检测: - **每个客户的模式:** 一个被盗账户快速发送多笔交易 - **跨客户(网络)模式:** 许多不同的客户在短时间内访问同一个目标(撞库测试 / 分布式微小欺诈) ## 工程决策 **为什么将消费者作为线程运行?** 保持部署简单——单进程、单容器。展示了相同的架构分离(生产者/消费者),同时避免了第二个服务带来的运维复杂性。 **为什么是三级(APPROVE/REVIEW/BLOCK)而不是四级?** 直接映射到欺诈运营团队的工作方式。REVIEW 路由到分析师队列。BLOCK 拒绝交易。简单、可审计、在成熟的金融科技公司中作为标准实践。 **为什么只在训练数据上应用 SMOTE?** 测试集保留了真实的欺诈率(0.297%),以提供客观的评估指标。在测试数据上应用 SMOTE 会人为夸大召回率——你相当于在合成的简单样本上进行测试。 **为什么使用 PaySim?** 这是唯一一个免费可用且具有适当许可的移动支付欺诈数据集。IEEE-CIS Kaggle 数据集更接近生产数据(包含真实特征:设备指纹、邮箱域),对于构建更贴近现实的模型而言将是下一步的选择。 ## 扩展路线图 本系统旨在处理作品集演示和早期阶段的生产流量(每天数百到数千笔交易)。以下是随着交易量增长需要做出的调整及其原因。 ### 当前容量(按现有构建) | 组件 | 当前限制 | 瓶颈 | |---|---|---| | API | ~50-100 req/s | 单进程,单核 | | 消费者 | 1 个线程,3 个分区 | 仅使用了 3 个分区中的 1 个 | | 结果存储 | 1,000 条记录,内存中 | 重启时丢失,不共享 | | Velocity/AML 历史记录 | 内存字典 | 重启时丢失,单进程 | | 审计日志 | SQLite | 文件级锁 | | Kafka | 单节点 | 无副本,单点故障 | | 目标 Velocity | 检测一个被盗账户快速发送多笔交易| ## 阶段 1:基于图的商家聚类 在高交易量下,目标 Velocity 能捕获直接信号,但会漏掉那些聚合点在多个目标间轮换的慢速移动欺诈团伙。生产环境的解决方案是引入图分析层: - 节点:客户账户、目标账户、IP 地址、设备 - 边:交易(包含金额、时间戳、类型) - 算法:社区发现(Louvain / 谱聚类) - 信号:包含许多首次连接的紧密聚类子图 = 欺诈团伙特征 需要:Neo4j 或 Apache GraphX,每 15 分钟运行一次批处理作业,将高风险子图成员作为黑名单反馈给规则引擎。实时图分析属于独立的流处理范畴(Apache Flink + 图分区)。 ### 阶段 2:1,000 → 50,000 笔交易/天 **添加 Redis 用于共享状态** 用 Redis 替换内存中的 `_results_store`、Velocity 历史和 AML 历史。这允许多个 API 实例共享状态,并在重启后不会丢失客户的行为历史。 **在网关层添加限流** Nginx 的 `limit_req_zone` 可防止单个客户端使 API 过载——同时防范滥用和失控的集成 Bug。 **水平扩展 API** 在负载均衡器后运行 2-3 个 API 实例。每个实例都读写共享的 Redis 状态。Render/Railway 通过“scale”设置支持此功能——如果已接入 Redis,则无需修改代码。 ### 阶段 3:50,000 → 1,000,000 笔交易/天 **将消费者拆分为独立服务** 运行 3 个消费者实例——每个 Kafka 分区对应一个——作为独立的部署。每个实例独占消费一个分区(Kafka 在消费者组内保证了这一点)。 **将审计日志迁移至 PostgreSQL** 当写入量超过约 1,000 次写/秒时,SQLite 的文件锁会成为瓶颈。带有连接池(PgBouncer)的 PostgreSQL 可以轻松应对。 **添加特征存储** Velocity 和 AML 历史记录从 Redis 字典转移到专业的特征存储(如 Feast,或带有结构化 TTL 键的 Redis)中——支持在欺诈、AML 以及未来的信用风险模型之间复用特征。 **多节点 Kafka(3 个 broker,副本因子为 3)** 单个 broker = 单点故障。3 个 broker 且 RF=3 意味着系统能在任何单个 broker 宕机时继续运行,且数据零丢失。 ### 阶段 4:1,000,000+ 笔交易/天 **死信队列(DLQ)** 处理失败 3 次的消息将被发送到 `fraud-dlq`,而不是被丢弃或无限重试。由独立的进程审查 DLQ 消息——通常是格式错误的 payload 或模型超时的边缘情况。 **幂等层** 在这种流量下,网络重试必定会导致重复提交。基于 Redis 的幂等键(transaction_id 的哈希值)可防止对同一笔交易进行重复评分。 **可观测性堆栈** 使用 Prometheus + Grafana 监控指标(请求率、延迟百分位、模型推理时间、Kafka 消费者滞后)。没有这些工具,在客户投诉之前,你将无法察觉系统性能的下降。 **模型服务分离** ML 推理转移到专用服务(或使用 ONNX Runtime 实现低于 10ms 的推理),这样模型更新就不需要重新部署整个 API。 ### 什么保持不变 核心检测流水线——特征工程、14 项特征、9 步 `assess_transaction()` 流程、SHAP 可解释性、AML 类型以及三级决策逻辑——在任何规模下都保持一致。扩展改变的是*基础设施*,而不是*欺诈逻辑*。这种分离(业务逻辑与基础设施)正是该架构能够无需重写即可发展的原因。 ## 作者 **Mubarak Olalekan Oladipo** AI 工程师 — 欺诈检测与风险管理 尼日利亚伊巴丹 · 开放远程工作 [GitHub](https://github.com/Mubrix2) · [LinkedIn](https://www.linkedin.com/in/mubarak-oladipo/) · +234 814 353 0951
标签:Apex, Kafka, SonarQube插件, XGBoost, 反欺诈系统, 反洗钱, 机器学习, 请求拦截, 软件成分分析, 逆向工具, 金融科技