allamiro/vtop-engine
GitHub: allamiro/vtop-engine
VTOP Engine 是一个重放安全、清单驱动的遥测对象传输引擎,确保数据在完整验证之前不会丢失源进度。
Stars: 0 | Forks: 0

# VTOP Engine
**Verified Telemetry Object Protocol Engine** — 一个重放安全、清单驱动的遥测对象传输引擎。
[](https://github.com/allamiro/vtop-engine/actions/workflows/ci.yml?query=branch%3Amain)
[](LICENSE)
[](https://www.rust-lang.org)
[](#status)
VTOP 从 **Kafka topics**、**日志文件**和 **syslog spool 文件**中提取遥测数据;形成自适应批次;将其压缩;计算 SHA‑256 校验和;为每个对象生成一个 **manifest**;将遥测对象及其 manifest 上传到 **兼容 S3 的对象存储**;验证已上传的对象和 manifest;并**仅在验证成功后**提交源进度。
## 目录
- [为什么需要它](#why-it-exists)
- [核心规则](#core-rule)
- [状态机](#state-machine)
- [工作区布局](#workspace-layout)
- [源模式](#source-modes)
- [快速开始](#quick-start)
- [构建和测试](#build-and-test)
- [CLI 用法](#cli-usage)
- [Docker 实验环境](#docker-lab)
- [Manifest 示例](#example-manifest)
- [指标与效率](#metrics-and-efficiency)
- [提交前验证](#verification-before-commit)
- [崩溃后重放](#replay-after-crash)
- [已知限制](#known-limitations)
- [文档](#documentation)
- [许可证](#license)
## 为什么需要它
大多数从日志到对象存储的工具只是将字节移动到 bucket 中。它们**没有**提供跨 Kafka、文件和 syslog spool 的单一、与源无关的 *commit 模型*,并且**强制要求在推进源进度之前进行 manifest 验证**。VTOP 使这条安全规则变得明确,并在代码中严格执行。
`gzip + upload` 无法回答:*对象是否真的完整无误地落地了,现在是否可以安全地丢弃源位置?* VTOP 增加了:
- 一个 **manifest**,将源进度标记 → 对象 SHA‑256 → 验证状态绑定在一起(监管链);
- 一个 **强类型状态机**,使过早的 commit 成为不可能;
- 一个 **重放安全的状态存储**,因此崩溃永远不会推进未经验证的源;
- **提交前验证**,跨可插拔的存储后端进行。
## 核心规则
```
SOURCE_COMMITTED is forbidden until VERIFIED is true.
```
Kafka offset、文件字节偏移量或 syslog spool 偏移量**永远不会**被 commit,直到按以下顺序进行:
1. 批次被密封 (sealed)
2. 创建了压缩对象
3. 计算了对象校验和
4. 上传了对象
5. 生成了 manifest
6. 上传了 manifest
7. 验证了已上传的对象
8. 验证了 manifest
9. **只有在这之后**才提交源进度
这在 [state_machine.rs](crates/vtop-core/src/state_machine.rs) 中被强制执行:`SourceCommitted` 唯一合法的前置状态是 `Verified`,并且相同的防护在状态存储层被重新应用。
## 状态机
```
DISCOVERED → BATCHING → SEALED → COMPRESSED → CHECKSUMMED
→ OBJECT_UPLOADED → MANIFEST_UPLOADED → VERIFIED → SOURCE_COMMITTED
ANY_STATE → FAILED FAILED → REPLAY_REQUIRED → BATCHING
```
非法转换(例如 `SEALED → SOURCE_COMMITTED`)将返回 `VtopError::IllegalStateTransition` / `CommitBeforeVerified`。请参阅 [state_machine.rs](crates/vtop-core/src/state_machine.rs) 中的 `test_cannot_commit_from_*` 测试。完整的规范描述位于 [docs/VTOP_PROTOCOL_DRAFT.md](docs/VTOP_PROTOCOL_DRAFT.md) 中。
## 工作区布局
```
crates/
vtop-core/ protocol-independent logic (state machine, batch, manifest,
checksum, compression, partitioning, config, replay)
vtop-adapters/ source adapters: kafka_source, file_source, syslog_spool_source
vtop-upload/ upload backends: s3_native (primary) + s3cmd/awscli/minio + mock
vtop-state/ SQLite state store (sqlx) — the durable journal
vtop-cli/ the `vtopctl` binary + the engine runtime
examples/ config.yaml, streams.yaml, sample logs
docs/ protocol draft, invention disclosure, prior-art plan, etc.
docker/ Dockerfile + entrypoint
tests/ integration tests (wired into vtop-cli via [[test]] paths)
```
`vtop-core` 对 Kafka、S3 或 CLI **没有**依赖。更深入的介绍位于 [docs/ARCHITECTURE.md](docs/ARCHITECTURE.md) 中。
## 源模式
| 模式 | 进度标记 | 行为 |
|------|-----------------|----------|
| **Kafka** | 每个 topic+partition 的 offset | 使用 `rdkafka` consumer,**始终禁用自动提交**;一个批次 = 一个 topic + 一个 partition + 一个 offset 范围(partition 永远不会混合);offset 仅在 `VERIFIED` 之后提交。 |
| **File** | 字节偏移量 | 逐行读取 append-only 文件,跟踪 `path`、`inode`、字节偏移量、大小、mtime;从最后一次提交的字节开始恢复;部分尾部行永远不会被提交;重放将回退到未提交范围的起始处。 |
| **Syslog spool** | spool 字节偏移量 | 将 rsyslog / syslog-ng spool 文件视为具有 `spool_id` 和字节范围的 append-only 文件。外部收集器负责交付;VTOP 负责批处理、校验和、manifest、上传、验证、重放状态和提交规则。 |
每个对象都会在其旁边写入一个 `*.manifest.json`,将**源进度标记**绑定到对象的 **SHA‑256** 以及一个用于防篡改的自哈希。
### 格式自动检测(混合格式)
格式**不**固定为 CEF。当流未在
[`streams.yaml`](examples/streams.yaml) 中声明 `format` 时,引擎会根据内容**按批次自动检测**
—— CEF、JSON、JSON Lines、syslog(PRI 头)或纯文本。
由于检测是按批次运行的,**不同的格式可以同时在同一个引擎中
流动**:源 A 可以是 CEF,源 B 可以是 JSON,源 C 可以是 syslog,并且
每个批次都会获得正确的对象扩展名(`.cef.gz`, `.jsonl.gz`, ...)并
在 manifest 中记录其检测到的 `format`。`streams.yaml` 中显式的 `format`
始终优先于检测。请参阅 [detect.rs](crates/vtop-core/src/detect.rs)。
## 快速开始
```
# 在 containers 中运行完整的 lab(Kafka + MinIO + engine):
docker compose up -d
docker compose logs -f vtop-engine
# 或者基于 example config 在本地构建并运行:
cargo build --release
cargo run -p vtop-cli -- discover --config examples/config.yaml
```
## 构建和测试
```
cargo fmt --all --check
cargo clippy --workspace --all-targets -- -D warnings
cargo test --workspace
cargo build --release
```
CI 在每次 push 和 pull request 时运行所有这四项测试 —— 请参阅 [.github/workflows/ci.yml](.github/workflows/ci.yml)。
## CLI 用法
可执行文件是 `vtopctl`:
```
cargo run -p vtop-cli -- run --config examples/config.yaml
cargo run -p vtop-cli -- discover --config examples/config.yaml
cargo run -p vtop-cli -- process-once --source kafka --config examples/config.yaml
cargo run -p vtop-cli -- process-once --source file --config examples/config.yaml
cargo run -p vtop-cli -- replay --batch-id
--config examples/config.yaml
cargo run -p vtop-cli -- status --config examples/config.yaml
cargo run -p vtop-cli -- list-batches --config examples/config.yaml --json
cargo run -p vtop-cli -- verify-manifest --manifest s3://telemetry-data/.../batch.manifest.json --config examples/config.yaml
```
每个命令都支持 `--json`(机器可读)和 `--log-level`,失败时以非零状态退出,并且从不打印密钥。
## Docker 实验环境
```
docker compose up -d
docker compose logs -f vtop-engine
```
每个组件都在其**独立的容器**中运行:`kafka`、`kafka-ui` (http://localhost:8080)、`minio` (API `:9000`,控制台 `:9001`,bucket `telemetry-data`)、`minio-init`、`kafka-init`、`vtop-engine`(引擎),以及一个可选的**独立**的 `rsyslog` 收集器(`--profile syslog`),它将数据写入引擎读取的共享 `./data/spool` 数据卷中。引擎永远不会与收集器捆绑在一起。
`kafka-init` 将**多种格式**的**随机化**事件(通过 [docker/seed-events.sh](docker/seed-events.sh))播种到单独的 topic 中 —— `cef_events`、`json_events`、`syslog_events` 和 `mixed_events`(每行随机格式)—— 这样您就可以观察按批次的检测如何对它们进行标记。
**Kafka → MinIO:**
```
docker compose up -d kafka minio minio-init kafka-init
docker compose up -d vtop-engine
docker compose logs -f vtop-engine # format_detected → object_uploaded → verification_passed → source_committed
# 在 http://localhost:9001 → bucket telemetry-data 浏览结果
```
**File → MinIO**(使用生成器生成任何格式的随机数据):
```
docker/seed-events.sh cef 200 > ./data/input/auth.cef.log # CEF
docker/seed-events.sh json 200 > ./data/input/app.json.log # JSON Lines
docker/seed-events.sh syslog 200 > ./data/input/sys.syslog.log # syslog
docker/seed-events.sh mixed 500 > ./data/input/mixed.log # random per line
docker compose up -d vtop-engine
```
随时生成临时测试数据:`docker/seed-events.sh [count]`。
文件流程也没有任何基础设施覆盖,由 [tests/integration_file_to_minio.rs](tests/integration_file_to_minio.rs)(内存 `mock` 后端)进行测试。
## Manifest 示例
```
{
"protocol": "VTOP",
"version": "0.1",
"batch_id": "vtop-20260618T150000Z-app_events-p0-481000-482499-1a2b3c4d",
"tenant": "default",
"source_type": "kafka",
"source_name": "app_events",
"format": "cef",
"compression": "gzip",
"record_count": 1500,
"source_progress": {
"source_type": "kafka",
"topic": "app_events",
"partition": 0,
"start_offset": 481000,
"end_offset": 482499,
"consumer_group": "vtop-engine"
},
"object": {
"uri": "s3://telemetry-data/telemetry-data/tenant=default/source=app/format=cef/year=2026/month=06/day=18/hour=15/vtop-….cef.gz",
"size_bytes": 924822,
"sha256": "abc123…"
},
"manifest": {
"uri": "s3://telemetry-data/…/vtop-….manifest.json",
"sha256": "def456…"
},
"state": "manifest_uploaded",
"verification_status": "not_verified"
}
```
## 指标与效率
引擎对每个批次进行端到端测量,并发出结构化的 `batch_metrics`
事件(以及在 `vtopctl process-once` 下的每批次行):
```
3 records, 114 B->80 B (1.43x, 29.8% saved) in 6 ms | 500 rec/s, 0.00 MiB/s up |
stages: compress=0ms checksum=0ms put_obj=0ms put_manifest=0ms verify=0ms commit=0ms
```
每个批次记录(参见 [metrics.rs](crates/vtop-core/src/metrics.rs)):
- **大小/传输:** 压缩前与压缩后的字节数,**压缩比**,**节省的空间百分比 %** —— 即网络上的对象比源数据小多少。
- **各阶段延迟:** 压缩、校验和、对象上传、manifest 上传、验证、提交 (ms)。`object_upload_ms` 捕获网络成本(到 bucket 的“距离”)。
- **吞吐量:** 记录数/秒、未压缩的 MiB/秒以及压缩对象的**有效上传 MiB/秒**。
`vtopctl process-once --json` 包含每个批次的完整 `metrics` 对象。这些
每批次的记录是 [已知限制](#known-limitations) 下描述的聚合 Prometheus 风格计数器(例如
`bytes_uploaded_total`、`upload_latency_seconds`)的原始输入,它们已被设计但尚未导出。
## 提交前验证
该规则在三个层面上得到执行:
1. 状态机**仅**允许从 `Verified` 转换为 `SourceCommitted`(否则 `transition()` 将返回 `CommitBeforeVerified`)。
2. `SqliteStateStore::update_batch_state` 通过 `transition()` 路由**每一次**状态更改,因此即使在持久层该规则也成立。
3. 引擎 pipeline ([engine.rs](crates/vtop-cli/src/engine.rs)) 仅在 `mark_verified` *之后*调用 `adapter.commit_progress(...)`。如果验证失败,该批次将被标记为 `FAILED`,并且永远不会调用 `commit_progress`。如果在验证之后 commit 本身失败,该批次将保持 `VERIFIED` 状态(不会丢失),并且恢复机制会重试该 commit。
这一点已由 `state_machine.rs` 单元测试和 [tests/integration_replay.rs](tests/integration_replay.rs) (`verification_failure_never_commits`) 所证明。
## 崩溃后重放
如果引擎在 `VERIFIED` 之后但在 `SOURCE_COMMITTED` 之前死亡,源 offset 永远不会推进。重启时,`Engine::recover()` 将:
- 找到一个 `VERIFIED` 但未提交的批次,并**重试源 commit**(该对象已经是持久且经过验证的);
- 将任何**较早的**不完整批次标记为 `REPLAY_REQUIRED`,并从源重新读取它 —— 对于未验证的数据,源进度永远不会推进。
这一点已由 [tests/integration_replay.rs](tests/integration_replay.rs) (`crash_before_commit_is_replayable_then_recovers`) 和 [tests/integration_state_recovery.rs](tests/integration_state_recovery.rs) 所证明。
## 已知限制
- **单部分上传。** 原生 S3 后端使用 `put_object`;针对超大批次的多部分上传(multipart upload)是一个已记录的后续任务(`supports_multipart()` 报告为 `false`)。
- **部分上传的恢复。** 在 `VERIFIED` *之前*崩溃的批次将从源重新播放,而不是从写入一半的本地对象恢复(原型会持久化进度标记,而不是记录 payload)。
- **命令行后端是大小有限的验证器。** `s3cmd` 和 `mc` 仅验证大小和存在性(报告为 `backend_limited`);原生和 `awscli` 后端验证存储的 SHA‑256。
- **Syslog 时间戳解析** 尚未提取到 spool 标记中(`received_time_*` 为 `None`)。
- **Manifest 签名和 S3 Object Lock** 已在设计之中,但尚未实现(参见 [docs/SECURITY_MODEL.md](docs/SECURITY_MODEL.md))。
- **指标** 已设计(Prometheus 风格的名称),但尚未导出;引擎目前会发出结构化的 `tracing` 事件。
- Kafka 集成测试需要运行中的 broker,默认情况下标记为 `#[ignore]`。
## 文档
完整的文档集位于 [docs/](docs/) 中(索引:[docs/README.md](docs/README.md)):
| 文档 | 内容 |
|----------|----------|
| [VTOP_PROTOCOL_DRAFT.md](docs/VTOP_PROTOCOL_DRAFT.md) | 规范协议草案 + 一致性配置文件 |
| [ARCHITECTURE.md](docs/ARCHITECTURE.md) | 架构、runtime 流程和数据流程图 |
| [SECURITY_MODEL.md](docs/SECURITY_MODEL.md) | 安全模型和规范规则 |
| [INVENTION_DISCLOSURE_DRAFT.md](docs/INVENTION_DISCLOSURE_DRAFT.md) | 候选发明披露草案 |
| [PRIOR_ART_SEARCH_PLAN.md](docs/PRIOR_ART_SEARCH_PLAN.md) | 现有技术检索计划 |
## 许可证
[MIT](LICENSE) © 2026 Tamir Suliman.标签:Kafka, Rust, S3存储, SonarQube插件, 可视化界面, 数据一致性, 数据传输, 日志处理, 漏洞探索, 网络流量审计, 请求拦截, 通知系统, 遥测数据