allamiro/vtop-engine

GitHub: allamiro/vtop-engine

VTOP Engine 是一个重放安全、清单驱动的遥测对象传输引擎,确保数据在完整验证之前不会丢失源进度。

Stars: 0 | Forks: 0

VTOP Engine logo # VTOP Engine **Verified Telemetry Object Protocol Engine** — 一个重放安全、清单驱动的遥测对象传输引擎。 [![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/6aff1c449e042644.svg)](https://github.com/allamiro/vtop-engine/actions/workflows/ci.yml?query=branch%3Amain) [![License: MIT](https://img.shields.io/badge/license-MIT-yellow.svg)](LICENSE) [![Rust](https://img.shields.io/badge/rust-2021-orange.svg)](https://www.rust-lang.org) [![Status: prototype](https://img.shields.io/badge/status-prototype-blue.svg)](#status)
VTOP 从 **Kafka topics**、**日志文件**和 **syslog spool 文件**中提取遥测数据;形成自适应批次;将其压缩;计算 SHA‑256 校验和;为每个对象生成一个 **manifest**;将遥测对象及其 manifest 上传到 **兼容 S3 的对象存储**;验证已上传的对象和 manifest;并**仅在验证成功后**提交源进度。 ## 目录 - [为什么需要它](#why-it-exists) - [核心规则](#core-rule) - [状态机](#state-machine) - [工作区布局](#workspace-layout) - [源模式](#source-modes) - [快速开始](#quick-start) - [构建和测试](#build-and-test) - [CLI 用法](#cli-usage) - [Docker 实验环境](#docker-lab) - [Manifest 示例](#example-manifest) - [指标与效率](#metrics-and-efficiency) - [提交前验证](#verification-before-commit) - [崩溃后重放](#replay-after-crash) - [已知限制](#known-limitations) - [文档](#documentation) - [许可证](#license) ## 为什么需要它 大多数从日志到对象存储的工具只是将字节移动到 bucket 中。它们**没有**提供跨 Kafka、文件和 syslog spool 的单一、与源无关的 *commit 模型*,并且**强制要求在推进源进度之前进行 manifest 验证**。VTOP 使这条安全规则变得明确,并在代码中严格执行。 `gzip + upload` 无法回答:*对象是否真的完整无误地落地了,现在是否可以安全地丢弃源位置?* VTOP 增加了: - 一个 **manifest**,将源进度标记 → 对象 SHA‑256 → 验证状态绑定在一起(监管链); - 一个 **强类型状态机**,使过早的 commit 成为不可能; - 一个 **重放安全的状态存储**,因此崩溃永远不会推进未经验证的源; - **提交前验证**,跨可插拔的存储后端进行。 ## 核心规则 ``` SOURCE_COMMITTED is forbidden until VERIFIED is true. ``` Kafka offset、文件字节偏移量或 syslog spool 偏移量**永远不会**被 commit,直到按以下顺序进行: 1. 批次被密封 (sealed) 2. 创建了压缩对象 3. 计算了对象校验和 4. 上传了对象 5. 生成了 manifest 6. 上传了 manifest 7. 验证了已上传的对象 8. 验证了 manifest 9. **只有在这之后**才提交源进度 这在 [state_machine.rs](crates/vtop-core/src/state_machine.rs) 中被强制执行:`SourceCommitted` 唯一合法的前置状态是 `Verified`,并且相同的防护在状态存储层被重新应用。 ## 状态机 ``` DISCOVERED → BATCHING → SEALED → COMPRESSED → CHECKSUMMED → OBJECT_UPLOADED → MANIFEST_UPLOADED → VERIFIED → SOURCE_COMMITTED ANY_STATE → FAILED FAILED → REPLAY_REQUIRED → BATCHING ``` 非法转换(例如 `SEALED → SOURCE_COMMITTED`)将返回 `VtopError::IllegalStateTransition` / `CommitBeforeVerified`。请参阅 [state_machine.rs](crates/vtop-core/src/state_machine.rs) 中的 `test_cannot_commit_from_*` 测试。完整的规范描述位于 [docs/VTOP_PROTOCOL_DRAFT.md](docs/VTOP_PROTOCOL_DRAFT.md) 中。 ## 工作区布局 ``` crates/ vtop-core/ protocol-independent logic (state machine, batch, manifest, checksum, compression, partitioning, config, replay) vtop-adapters/ source adapters: kafka_source, file_source, syslog_spool_source vtop-upload/ upload backends: s3_native (primary) + s3cmd/awscli/minio + mock vtop-state/ SQLite state store (sqlx) — the durable journal vtop-cli/ the `vtopctl` binary + the engine runtime examples/ config.yaml, streams.yaml, sample logs docs/ protocol draft, invention disclosure, prior-art plan, etc. docker/ Dockerfile + entrypoint tests/ integration tests (wired into vtop-cli via [[test]] paths) ``` `vtop-core` 对 Kafka、S3 或 CLI **没有**依赖。更深入的介绍位于 [docs/ARCHITECTURE.md](docs/ARCHITECTURE.md) 中。 ## 源模式 | 模式 | 进度标记 | 行为 | |------|-----------------|----------| | **Kafka** | 每个 topic+partition 的 offset | 使用 `rdkafka` consumer,**始终禁用自动提交**;一个批次 = 一个 topic + 一个 partition + 一个 offset 范围(partition 永远不会混合);offset 仅在 `VERIFIED` 之后提交。 | | **File** | 字节偏移量 | 逐行读取 append-only 文件,跟踪 `path`、`inode`、字节偏移量、大小、mtime;从最后一次提交的字节开始恢复;部分尾部行永远不会被提交;重放将回退到未提交范围的起始处。 | | **Syslog spool** | spool 字节偏移量 | 将 rsyslog / syslog-ng spool 文件视为具有 `spool_id` 和字节范围的 append-only 文件。外部收集器负责交付;VTOP 负责批处理、校验和、manifest、上传、验证、重放状态和提交规则。 | 每个对象都会在其旁边写入一个 `*.manifest.json`,将**源进度标记**绑定到对象的 **SHA‑256** 以及一个用于防篡改的自哈希。 ### 格式自动检测(混合格式) 格式**不**固定为 CEF。当流未在 [`streams.yaml`](examples/streams.yaml) 中声明 `format` 时,引擎会根据内容**按批次自动检测** —— CEF、JSON、JSON Lines、syslog(PRI 头)或纯文本。 由于检测是按批次运行的,**不同的格式可以同时在同一个引擎中 流动**:源 A 可以是 CEF,源 B 可以是 JSON,源 C 可以是 syslog,并且 每个批次都会获得正确的对象扩展名(`.cef.gz`, `.jsonl.gz`, ...)并 在 manifest 中记录其检测到的 `format`。`streams.yaml` 中显式的 `format` 始终优先于检测。请参阅 [detect.rs](crates/vtop-core/src/detect.rs)。 ## 快速开始 ``` # 在 containers 中运行完整的 lab(Kafka + MinIO + engine): docker compose up -d docker compose logs -f vtop-engine # 或者基于 example config 在本地构建并运行: cargo build --release cargo run -p vtop-cli -- discover --config examples/config.yaml ``` ## 构建和测试 ``` cargo fmt --all --check cargo clippy --workspace --all-targets -- -D warnings cargo test --workspace cargo build --release ``` CI 在每次 push 和 pull request 时运行所有这四项测试 —— 请参阅 [.github/workflows/ci.yml](.github/workflows/ci.yml)。 ## CLI 用法 可执行文件是 `vtopctl`: ``` cargo run -p vtop-cli -- run --config examples/config.yaml cargo run -p vtop-cli -- discover --config examples/config.yaml cargo run -p vtop-cli -- process-once --source kafka --config examples/config.yaml cargo run -p vtop-cli -- process-once --source file --config examples/config.yaml cargo run -p vtop-cli -- replay --batch-id --config examples/config.yaml cargo run -p vtop-cli -- status --config examples/config.yaml cargo run -p vtop-cli -- list-batches --config examples/config.yaml --json cargo run -p vtop-cli -- verify-manifest --manifest s3://telemetry-data/.../batch.manifest.json --config examples/config.yaml ``` 每个命令都支持 `--json`(机器可读)和 `--log-level`,失败时以非零状态退出,并且从不打印密钥。 ## Docker 实验环境 ``` docker compose up -d docker compose logs -f vtop-engine ``` 每个组件都在其**独立的容器**中运行:`kafka`、`kafka-ui` (http://localhost:8080)、`minio` (API `:9000`,控制台 `:9001`,bucket `telemetry-data`)、`minio-init`、`kafka-init`、`vtop-engine`(引擎),以及一个可选的**独立**的 `rsyslog` 收集器(`--profile syslog`),它将数据写入引擎读取的共享 `./data/spool` 数据卷中。引擎永远不会与收集器捆绑在一起。 `kafka-init` 将**多种格式**的**随机化**事件(通过 [docker/seed-events.sh](docker/seed-events.sh))播种到单独的 topic 中 —— `cef_events`、`json_events`、`syslog_events` 和 `mixed_events`(每行随机格式)—— 这样您就可以观察按批次的检测如何对它们进行标记。 **Kafka → MinIO:** ``` docker compose up -d kafka minio minio-init kafka-init docker compose up -d vtop-engine docker compose logs -f vtop-engine # format_detected → object_uploaded → verification_passed → source_committed # 在 http://localhost:9001 → bucket telemetry-data 浏览结果 ``` **File → MinIO**(使用生成器生成任何格式的随机数据): ``` docker/seed-events.sh cef 200 > ./data/input/auth.cef.log # CEF docker/seed-events.sh json 200 > ./data/input/app.json.log # JSON Lines docker/seed-events.sh syslog 200 > ./data/input/sys.syslog.log # syslog docker/seed-events.sh mixed 500 > ./data/input/mixed.log # random per line docker compose up -d vtop-engine ``` 随时生成临时测试数据:`docker/seed-events.sh [count]`。 文件流程也没有任何基础设施覆盖,由 [tests/integration_file_to_minio.rs](tests/integration_file_to_minio.rs)(内存 `mock` 后端)进行测试。 ## Manifest 示例 ``` { "protocol": "VTOP", "version": "0.1", "batch_id": "vtop-20260618T150000Z-app_events-p0-481000-482499-1a2b3c4d", "tenant": "default", "source_type": "kafka", "source_name": "app_events", "format": "cef", "compression": "gzip", "record_count": 1500, "source_progress": { "source_type": "kafka", "topic": "app_events", "partition": 0, "start_offset": 481000, "end_offset": 482499, "consumer_group": "vtop-engine" }, "object": { "uri": "s3://telemetry-data/telemetry-data/tenant=default/source=app/format=cef/year=2026/month=06/day=18/hour=15/vtop-….cef.gz", "size_bytes": 924822, "sha256": "abc123…" }, "manifest": { "uri": "s3://telemetry-data/…/vtop-….manifest.json", "sha256": "def456…" }, "state": "manifest_uploaded", "verification_status": "not_verified" } ``` ## 指标与效率 引擎对每个批次进行端到端测量,并发出结构化的 `batch_metrics` 事件(以及在 `vtopctl process-once` 下的每批次行): ``` 3 records, 114 B->80 B (1.43x, 29.8% saved) in 6 ms | 500 rec/s, 0.00 MiB/s up | stages: compress=0ms checksum=0ms put_obj=0ms put_manifest=0ms verify=0ms commit=0ms ``` 每个批次记录(参见 [metrics.rs](crates/vtop-core/src/metrics.rs)): - **大小/传输:** 压缩前与压缩后的字节数,**压缩比**,**节省的空间百分比 %** —— 即网络上的对象比源数据小多少。 - **各阶段延迟:** 压缩、校验和、对象上传、manifest 上传、验证、提交 (ms)。`object_upload_ms` 捕获网络成本(到 bucket 的“距离”)。 - **吞吐量:** 记录数/秒、未压缩的 MiB/秒以及压缩对象的**有效上传 MiB/秒**。 `vtopctl process-once --json` 包含每个批次的完整 `metrics` 对象。这些 每批次的记录是 [已知限制](#known-limitations) 下描述的聚合 Prometheus 风格计数器(例如 `bytes_uploaded_total`、`upload_latency_seconds`)的原始输入,它们已被设计但尚未导出。 ## 提交前验证 该规则在三个层面上得到执行: 1. 状态机**仅**允许从 `Verified` 转换为 `SourceCommitted`(否则 `transition()` 将返回 `CommitBeforeVerified`)。 2. `SqliteStateStore::update_batch_state` 通过 `transition()` 路由**每一次**状态更改,因此即使在持久层该规则也成立。 3. 引擎 pipeline ([engine.rs](crates/vtop-cli/src/engine.rs)) 仅在 `mark_verified` *之后*调用 `adapter.commit_progress(...)`。如果验证失败,该批次将被标记为 `FAILED`,并且永远不会调用 `commit_progress`。如果在验证之后 commit 本身失败,该批次将保持 `VERIFIED` 状态(不会丢失),并且恢复机制会重试该 commit。 这一点已由 `state_machine.rs` 单元测试和 [tests/integration_replay.rs](tests/integration_replay.rs) (`verification_failure_never_commits`) 所证明。 ## 崩溃后重放 如果引擎在 `VERIFIED` 之后但在 `SOURCE_COMMITTED` 之前死亡,源 offset 永远不会推进。重启时,`Engine::recover()` 将: - 找到一个 `VERIFIED` 但未提交的批次,并**重试源 commit**(该对象已经是持久且经过验证的); - 将任何**较早的**不完整批次标记为 `REPLAY_REQUIRED`,并从源重新读取它 —— 对于未验证的数据,源进度永远不会推进。 这一点已由 [tests/integration_replay.rs](tests/integration_replay.rs) (`crash_before_commit_is_replayable_then_recovers`) 和 [tests/integration_state_recovery.rs](tests/integration_state_recovery.rs) 所证明。 ## 已知限制 - **单部分上传。** 原生 S3 后端使用 `put_object`;针对超大批次的多部分上传(multipart upload)是一个已记录的后续任务(`supports_multipart()` 报告为 `false`)。 - **部分上传的恢复。** 在 `VERIFIED` *之前*崩溃的批次将从源重新播放,而不是从写入一半的本地对象恢复(原型会持久化进度标记,而不是记录 payload)。 - **命令行后端是大小有限的验证器。** `s3cmd` 和 `mc` 仅验证大小和存在性(报告为 `backend_limited`);原生和 `awscli` 后端验证存储的 SHA‑256。 - **Syslog 时间戳解析** 尚未提取到 spool 标记中(`received_time_*` 为 `None`)。 - **Manifest 签名和 S3 Object Lock** 已在设计之中,但尚未实现(参见 [docs/SECURITY_MODEL.md](docs/SECURITY_MODEL.md))。 - **指标** 已设计(Prometheus 风格的名称),但尚未导出;引擎目前会发出结构化的 `tracing` 事件。 - Kafka 集成测试需要运行中的 broker,默认情况下标记为 `#[ignore]`。 ## 文档 完整的文档集位于 [docs/](docs/) 中(索引:[docs/README.md](docs/README.md)): | 文档 | 内容 | |----------|----------| | [VTOP_PROTOCOL_DRAFT.md](docs/VTOP_PROTOCOL_DRAFT.md) | 规范协议草案 + 一致性配置文件 | | [ARCHITECTURE.md](docs/ARCHITECTURE.md) | 架构、runtime 流程和数据流程图 | | [SECURITY_MODEL.md](docs/SECURITY_MODEL.md) | 安全模型和规范规则 | | [INVENTION_DISCLOSURE_DRAFT.md](docs/INVENTION_DISCLOSURE_DRAFT.md) | 候选发明披露草案 | | [PRIOR_ART_SEARCH_PLAN.md](docs/PRIOR_ART_SEARCH_PLAN.md) | 现有技术检索计划 | ## 许可证 [MIT](LICENSE) © 2026 Tamir Suliman.
标签:Kafka, Rust, S3存储, SonarQube插件, 可视化界面, 数据一致性, 数据传输, 日志处理, 漏洞探索, 网络流量审计, 请求拦截, 通知系统, 遥测数据