flowtracex/zeek-kafka-parquet

GitHub: flowtracex/zeek-kafka-parquet

一个用 Go 编写的高吞吐量数据管道,用于从 Kafka 消费 Zeek 网络日志,进行标准化和富化处理后输出到 Parquet 文件或 Kafka 主题。

Stars: 2 | Forks: 1

# Kafka → Parquet 管道 一个生产级管道,用于从 Kafka 消费 Zeek 网络日志,对其进行标准化和富化,并输出到 Parquet 文件和/或 Kafka 主题。 ## 功能特性 - **多日志类型支持**:处理 18+ 种 Zeek 日志类型(DNS、CONN、HTTP、SSL、SSH、FTP、SMTP 等) - **双重输出**:写入 Parquet 文件和/或 Kafka 主题 - **三层的数据模型**:原始字段、标准化字段和富化字段 - **可配置刷新**:基于大小、时间和事件计数的刷新条件 - **异步处理**:非阻塞写入与后台刷新 - **健康监控**:周期性指标(内存、缓冲区使用率、吞吐量) ## 快速开始 ### 构建 ``` go build -o zeek-parquet-pipeline main.go ``` ### 运行 ``` ./zeek-parquet-pipeline --config config/config.json ``` ## 配置 ### config.json ``` { "kafka": { "brokers": ["localhost:9092"], "topic": "zeek-raw", "group_id": "parquet-writer-group", "max_poll_records": 1000 }, "output": { "parquet": { "enabled": true, "base_path": "./output", "file_prefix": "events" }, "kafka": { "enabled": true, "brokers": ["localhost:9092"], "topic": "zeek-normalized", "compression": "lz4" } }, "log": { "path": "./log" }, "write": { "flush_buffer_mb": 1, "flush_interval_seconds": 60, "flush_event_count": 1000 } } ``` **刷新条件**(或逻辑 - 任一条件触发刷新): - `flush_buffer_mb`:当缓冲区达到此大小时刷新(MB) - `flush_interval_seconds`:每隔 N 秒刷新一次(0 表示禁用) - `flush_event_count`:在 N 个事件后刷新(0 表示禁用) ### normalization.json **核心概念**:定义每种日志类型的三层数据模型。 - **`source`**:Zeek 日志类型标识符(例如 `zeek_dns`、`zeek_conn`) - **`promote`**:字段提升映射(原始 Zeek 字段 → 规范的标准化字段) - 示例:`"id.orig_h": "src_ip"` 将 `id.orig_h` 提升为 `src_ip` - 提升的字段替换原始字段(无重复) - **`static`**:添加到此日志类型所有事件的静态字段 - 示例:`"event_type": "dns"`、`"event_class": "dns"` - **`enrich`**:每种日志类型的富化标志 - `"time": true` → 启用时间分解(年、月、日、小时、工作日) - `"network": true` → 启用网络分析(私有 IP 检测、方向、服务) ``` { "dns": { "source": "zeek_dns", "promote": { "ts": "event_time", "uid": "flow_id", "id.orig_h": "src_ip", "id.resp_h": "dst_ip", "proto": "protocol" }, "static": { "event_type": "dns", "event_class": "dns" }, "enrich": { "time": true, "network": true } } } ``` ### schema.json **核心概念**:定义每种 Zeek 日志类型的原始字段结构。 - 由 `generate_schema.go` 用于自动生成 `schema/events.go` - 定义所有可能的原始字段及其类型(string、int、float、bool 等) - 不在 `normalization.json` → `promote` 中的字段在 Parquet 中保留为原始字段 - **重要**:修改此文件后,重新生成 schema: go run generate_schema.go > schema/events.go ## 架构 ``` Kafka (zeek-raw) ↓ Consumer ↓ Normalizer (field promotion) ↓ Enricher (time + network enrichment) ↓ Fan-out ├──→ Parquet Writers (per log_type, async flush) └──→ Kafka Producer (structured JSON output) ``` ## 项目结构 ``` . ├── main.go # Pipeline entry point ├── zeek-parquet-pipeline # Compiled binary ├── config/ # Configuration files │ ├── config.json # Main configuration │ ├── normalization.json # Field promotion rules │ └── schema.json # Raw field definitions ├── core/ # Core pipeline components │ ├── kafka.go # Kafka consumer │ ├── normalize.go # Field normalization │ ├── enrich.go # Runtime enrichment (time, network) │ ├── pipeline_flow.go # Event routing by log_type │ ├── parquet.go # Parquet writer │ ├── kafka_producer.go # Kafka output producer │ ├── fanout.go # Output fan-out │ └── logger.go # Logging system ├── schema/ # Generated Parquet schemas │ └── events.go # Auto-generated from schema.json ├── test/ # Test scripts ├── log/ # Runtime logs └── output/ # Parquet output files ``` ## 输出格式 ### Parquet 输出 按日志类型和时间分区: ``` output/ ├── dns/ │ └── year=2026/month=02/day=02/hour=10/ │ └── events_dns_1.parquet ├── conn/ │ └── year=2026/month=02/day=02/hour=10/ │ └── events_conn_1.parquet └── ... ``` ### Kafka 输出 具有三层结构的结构化 JSON 格式: ``` { "source": "zeek", "log_type": "dns", "raw": { "ts": 1769868799.213927, "uid": "CfqDt31quyW8AAJct6", "id.orig_h": "10.128.0.4", ... }, "normalized": { "event_time": 1769868799213, "ingest_time": 1770089817352, "flow_id": "CfqDt31quyW8AAJct6", "src_ip": "10.128.0.4", "dst_ip": "216.239.34.174", ... }, "enriched": { "src_ip_is_private": true, "dst_ip_is_private": false, "direction": "outbound", "event_year": 2026, ... } } ``` ## 数据模型 ### 三层架构 1. **原始层**:所有字段完全保留 Zeek 原始格式 2. **标准化层**:字段提升为规范名称(例如 `id.orig_h` → `src_ip`) 3. **富化层**:派生字段(时间组件、网络分析、方向) ## 开发 ### 重新生成 Schema 更新 `schema.json` 或 `normalization.json` 后: ``` go run generate_schema.go > schema/events.go go build -o zeek-parquet-pipeline main.go ``` ### 测试 ``` go run test/test_parquet_validation.go output ``` ## 环境要求 - Go 1.21+ - Kafka broker(已在 Kafka 2.8+ 上测试) - Kafka 主题中的 Zeek 日志(嵌套 JSON 格式:`{"dns": {...}}` 或 `{"conn": {...}}`) ## 测试环境 **注意**:此管道已在单台机器上进行了测试。分布式部署中的性能特征可能会有所不同。 ### 测试机规格 - **操作系统**:Linux (Ubuntu 22.04.1 LTS) - **内核**:6.8.0-1045-gcp - **架构**:x86_64 - **CPU**:Intel Xeon @ 2.20GHz - 4 CPUs (2 核,每核 2 线程) - **内存**:11GB RAM - **存储**:97GB 磁盘空间 ### 软件版本 - **Go**:1.22.4 - **Zeek**:8.0.5 - **Kafka**:2.8+ (broker 版本) ### 已测试日志类型 该管道已通过以下 Zeek 日志类型的验证: - DNS, CONN, HTTP, SSL, SSH, FTP, SMTP, DHCP, RDP, SMB, DCE-RPC, Kerberos, NTLM, SIP, SNMP, RADIUS, Tunnel 及其他(共 18+ 种) ### 性能观察 - **吞吐量**:每秒处理数千个事件(测试环境中观察到:18-31 EPS) - **内存使用**:低内存占用(观察到约 33-37MB 堆内存) - **Parquet 文件**:成功生成并具有正确的分区和 schema - **缓冲区刷新**:验证了所有三种刷新条件(大小、时间、事件计数) ## 依赖项 - `github.com/segmentio/kafka-go` - Kafka 客户端 - `github.com/parquet-go/parquet-go` - Parquet 库 ## 监控 健康指标每 30 秒记录到 `log/pipeline.log`: - 内存使用(RSS 和堆) - 缓冲区使用情况(字节和百分比) - 吞吐量(每秒事件数) - 刷新计数和错误计数 ## 日志记录 结构化日志格式: ``` TIMESTAMP | LEVEL | COMPONENT | MESSAGE | CONTEXT ``` 示例: ``` 2026-02-03T10:30:00Z | INFO | startup | configuration loaded | kafka_topic=zeek-raw 2026-02-03T10:30:30Z | INFO | health | memory=612MB buffer=38% eps=8200 ```
标签:AMSI绕过, CONNECTION, DNS, EVTX分析, Go, Golang, Gradle集成, ID昭, Kafka, Parquet, Rootkit, Ruby工具, SonarQube插件, Zeek, 大数据, 威胁检测, 安全编程, 开源, 异步处理, 数据丰富, 数据摄取, 数据湖, 数据管道, 数据规范化, 日志处理, 日志审计, 目录扫描, 网络安全, 网络遥测, 软件工程, 软件成分分析, 隐私保护