Aryan741x/OmniRoute-Smart-Logistics-Group3
GitHub: Aryan741x/OmniRoute-Smart-Logistics-Group3
基于 AWS EMR 和 Delta Lake 构建的生产级智能物流数据管道,融合批处理与实时流处理,实现车队运营监控、驾驶员安全管理与燃料效率审计的一体化解决方案。
Stars: 0 | Forks: 0
# OmniRoute 智能物流引擎
## 概述
本仓库为 OmniRoute 智能物流引擎实现了一个**生产可用、幂等的数据管道**。该管道处理**批处理**和**实时流处理**数据,以管理车队运营、确保驾驶员安全并自动化财务审计。它在 AWS 上采用 Bronze → Silver → Gold (Delta Lake) 架构,并由 Apache Airflow 进行编排。
## 业务需求文档 (BRD) - 核心规范
“通过新功能增强物流引擎”文档定义了以下核心逻辑:
### 1. 批处理调度
- **每天 @ 00:00 UTC**:车辆注册与分配(增量更新)。
- **每天 @ 07:00 UTC**:燃料交易(效率审计)。
- **每月 @ 每月 1 日**:驾驶员状态报告与薪资冷却期结算。
- **每年 @ 1 月 1 日**:维护计划摄取。
### 2. 实时流处理逻辑 (Kafka)
- **违规检测**:标记车速超过 **110 km/h** 或进入 `restricted_zones.json` 中定义的坐标的车辆。
- **惩罚系统**:一次“安全警告”将导致从驾驶员的 `daily_rate` 中扣除 **5%**。
- **停职机制**:达到 **10 次警告**的驾驶员将转为 `SUSPENDED`(停职)状态,并被禁止出车。
- **每月冷却期**:在每月的 1 日,警告计数将重置为 0,并且费率将恢复(针对未停职的驾驶员)。
## 架构
该系统构建在 **AWS** 上,并由 **Apache Airflow** 编排:
- **批处理层**:运行在 S3 Delta Lake 上的临时 EMR 集群执行 Spark 作业。
- **流处理层**:Spark Streaming/Flink 摄取 Kafka 遥测数据以进行实时安全标记。
- **数据库**:PostgreSQL (EC2) 结合变更数据馈送 (CDF) 实现高效的增量导出。
```
flowchart TD
subgraph Ingestion["Source Layer"]
K1[("Kafka Telemetry Stream")]
S3B[("S3 Landing Zone (CSVs)")]
end
subgraph Processing["Processing Layer (EMR)"]
direction TB
B1["Bronze (Raw Data)"]
S1["Silver (Cleansed/SCD2)"]
G1["Gold (Business Aggr)"]
ST1["Streaming Engine (Safety Logic)"]
end
subgraph Storage["Storage & Warehouse"]
DL[("Delta Lake (S3)")]
PG[("PostgreSQL (Export)")]
end
K1 --> ST1
S3B --> B1
B1 --> S1 --> G1
ST1 --> DL
G1 --> DL
DL -- CDF Upsert --> PG
subgraph Orchestration["Airflow Control"]
A1["EC2 Control"]
A2["EMR Lifecycle"]
A3["Branching Logic"]
end
```
## 数据模型与关系
| 层 | 数据集 | 描述 | 关键列 | 频率 |
| :--- | :--- | :--- | :--- | :--- |
| **Bronze** | `vehicle_registry` | 原始车辆目录快照 | `vin`, `model`, `fuel_type` | 每天 |
| | `vehicle_assignment` | 增量的驾驶员与车辆映射 | `vin`, `driver_id`, `daily_rate` | 每天 |
| | `fuel_transactions` | 用于效率计算的日志 | `vin`, `fuel_liters`, `odometer` | 每天 |
| | `telemetry_stream` | **Kafka JSON** 实时事件 | `vin`, `speed`, `lat`, `long` | 实时 |
| **Silver** | `asset_history` | 跟踪驾驶员换班的 **SCD Type 2** | `vin`, `driver_id`, `effective_to` | 批处理 |
| | `clensed_telemetry`| 经过滤的安全违规记录 | `vin`, `strike_type`, `ts` | 流处理 |
| **Gold** | `vehicle_gold` | 附带薪资调整的资产历史 | `vin`, `current_adjusted_rate` | 批处理 |
| | `fuel_audit` | 效率对比 12% 阈值审计 | `vin`, `flag`, `distance` | 每天 |
| | `driver_standing` | 绩效排名与警告重置 | `driver_id`, `strike_count` | 每月 |
## 核心业务逻辑场景
### 1. SCD Type 2 与连续性
通过关闭旧记录(状态:ARCHIVED)并开启新记录(状态:IN-TRANSIT)来处理“驾驶员换班”,以确保不会丢失任何历史费率数据。
### 2. 冲突解决:“最高费率”规则
如果在同一天同一辆车到达了多条记录,系统会使用窗口函数优先处理具有**最高日费率**的记录。
### 3. 效率审计(12% 规则)
标记 `Distance / Fuel` 低于该车型基准线 12% 的车辆。**排除逻辑**:星期日的加油或维护日将被忽略,以避免对怠速进行误罚。
### 4. 安全警告惩罚
如果流处理事件触发了警告:
`Current Adjusted Rate = Base Rate * 0.95`
扣款将累计计算,直至每月重置。
## 设置与执行
### 1. 批处理执行
1. **配置 Airflow 变量**:`pg_host`、`pg_port`、`pg_db`、`postgres_ec2_id`。
2. **上传脚本**:将 `script/` 文件夹同步到 S3。
3. **触发**:通过 Airflow 运行 `omniroute_batch_final`。
### 2. 流处理执行
1. **启动 Kafka 生产者**:确保遥测数据正流入配置的主题。
2. **提交流处理作业**:运行 Spark Streaming 作业以开始有状态的安全处理。
标签:Apache Airflow, Apache Flink, Apache Kafka, Apache Spark, AWS EMR, Bronze-Silver-Gold架构, CDC, Delta Lake, ETL管道, Gradle集成, Medallion架构, PMD, PostgreSQL, Spark Streaming, SQL数据库, 亚马逊云, 代码规范检查, 任务调度, 变更数据捕获, 大数据, 实时流处理, 异常检测, 批处理, 数据仓库, 数据工程, 数据治理, 数据湖, 数据管道, 智能物流, 测试用例, 漏洞探索, 物流数据分析, 目录扫描, 车队管理, 软件工程, 软件成分分析, 运输管理, 逆向工具, 驾驶员安全