flowtracex/zeek-kafka-parquet
GitHub: flowtracex/zeek-kafka-parquet
一个用 Go 编写的高吞吐量数据管道,用于从 Kafka 消费 Zeek 网络日志,进行标准化和富化处理后输出到 Parquet 文件或 Kafka 主题。
Stars: 2 | Forks: 1
# Kafka → Parquet 管道
一个生产级管道,用于从 Kafka 消费 Zeek 网络日志,对其进行标准化和富化,并输出到 Parquet 文件和/或 Kafka 主题。
## 功能特性
- **多日志类型支持**:处理 18+ 种 Zeek 日志类型(DNS、CONN、HTTP、SSL、SSH、FTP、SMTP 等)
- **双重输出**:写入 Parquet 文件和/或 Kafka 主题
- **三层的数据模型**:原始字段、标准化字段和富化字段
- **可配置刷新**:基于大小、时间和事件计数的刷新条件
- **异步处理**:非阻塞写入与后台刷新
- **健康监控**:周期性指标(内存、缓冲区使用率、吞吐量)
## 快速开始
### 构建
```
go build -o zeek-parquet-pipeline main.go
```
### 运行
```
./zeek-parquet-pipeline --config config/config.json
```
## 配置
### config.json
```
{
"kafka": {
"brokers": ["localhost:9092"],
"topic": "zeek-raw",
"group_id": "parquet-writer-group",
"max_poll_records": 1000
},
"output": {
"parquet": {
"enabled": true,
"base_path": "./output",
"file_prefix": "events"
},
"kafka": {
"enabled": true,
"brokers": ["localhost:9092"],
"topic": "zeek-normalized",
"compression": "lz4"
}
},
"log": {
"path": "./log"
},
"write": {
"flush_buffer_mb": 1,
"flush_interval_seconds": 60,
"flush_event_count": 1000
}
}
```
**刷新条件**(或逻辑 - 任一条件触发刷新):
- `flush_buffer_mb`:当缓冲区达到此大小时刷新(MB)
- `flush_interval_seconds`:每隔 N 秒刷新一次(0 表示禁用)
- `flush_event_count`:在 N 个事件后刷新(0 表示禁用)
### normalization.json
**核心概念**:定义每种日志类型的三层数据模型。
- **`source`**:Zeek 日志类型标识符(例如 `zeek_dns`、`zeek_conn`)
- **`promote`**:字段提升映射(原始 Zeek 字段 → 规范的标准化字段)
- 示例:`"id.orig_h": "src_ip"` 将 `id.orig_h` 提升为 `src_ip`
- 提升的字段替换原始字段(无重复)
- **`static`**:添加到此日志类型所有事件的静态字段
- 示例:`"event_type": "dns"`、`"event_class": "dns"`
- **`enrich`**:每种日志类型的富化标志
- `"time": true` → 启用时间分解(年、月、日、小时、工作日)
- `"network": true` → 启用网络分析(私有 IP 检测、方向、服务)
```
{
"dns": {
"source": "zeek_dns",
"promote": {
"ts": "event_time",
"uid": "flow_id",
"id.orig_h": "src_ip",
"id.resp_h": "dst_ip",
"proto": "protocol"
},
"static": {
"event_type": "dns",
"event_class": "dns"
},
"enrich": {
"time": true,
"network": true
}
}
}
```
### schema.json
**核心概念**:定义每种 Zeek 日志类型的原始字段结构。
- 由 `generate_schema.go` 用于自动生成 `schema/events.go`
- 定义所有可能的原始字段及其类型(string、int、float、bool 等)
- 不在 `normalization.json` → `promote` 中的字段在 Parquet 中保留为原始字段
- **重要**:修改此文件后,重新生成 schema:
go run generate_schema.go > schema/events.go
## 架构
```
Kafka (zeek-raw)
↓
Consumer
↓
Normalizer (field promotion)
↓
Enricher (time + network enrichment)
↓
Fan-out
├──→ Parquet Writers (per log_type, async flush)
└──→ Kafka Producer (structured JSON output)
```
## 项目结构
```
.
├── main.go # Pipeline entry point
├── zeek-parquet-pipeline # Compiled binary
├── config/ # Configuration files
│ ├── config.json # Main configuration
│ ├── normalization.json # Field promotion rules
│ └── schema.json # Raw field definitions
├── core/ # Core pipeline components
│ ├── kafka.go # Kafka consumer
│ ├── normalize.go # Field normalization
│ ├── enrich.go # Runtime enrichment (time, network)
│ ├── pipeline_flow.go # Event routing by log_type
│ ├── parquet.go # Parquet writer
│ ├── kafka_producer.go # Kafka output producer
│ ├── fanout.go # Output fan-out
│ └── logger.go # Logging system
├── schema/ # Generated Parquet schemas
│ └── events.go # Auto-generated from schema.json
├── test/ # Test scripts
├── log/ # Runtime logs
└── output/ # Parquet output files
```
## 输出格式
### Parquet 输出
按日志类型和时间分区:
```
output/
├── dns/
│ └── year=2026/month=02/day=02/hour=10/
│ └── events_dns_1.parquet
├── conn/
│ └── year=2026/month=02/day=02/hour=10/
│ └── events_conn_1.parquet
└── ...
```
### Kafka 输出
具有三层结构的结构化 JSON 格式:
```
{
"source": "zeek",
"log_type": "dns",
"raw": {
"ts": 1769868799.213927,
"uid": "CfqDt31quyW8AAJct6",
"id.orig_h": "10.128.0.4",
...
},
"normalized": {
"event_time": 1769868799213,
"ingest_time": 1770089817352,
"flow_id": "CfqDt31quyW8AAJct6",
"src_ip": "10.128.0.4",
"dst_ip": "216.239.34.174",
...
},
"enriched": {
"src_ip_is_private": true,
"dst_ip_is_private": false,
"direction": "outbound",
"event_year": 2026,
...
}
}
```
## 数据模型
### 三层架构
1. **原始层**:所有字段完全保留 Zeek 原始格式
2. **标准化层**:字段提升为规范名称(例如 `id.orig_h` → `src_ip`)
3. **富化层**:派生字段(时间组件、网络分析、方向)
## 开发
### 重新生成 Schema
更新 `schema.json` 或 `normalization.json` 后:
```
go run generate_schema.go > schema/events.go
go build -o zeek-parquet-pipeline main.go
```
### 测试
```
go run test/test_parquet_validation.go output
```
## 环境要求
- Go 1.21+
- Kafka broker(已在 Kafka 2.8+ 上测试)
- Kafka 主题中的 Zeek 日志(嵌套 JSON 格式:`{"dns": {...}}` 或 `{"conn": {...}}`)
## 测试环境
**注意**:此管道已在单台机器上进行了测试。分布式部署中的性能特征可能会有所不同。
### 测试机规格
- **操作系统**:Linux (Ubuntu 22.04.1 LTS)
- **内核**:6.8.0-1045-gcp
- **架构**:x86_64
- **CPU**:Intel Xeon @ 2.20GHz
- 4 CPUs (2 核,每核 2 线程)
- **内存**:11GB RAM
- **存储**:97GB 磁盘空间
### 软件版本
- **Go**:1.22.4
- **Zeek**:8.0.5
- **Kafka**:2.8+ (broker 版本)
### 已测试日志类型
该管道已通过以下 Zeek 日志类型的验证:
- DNS, CONN, HTTP, SSL, SSH, FTP, SMTP, DHCP, RDP, SMB, DCE-RPC, Kerberos, NTLM, SIP, SNMP, RADIUS, Tunnel 及其他(共 18+ 种)
### 性能观察
- **吞吐量**:每秒处理数千个事件(测试环境中观察到:18-31 EPS)
- **内存使用**:低内存占用(观察到约 33-37MB 堆内存)
- **Parquet 文件**:成功生成并具有正确的分区和 schema
- **缓冲区刷新**:验证了所有三种刷新条件(大小、时间、事件计数)
## 依赖项
- `github.com/segmentio/kafka-go` - Kafka 客户端
- `github.com/parquet-go/parquet-go` - Parquet 库
## 监控
健康指标每 30 秒记录到 `log/pipeline.log`:
- 内存使用(RSS 和堆)
- 缓冲区使用情况(字节和百分比)
- 吞吐量(每秒事件数)
- 刷新计数和错误计数
## 日志记录
结构化日志格式:
```
TIMESTAMP | LEVEL | COMPONENT | MESSAGE | CONTEXT
```
示例:
```
2026-02-03T10:30:00Z | INFO | startup | configuration loaded | kafka_topic=zeek-raw
2026-02-03T10:30:30Z | INFO | health | memory=612MB buffer=38% eps=8200
```
标签:AMSI绕过, CONNECTION, DNS, EVTX分析, Go, Golang, Gradle集成, ID昭, Kafka, Parquet, Rootkit, Ruby工具, SonarQube插件, Zeek, 大数据, 威胁检测, 安全编程, 开源, 异步处理, 数据丰富, 数据摄取, 数据湖, 数据管道, 数据规范化, 日志处理, 日志审计, 目录扫描, 网络安全, 网络遥测, 软件工程, 软件成分分析, 隐私保护