Yassine-Ben-Terras/Multi-Agent-System-for-Data-Pipeline-Silent-Failure-Detection

GitHub: Yassine-Ben-Terras/Multi-Agent-System-for-Data-Pipeline-Silent-Failure-Detection

一个基于 LangGraph 的 AI 多智能体系统,用于自主检测、诊断和修复数据工程管道中不抛异常但数据已损坏的静默故障。

Stars: 0 | Forks: 0

# 🛡️ MAS Pipeline Sentinel 一个生产级的、AI驱动的多智能体系统 (MAS),能够自主**检测**、**诊断**和**修复**现代数据工程管道中的静默故障——在它们影响到仪表板、ML模型或财务报告之前。 ## 问题所在 静默故障是数据工程中最危险的故障。管道成功执行完毕,没有抛出异常,也没有触发警报——但它产生的数据却是**错误的、不完整的或已损坏的**。下游消费者在数小时甚至数天内都在基于错误的数据进行操作。 ## 解决方案 一个五智能体系统,其中每个智能体拥有一个独立的可观测性领域,通过共享事件总线进行通信,并通过中央协调器进行协作,以构建**因果链**——而不仅仅是症状报告。 ``` ┌─────────────────────────────────────────────────────────┐ │ ORCHESTRATOR │ │ (correlates signals · manages severity) │ └──────────┬──────────┬──────────┬──────────┬─────────────┘ │ │ │ │ ┌──────▼──┐ ┌────▼────┐ ┌──▼──────┐ ┌─▼──────────┐ │Ingestion│ │ Schema │ │Quality │ │ Lineage & │ │ Monitor │ │ Watcher │ │Auditor │ │ Impact │ └─────────┘ └─────────┘ └─────────┘ └─────────────┘ │ ┌──────▼──────┐ │Remediation │ │ Agent │ └─────────────┘ ``` ### 五个智能体 | 智能体 | 领域 | 关键信号 | |---|---|---| | **Ingestion Monitor** | 源系统 | 行数异常、到达延迟、校验和漂移 | | **Schema Watcher** | Schema 契约 | 列删除、类型强制转换、语义漂移 | | **Quality Auditor** | dbt 测试结果 | 失败率、NULL 率、分布偏移 | | **Lineage & Impact** | dbt DAG 遍历 | 爆炸半径:模型、仪表板、SLA 影响 | | **Remediation** | 自动化响应 | 隔离、回填、PagerDuty/Jira 事件 | ## 技术栈 | 层 | 技术 | |---|---| | Agent 框架 | [LangGraph](https://github.com/langchain-ai/langgraph) | | LLM 后端 | Anthropic Claude (Sonnet 用于推理,Haiku 用于分类) | | 消息总线 | Apache Kafka (Confluent Cloud 或自托管) | | Agent 状态存储 | PostgreSQL | | 元数据存储 | S3 / 本地 (dbt artifacts: `manifest.json`, `run_results.json`) | | dbt 集成 | dbt Core | | 编排 | Apache Airflow | | 容器化 | Docker + Docker Compose | | 可观测性 | OpenTelemetry | | 云端 | AWS (Lambda, ECS Fargate, EventBridge, S3) | ## 项目结构 ``` mas-pipeline-sentinel/ ├── agents/ │ ├── ingestion_monitor/ # Row count anomaly, checksum drift detection │ ├── schema_watcher/ # Schema contract validation │ ├── quality_auditor/ # dbt test result parsing & analysis │ ├── lineage_impact/ # dbt DAG traversal & blast radius │ ├── remediation/ # Quarantine, backfill, incident creation │ └── orchestrator/ # Signal correlation & severity escalation ├── infra/ │ ├── kafka/ # Kafka topic definitions & configs │ └── postgres/ # State store schema & migrations ├── dbt_integration/ # dbt artifact consumers & schema.yml helpers ├── tests/ │ ├── unit/ # Per-agent unit tests │ └── integration/ # End-to-end pipeline tests ├── config/ # Environment configs (topics, thresholds, playbooks) ├── scripts/ # Setup, seed, and utility scripts ├── docs/ # Architecture diagrams & ADRs ├── docker-compose.yml # Full local stack ├── Makefile # Developer shortcuts └── requirements.txt ``` ## 快速入门 (本地) ### 前置条件 - Docker & Docker Compose - Python 3.11+ - Anthropic API key ### 1. 克隆并配置 ``` git clone https://github.com/YOUR_USERNAME/mas-pipeline-sentinel.git cd mas-pipeline-sentinel cp .env.example .env # 编辑 .env → 添加你的 ANTHROPIC_API_KEY ``` ### 2. 启动本地技术栈 ``` make up ``` ### 3. 在 Shadow 模式下运行(仅观察,不执行操作) ``` make shadow-mode ``` ## 部署阶段 | 阶段 | 启用内容 | 成功标准 | |---|---|---| | **Shadow Mode** (第 1–2 周) | 所有智能体仅观察并记录日志 | 在已知良好的运行中零误报 | | **Alert-Only** (第 3–4 周) | 智能体触发 Slack/电子邮件警报 | >80% 的警报是具有可操作性的 | | **Supervised Remediation** (第 5–6 周) | 仅对 LOW 严重级别自动执行 | MTTD 降低 >50% | | **Full Autonomy** (第 7 周+) | 所有严重级别自主执行 | MTTD <5 分钟,误报率 <5% | ## 文档 - [`docs/architecture.md`](docs/architecture.md) — 完整的系统设计与 ADR - [`docs/agents.md`](docs/agents.md) — 各智能体设计与 observe→reason→signal 循环 - [`docs/event-schema.md`](docs/event-schema.md) — 事件总线主题定义与消息 Schema - [`docs/runbooks.md`](docs/runbooks.md) — 事件响应与修复操作手册 ## 路线图 - [x] 项目脚手架与环境设置 - [ ] Kafka + PostgreSQL 本地基础设施 - [ ] Ingestion Monitor Agent (基线) - [ ] Schema Watcher Agent - [ ] Quality Auditor Agent (dbt 集成) - [ ] Lineage & Impact Agent (manifest.json 遍历) - [ ] Orchestrator 关联逻辑 - [ ] 带有操作手册的 Remediation Agent - [ ] Shadow 模式部署 - [ ] OpenTelemetry 可观测性 - [ ] AWS 生产环境部署 ## 许可证 MIT 许可证 — 详见 [LICENSE](LICENSE) *作为作品集项目构建,旨在演示面向数据工程管道的生产级多智能体系统 (MAS) 设计。*
标签:AI, AIOps, DataOps, IT运维, MAS, PyRIT, Socks5代理, Zenmap, 事件总线, 人工智能, 仪表板, 因果链, 多智能体系统, 异常检测, 影响分析, 数据可观测性, 数据完整性, 数据工程, 数据摄取, 数据治理, 数据管道, 数据血缘, 数据质量, 机器学习模型, 根因分析, 模式监控, 流水线监控, 测试用例, 漏洞利用检测, 生产级, 用户代理, 用户模式Hook绕过, 编排器, 自动修复, 自动化代码审查, 自动化诊断, 请求拦截, 软件工程, 软件成分分析, 逆向工具, 金融报告, 静默故障