Ashis-kumar-singh-1/road-guardian-accident-detection
GitHub: Ashis-kumar-singh-1/road-guardian-accident-detection
基于多传感器数据流和XGBoost分类器的实时交通事故检测系统,采用规则引擎预过滤加机器学习分类的混合架构实现高召回率的事故识别。
Stars: 0 | Forks: 0
# 🛡️ Road Guardian — ML 事故检测系统
[](https://www.python.org/)
[](https://xgboost.ai/)
[](LICENSE)
## 📋 目录
- [项目概述](#-project-overview)
- [系统架构](#-system-architecture)
- [仓库结构](#-repository-structure)
- [快速开始](#-quick-start)
- [模块深度解析](#-module-deep-dive)
- [配置](#-configuration)
- [示例输出](#-sample-output)
- [嵌入式系统映射](#-embedded-system-mapping)
- [未来改进](#-future-improvements)
## 🎯 项目概述
Road Guardian 是一个生产级机器学习系统,使用多传感器时间序列数据实时检测道路交通事故。该系统通过四阶段流水线处理传感器数据窗口:
```
Raw Sensors → Signal Preprocessing → Feature Extraction → Rule Engine → XGBoost Classifier → Alert
```
### 为什么要用混合方法?
纯 ML 分类器和纯规则引擎各有关键弱点:
| 方法 | 优势 | 弱点 |
|---|---|---|
| 纯规则 | 快速、可解释、对明确事件零漏报 | 无法泛化到新型碰撞特征 |
| 纯 ML | 学习复杂模式 | 黑盒、计算量更大、可能对颠簸误报 |
| **混合(我们的)** | **物理约束消除明显的非碰撞;ML 处理中间的模糊情况** | 需要校准阈值 |
规则引擎充当**硬件速度守护者**(在 STM32 上 < 1 µs),在 XGBoost 推理之前消除 70-80% 的非碰撞窗口。这减少了不必要的处理,并为每个被拒绝的窗口提供了清晰的审计跟踪。
## 🏗️ 系统架构
```
┌─────────────────────────────────────────────────────────────────┐
│ SENSOR LAYER │
│ MPU6050 (±16g) │ ADXL375 (±200g) │ Gyro │ GPS Speed │
└────────────────────────────┬────────────────────────────────────┘
│ Raw time-series @ 100 Hz
▼
┌─────────────────────────────────────────────────────────────────┐
│ PREPROCESSING (preprocessing.py) │
│ 1. Median filter → removes electrical spike outliers │
│ 2. Butterworth LPF → removes road/engine vibration noise │
│ 3. Gravity removal → eliminates 1g DC bias on accel axes │
└────────────────────────────┬────────────────────────────────────┘
│ Clean, gravity-free signal
▼
┌─────────────────────────────────────────────────────────────────┐
│ FEATURE EXTRACTION (feature_engineering.py) │
│ Per 500 ms window → 34-dimensional feature vector: │
│ Peak g, RMS, Jerk, Delta-V, Kurtosis, Impact Duration, │
│ Gyro Magnitude, Speed Drop, Axis Variance … │
└────────────────────────────┬────────────────────────────────────┘
│ Feature vector
▼
┌─────────────────────────────────────────────────────────────────┐
│ RULE ENGINE (rule_engine.py) │
│ Rule 1: Peak acceleration ≥ 4g (NHTSA airbag threshold) │
│ Rule 2: Impact duration ≥ 15 ms (filters EMI spikes) │
│ Rule 3: Vehicle speed ≥ 5 km/h (filters parking events) │
│ → REJECT → No crash (fast path) │
│ → PASS → XGBoost │
└────────────────────────────┬────────────────────────────────────┘
│ Passed windows only
▼
┌─────────────────────────────────────────────────────────────────┐
│ XGBOOST CLASSIFIER (model.py) │
│ • 34 features, 100 estimators, max_depth=5 │
│ • scale_pos_weight handles 5:1 class imbalance │
│ • Threshold tuned for ≥ 95% recall (safety-critical) │
│ • Output: P(crash) ∈ [0, 1] │
└────────────────────────────┬────────────────────────────────────┘
│ crash label + confidence score
▼
🚨 ALERT / DISPATCH
(→ Road Guardian backend via MQTT/HTTP)
```
## 📁 仓库结构
```
road-guardian-ml/
│
├── data/
│ ├── raw/ # Raw or synthetic sensor CSVs
│ └── processed/ # Feature matrices for train/eval
│
├── src/
│ ├── preprocessing.py # Butterworth + median + gravity removal
│ ├── feature_engineering.py # 34-feature extraction from sensor windows
│ ├── rule_engine.py # Physics-based pre-filter (Rule 1–4)
│ ├── model.py # XGBoost wrapper with save/load/threshold tuning
│ ├── train.py # End-to-end training pipeline
│ └── evaluate.py # Metrics + 5 evaluation plots
│
├── notebooks/
│ └── exploratory_analysis.ipynb # EDA: distributions, waveforms, correlations
│
├── models/
│ └── xgboost_model.pkl # Trained model artefact (joblib)
│
├── reports/ # Auto-generated evaluation plots
│ ├── confusion_matrix.png
│ ├── roc_curve.png
│ ├── pr_curve.png
│ ├── feature_importance.png
│ └── prob_histogram.png
│
├── configs/
│ └── config.yaml # All hyperparameters, thresholds, paths
│
├── requirements.txt
└── README.md
```
## ⚡ 快速开始
### 1. 克隆并安装
```
git clone https://github.com/your-username/road-guardian-ml.git
cd road-guardian-ml
pip install -r requirements.txt
```
### 2. 训练模型
```
# Uses synthetic data (no real dataset required)
python -m src.train
# With cross-validation
python -m src.train --cross-validate
# With your own labelled CSV
python -m src.train --data data/raw/your_sensor_data.csv
```
**预期输出:**
```
[INFO] Synthetic dataset generated: 2400 samples (400 crash, 2000 non-crash)
[INFO] Feature matrix built: 2400 samples × 34 features
[INFO] Rule engine: 566/2400 windows passed (23.6%) — 1834 pre-rejected.
[INFO] CV accuracy : 0.9982 ± 0.0025
[INFO] CV recall : 0.9975 ± 0.0031
[INFO] Threshold tuned to 0.42 (target recall ≥ 95%)
[INFO] Model saved → models/xgboost_model.pkl
```
### 3. 评估模型
```
python -m src.evaluate
```
在 `reports/` 中生成 5 个图:混淆矩阵、ROC 曲线、PR 曲线、特征重要性、概率直方图。
### 4. 运行规则引擎测试
```
python -m src.rule_engine
```
### 5. 在笔记本中探索
```
jupyter notebook notebooks/exploratory_analysis.ipynb
```
### 6. 单窗口推理(实时)
```
from src.preprocessing import load_config
from src.model import RoadGuardianModel
from src.feature_engineering import extract_features
from src.rule_engine import RuleEngine
import numpy as np
cfg = load_config()
model = RoadGuardianModel.load("models/xgboost_model.pkl", cfg)
rule_engine = RuleEngine(cfg)
# --- Simulate one incoming sensor window ---
fs = cfg["sampling"]["rate_hz"]
win = cfg["sampling"]["window_size"]
ax = np.random.normal(0, 0.2, win) # Replace with real sensor data
features = extract_features(
ax_low=ax, ay_low=ax*0.5, az_low=np.ones(win),
ax_high=ax*2, ay_high=ax, az_high=np.zeros(win),
gx=np.zeros(win), gy=np.zeros(win), gz=np.zeros(win),
speed_start=60.0, speed_end=55.0, fs=fs
)
# Gate through rule engine first
rule_result = rule_engine.check(
peak_accel_g=features["peak_accel_high_res"],
impact_duration_ms=features["impact_duration_low_ms"],
speed_start_kmh=features["speed_start_kmh"],
speed_drop_kmh=features["speed_drop_kmh"],
)
if rule_result.passed:
label, confidence = model.predict_single(features)
print(f"Decision: {'CRASH' if label else 'NORMAL'} | Confidence: {confidence:.3f}")
else:
print(f"Rule REJECT: {rule_result.triggered_rule}")
```
## 🔬 模块深度解析
### `preprocessing.py` — 信号清洗
| 步骤 | 方法 | 原因 |
|---|---|---|
| 去除尖峰 | 中值滤波器(核=5) | 消除亚毫秒 EMI glitches 而不模糊碰撞开始 |
| 去除噪声 | 巴特沃斯低通滤波器(4 阶,20 Hz) | 碰撞能量 < 15 Hz;路面纹理噪声 > 30 Hz |
| 去除重力 | 移动平均减法(500 ms) | 消除 1g 直流偏移;在 MCU 上比高通 IIR 更便宜 |
### `feature_engineering.py` — 34 个特征
| 类别 | 特征 | 物理含义 |
|---|---|---|
| 能量 | 峰值 g、RMS | 碰撞严重程度 |
| 形状 | 阶跃、峰度 | 开始速率、冲击性 |
| 运动学 | Delta-V、速度下降 | 真实速度变化(伤害预测) |
| 旋转 | 陀螺仪幅值、RMS 陀螺仪 | 翻滚/旋转检测 |
| 时间 | 碰撞持续时间(ms) | 区分碰撞(20-80 ms)和制动(>500 ms) |
### `rule_engine.py` — 物理门控
所有四个规则都可在 `config.yaml` 中按车辆类别调优:
```
Rule 1: peak_accel ≥ 4.0g → based on NHTSA airbag deployment standard
Rule 2: impact_duration ≥ 15 ms → based on FMVSS 208 pulse corridors
Rule 3: speed ≥ 5 km/h → filters stationary sensor bench tests
Rule 4: speed_drop ≥ 20 km/h → soft rule, logs warning, passes to XGBoost
```
### `model.py` — XGBoost 分类器
关键设计决策:
- **`scale_pos_weight = 5`** — 在梯度更新期间,每个碰撞样本计为 5 个正常样本,补偿 5:1 的类别不平衡,无需 SMOTE。
- **阈值调优** — 不使用 0.5,而是在验证集上找到达到 ≥ 95% 碰撞召回率的最低阈值。漏掉真实碰撞远比误报危险。
- **`subsample = 0.8`、`colsample_bytree = 0.8`** — 随机训练防止对少数碰撞类过拟合。
## ⚙️ 配置
所有参数都在 `configs/config.yaml` 中。关键部分:
```
rule_engine:
high_g_threshold: 4.0 # g — change to 3.0 for motorcycles
impact_duration_ms: 15 # ms
speed_drop_threshold: 20.0 # km/h (soft rule)
model:
max_depth: 5
learning_rate: 0.1
n_estimators: 100
scale_pos_weight: 5 # = n_negative / n_positive
filters:
butterworth:
cutoff_hz: 20 # Hz
order: 4
```
## 📊 示例输出
```
ROAD GUARDIAN — MODEL EVALUATION REPORT
============================================================
Accuracy : 0.9982
ROC-AUC : 0.9997
Avg Precision (AP): 0.9994
precision recall f1-score support
No Crash 1.00 1.00 1.00 1608
Crash 0.99 1.00 0.99 392
accuracy 1.00 2000
Top features by XGBoost gain:
peak_accel_high_res 0.4571
rms_accel_low_res 0.3320
peak_jerk_res 0.1893
kurtosis_high_x 0.0059
gyro_peak_magnitude 0.0042
```
## 🔌 嵌入式系统映射
此流水线直接映射到 Road Guardian Raspberry Pi 固件:
| Python 组件 | 嵌入式等价物 |
|---|---|
| `preprocessing.py` 滤波器 | C 中的定点 IIR/FIR,系数相同 |
| `feature_engineering.py` | 34 标量结构打包成 UART 数据包 |
| `rule_engine.py` | 中断处理程序中的整数比较(< 1 µs) |
| `model.py` XGBoost 推理 | Pi 上编译的 `treelite` C 模型(< 3 ms) |
| 阈值 0.42 | 存储在闪存中,可通过 OTA 现场调整 |
Python 和 C 实现通过共享 `config.yaml` 阈值保持同步。
## 🔮 未来改进
- **真实数据集**:在 NHTSA NCAP 障碍测试记录或改装车辆数据上进行微调。
- **SMOTE**:在真实不平衡数据上比较合成过采样与 `scale_pos_weight`。
- **SHAP 可解释性**:为调度 UI 提供每起事件的 SHAP 值,显示哪个传感器触发了检测。
- **时间模型**:在原始窗口上使用 LSTM 或 TCN — 可能捕获窗口统计遗漏的顺序模式。
- **与 GPS 的传感器融合**:整合地图匹配速度(隧道回退),以在 GPS 信号较弱时提高鲁棒性。
- **treelite 部署**:通过 treelite 将 XGBoost 编译为 C,在 Raspberry Pi Zero 上实现 10 倍更快的推理。
- **OTA 阈值更新**:允许按车辆类别更新规则引擎阈值,无需重新训练。
- **联邦学习**:聚合 Road Guardian 设备车队的碰撞模式,无需传输原始传感器数据。
## 🗂️ 真实 CSV 架构
如果提供您自己的标记数据集,CSV 必须包含:
| 列 | 类型 | 描述 |
|---|---|---|
| `ax_low`、`ay_low`、`az_low` | float | MPU6050 每窗口平均 g |
| `ax_low_max`、`ay_low_max` | float | MPU6050 每窗口峰值 g |
| `ax_high`、`ay_high`、`az_high` | float | ADXL375 平均 g |
| `ax_high_max` | float | ADXL375 峰值 g |
| `gx_mean`、`gy_mean`、`gz_mean` | float | 陀螺仪平均值(deg/s) |
| `gx_max`、`gy_max` | float | 陀螺仪峰值(deg/s) |
| `speed_start`、`speed_end` | float | GPS 速度 km/h |
| `speed_drop` | float | speed_start − speed_end |
| `label` | int | 0 = 无碰撞,1 = 碰撞 |
## 📄 许可证
MIT © Road Guardian 项目
<<<<<<< HEAD
标签:Apex, IoT, Python, STM32, XGBoost, 云计算, 交通事故检测, 传感器数据处理, 信号处理, 实时系统, 嵌入式系统, 异常检测, 数据管道, 无后门, 时间序列分析, 智能交通, 机器学习, 特征提取, 规则引擎, 车联网, 软件工程, 边缘计算, 道路安全, 高召回率