andreas-altamirano/cross-system-correlator

GitHub: andreas-altamirano/cross-system-correlator

一个轻量级的 Python 流式模式检测库,用于捕获 AI Agent 跨系统多步骤操作中单操作授权器无法察觉的序列风险。

Stars: 0 | Forks: 0

# cross-system-correlator **用于跨系统 AI agent 操作的流式模式检测。** 捕获单操作授权器无法察觉的多步骤风险。每个独立的工具调用在单独执行时可能都会通过任何策略检查。该关联器会观察随时间推移的序列,并呈现出以下模式:*此操作然后那个操作,在 X 分钟内,由同一个 agent 执行*。 ``` Agent actions flow in: ─────────────────────────────────────────────────────► notion.read_sensitive(page_secret) ◄── normal gmail.send(lawyer@external.com) ◄── normal ─────────────────────────────────────────────────────► ⚠️ EXFILTRATION read sensitive → send external same agent, within 25s ``` [![License: Apache 2.0](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Python 3.9+](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/downloads/) ## 为什么需要它 Agent 的工具调用通常是一次性授权的。读取就是读取;发送就是发送。两者单独来看可能都没问题。 但许多真实的 agent 风险都是**序列**行为: - **数据泄露** — 读取敏感数据,然后发送到外部地址 - **权限提升** — 检查存在哪些权限,然后立即使用破坏性权限 - **凭证滥用** — 读取 secret,然后进行出站网络调用 - **失控循环** — 在 30 秒内执行 N 次破坏性操作 单操作授权器(每次工具调用允许/拒绝,包括 [Wave engine](https://github.com/andreas-altamirano/wave-engine) 和大多数企业级治理产品)无法察觉这些风险,因为每个单独的步骤看起来都没问题。你需要一层专门监视*数据流*的机制。 这个库就是那一层机制。小巧、确定性、无依赖,即插即用。 ## 安装 ``` pip install cross-system-correlator ``` Python 3.9+。零运行时依赖。 ## 快速开始 ``` from correlator import ( Correlator, AgentAction, exfiltration_pattern, ) # 使用一个或多个 pattern 构建 correlator correlator = Correlator(patterns=[exfiltration_pattern()]) # 注册 alert handler correlator.on_match(lambda m: print(f"ALERT [{m.severity.value}]: {m.pattern_name}")) # 将每个 agent action 通过 observe() 传递 correlator.observe(AgentAction( agent_id="research-agent", system="notion", action="read_page", action_class="read_sensitive", target="page_12345", )) correlator.observe(AgentAction( agent_id="research-agent", system="gmail", action="send", target="lawyer@external.com", metadata={"external": True}, )) # → ALERT [high]: possible_exfiltration ``` ## AgentAction 是什么样的 该关联器是系统无关的。向其输入任何事件流,只需确保每个事件至少包含:agent 身份、系统、操作名称和时间戳。可选但有用的字段:`action_class`(高级类别)和 `target`(资源 ID、接收者等)。 ``` AgentAction( agent_id="research-agent", system="notion", action="read_page", action_class="read_sensitive", # optional category target="page_12345", # optional target metadata={"external": True}, # arbitrary extra fields timestamp=time.time(), # defaults to now ) ``` `action_class` 字段是使模式能够跨系统移植的关键。只要匹配到 `read_sensitive`,无论该敏感读取是来自 Notion、S3、Postgres 还是你标记的任何其他位置,模式都会触发。 ## 预置模式 该库内置了六种可直接使用的模式: | 模式 | 严重程度 | 捕获内容 | |---|---|---| | `exfiltration_pattern()` | HIGH | 敏感读取 → 外部发送(同一 agent) | | `data_egress_pattern()` | HIGH | 敏感读取 → 云存储写入(同一 agent) | | `credential_then_external_pattern()` | HIGH | 凭证读取 → 出站网络调用 | | `cross_agent_data_handoff_pattern()` | HIGH | 由一个 agent 进行的敏感读取 → 由**任何** agent 进行的外部发送 | | `privilege_escalation_pattern()` | MEDIUM | 权限检查 → 破坏性操作 | | `rapid_destructive_burst_pattern()` | CRITICAL | 30 秒内发生 3 次破坏性操作 | 选择适合你环境的模式直接使用: ``` from correlator import ( Correlator, exfiltration_pattern, privilege_escalation_pattern, rapid_destructive_burst_pattern, ) correlator = Correlator(patterns=[ exfiltration_pattern(window_seconds=120), privilege_escalation_pattern(window_seconds=60), rapid_destructive_burst_pattern(), ]) ``` 每个模式都有可调参数(时间窗口等)。 ## 定义你自己的模式 一个 `Pattern` 是一个包含 `PatternStep` 的有序列表再加上一个时间窗口: ``` from correlator import ( Pattern, PatternStep, Severity, match_system, match_action, match_all, match_target_equals_context, capture_target, ) share_after_read = Pattern( name="document_shared_after_read_by_same_agent", description="Same agent reads then shares the same document.", severity=Severity.MEDIUM, window_seconds=300, agent_scope="same", # both steps must come from same agent_id steps=[ PatternStep( name="read_doc", matcher=match_all(match_system("notion"), match_action("read")), capture=capture_target("doc"), # remember which doc ), PatternStep( name="share_same_doc", matcher=match_all( match_system("notion"), match_action("share"), match_target_equals_context("doc"), # must match captured doc ), ), ], ) ``` 模式引擎会处理排序、agent 作用域、时间窗口以及步骤间的上下文传递。你只需声明要查找的内容即可。 ## 匹配器工具包 它们可以组合成步骤匹配器,无需编写 lambda: ``` match_system("notion", "s3") # glob patterns match_action("read*", "delete_*") match_action_class("read_sensitive") match_target_prefix("@external") match_target_regex(r"prod-.*-bucket") match_target_equals_context("doc") # equals a capture from earlier step match_metadata_equals("external", True) match_metadata("amount", lambda v: v > 1000) match_all(match_system("aws"), match_action_class("destructive")) match_any(match_system("gmail"), match_system("slack")) ``` 匹配器只是一个 `Callable[[AgentAction, dict], bool]`。如果需要,你可以编写自己的匹配器。 ## 运行实时演示 ``` git clone https://github.com/andreas-altamirano/cross-system-correlator cd cross-system-correlator pip install -e . python examples/simulate.py ``` 演示通过关联器重放了 12 个 agent 操作。大多数是良性的。三个植入的攻击序列(数据泄露、权限提升、快速破坏性突发)会在完成时实时触发警报。你将看到如下输出: ``` ══════════════════════════════════════════════════════════════════════ ALERT [HIGH] possible_exfiltration Sensitive read followed by external send by the same agent. Agent: research-agent Duration: 25.0s across 2 actions 1. notion.read_page [read_sensitive] → page_secret_strategy 2. gmail.send → lawyer@external.com ══════════════════════════════════════════════════════════════════════ ``` ## 工作原理 引擎维护着一小部分*待处理的部分匹配*。当一个操作到达时: 1. **推进待处理的匹配。** 对于每个进行中的部分匹配,检查此操作是否满足下一个必需的步骤。如果满足,则推进;如果完成了该模式,则触发。如果部分匹配已超过时间窗口,则将其丢弃。 2. **生成新的待处理匹配。** 对于每个已注册的模式,检查此操作是否匹配第一步。如果匹配,则启动一个新的待处理匹配(如果 `agent_scope="same"`,则锁定该 agent)。 3. **触发处理程序**,用于处理任何已完成的匹配。 内存为 O(活动中的部分匹配),受限于 `max_partial_matches`。CPU 为每个操作 O(模式数 + 活动中的部分匹配)。整个过程在进程内运行,受锁保护以实现线程安全,并且零依赖。 对于高吞吐量环境(>10k 次操作/秒),你需要通过 agent_id 在多个关联器实例之间进行分片,或者对模式集进行分区。默认设置对于真实世界中 agent 集群产生的操作量来说是完全足够的。 ## 集成:与 wave-engine 或 mcp-governance-proxy 配合使用 该关联器是独立的——你可以随心所欲地向其输入操作。常见模式: **Sidecar 日志追踪器。** 让你的 agent 将每次工具调用以 JSON 格式记录到 stdout / 文件 / 队列中。一个简单的追踪器进程解析它们并将其提供给 `correlator.observe()`。警报会发送到 Slack、PagerDuty、你的 SIEM 等。 **与 mcp-governance-proxy 内联。** 如果你已经在使用 [mcp-governance-proxy](https://github.com/andreas-altamirano/mcp-governance-proxy) 来评估工具调用,可以添加一个 hook,在每次操作时也调用 `correlator.observe()`。你可以在一个地方同时实现单操作授权(代理)+ 多步骤关联(本库)。 **从 wave-engine 决策管道导入。** 如果你已经集成了 [wave-engine](https://github.com/andreas-altamirano/wave-engine),你生成的每个 Decision 隐式地都是一个操作。从中构造一个 `AgentAction` 并进行观察。 ## 这*不是*什么 - **不是工具授权器。** 它负责观察和警报;它不会拦截正在执行的操作。如果需要拒绝调用,请与操作前授权器结合使用。(参见 [wave-engine](https://github.com/andreas-altamirano/wave-engine) 和 [mcp-governance-proxy](https://github.com/andreas-altamirano/mcp-governance-proxy)。) - **不是 SIEM。** 它是一个专注于 agent 操作的模式引擎。将警报通过管道输出到你现有的 SIEM/通知栈中。 - **不是持久化层。** 部分匹配存在于进程内。如果进程重启,处理中的部分匹配将会丢失。对于大多数用例,这没有问题——模式会在第一步之后的几秒到几分钟内触发。 - **不是内容分类器。** 它不决定哪些 Notion 页面是“敏感的”。你需要在上游使用 `action_class="read_sensitive"` 对它们进行标记;本库信任该标签。 ## 测试 ``` pip install -e ".[dev]" pytest -q ``` 21 项测试,涵盖单步匹配、多步排序、时间窗口过期、agent 作用域、上下文捕获、处理回调、错误弹性以及所有六种预置模式。 ## 许可证 Apache 2.0。参见 [LICENSE](LICENSE)。 ## 致谢 从 AI agent 治理平台 **Surfit** 内部的跨系统关联引擎中提取并泛化。发布此库以便其他团队能够检测多步骤 agent 风险,而无需从头开始构建。 配套库: - [`wave-engine`](https://github.com/andreas-altamirano/wave-engine) — 每次操作连续的 1–5 风险评分 - [`mcp-governance-proxy`](https://github.com/andreas-altamirano/mcp-governance-proxy) — 通过可插拔的评估器代理工具调用的 MCP 服务器
标签:AI安全, Chat Copilot, 人工智能, 多包管理, 异常检测, 数据泄露防护, 流式处理, 用户模式Hook绕过, 网络探测, 行为审计, 逆向工具