andreas-altamirano/cross-system-correlator
GitHub: andreas-altamirano/cross-system-correlator
一个轻量级的 Python 流式模式检测库,用于捕获 AI Agent 跨系统多步骤操作中单操作授权器无法察觉的序列风险。
Stars: 0 | Forks: 0
# cross-system-correlator
**用于跨系统 AI agent 操作的流式模式检测。**
捕获单操作授权器无法察觉的多步骤风险。每个独立的工具调用在单独执行时可能都会通过任何策略检查。该关联器会观察随时间推移的序列,并呈现出以下模式:*此操作然后那个操作,在 X 分钟内,由同一个 agent 执行*。
```
Agent actions flow in:
─────────────────────────────────────────────────────►
notion.read_sensitive(page_secret) ◄── normal
gmail.send(lawyer@external.com) ◄── normal
─────────────────────────────────────────────────────►
⚠️ EXFILTRATION
read sensitive → send external
same agent, within 25s
```
[](https://opensource.org/licenses/Apache-2.0)
[](https://www.python.org/downloads/)
## 为什么需要它
Agent 的工具调用通常是一次性授权的。读取就是读取;发送就是发送。两者单独来看可能都没问题。
但许多真实的 agent 风险都是**序列**行为:
- **数据泄露** — 读取敏感数据,然后发送到外部地址
- **权限提升** — 检查存在哪些权限,然后立即使用破坏性权限
- **凭证滥用** — 读取 secret,然后进行出站网络调用
- **失控循环** — 在 30 秒内执行 N 次破坏性操作
单操作授权器(每次工具调用允许/拒绝,包括 [Wave engine](https://github.com/andreas-altamirano/wave-engine) 和大多数企业级治理产品)无法察觉这些风险,因为每个单独的步骤看起来都没问题。你需要一层专门监视*数据流*的机制。
这个库就是那一层机制。小巧、确定性、无依赖,即插即用。
## 安装
```
pip install cross-system-correlator
```
Python 3.9+。零运行时依赖。
## 快速开始
```
from correlator import (
Correlator, AgentAction, exfiltration_pattern,
)
# 使用一个或多个 pattern 构建 correlator
correlator = Correlator(patterns=[exfiltration_pattern()])
# 注册 alert handler
correlator.on_match(lambda m: print(f"ALERT [{m.severity.value}]: {m.pattern_name}"))
# 将每个 agent action 通过 observe() 传递
correlator.observe(AgentAction(
agent_id="research-agent",
system="notion",
action="read_page",
action_class="read_sensitive",
target="page_12345",
))
correlator.observe(AgentAction(
agent_id="research-agent",
system="gmail",
action="send",
target="lawyer@external.com",
metadata={"external": True},
))
# → ALERT [high]: possible_exfiltration
```
## AgentAction 是什么样的
该关联器是系统无关的。向其输入任何事件流,只需确保每个事件至少包含:agent 身份、系统、操作名称和时间戳。可选但有用的字段:`action_class`(高级类别)和 `target`(资源 ID、接收者等)。
```
AgentAction(
agent_id="research-agent",
system="notion",
action="read_page",
action_class="read_sensitive", # optional category
target="page_12345", # optional target
metadata={"external": True}, # arbitrary extra fields
timestamp=time.time(), # defaults to now
)
```
`action_class` 字段是使模式能够跨系统移植的关键。只要匹配到 `read_sensitive`,无论该敏感读取是来自 Notion、S3、Postgres 还是你标记的任何其他位置,模式都会触发。
## 预置模式
该库内置了六种可直接使用的模式:
| 模式 | 严重程度 | 捕获内容 |
|---|---|---|
| `exfiltration_pattern()` | HIGH | 敏感读取 → 外部发送(同一 agent) |
| `data_egress_pattern()` | HIGH | 敏感读取 → 云存储写入(同一 agent) |
| `credential_then_external_pattern()` | HIGH | 凭证读取 → 出站网络调用 |
| `cross_agent_data_handoff_pattern()` | HIGH | 由一个 agent 进行的敏感读取 → 由**任何** agent 进行的外部发送 |
| `privilege_escalation_pattern()` | MEDIUM | 权限检查 → 破坏性操作 |
| `rapid_destructive_burst_pattern()` | CRITICAL | 30 秒内发生 3 次破坏性操作 |
选择适合你环境的模式直接使用:
```
from correlator import (
Correlator,
exfiltration_pattern,
privilege_escalation_pattern,
rapid_destructive_burst_pattern,
)
correlator = Correlator(patterns=[
exfiltration_pattern(window_seconds=120),
privilege_escalation_pattern(window_seconds=60),
rapid_destructive_burst_pattern(),
])
```
每个模式都有可调参数(时间窗口等)。
## 定义你自己的模式
一个 `Pattern` 是一个包含 `PatternStep` 的有序列表再加上一个时间窗口:
```
from correlator import (
Pattern, PatternStep, Severity,
match_system, match_action, match_all, match_target_equals_context,
capture_target,
)
share_after_read = Pattern(
name="document_shared_after_read_by_same_agent",
description="Same agent reads then shares the same document.",
severity=Severity.MEDIUM,
window_seconds=300,
agent_scope="same", # both steps must come from same agent_id
steps=[
PatternStep(
name="read_doc",
matcher=match_all(match_system("notion"), match_action("read")),
capture=capture_target("doc"), # remember which doc
),
PatternStep(
name="share_same_doc",
matcher=match_all(
match_system("notion"),
match_action("share"),
match_target_equals_context("doc"), # must match captured doc
),
),
],
)
```
模式引擎会处理排序、agent 作用域、时间窗口以及步骤间的上下文传递。你只需声明要查找的内容即可。
## 匹配器工具包
它们可以组合成步骤匹配器,无需编写 lambda:
```
match_system("notion", "s3") # glob patterns
match_action("read*", "delete_*")
match_action_class("read_sensitive")
match_target_prefix("@external")
match_target_regex(r"prod-.*-bucket")
match_target_equals_context("doc") # equals a capture from earlier step
match_metadata_equals("external", True)
match_metadata("amount", lambda v: v > 1000)
match_all(match_system("aws"), match_action_class("destructive"))
match_any(match_system("gmail"), match_system("slack"))
```
匹配器只是一个 `Callable[[AgentAction, dict], bool]`。如果需要,你可以编写自己的匹配器。
## 运行实时演示
```
git clone https://github.com/andreas-altamirano/cross-system-correlator
cd cross-system-correlator
pip install -e .
python examples/simulate.py
```
演示通过关联器重放了 12 个 agent 操作。大多数是良性的。三个植入的攻击序列(数据泄露、权限提升、快速破坏性突发)会在完成时实时触发警报。你将看到如下输出:
```
══════════════════════════════════════════════════════════════════════
ALERT [HIGH] possible_exfiltration
Sensitive read followed by external send by the same agent.
Agent: research-agent
Duration: 25.0s across 2 actions
1. notion.read_page [read_sensitive] → page_secret_strategy
2. gmail.send → lawyer@external.com
══════════════════════════════════════════════════════════════════════
```
## 工作原理
引擎维护着一小部分*待处理的部分匹配*。当一个操作到达时:
1. **推进待处理的匹配。** 对于每个进行中的部分匹配,检查此操作是否满足下一个必需的步骤。如果满足,则推进;如果完成了该模式,则触发。如果部分匹配已超过时间窗口,则将其丢弃。
2. **生成新的待处理匹配。** 对于每个已注册的模式,检查此操作是否匹配第一步。如果匹配,则启动一个新的待处理匹配(如果 `agent_scope="same"`,则锁定该 agent)。
3. **触发处理程序**,用于处理任何已完成的匹配。
内存为 O(活动中的部分匹配),受限于 `max_partial_matches`。CPU 为每个操作 O(模式数 + 活动中的部分匹配)。整个过程在进程内运行,受锁保护以实现线程安全,并且零依赖。
对于高吞吐量环境(>10k 次操作/秒),你需要通过 agent_id 在多个关联器实例之间进行分片,或者对模式集进行分区。默认设置对于真实世界中 agent 集群产生的操作量来说是完全足够的。
## 集成:与 wave-engine 或 mcp-governance-proxy 配合使用
该关联器是独立的——你可以随心所欲地向其输入操作。常见模式:
**Sidecar 日志追踪器。** 让你的 agent 将每次工具调用以 JSON 格式记录到 stdout / 文件 / 队列中。一个简单的追踪器进程解析它们并将其提供给 `correlator.observe()`。警报会发送到 Slack、PagerDuty、你的 SIEM 等。
**与 mcp-governance-proxy 内联。** 如果你已经在使用 [mcp-governance-proxy](https://github.com/andreas-altamirano/mcp-governance-proxy) 来评估工具调用,可以添加一个 hook,在每次操作时也调用 `correlator.observe()`。你可以在一个地方同时实现单操作授权(代理)+ 多步骤关联(本库)。
**从 wave-engine 决策管道导入。** 如果你已经集成了 [wave-engine](https://github.com/andreas-altamirano/wave-engine),你生成的每个 Decision 隐式地都是一个操作。从中构造一个 `AgentAction` 并进行观察。
## 这*不是*什么
- **不是工具授权器。** 它负责观察和警报;它不会拦截正在执行的操作。如果需要拒绝调用,请与操作前授权器结合使用。(参见 [wave-engine](https://github.com/andreas-altamirano/wave-engine) 和 [mcp-governance-proxy](https://github.com/andreas-altamirano/mcp-governance-proxy)。)
- **不是 SIEM。** 它是一个专注于 agent 操作的模式引擎。将警报通过管道输出到你现有的 SIEM/通知栈中。
- **不是持久化层。** 部分匹配存在于进程内。如果进程重启,处理中的部分匹配将会丢失。对于大多数用例,这没有问题——模式会在第一步之后的几秒到几分钟内触发。
- **不是内容分类器。** 它不决定哪些 Notion 页面是“敏感的”。你需要在上游使用 `action_class="read_sensitive"` 对它们进行标记;本库信任该标签。
## 测试
```
pip install -e ".[dev]"
pytest -q
```
21 项测试,涵盖单步匹配、多步排序、时间窗口过期、agent 作用域、上下文捕获、处理回调、错误弹性以及所有六种预置模式。
## 许可证
Apache 2.0。参见 [LICENSE](LICENSE)。
## 致谢
从 AI agent 治理平台 **Surfit** 内部的跨系统关联引擎中提取并泛化。发布此库以便其他团队能够检测多步骤 agent 风险,而无需从头开始构建。
配套库:
- [`wave-engine`](https://github.com/andreas-altamirano/wave-engine) — 每次操作连续的 1–5 风险评分
- [`mcp-governance-proxy`](https://github.com/andreas-altamirano/mcp-governance-proxy) — 通过可插拔的评估器代理工具调用的 MCP 服务器
标签:AI安全, Chat Copilot, 人工智能, 多包管理, 异常检测, 数据泄露防护, 流式处理, 用户模式Hook绕过, 网络探测, 行为审计, 逆向工具