Yassine-Ben-Terras/Multi-Agent-System-for-Data-Pipeline-Silent-Failure-Detection
GitHub: Yassine-Ben-Terras/Multi-Agent-System-for-Data-Pipeline-Silent-Failure-Detection
一个基于 LangGraph 的 AI 多智能体系统,用于自主检测、诊断和修复数据工程管道中不抛异常但数据已损坏的静默故障。
Stars: 0 | Forks: 0
# 🛡️ MAS Pipeline Sentinel
一个生产级的、AI驱动的多智能体系统 (MAS),能够自主**检测**、**诊断**和**修复**现代数据工程管道中的静默故障——在它们影响到仪表板、ML模型或财务报告之前。
## 问题所在
静默故障是数据工程中最危险的故障。管道成功执行完毕,没有抛出异常,也没有触发警报——但它产生的数据却是**错误的、不完整的或已损坏的**。下游消费者在数小时甚至数天内都在基于错误的数据进行操作。
## 解决方案
一个五智能体系统,其中每个智能体拥有一个独立的可观测性领域,通过共享事件总线进行通信,并通过中央协调器进行协作,以构建**因果链**——而不仅仅是症状报告。
```
┌─────────────────────────────────────────────────────────┐
│ ORCHESTRATOR │
│ (correlates signals · manages severity) │
└──────────┬──────────┬──────────┬──────────┬─────────────┘
│ │ │ │
┌──────▼──┐ ┌────▼────┐ ┌──▼──────┐ ┌─▼──────────┐
│Ingestion│ │ Schema │ │Quality │ │ Lineage & │
│ Monitor │ │ Watcher │ │Auditor │ │ Impact │
└─────────┘ └─────────┘ └─────────┘ └─────────────┘
│
┌──────▼──────┐
│Remediation │
│ Agent │
└─────────────┘
```
### 五个智能体
| 智能体 | 领域 | 关键信号 |
|---|---|---|
| **Ingestion Monitor** | 源系统 | 行数异常、到达延迟、校验和漂移 |
| **Schema Watcher** | Schema 契约 | 列删除、类型强制转换、语义漂移 |
| **Quality Auditor** | dbt 测试结果 | 失败率、NULL 率、分布偏移 |
| **Lineage & Impact** | dbt DAG 遍历 | 爆炸半径:模型、仪表板、SLA 影响 |
| **Remediation** | 自动化响应 | 隔离、回填、PagerDuty/Jira 事件 |
## 技术栈
| 层 | 技术 |
|---|---|
| Agent 框架 | [LangGraph](https://github.com/langchain-ai/langgraph) |
| LLM 后端 | Anthropic Claude (Sonnet 用于推理,Haiku 用于分类) |
| 消息总线 | Apache Kafka (Confluent Cloud 或自托管) |
| Agent 状态存储 | PostgreSQL |
| 元数据存储 | S3 / 本地 (dbt artifacts: `manifest.json`, `run_results.json`) |
| dbt 集成 | dbt Core |
| 编排 | Apache Airflow |
| 容器化 | Docker + Docker Compose |
| 可观测性 | OpenTelemetry |
| 云端 | AWS (Lambda, ECS Fargate, EventBridge, S3) |
## 项目结构
```
mas-pipeline-sentinel/
├── agents/
│ ├── ingestion_monitor/ # Row count anomaly, checksum drift detection
│ ├── schema_watcher/ # Schema contract validation
│ ├── quality_auditor/ # dbt test result parsing & analysis
│ ├── lineage_impact/ # dbt DAG traversal & blast radius
│ ├── remediation/ # Quarantine, backfill, incident creation
│ └── orchestrator/ # Signal correlation & severity escalation
├── infra/
│ ├── kafka/ # Kafka topic definitions & configs
│ └── postgres/ # State store schema & migrations
├── dbt_integration/ # dbt artifact consumers & schema.yml helpers
├── tests/
│ ├── unit/ # Per-agent unit tests
│ └── integration/ # End-to-end pipeline tests
├── config/ # Environment configs (topics, thresholds, playbooks)
├── scripts/ # Setup, seed, and utility scripts
├── docs/ # Architecture diagrams & ADRs
├── docker-compose.yml # Full local stack
├── Makefile # Developer shortcuts
└── requirements.txt
```
## 快速入门 (本地)
### 前置条件
- Docker & Docker Compose
- Python 3.11+
- Anthropic API key
### 1. 克隆并配置
```
git clone https://github.com/YOUR_USERNAME/mas-pipeline-sentinel.git
cd mas-pipeline-sentinel
cp .env.example .env
# 编辑 .env → 添加你的 ANTHROPIC_API_KEY
```
### 2. 启动本地技术栈
```
make up
```
### 3. 在 Shadow 模式下运行(仅观察,不执行操作)
```
make shadow-mode
```
## 部署阶段
| 阶段 | 启用内容 | 成功标准 |
|---|---|---|
| **Shadow Mode** (第 1–2 周) | 所有智能体仅观察并记录日志 | 在已知良好的运行中零误报 |
| **Alert-Only** (第 3–4 周) | 智能体触发 Slack/电子邮件警报 | >80% 的警报是具有可操作性的 |
| **Supervised Remediation** (第 5–6 周) | 仅对 LOW 严重级别自动执行 | MTTD 降低 >50% |
| **Full Autonomy** (第 7 周+) | 所有严重级别自主执行 | MTTD <5 分钟,误报率 <5% |
## 文档
- [`docs/architecture.md`](docs/architecture.md) — 完整的系统设计与 ADR
- [`docs/agents.md`](docs/agents.md) — 各智能体设计与 observe→reason→signal 循环
- [`docs/event-schema.md`](docs/event-schema.md) — 事件总线主题定义与消息 Schema
- [`docs/runbooks.md`](docs/runbooks.md) — 事件响应与修复操作手册
## 路线图
- [x] 项目脚手架与环境设置
- [ ] Kafka + PostgreSQL 本地基础设施
- [ ] Ingestion Monitor Agent (基线)
- [ ] Schema Watcher Agent
- [ ] Quality Auditor Agent (dbt 集成)
- [ ] Lineage & Impact Agent (manifest.json 遍历)
- [ ] Orchestrator 关联逻辑
- [ ] 带有操作手册的 Remediation Agent
- [ ] Shadow 模式部署
- [ ] OpenTelemetry 可观测性
- [ ] AWS 生产环境部署
## 许可证
MIT 许可证 — 详见 [LICENSE](LICENSE)
*作为作品集项目构建,旨在演示面向数据工程管道的生产级多智能体系统 (MAS) 设计。*
标签:AI, AIOps, DataOps, IT运维, MAS, PyRIT, Socks5代理, Zenmap, 事件总线, 人工智能, 仪表板, 因果链, 多智能体系统, 异常检测, 影响分析, 数据可观测性, 数据完整性, 数据工程, 数据摄取, 数据治理, 数据管道, 数据血缘, 数据质量, 机器学习模型, 根因分析, 模式监控, 流水线监控, 测试用例, 漏洞利用检测, 生产级, 用户代理, 用户模式Hook绕过, 编排器, 自动修复, 自动化代码审查, 自动化诊断, 请求拦截, 软件工程, 软件成分分析, 逆向工具, 金融报告, 静默故障