Abwahab55/Fraud-Detection-System

GitHub: Abwahab55/Fraud-Detection-System

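A sub-millisecond, real-time transaction fraud detection system implemented in pure Python, combining a rule engine with a statistical model.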

Stars: 0 | Forks: 0

# Fraud Detection System

![Python](https://img.shields.io/badge/Python-3.9+-3776AB?logo=python&logoColor=white)
![NumPy](https://img.shields.io/badge/NumPy-013243?logo=numpy&logoColor=white)
![Machine Learning](https://img.shields.io/badge/ML-Anomaly%20Detection-orange)
![Rule Engine](https://img.shields.io/badge/Engine-Rules-blueviolet)
![Real-Time](https://img.shields.io/badge/System-Real--Time-success)
![Low Latency](https://img.shields.io/badge/Latency-%3C1ms-brightgreen)
![AWS Ready](https://img.shields.io/badge/AWS-Serverless-FF9900?logo=amazonaws&logoColor=white)
![Kafka](https://img.shields.io/badge/Streaming-Kafka-231F20?logo=apachekafka&logoColor=white)
![Redis](https://img.shields.io/badge/Cache-Redis-DC382D?logo=redis&logoColor=white)
![Docker](https://img.shields.io/badge/Container-Docker-2496ED?logo=docker&logoColor=white)
![Tests](https://img.shields.io/badge/Tests-100%25%20Passing-success)
![License](https://img.shields.io/badge/License-MIT-lightgrey)

A real-time transaction fraud scoring engine written in pure Python. It combines a deterministic rule engine with a statistical ML model to classify every transaction as **APPROVE**, **REVIEW**, or **BLOCK**, with sub-millisecond latency.

## Table of Contents

1. [Overview](#overview)
2. [Architecture](#architecture)
3. [Project Structure](#project-structure)
4. [Quick Start](#quick-start)
5. [How It Works](#how-it-works)
   - [Feature Store](#feature-store)
   - [Rule Engine](#rule-engine)
   - [ML Model](#ml-model)
   - [Score Blending & Decisions](#score-blending--decisions)
6. [Fraud Patterns Simulated](#fraud-patterns-simulated)
7. [Running the Demo](#running-the-demo)
8. [Running the Tests](#running-the-tests)
9. [Sample Output](#sample-output)
10. [Output Files](#output-files)
11. [Configuration & Extending](#configuration--extending)
12. [Production Roadmap](#production-roadmap)
13. [Dependencies](#dependencies)

## Overview

This project implements the core pattern that banks and payment processors use to detect fraudulent transactions in real time.

Key properties:

| Property | Value |
|---|---|
| Average scoring latency | **< 1 ms** per transaction |
| Decision types | APPROVE / REVIEW / BLOCK |
| Risk levels | LOW / MEDIUM / HIGH / CRITICAL |
| Fraud patterns detected | 5 (card-testing, ATO, night-online, spike, velocity) |
| Unit tests | **62 tests, 100% passing** |
| External dependencies | `numpy` only |

## Architecture

```
Transaction
     │
     ▼
┌─────────────────────────────────────┐
│            Feature Store            │  ← Per-user rolling window (1 hour)
│  • amount_ratio_to_avg              │    Tracks: velocity, spend, country,
│  • tx_count_last_hour               │    category frequency, avg spend
│  • spend_last_hour                  │
│  • country_mismatch  (0/1)          │
│  • unusual_category  (0/1)          │
│  • is_night          (0/1)          │
└──────────────┬──────────────────────┘
               │  features dict
       ┌───────┴────────┐
       │                │
       ▼                ▼
┌─────────────┐  ┌─────────────┐
│ Rule Engine │  │  ML Model   │
│             │  │             │
│  11 rules   │  │ Calibrated  │
│ Each fires  │  │  anomaly    │
│independently│  │  scorer     │
│             │  │             │
│ rule_score  │  │ model_score │
│ ∈ [0, 0.85] │  │  ∈ [0, 1]   │
└──────┬──────┘  └──────┬──────┘
       │                │
       └───────┬────────┘
               │
               ▼
  final_score = 0.40 × rule_score + 0.60 × model_score
               │
               ▼
       ┌───────────────┐
       │ Risk Banding  │
       │               │
       │ [0.00, 0.30)  │ → LOW      → APPROVE
       │ [0.30, 0.55)  │ → MEDIUM   → REVIEW
       │ [0.55, 0.75)  │ → HIGH     → REVIEW
       │ [0.75, 1.00]  │ → CRITICAL → BLOCK
       └───────────────┘
```

## Project Structure

```
fraud_project/
│
├── main.py                  # Entry point: run the demo
│
├── src/
│   ├── __init__.py
│   ├── models.py            # Transaction and FraudScore dataclasses
│   ├── feature_store.py     # Per-user rolling feature computation
│   ├── rule_engine.py       # 11 deterministic fraud rules
│   ├── ml_model.py          # Statistical anomaly scoring model
│   ├── detector.py          # Orchestrator that glues everything together
│   ├── simulator.py         # Synthetic transaction stream generator
│   └── reporter.py          # Console output, JSON export, CSV export
│
├── tests/
│   ├── __init__.py
│   └── test_all.py          # 62 unit tests (6 test classes)
│
└── outputs/                 # Auto-created; JSON and CSV reports land here
```

## Quick Start

### 1. Clone / copy the project

```
git clone <repository-url>
cd fraud_project
```

### 2. Install the only dependency

```
pip install numpy
```

### 3. Run the demo

```
python main.py
```

### 4. Run the tests

```
python -m unittest tests/test_all.py -v
```

## How It Works

### Feature Store

`src/feature_store.py`

The Feature Store uses Python `deque` objects to keep each user's recent transactions in a **per-user sliding window** (1 hour by default). For every incoming transaction it computes:

| Feature | Description |
|---|---|
| `amount_ratio_to_avg` | Current amount ÷ the user's rolling average amount |
| `tx_count_last_hour` | Number of transactions the user made in the past hour |
| `spend_last_hour` | Total amount (EUR) the user spent in the past hour |
| `country_mismatch` | 1 if the transaction country ≠ the user's home country |
| `unusual_category` | 1 if the merchant category ≠ the user's most frequent category |
| `is_night` | 1 if the transaction time falls within 23:00–05:59 |
| `is_online` | 1 if the transaction was made online |

Features are **read** before scoring and **written** after scoring, so the current transaction never contaminates its own features.

In production, replace `FeatureStore` with a Redis-backed implementation. The interface (`get_features` / `record`) stays the same.

### Rule Engine

`src/rule_engine.py`

Eleven deterministic rules fire independently, and each contributes a weighted score:

| Rule | Condition | Weight |
|---|---|---|
| `LARGE_AMOUNT` | Amount > €5,000 | 0.15 |
| `VERY_LARGE_AMOUNT` | Amount > €15,000 | 0.30 |
| `AMOUNT_SPIKE_5X` | Amount ratio ≥ 5× | 0.25 |
| `AMOUNT_SPIKE_10X` | Amount ratio ≥ 10× | 0.40 |
| `VELOCITY_HIGH` | > 8 transactions/hour | 0.20 |
| `VELOCITY_EXTREME` | > 15 transactions/hour | 0.40 |
| `HIGH_HOURLY_SPEND` | > €8,000/hour | 0.25 |
| `COUNTRY_MISMATCH` | Foreign country | 0.25 |
| `UNUSUAL_CATEGORY` | Unfamiliar merchant type | 0.10 |
| `NIGHT_TRANSACTION` | 23:00–05:59 | 0.10 |
| `ONLINE_LARGE_AMOUNT` | Online and amount > €2,500 | 0.15 |

The rule score is the **sum of the triggered weights, capped at 0.85**.

Rules are easy to audit and to explain to compliance teams: when a transaction is blocked, the triggered rules tell you exactly why.

### ML Model

`src/ml_model.py`

A calibrated statistical scorer that mimics a trained Isolation Forest or XGBoost model. The feature weights are derived from the relative importance of signals in published card-fraud datasets (for example, the Kaggle Credit Card Fraud Detection dataset).

```
model_score = 0.35 × sigmoid(amount_ratio − 3.0)   # amount anomaly
            + 0.25 × sigmoid(velocity − 8.0)       # velocity anomaly
            + 0.20 × country_mismatch              # geographic signal
            + 0.10 × unusual_category              # behavioural signal
            + 0.05 × is_night                      # time signal
            + 0.05 × high_spend_flag               # aggregate spend
```

**Swapping in a real trained model:**

```
# ml_model.py: production swap
import joblib
import numpy as np

class FraudModel:
    def __init__(self, model_path="fraud_model_v3.pkl"):
        # joblib.load reads a local path or file object, so fetch the artifact
        # (e.g. s3://your-bucket/fraud_model_v3.pkl) before constructing the model.
        self.clf = joblib.load(model_path)

    def _to_array(self, features: dict) -> np.ndarray:
        # Assumes the dict order matches the feature order used in training.
        return np.array([list(features.values())], dtype=float)

    def predict(self, features: dict) -> float:
        X = self._to_array(features)
        return float(self.clf.predict_proba(X)[0, 1])
```

### Score Blending & Decisions

`src/detector.py`

```
final_score = 0.60 × model_score + 0.40 × rule_score
```

| Score range | Risk level | Decision |
|---|---|---|
| 0.00 ≤ score < 0.30 | LOW | **APPROVE** |
| 0.30 ≤ score < 0.55 | MEDIUM | **REVIEW** |
| 0.55 ≤ score < 0.75 | HIGH | **REVIEW** |
| 0.75 ≤ score ≤ 1.00 | CRITICAL | **BLOCK** |

REVIEW decisions are queued for manual analysis. BLOCK decisions are rejected immediately and can trigger account suspension and alerts.

## Fraud Patterns Simulated

`src/simulator.py` injects five realistic attack scenarios:

### Pattern 1: Card Testing

The attacker makes 16 small transactions (€0.50–€2.00) to verify that the card works, then attempts a €14,500 electronics purchase.

**Detected by:** `VELOCITY_HIGH`, `VELOCITY_EXTREME`, `AMOUNT_SPIKE_10X`

### Pattern 2: Account Takeover (Country Jump)

The user makes a normal €320 travel purchase in Germany; 18 minutes later, a €3,800 electronics purchase appears in Nigeria.

**Detected by:** `COUNTRY_MISMATCH`, `AMOUNT_SPIKE_10X`, `ONLINE_LARGE_AMOUNT`

### Pattern 3: Night-time High-Value Online

A €4,600 purchase from China at 03:14, on an account with no prior overseas activity.

**Detected by:** `NIGHT_TRANSACTION`, `AMOUNT_SPIKE_10X`, `ONLINE_LARGE_AMOUNT`

### Pattern 4: Amount Spike

Six normal €55 grocery purchases establish a baseline, then an €11,200 electronics transaction arrives, roughly 204× the €55 average.

**Detected by:** `LARGE_AMOUNT`, `AMOUNT_SPIKE_5X`, `AMOUNT_SPIKE_10X`

### Pattern 5: Velocity Burst

Twenty transactions occur within 4 minutes (card-testing), followed by an €8,900 purchase.

**Detected by:** `VELOCITY_EXTREME`, `AMOUNT_SPIKE_10X`, `LARGE_AMOUNT`

## Running the Demo

```
# Standard run: 250 normal + fraud patterns
python main.py

# Quick run: 50 normal transactions only (no fraud injected)
python main.py --quick

# Print feature values for every flagged transaction
python main.py --verbose

# Skip writing output files
python main.py --no-save
```

## Running the Tests

```
# Run all 62 tests with verbose output
python -m unittest tests/test_all.py -v

# Run a single test class
python -m unittest tests.test_all.TestFraudDetector -v

# Run a single test method
python -m unittest tests.test_all.TestRuleEngine.test_velocity_extreme_rule -v
```

**Test classes and coverage:**

| Class | Tests | What it covers |
|---|---|---|
| `TestTransaction` | 4 | Model fields, defaults, metadata |
| `TestFeatureStore` | 11 | Velocity, ratios, country/category flags, window expiry |
| `TestRuleEngine` | 12 | Each rule fires correctly; score cap |
| `TestFraudModel` | 5 | Score range, monotonicity, high/low risk |
| `TestFraudDetector` | 20 | Decisions, risk levels, statistics, latency, scenarios |
| `TestSimulator` | 10 | Counts, uniqueness, ordering, determinism |
| **Total** | **62** | **100% passing** |

## Sample Output

```
════════════════════════════════════════════════════════════════════
  FRAUD DETECTION SYSTEM — Real-time Transaction Scoring
  Started: 2024-06-01 10:00:00
════════════════════════════════════════════════════════════════════

  Processing 298 transactions ...

  TX 0169c304_ct_hit  user=user_0042  score=0.871  ███████████████████░░░  [CRITICAL]  ✗ BLOCK  (0.1 ms)
     rules : LARGE_AMOUNT, AMOUNT_SPIKE_10X, VELOCITY_EXTREME, COUNTRY_MISMATCH
     feats : ratio=12159.33  vel=16  country_mis=1  night=0

  TX 056df56a_spike   user=user_0077  score=0.792  █████████████████░░░░░  [CRITICAL]  ✗ BLOCK  (0.1 ms)
     rules : LARGE_AMOUNT, AMOUNT_SPIKE_10X, COUNTRY_MISMATCH

  TX 2d12f797_vb_hit  user=user_0111  score=0.758  █████████████████░░░░░  [CRITICAL]  ✗ BLOCK  (0.1 ms)
     rules : LARGE_AMOUNT, AMOUNT_SPIKE_10X, VELOCITY_EXTREME

  (+ 264 LOW-risk transactions approved silently)

════════════════════════════════════════════════════════════════════
  FRAUD DETECTION — SUMMARY REPORT
════════════════════════════════════════════════════════════════════
  Transactions processed : 298
  Average latency        : 0.02 ms / transaction

  LOW        264  ( 88.6%)  ██████████████████████
  MEDIUM      27  (  9.1%)  ██
  HIGH         4  (  1.3%)
  CRITICAL     3  (  1.0%)

  ✓ APPROVE  264
  ⚠ REVIEW    31
  ✗ BLOCK      3
════════════════════════════════════════════════════════════════════
  🚨 3 CRITICAL transaction(s) blocked
```

## Output Files

Every run (unless `--no-save` is passed) writes two files to `outputs/`:

### JSON: `fraud_report_YYYYMMDD_HHMMSS.json`

```
[
  {
    "transaction_id": "0169c304-0d4_ct_hit",
    "user_id": "user_0042",
    "score": 0.871,
    "risk_level": "CRITICAL",
    "decision": "BLOCK",
    "triggered_rules": ["LARGE_AMOUNT", "AMOUNT_SPIKE_10X", "VELOCITY_EXTREME"],
    "model_score": 0.8943,
    "rule_score": 0.85,
    "features": {
      "amount": 14500.0,
      "amount_ratio_to_avg": 12159.33,
      "tx_count_last_hour": 16,
      "spend_last_hour": 17.42,
      "country_mismatch": 1,
      "unusual_category": 1,
      "is_online": 1,
      "hour_of_day": 10,
      "is_night": 0
    },
    "processing_time_ms": 0.12
  }
]
```

### CSV: `fraud_report_YYYYMMDD_HHMMSS.csv`

```
transaction_id,user_id,score,risk_level,decision,triggered_rules,model_score,rule_score,processing_time_ms
0169c304_ct_hit,user_0042,0.871,CRITICAL,BLOCK,LARGE_AMOUNT|AMOUNT_SPIKE_10X|VELOCITY_EXTREME,...
```

## Configuration & Extending

### Changing the risk thresholds

Edit `_RISK_BANDS` in `src/detector.py`:

```
_RISK_BANDS = [
    ("LOW",      0.00, 0.25, "APPROVE"),   # tighter: catch more
    ("MEDIUM",   0.25, 0.50, "REVIEW"),
    ("HIGH",     0.50, 0.70, "REVIEW"),
    ("CRITICAL", 0.70, 1.01, "BLOCK"),
]
```

### Adding a new rule

In `src/rule_engine.py`, append to `_RULES` and `_WEIGHTS`:

```
("WEEKEND_LARGE", lambda f, tx: tx.timestamp.weekday() >= 5 and tx.amount > 2000),

# then in _WEIGHTS:
"WEEKEND_LARGE": 0.15,
```

### Plugging in a real ML model

Replace `FraudModel.predict()` in `src/ml_model.py` with any callable that accepts the feature dict and returns a float in the range [0, 1].

### Replacing FeatureStore with Redis

```
# src/feature_store.py
import json
import redis

class FeatureStore:
    def __init__(self, window_seconds=3600):
        self.r = redis.Redis(host="localhost", port=6379)
        self.window_seconds = window_seconds

    def get_features(self, tx):
        raw = self.r.lrange(f"history:{tx.user_id}", 0, -1)
        history = [json.loads(x) for x in raw]
        # ... same logic ...
```

## Production Roadmap

To turn this from a demo into a production system:

1. **Message queue ingestion**: publish transactions to Kafka/SQS and consume them with a worker pool
2. **Redis feature store**: replace the in-memory deques with Redis sorted sets (TTL = window_seconds)
3. **Trained ML model**: train XGBoost or Isolation Forest on the [Kaggle Credit Card Fraud Dataset](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud); serve it through SageMaker / Vertex AI
4. **Alerting**: publish BLOCK decisions to SNS/PagerDuty; send email/Slack notifications
5. **Dashboard**: stream scores to a time-series store and visualise them in Grafana
6. **Model monitoring**: track the score distribution weekly and alert on drift (a trigger for model retraining)
7. **A/B testing**: run two model versions side by side and compare precision/recall
8. **REST API**: wrap `FraudDetector.score()` in FastAPI for synchronous scoring

## Dependencies

| Package | Version | Purpose |
|---|---|---|
| `numpy` | ≥ 1.21 | Rolling-average computation in FeatureStore |
| `python` | ≥ 3.9 | Dataclasses, type hints |

No other dependencies. All tests use the standard-library `unittest` module.

```
pip install numpy
```

## License

MIT. Free to use, modify, and distribute.
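
## Appendix: Scoring Sketch

The scoring path described under Rule Engine, ML Model, and Score Blending & Decisions can be sketched in a few lines of standalone Python. This is a minimal illustration, not the repository's code. The function names (`rule_score`, `model_score`, `blend`, `band`) are hypothetical, only four of the eleven rule weights are included, and `high_spend_flag` is assumed to be 1 when hourly spend exceeds €8,000 (borrowed from the `HIGH_HOURLY_SPEND` threshold). Because the real model is described as calibrated, the sketch is not expected to reproduce the exact figures in the sample output.

```python
import math

# Weights copied from the Rule Engine table (subset only, for illustration).
RULE_WEIGHTS = {
    "LARGE_AMOUNT": 0.15,
    "AMOUNT_SPIKE_10X": 0.40,
    "VELOCITY_EXTREME": 0.40,
    "COUNTRY_MISMATCH": 0.25,
}
RULE_SCORE_CAP = 0.85

# (risk level, inclusive lower bound, exclusive upper bound, decision)
RISK_BANDS = [
    ("LOW", 0.00, 0.30, "APPROVE"),
    ("MEDIUM", 0.30, 0.55, "REVIEW"),
    ("HIGH", 0.55, 0.75, "REVIEW"),
    ("CRITICAL", 0.75, 1.01, "BLOCK"),  # 1.01 so that a score of 1.00 is included
]


def sigmoid(x: float) -> float:
    """Standard logistic function."""
    return 1.0 / (1.0 + math.exp(-x))


def rule_score(triggered: list[str]) -> float:
    """Sum the weights of the triggered rules, capped at 0.85."""
    return min(sum(RULE_WEIGHTS[name] for name in triggered), RULE_SCORE_CAP)


def model_score(f: dict) -> float:
    """Weighted sum of signals, following the formula in the ML Model section."""
    # Assumption: the flag is set when hourly spend exceeds 8,000 EUR.
    high_spend_flag = 1.0 if f["spend_last_hour"] > 8000 else 0.0
    return (
        0.35 * sigmoid(f["amount_ratio_to_avg"] - 3.0)
        + 0.25 * sigmoid(f["tx_count_last_hour"] - 8.0)
        + 0.20 * f["country_mismatch"]
        + 0.10 * f["unusual_category"]
        + 0.05 * f["is_night"]
        + 0.05 * high_spend_flag
    )


def blend(rule: float, model: float) -> float:
    """final_score = 0.40 x rule_score + 0.60 x model_score."""
    return 0.40 * rule + 0.60 * model


def band(score: float) -> tuple[str, str]:
    """Map a final score to (risk level, decision)."""
    for level, low, high, decision in RISK_BANDS:
        if low <= score < high:
            return level, decision
    raise ValueError(f"score out of range: {score}")


# Feature values taken from the JSON sample in Output Files.
sample = {
    "amount_ratio_to_avg": 12159.33,
    "tx_count_last_hour": 16,
    "spend_last_hour": 17.42,
    "country_mismatch": 1,
    "unusual_category": 1,
    "is_night": 0,
}
rules = ["LARGE_AMOUNT", "AMOUNT_SPIKE_10X", "VELOCITY_EXTREME"]

final = blend(rule_score(rules), model_score(sample))
print(f"{final:.3f}", *band(final))
```

With these inputs the three rule weights sum to 0.95 and are capped at 0.85, the blended score comes out at roughly 0.88, and the transaction lands in the CRITICAL band with a BLOCK decision, the same band as the card-testing hit in the sample output.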