BishopSlice/PipelineReliability
GitHub: BishopSlice/PipelineReliability
基于多 Agent 架构的 Fivetran 数据管道故障自动调查、诊断与修复响应系统,支持人工审批环节。
Stars: 0 | Forks: 0
# PRE — Pipeline Reliability Engineer
一个用于调查、诊断和修复 Fivetran 数据 pipeline 故障的多 agent 系统 —— 并在回路中保留人工干预。
**[→ 在线演示](https://pre-agent-498009659419.us-central1.run.app)**
https://github.com/user-attachments/assets/12dad56a-f9a2-4d66-a3b1-f69919c64149
## 跳转到任意章节:
1. [问题背景](#1-the-problem)
2. [在线演示](#2-live-demo)
3. [系统架构](#3-architecture)
4. [多 Agent Pipeline](#4-multi-agent-pipeline)
5. [Fivetran MCP 集成](#5-fivetran-mcp-integration)
6. [工程挑战](#6-engineering-challenges)
7. [生产环境强化](#7-production-hardening)
8. [评估面板](#8-eval-dashboard)
9. [本地运行](#9-running-locally)
10. [测试](#10-tests)
## 1. 问题背景
Fivetran 中的 pipeline 故障主要发生在以下四个层面之一:
| 层级 | 负责内容 |
|---|---|
| **Connector** | 源认证、同步调度、网络连通性 |
| **Schema** | 字段是否存在、类型兼容性、包含/排除配置 |
| **Transformation** | 下游 SQL/dbt 模型执行、依赖解析 |
| **Data Quality** | 行数、NULL 率、重复率、数值范围违规 |
每个层级都会暴露不同的信号,需要调用不同的 API 进行调查,且有着不同的修复路径。Connector 层的故障看起来就像是下游的 schema 故障。Schema 的变更会破坏 transformation,进而引发 Data Quality 告警。这些层级是相互耦合的;仅从表象很难看出故障的根本来源。
数据工程师往往需要花费数小时,通过手动交叉比对 Fivetran 面板、dbt 运行日志以及仓库查询输出,才能开始着手处理。信号客观存在,API 也是现成的,真正的瓶颈在于整个分流排查的循环过程。
PRE 缩短了这一循环:通过实时同步检测故障,确定性地对根本原因层级进行分类,派遣专门的 agent 通过实时 Fivetran API 进行调查,并在真实系统中执行任何修复操作之前,将其路由至人工审批节点。
## 2. 在线演示
**[https://pre-agent-498009659419.us-central1.run.app](https://pre-agent-498009659419.us-central1.run.app)**
当您在某个连接上点击“同步”时:PRE 会触发实时同步,等待其稳定,如果适用还会运行关联的 transformation,并将生成的故障信号传递给 agent pipeline。每一次运行都会直接请求实时的 Fivetran API。
- **连接 1** — HITL 流程:PRE 检测到同步失败,诊断出根本原因,将修复操作排入队列,并一直阻塞直到您批准或拒绝。批准后将执行真实的 Fivetran API 写入操作,并生成一个 Connect Card URL。
- **连接 2** — 多 Agent 交接:同步成功,但 transformation 失败。Schema agent 会调查类型不匹配问题;调查结果将被移交给 Transformation agent。
- **连接 3** — Schema 诊断:由于缺少字段导致 transformation 失败。Schema agent 将根本原因追溯到同步配置。
## 3. 系统架构
```
── Monitoring Phase ──────────────────────────────────────────────────────
│
│ Fivetran sync triggered ──▶ poll until settled ──▶ run transformation
│ │ │
│ └──── sync failed? transformation failed?
│ │ │
│ └─────────────┬───────────────────────────┘
│ │
│ build signal payload
│ (sync_status / transformation_event)
│
── Agent Phase ───────────────────────────────────────────────────────────
│
│ ┌─────────────────────────────────────────┐
│ signal ──▶ │ Orchestrator (PRE) │
│ │ score_layers → classify → blast_radius │
│ └──────┬──────────────────────────────────┘
│ │ AgentTool dispatch (returns to orchestrator)
│ ┌───────────────────┼──────────────────────┐
│ ▼ ▼ ▼
│ ┌──────────┐ ┌──────────┐ ┌─────────────────────┐
│ │Connector │ │ Schema │ │ Transformation / │
│ │ Agent │ │ Agent │ │ Data Quality Agent │
│ └────┬─────┘ └────┬─────┘ └──────────┬──────────┘
│ └─────────────────┬┘──────────────────────────┘
│ │
│ actions_queued_for_approval?
│ │
│ ┌─────────▼──────────┐
│ │ HITL Gate │ ← async def awaits gate_queue.get()
│ │ (truly blocking) │ suspends runner.run_async() until
│ └─────────┬──────────┘ human approves or denies
│ │ approved
│ ┌─────────▼──────────┐
│ │ Post-Fix Validation│ ← LoopAgent
│ │ │ (transformation_runner
│ │ │ → dq_validator × N)
│ └─────────┬──────────┘
│ │
│ ┌─────────▼──────────┐
│ │ IncidentSummary │ ← Pydantic-validated via
│ │ resolved / │ close_incident tool call
│ │ escalated / partial│ (never from LLM text)
│ └────────────────────┘
│
│ All agent tool calls ──▶ Fivetran MCP Server (server.py, 77 tools)
│ │
│ Fivetran REST API
──────────────────────────────────────────────────────────────────────────
```
**技术栈**
| 层级 | 技术 |
|---|---|
| Agent 框架 | Google ADK 2.1.0 (`LlmAgent`, `LoopAgent`, `AgentTool`, `Runner`) |
| 模型 | 运行在 Vertex AI 上的 Gemini 3.1 Flash Lite (`GOOGLE_CLOUD_LOCATION=global`) |
| 工具集成 | Fivetran MCP Server (自定义 `server.py`, stdio 传输) |
| HITL 节点 | 通过 `asyncio.Queue` 阻塞 `runner.run_async()` 的异步 Python 函数 |
| 演示服务器 | FastAPI + Server-Sent Events + Alpine.js |
| 评估持久化 | SQLite (本地) / Firestore 优雅降级 (Cloud Run) |
| 可观测性 | OpenTelemetry → Google Cloud Trace (agent 级别的 spans) |
| 部署 | Google Cloud Run (2Gi RAM, 2 CPU, 3600s 超时, 最少 1 个实例) |
## 4. 多 Agent Pipeline
每个故障层级都有对应的专门 agent 以及经过过滤的工具集。如果让单个 agent 掌握 77 个可用工具,它将面临无法管理的工具选择难题,甚至可能在上下文不充分的情况下触发写操作。
| Agent | 暴露的工具 | 作用域 |
|---|---|---|
| Connector | `get_connection_details`, `run_connection_setup_tests`, `list_connections_in_group` | 认证、同步状态、网络连通性 |
| Schema | `get_connection_schema_config`, `get_connection_column_config`, `list_transformations` | 字段类型漂移、排除规则、schema 差异 |
| Transformation | `get_transformation_details`, `run_transformation`, `get_connection_details` | 执行错误、依赖故障、重新运行 |
| Data Quality | `get_connection_details`, `get_connection_schema_config`, `get_transformation_details` | NULL 激增、重复率、行数异常 |
在诊断期间无法使用写工具 —— 它们仅在 HITL 批准后供 orchestrator 使用。
**4.1. 分类:**
- `score_layers()` 根据信号,使用基于规则的评分标准对各个层级进行打分。
- `classify()` 应用置信度阈值。在平局的情况下,它会优先分配给最早的层级。
- 评分具有确定性。LLM 负责调度分发。
**4.2. AgentTool:**
- 子 agent 被注册为 `AgentTool`,而不是 `sub_agents`。
- 使用 `AgentTool` 时,输出将作为 `FunctionResponse` 返回,随后 orchestrator 会继续执行。
**4.3. HITL 节点:**
- `request_approval` 是一个 `async def` ADK 工具,它会等待 `gate_queue.get()`,在人类做出决定之前,会在工具级别阻塞 `runner.run_async()`。
**4.4. 修复后验证:**
- `LoopAgent` 会确认恢复情况。
- 每个验证 agent 必须通过调用只读工具独立验证状态。
- Fivetran 的写操作是异步的 —— 返回 200 意味着系统已接受请求,但不代表已完成。
- 只有当后续的读取操作返回已验证的终态时,才能确认已成功恢复。
## 5. Fivetran MCP 集成
**5.1. 自定义 MCP server:**
- 这个自定义的 server 暴露了涵盖连接、schema、transformation、destination 和 group 的 77 种工具。
- 它作为子进程通过 MCP 的 stdio 传输方式 (`StdioServerParameters`) 运行。
**5.2. 双重工具集访问模型:**
- 子 agent:只读的 `McpToolset` (`FIVETRAN_ALLOW_WRITES=false`),并配合 `tool_filter` 将每个 agent 的权限限制在其负责的领域内。
- Orchestrator:单独的、启用了写入权限的实例,仅在 HITL 批准后用于执行特定的已批准工具。
**5.3. 结构化错误响应:**
- 所有 Fivetran API 错误 (400–409) 都会返回 `{"error": true, "code": N, "message": "..."}`。
- agent 会根据错误代码进行分支处理,而不是将所有非 200 的响应视为致命错误。
- 409 Conflict 被视为确认信号(操作已在进行中)—— 不会进行重试。
**5.4. 响应缓存:**
- GET 响应会在进程内缓存,TTL 为 30 秒。
- 避免了当多个 agent 在单次运行中调用同一个读取端点时产生的重复往返。
- 写入调用永远不会被缓存。
## 6. 工程挑战
### 6.1. LLM 的非确定性
**A. 结构化输出**
- Gemini 会将 JSON 包裹在 Markdown 代码块中,捏造字段名称,并返回枚举集合之外的值。
- 每个 agent 边界都有一个标准化层(`_strip_markdown_json`, `_coerce_agent_finding`, 字段别名),可在 Pydantic 验证之前吸收并抹平这些差异。
**B. 过早的工具执行**
- Orchestrator 曾在同一个批次中,与 `request_approval` 一起调用了已批准的写工具,导致审批节点未能及时对其进行阻塞。
- `LongRunningFunctionTool` 虽然可以暂停外层循环,但模型的多次重试发生在单次 `runner.run_async()` 调用内部。
- 将 `request_approval` 改为挂起运行器的原生 `async def` 函数,从而消除了竞态条件。
**C. 非结构化关闭**
- Orchestrator 偶尔会输出推理过程描述,而不是调用 `close_incident`。
- 合成回退机制会从 session 状态中最后一次的 `AgentFinding` 构建出一个最小化的升级结果,从而确保 eval DB 始终能接收到可用的数据行。
**D. 虚假的恢复确认**
- Gemini 曾将写工具的 200 响应视为状态已更改的证据。
- 如今,所有的写操作都需要在断言恢复之前,通过随后的只读工具调用确认已达终态。
**E. 上下文窗口激增**
- 级联发生的故障事件会积累大量 `agent_findings`,挤占了系统提示词的空间。
- 设定了一个强制上限,在超过可配置的阈值时进行裁剪,并在发生截断时输出 `logger.warning`。
### 6.2. MCP 集成
**A. 子进程环境**
- 仅将 Fivetran 凭据传递给子进程环境会导致 `ModuleNotFoundError`。
- 子进程必须继承完整的 `os.environ` 才能找到已安装的包。
**B. `request_body` 类型不匹配**
- MCP server 将 `request_body` 声明为 `"type": "string"`。
- Agent 将其作为 Python 字典排入队列。第一次真正的 HITL 批准在 MCP 输入验证阶段失败了。通过允许接受 `oneOf: [string, object]` 修复了此问题。
**C. 隐式的依赖包缺失**
- ADK 2.1.0 将 MCP 的导入封装在 `try/except ImportError` 中,如果未安装 `mcp`,它会静默丢弃这些导入。
- Agent 初始化时没有任何工具,且不报错。已将 `mcp>=1.0.0` 添加为显式依赖。
**D. ADK 版本锁定**
- `google-adk==2.2.0` 破坏了 `McpToolset` 的导入路径。
- 在 `pyproject.toml` 和 Dockerfile 的 `pip install` 步骤中将其锁定为 `==2.1.0`。
## 7. 生产环境强化
PRE 的设计理念是优雅降级,而不是静默失败。关键机制如下:
| 关注点 | 机制 |
|---|---|
| 瞬态 429 错误 | `HttpRetryOptions` |
| 持续的配额耗尽 | 外层重试循环 |
| 运行超时 | `asyncio.wait_for()` 配合升级回退机制 |
| 工具幻觉 | 三次触发强制中断 |
| 推理死循环 | 单 Agent 分发上限 |
| 提示词注入 | 输入过滤与清理 |
| 非结构化关闭 | 合成回退机制 |
| 未验证即写入 | Pydantic 验证过的 `IncidentSummary` 由工具调用写入,而不是从 LLM 文本中解析 |
| 上下文溢出 | 对 `agent_findings` 施加硬性上限,并在截断时输出 `logger.warning` |
## 8. 评估面板
每次运行都会持久化到 eval 存储库 (`/evals`) 中。面板会追踪:
- **两阶段延迟**:监控阶段和 agent 阶段以单独的列分别显示。
- **按 Agent 细分**:每次 agent 运行的持续时间和 token 归因。
- **OTPS**:每秒输出 token 数。
- **P95 / P50 基准**:在 5 次运行后计算得出;当新运行在持续时间或成本上超过 2 倍的 P95 时,会触发 `logger.warning`。
- **Token 预算告警**:从第 1 次运行开始,当 `tokens_out > OUTPUT_TOKEN_BUDGET` 时就会触发。
## 9. 本地运行
**前置条件**:Python 3.11+,一个拥有 API 凭据的 Fivetran 账户,以及一个启用了 Vertex AI 的 GCP 项目。
```
git clone https://github.com/BishopSlice/PipelineReliability.git
cd PipelineReliability
# 安装 dependencies
pip install -e .
# 配置 credentials
cp .env.example .env
# 编辑 .env:设置 FIVETRAN_API_KEY、FIVETRAN_API_SECRET、GOOGLE_CLOUD_PROJECT、
# GOOGLE_GENAI_USE_VERTEXAI=true、GOOGLE_CLOUD_LOCATION=global
# 向 Google Cloud 进行认证
gcloud auth application-default login
# 启动 demo server
python3 -m uvicorn demo.server:app --host 0.0.0.0 --port 8000
# 打开 http://localhost:8000
```
**Cloud Run 部署**(需要经过认证的 `gcloud` CLI):
```
bash scripts/deploy.sh
```
## 10. 测试
97 个确定性的单元测试覆盖了护栏层 —— 即在 LLM 输出到达 session 状态之前对其进行封装和验证的。这里是最容易出现隐蔽回归问题的地方:强制转换 bug 不会导致系统崩溃,但会导致事件被路由到错误的地方。
```
python3 -m pytest tests/ -v # no credentials required
```
覆盖率:信号模型验证、分类器评分(涵盖所有的混淆因素和平局规则)、上下文强制转换、输入过滤与清理、爆炸半径标准化。
完整的 agent 运行需要实时的 Fivetran 凭据,并且会在每次发布之前针对三个演示连接进行手动验证。LLM 的非确定性使得端到端运行如果不进行响应重放,就不适合纳入自动化 CI 流程。
## 许可证
MIT — 详见 [LICENSE](LICENSE)。
标签:Fivetran, PyRIT, 事故响应, 多智能体系统, 数据工程, 数据管道, 用户代理, 自动化运维, 软件工程, 逆向工具