JashanLabs/OpsStream-RAG

GitHub: JashanLabs/OpsStream-RAG

面向运维事件响应的实时 RAG 流水线,通过 Kafka 流式摄入操作事件并借助 pgvector 与混合检索实现上下文感知的故障排查。

Stars: 0 | Forks: 0

# OpsStream-RAG 一个用于事件响应的实时 RAG pipeline。通过 Kafka 将操作事件(GitHub commits、Jira tickets、系统日志)流式传输到 pgvector 知识库,然后使用混合检索和 LLM 生成来回答值班查询。 标准的 RAG pipeline 会按计划批量处理文档。这意味着在发生实时故障期间,导致该事件的 commits 尚未被索引。OpsStream-RAG 消除了这一差距 —— 事件在发生后的几秒钟内即被向量化并可查询。

OpsStream-RAG Dashboard

## 架构 六个服务,完全容器化。只需一个 `docker compose up --build -d` 命令即可运行整个技术栈。 ``` GitHub / Jira Webhooks On-Call Engineer │ │ ▼ ▼ ┌────────────────┐ ┌────────────────┐ │ API Gateway │ │ API Gateway │ │ POST /webhooks │ │ POST /ask │ │ (Pydantic) │ │ (RAG generate) │ └───────┬────────┘ └───────┬────────┘ │ │ ▼ │ ┌────────────────┐ │ │ Apache Kafka │ │ │ (3 partitions)│ │ └───────┬────────┘ │ │ │ ┌──────┼──────┐ │ ▼ ▼ ▼ │ Consumer Consumer Consumer │ (batch → embed → upsert) │ │ │ │ │ └──────┼──────┘ │ ▼ │ ┌────────────────┐ │ │ Embedding API │ │ │ Jina v2 local │ │ │ (768-dim) │ │ └───────┬────────┘ │ ▼ │ ┌────────────────┐ ┌────────┴───────┐ │ PostgreSQL 16 │◄─────────────────────┤ Retrieval API │ │ pgvector HNSW │ │ hybrid search │ │ + GIN FTS │ │ + time decay │ └────────────────┘ └────────────────┘ ``` | 服务 | 容器 | 端口 | 作用 | |---|---|---|---| | API Gateway | `opsstream_rag_api` | 8897 | 接收 webhook,生产消息到 Kafka。编排 RAG 答案生成。 | | Kafka | `opsstream_broker` | 9092 | KRaft 模式 broker。包含 3 个分区 topic 以实现并行消费。 | | Consumer | ×3 个副本 | — | 轮询 Kafka,批量处理事件,调用 embedding API,并 upsert 到 Postgres。 | | Embedding API | `opsstream_embedder` | 8000 | 通过 PyTorch 在本地运行 `jina-embeddings-v2-base-code`。模型已内置于 Docker 镜像中。 | | Retrieval API | `opsstream_retrieval` | 8085 | 支持时间衰减重排序的混合向量 + 关键词搜索。 | | PostgreSQL | `opsstream_db` | 5432 | 带有用于向量的 HNSW 索引和用于全文搜索的 GIN 索引的 pgvector。 | ## 摄入的工作原理 1. Webhook 触达 `POST /webhooks/github` 或 `POST /webhooks/jira` 2. Pydantic 验证 payload —— 格式错误的请求会被 HTTP 422 拒绝,绝不会触及 Kafka 3. 根据事件 ID 生成确定性的 UUID v5(相同的输入 → 每次都生成相同的 UUID) 4. 事件通过基于 key 的分区被生产到 Kafka topic `system-events` 5. API 立即返回 —— webhook 响应耗时 <5ms 在消费端: 6. 3 个 consumer 副本以 `enable.auto.commit: False` 模式轮询 Kafka 7. 事件会被缓冲,直到累积达到 10 条或经过 2 秒钟 8. 批次被发送到本地 embedding API(Jina v2,768 维向量) 9. 向量通过 `ON CONFLICT (id) DO NOTHING` 被 upsert 到 PostgreSQL 10. **仅在**数据库事务成功后,才会提交 Kafka offset 如果 consumer 在第 8 步到第 10 步之间崩溃,offset 将保持未提交状态,Kafka 会在重启时重放该批次。确定性的 UUID 和冲突保护机制确保不会有重复数据进入数据库。 ## 检索的工作原理 Retrieval API 针对 PostgreSQL 运行两个并行的基于索引的查询,然后合并结果: **语义搜索** —— 针对 `embedding` 列的 HNSW 索引,按余弦距离排序。捕获语义层面的相似性(_"database crashed"_ 匹配 _"PostgreSQL connection refused"_)。 **关键词搜索** —— 针对 `to_tsvector('english', content)` 的 GIN 索引,按 `ts_rank_cd` 排序。捕获向量搜索可能遗漏的精确匹配:例如 ticket ID `Refs #33173`、测试名称 `test_ForeignKey_using_to_field`、版本号。 这两个结果集会被合并并评分: ``` hybrid_score = 0.7 × semantic_score + 0.3 × keyword_score ``` 然后应用一个时间衰减因子来降低旧事件的权重: ``` final_score = hybrid_score × exp(−0.05 × age_in_days) ``` 这会产生大约 14 天的半衰期 —— 近期事件会自然浮现,而没有会丢弃潜在有用的旧上下文的硬性日期截断。衰减系数存在于应用层,因此对其进行调优不需要修改 schema 或重建索引。 在执行搜索之前,查询本身会先通过 LLM 调用进行优化,从自然语言输入中提取错误代码、服务名称和关键术语。这是一个尽力而为的步骤 —— 如果失败(速率限制、超时),将使用原始查询作为 fallback。 ## 故障处理 | 故障情况 | 处理结果 | |---|---| | PostgreSQL 宕机 | Consumer 以指数退避(5s → 10s → 20s)进行重试。Kafka offset 保持未提交,因此事件会保留在磁盘上并在数据库恢复时重放。 | | Embedding API 崩溃 | Consumer 记录错误并故意崩溃。Docker 的 `restart: unless-stopped` 会将其重新启动。未提交的 offset 会触发重放。 | | 错误的消息格式 | 如果 embedding API 返回 400/422(根本上属于坏数据),consumer 会将该批次路由到死信队列(Dead Letter Queue)topic,并提交 offset 以避免阻塞队列。 | | Consumer 副本死亡 | Kafka 检测到心跳丢失,触发分区重平衡,并将孤立的分区重新分配给存活的副本。 | | 查询时 LLM 速率限制 | API 以指数退避进行重试(最多 5 次)。Retrieval API 的查询优化步骤会平滑降级为使用原始查询。 | ## 快速开始 **要求:** Docker Desktop(Compose V2+),Python 3.11+,一个 [OpenRouter](https://openrouter.ai/) API key(免费额度即可)。 ``` git clone https://github.com/YOUR_USERNAME/OpsStream-RAG.git cd OpsStream-RAG ``` 创建 `.env`: ``` OPENROUTER_API_KEY=sk-or-v1-... LLM_MODEL=google/gemma-4-31b-it:free GITHUB_TOKEN=ghp_... # optional, raises GitHub API rate limit ``` 启动技术栈: ``` docker compose up --build -d ``` 首次构建需要几分钟时间(下载 PyTorch,将 Jina 模型内置到镜像中)。此后,启动将是瞬时的。 ### 加载数据 从 [Django](https://github.com/django/django) 仓库摄入 1,000 个 commits: ``` python -m venv venv venv\Scripts\activate pip install requests python-dotenv python scripts/ingest_historical_github.py ``` 验证: ``` docker exec opsstream_db psql -U admin -d incident_logs -c "SELECT count(*) FROM incident_logs;" ``` ### 查询 ``` curl -X POST http://localhost:8897/ask \ -H "Content-Type: application/json" \ -d '{"question": "Who resolved the parallel test database destruction issue on Windows?"}' ``` ### Streamlit 仪表板 ``` pip install streamlit psycopg[binary] pandas streamlit run scripts/app.py ``` 三个标签页: - **Copilot Chat** —— 提问,获取基于检索到的日志生成的答案 - **Vector Search Explorer** —— 检查原始检索分数(语义、关键词、时间衰减、最终结果) - **Timeline Analytics** —— 随时间变化的摄入率 ### 连接实时 webhook | 平台 | URL | Content Type | |---|---|---| | GitHub | `http://YOUR_HOST:8897/webhooks/github` | `application/json` | | Jira | `http://YOUR_HOST:8897/webhooks/jira` | `application/json` | ## 评估 内置评估测试工具,使用 LLM-as-a-judge 针对黄金数据集(`data/eval_set.json`)进行评估: ``` python scripts/evaluate.py ``` 衡量每个问题的四个指标: | 指标 | 定义 | |---|---| | Context Precision | 检索到的日志中实际相关部分所占的比例 | | Context Recall | 检索到的日志是否包含回答所需的所有信息 | | Faithfulness | 生成的答案是否源自上下文(非幻觉) | | Answer Relevance | 答案是否直接切中问题 | ## 项目结构 ``` OpsStream-RAG/ ├── api/ # Webhook ingestion + RAG orchestration │ ├── main.py # Pydantic models, Kafka producer, /ask endpoint │ └── Dockerfile ├── consumer/ # Kafka consumer workers (3 replicas) │ ├── main.py # Batching, embedding, upsert, DLQ routing │ └── Dockerfile ├── embedding_api/ # Local vector inference │ ├── main.py # Jina v2 via sentence-transformers │ └── Dockerfile # Model downloaded at build time ├── retrieval_api/ # Hybrid search engine │ ├── main.py # HNSW + GIN queries, time decay, query refinement │ └── Dockerfile ├── scripts/ │ ├── app.py # Streamlit dashboard │ ├── evaluate.py # RAG evaluation harness │ └── ingest_historical_github.py ├── data/ │ └── eval_set.json # Golden evaluation dataset ├── docker-compose.yaml # 6 services, 8 containers └── .env # API keys (gitignored) ``` ## License MIT
标签:AV绕过, FastAPI, IT运维, Kubernetes, PMD, RAG, Socks5代理, 凭据扫描, 向量数据库, 实时流处理, 测试用例, 版权保护, 软件成分分析, 逆向工具