Daray-dev/Real-Time-Crypto-Pipeline
GitHub: Daray-dev/Real-Time-Crypto-Pipeline
一套基于纯 Python 和 SQL Server 构建的轻量级实时加密货币数据管道,专注于市场行情摄取与异常价格检测。
Stars: 0 | Forks: 0
# 实时加密货币分析管道
一个生产级的流数据处理管道,通过公共 API 获取实时加密货币市场数据,进行实时处理,应用异常检测逻辑,并将其加载到 SQL Server 中,用于时间序列分析和报告。
完全使用 Python 和 T-SQL 构建。没有使用任何第三方编排框架——仅使用干净、可读的工程基础知识。
## 架构
```text
┌─────────────────────┐
│ CoinGecko API │ ← 免费的公开市场数据 (价格、交易量、市值)
└────────┬────────────┘
│
▼
┌─────────────────────┐
│ Python Ingestion │ ← 带有重试逻辑 + 速率限制处理的轮询循环
│ Layer │
└────────┬────────────┘
│
▼
┌─────────────────────┐
│ Real-Time │ ← 解析 JSON 响应、验证字段、为数据添加时间戳
│ Processing │
└────────┬────────────┘
│
▼
┌─────────────────────┐
│ Anomaly Detection │ ← 滚动 z-score 标记异常的价格波动
│ Engine │ 将警报写入单独的 SQL 表
└────────┬────────────┘
│
▼
┌─────────────────────┐
│ SQL Server │ ← 为时间序列查询优化的 DDL schema
│ (CryptoDB) │ 在 (symbol, captured_at) 上建立复合索引
└────────┬────────────┘
│
▼
┌─────────────────────┐
│ Analytics Layer │ ← 时间序列查询,市场信号报告
└─────────────────────┘
```
## 技术栈
| 层级 | 技术 | 语言 |
| --- | --- | --- |
| 语言 | Python 3.11+ | |
| 数据库 | Microsoft SQL Server | |
| 查询语言 | T-SQL | |
| API 来源 | CoinGecko (免费版) | |
| 数据库驱动 | pyodbc | |
| HTTP 客户端 | requests | |
| 异常检测 | 滚动 z-score (自定义 Python) | |
## 功能特性
- **实时 API 数据摄取** — 轮询多种加密资产的实时价格、交易量和市值
- **容错性** — 优雅地处理超时、HTTP 错误和速率限制,而不会导致管道崩溃
- **DDL schema 设计** — 具有复合索引的结构化 SQL Server 表,用于快速时间序列查询
- **异常检测** — 使用滚动统计窗口标记异常的价格波动
- **警报表** — 单独的 SQL 表捕获标记的事件,用于下游分析
- **端到端 Python** — API → 处理 → SQL 加载,全部在一条干净的管道中完成
## 项目结构
```text
Real-Time-Crypto-Analytics-Pipeline/
│
├── Code/
│ ├── connection_test.py # 在运行前验证 SQL Server 连接
│ ├── api_fetch.py # 带有错误处理的 CoinGecko API 数据摄取
│ ├── ddl_schema.sql # CREATE TABLE 语句和索引定义
│ ├── pipeline.py # 主管道编排循环
│ └── anomaly_detection.py # 滚动 z-score 逻辑和警报写入
│
└── README.md
```
## 入门指南
### 前置条件
- Python 3.11+
- Microsoft SQL Server (本地或远程实例)
- SQL Server Management Studio (SSMS) — 可选但推荐
### 1. 克隆仓库
```bash
git clone https://https://github.com/Daray-dev/Real-Time-Crypto-Pipeline.git
cd Real-Time-Crypto-Analytics-Pipeline
```
### 2. 安装依赖
```bash
pip install requests pyodbc
```
### 3. 设置数据库
打开 SSMS 并创建一个新数据库:
```sql
CREATE DATABASE CryptoDB;
```
然后运行 DDL 脚本来创建你的表:
```sql
-- 在 SSMS 中针对 CryptoDB 运行 Code/ddl_schema.sql
```
### 4. 测试你的连接
使用你的服务器名称和数据库名称编辑 `connection_test.py`,然后运行:
```bash
python Code/connection_test.py
```
预期输出:`Connection successful.`
### 5. 运行管道
```bash
python Code/pipeline.py
```
该管道将开始轮询 API、处理数据,并实时将记录加载到 SQL Server 中。
## Schema 设计
```sql
-- 核心价格表
CREATE TABLE crypto_prices (
id INT IDENTITY PRIMARY KEY,
symbol VARCHAR(20) NOT NULL,
price_usd DECIMAL(18,8) NOT NULL,
volume_24h DECIMAL(24,2),
market_cap DECIMAL(24,2),
captured_at DATETIME2 NOT NULL DEFAULT GETUTCDATE(),
INDEX ix_symbol_time (symbol, captured_at DESC)
);
-- 异常警报表
CREATE TABLE price_alerts (
id INT IDENTITY PRIMARY KEY,
symbol VARCHAR(20) NOT NULL,
price_usd DECIMAL(18,8) NOT NULL,
z_score FLOAT NOT NULL,
alert_type VARCHAR(50),
triggered_at DATETIME2 NOT NULL DEFAULT GETUTCDATE()
);
```
在 `(symbol, captured_at DESC)` 上的复合索引确保了对任何时间窗口内的资产进行快速过滤——这对于大规模时间序列性能至关重要。
## 异常检测逻辑
使用滚动 z-score 方法检测价格异常:
1. 计算可配置窗口内价格的滚动平均值和标准差
2. 计算每个传入价格点的 z-score
3. 标记任何满足 `|z_score| > 阈值` (默认:2.5) 的点
4. 将标记的记录连同计算出的 z-score 一起写入 `price_alerts` 表
这种方法检测相对于近期价格行为的突然飙升和暴跌——比静态阈值警报更具适应性。
## 关键工程决策
**为什么选择轮询而不是 websocket 流?**
通过 REST 进行轮询更易于实现、调试更方便,并且足以满足分钟级的粒度需求。对于亚秒级延迟要求,升级到 websocket 是自然而然的下一步。
**为什么选择 SQL Server 而不是云数据仓库?**
该项目优先展示核心数据工程基础知识——schema 设计、索引策略、ETL 逻辑——而不依赖云基础设施。该架构可以直接映射到云等效方案 (Azure SQL, Snowflake, BigQuery)。
**为什么在 (symbol, captured_at) 上建立复合索引?**
大多数分析查询都是按资产在时间范围内进行过滤的。如果没有此索引,这些查询将需要进行全表扫描。有了它,它们就是 O(log n) 的查找。
## 构建此项目的收获
- 错误处理和重试逻辑是任何生产管道中最重要的部分
- Schema 决策会产生复合影响——在设计时建立良好索引的表可以在以后节省数周的重构时间
- 异常检测的窗口大小需要经验性调优——太小会产生噪音,太大会遗漏真实信号
- 发布一个可运行的项目比任何教程学到的都多
## 后续计划
- 添加 `requirements.txt` 以实现可复现的安装
- 为数据库凭证提供环境变量支持 (`.env` + `python-dotenv`)
- 通过 `config.yaml` 支持可配置的轮询间隔和异常阈值
- 提供 Docker 支持,实现便携的本地部署
- 连接到 SQL Server 分析层的 Power BI 仪表板
## 作者
Ray Molden — 正在向数据工程转型的数据分析师
LinkedIn · GitHub · Newsletter
## 许可证
MIT License — 可自由使用,请注明出处。
标签:CoinGecko API, ETL管道, JSON解析, Python, SQL Server, T-SQL, Z-Score, 代码示例, 价格监控, 加密货币, 实时数据流, 市场情报, 异常检测, 数据入库, 数据分析, 数据工程, 无后门, 时间序列分析, 流式处理, 生产级架构, 逆向工具, 金融数据分析, 金融科技