Aliipou/PipelineGuard
GitHub: Aliipou/PipelineGuard
数据管道健康监控工具,专注捕捉静默故障与性能漂移,防止「成功但零记录」的问题悄然腐蚀下游数据。
Stars: 0 | Forks: 0
# PipelineGuard
你的 pipeline 显示 **SUCCESS**,但处理了零条记录。直到三天后周报出错你才发现。
PipelineGuard 能在 30 秒内捕捉到这种情况。
[](https://github.com/Aliipou/PipelineGuard/actions/workflows/ci.yml)
[](https://python.org)
[](LICENSE)
## 问题所在
数据 pipeline 会撒谎。一个报告 `status: SUCCEEDED` 但 `records_processed: 0` 的 pipeline 属于 **静默故障** —— 也是代价最高的一类。没有报错,没有警报,没有寻呼机响。只有下游数据的错误,直到有人注意到为止。
延迟漂移更加隐蔽。你的夜间 ETL 任务过去只需 40 分钟,现在却要 70 分钟。每一次单独运行看起来都很正常。对趋势视而不见的监控系统会完全忽略它。
## PipelineGuard 的功能
**静默故障检测** —— 任何报告成功但处理了零条记录的执行,或者成功但伴随错误信息的执行,都会立即被标记为 `SILENT_FAILURE` 并生成 CRITICAL 级别警报。
**延迟漂移检测** —— 每次执行时长都会与滚动百分位基线(p50 + p95)进行比较,并通过 z-score 进行评分。当某个 pipeline 持续运行时间超过其 p50 基线的 25% 时,会在问题演变成危机之前触发 WARNING 警报。
**连续故障追踪** —— 每个 pipeline 可配置故障阈值。三次连续故障(或静默故障)将生成 CRITICAL 级别警报,无论单个故障的严重程度如何。
**Slack 通知** —— 警报会发送到你的频道,包含严重性颜色编码、完整上下文以及可操作的 pipeline 名称。
**每周健康摘要** —— 每周一一份简明的英文摘要:运行了多少任务,有多少静默失败,哪些 pipeline 正在漂移,以及前 5 大风险。工程师和 CTO 都能看懂。
## 快速开始
```
git clone https://github.com/Aliipou/PipelineGuard
cd PipelineGuard
cp deploy/docker/.env.example deploy/docker/.env
docker compose -f deploy/docker/docker-compose.yml up -d
```
API 位于 `http://localhost:8000`。交互式文档位于 `http://localhost:8000/docs`。
### 注册 Pipeline
```
curl -s -X POST http://localhost:8000/api/v1/tenants/{tenant_id}/pipelines \
-H "Content-Type: application/json" \
-d '{
"name": "nightly-user-sync",
"source": "postgres://crm",
"destination": "bigquery://warehouse",
"schedule_cron": "0 2 * * *",
"expected_duration_seconds": 1800,
"failure_threshold": 3
}'
```
### 报告执行情况 (Python SDK)
```
from pipelineguard import guard
@guard(
pipeline_id="",
tenant_id="",
api_url="http://localhost:8000",
api_key="pg_live_...",
)
def nightly_user_sync() -> int:
users = fetch_users_from_crm()
upsert_to_warehouse(users)
return len(users) # auto-reported as records_processed
# 如果返回 0 -> CRITICAL 告警立即触发
nightly_user_sync()
```
### 从任何语言报告
```
curl -s -X POST http://localhost:8000/api/v1/tenants/{tenant_id}/pipelines/{pipeline_id}/executions \
-H "Content-Type: application/json" \
-d '{
"status": "SUCCEEDED",
"started_at": "2024-01-15T02:00:00Z",
"finished_at": "2024-01-15T02:31:00Z",
"duration_seconds": 1860,
"records_processed": 0
}'
# -> 检测到静默失败,CRITICAL 告警发送至 Slack,无需配置
```
## 架构
```
Your pipelines
(Airflow / cron / Celery / K8s jobs)
|
POST /executions
|
┌────────▼─────────┐
│ PipelineGuard │
│ API │
└────────┬─────────┘
│
┌─────────────────┼──────────────────┐
│ │ │
┌────────▼───────┐ ┌───────▼────────┐ ┌──────▼──────────┐
│ Silent Failure│ │ Drift Analyzer│ │ Consecutive │
│ Detector │ │ (z-score + │ │ Failure Check │
│ │ │ percentile) │ │ │
└────────┬───────┘ └───────┬────────┘ └──────┬──────────┘
│ │ │
└─────────────────┼──────────────────┘
│
┌─────────▼──────────┐
│ Alert Engine │
│ (Postgres) │
└─────────┬──────────┘
│
┌─────────▼──────────┐
│ Slack Notifier │
│ (Block Kit) │
└────────────────────┘
```
## 检测逻辑
### 静默故障
当出现以下情况时,任务执行被归类为 `SILENT_FAILURE`:
- `status == SUCCEEDED` 且 `records_processed == 0`
- `status == SUCCEEDED` 且 `error_message != ""`
这能捕捉到最常见现实场景中的故障模式:上游数据源返回空值,pipeline 正常退出,下游报告在不知不觉中变得陈旧。
### 延迟漂移 (DriftAnalyzer)
对于每次执行,PipelineGuard 计算:
```
rolling_window = last 100 durations for this pipeline
p50 = median(rolling_window)
z_score = (current - mean) / stdev
is_drifting = current > p50 * 1.25 # 25% above baseline
is_anomaly = |z_score| > 2.5 # 2.5 standard deviations
```
百分位漂移(趋势检测)和 z-score(尖峰检测)的结合,能以极低的误报率捕捉到逐渐变慢和突发异常两种情况。
## Slack 警报示例
当检测到静默故障时,会立即触发:
```
🔴 Silent Failure: nightly-user-sync
Pipeline 'nightly-user-sync' (postgres://crm -> bigquery://warehouse)
reported success but processed 0 records.
Type: Silent Failure Severity: CRITICAL
Pipeline: nightly-user-sync Alert ID: `a1b2c3d4...`
────────────────────────────────────────
PipelineGuard | Tenant `acme-corp...`
```
## API 参考
| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/api/v1/tenants/{tid}/pipelines` | 注册 pipeline |
| GET | `/api/v1/tenants/{tid}/pipelines` | 列出 pipeline |
| POST | `/api/v1/tenants/{tid}/pipelines/{pid}/executions` | 报告执行情况 |
| GET | `/api/v1/tenants/{tid}/pipelines/{pid}/executions` | 执行历史 |
| GET | `/api/v1/tenants/{tid}/pipelines/{pid}/latency` | 延迟 + 漂移数据 |
| GET | `/api/v1/tenants/{tid}/alerts` | 活动警报 |
| POST | `/api/v1/tenants/{tid}/alerts/{aid}/acknowledge` | 确认警报 |
| GET | `/api/v1/tenants/{tid}/summary` | 最新每周摘要 |
| POST | `/api/v1/tenants/{tid}/summary/generate` | 立即生成摘要 |
完整的 OpenAPI 规范位于 `/docs` (Swagger UI) 或 `/redoc`。
## 配置
```
# deploy/docker/.env
APP_POSTGRES_HOST=postgres
APP_POSTGRES_PASSWORD=your_password
APP_REDIS_URL=redis://redis:6379/0
APP_SLACK_WEBHOOK_URL=https://hooks.slack.com/services/T.../B.../...
APP_JWT_PRIVATE_KEY=... # RS256 — generate with scripts/generate_keys.py
```
## 运行测试
```
pip install -e ".[dev]"
make test # unit + integration
make test-unit # unit only (no database required)
make lint # ruff + mypy
```
## 生产环境部署
参阅 [deploy/k8s/](deploy/k8s/) 获取 Kubernetes 清单,包含:
- API deployment + HPA
- Celery worker deployment
- PostgreSQL StatefulSet
- Redis deployment
- Prometheus + Grafana + Loki 可观测性栈
- Network policies(默认拒绝所有,按服务设置允许列表)
参阅 [deploy/terraform/](deploy/terraform/) 获取 AWS 基础设施模块 (ECS, RDS, ElastiCache, VPC)。
## 许可证
MIT
标签:CI/CD安全, DevSecOps, ETL监控, Llama, Python, Slack通知, 上游代理, 代码质量门禁, 告警系统, 子域名突变, 延迟漂移, 异常检测, 搜索引擎查询, 数据管道, 数据质量, 文档安全, 无后门, 流水线可观测性, 测试用例, 自定义请求头, 请求拦截, 软件供应链安全, 软件工程, 运维监控, 远程方法调用, 逆向工具, 部署护栏, 错误追踪, 静默失败检测