aifask042-lang/Credit-Card-Fraud-Detection

GitHub: aifask042-lang/Credit-Card-Fraud-Detection

基于 Python、scikit-learn 与 imbalanced-learn 构建的信用卡欺诈检测机器学习管道，通过 SMOTE 过采样与随机森林分类器解决极端类别不平衡的二分类问题。

Stars: 0 | Forks: 0

import os

import numpy as np
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    auc,
    classification_report,
    confusion_matrix,
    precision_recall_curve,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler

# Raw columns that get replaced by robust-scaled copies before modelling.
# The order here fixes which output column of the scaler belongs to which name.
_VOLATILE_COLUMNS = ['Amount', 'Time']


def _replace_with_scaled(features, scaler):
    """Return a copy of *features* with 'Time'/'Amount' swapped for scaled columns.

    *scaler* must already be fitted on the ``_VOLATILE_COLUMNS`` of the
    training partition; this helper only calls ``transform`` so that no
    statistics are ever learned from the frame passed in.
    """
    scaled = np.asarray(scaler.transform(features[_VOLATILE_COLUMNS]))
    # drop() returns a new frame, so the caller's frame is left untouched.
    result = features.drop(columns=['Time', 'Amount'])
    result['scaled_amount'] = scaled[:, 0]
    result['scaled_time'] = scaled[:, 1]
    return result


def run_threat_hunting_pipeline(data_path: str = 'creditcard.csv') -> None:
    """Train and evaluate a fraud classifier on a transaction CSV.

    Steps: load the CSV, make a stratified 80/20 train/test split,
    robust-scale 'Time' and 'Amount', oversample the training set with SMOTE,
    fit a random forest, then print the confusion matrix, the classification
    report and the PR-AUC measured on the untouched test set.

    The scaler is fitted on the training partition only and then applied to
    both partitions. Fitting it on the full dataset before splitting would
    leak test-set statistics (median / IQR) into the training features.

    Args:
        data_path: Path to a CSV file containing at least the columns
            'Time', 'Amount' and 'Class'; every other column is used as a
            feature.

    Returns:
        None. All results are printed. If *data_path* is not an existing
        file, an error message is printed and the function returns early.
    """
    print("=" * 60)
    print(" SIEM & 行为异常检测引擎部署")
    print("=" * 60)

    # ----------------------------------------------------------------
    # STEP 1: TELEMETRY LOG INGESTION & FORENSIC ISOLATION
    # ----------------------------------------------------------------
    # isfile() rather than exists(): a directory path must not reach read_csv.
    if not os.path.isfile(data_path):
        print(f"[-] CRITICAL ERROR: Transaction log file '{data_path}' not found.")
        print("[*] Please download the credit card dataset and place it in this directory.")
        return

    print("[+] Ingesting raw transactional telemetry streams...")
    df = pd.read_csv(data_path)

    # Isolate behavioral indicators (X) from confirmed compromise targets (y).
    X = df.drop('Class', axis=1)
    y = df['Class']

    # Split BEFORE any fitting. Stratifying on y keeps the class ratio the
    # same in both partitions.
    print("[+] Separating live evaluation streams from training architectures...")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # RobustScaler centres on the median and scales by the IQR, so extreme
    # values have little influence on the fitted parameters. It works per
    # column, so one scaler fitted on both columns is equivalent to two
    # separate single-column scalers.
    print("[+] Normalizing volatile parameters ('Time' and 'Amount') via RobustScaler...")
    scaler = RobustScaler().fit(X_train[_VOLATILE_COLUMNS])
    X_train = _replace_with_scaled(X_train, scaler)
    X_test = _replace_with_scaled(X_test, scaler)

    # ----------------------------------------------------------------
    # STEP 2: COUNTERING CLASS IMBALANCE (ATTACK VECTOR SYNTHESIS)
    # ----------------------------------------------------------------
    print("[+] Countering extreme class imbalance using SMOTE over-sampling...")
    # Synthesize samples ONLY from training data so the test set keeps the
    # original, unmodified class distribution.
    smote = SMOTE(random_state=42)
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)
    print(f" [*] Baseline training size updated from {len(X_train)} logs to {len(X_train_resampled)} vectors.")

    # ----------------------------------------------------------------
    # STEP 3: TRAINING THE RULES DETONATION & CLASSIFICATION ENGINE
    # ----------------------------------------------------------------
    print("[+] Initializing Parallelized Random Forest Threat Analytics Engine...")
    # n_jobs=-1 lets scikit-learn use all available processor cores.
    threat_engine = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    threat_engine.fit(X_train_resampled, y_train_resampled)

    # ----------------------------------------------------------------
    # STEP 4: FORENSIC METRIC EVALUATION & SOC AUDIT
    # ----------------------------------------------------------------
    print("[+] Parsing test telemetry logs through the verification rules...")
    y_pred = threat_engine.predict(X_test)
    # Column 1 of predict_proba is the probability of the second class in
    # threat_engine.classes_ (presumably the fraud label — confirm encoding).
    y_pred_prob = threat_engine.predict_proba(X_test)[:, 1]

    print("\n" + "=" * 50)
    print(" DEPLOYED SOC ALERT METRIC MATRIX")
    print("=" * 50)

    cm = confusion_matrix(y_test, y_pred)
    # Row-major flattening of the 2x2 matrix: [[tn, fp], [fn, tp]].
    tn, fp, fn, tp = cm.ravel()
    print(f"True Negatives (Safe transactions allowed clean passage) : {tn}")
    print(f"False Positives (False alarms causing customer friction) : {fp}")
    print(f"False Negatives (CRITICAL BREACH: Fraud bypassed rules) : {fn}")
    print(f"True Positives (Successful threat mitigation and block) : {tp}")

    print("\n" + "=" * 50)
    print(" THREAT ANALYTICS SUMMARY")
    print("=" * 50)
    print(classification_report(y_test, y_pred, target_names=["Legitimate Activity", "Compromise/Fraud"]))

    # Precision-Recall area under the curve (PR-AUC): more informative than
    # ROC on heavily imbalanced data, where ROC can look deceptively good.
    precision, recall, _ = precision_recall_curve(y_test, y_pred_prob)
    pr_auc_score = auc(recall, precision)
    print("-" * 50)
    print(f"Overall Forensic Detection Index (PR-AUC score): {pr_auc_score:.4f}")
    print("=" * 50)


if __name__ == "__main__":
    run_threat_hunting_pipeline()
标签:Apex, 代码示例, 反欺诈, 安全运营, 异常检测, 扫描框架, 数据分析, 机器学习, 逆向工具, 随机森林