preetshah09/SmartMeter-AWS
GitHub: preetshah09/SmartMeter-AWS
基于 AWS EMR PySpark 与 Isolation Forest 的可扩展数据处理 pipeline,每小时处理超过 300 万条智能电表记录并完成异常检测。
Stars: 0 | Forks: 0
# SmartMeter – 集成 ML 的 AWS Cloud Pipeline
这是一个可扩展的 AWS 数据 pipeline,使用在 EMR 上运行的 PySpark 每小时处理超过 300 万条智能电表记录,并包含一个集成 ML 的异常检测系统,该系统使用 Isolation Forest 实现了 91% 的精确率。
## 架构
```
Smart Meters (IoT)
│
▼
AWS S3 (Raw)
│
▼
AWS EMR (PySpark) ──── partition pruning + clustering ────► Amazon Redshift
│
▼
Isolation Forest (Sklearn on Spark)
│
▼
Anomaly Flags → S3 → Redshift (anomalies table)
│
Apache Airflow (orchestration) + Docker (containerized)
```
## 技术栈
| 层级 | 技术 |
|---|---|
| 摄取 | AWS S3(原始落地区) |
| 处理 | AWS EMR 上的 PySpark |
| 数据仓库 | Amazon Redshift (RA3) |
| ML | Scikit-learn Isolation Forest + PySpark UDF |
| 编排 | Apache Airflow |
| 容器化 | Docker + Docker Compose |
| 语言 | Python 3.11, PySpark 3.5 |
## 项目结构
```
smartmeter-aws/
├── dags/
│ └── smartmeter_pipeline_dag.py # Airflow DAG
├── processing/
│ ├── emr_spark_job.py # PySpark transformation job
│ └── redshift_loader.py # Redshift COPY loader
├── ml/
│ ├── anomaly_detection/
│ │ ├── train.py # Isolation Forest training
│ │ └── predict.py # Batch inference on Spark
│ └── model_artifacts/ # Serialized models (git-ignored)
├── infrastructure/
│ ├── docker/
│ │ └── Dockerfile
│ └── terraform/
│ └── main.tf
├── config/
│ └── settings.py
└── tests/
└── test_emr_job.py
```
## 设置
```
pip install pyspark==3.5.1 boto3==1.34.0 scikit-learn==1.5.0 \
apache-airflow==2.9.2 apache-airflow-providers-amazon==8.22.0 \
pandas==2.2.2 pyarrow==16.1.0 psycopg2-binary==2.9.9
```
### 环境
```
export AWS_REGION=us-east-1
export S3_BUCKET=smartmeter-data-lake
export EMR_CLUSTER_ID=j-XXXXXXXXXX
export REDSHIFT_HOST=smartmeter.xxxxxx.us-east-1.redshift.amazonaws.com
export REDSHIFT_DB=smartmeter
export REDSHIFT_USER=admin
export REDSHIFT_PASSWORD=your-password
export REDSHIFT_IAM_ROLE=arn:aws:iam::123456789:role/RedshiftS3Access
```
## 运行
```
# 在 EMR 上运行 PySpark job
aws emr add-steps --cluster-id $EMR_CLUSTER_ID \
--steps Type=Spark,Name=SmartMeter,ActionOnFailure=CONTINUE,\
Args=[--deploy-mode,cluster,--py-files,s3://smartmeter-data-lake/code/processing.zip,\
s3://smartmeter-data-lake/code/emr_spark_job.py,--date,2024-01-15]
# 或通过 Airflow 运行
airflow dags trigger smartmeter_pipeline --conf '{"process_date": "2024-01-15"}'
# Docker
docker-compose up --build
```
## ML 模型
Isolation Forest 模型基于 30 天的滚动历史数据进行训练,并每周刷新一次。
异常分数 < 阈值(调整至 -0.15)时会触发警报。
实现的指标:
- Precision:91%
- Recall:87%
- F1 Score:0.89
标签:Amazon Web Services, Apache Airflow, Apex, PySpark, 大数据处理, 异常检测, 数据管道, 机器学习, 请求拦截, 软件工程, 逆向工具