beejak/agentwatch
GitHub: beejak/agentwatch
面向多智能体 AI 系统的观测优先安全平台,在工具调用级别追踪并归因协调失败、静默失败和跨层差异。
Stars: 1 | Forks: 0
```
█████╗ ██████╗ ███████╗███╗ ██╗████████╗
██╔══██╗██╔════╝ ██╔════╝████╗ ██║╚══██╔══╝
███████║██║ ███╗█████╗ ██╔██╗ ██║ ██║
██╔══██║██║ ██║██╔══╝ ██║╚██╗██║ ██║
██║ ██║╚██████╔╝███████╗██║ ╚████║ ██║
╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝ ╚═╝
██╗ ██╗ █████╗ ████████╗ ██████╗██╗ ██╗
██║ ██║██╔══██╗╚══██╔══╝██╔════╝██║ ██║
██║ █╗ ██║███████║ ██║ ██║ ███████║
██║███╗██║██╔══██║ ██║ ██║ ██╔══██║
╚███╔███╔╝██║ ██║ ██║ ╚██████╗██║ ██║
╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝
```
**面向多智能体 AI 系统的安全可观测性。**
*追踪每一次工具调用。归因协调失败。捕获静默失败和跨层差异。仅追加的取证记录。*
[](https://www.python.org/downloads/)
[](#testing)
[](LICENSE)
[](https://clickhouse.com)
## 这是什么
WatchTower 是一个面向多智能体 AI 系统的**观测优先**安全平台。它在工具调用级别对智能体执行进行插桩,并将产生的信号转化为取证答案:*哪个* 智能体失败了,*为什么* 失败,在调用树的 *什么位置*,以及失败 *是否* 被静默吞没。它被构建为 **16 个连续的层**,每一层在构建下一层之前都由测试把关。
它观察智能体系统的三个攻击面 —— 输入损坏、能力滥用和多智能体传染 —— 并将所有内容记录到仅追加的 Chronicle 中。
## 可观测性技术栈(16 层)
| 组件 | 功能 |
|-----------|-------------|
| `watchtower/core/` | 规范的 `Signal` 结构(仅定义一次)+ trace/event 模型 |
| `watchtower/discovery/` | 主动智能体发现;未知的智能体在发出信号前被标记 |
| `watchtower/receiver/` | 带有每次发射来源验证(HMAC)的信号摄取 |
| `watchtower/content_inspection/` | 注入 / 越狱 / 数据泄露模式检测(tier-0 过滤器) |
| `watchtower/memory_monitor/` | MINJA + SpAIware 内存完整性检测器 |
| `watchtower/chronicle/` | ClickHouse 仅追加事件存储(无 UPDATE/DELETE,90 天 TTL) |
| `watchtower/verdict/` | 3 阶段判决引擎(确定性 → 基线 → LLM 评判) |
| `watchtower/baseline/` | 基于智能体的 3σ 行为画像(在达到 50 条 trace 之前受限) |
| `watchtower/coord_sigs/` | MAST + 基础设施协调失败签名 |
| `watchtower/analyst/` | SC1 归因 · SC2 静默失败 · SC3 跨层差异 |
| `watchtower/interceptor/` | 停止 · 隔离 · 吊销内存(每个动作均被记录) |
| `watchtower/api/` | 基于 chronicle 和判决的 FastAPI 接口 |
| `watchtower/host_telemetry/` | Sysmon / Falco 宿主机事件关联(process_guid) |
有关完整的层/关卡/不变量规范,请参阅 [`SPEC.md`](SPEC.md);有关数据流、信号结构和 chronicle schema,请参阅 [`ARCHITECTURE.md`](ARCHITECTURE.md)。
## 技术栈
| 关注点 | 技术 |
|---------|-----------|
| 语言 / 运行时 | **Python 3.12**, `asyncio`(所有 I/O 均为异步) |
| 数据模型 / 验证 | **Pydantic v2** — 规范的 `Signal` 结构,仅定义一次 |
| API | **FastAPI** + **Uvicorn**(生命周期管理) |
| Chronicle(审计存储) | **ClickHouse** — MergeTree,仅追加,90 天 TTL |
| 信号传输 / 总线 | **Redis** 流 (`wt:signals`, `wt:interceptor`) + 发布/订阅 (`wt:memory_events`) |
| 行为基线 / 策略 | **PostgreSQL** (pgvector 镜像) |
| 访问图 / 爆炸半径 | **Neo4j** (bolt) |
| Tracing(可选) | **Langfuse** |
| 完整性 | **HMAC-SHA256** 信号签名;兼容 OTel 的信号字段 |
| 判决 LLM 评判器(采样,非核心路径) | 兼容 OpenAI 的客户端(例如 DeepSeek) |
| 开发 / CI | `uv`/`venv`, **Docker Compose**, `pytest`+`pytest-asyncio`+coverage, **GitHub Actions** |
## 架构
被动、仅追加的 side-car:智能体发出 HMAC 签名的信号;WatchTower 负责消费、分析和记录 —— 而不会修改智能体。
```
flowchart TD
A["AI agents (multi-agent system)"] -->|"HMAC-signed Signals"| R["Redis stream wt:signals"]
R --> RX["Receiver L06 — HMAC verify"]
RX --> CI["Content Inspection L05"]
RX --> MM["Memory Integrity Monitor L07"]
RX --> DISC["Discovery L02"]
CI --> CH[("Chronicle L08 — ClickHouse append-only")]
MM --> CH
DISC --> CH
RX --> CH
CH --> VB["Verdict L09 + Baseline L10 + Coord-sigs L11"]
VB --> AN["Analyst L12 — SC1 attribution / SC2 silent / SC3 cross-layer"]
HOST["Host telemetry — Sysmon / Falco"] -->|"process_guid"| COR["Correlator L16"]
COR --> AN
AN --> IN["Interceptor L13 — halt / quarantine / revoke"]
IN -->|"every action logged"| CH
CH --> API["FastAPI L15 — /docs"]
NEO["Access graph — Neo4j"] -.->|"permission / blast radius"| VB
PG["Baseline + policy — PostgreSQL"] -.->|"profiles"| VB
```
## 工作原理
1. **发射。** 每个智能体步骤(工具调用、LLM 调用、移交、内存操作)都作为
`Signal`(20 个兼容 OTel 的字段)发出,并 **HMAC 签名**后推送到 Redis 的 `wt:signals` 流中。
2. **验证 + 分发。** Receiver 检查 *每个* 信号的 HMAC(如果被篡改则拒绝),
然后分发给发现、内容检查和内存完整性监视器。
3. **记录。** 所有数据都会进入 **仅追加的 Chronicle**(ClickHouse)中 ——
永远没有 UPDATE,也没有 DELETE。这是取证的真实来源。
4. **评判 + 画像。** 判决引擎(确定性 → 基线 → 采样的 LLM 评判器)、
基于智能体的行为基线和协调签名库在 chronicle 上运行。
5. **分析。** Analyst 回答三个取证问题 —— **SC1**(将协调失败归因于智能体/动作/调用树位置),
**SC2**(报告成功的静默失败),**SC3**(智能体自述与宿主机遥测的对比)。
6. **行动 + 记录。** Interceptor 可以停止/隔离/吊销内存;每个动作本身也会被记录。
FastAPI 接口暴露 traces、analyst 结果和 interceptor 动作。
## 运行所需条件
- **Python 3.12** + 该安装包(在 venv 中执行 `pip install -e ".[dev]"`)。
- **后端服务**(通过 `docker compose up -d` 或等效方式):Redis、ClickHouse、
PostgreSQL、Neo4j。(对于 chronicle 测试,使用无 root 的单二进制 ClickHouse 即可。)
- **配置由环境变量驱动**(`watchtower/config.py`):通过
`CH_HOST`/`CH_PORT`/`CH_DB`/`CH_USER`/`CH_PASS`, `REDIS_URL`, `PG_DSN`,
`NEO4J_URI`/`NEO4J_USER`/`NEO4J_PASS`,以及 `WT_HMAC_SECRET`(在发射信号的智能体
和 Receiver 之间共享)指向任何基础设施。默认值与 docker-compose 匹配,实现零配置本地开发。
- **智能体端发射器**:智能体必须将 `Signal` 发布到 `wt:signals`(可以使用 OTel exporter 或
`agents/synthetic/` 中的轻量级发射器)。
- **可选**:用于采样判决评判器的兼容 OpenAI 的 `LLM_API_KEY`;为 SC3 关联器
提供数据的宿主机遥测(Sysmon/Falco)。
## 网络 / 部署
```
flowchart LR
subgraph AGENTHOST["Agent host(s)"]
AGENTS["Agents + HMAC/OTel emitter"]
HTEL["Host telemetry (Sysmon/Falco)"]
end
subgraph WTSVC["WatchTower (sidecar / service)"]
RCV["Receiver + 16 layers"]
APISVC["FastAPI :8000"]
end
subgraph INFRA["Backing services (docker compose)"]
REDIS["Redis :6379"]
CHDB["ClickHouse :8123 / :9000"]
PG["PostgreSQL :5432"]
NEO["Neo4j :7687 / :7474"]
LF["Langfuse :3000 (optional)"]
end
AGENTS -->|"Signals → wt:signals"| REDIS
HTEL -->|"host events"| RCV
REDIS --> RCV
RCV --> CHDB
RCV --> PG
RCV --> NEO
RCV -. optional .-> LF
APISVC --> CHDB
OPER["Operator / dashboard"] -->|"HTTP"| APISVC
```
## 快速开始
```
# 1. Infrastructure
docker compose up -d redis clickhouse postgres neo4j
# 2. Install
python3 -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
# 3. 运行所有 gate 测试(每层一个,gate 优先,首次失败时停止)
make gate-all
# 4. 完整套件(220 个测试,包含 coverage)
make test
# 5. Proof 场景(SC1 coordination · SC2 silent failure · SC3 cross-layer)
make poc
# 6. API
make api # http://localhost:8000/docs
```
## 插桩智能体并查看运行效果
使用 SDK(`watchtower/emitter.py`)从任何智能体**发射信号**:
```
from watchtower.emitter import SignalEmitter
em = await SignalEmitter("orchestrator").start() # sink="chronicle" (default) or "redis"
root = await em.emit("delegate", trace_id=t, summary="plan + fan out")
await em.emit("tool_use", trace_id=t, agent_id="worker-b",
parent_span_id=root.span_id, status="error", summary="schema mismatch")
await em.flush()
```
信号经过 HMAC 签名。`sink="redis"` 会发布到 `wt:signals` 流(生产环境的
Receiver 路径);`sink="chronicle"` 直接写入仅追加的存储中。
**运行端到端演示**(需要 ClickHouse)—— 发射一个 3 智能体场景,并打印出
输出级别监视器无法获取的取证答案:
```
make demo
```
```
SC1 coordination-failure attribution → failing agent: worker-b (Conflicting Parallel Outputs)
SC2 silent-failure detection → infinite_retry_loop (12 spans, no errors)
SC3 cross-layer discrepancy → agent reported 1, host observed 3, delta 2 (high)
```
完整报告保存至 [`examples/sample_output.json`](examples/sample_output.json)。验证
数据位于 `eval/`(冻结语料库 + 留出指标);请参阅 [`docs/TESTING.md`](docs/TESTING.md)。
**接近真实世界的验证:** `make capture-tier1-llm` 运行一个 **真实的 LLM 智能体**
(DeepSeek),其涌现行为通过 mitmproxy(独立观察者)生成真实的 HTTP 出口流量 ——
在未经过脚本编排的流量中检测到了 SC2/SC3。请参阅
[`docs/REAL_TRAFFIC_VALIDATION.md`](docs/REAL_TRAFFIC_VALIDATION.md)。当设置了 `LLM_API_KEY` 时,判决引擎的
LLM 评判器使用真实的 LLM(否则使用确定性桩程序)。
## 测试
| 命令 | 范围 |
|---------|-------|
| `make gate-NN` | 单层关卡(例如 `make gate-08`) |
| `make gate-all` | 按顺序执行所有关卡,遇到首次失败即停止 |
| `make poc` | SC1 + SC2 + SC3 概念验证场景 |
| `make test` | 完整测试套件(220 个通过)及覆盖率 |
| `make benchmark` | LangSmith 差距对比 |
CI 会在每次推送和 PR 时,针对运行中的 Redis / ClickHouse / Postgres / Neo4j 服务容器运行关卡和概念验证场景。
## 安全不变量
```
✦ Chronicle is APPEND-ONLY. No UPDATE. No DELETE. Ever.
✦ Signal origin is verified by the Receiver on every emission, not just the first.
✦ Verdict always carries score + source + reason — all three, always.
✦ Interceptor logs every action to the Chronicle. Never silent.
✦ Policy Engine is DEFAULT-DENY. Must be permitted, not merely not-forbidden.
✦ A new agent runs in restricted mode until 50 traces exist in its baseline.
✦ Memory writes are intercepted by the Memory Integrity Monitor before Chronicle.
✦ The LLM judge receives the Trace Summariser's output, never the raw trace.
```
## 基础设施
| 组件 | 作用 | 端口 |
|-----------|------|------|
| Redis | 信号流,interceptor 总线 | 6379 |
| ClickHouse | Chronicle —— 仅追加,90 天 TTL | 8123 |
| PostgreSQL | 行为基线,策略存储 | 5432 |
| Neo4j | 智能体信任拓扑,爆炸半径 | 7687 |
如果没有 Docker,使用无 root 的单二进制 ClickHouse 即可进行 chronicle 测试。
## 项目结构
```
watchtower/ 16-layer observability stack
tests/
├── gates/ one gate test per layer
├── poc/ SC1 / SC2 / SC3 proof scenarios
├── scenarios/ attack scenario tests
├── benchmark/ comparison harness
└── harness/ shared test harness
agents/
├── agentic_tester/ LLM-driven adversarial tester for the detectors
├── synthetic/ synthetic agent traffic generator
└── adversarial/ adversarial trace generators
paper/ research paper (replicated in the firewall repo)
docs/ documentation suite
```
## 研究论文
**论文 1 — *WatchTower: Observation-First Forensics for Multi-Agent AI Systems***(作者:Rohit
Jinsiwale)位于 [`paper/observability.pdf`](paper/observability.pdf) 中。其核心论点是:*智能体
的自述并非事实真相。* 其评估完全基于本代码库中的冻结语料库和测试工具
([`eval/`](eval/))—— 留出拆分,2000 个样本的自助法置信区间。
主要结果(来自 [`eval/results/`](eval/results/)):
| 检测器 | WatchTower 召回率 | 自述基线 | 备注 |
|----------|------------------:|---------------------:|-------|
| **SC2** 静默失败 —— 合成 (n=269) | **0.86** [0.78–0.93] | 0.00 | 精确率 0.85,FPR 0.07;朴素成本基线 0.27 |
| **SC2** 静默失败 —— 真实流量 (n=120) | **1.00** | 0.00 | 在 HTTP 代理后捕获的实时 LLM 智能体 |
| **SC3** 跨层差异 (n=269) | **1.00** [1.00–1.00] | 0.00 | 精确率 1.00 |
**开销**(380 条 traces / 5,171 个 spans,单核 CPU,无 GPU):每条 trace 的 p99 —— SC2 0.035 ms,
SC3 0.019 ms,SC1 0.224 ms;**66,647 spans/秒**(可内联部署)。**既定局限性:** SC1
级联情况下的因果根归因约为 0.5(将其归因于第一个错误 span,而非真正的根因)。有关该论文的社交/宣发文案位于 [`paper/SHARE.md`](paper/SHARE.md)。
## 引用
```
@misc{watchtower2026,
title = {WatchTower: Observation-First Forensics for Multi-Agent AI Systems},
author = {Rohit Jinsiwale},
year = {2026},
note = {Under submission. Code + data: https://github.com/beejak/agentwatch ·
Enforcement (Paper 2): https://github.com/beejak/agentwatch-firewall}
}
```
## 贡献
[CONTRIBUTING.md](CONTRIBUTING.md) — 添加检测器、签名和新层。
## License
MIT标签:AI智能体, API集成, ClickHouse, LLM监控, Python, 人工智能, 可观测性, 搜索引擎查询, 无后门, 测试用例, 版权保护, 用户模式Hook绕过