aifask042-lang/Credit-Card-Fraud-Detection
GitHub: aifask042-lang/Credit-Card-Fraud-Detection
基于Python和scikit-learn构建的信用卡欺诈检测机器学习管道,通过SMOTE过采样与随机森林分类器解决极端类别不平衡的二分类问题。
Stars: 0 | Forks: 0
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, classification_report, precision_recall_curve, auc
def run_threat_hunting_pipeline(data_path: str = 'creditcard.csv') -> None:
    """Train and evaluate a SMOTE + random-forest fraud classifier.

    The pipeline reads the transaction CSV, makes a stratified 80/20
    train/test split, robust-scales the ``Time`` and ``Amount`` columns,
    over-samples the training partition with SMOTE, fits a random forest and
    prints a confusion-matrix breakdown, a classification report and the area
    under the precision-recall curve for the held-out partition.

    Args:
        data_path: Path to the CSV file. The file must contain ``Time``,
            ``Amount`` and ``Class`` columns; every other column is used as a
            feature as-is.

    Returns:
        None. All results are printed. If ``data_path`` does not exist, an
        error message is printed and the function returns early.
    """
    print("=" * 60)
    print(" SIEM & 行为异常检测引擎部署")
    print("=" * 60)

    # ----------------------------------------------------------------
    # STEP 1: TELEMETRY LOG INGESTION & FORENSIC ISOLATION
    # ----------------------------------------------------------------
    if not os.path.exists(data_path):
        print(f"[-] CRITICAL ERROR: Transaction log file '{data_path}' not found.")
        print("[*] Please download the credit card dataset and place it in this directory.")
        return

    print("[+] Ingesting raw transactional telemetry streams...")
    df = pd.read_csv(data_path)

    # Separate the feature columns (X) from the label column (y).
    X = df.drop('Class', axis=1)
    y = df['Class']

    # Split BEFORE fitting any scaler so that no statistic of the held-out
    # rows can influence preprocessing. stratify=y keeps the class ratio the
    # same in both partitions.
    print("[+] Separating live evaluation streams from training architectures...")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    # Work on copies so the column assignments below never write into a view
    # of the original frame.
    X_train = X_train.copy()
    X_test = X_test.copy()

    # RobustScaler centres on the median and scales by the IQR, which makes it
    # less sensitive to extreme values than mean/variance scaling.
    print("[+] Normalizing volatile parameters ('Time' and 'Amount') via RobustScaler...")
    for raw_col, scaled_col in (('Amount', 'scaled_amount'), ('Time', 'scaled_time')):
        # One scaler per column, fitted on the training partition only; the
        # test partition is transformed with the training statistics.
        scaler = RobustScaler()
        X_train[scaled_col] = scaler.fit_transform(
            X_train[raw_col].to_numpy().reshape(-1, 1)
        ).ravel()
        X_test[scaled_col] = scaler.transform(
            X_test[raw_col].to_numpy().reshape(-1, 1)
        ).ravel()

    # Drop the raw columns so only their scaled counterparts reach the model.
    X_train = X_train.drop(['Time', 'Amount'], axis=1)
    X_test = X_test.drop(['Time', 'Amount'], axis=1)

    # ----------------------------------------------------------------
    # STEP 2: COUNTERING CLASS IMBALANCE (ATTACK VECTOR SYNTHESIS)
    # ----------------------------------------------------------------
    print("[+] Countering extreme class imbalance using SMOTE over-sampling...")
    # Synthetic samples are generated from the training partition only, so the
    # test partition keeps its original class distribution.
    smote = SMOTE(random_state=42)
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)
    print(f" [*] Baseline training size updated from {len(X_train)} logs to {len(X_train_resampled)} vectors.")

    # ----------------------------------------------------------------
    # STEP 3: TRAINING THE RULES DETONATION & CLASSIFICATION ENGINE
    # ----------------------------------------------------------------
    print("[+] Initializing Parallelized Random Forest Threat Analytics Engine...")
    # n_jobs=-1 asks scikit-learn to use all available processors.
    threat_engine = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    threat_engine.fit(X_train_resampled, y_train_resampled)

    # ----------------------------------------------------------------
    # STEP 4: FORENSIC METRIC EVALUATION & SOC AUDIT
    # ----------------------------------------------------------------
    print("[+] Parsing test telemetry logs through the verification rules...")
    y_pred = threat_engine.predict(X_test)
    # Column 1 of predict_proba is the score of the second class.
    y_pred_prob = threat_engine.predict_proba(X_test)[:, 1]

    print("\n" + "=" * 50)
    print(" DEPLOYED SOC ALERT METRIC MATRIX")
    print("=" * 50)
    # Pin the label order so the matrix is always 2x2, even if one class is
    # missing from y_test or y_pred.
    # NOTE(review): assumes 'Class' is encoded as 0 = legitimate, 1 = fraud,
    # as the target_names ordering below implies -- confirm against the data.
    class_labels = [0, 1]
    cm = confusion_matrix(y_test, y_pred, labels=class_labels)
    true_neg, false_pos, false_neg, true_pos = cm.ravel()
    print(f"True Negatives (Safe transactions allowed clean passage) : {true_neg}")
    print(f"False Positives (False alarms causing customer friction) : {false_pos}")
    print(f"False Negatives (CRITICAL BREACH: Fraud bypassed rules) : {false_neg}")
    print(f"True Positives (Successful threat mitigation and block) : {true_pos}")

    print("\n" + "=" * 50)
    print(" THREAT ANALYTICS SUMMARY")
    print("=" * 50)
    print(classification_report(
        y_test,
        y_pred,
        labels=class_labels,
        target_names=["Legitimate Activity", "Compromise/Fraud"],
    ))

    # Area under the precision-recall curve, computed with the trapezoidal
    # rule over the (recall, precision) points.
    precision, recall, _ = precision_recall_curve(y_test, y_pred_prob)
    pr_auc_score = auc(recall, precision)
    print("-" * 50)
    print(f"Overall Forensic Detection Index (PR-AUC score): {pr_auc_score:.4f}")
    print("=" * 50)
if __name__ == "__main__":
    # Run the pipeline with its default dataset path when executed as a script.
    run_threat_hunting_pipeline()
标签:Apex, 代码示例, 反欺诈, 安全运营, 异常检测, 扫描框架, 数据分析, 机器学习, 逆向工具, 随机森林