Aliipou/PipelineGuard

GitHub: Aliipou/PipelineGuard

数据管道健康监控工具,专注捕捉静默故障与性能漂移,防止「成功但零记录」的问题悄然腐蚀下游数据。

Stars: 0 | Forks: 0

# PipelineGuard 你的 pipeline 显示 **SUCCESS**,但处理了零条记录。直到三天后周报出错你才发现。 PipelineGuard 能在 30 秒内捕捉到这种情况。 [![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/03/40dbb85f54103222.svg)](https://github.com/Aliipou/PipelineGuard/actions/workflows/ci.yml) [![Python](https://img.shields.io/badge/Python-3.11+-3776AB?style=flat&logo=python)](https://python.org) [![License](https://img.shields.io/badge/License-MIT-green?style=flat)](LICENSE) ## 问题所在 数据 pipeline 会撒谎。一个报告 `status: SUCCEEDED` 但 `records_processed: 0` 的 pipeline 属于 **静默故障** —— 也是代价最高的一类。没有报错,没有警报,没有寻呼机响。只有下游数据的错误,直到有人注意到为止。 延迟漂移更加隐蔽。你的夜间 ETL 任务过去只需 40 分钟,现在却要 70 分钟。每一次单独运行看起来都很正常。对趋势视而不见的监控系统会完全忽略它。 ## PipelineGuard 的功能 **静默故障检测** —— 任何报告成功但处理了零条记录的执行,或者成功但伴随错误信息的执行,都会立即被标记为 `SILENT_FAILURE` 并生成 CRITICAL 级别警报。 **延迟漂移检测** —— 每次执行时长都会与滚动百分位基线(p50 + p95)进行比较,并通过 z-score 进行评分。当某个 pipeline 持续运行时间超过其 p50 基线的 25% 时,会在问题演变成危机之前触发 WARNING 警报。 **连续故障追踪** —— 每个 pipeline 可配置故障阈值。三次连续故障(或静默故障)将生成 CRITICAL 级别警报,无论单个故障的严重程度如何。 **Slack 通知** —— 警报会发送到你的频道,包含严重性颜色编码、完整上下文以及可操作的 pipeline 名称。 **每周健康摘要** —— 每周一一份简明的英文摘要:运行了多少任务,有多少静默失败,哪些 pipeline 正在漂移,以及前 5 大风险。工程师和 CTO 都能看懂。 ## 快速开始 ``` git clone https://github.com/Aliipou/PipelineGuard cd PipelineGuard cp deploy/docker/.env.example deploy/docker/.env docker compose -f deploy/docker/docker-compose.yml up -d ``` API 位于 `http://localhost:8000`。交互式文档位于 `http://localhost:8000/docs`。 ### 注册 Pipeline ``` curl -s -X POST http://localhost:8000/api/v1/tenants/{tenant_id}/pipelines \ -H "Content-Type: application/json" \ -d '{ "name": "nightly-user-sync", "source": "postgres://crm", "destination": "bigquery://warehouse", "schedule_cron": "0 2 * * *", "expected_duration_seconds": 1800, "failure_threshold": 3 }' ``` ### 报告执行情况 (Python SDK) ``` from pipelineguard import guard @guard( pipeline_id="", tenant_id="", api_url="http://localhost:8000", api_key="pg_live_...", ) def nightly_user_sync() -> int: users = fetch_users_from_crm() upsert_to_warehouse(users) return len(users) # auto-reported as records_processed # 如果返回 0 -> CRITICAL 告警立即触发 nightly_user_sync() ``` ### 从任何语言报告 ``` curl -s -X POST http://localhost:8000/api/v1/tenants/{tenant_id}/pipelines/{pipeline_id}/executions \ -H "Content-Type: application/json" \ -d '{ "status": "SUCCEEDED", "started_at": "2024-01-15T02:00:00Z", "finished_at": "2024-01-15T02:31:00Z", "duration_seconds": 1860, "records_processed": 0 }' # -> 检测到静默失败,CRITICAL 告警发送至 Slack,无需配置 ``` ## 架构 ``` Your pipelines (Airflow / cron / Celery / K8s jobs) | POST /executions | ┌────────▼─────────┐ │ PipelineGuard │ │ API │ └────────┬─────────┘ │ ┌─────────────────┼──────────────────┐ │ │ │ ┌────────▼───────┐ ┌───────▼────────┐ ┌──────▼──────────┐ │ Silent Failure│ │ Drift Analyzer│ │ Consecutive │ │ Detector │ │ (z-score + │ │ Failure Check │ │ │ │ percentile) │ │ │ └────────┬───────┘ └───────┬────────┘ └──────┬──────────┘ │ │ │ └─────────────────┼──────────────────┘ │ ┌─────────▼──────────┐ │ Alert Engine │ │ (Postgres) │ └─────────┬──────────┘ │ ┌─────────▼──────────┐ │ Slack Notifier │ │ (Block Kit) │ └────────────────────┘ ``` ## 检测逻辑 ### 静默故障 当出现以下情况时,任务执行被归类为 `SILENT_FAILURE`: - `status == SUCCEEDED` 且 `records_processed == 0` - `status == SUCCEEDED` 且 `error_message != ""` 这能捕捉到最常见现实场景中的故障模式:上游数据源返回空值,pipeline 正常退出,下游报告在不知不觉中变得陈旧。 ### 延迟漂移 (DriftAnalyzer) 对于每次执行,PipelineGuard 计算: ``` rolling_window = last 100 durations for this pipeline p50 = median(rolling_window) z_score = (current - mean) / stdev is_drifting = current > p50 * 1.25 # 25% above baseline is_anomaly = |z_score| > 2.5 # 2.5 standard deviations ``` 百分位漂移(趋势检测)和 z-score(尖峰检测)的结合,能以极低的误报率捕捉到逐渐变慢和突发异常两种情况。 ## Slack 警报示例 当检测到静默故障时,会立即触发: ``` 🔴 Silent Failure: nightly-user-sync Pipeline 'nightly-user-sync' (postgres://crm -> bigquery://warehouse) reported success but processed 0 records. Type: Silent Failure Severity: CRITICAL Pipeline: nightly-user-sync Alert ID: `a1b2c3d4...` ──────────────────────────────────────── PipelineGuard | Tenant `acme-corp...` ``` ## API 参考 | Method | Endpoint | Description | |--------|----------|-------------| | POST | `/api/v1/tenants/{tid}/pipelines` | 注册 pipeline | | GET | `/api/v1/tenants/{tid}/pipelines` | 列出 pipeline | | POST | `/api/v1/tenants/{tid}/pipelines/{pid}/executions` | 报告执行情况 | | GET | `/api/v1/tenants/{tid}/pipelines/{pid}/executions` | 执行历史 | | GET | `/api/v1/tenants/{tid}/pipelines/{pid}/latency` | 延迟 + 漂移数据 | | GET | `/api/v1/tenants/{tid}/alerts` | 活动警报 | | POST | `/api/v1/tenants/{tid}/alerts/{aid}/acknowledge` | 确认警报 | | GET | `/api/v1/tenants/{tid}/summary` | 最新每周摘要 | | POST | `/api/v1/tenants/{tid}/summary/generate` | 立即生成摘要 | 完整的 OpenAPI 规范位于 `/docs` (Swagger UI) 或 `/redoc`。 ## 配置 ``` # deploy/docker/.env APP_POSTGRES_HOST=postgres APP_POSTGRES_PASSWORD=your_password APP_REDIS_URL=redis://redis:6379/0 APP_SLACK_WEBHOOK_URL=https://hooks.slack.com/services/T.../B.../... APP_JWT_PRIVATE_KEY=... # RS256 — generate with scripts/generate_keys.py ``` ## 运行测试 ``` pip install -e ".[dev]" make test # unit + integration make test-unit # unit only (no database required) make lint # ruff + mypy ``` ## 生产环境部署 参阅 [deploy/k8s/](deploy/k8s/) 获取 Kubernetes 清单,包含: - API deployment + HPA - Celery worker deployment - PostgreSQL StatefulSet - Redis deployment - Prometheus + Grafana + Loki 可观测性栈 - Network policies(默认拒绝所有,按服务设置允许列表) 参阅 [deploy/terraform/](deploy/terraform/) 获取 AWS 基础设施模块 (ECS, RDS, ElastiCache, VPC)。 ## 许可证 MIT
标签:CI/CD安全, DevSecOps, ETL监控, Llama, Python, Slack通知, 上游代理, 代码质量门禁, 告警系统, 子域名突变, 延迟漂移, 异常检测, 搜索引擎查询, 数据管道, 数据质量, 文档安全, 无后门, 流水线可观测性, 测试用例, 自定义请求头, 请求拦截, 软件供应链安全, 软件工程, 运维监控, 远程方法调用, 逆向工具, 部署护栏, 错误追踪, 静默失败检测