BishopSlice/PipelineReliability

GitHub: BishopSlice/PipelineReliability

基于多 Agent 架构的 Fivetran 数据管道故障自动调查、诊断与修复响应系统,支持人工审批环节。

Stars: 0 | Forks: 0

# PRE — Pipeline Reliability Engineer 一个用于调查、诊断和修复 Fivetran 数据 pipeline 故障的多 agent 系统 —— 并在回路中保留人工干预。 **[→ 在线演示](https://pre-agent-498009659419.us-central1.run.app)** https://github.com/user-attachments/assets/12dad56a-f9a2-4d66-a3b1-f69919c64149 ## 跳转到任意章节: 1. [问题背景](#1-the-problem) 2. [在线演示](#2-live-demo) 3. [系统架构](#3-architecture) 4. [多 Agent Pipeline](#4-multi-agent-pipeline) 5. [Fivetran MCP 集成](#5-fivetran-mcp-integration) 6. [工程挑战](#6-engineering-challenges) 7. [生产环境强化](#7-production-hardening) 8. [评估面板](#8-eval-dashboard) 9. [本地运行](#9-running-locally) 10. [测试](#10-tests) ## 1. 问题背景 Fivetran 中的 pipeline 故障主要发生在以下四个层面之一: | 层级 | 负责内容 | |---|---| | **Connector** | 源认证、同步调度、网络连通性 | | **Schema** | 字段是否存在、类型兼容性、包含/排除配置 | | **Transformation** | 下游 SQL/dbt 模型执行、依赖解析 | | **Data Quality** | 行数、NULL 率、重复率、数值范围违规 | 每个层级都会暴露不同的信号,需要调用不同的 API 进行调查,且有着不同的修复路径。Connector 层的故障看起来就像是下游的 schema 故障。Schema 的变更会破坏 transformation,进而引发 Data Quality 告警。这些层级是相互耦合的;仅从表象很难看出故障的根本来源。 数据工程师往往需要花费数小时,通过手动交叉比对 Fivetran 面板、dbt 运行日志以及仓库查询输出,才能开始着手处理。信号客观存在,API 也是现成的,真正的瓶颈在于整个分流排查的循环过程。 PRE 缩短了这一循环:通过实时同步检测故障,确定性地对根本原因层级进行分类,派遣专门的 agent 通过实时 Fivetran API 进行调查,并在真实系统中执行任何修复操作之前,将其路由至人工审批节点。 ## 2. 在线演示 **[https://pre-agent-498009659419.us-central1.run.app](https://pre-agent-498009659419.us-central1.run.app)** 当您在某个连接上点击“同步”时:PRE 会触发实时同步,等待其稳定,如果适用还会运行关联的 transformation,并将生成的故障信号传递给 agent pipeline。每一次运行都会直接请求实时的 Fivetran API。 - **连接 1** — HITL 流程:PRE 检测到同步失败,诊断出根本原因,将修复操作排入队列,并一直阻塞直到您批准或拒绝。批准后将执行真实的 Fivetran API 写入操作,并生成一个 Connect Card URL。 - **连接 2** — 多 Agent 交接:同步成功,但 transformation 失败。Schema agent 会调查类型不匹配问题;调查结果将被移交给 Transformation agent。 - **连接 3** — Schema 诊断:由于缺少字段导致 transformation 失败。Schema agent 将根本原因追溯到同步配置。 ## 3. 系统架构 ``` ── Monitoring Phase ────────────────────────────────────────────────────── │ │ Fivetran sync triggered ──▶ poll until settled ──▶ run transformation │ │ │ │ └──── sync failed? transformation failed? │ │ │ │ └─────────────┬───────────────────────────┘ │ │ │ build signal payload │ (sync_status / transformation_event) │ ── Agent Phase ─────────────────────────────────────────────────────────── │ │ ┌─────────────────────────────────────────┐ │ signal ──▶ │ Orchestrator (PRE) │ │ │ score_layers → classify → blast_radius │ │ └──────┬──────────────────────────────────┘ │ │ AgentTool dispatch (returns to orchestrator) │ ┌───────────────────┼──────────────────────┐ │ ▼ ▼ ▼ │ ┌──────────┐ ┌──────────┐ ┌─────────────────────┐ │ │Connector │ │ Schema │ │ Transformation / │ │ │ Agent │ │ Agent │ │ Data Quality Agent │ │ └────┬─────┘ └────┬─────┘ └──────────┬──────────┘ │ └─────────────────┬┘──────────────────────────┘ │ │ │ actions_queued_for_approval? │ │ │ ┌─────────▼──────────┐ │ │ HITL Gate │ ← async def awaits gate_queue.get() │ │ (truly blocking) │ suspends runner.run_async() until │ └─────────┬──────────┘ human approves or denies │ │ approved │ ┌─────────▼──────────┐ │ │ Post-Fix Validation│ ← LoopAgent │ │ │ (transformation_runner │ │ │ → dq_validator × N) │ └─────────┬──────────┘ │ │ │ ┌─────────▼──────────┐ │ │ IncidentSummary │ ← Pydantic-validated via │ │ resolved / │ close_incident tool call │ │ escalated / partial│ (never from LLM text) │ └────────────────────┘ │ │ All agent tool calls ──▶ Fivetran MCP Server (server.py, 77 tools) │ │ │ Fivetran REST API ────────────────────────────────────────────────────────────────────────── ``` **技术栈** | 层级 | 技术 | |---|---| | Agent 框架 | Google ADK 2.1.0 (`LlmAgent`, `LoopAgent`, `AgentTool`, `Runner`) | | 模型 | 运行在 Vertex AI 上的 Gemini 3.1 Flash Lite (`GOOGLE_CLOUD_LOCATION=global`) | | 工具集成 | Fivetran MCP Server (自定义 `server.py`, stdio 传输) | | HITL 节点 | 通过 `asyncio.Queue` 阻塞 `runner.run_async()` 的异步 Python 函数 | | 演示服务器 | FastAPI + Server-Sent Events + Alpine.js | | 评估持久化 | SQLite (本地) / Firestore 优雅降级 (Cloud Run) | | 可观测性 | OpenTelemetry → Google Cloud Trace (agent 级别的 spans) | | 部署 | Google Cloud Run (2Gi RAM, 2 CPU, 3600s 超时, 最少 1 个实例) | ## 4. 多 Agent Pipeline 每个故障层级都有对应的专门 agent 以及经过过滤的工具集。如果让单个 agent 掌握 77 个可用工具,它将面临无法管理的工具选择难题,甚至可能在上下文不充分的情况下触发写操作。 | Agent | 暴露的工具 | 作用域 | |---|---|---| | Connector | `get_connection_details`, `run_connection_setup_tests`, `list_connections_in_group` | 认证、同步状态、网络连通性 | | Schema | `get_connection_schema_config`, `get_connection_column_config`, `list_transformations` | 字段类型漂移、排除规则、schema 差异 | | Transformation | `get_transformation_details`, `run_transformation`, `get_connection_details` | 执行错误、依赖故障、重新运行 | | Data Quality | `get_connection_details`, `get_connection_schema_config`, `get_transformation_details` | NULL 激增、重复率、行数异常 | 在诊断期间无法使用写工具 —— 它们仅在 HITL 批准后供 orchestrator 使用。 **4.1. 分类:** - `score_layers()` 根据信号,使用基于规则的评分标准对各个层级进行打分。 - `classify()` 应用置信度阈值。在平局的情况下,它会优先分配给最早的层级。 - 评分具有确定性。LLM 负责调度分发。 **4.2. AgentTool:** - 子 agent 被注册为 `AgentTool`,而不是 `sub_agents`。 - 使用 `AgentTool` 时,输出将作为 `FunctionResponse` 返回,随后 orchestrator 会继续执行。 **4.3. HITL 节点:** - `request_approval` 是一个 `async def` ADK 工具,它会等待 `gate_queue.get()`,在人类做出决定之前,会在工具级别阻塞 `runner.run_async()`。 **4.4. 修复后验证:** - `LoopAgent` 会确认恢复情况。 - 每个验证 agent 必须通过调用只读工具独立验证状态。 - Fivetran 的写操作是异步的 —— 返回 200 意味着系统已接受请求,但不代表已完成。 - 只有当后续的读取操作返回已验证的终态时,才能确认已成功恢复。 ## 5. Fivetran MCP 集成 **5.1. 自定义 MCP server:** - 这个自定义的 server 暴露了涵盖连接、schema、transformation、destination 和 group 的 77 种工具。 - 它作为子进程通过 MCP 的 stdio 传输方式 (`StdioServerParameters`) 运行。 **5.2. 双重工具集访问模型:** - 子 agent:只读的 `McpToolset` (`FIVETRAN_ALLOW_WRITES=false`),并配合 `tool_filter` 将每个 agent 的权限限制在其负责的领域内。 - Orchestrator:单独的、启用了写入权限的实例,仅在 HITL 批准后用于执行特定的已批准工具。 **5.3. 结构化错误响应:** - 所有 Fivetran API 错误 (400–409) 都会返回 `{"error": true, "code": N, "message": "..."}`。 - agent 会根据错误代码进行分支处理,而不是将所有非 200 的响应视为致命错误。 - 409 Conflict 被视为确认信号(操作已在进行中)—— 不会进行重试。 **5.4. 响应缓存:** - GET 响应会在进程内缓存,TTL 为 30 秒。 - 避免了当多个 agent 在单次运行中调用同一个读取端点时产生的重复往返。 - 写入调用永远不会被缓存。 ## 6. 工程挑战 ### 6.1. LLM 的非确定性 **A. 结构化输出** - Gemini 会将 JSON 包裹在 Markdown 代码块中,捏造字段名称,并返回枚举集合之外的值。 - 每个 agent 边界都有一个标准化层(`_strip_markdown_json`, `_coerce_agent_finding`, 字段别名),可在 Pydantic 验证之前吸收并抹平这些差异。 **B. 过早的工具执行** - Orchestrator 曾在同一个批次中,与 `request_approval` 一起调用了已批准的写工具,导致审批节点未能及时对其进行阻塞。 - `LongRunningFunctionTool` 虽然可以暂停外层循环,但模型的多次重试发生在单次 `runner.run_async()` 调用内部。 - 将 `request_approval` 改为挂起运行器的原生 `async def` 函数,从而消除了竞态条件。 **C. 非结构化关闭** - Orchestrator 偶尔会输出推理过程描述,而不是调用 `close_incident`。 - 合成回退机制会从 session 状态中最后一次的 `AgentFinding` 构建出一个最小化的升级结果,从而确保 eval DB 始终能接收到可用的数据行。 **D. 虚假的恢复确认** - Gemini 曾将写工具的 200 响应视为状态已更改的证据。 - 如今,所有的写操作都需要在断言恢复之前,通过随后的只读工具调用确认已达终态。 **E. 上下文窗口激增** - 级联发生的故障事件会积累大量 `agent_findings`,挤占了系统提示词的空间。 - 设定了一个强制上限,在超过可配置的阈值时进行裁剪,并在发生截断时输出 `logger.warning`。 ### 6.2. MCP 集成 **A. 子进程环境** - 仅将 Fivetran 凭据传递给子进程环境会导致 `ModuleNotFoundError`。 - 子进程必须继承完整的 `os.environ` 才能找到已安装的包。 **B. `request_body` 类型不匹配** - MCP server 将 `request_body` 声明为 `"type": "string"`。 - Agent 将其作为 Python 字典排入队列。第一次真正的 HITL 批准在 MCP 输入验证阶段失败了。通过允许接受 `oneOf: [string, object]` 修复了此问题。 **C. 隐式的依赖包缺失** - ADK 2.1.0 将 MCP 的导入封装在 `try/except ImportError` 中,如果未安装 `mcp`,它会静默丢弃这些导入。 - Agent 初始化时没有任何工具,且不报错。已将 `mcp>=1.0.0` 添加为显式依赖。 **D. ADK 版本锁定** - `google-adk==2.2.0` 破坏了 `McpToolset` 的导入路径。 - 在 `pyproject.toml` 和 Dockerfile 的 `pip install` 步骤中将其锁定为 `==2.1.0`。 ## 7. 生产环境强化 PRE 的设计理念是优雅降级,而不是静默失败。关键机制如下: | 关注点 | 机制 | |---|---| | 瞬态 429 错误 | `HttpRetryOptions` | | 持续的配额耗尽 | 外层重试循环 | | 运行超时 | `asyncio.wait_for()` 配合升级回退机制 | | 工具幻觉 | 三次触发强制中断 | | 推理死循环 | 单 Agent 分发上限 | | 提示词注入 | 输入过滤与清理 | | 非结构化关闭 | 合成回退机制 | | 未验证即写入 | Pydantic 验证过的 `IncidentSummary` 由工具调用写入,而不是从 LLM 文本中解析 | | 上下文溢出 | 对 `agent_findings` 施加硬性上限,并在截断时输出 `logger.warning` | ## 8. 评估面板 每次运行都会持久化到 eval 存储库 (`/evals`) 中。面板会追踪: - **两阶段延迟**:监控阶段和 agent 阶段以单独的列分别显示。 - **按 Agent 细分**:每次 agent 运行的持续时间和 token 归因。 - **OTPS**:每秒输出 token 数。 - **P95 / P50 基准**:在 5 次运行后计算得出;当新运行在持续时间或成本上超过 2 倍的 P95 时,会触发 `logger.warning`。 - **Token 预算告警**:从第 1 次运行开始,当 `tokens_out > OUTPUT_TOKEN_BUDGET` 时就会触发。 ## 9. 本地运行 **前置条件**:Python 3.11+,一个拥有 API 凭据的 Fivetran 账户,以及一个启用了 Vertex AI 的 GCP 项目。 ``` git clone https://github.com/BishopSlice/PipelineReliability.git cd PipelineReliability # 安装 dependencies pip install -e . # 配置 credentials cp .env.example .env # 编辑 .env:设置 FIVETRAN_API_KEY、FIVETRAN_API_SECRET、GOOGLE_CLOUD_PROJECT、 # GOOGLE_GENAI_USE_VERTEXAI=true、GOOGLE_CLOUD_LOCATION=global # 向 Google Cloud 进行认证 gcloud auth application-default login # 启动 demo server python3 -m uvicorn demo.server:app --host 0.0.0.0 --port 8000 # 打开 http://localhost:8000 ``` **Cloud Run 部署**(需要经过认证的 `gcloud` CLI): ``` bash scripts/deploy.sh ``` ## 10. 测试 97 个确定性的单元测试覆盖了护栏层 —— 即在 LLM 输出到达 session 状态之前对其进行封装和验证的。这里是最容易出现隐蔽回归问题的地方:强制转换 bug 不会导致系统崩溃,但会导致事件被路由到错误的地方。 ``` python3 -m pytest tests/ -v # no credentials required ``` 覆盖率:信号模型验证、分类器评分(涵盖所有的混淆因素和平局规则)、上下文强制转换、输入过滤与清理、爆炸半径标准化。 完整的 agent 运行需要实时的 Fivetran 凭据,并且会在每次发布之前针对三个演示连接进行手动验证。LLM 的非确定性使得端到端运行如果不进行响应重放,就不适合纳入自动化 CI 流程。 ## 许可证 MIT — 详见 [LICENSE](LICENSE)。
标签:Fivetran, PyRIT, 事故响应, 多智能体系统, 数据工程, 数据管道, 用户代理, 自动化运维, 软件工程, 逆向工具