preetshah09/SmartMeter-AWS

GitHub: preetshah09/SmartMeter-AWS

基于 AWS EMR PySpark 与 Isolation Forest 的可扩展数据处理 pipeline,每小时处理超过 300 万条智能电表记录并完成异常检测。

Stars: 0 | Forks: 0

# SmartMeter – 集成 ML 的 AWS Cloud Pipeline 这是一个可扩展的 AWS 数据 pipeline,使用在 EMR 上运行的 PySpark 每小时处理超过 300 万条智能电表记录,并包含一个集成 ML 的异常检测系统,该系统使用 Isolation Forest 实现了 91% 的精确率。 ## 架构 ``` Smart Meters (IoT) │ ▼ AWS S3 (Raw) │ ▼ AWS EMR (PySpark) ──── partition pruning + clustering ────► Amazon Redshift │ ▼ Isolation Forest (Sklearn on Spark) │ ▼ Anomaly Flags → S3 → Redshift (anomalies table) │ Apache Airflow (orchestration) + Docker (containerized) ``` ## 技术栈 | 层级 | 技术 | |---|---| | 摄取 | AWS S3(原始落地区) | | 处理 | AWS EMR 上的 PySpark | | 数据仓库 | Amazon Redshift (RA3) | | ML | Scikit-learn Isolation Forest + PySpark UDF | | 编排 | Apache Airflow | | 容器化 | Docker + Docker Compose | | 语言 | Python 3.11, PySpark 3.5 | ## 项目结构 ``` smartmeter-aws/ ├── dags/ │ └── smartmeter_pipeline_dag.py # Airflow DAG ├── processing/ │ ├── emr_spark_job.py # PySpark transformation job │ └── redshift_loader.py # Redshift COPY loader ├── ml/ │ ├── anomaly_detection/ │ │ ├── train.py # Isolation Forest training │ │ └── predict.py # Batch inference on Spark │ └── model_artifacts/ # Serialized models (git-ignored) ├── infrastructure/ │ ├── docker/ │ │ └── Dockerfile │ └── terraform/ │ └── main.tf ├── config/ │ └── settings.py └── tests/ └── test_emr_job.py ``` ## 设置 ``` pip install pyspark==3.5.1 boto3==1.34.0 scikit-learn==1.5.0 \ apache-airflow==2.9.2 apache-airflow-providers-amazon==8.22.0 \ pandas==2.2.2 pyarrow==16.1.0 psycopg2-binary==2.9.9 ``` ### 环境 ``` export AWS_REGION=us-east-1 export S3_BUCKET=smartmeter-data-lake export EMR_CLUSTER_ID=j-XXXXXXXXXX export REDSHIFT_HOST=smartmeter.xxxxxx.us-east-1.redshift.amazonaws.com export REDSHIFT_DB=smartmeter export REDSHIFT_USER=admin export REDSHIFT_PASSWORD=your-password export REDSHIFT_IAM_ROLE=arn:aws:iam::123456789:role/RedshiftS3Access ``` ## 运行 ``` # 在 EMR 上运行 PySpark job aws emr add-steps --cluster-id $EMR_CLUSTER_ID \ --steps Type=Spark,Name=SmartMeter,ActionOnFailure=CONTINUE,\ Args=[--deploy-mode,cluster,--py-files,s3://smartmeter-data-lake/code/processing.zip,\ s3://smartmeter-data-lake/code/emr_spark_job.py,--date,2024-01-15] # 或通过 Airflow 运行 airflow dags trigger smartmeter_pipeline --conf '{"process_date": "2024-01-15"}' # Docker docker-compose up --build ``` ## ML 模型 Isolation Forest 模型基于 30 天的滚动历史数据进行训练,并每周刷新一次。 异常分数 < 阈值(调整至 -0.15)时会触发警报。 实现的指标: - Precision:91% - Recall:87% - F1 Score:0.89
标签:Amazon Web Services, Apache Airflow, Apex, PySpark, 大数据处理, 异常检测, 数据管道, 机器学习, 请求拦截, 软件工程, 逆向工具