chaudharipnarendra-debug/data-engineering-assignment
GitHub: chaudharipnarendra-debug/data-engineering-assignment
一个基于FastAPI和PostgreSQL的端到端数据工程管道,用于实时处理市场数据并实现ETL和VWAP计算。
Stars: 0 | Forks: 0
# 端到端数据工程作业
一个生产级数据管道,它从 FastAPI 服务器摄取合成市场数据,进行验证和转换,并将结果持久化到 PostgreSQL——所有组件均通过 Docker 容器化。
## 架构
```
┌─────────────────┐ poll every 5s ┌──────────────────────────────────┐
│ FastAPI Server │ ─────────────────────► │ ETL Pipeline │
│ /v1/market-data│ │ Extract → Validate → Transform │
│ (5% chaos) │ │ → Load │
└─────────────────┘ └──────────────┬───────────────────┘
│ INSERT
▼
┌─────────────────┐
│ PostgreSQL │
│ market_data │
│ vwap_results │
└─────────────────┘
```
## 项目结构
```
.
├── api/
│ ├── main.py # FastAPI server with chaos injection
│ ├── requirements.txt
│ └── Dockerfile
├── etl/
│ ├── pipeline.py # ETL logic: extract, validate, transform, load
│ ├── models.py # Pydantic schemas
│ ├── requirements.txt
│ └── Dockerfile
├── db/
│ └── init.sql # Table definitions and indexes
├── docker-compose.yml
├── .env # Credentials (NOT committed to Git)
├── .gitignore
└── README.md
```
## 快速启动
### 前置条件
- 已安装 Docker 和 Docker Compose
### 一键运行所有服务
```
git clone
cd data-engineering-assignment
docker-compose up --build
```
搞定。所有三个服务(数据库、API、ETL)将按正确顺序自动启动。
### 验证运行状态
```
# 检查 API
curl http://localhost:8000/v1/market-data
# 检查 ETL 日志
docker logs market_etl -f
# 检查 DB 数据
docker exec -it market_db psql -U etluser -d marketdb -c "SELECT * FROM market_data LIMIT 10;"
docker exec -it market_db psql -U etluser -d marketdb -c "SELECT * FROM vwap_results LIMIT 10;"
```
### 停止服务
```
docker-compose down # stop containers
docker-compose down -v # stop + delete DB volume
```
## 配置
所有设置位于 `.env` 文件中(切勿提交此文件):
| 变量名 | 默认值 | 描述 |
|---|---|---|
| `POSTGRES_DB` | `marketdb` | 数据库名称 |
| `POSTGRES_USER` | `etluser` | 数据库用户名 |
| `POSTGRES_PASSWORD` | `etlpassword` | 数据库密码 |
| `POLL_INTERVAL` | `5` | API 轮询间隔(秒) |
| `OUTLIER_THRESHOLD` | `0.15` | 价格偏差百分比,超过此值标记为异常值 |
| `MAX_RETRIES` | `3` | API 请求失败时的重试次数 |
## ETL 管道详情
### 提取
- 每 5 秒轮询 `GET /v1/market-data` 接口
- 遇到 500 错误或超时时,最多重试 3 次,采用指数退避策略
- 如果所有重试失败,则记录日志并跳过该批次数据
### 转换
- **VWAP** = 每批次每个金融工具的 `Σ(价格 × 成交量) / Σ(成交量)`
- **异常值检测**:标记任何 `|价格 − 平均价格| / 平均价格 > 15%` 的记录
### 质量控制
- 通过 Pydantic 进行**模式验证**——拒绝类型无效、字段缺失或数值为负的记录
- 通过 `ON CONFLICT (instrument_id, timestamp) DO NOTHING` 进行**重复预防**
- **结构化日志**报告:`fetched`(已获取)、`processed`(已处理)、`dropped`(已丢弃)、`inserted`(已插入)、`execution_time`(执行时间)
### 示例日志输出
```
{"time": "2024-01-15T10:30:05Z", "level": "INFO", "message": "Batch complete | fetched=7 | processed=6 | dropped(validation)=1 | inserted=6 | vwap_calculated=6 | execution_time=0.312s"}
```
## 系统设计问题
### 1. 扩展到每天 10 亿事件
按每秒约 11,500 个事件计算,当前的轮询架构将无法扩展。所需更改:
**数据摄取层**:将轮询 ETL 替换为事件流架构。API 可向 **Apache Kafka**(或 AWS Kinesis)发布消息。这实现了生产者与消费者的解耦,并提供了持久、可重放的事件存储。
**数据处理层**:将单线程 Python 管道替换为 **Apache Spark Structured Streaming** 或 **Flink**。这些框架将处理任务分布到集群,实现跨分区的并行 VWAP 和异常值计算。
**数据存储层**:将 PostgreSQL 替换为列式存储,如 **S3 上的 Apache Parquet**(通过 Athena 或 BigQuery 查询历史数据),以及用于实时查询的时序数据库,如 **ClickHouse** 或 **TimescaleDB**。
**任务编排**:批量作业使用 **Apache Airflow** 或 **Prefect**,流处理工作者使用带自动伸缩的 Kubernetes。
```
API → Kafka → Spark Streaming → ClickHouse (real-time)
→ S3/Parquet (historical)
```
### 2. 生产环境健康检查与监控
三层监控:
**管道级检查**
- 跟踪每批次的 `records_processed`(处理记录数)。如果降至零(API 故障或 ETL 崩溃)则告警。
- 跟踪 `dropped_rate = dropped / fetched`(丢弃率)。如果 > 20%(上游数据质量下降)则告警。
- 跟踪 `execution_time`(执行时间)。如果超过轮询间隔的 2 倍(管道处理滞后)则告警。
**基础设施级检查**
- Docker 健康检查(已在 `docker-compose.yml` 中定义)会重启不健康的容器。
- 数据库连接池监控——连接耗尽时告警。
**可观测性技术栈**
- 通过 **Prometheus** 暴露指标(在 Python 中使用 `prometheus_client`)。
- 在 **Grafana** 仪表板中可视化,并为异常模式设置告警。
- 将结构化 JSON 日志发送到 **ELK Stack**(Elasticsearch + Kibana)或 **Datadog**,以实现可搜索的审计跟踪。
### 3. 幂等性与恢复
**核心策略**:使每次写入操作具有幂等性,从而重新运行失败批次的结果与单次运行的结果一致。
这已通过以下方式实现:
```
ON CONFLICT (instrument_id, timestamp) DO NOTHING
```
如果管道在批次处理中途崩溃并重启,重新插入相同的记录只会跳过重复项——不会导致数据损坏。
**对于大型批次(10GB 以上)**:
1. **检查点**:在单独的 `pipeline_checkpoints` 表中跟踪最后成功处理的偏移量(Kafka 偏移量或文件字节位置)。重启时,从最后提交的检查点恢复——而不是从头开始。
2. **事务性批处理**:将每个微批次包装在数据库事务中。要么整个批次提交,要么全部回滚。部分写入永远不会到达最终表。
3. **死信队列**:经过 3 次尝试后仍然验证失败的记录,连同错误原因一起写入 `failed_records` 表,便于人工审核和重新处理,且不会阻塞主管道。
```
# Checkpoint 模式
with conn:
insert_batch(records)
update_checkpoint(last_offset) # atomic with the insert
conn.commit()
```
标签:API开发, AV绕过, Docker, ETL管道, FastAPI, NIDS, PostgreSQL, Pydantic, VWAP计算, 合成数据, 安全防御评估, 容器化, 异常检测, 数据工程, 数据持久化, 数据验证, 测试用例, 混沌工程, 生产级系统, 端到端系统, 请求拦截, 逆向工具, 金融数据分析