Daray-dev/Real-Time-Crypto-Pipeline

GitHub: Daray-dev/Real-Time-Crypto-Pipeline

一套基于纯 Python 和 SQL Server 构建的轻量级实时加密货币数据管道,专注于市场行情摄取与异常价格检测。

Stars: 0 | Forks: 0

# 实时加密货币分析管道 一个生产级的流数据处理管道,通过公共 API 获取实时加密货币市场数据,进行实时处理,应用异常检测逻辑,并将其加载到 SQL Server 中,用于时间序列分析和报告。 完全使用 Python 和 T-SQL 构建。没有使用任何第三方编排框架——仅使用干净、可读的工程基础知识。 ## 架构 ```text ┌─────────────────────┐ │ CoinGecko API │ ← 免费的公开市场数据 (价格、交易量、市值) └────────┬────────────┘ │ ▼ ┌─────────────────────┐ │ Python Ingestion │ ← 带有重试逻辑 + 速率限制处理的轮询循环 │ Layer │ └────────┬────────────┘ │ ▼ ┌─────────────────────┐ │ Real-Time │ ← 解析 JSON 响应、验证字段、为数据添加时间戳 │ Processing │ └────────┬────────────┘ │ ▼ ┌─────────────────────┐ │ Anomaly Detection │ ← 滚动 z-score 标记异常的价格波动 │ Engine │ 将警报写入单独的 SQL 表 └────────┬────────────┘ │ ▼ ┌─────────────────────┐ │ SQL Server │ ← 为时间序列查询优化的 DDL schema │ (CryptoDB) │ 在 (symbol, captured_at) 上建立复合索引 └────────┬────────────┘ │ ▼ ┌─────────────────────┐ │ Analytics Layer │ ← 时间序列查询,市场信号报告 └─────────────────────┘ ``` ## 技术栈 | 层级 | 技术 | 语言 | | --- | --- | --- | | 语言 | Python 3.11+ | | | 数据库 | Microsoft SQL Server | | | 查询语言 | T-SQL | | | API 来源 | CoinGecko (免费版) | | | 数据库驱动 | pyodbc | | | HTTP 客户端 | requests | | | 异常检测 | 滚动 z-score (自定义 Python) | | ## 功能特性 - **实时 API 数据摄取** — 轮询多种加密资产的实时价格、交易量和市值 - **容错性** — 优雅地处理超时、HTTP 错误和速率限制,而不会导致管道崩溃 - **DDL schema 设计** — 具有复合索引的结构化 SQL Server 表,用于快速时间序列查询 - **异常检测** — 使用滚动统计窗口标记异常的价格波动 - **警报表** — 单独的 SQL 表捕获标记的事件,用于下游分析 - **端到端 Python** — API → 处理 → SQL 加载,全部在一条干净的管道中完成 ## 项目结构 ```text Real-Time-Crypto-Analytics-Pipeline/ │ ├── Code/ │ ├── connection_test.py # 在运行前验证 SQL Server 连接 │ ├── api_fetch.py # 带有错误处理的 CoinGecko API 数据摄取 │ ├── ddl_schema.sql # CREATE TABLE 语句和索引定义 │ ├── pipeline.py # 主管道编排循环 │ └── anomaly_detection.py # 滚动 z-score 逻辑和警报写入 │ └── README.md ``` ## 入门指南 ### 前置条件 - Python 3.11+ - Microsoft SQL Server (本地或远程实例) - SQL Server Management Studio (SSMS) — 可选但推荐 ### 1. 克隆仓库 ```bash git clone https://https://github.com/Daray-dev/Real-Time-Crypto-Pipeline.git cd Real-Time-Crypto-Analytics-Pipeline ``` ### 2. 安装依赖 ```bash pip install requests pyodbc ``` ### 3. 设置数据库 打开 SSMS 并创建一个新数据库: ```sql CREATE DATABASE CryptoDB; ``` 然后运行 DDL 脚本来创建你的表: ```sql -- 在 SSMS 中针对 CryptoDB 运行 Code/ddl_schema.sql ``` ### 4. 测试你的连接 使用你的服务器名称和数据库名称编辑 `connection_test.py`,然后运行: ```bash python Code/connection_test.py ``` 预期输出:`Connection successful.` ### 5. 运行管道 ```bash python Code/pipeline.py ``` 该管道将开始轮询 API、处理数据,并实时将记录加载到 SQL Server 中。 ## Schema 设计 ```sql -- 核心价格表 CREATE TABLE crypto_prices ( id INT IDENTITY PRIMARY KEY, symbol VARCHAR(20) NOT NULL, price_usd DECIMAL(18,8) NOT NULL, volume_24h DECIMAL(24,2), market_cap DECIMAL(24,2), captured_at DATETIME2 NOT NULL DEFAULT GETUTCDATE(), INDEX ix_symbol_time (symbol, captured_at DESC) ); -- 异常警报表 CREATE TABLE price_alerts ( id INT IDENTITY PRIMARY KEY, symbol VARCHAR(20) NOT NULL, price_usd DECIMAL(18,8) NOT NULL, z_score FLOAT NOT NULL, alert_type VARCHAR(50), triggered_at DATETIME2 NOT NULL DEFAULT GETUTCDATE() ); ``` 在 `(symbol, captured_at DESC)` 上的复合索引确保了对任何时间窗口内的资产进行快速过滤——这对于大规模时间序列性能至关重要。 ## 异常检测逻辑 使用滚动 z-score 方法检测价格异常: 1. 计算可配置窗口内价格的滚动平均值和标准差 2. 计算每个传入价格点的 z-score 3. 标记任何满足 `|z_score| > 阈值` (默认:2.5) 的点 4. 将标记的记录连同计算出的 z-score 一起写入 `price_alerts` 表 这种方法检测相对于近期价格行为的突然飙升和暴跌——比静态阈值警报更具适应性。 ## 关键工程决策 **为什么选择轮询而不是 websocket 流?** 通过 REST 进行轮询更易于实现、调试更方便,并且足以满足分钟级的粒度需求。对于亚秒级延迟要求,升级到 websocket 是自然而然的下一步。 **为什么选择 SQL Server 而不是云数据仓库?** 该项目优先展示核心数据工程基础知识——schema 设计、索引策略、ETL 逻辑——而不依赖云基础设施。该架构可以直接映射到云等效方案 (Azure SQL, Snowflake, BigQuery)。 **为什么在 (symbol, captured_at) 上建立复合索引?** 大多数分析查询都是按资产在时间范围内进行过滤的。如果没有此索引,这些查询将需要进行全表扫描。有了它,它们就是 O(log n) 的查找。 ## 构建此项目的收获 - 错误处理和重试逻辑是任何生产管道中最重要的部分 - Schema 决策会产生复合影响——在设计时建立良好索引的表可以在以后节省数周的重构时间 - 异常检测的窗口大小需要经验性调优——太小会产生噪音,太大会遗漏真实信号 - 发布一个可运行的项目比任何教程学到的都多 ## 后续计划 - 添加 `requirements.txt` 以实现可复现的安装 - 为数据库凭证提供环境变量支持 (`.env` + `python-dotenv`) - 通过 `config.yaml` 支持可配置的轮询间隔和异常阈值 - 提供 Docker 支持,实现便携的本地部署 - 连接到 SQL Server 分析层的 Power BI 仪表板 ## 作者 Ray Molden — 正在向数据工程转型的数据分析师 LinkedIn · GitHub · Newsletter ## 许可证 MIT License — 可自由使用,请注明出处。
标签:CoinGecko API, ETL管道, JSON解析, Python, SQL Server, T-SQL, Z-Score, 代码示例, 价格监控, 加密货币, 实时数据流, 市场情报, 异常检测, 数据入库, 数据分析, 数据工程, 无后门, 时间序列分析, 流式处理, 生产级架构, 逆向工具, 金融数据分析, 金融科技