sanjuthomas/security-event-rag-demo

GitHub: sanjuthomas/security-event-rag-demo

基于金融安全事件的全本地化混合检索增强生成(RAG)系统演示,结合向量检索、BM25与知识图谱,通过自然语言查询 SSI 结算指令的生命周期变更记录。

Stars: 0 | Forks: 0

# 安全事件 RAG 演示 这是一个 monorepo,演示了如何使用完全本地化的容器化技术栈,构建一个**基于金融安全事件的检索增强生成 (RAG) 系统**。 其业务领域是资本市场中台背景下的 **SSI 结算路由模板** 生命周期管理。每一次指令变更——创建、提交、批准、拒绝、暂停、重新激活——都会被记录为结构化的安全事件,通过 Kafka 流式传输,索引到 Qdrant 和 Neo4j 中,并可通过由本地 Ollama LLM 驱动的自然语言聊天界面进行查询。 ## 演示问题 您可以向聊天界面提出以下类型的问题: - _是否存在互相批准对方指令的情况?_ - _Michael 拒绝的指令是谁创建的?_ - _今天创建了多少条指令?_ - _今天关于国际指令的 ALERT(警报)事件有多少?_ - _今天创建的指令中是否还有在等待批准的?_ - _你能向我展示与安全事件 ID `` 关联的指令吗?_ ## 架构 ``` flowchart TB subgraph auth [Identity] ZITADEL[ZITADEL :8080] end subgraph apps [Applications] ILM[instruction-lifecycle-manager :8000] HARNESS[test-harness :8091] ETL[security-event-qdrant-etl :8090] CHAT[security-event-chat :8092] end subgraph policy [Policy] OPA[OPA :8181] end subgraph stores [Data stores] MONGO[(MongoDB replica set)] KAFKA[(Kafka)] QD[(Qdrant :6333)] NEO[(Neo4j :7474)] end subgraph ml [Host ML] OLLAMA[Ollama — bge-m3 + qwen3] end ZITADEL --> ILM ZITADEL --> HARNESS ZITADEL --> ETL ILM --> OPA ILM --> MONGO ILM --> KAFKA KAFKA --> ETL ETL -->|GET /api/v1/instructions/:id| ILM ETL --> QD ETL --> NEO ETL --> OLLAMA CHAT --> QD CHAT --> NEO CHAT --> OLLAMA HARNESS --> ILM ``` ### 数据流 1. **ILM** — 操作员创建/修改指令;OPA 授权该操作;指令版本和安全事件在**单个事务中**被写入 MongoDB;该事件被发布到 Kafka。 2. **ETL** — 消费每一条 Kafka 事件,以服务用户 `etl-reader` 身份进行身份验证,通过 ILM API 获取当前指令,构建合并后的丰富文档,更新 Neo4j 图节点/关系以及 Qdrant 混合检索点(dense + BM25)。 3. **Chat** — 针对用户的每一个问题,并行运行三个检索器(Qdrant 向量检索、Qdrant BM25 检索、Ollama 生成的 Cypher → Neo4j 检索),通过倒数排名融合 (RRF) 合并结果,并利用 Ollama 合成自然语言答案。当问题包含 UUID 时,pipeline 还会在两个存储中执行确定性的精确查找。 **防止循环:** ILM 通过 `SECURITY_EVENT_EXCLUDED_USER_IDS` 抑制了 `etl-reader` 的安全事件发送,因此 ETL 的读取丰富操作永远不会重新触发 Kafka。 ## 服务 | URL | 服务 | 用途 | |-----|---------|---------| | http://localhost:8000/ui/ | ILM | 指令浏览器 | | http://localhost:8000/ui/security-events/ | ILM | 实时安全事件监控器 (SSE) | | http://localhost:8000/docs | ILM | OpenAPI | | http://localhost:8090 | ETL | 搜索控制台 — 向量 / BM25 / 混合检索 / Neo4j | | http://localhost:8091 | 测试工具 | 生成生命周期流量 | | http://localhost:8092 | Chat | 自然语言问答 | | http://localhost:7474/browser/ | Neo4j | 图浏览器 — `neo4j` / `devpassword` | | http://localhost:8080 | ZITADEL | 身份提供者 | ## 组件 | 目录 | 角色 | |-----------|------| | `instruction-lifecycle-manager` | FastAPI 生命周期 API — OPA 授权,MongoDB 持久化(双时态版本控制),Kafka 安全事件发布,指令和安全事件 UI | | `security-event-qdrant-etl` | Kafka 消费者 — 通过 ILM API 丰富事件 → Neo4j 图写入器 + Qdrant 混合索引器 + 搜索控制台 UI | | `security-event-chat` | RAG 聊天 — 三重检索(向量 + BM25 + Cypher),RRF 合并,Ollama 答案合成 | | `security-event-test-harness` | 通过 ZITADEL 身份验证的浏览器 UI,用于驱动 创建 → 提交 → 批准 / 拒绝 的生命周期 | | `neo4j-graph-model` | 图 schema 文档,Cypher 约束/索引,查询示例 | | `opa-policy-seed` | Rego 策略 — 审批矩阵,LOB 所有权,角色检查 | | `zitadel-seed` | 演示用户种子数据 (`users.yaml`) — 中台 + FICC/FX/DESK 审批者 + ETL 服务账号 | | `log-forwarder` | 可选的容器日志转发至 Kafka | ## 前置条件 | 要求 | 说明 | |-------------|-------| | Docker + Docker Compose | 所有容器均在 `docker-compose.yml` 中定义 | | 宿主机上运行着 [Ollama](https://ollama.com) | ETL 和 Chat 需要;容器通过 `host.docker.internal:11434` 访问它 | | 已拉取 `bge-m3:latest` 模型 | `ollama pull bge-m3:latest` | | 已拉取聊天模型 | 默认:`qwen3:30b` — `ollama pull qwen3:30b`(可通过 `OLLAMA_CHAT_MODEL` 替换为任何模型) | ## 快速开始 ``` # 在宿主机上拉取 Ollama models ollama pull bge-m3:latest ollama pull qwen3:30b # or any chat model you prefer # 启动 full stack docker compose up -d # 注入 demo 用户(在 ZITADEL 初始化之后 —— 约 30 秒) PAT=$(docker exec zitadel-login cat /zitadel/bootstrap/login-client.pat | tr -d '\n') cd zitadel-seed && ZITADEL_PAT="$PAT" python3 seed.py # 打开 test harness 并生成一些 lifecycle 流量 open http://localhost:8091 # 打开 chat 并开始提问 open http://localhost:8092 ``` ### 重置所有内容 ``` docker compose down -v --remove-orphans docker compose up -d # 如上所述重新注入 ZITADEL 用户 ``` ## 演示用户 所有密码均为 `Password1!`。登录名遵循 `{user_id}@ssi.local` 格式。 | 用户 | 姓名 | 角色 | LOB | |------|------|------|-----| | `mo-100` | Sarah Chen | 分析师 — 中台创建者 | — | | `mo-101` | James Patel | 分析师 — 中台创建者 | — | | `mo-050` | David Okonkwo | 副总裁 (VP) — 中台创建者 | — | | `mo-010` | Patricia Walsh | 董事总经理 (MD) — 中台创建者 | — | | `ficc-201` | Michael Torres | 副总裁 (Associate) — 审批者 | FICC | | `ficc-300` | Elena Vasquez | 副总裁 (VP) — 审批者 | FICC | | `ficc-400` | Robert Kim | 董事总经理 (MD) — 审批者 | FICC | | `ficc-500` | Caroline Nguyen | 合伙人 — 审批者 | FICC | | `fx-201` | Amira Hassan | 副总裁 (Associate) — 审批者 | FX | | `fx-300` | Lucas Berger | 副总裁 (VP) — 审批者 | FX | | `rates-201` | Nina Johansson | 副总裁 (Associate) — 审批者 | DESK_RATES | | `etl-reader` | — | 服务账号 — ETL 指令读取 | — | ## 指令模型 一条**指令**是一个 **SSI 结算路由模板** — 包含账户、代理链、货币和有效性。它**不是**支付报文;这里不包含金额、起息日或汇款信息。 ``` instruction_type STANDING | SINGLE_USE wire_scope DOMESTIC | INTERNATIONAL currency ISO 4217 (e.g. USD, EUR) funding_account source account debtor / creditor legal entities *_agent bank chain (ABA / BIC / CHIPS) effective_date template validity start end_date template validity end ``` 生命周期:`DRAFT` → `PENDING` → `STANDING | SINGLE_USE` 或 `REJECTED` → `SUSPENDED` → 重新激活或 `USED`。 ## Neo4j 图模型 ETL 围绕每个安全事件构建一个图: ``` flowchart TB U1[User actor] -->|ACTED_AS| SE[SecurityEvent] SE -->|TARGETS| I[Instruction] SE -->|TARGETS_VERSION| IV[InstructionVersion] SE -->|INVOLVES_LOB| PC[ProfitCenter] I -->|HAS_VERSION| IV I -->|CURRENT| IV U2[User creator] -->|CREATED| IV U3[User approver] -->|APPROVED| IV U4[User rejector] -->|REJECTED| IV ``` Cypher 查询示例: ``` -- Instructions created today MATCH (e:SecurityEvent {action: 'CREATE', outcome: 'success'}) WHERE date(datetime(e.timestamp)) = date() RETURN count(DISTINCT e) AS total; -- Who created instructions rejected by Michael (ficc-201) MATCH (u:User {user_id: 'ficc-201'})-[:ACTED_AS]->(e:SecurityEvent {action: 'REJECT'}) MATCH (e)-[:TARGETS_VERSION]->(v:InstructionVersion) RETURN e.event_id, v.creator_user_id, v.instruction_id ORDER BY e.timestamp DESC; -- Instruction linked to a specific security event MATCH (e:SecurityEvent {event_id: $id})-[:TARGETS_VERSION]->(v:InstructionVersion) RETURN v.instruction_id; ``` 请查看 `neo4j-graph-model/relationships.cypher` 获取完整的属性目录。 ## RAG pipeline 详情 ``` User question │ ├─ UUID detected? ──► Exact Qdrant fetch + fixed Neo4j lookup (pinned to top of context) │ ├─► Qdrant dense vector search (bge-m3) ├─► Qdrant BM25 sparse search └─► Ollama → Cypher → Neo4j │ ▼ RRF merge (k=60) + dedupe by event_id │ ▼ Ollama chat synthesis ``` Chat API 的响应包括生成的 Cypher 查询、图数据行、每个来源的耗时,以及标记为 `vector` / `bm25` / `neo4j` / `exact` 的来源卡片。 ## 事务一致性 每一次指令变更(创建、更新、提交、批准、拒绝、暂停、重新激活、使用、删除)都会写入: - 将指令版本写入 `ssi_cash_instructions.instructions` - 将匹配的安全事件写入 `security_events.instruction-lifecycle-manager` 这都在**单个 MongoDB 多文档事务**中完成。Kafka 的发布仅在事务提交后发生。MongoDB 必须作为副本集运行 — `docker-compose.yml` 会自动初始化 `rs0`。 ## 本地开发 ``` # ILM API cd instruction-lifecycle-manager && pip install -e . uvicorn instruction_lifecycle_manager.main:app --reload --port 8000 # ETL + search console cd security-event-qdrant-etl && pip install -e . security-event-search # :8090 # Chat cd security-event-chat && pip install -e . security-event-chat # :8092 # Test harness cd security-event-test-harness && pip install -e . security-event-test-harness-ui # :8091 ``` 每个服务都从环境变量中读取配置(完整列表请参阅各自的 README)。需要本地的 MongoDB、Kafka、Qdrant、Neo4j、OPA、ZITADEL 和 Ollama。 ## 仓库结构 ``` . ├── docker-compose.yml ├── instruction-lifecycle-manager/ # ILM API + instruction / security UIs ├── security-event-qdrant-etl/ # Kafka ETL + search console ├── security-event-chat/ # RAG chat ├── security-event-test-harness/ # E2E test harness UI ├── neo4j-graph-model/ # Graph schema and example queries ├── opa-policy-seed/ # Rego policies ├── zitadel-seed/ # Demo user definitions └── log-forwarder/ # Optional log → Kafka forwarder ``` 每个应用目录都有自己的 README。
标签:AI风险缓解, DLL 劫持, RAG, 事件流处理, 大语言模型, 请求拦截, 资本市场业务, 逆向工具