sanjuthomas/security-event-rag-demo
GitHub: sanjuthomas/security-event-rag-demo
基于金融安全事件的全本地化混合检索增强生成(RAG)系统演示,结合向量检索、BM25与知识图谱,通过自然语言查询 SSI 结算指令的生命周期变更记录。
Stars: 0 | Forks: 0
# 安全事件 RAG 演示
这是一个 monorepo,演示了如何使用完全本地化的容器化技术栈,构建一个**基于金融安全事件的检索增强生成 (RAG) 系统**。
其业务领域是资本市场中台背景下的 **SSI 结算路由模板** 生命周期管理。每一次指令变更——创建、提交、批准、拒绝、暂停、重新激活——都会被记录为结构化的安全事件,通过 Kafka 流式传输,索引到 Qdrant 和 Neo4j 中,并可通过由本地 Ollama LLM 驱动的自然语言聊天界面进行查询。
## 演示问题
您可以向聊天界面提出以下类型的问题:
- _是否存在互相批准对方指令的情况?_
- _Michael 拒绝的指令是谁创建的?_
- _今天创建了多少条指令?_
- _今天关于国际指令的 ALERT(警报)事件有多少?_
- _今天创建的指令中是否还有在等待批准的?_
- _你能向我展示与安全事件 ID `` 关联的指令吗?_
## 架构
```
flowchart TB
subgraph auth [Identity]
ZITADEL[ZITADEL :8080]
end
subgraph apps [Applications]
ILM[instruction-lifecycle-manager :8000]
HARNESS[test-harness :8091]
ETL[security-event-qdrant-etl :8090]
CHAT[security-event-chat :8092]
end
subgraph policy [Policy]
OPA[OPA :8181]
end
subgraph stores [Data stores]
MONGO[(MongoDB replica set)]
KAFKA[(Kafka)]
QD[(Qdrant :6333)]
NEO[(Neo4j :7474)]
end
subgraph ml [Host ML]
OLLAMA[Ollama — bge-m3 + qwen3]
end
ZITADEL --> ILM
ZITADEL --> HARNESS
ZITADEL --> ETL
ILM --> OPA
ILM --> MONGO
ILM --> KAFKA
KAFKA --> ETL
ETL -->|GET /api/v1/instructions/:id| ILM
ETL --> QD
ETL --> NEO
ETL --> OLLAMA
CHAT --> QD
CHAT --> NEO
CHAT --> OLLAMA
HARNESS --> ILM
```
### 数据流
1. **ILM** — 操作员创建/修改指令;OPA 授权该操作;指令版本和安全事件在**单个事务中**被写入 MongoDB;该事件被发布到 Kafka。
2. **ETL** — 消费每一条 Kafka 事件,以服务用户 `etl-reader` 身份进行身份验证,通过 ILM API 获取当前指令,构建合并后的丰富文档,更新 Neo4j 图节点/关系以及 Qdrant 混合检索点(dense + BM25)。
3. **Chat** — 针对用户的每一个问题,并行运行三个检索器(Qdrant 向量检索、Qdrant BM25 检索、Ollama 生成的 Cypher → Neo4j 检索),通过倒数排名融合 (RRF) 合并结果,并利用 Ollama 合成自然语言答案。当问题包含 UUID 时,pipeline 还会在两个存储中执行确定性的精确查找。
**防止循环:** ILM 通过 `SECURITY_EVENT_EXCLUDED_USER_IDS` 抑制了 `etl-reader` 的安全事件发送,因此 ETL 的读取丰富操作永远不会重新触发 Kafka。
## 服务
| URL | 服务 | 用途 |
|-----|---------|---------|
| http://localhost:8000/ui/ | ILM | 指令浏览器 |
| http://localhost:8000/ui/security-events/ | ILM | 实时安全事件监控器 (SSE) |
| http://localhost:8000/docs | ILM | OpenAPI |
| http://localhost:8090 | ETL | 搜索控制台 — 向量 / BM25 / 混合检索 / Neo4j |
| http://localhost:8091 | 测试工具 | 生成生命周期流量 |
| http://localhost:8092 | Chat | 自然语言问答 |
| http://localhost:7474/browser/ | Neo4j | 图浏览器 — `neo4j` / `devpassword` |
| http://localhost:8080 | ZITADEL | 身份提供者 |
## 组件
| 目录 | 角色 |
|-----------|------|
| `instruction-lifecycle-manager` | FastAPI 生命周期 API — OPA 授权,MongoDB 持久化(双时态版本控制),Kafka 安全事件发布,指令和安全事件 UI |
| `security-event-qdrant-etl` | Kafka 消费者 — 通过 ILM API 丰富事件 → Neo4j 图写入器 + Qdrant 混合索引器 + 搜索控制台 UI |
| `security-event-chat` | RAG 聊天 — 三重检索(向量 + BM25 + Cypher),RRF 合并,Ollama 答案合成 |
| `security-event-test-harness` | 通过 ZITADEL 身份验证的浏览器 UI,用于驱动 创建 → 提交 → 批准 / 拒绝 的生命周期 |
| `neo4j-graph-model` | 图 schema 文档,Cypher 约束/索引,查询示例 |
| `opa-policy-seed` | Rego 策略 — 审批矩阵,LOB 所有权,角色检查 |
| `zitadel-seed` | 演示用户种子数据 (`users.yaml`) — 中台 + FICC/FX/DESK 审批者 + ETL 服务账号 |
| `log-forwarder` | 可选的容器日志转发至 Kafka |
## 前置条件
| 要求 | 说明 |
|-------------|-------|
| Docker + Docker Compose | 所有容器均在 `docker-compose.yml` 中定义 |
| 宿主机上运行着 [Ollama](https://ollama.com) | ETL 和 Chat 需要;容器通过 `host.docker.internal:11434` 访问它 |
| 已拉取 `bge-m3:latest` 模型 | `ollama pull bge-m3:latest` |
| 已拉取聊天模型 | 默认:`qwen3:30b` — `ollama pull qwen3:30b`(可通过 `OLLAMA_CHAT_MODEL` 替换为任何模型) |
## 快速开始
```
# 在宿主机上拉取 Ollama models
ollama pull bge-m3:latest
ollama pull qwen3:30b # or any chat model you prefer
# 启动 full stack
docker compose up -d
# 注入 demo 用户(在 ZITADEL 初始化之后 —— 约 30 秒)
PAT=$(docker exec zitadel-login cat /zitadel/bootstrap/login-client.pat | tr -d '\n')
cd zitadel-seed && ZITADEL_PAT="$PAT" python3 seed.py
# 打开 test harness 并生成一些 lifecycle 流量
open http://localhost:8091
# 打开 chat 并开始提问
open http://localhost:8092
```
### 重置所有内容
```
docker compose down -v --remove-orphans
docker compose up -d
# 如上所述重新注入 ZITADEL 用户
```
## 演示用户
所有密码均为 `Password1!`。登录名遵循 `{user_id}@ssi.local` 格式。
| 用户 | 姓名 | 角色 | LOB |
|------|------|------|-----|
| `mo-100` | Sarah Chen | 分析师 — 中台创建者 | — |
| `mo-101` | James Patel | 分析师 — 中台创建者 | — |
| `mo-050` | David Okonkwo | 副总裁 (VP) — 中台创建者 | — |
| `mo-010` | Patricia Walsh | 董事总经理 (MD) — 中台创建者 | — |
| `ficc-201` | Michael Torres | 副总裁 (Associate) — 审批者 | FICC |
| `ficc-300` | Elena Vasquez | 副总裁 (VP) — 审批者 | FICC |
| `ficc-400` | Robert Kim | 董事总经理 (MD) — 审批者 | FICC |
| `ficc-500` | Caroline Nguyen | 合伙人 — 审批者 | FICC |
| `fx-201` | Amira Hassan | 副总裁 (Associate) — 审批者 | FX |
| `fx-300` | Lucas Berger | 副总裁 (VP) — 审批者 | FX |
| `rates-201` | Nina Johansson | 副总裁 (Associate) — 审批者 | DESK_RATES |
| `etl-reader` | — | 服务账号 — ETL 指令读取 | — |
## 指令模型
一条**指令**是一个 **SSI 结算路由模板** — 包含账户、代理链、货币和有效性。它**不是**支付报文;这里不包含金额、起息日或汇款信息。
```
instruction_type STANDING | SINGLE_USE
wire_scope DOMESTIC | INTERNATIONAL
currency ISO 4217 (e.g. USD, EUR)
funding_account source account
debtor / creditor legal entities
*_agent bank chain (ABA / BIC / CHIPS)
effective_date template validity start
end_date template validity end
```
生命周期:`DRAFT` → `PENDING` → `STANDING | SINGLE_USE` 或 `REJECTED` → `SUSPENDED` → 重新激活或 `USED`。
## Neo4j 图模型
ETL 围绕每个安全事件构建一个图:
```
flowchart TB
U1[User actor] -->|ACTED_AS| SE[SecurityEvent]
SE -->|TARGETS| I[Instruction]
SE -->|TARGETS_VERSION| IV[InstructionVersion]
SE -->|INVOLVES_LOB| PC[ProfitCenter]
I -->|HAS_VERSION| IV
I -->|CURRENT| IV
U2[User creator] -->|CREATED| IV
U3[User approver] -->|APPROVED| IV
U4[User rejector] -->|REJECTED| IV
```
Cypher 查询示例:
```
-- Instructions created today
MATCH (e:SecurityEvent {action: 'CREATE', outcome: 'success'})
WHERE date(datetime(e.timestamp)) = date()
RETURN count(DISTINCT e) AS total;
-- Who created instructions rejected by Michael (ficc-201)
MATCH (u:User {user_id: 'ficc-201'})-[:ACTED_AS]->(e:SecurityEvent {action: 'REJECT'})
MATCH (e)-[:TARGETS_VERSION]->(v:InstructionVersion)
RETURN e.event_id, v.creator_user_id, v.instruction_id
ORDER BY e.timestamp DESC;
-- Instruction linked to a specific security event
MATCH (e:SecurityEvent {event_id: $id})-[:TARGETS_VERSION]->(v:InstructionVersion)
RETURN v.instruction_id;
```
请查看 `neo4j-graph-model/relationships.cypher` 获取完整的属性目录。
## RAG pipeline 详情
```
User question
│
├─ UUID detected? ──► Exact Qdrant fetch + fixed Neo4j lookup (pinned to top of context)
│
├─► Qdrant dense vector search (bge-m3)
├─► Qdrant BM25 sparse search
└─► Ollama → Cypher → Neo4j
│
▼
RRF merge (k=60) + dedupe by event_id
│
▼
Ollama chat synthesis
```
Chat API 的响应包括生成的 Cypher 查询、图数据行、每个来源的耗时,以及标记为 `vector` / `bm25` / `neo4j` / `exact` 的来源卡片。
## 事务一致性
每一次指令变更(创建、更新、提交、批准、拒绝、暂停、重新激活、使用、删除)都会写入:
- 将指令版本写入 `ssi_cash_instructions.instructions`
- 将匹配的安全事件写入 `security_events.instruction-lifecycle-manager`
这都在**单个 MongoDB 多文档事务**中完成。Kafka 的发布仅在事务提交后发生。MongoDB 必须作为副本集运行 — `docker-compose.yml` 会自动初始化 `rs0`。
## 本地开发
```
# ILM API
cd instruction-lifecycle-manager && pip install -e .
uvicorn instruction_lifecycle_manager.main:app --reload --port 8000
# ETL + search console
cd security-event-qdrant-etl && pip install -e .
security-event-search # :8090
# Chat
cd security-event-chat && pip install -e .
security-event-chat # :8092
# Test harness
cd security-event-test-harness && pip install -e .
security-event-test-harness-ui # :8091
```
每个服务都从环境变量中读取配置(完整列表请参阅各自的 README)。需要本地的 MongoDB、Kafka、Qdrant、Neo4j、OPA、ZITADEL 和 Ollama。
## 仓库结构
```
.
├── docker-compose.yml
├── instruction-lifecycle-manager/ # ILM API + instruction / security UIs
├── security-event-qdrant-etl/ # Kafka ETL + search console
├── security-event-chat/ # RAG chat
├── security-event-test-harness/ # E2E test harness UI
├── neo4j-graph-model/ # Graph schema and example queries
├── opa-policy-seed/ # Rego policies
├── zitadel-seed/ # Demo user definitions
└── log-forwarder/ # Optional log → Kafka forwarder
```
每个应用目录都有自己的 README。
标签:AI风险缓解, DLL 劫持, RAG, 事件流处理, 大语言模型, 请求拦截, 资本市场业务, 逆向工具