omatheusmesmo/incident-response-sre

GitHub: omatheusmesmo/incident-response-sre

基于AI的事件响应SRE演示应用,实现多层级事件响应流程。

Stars: 2 | Forks: 0

# 事件响应 SRE 使用 **Quarkus**、**Quarkus LangChain4j (Agentic**) 和 **Quarkus Flow** 构建的 AI 事件响应 3 层演变演示,从单个无状态 AI 调用,到代理模式工具箱,再到耐用、人工门控、事件驱动的流程。 演示的重点是 **组合**:LangChain4j 提供代理,Flow 为这些代理提供耐用性、人工介入、事件和崩溃恢复。 ## 快速开始 ``` # 默认提供者是 NVIDIA NIM(与 OpenAI 兼容)。导出您的密钥: export NVIDIA_AI_API_KEY=nvapi-... ./mvnw quarkus:dev ``` 没有 NVIDIA 密钥?在 Ollama 上本地运行(Dev Services 启动它并拉取模型): ``` ./mvnw quarkus:dev -Dquarkus.profile=ollama,dev ``` 然后打开 **http://localhost:8080/** 上的实时控制台并触发一个事件以观察其运行 → 暂停以供批准 → 解决。Dev Services 自动启动 PostgreSQL、Kafka(Redpanda)和 WireMock(以及 `ollama` 配置文件上的 Ollama)。 ## 3 层演变 ### 第 1 层:ChatModel(单个 AI 服务) 一个 `@RegisterAiService`,将警报文本转换为结构化分析。无状态:无内存、无迭代、无耐用性。 **端点:** `POST /incidents/alert` ``` curl -X POST http://localhost:8080/incidents/alert \ -H "Content-Type: application/json" \ -d '{"source":"prometheus","service":"api-gateway","metric":"error_rate", "value":15.5,"threshold":5.0,"message":"Error rate 15.5% > 5.0%"}' ``` ### 第 2 层:代理模式(LangChain4j Agentic) 每个代理模式都暴露在其自己的端点上,以便您可以单独查看它们。每个代理图都保持 **一个级别深** - 更高级别的编排是在纯 Java/Flow 中完成的。(一个将 `@ParallelAgent` 和 `@ConditionalAgent` 作为兄弟节点嵌套在同一个 Flow 编译拓扑中的图会导致死锁;保持每个图浅并使用一个 `evidenceText` 值线程共享证据可以避免它。) | 模式 | 端点 | 显示内容 | |------|------|---------| | `@ParallelAgent` | `POST /incidents/parallel` | 扇出:日志 + 指标 + 部署历史证据代理并发运行(在小型/快速模型上)。 | | `@ConditionalAgent` | `POST /incidents/conditional` | 严重性路由器:P1/P2 进行深入的基于证据的诊断,P3/P4 进行轻量级分诊(`@ActivationCondition`)。 | | `@LoopAgent` | `POST /incidents/loop` | 诊断 → 补救 → 评分,直到 `@ExitCondition` 置信度 ≥ 0.8。携带 `@ErrorHandler` 以提高弹性(见下文)。 | | `@SupervisorAgent` | `POST /incidents/commander` | 一个 LLM "事件指挥官" 自主决定咨询哪些领域专家(数据库、Kubernetes、网络、缓存),然后综合他们的发现(`responseStrategy = SUMMARY`)。 | ``` # @SupervisorAgent:指挥官自行将路由到相关专家 curl -X POST http://localhost:8080/incidents/commander \ -H "Content-Type: application/json" \ -d '{"source":"prometheus","service":"checkout-api","metric":"db_pool_active", "value":100,"threshold":80, "message":"p99 8s, HikariCP pool exhausted, threads blocked on connections"}' ``` 所有四个体都使用与第 1 层相同的 `AlertInput` 形状。 **大模型 + 小模型拆分。** 默认提供程序(NVIDIA NIM)使用 `meta/llama-3.3-70b-instruct` 进行推理,并使用较小的 `meta/llama-3.1-8b-instruct`(`@ModelName("evidence")`)用于并行证据代理和监督专家,以保持并发扇出对速率限制的廉价。 #### 代理弹性:`@ErrorHandler` `DiagnosticLoopAgent` 带有一个静态的 `@ErrorHandler`,将三类代理故障转换为优雅的行为而不是崩溃: - **缺少作用域参数**(`MissingArgumentException`)→ 在 `AgenticScope` 中播种一个合理的默认值并 `retry()`。 - **瞬态 LLM 错误**(超时 / 429 / 速率限制)→ 单个 `retry()`(在客户端自己的 `max-retries` 之上)。 - **其他任何东西**(例如,来自模型的无效 JSON)→ 编写一个回退评分并 `result(...)` 一个最佳努力的部分答案,以便循环完成。 ### 第 3 层:Flow(耐用工作流程与 HITL) `IncidentResponseFlow` 协调代理工作并添加代理单独无法做到的事情: - **代理子流程**:诊断阶段是一个作为 Flow `function(...)` 任务(`IncidentDiagnosisService` → 分类 → 收集证据 → 严重性路由诊断)连接的代理管道,其结果作为任务输出导出。 - **耐用性**:由 MVStore 支持的持久性(`~/incident-response-sre-flow.mv.db` 中的一个文件)。工作流程状态在 JVM 完全重启后仍然存在。 - **人工介入**:破坏性补救措施(`RESTART_POD`、`SCALE_DOWN`)需要 SRE 批准。工作流程发出一个批准请求 CloudEvent,在 `listen` 处暂停,并在批准/拒绝事件到达时恢复。 - **外部集成**:HTTP 任务获取 Prometheus 指标,通过部署 API 执行补救措施,并通知 Slack(所有通过 WireMock 在开发/测试中模拟)。 - **端到端类型安全**:每个 Flow 转换都是完全类型化的 - 没有 `Object.class`。Prometheus 响应反序列化为 `MetricsSnapshot` 记录,并将其折叠到 `IncidentResult`(`withLiveMetrics`)中,然后重新导出到工作流程上下文中,因此实时遥测在下游恢复和馈送到事后提示中得以保留。HTTP 请求/响应体(`SlackMessage`/`SlackAck`、`RemediationCommand`)和拒绝分支(`RemediationRejection`)是记录,而不是映射。 - **代理事后分析**:在补救措施之后,一个事后分析代理(`WorkflowPostMortemAgent`)总结事件(使用诊断、补救和实时指标)作为其自己的 Flow `agent(...)` 任务;生成的摘要被捕获回 `IncidentResult`(`withPostMortem`)并投影到 `Incident` 记录上。 - **双向事件驱动**:从 `flow-in` 消费警报/批准,通过 Kafka 发布领域事件(`flow-out`)和每个任务的生存周期(`flow-lifecycle-out`)作为 CloudEvents。 - **读模型投影**:`IncidentEventBridge` 消费工作流程的 `flow-out` 事件并将其投影回 `Incident` JPA 记录(当它暂停以供 HITL 时为 `PENDING_APPROVAL`,当它完成时为 `RESOLVED`),因此 `GET /incidents/{id}` 与实时仪表板保持一致。 **端点:** | 方法 | 路径 | 目的 | |--------|------|---------| | `POST` | `/incidents/workflow` | 为警报启动持久化工作流程。 | | `PUT` | `/incidents/{incidentId}/approve` \| `/reject` | 通过事件记录进行批准/拒绝。 | | `PUT` | `/incidents/workflow/{workflowInstanceId}/approve` \| `/reject` | 通过 **工作流程实例 id** 进行批准/拒绝 - 与 `Incident` 实体解耦,因此即使实体数据库被重置,HITL 仍然可以在重启后工作。 | | `GET` | `/incidents/{incidentId}` | 持久化的事件记录。 | 工作流程是否采取 HITL 路径取决于分诊将补救措施分类为 `破坏性`。以下警报描述了一个不可恢复的挂起服务,可靠地分类为破坏性重启: ``` # 1)启动工作流程并捕获事件 ID INCID=$(curl -s -X POST http://localhost:8080/incidents/workflow \ -H "Content-Type: application/json" \ -d '{"source":"prometheus","service":"payment-service","metric":"thread_deadlock", "value":100,"threshold":1, "message":"payment-service fully deadlocked: all worker threads BLOCKED, 100% of requests failing, liveness probe failing. Process is hung and unrecoverable; standard remediation is an immediate pod restart."}' \ | jq -r .incidentId) echo "incident: $INCID" # 2)进行分类,然后工作流程在批准门处暂停(查看应用日志/实时控制台)。 # 3a)批准 -> 恢复 -> 执行修复 -> 通知 Slack -> 剖析 -> 解决 curl -X PUT "http://localhost:8080/incidents/$INCID/approve" \ -H "Content-Type: application/json" \ -d '{"reviewer":"sre-oncall","reason":"Confirmed deadlock; restart approved"}' # 3b)...或拒绝 -> 拒绝通知 -> 结束(未执行修复) # curl -X PUT "http://localhost:8080/incidents/$INCID/reject" ... # 4)检查持久化的事件 curl -s "http://localhost:8080/incidents/$INCID" | jq ``` 非破坏性警报跳过批准门并直接通过 `executeRemediation → notifySlack → postMortem → resolved` 运行。 ## 持久性:为什么是 MVStore(以及 JPA 的注意事项) Flow 持久性是 **MVStore** (`quarkus-flow-mvstore`),一个基于文件的存储,可以在重启后继续存在。`quarkus-flow-jpa` 在这里 **不** 使用:它的 `JpaPersistenceExecutor.execute(...)` 是 `@Transactional`,因此 JTA 事务在调用线程上开始。当工作流程任务在一个 Vert.x 事件循环线程(Kafka `emitJson` 批准请求)上完成时,持久性检查点也在那里运行并失败,错误信息为 *"@Transactional cannot start a JTA transaction within a reactive pipeline."* MVStore 使用默认的非事务性 `AsyncPersistenceExecutor`,它在事件循环之外进行持久化。 第 2 层代理图被 AOT 编译到 Flow 工作流程中,但没有为 `AgenticScope` 支持的状态提供marshaller,因此它们在内存中运行(`quarkus.flow.persistence.exclude-workflows`)。只有第 3 层 `incident-response` 工作流程被持久化 - 它是需要生存 HITL 暂停的工作流程。PostgreSQL 仍然支持 `Incident`/`Alert` JPA 实体;它与 Flow 持久性无关。 ### 已注册的工作流程(Flow Dev UI) 注册了六个 `WorkflowDefinition` 实例 - 五个 AOT 编译的代理图(在内存中运行)加上耐用的第 3 层流程。每个都可达;没有孤儿。`@SupervisorAgent` 指挥官没有被编译成 Flow 工作流程(纯 LLM 协调),因此它没有出现在这里。 | 工作流程 | 模式 | 通过 | |----------|---------|-----------| | `evidence-gatherer` | @ParallelAgent | `/incidents/parallel`(也由 conditional & loop 重用) | | `severity-router` | @ConditionalAgent | `/incidents/conditional` | | `deep-diagnosis-agent` | @SequenceAgent | `severity-router` 的 P1/P2 分支 | | `light-triage-agent` | @SequenceAgent | `severity-router` 的 P3/P4 分支 | | `diagnostic-loop-agent` | @LoopAgent | `/incidents/loop` | | `incident-response` | Flow(耐用) | `/incidents/workflow` | ## 事件(通过 Kafka 的 CloudEvents) 应用程序既 **从** 也 **发布到** Kafka 主题(通过 Dev Services 的 Redpanda)。一切都是 CloudEvents: | 主题 | 应用方向 | 由 | 携带 | |-------|---------------|----|---------| | `flow-in` | **消费** | Flow 引擎 | 警报和启动或恢复工作流程的批准/拒绝回复 | | `flow-in` | **发布** | `IncidentResource` (`flow-in-outgoing`) | 当 SRE 点击批准/拒绝时发送的批准/拒绝 CloudEvent | | `flow-out` | **发布** | Flow 领域发布者 | 工作流程 `emit`s(`incident.approval.required`、`incident.resolved`) | | `flow-out` | **消费** | `IncidentEventBridge` | 转发到仪表板 WebSocket | | `flow-lifecycle-out` | **发布** | Flow 生命周期发布者 | 每个任务的生存周期(`task.started/completed`、`workflow.started/completed`) | | `flow-lifecycle-out` | **消费** | `IncidentEventBridge` | 驱动每个卡片上的实时每步进度 | 发布是通过 `quarkus.flow.messaging.defaults-enabled=true`(域)和 `quarkus.flow.messaging.lifecycle-enabled=true`(生命周期)启用的。Flow 的领域发布者仅绑定到名为 **`flow-out`** 的通道。 ## 观察演示 ### 实时控制台(推荐) - `http://localhost:8080/` 一个轻量级仪表板(静态 HTML + 由 Flow `flow-out` 主题提供的 `@ServerEndpoint` WebSocket,没有 Quinoa/npm)在 `/console.html`。它从单个屏幕驱动 **所有三层**:运行 L1 ChatModel 和四个 L2 模式(同步结果卡片),并触发 L3 耐用工作流程 - 观察事件实时更新(`TRIAGING → ⏸ AWAITING APPROVAL → RESOLVED`)并直接从卡片中批准/拒绝破坏性补救措施。 实现:`com.acme.sre.messaging.IncidentEventBridge`(消费 `flow-out`/`flow-lifecycle-out`)+ `IncidentDashboardSocket`(`/ws/incidents`)+ `META-INF/resources/{index,console}.html`(着陆 + 控制台)。 ### 其他表面 - **应用程序日志** - 每个任务都会被跟踪;HITL 暂停显示 `Task 'waitSREApproval' started`(暂停),然后在批准后恢复。 - **Swagger UI**: - **Flow Dev UI**: - 注册的工作流程定义;代理 **拓扑** 页面可视化每个代理图。 - **WireMock Dev UI** - 检查外部集成的模拟映射。 ## 架构 相同的警报可以由三层中的任何一层处理;第 3 层重用第 2 层代理管道作为 Flow 任务,并用耐用性、HITL 和事件包装它。 ``` flowchart TD A([Alert Input]) --> L1 A --> L2 A --> L3 subgraph L1["Layer 1 - ChatModel"] AN["IncidentAnalyzer · @RegisterAiService
single stateless AI call"] end subgraph L2["Layer 2 - Agentic patterns (each one level deep)"] P["/parallel · EvidenceGatherer
@ParallelAgent: logs + metrics + deploy"] C["/conditional · SeverityRouter
@ConditionalAgent → DeepDiagnosis / LightTriage"] LP["/loop · DiagnosticLoopAgent
@LoopAgent + @ErrorHandler, exit when score ≥ 0.8"] CM["/commander · IncidentCommander
@SupervisorAgent → DB · K8s · Network · Cache"] end subgraph L3["Layer 3 - IncidentResponseFlow (durable · HITL · event-driven)"] direction TB D["function: agenticDiagnosis
(Layer 2 pipeline as a Flow task)"] M["get: fetchMetrics
→ MetricsSnapshot, withLiveMetrics"] SW{"remediation
destructive?"} AP["emit: approval.required"] WAIT["listen: waitSREApproval"] DEC{"approved?"} EX["post: executeRemediation"] SL["post: notifySlack"] PM["agent: postMortem
withPostMortem"] RES["emit: incidentResolved"] REJ["function: remediationRejected"] RSL["post: notifyRejectionSlack"] FIN(["END"]) D --> M --> SW SW -- yes --> AP --> WAIT --> DEC SW -- no --> EX DEC -- approved --> EX DEC -- rejected --> REJ --> RSL --> FIN EX --> SL --> PM --> RES end ``` 第 3 层工作流程的终端/HITL 状态被投影回 `Incident` JPA 记录(通过 `flow-out` CloudEvents),因此 `GET /incidents/{id}` 与实时控制台保持一致。 ## 项目结构 ``` src/main/java/com/acme/sre/ ├── ai/ │ ├── triage/IncidentAnalyzer Layer 1 - single @RegisterAiService │ ├── diagnostics/ Layer 2 - agentic patterns + leaf agents │ │ ├── EvidenceGatherer (@ParallelAgent), LogsAnalysisAgent, MetricsAnalysisAgent, DeployHistoryAgent │ │ ├── SeverityRouter (@ConditionalAgent), DeepDiagnosisAgent, LightTriageAgent, … │ │ ├── DiagnosticLoopAgent (@LoopAgent + @ErrorHandler), DiagnosticAgent, RemediationAgent, ConfidenceScorer │ │ ├── SeverityClassifier │ │ └── IncidentDiagnosisService plain CDI orchestration used by the Flow task │ ├── commander/ Layer 2 - @SupervisorAgent + 4 domain specialists │ ├── response/WorkflowPostMortemAgent Layer 3 - post-mortem agent (a Flow task) │ └── support/ lenient JSON hardening (ConfidenceScore deser) for agentic/Flow paths ├── flow/IncidentResponseFlow Layer 3 - the durable Workflow descriptor ├── api/ REST boundary │ ├── IncidentResource endpoints (alert / parallel / conditional / loop / commander / workflow / approve / reject / get), returns typed DTOs │ ├── dto/ response DTOs + IncidentView (entity never exposed) + ApiError │ └── ApiException · IncidentNotFoundException · WorkflowNotActiveException · ApiExceptionMapper ├── messaging/ IncidentEventBridge + IncidentDashboardSocket (live console) + IncidentProjectionUpdater + ApprovalEventPublisher (HITL CloudEvent) └── domain/ domain types, split by role ├── model/ JPA entities + enums (Alert, Incident, Severity, ActionType, IncidentStatus) ├── diagnosis/ agentic value objects (Diagnosis, Evidence, RemediationAction, ConfidenceScore, IncidentResult, MetricsSnapshot, …) └── contract/ boundary payloads (AlertInput, TriagePrompt, ApprovalResponse, SlackMessage/SlackAck, RemediationCommand/Rejection) src/main/resources/ ├── application.properties LLM, datasource, Flow persistence + exclude-workflows, Kafka channels └── META-INF/resources/{index,console}.html landing page + the all-layers live console src/test/resources/mappings/ WireMock stubs (Prometheus / deployment / Slack) docker-compose.yml optional persistent Postgres ``` ## 技术堆栈 - **Quarkus** 3.36.0(Java 25) - **Quarkus LangChain4j** 1.11.0.CR1(Agentic;OpenAI/NVIDIA + Ollama 提供商) - **Quarkus Flow** 0.10.0(`mvstore` 持久性、`messaging`、`langchain4j`) - **Quarkus REST**rest-jackson`,反应堆栈) - 类型化 DTO 端点 + 异常映射器 - **Hibernate ORM with Panache** 在 **PostgreSQL** 上 - `Incident`/`Alert` 读取模型 - **SmallRye Reactive Messaging (Kafka)** + **CloudEvents** - 事件总线(`flow-in` / `flow-out` / `flow-lifecycle-out`) - **WebSockets** - 实时控制台馈送 - **PostgreSQL**、**Kafka/Redpanda**、**WireMock** - 所有通过 Dev Services - **Ollama**(可选,`ollama` 配置文件)用于完全本地运行 ## 测试 ``` # 默认套件:确定性,无真实 LLM(代理豆/ChatModel/Flow 被模拟) ./mvnw test # 针对真实 LLM 的端到端(慢;被 `real-llm` JUnit 标签排除) ./mvnw test -Preal-llm ``` 默认套件(26 个测试)涵盖:宽松的置信度解析、`@LoopAgent` 退出条件和 `@ErrorHandler` 恢复逻辑(纯单元测试)、每个模式的第 2 层端点(并行 / 条件 / 循环 / 指挥官)与模拟的代理豆,第 3 层工作流程启动 + HITL 端点、工作流程→`Incident` 读取模型投影,以及 WireMock 模拟的集成。测试在 `test` 配置文件上运行(固定到 Ollama;不需要 NVIDIA 密钥)。 ## 崩溃恢复 在 HITL 处暂停的工作流程被持久化到 MVStore 文件,并在 JVM 的硬 `kill -9` 后继续存在。重启后,Flow 在其相关的 `flow-in` 批准事件到达时重新水化挂起的实例(恢复是事件驱动的,而不是 eager-on-boot)。使用 `/incidents/workflow/{workflowInstanceId}/approve` 端点在重启后通过实例 id 进行批准。`docker-compose.yml` 提供了一个可选的持久化 Postgres,如果您还希望 `Incident` 实体在 JVM 之外继续存在。 ## 相关指南 - Quarkus LangChain4j: - LangChain4j Agentic: - Quarkus Flow: - SmallRye Kafka:
标签:API网关, Flow, Kafka, LangChain4j, LLM评估, NVIDIA NIM, Ollama, OpenAI, PostgreSQL, Quarkus, Redpanda, SonarQube插件, SRE, WireMock, 事件驱动, 人工智能, 偏差过滤, 内存规避, 域名枚举, 多代理系统, 安全响应, 开源框架, 性能指标, 持续部署, 持续集成, 无状态服务, 测试用例, 状态管理, 用户模式Hook绕过, 监控, 网络调试, 自动化, 错误处理, 阈值