mfrancime/datahaus
GitHub: mfrancime/datahaus
基于 Airflow + DuckDB + Soda 的多交易所加密货币市场数据运维参考平台,实现自动化摄取、质量校验、跨源对账与混沌测试。
Stars: 0 | Forks: 0
# datahaus
一个使用加密货币数据源的 7×24 小时市场数据运营的参考实现——选择加密货币是因为它们提供了公开可用、免许可证、多源的数据,并具有逼真的运营特征(高速率、严格的时效性要求、跨源对账需求)。
datahaus 展示了生产环境下的数据运营是什么样的:自动化摄取、数据质量监控、事件检测、混沌测试以及基于 runbook 的响应——这些是数据运营团队的日常工作。
## 架构
有关五层设计,请参阅 [docs/architecture.md](docs/architecture.md)。
**第二阶段流水线:**
```
Binance + Bybit + OKX APIs → Exchange Adapters → Airflow DAGs → DuckDB → Soda checks
└──→ Reconciliation → Alert Router
```
## 快速开始
**前置条件:** Ubuntu/WSL2、Python 3.10+、Java 17+、`make`
```
git clone ~/datahaus && cd ~/datahaus
# 一次性设置
make setup # install Python dependencies
make airflow-init # initialize Airflow DB + admin user
make db-init # bootstrap DuckDB schema
# 如果从 Phase 1 升级:
make db-migrate # migrate existing Binance data to new schema
# Terminal 1:启动 Airflow
make airflow # scheduler + webserver on :8080
# Terminal 2:操作 pipelines
make trigger-all # trigger all 3 exchange DAGs
# wait ~1-2 min for completion
make db-query-all # verify data from all exchanges
make run-checks-all # run Soda checks (all should pass)
# 对账
make run-reconciliation # check for cross-exchange spread anomalies
make view-alerts # view alert history
# 混沌测试 — 新鲜度
make chaos-freshness-bybit # pause the Bybit DAG
# 等待 freshness threshold 过期,然后:
make run-checks-all # Bybit freshness check FAILS
make chaos-clear ARGS="--exchange bybit" # un-pause
make trigger-bybit # ingest fresh data
make run-checks-all # all checks pass again
# 混沌测试 — 延迟分歧
make chaos-spread # inject fake price anomaly
make run-reconciliation # spread alert fires
make view-alerts # see the alert
make chaos-spread-clean # clean up injected data
```
## Make 目标
| 目标 | 描述 |
|--------|-------------|
| **设置** | |
| `make setup` | 安装系统及 Python 依赖 |
| `make airflow-init` | 初始化 Airflow 数据库和管理员用户 |
| `make db-init` | 引导 DuckDB schema |
| `make db-migrate` | 将第一阶数据迁移至第二阶段 schema |
| **运行** | |
| `make airflow` | 启动 Airflow 独立模式 (scheduler + webserver) |
| `make trigger-binance` | 触发 Binance klines DAG |
| `make trigger-bybit` | 触发 Bybit klines DAG |
| `make trigger-okx` | 触发 OKX klines DAG |
| `make trigger-all` | 触发所有交易所 DAG |
| `make run-reconciliation` | 触发跨交易所对账 |
| `make run-checks` | 运行 Soda 检查 (Binance + 全局) |
| `make run-checks-all` | 运行所有交易所的 Soda 检查 |
| **混沌** | |
| `make chaos-freshness` | 暂停 Binance DAG(默认) |
| `make chaos-freshness-bybit` | 暂停 Bybit DAG |
| `make chaos-freshness-okx` | 暂停 OKX DAG |
| `make chaos-freshness-all` | 暂停所有 DAG |
| `make chaos-spread` | 注入虚假价格异常 |
| `make chaos-spread-clean` | 清理价差混沌 |
| `make chaos-clear` | 取消暂停 DAG |
| **开发** | |
| `make test` | 运行单元测试 |
| `make db-query` | 显示最新的 Binance 数据 |
| `make db-query-all` | 显示各交易所数据摘要 |
| `make view-alerts` | 显示最近的告警 |
| `make help` | 列出所有目标 |
## 技术栈
| 组件 | 工具 |
|-----------|------|
| 编排 | Apache Airflow (standalone) |
| 计算 | 交易所适配器 (纯 Python) |
| 存储 | DuckDB (带有 exchange 列的统一表) |
| 数据质量 | Soda Core (单交易所 + 全局检查) |
| 对账 | 跨交易所价差检测 |
| 告警 | 基于 JSONL 的文件 + 可选的 Slack webhook |
| 混沌脚本 | Python + Airflow REST API + DuckDB 注入 |
## 阶段路线图
- **第一阶段** *(已完成)*:单条端到端流水线 — Binance BTC/USDT klines → DuckDB → Soda 检查 → 手动混沌
- **第二阶段** *(已完成)*:多交易所 (Bybit, OKX)、跨交易所对账、告警路由 + Slack
- **第三阶段**:Chaos Engine 框架、Shift Console UI、脚本化故障场景
- **第四阶段**:Polymarket 作为第二数据域、runbook 库、演示视频
## 为什么选择加密货币数据?
加密货币市场 7x24x365 全天候交易,其可公开访问的 API 不需要许可证或身份验证。这提供了:
- **持续的数据流**,用于真实的运营监控
- **
标签:Apache Airflow, Bybit, DataOps, DuckDB, ETL, IP 地址批量处理, JavaCC, K线数据, OHLCV, OKX, Python, Runbook, Soda, 代理支持, 加密货币, 告警, 命令控制, 多交易所, 对账, 币安, 异常检测, 故障应急, 数据工程, 数据接入, 数据新鲜度监控, 数据校验, 数据质量, 数据运维平台, 数据运营, 数据采集, 无后门, 时序数据库, 混沌工程, 自动化流水线, 行情数据, 逆向工具, 量化金融