Raditya0902/multi-agent-production-incident-response-system

GitHub: Raditya0902/multi-agent-production-incident-response-system

基于LangGraph的多Agent系统,自动化完成生产事件从诊断到代码修复的全流程,支持人工审批、Slack通知和GitHub PR创建。

Stars: 0 | Forks: 0

# 多 Agent 生产事件响应系统 一个完全自主的 AI pipeline,能够检测生产事件、诊断根本原因、生成并验证代码修复、起草个性化客户回复,并在 90 秒内打开 GitHub PR —— 在应用任何修复之前,设有人工介入的审批关卡。 基于 **LangGraph**、**Groq (Llama 3)**、**ChromaDB RAG**、**Streamlit**、**Slack** 和 **GitHub API** 构建。 ## 功能说明 当生产事件发生时,您只需将错误日志和客户投诉粘贴到仪表板中,然后点击 **Run**。系统随后会执行以下操作: 1. **关联** 错误与投诉,并按名称识别受影响的客户 2. **分类严重性** (P0 Critical / P1 High / P2 Medium),基于错误类型、客户数量和关键词 3. **诊断根本原因**,通过 RAG 检索过往事件数据库进行诊断 4. **生成代码补丁**,采用包含修改前后 diff 的结构化格式 5. **暂停等待人工审查** — 您阅读补丁后可以选择批准,或附带反馈拒绝 6. **在隔离沙箱中执行测试** — 如果测试失败,Critic Agent 会分析失败原因,Fix Generator 将重试(最多 3 次) 7. **起草个性化客户回复**,针对每一位受影响的客户 8. **生成事后复盘报告**,格式为 Markdown 9. **发送 Slack 通知** 到您的事件频道 10. **打开 GitHub PR**,将修复应用到真实源文件中 ## Pipeline 架构 ``` flowchart TD A([🚨 Incident Input\nlogs + complaints]) --> B subgraph Phase1["⚙️ Phase 1 — Diagnosis"] B[Correlation Agent\nlinks errors to customers] --> C C[Severity Agent\nP0 / P1 / P2 classification] --> D D[Root Cause Agent\nRAG over past incidents] --> E E[Fix Generator Agent\ngenerates code patch] end E --> F subgraph HITL["👤 Human-in-the-Loop Gate"] F{Human Review\nApprove or Reject?} F -- Reject + feedback --> E F -- Approve --> G end subgraph Phase2["⚙️ Phase 2 — Execution"] G[Execution Agent\nruns tests in sandbox] --> H H{Tests passed?} H -- ✅ Yes --> I[Customer Response Agent\ndrafts replies per customer] H -- ❌ No, retries left --> J[Critic Agent\nanalyzes failure] H -- ❌ No, max retries --> K[🆘 Escalation\nhuman engineer required] J --> E2[Fix Generator Agent\nregenerated patch] E2 --> G I --> L[Incident Report Agent\ngenerates postmortem] end L --> M([💾 Saved to SQLite]) L --> N([📣 Slack notification]) L --> O([🔗 GitHub PR opened]) K --> M K --> N ``` ### 两阶段拆分 该 pipeline 拆分为两个已编译的 LangGraph 图,以支持人工审批关卡而无需持久化的检查点器: | 阶段 | Agent | 触发条件 | |---|---|---| | **阶段 1** | Correlation → Severity → Root Cause → Fix Generator | 点击 "Run" | | **阶段 2** | Execution → [Critic → Fix Generator loop] → Customer Response → Incident Report | 点击 "Approve" | 在审批面板上选择拒绝会直接调用 `fix_generator_agent()`(无需完全重新运行),并停留在审批界面显示新的补丁。 ## 功能特性 - **7 个专门的 AI agent**,由 LangGraph 编排,带有条件重试循环 - **RAG 检索**,通过 ChromaDB + sentence-transformers 实现 —— 过往事件为诊断提供参考 - **严重性分类** (P0/P1/P2) 附带原因说明,在 UI 中以彩色徽章显示 - **人工介入的补丁审批** —— 在执行任何代码前进行审查 - **自我修复重试循环** —— Critic Agent 分析测试失败并指导重新生成(最多 3 次尝试) - **隔离的测试执行** —— 补丁在 subprocess 沙箱中运行(可选 Docker) - **个性化客户回复** —— 所有客户合并到一次 LLM 调用中处理 - **Slack 通知** —— 包含严重性颜色、根本原因、受影响客户和 PR 链接的富文本 Block Kit 消息 - **GitHub PR 创建** —— 测试通过后自动创建分支、提交代码并开启 PR - **SQLite 事件历史** —— 浏览、按严重性筛选以及下载事后复盘报告 - **分析仪表板** —— 严重性分布、MTTR 趋势、重试分布、结果矩阵 - **Webhook API** (FastAPI) —— 外部工具可以通过 8000 端口直接 POST 事件 - **流式输出** —— 当 agent 完成工作时,逐字符流式传输根本原因和修复说明 - **速率限制处理** —— 针对每分钟限制的指数退避(2s → 4s → 8s → 16s);对每日配额耗尽提供明确的错误提示 - **Docker 支持** —— `docker compose up --build` 启动所有服务并带有持久化卷 ## 技术栈 | 层级 | 技术 | |---|---| | Agent 编排 | [LangGraph](https://github.com/langchain-ai/langgraph) | | LLM | [Groq API](https://console.groq.com/) — `llama-3.3-70b-versatile` (或 `llama-3.1-8b-instant`) | | RAG 向量存储 | [ChromaDB](https://www.trychroma.com/) + `sentence-transformers/all-MiniLM-L6-v2` | | 前端 | [Streamlit](https://streamlit.io/) | | Webhook API | [FastAPI](https://fastapi.tiangolo.com/) + [Uvicorn](https://www.uvicorn.org/) | | 通知 | [slack-sdk](https://slack.dev/python-slack-sdk/) | | GitHub 集成 | [PyGithub](https://pygithub.readthedocs.io/) | | 分析图表 | [Plotly](https://plotly.com/python/) | | 数据持久化 | SQLite (标准库 `sqlite3`) | | 测试 | [pytest](https://pytest.org/) | | 容器化 | Docker + Docker Compose | ## 项目结构 ``` ├── agents/ # One file per agent │ ├── __init__.py # Lazy LLM singleton with exponential backoff retry │ ├── state.py # IncidentState TypedDict + create_initial_state() │ ├── correlation_agent.py # Extracts error summary and customer names from logs │ ├── severity_agent.py # Classifies P0 / P1 / P2 with reasoning │ ├── root_cause_agent.py # RAG-augmented root cause diagnosis │ ├── fix_generator_agent.py# Generates code patches in delimited format │ ├── execution_agent.py # Writes + runs test harness in subprocess/Docker │ ├── critic_agent.py # Analyzes test failures and guides next attempt │ ├── customer_response_agent.py # Drafts personalized replies (batched) │ └── incident_report_agent.py # Generates Markdown postmortem │ ├── graph/ │ └── workflow.py # LangGraph graphs: phase1_app, phase2_app, app │ ├── rag/ │ ├── vectorstore.py # ChromaDB wrapper — retrieve_context(), add_documents() │ ├── ingestion.py # Ingest past_incidents/*.md into ChromaDB │ └── past_incidents/ # 4 Markdown incident knowledge-base files │ ├── app/ # Buggy sample source files (one per test case) │ ├── data_processing.py # IndexError at line 42 │ ├── payment_service.py # KeyError at line 87 │ ├── api_gateway.py # KeyError at line 23 │ ├── notification_service.py # AttributeError at line 78 │ ├── order_service.py # IndexError at line 55 │ ├── analytics_service.py # ZeroDivisionError at line 91 │ └── database_service.py # TimeoutError at line 134 │ ├── db/ │ └── history.py # SQLite CRUD — save, list, get, delete, analytics │ ├── notifications/ │ ├── slack.py # Block Kit Slack messages (bot token or webhook URL) │ └── github_pr.py # Branch, commit, and PR creation via PyGithub │ ├── api/ │ └── webhook.py # FastAPI webhook — POST /webhook/incident │ ├── frontend/ │ ├── app.py # Main Streamlit app (5-phase HITL state machine) │ └── pages/ │ ├── 1_History.py # Incident history with severity filter │ └── 2_Analytics.py # Charts: severity, MTTR, retry distribution │ ├── sandbox/ │ └── Dockerfile # Isolated container for code execution │ ├── tests/ │ ├── conftest.py # sys.path fixture │ ├── test_agents.py # Patch parser, router logic, RAG, execution sandbox │ ├── test_history.py # SQLite CRUD tests with temp DB isolation │ ├── test_slack.py # Block builder and transport layer tests │ └── test_github_pr.py # Branch naming, patch application, PR body tests │ ├── data/ │ ├── sample_logs/ # indexerror_log.txt, keyerror_log.txt, timeout_log.txt │ └── sample_complaints/ # complaints.json │ ├── Dockerfile # App image (python:3.11-slim, CPU torch) ├── docker-compose.yml # Streamlit (8501) + FastAPI webhook (8000) ├── entrypoint.sh # RAG ingestion → Streamlit start ├── requirements.txt ├── .env.example # All environment variable documentation └── test_cases.txt # 7 copy-paste test scenarios for the dashboard ``` ## 快速开始 (本地) ### 前置条件 - Python 3.11+ - [Groq API key](https://console.groq.com/) (提供免费额度) ### 1. 克隆并安装 ``` git clone https://github.com/Raditya0902/multi-agent-production-incident-response-system.git cd multi-agent-production-incident-response-system pip install -r requirements.txt ``` ### 2. 配置环境 ``` cp .env.example .env # 打开 .env 并设置 GROQ_API_KEY (必填) # 所有其他变量为可选 — 请参阅下方的 Environment Variables ``` ### 3. 将过往事件导入 ChromaDB ``` python -m rag.ingestion # 输出: "Ingested 4 documents into ChromaDB." ``` ### 4. 运行仪表板 ``` streamlit run frontend/app.py # 在 http://localhost:8501 打开 ``` ### 5. (可选) 运行 Webhook API ``` uvicorn api.webhook:app --port 8000 --reload # API 文档位于 http://localhost:8000/docs ``` ## Docker 在 **端口 8501** 启动 Streamlit 仪表板,并在 **端口 8000** 启动 FastAPI webhook。ChromaDB 和 SQLite 数据在命名的 Docker 卷中持久化存储,跨重启保留。 ``` # 构建并启动所有内容 docker compose up --build # 停止 docker compose down # 停止并移除 volumes (清除事件历史和 RAG) docker compose down -v ``` 入口点会在启动 Streamlit 之前自动运行 `rag.ingestion`,因此 RAG 知识库始终处于填充状态。 ## 环境变量 将 `.env.example` 复制到 `.env` 并填入所需的值。 | 变量 | 必填 | 默认值 | 描述 | |---|---|---|---| | `GROQ_API_KEY` | ✅ 是 | — | 来自 [console.groq.com](https://console.groq.com/) 的 Groq API 密钥 | | `LLM_MODEL` | 否 | `llama-3.3-70b-versatile` | Groq 模型。使用 `llama-3.1-8b-instant` 可获得更高的速率限制 | | `GROQ_TEMPERATURE` | 否 | `0.2` | LLM 温度 (0–1) | | `GROQ_MAX_TOKENS` | 否 | `4096` | 每次 LLM 调用的最大 token 数 | | `MAX_RETRY_ATTEMPTS` | 否 | `3` | 升级前的最大修复重试次数。设为 `1` 用于快速演示 | | `SANDBOX_MODE` | 否 | `subprocess` | 用于测试执行的 `subprocess` 或 `docker` | | `INCIDENT_DB_PATH` | 否 | `./incidents.db` | SQLite 数据库路径 | | `CHROMA_PERSIST_DIR` | 否 | `./rag/chroma_db` | ChromaDB 持久化目录 | | `SLACK_BOT_TOKEN` | 否 | — | Slack bot token (`xoxb-...`)。需要 `chat:write` 权限范围 | | `SLACK_WEBHOOK_URL` | 否 | — | Slack 入站 webhook URL (bot token 的替代方案) | | `SLACK_INCIDENT_CHANNEL` | 否 | `#incidents` | 已解决事件的通知频道 | | `SLACK_ESCALATION_CHANNEL` | 否 | 同上 | 升级警报的频道 | | `GITHUB_TOKEN` | 否 | — | 具有 `repo` 权限范围的 GitHub PAT | | `GITHUB_REPO` | 否 | — | 目标 repo,格式为 `owner/name` | | `GITHUB_BASE_BRANCH` | 否 | `main` | PR 的目标分支 | | `WEBHOOK_API_KEY` | 否 | — | Webhook 端点的 API 密钥。留空 = 无需认证 | ## 运行测试 ``` # 单元测试 (无需 API key — 约 8 秒) pytest tests/ -m "not integration" -v # 集成测试 (需要 GROQ_API_KEY, 运行完整 pipeline) pytest tests/ -m integration -v ``` **63 个单元测试** 涵盖: - 补丁解析器、路由逻辑、执行沙箱 (`test_agents.py`) - 带有临时数据库隔离的 SQLite CRUD (`test_history.py`) - Slack Block Kit 构建器和传输层 (`test_slack.py`) - GitHub 分支命名、补丁应用、PR 正文结构 (`test_github_pr.py`) ## Webhook API FastAPI 服务与仪表板一同在端口 8000 上运行。 ### 端点 | 方法 | 路径 | 描述 | |---|---|---| | `GET` | `/health` | 健康检查 | | `POST` | `/webhook/incident` | 触发完整 pipeline | ### POST /webhook/incident **请求体:** ``` { "logs": "2024-11-15 14:32:01 ERROR [data_processing] ...\nIndexError: list index out of range", "complaints": [ "App crashes every time I upload a CSV!", "Getting an error on file submit — urgent deadline." ], "source": "datadog" } ``` **响应:** ``` { "incident_id": 12, "severity": "P1", "status": "complete", "correlated_error": "IndexError in data_processing.py:42", "root_cause": "Missing bounds check before list indexing...", "tests_passed": true, "escalate_to_human": false, "affected_customers": ["Alice Johnson", "Bob Martinez"], "patch_explanation": "Added bounds check to handle empty batch input.", "run_time_seconds": 47.3, "slack_notified": true, "github_pr_url": "https://github.com/owner/repo/pull/5" } ``` **身份验证:** 在 `.env` 中设置 `WEBHOOK_API_KEY`,并将其作为 `X-Api-Key` 请求头传递。将 `WEBHOOK_API_KEY` 留空可在开发环境中禁用身份验证。 交互式 API 文档:`http://localhost:8000/docs` ## 测试用例 `test_cases.txt` 包含 7 个可直接粘贴到仪表板的场景。每个场景都包含可粘贴到两个输入区域的准确错误日志和客户投诉。 | # | 场景 | 预期严重性 | 亮点 | |---|---|---|---| | 1 | IndexError — 文件上传崩溃 | P1 | 正常路径,首次尝试即通过 | | 2 | KeyError — 支付失败 | P0 | 支付关键词触发 Critical 级别;2 个客户 | | 3 | DB timeout — 连接池 | P1 | 基础设施/性能问题 | | 4 | AttributeError — 欢迎邮件 | P2 | 非关键通知 bug | | 5 | Mass outage — 服务宕机 | P0 | 5 个客户,压力测试 | | 6 | ZeroDivisionError — 分析服务 | P2 | RAG 检索中的新型错误类型 | | 7 | Reject → regenerate 流程 | P2 | 测试人工拒绝 + 补丁循环 | 每个测试用例都对应 `app/` 中的一个真实存在 bug 的源文件,其 bug 刚好位于堆栈跟踪指出的具体行号,因此 GitHub PR 会显示出干净、可合并的 diff。 ## 各 Agent 工作原理 | Agent | 输入 | 输出 | 核心设计 | |---|---|---|---| | **Correlation** | 原始日志 + 投诉 | 关联的错误、客户名称 | 严格的 JSON 提示;针对格式异常的响应提供 regex 回退机制 | | **Severity** | 关联的错误、客户数量 | P0 / P1 / P2 + 原因 | 规则层级:payment/outage → P0,crash/urgent → P1,否则为 P2 | | **Root Cause** | 关联的错误、日志 | 根本原因、相关文件 | 将前 3 个 ChromaDB 结果作为带编号的过往事件上下文注入 | | **Fix Generator** | 根本原因、文件、Critic 反馈 | 代码补丁 (分隔格式) | 使用分隔的 `===FILE=== ===ORIGINAL=== ===FIXED===` 避免 JSON 转义问题 | | **Execution** | 代码补丁 | 测试结果,通过/失败 | 在临时目录中生成合成的 pytest 桩代码;通过 `sys.executable` 运行 | | **Critic** | 失败的测试输出、补丁 | 结构化反馈 | 输出 `WHAT FAILED / WHY / NEXT ATTEMPT MUST` 以指导重新生成 | | **Customer Response** | 客户名称、根本原因 | 每个客户的回复 | 使用 `---CUSTOMER: name---` 分隔符将所有客户合并到一次 LLM 调用中 | | **Incident Report** | 所有状态字段 | Markdown 事后复盘报告 | 基于模板处理事实部分;仅使用 LLM 生成建议 | ## Streamlit UI 页面 | 页面 | 路径 | 描述 | |---|---|---| | **Main** | `/` | 5 阶段 pipeline 运行器,带有实时 Agent 活动和 HITL 审批关卡 | | **History** | `/History` | 所有历史运行,包含严重性筛选、可展开的详情标签页和事后复盘下载 | | **Analytics** | `/Analytics` | Plotly 图表:严重性分布、MTTR 趋势、重试分布、结果矩阵 | ## 许可证 MIT
标签:AIOps, AI智能体, ChromaDB, CISA项目, DLL 劫持, GitHub API, HITL, IP 地址批量处理, Kubernetes, LangGraph, Llama 3, LLM, PyRIT, RAG, Slack, Slack通知, SRE, Streamlit, Unmanaged PE, 事后复盘, 人工智能, 人机协同, 代码审查, 代码生成, 偏差过滤, 多智能体系统, 大语言模型, 工作流自动化, 开源框架, 异常检测, 持续集成, 故障自愈, 根因分析, 渗透测试工具, 生产事故处理, 用户模式Hook绕过, 自动修复, 自动创建PR, 自动化运维, 补丁生成, 访问控制, 请求拦截, 逆向工具