chaudharipnarendra-debug/data-engineering-assignment

GitHub: chaudharipnarendra-debug/data-engineering-assignment

一个基于FastAPI和PostgreSQL的端到端数据工程管道,用于实时处理市场数据并实现ETL和VWAP计算。

Stars: 0 | Forks: 0

# 端到端数据工程作业 一个生产级数据管道,它从 FastAPI 服务器摄取合成市场数据,进行验证和转换,并将结果持久化到 PostgreSQL——所有组件均通过 Docker 容器化。 ## 架构 ``` ┌─────────────────┐ poll every 5s ┌──────────────────────────────────┐ │ FastAPI Server │ ─────────────────────► │ ETL Pipeline │ │ /v1/market-data│ │ Extract → Validate → Transform │ │ (5% chaos) │ │ → Load │ └─────────────────┘ └──────────────┬───────────────────┘ │ INSERT ▼ ┌─────────────────┐ │ PostgreSQL │ │ market_data │ │ vwap_results │ └─────────────────┘ ``` ## 项目结构 ``` . ├── api/ │ ├── main.py # FastAPI server with chaos injection │ ├── requirements.txt │ └── Dockerfile ├── etl/ │ ├── pipeline.py # ETL logic: extract, validate, transform, load │ ├── models.py # Pydantic schemas │ ├── requirements.txt │ └── Dockerfile ├── db/ │ └── init.sql # Table definitions and indexes ├── docker-compose.yml ├── .env # Credentials (NOT committed to Git) ├── .gitignore └── README.md ``` ## 快速启动 ### 前置条件 - 已安装 Docker 和 Docker Compose ### 一键运行所有服务 ``` git clone cd data-engineering-assignment docker-compose up --build ``` 搞定。所有三个服务(数据库、API、ETL)将按正确顺序自动启动。 ### 验证运行状态 ``` # 检查 API curl http://localhost:8000/v1/market-data # 检查 ETL 日志 docker logs market_etl -f # 检查 DB 数据 docker exec -it market_db psql -U etluser -d marketdb -c "SELECT * FROM market_data LIMIT 10;" docker exec -it market_db psql -U etluser -d marketdb -c "SELECT * FROM vwap_results LIMIT 10;" ``` ### 停止服务 ``` docker-compose down # stop containers docker-compose down -v # stop + delete DB volume ``` ## 配置 所有设置位于 `.env` 文件中(切勿提交此文件): | 变量名 | 默认值 | 描述 | |---|---|---| | `POSTGRES_DB` | `marketdb` | 数据库名称 | | `POSTGRES_USER` | `etluser` | 数据库用户名 | | `POSTGRES_PASSWORD` | `etlpassword` | 数据库密码 | | `POLL_INTERVAL` | `5` | API 轮询间隔(秒) | | `OUTLIER_THRESHOLD` | `0.15` | 价格偏差百分比,超过此值标记为异常值 | | `MAX_RETRIES` | `3` | API 请求失败时的重试次数 | ## ETL 管道详情 ### 提取 - 每 5 秒轮询 `GET /v1/market-data` 接口 - 遇到 500 错误或超时时,最多重试 3 次,采用指数退避策略 - 如果所有重试失败,则记录日志并跳过该批次数据 ### 转换 - **VWAP** = 每批次每个金融工具的 `Σ(价格 × 成交量) / Σ(成交量)` - **异常值检测**:标记任何 `|价格 − 平均价格| / 平均价格 > 15%` 的记录 ### 质量控制 - 通过 Pydantic 进行**模式验证**——拒绝类型无效、字段缺失或数值为负的记录 - 通过 `ON CONFLICT (instrument_id, timestamp) DO NOTHING` 进行**重复预防** - **结构化日志**报告:`fetched`(已获取)、`processed`(已处理)、`dropped`(已丢弃)、`inserted`(已插入)、`execution_time`(执行时间) ### 示例日志输出 ``` {"time": "2024-01-15T10:30:05Z", "level": "INFO", "message": "Batch complete | fetched=7 | processed=6 | dropped(validation)=1 | inserted=6 | vwap_calculated=6 | execution_time=0.312s"} ``` ## 系统设计问题 ### 1. 扩展到每天 10 亿事件 按每秒约 11,500 个事件计算,当前的轮询架构将无法扩展。所需更改: **数据摄取层**:将轮询 ETL 替换为事件流架构。API 可向 **Apache Kafka**(或 AWS Kinesis)发布消息。这实现了生产者与消费者的解耦,并提供了持久、可重放的事件存储。 **数据处理层**:将单线程 Python 管道替换为 **Apache Spark Structured Streaming** 或 **Flink**。这些框架将处理任务分布到集群,实现跨分区的并行 VWAP 和异常值计算。 **数据存储层**:将 PostgreSQL 替换为列式存储,如 **S3 上的 Apache Parquet**(通过 Athena 或 BigQuery 查询历史数据),以及用于实时查询的时序数据库,如 **ClickHouse** 或 **TimescaleDB**。 **任务编排**:批量作业使用 **Apache Airflow** 或 **Prefect**,流处理工作者使用带自动伸缩的 Kubernetes。 ``` API → Kafka → Spark Streaming → ClickHouse (real-time) → S3/Parquet (historical) ``` ### 2. 生产环境健康检查与监控 三层监控: **管道级检查** - 跟踪每批次的 `records_processed`(处理记录数)。如果降至零(API 故障或 ETL 崩溃)则告警。 - 跟踪 `dropped_rate = dropped / fetched`(丢弃率)。如果 > 20%(上游数据质量下降)则告警。 - 跟踪 `execution_time`(执行时间)。如果超过轮询间隔的 2 倍(管道处理滞后)则告警。 **基础设施级检查** - Docker 健康检查(已在 `docker-compose.yml` 中定义)会重启不健康的容器。 - 数据库连接池监控——连接耗尽时告警。 **可观测性技术栈** - 通过 **Prometheus** 暴露指标(在 Python 中使用 `prometheus_client`)。 - 在 **Grafana** 仪表板中可视化,并为异常模式设置告警。 - 将结构化 JSON 日志发送到 **ELK Stack**(Elasticsearch + Kibana)或 **Datadog**,以实现可搜索的审计跟踪。 ### 3. 幂等性与恢复 **核心策略**:使每次写入操作具有幂等性,从而重新运行失败批次的结果与单次运行的结果一致。 这已通过以下方式实现: ``` ON CONFLICT (instrument_id, timestamp) DO NOTHING ``` 如果管道在批次处理中途崩溃并重启,重新插入相同的记录只会跳过重复项——不会导致数据损坏。 **对于大型批次(10GB 以上)**: 1. **检查点**:在单独的 `pipeline_checkpoints` 表中跟踪最后成功处理的偏移量(Kafka 偏移量或文件字节位置)。重启时,从最后提交的检查点恢复——而不是从头开始。 2. **事务性批处理**:将每个微批次包装在数据库事务中。要么整个批次提交,要么全部回滚。部分写入永远不会到达最终表。 3. **死信队列**:经过 3 次尝试后仍然验证失败的记录,连同错误原因一起写入 `failed_records` 表,便于人工审核和重新处理,且不会阻塞主管道。 ``` # Checkpoint 模式 with conn: insert_batch(records) update_checkpoint(last_offset) # atomic with the insert conn.commit() ```
标签:API开发, AV绕过, Docker, ETL管道, FastAPI, NIDS, PostgreSQL, Pydantic, VWAP计算, 合成数据, 安全防御评估, 容器化, 异常检测, 数据工程, 数据持久化, 数据验证, 测试用例, 混沌工程, 生产级系统, 端到端系统, 请求拦截, 逆向工具, 金融数据分析