# SigmaWatch
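A real-time threat detection service for Apache Kafka event streams, powered by [Sigma rules](https://sigmahq.io/).
SigmaWatch is a component of the [Ambrella](https://umbrella-sis.ru/) **U-SIEM** SOC platform. It consumes CEF JSON events from Kafka input topics, evaluates each event against the loaded Sigma detection rules, and publishes structured incidents to an output Kafka topic.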
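## Features
- **Streaming Sigma detection** -- applies Sigma rules to CEF JSON events in real time
- **Apache Kafka integration** -- consumes from input topics and produces incidents to an output topic
- **CEF field mapping** -- configurable remapping of CEF keys to Sigma field names (37 default mappings included)
- **Incident enrichment** -- output incidents are CEF JSON carrying `sigma_*` metadata, the source event, and its Kafka coordinates
- **JSON and YAML configuration** -- format detected automatically from the file extension
- **Hot reload** -- reloads Sigma rules on `SIGHUP` without restarting the service (Unix)
- **Structured logging** -- JSON or text output on stderr with a configurable log level
- **Metrics** -- periodic JSON counters on stdout (messages consumed/produced, detections, errors)
- **Health monitoring** -- file-based health marker for liveness probes (systemd, Kubernetes)
- **Graceful shutdown** -- drains in-flight messages on `SIGTERM`/`SIGINT`, with a configurable timeout
- **Retry with backoff** -- exponential backoff on produce failures (configurable retry count and delays)
- **Back-pressure** -- pauses the consumer when the producer output queue is full
- **Cross-platform** -- builds and runs on Linux, macOS, and Windows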
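The back-pressure behaviour can be pictured as a small state machine that watches the producer queue length. This is only a sketch: `BackPressureGate` and `ConsumerAction` are hypothetical names, and resuming as soon as the queue drops below capacity is an assumption, not something the project documents.

```cpp
#include <cstddef>

// Hypothetical types for illustration; not part of SigmaWatch's sources.
enum class ConsumerAction { None, Pause, Resume };

class BackPressureGate {
public:
    // `capacity` plays the role of the output_buffer_size setting.
    explicit BackPressureGate(std::size_t capacity) : capacity_(capacity) {}

    // Feed the current producer queue length; returns what the consumer should do.
    ConsumerAction update(std::size_t queued) {
        if (!paused_ && queued >= capacity_) {
            paused_ = true;
            return ConsumerAction::Pause;   // queue is full: stop fetching
        }
        if (paused_ && queued < capacity_) {
            paused_ = false;
            return ConsumerAction::Resume;  // queue has room again (assumed threshold)
        }
        return ConsumerAction::None;        // no state change
    }

    bool paused() const { return paused_; }

private:
    std::size_t capacity_;
    bool paused_ = false;
};
```

A caller would translate `Pause` and `Resume` into the corresponding pause/resume calls of its Kafka client and ignore `None`.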
## Quick start
### Prerequisites
- C++17 compiler (GCC 10+, Clang 12+, MSVC 2019+)
- CMake 3.16+
- Apache Kafka broker (at runtime)
### Build
```
# Clone the repository
git clone https://github.com/iBubenok/sigmawatch.git
cd sigmawatch
# Configure and build
cmake -S cpp -B build -DCMAKE_BUILD_TYPE=Release
cmake --build build -j
# Run the tests
cd build && ctest --output-on-failure
```
If librdkafka is not found on the system, it is fetched automatically through CMake FetchContent. To use a system-installed version instead:
```
# Linux
sudo apt install librdkafka-dev
# macOS
brew install librdkafka
```
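### Running
1. Prepare a configuration file (see the [Configuration](#configuration) section below).
2. Make sure the Sigma rule files are available in the directory given by `sigma.rules_path`.
3. Start the service: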
```
./build/sigmawatch --config config.yaml
```
### Command-line interface
```
Usage: sigmawatch --config [options]
sigmawatch --version
sigmawatch --help
Options:
-c, --config Path to configuration file (JSON/YAML)
-v, --version Print version and exit
-h, --help Print this help and exit
```
## How it works
```
Kafka input topic (CEF JSON)
        |
        v
JSON Parser (RapidJSON)
        |
        v
CEF Field Mapper (src -> SourceIP, dst -> DestinationIP, ...)
        |
        v
Sigma Engine (tau::solve)
        |
        +-- no match --> commit offset, next message
        |
        +-- match ----> for each triggered rule:
                            |
                            v
                        Incident Formatter (CEF JSON with sigma_* fields)
                            |
                            v
                        Kafka output topic
                            |
                        commit offset
```
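Each input event is a flat JSON object with CEF keys. SigmaWatch remaps the CEF keys to Sigma field names, evaluates all loaded Sigma rules, and emits one incident per triggered rule. The original event is preserved in each incident's `sourceEvent` field.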
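The "one incident per triggered rule" step can be sketched as follows. Everything here is hypothetical and simplified for illustration: `Event`, `Rule`, `Incident`, and `evaluate` are not SigmaWatch's actual types, and a plain predicate stands in for real Sigma rule evaluation.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified types; not SigmaWatch's actual API.
using Event = std::map<std::string, std::string>;  // flat key -> value object

struct Rule {
    std::string title;
    std::function<bool(const Event&)> matches;  // stand-in for Sigma rule evaluation
};

struct Incident {
    std::string sigma_rule_title;
    Event sourceEvent;  // the original event, kept unchanged
};

// Evaluate every rule against the remapped event and emit one incident per match.
// `original` holds the CEF keys; `mapped` holds the same data under Sigma field names.
std::vector<Incident> evaluate(const Event& original,
                               const Event& mapped,
                               const std::vector<Rule>& rules) {
    std::vector<Incident> incidents;
    for (const auto& rule : rules) {
        if (rule.matches(mapped)) {
            incidents.push_back({rule.title, original});
        }
    }
    return incidents;
}
```

An empty result corresponds to the "no match" branch of the diagram, where only the offset is committed.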
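## Configuration
SigmaWatch reads its configuration from a JSON or YAML file. The format is detected automatically from the file extension (`.json`, `.yaml`, `.yml`). For a detailed reference of every parameter, see the [Configuration reference](docs/CONFIGURATION.md).
### Minimal configuration (YAML)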
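Extension-based detection amounts to a suffix check on the path. A minimal sketch, assuming a hypothetical `detect_format` helper and case-sensitive matching (how SigmaWatch treats upper-case or unknown extensions is not stated here):

```cpp
#include <string_view>

// Hypothetical names for illustration; not taken from SigmaWatch's sources.
enum class ConfigFormat { Json, Yaml, Unknown };

// True if `text` ends with `suffix`. std::string_view::ends_with is C++20,
// so it is spelled out here to stay within C++17.
constexpr bool ends_with(std::string_view text, std::string_view suffix) {
    return text.size() >= suffix.size() &&
           text.substr(text.size() - suffix.size()) == suffix;
}

// Pick the parser from the file extension (case-sensitive by assumption).
constexpr ConfigFormat detect_format(std::string_view path) {
    if (ends_with(path, ".json")) return ConfigFormat::Json;
    if (ends_with(path, ".yaml") || ends_with(path, ".yml")) return ConfigFormat::Yaml;
    return ConfigFormat::Unknown;
}

// Compile-time check of the common case.
static_assert(detect_format("config.yaml") == ConfigFormat::Yaml);
```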
```
kafka:
  bootstrap_servers: localhost:9092
  consumer:
    group_id: sigmawatch-group
    topics:
      - security-events
  producer:
    topic: sigma-incidents
sigma:
  rules_path: /etc/sigmawatch/rules
```
### Full configuration reference
```
kafka:
  bootstrap_servers: localhost:9092       # Kafka broker address
  reconnect_backoff_ms: 100               # Initial reconnect delay (ms)
  reconnect_backoff_max_ms: 10000         # Max reconnect delay (ms)
  consumer:
    group_id: sigmawatch-group            # Consumer group ID
    topics:                               # Input topics to consume from
      - security-events
    auto_offset_reset: earliest           # earliest | latest | none
    enable_auto_commit: false             # Manual commit after produce (at-least-once)
    session_timeout_ms: 30000             # Session timeout (ms)
    max_poll_interval_ms: 300000          # Max poll interval (ms)
  producer:
    topic: sigma-incidents                # Output topic for incidents
    acks: all                             # all | -1 | 1 | 0
    retries: 5                            # Max retry attempts on produce failure
    retry_backoff_ms: 1000                # Initial retry delay (ms)
    max_retry_backoff_ms: 30000           # Max retry delay (ms)
  security:
    protocol: PLAINTEXT                   # PLAINTEXT | SASL_PLAINTEXT | SSL | SASL_SSL
    # sasl_mechanism: PLAIN
    # sasl_username: user
    # sasl_password: ***
    # ssl_ca_location: /path/to/ca.pem
sigma:
  rules_path: /etc/sigmawatch/rules       # Directory with Sigma .yml/.yaml rules
  reload_signal: SIGHUP                   # Signal to trigger hot-reload (Unix only)
  filter:
    levels:                               # Only load rules with these levels
      - critical
      - high
      - medium
    statuses:                             # Only load rules with these statuses
      - stable
      - test
  field_mapping:                          # Override/extend default CEF -> Sigma mapping
    src: SourceIP
    dst: DestinationIP
    # custom_field: SigmaFieldName
service:
  batch_size: 100                         # Messages per poll batch
  batch_timeout_ms: 1000                  # Poll timeout (ms)
  output_buffer_size: 10000               # Max producer queue size (back-pressure threshold)
  graceful_shutdown_timeout_ms: 30000     # Drain timeout on shutdown (ms)
  health_file: /var/run/sigmawatch.ready  # Health marker file path
logging:
  level: info                             # trace | debug | info | warn | error
  format: json                            # json | text
  output: stderr                          # stderr | stdout
metrics:
  enabled: true                           # Enable periodic metrics output
  interval_seconds: 60                    # Metrics reporting interval
  output: stdout                          # stdout | stderr
```
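To see how `retry_backoff_ms` and `max_retry_backoff_ms` interact, here is a sketch of a capped exponential schedule. It assumes the delay doubles on every attempt; the actual growth factor, and whether jitter is applied, is not documented here, and `retry_delay_ms` is a hypothetical helper.

```cpp
#include <algorithm>
#include <cstdint>

// Delay before retry number `attempt` (0-based).
// Assumption: the delay doubles each attempt and is capped at `max_ms`.
constexpr std::int64_t retry_delay_ms(int attempt,
                                      std::int64_t initial_ms,
                                      std::int64_t max_ms) {
    std::int64_t delay = initial_ms;
    // Stop doubling once the cap is reached, which also avoids overflow.
    for (int i = 0; i < attempt && delay < max_ms; ++i) {
        delay *= 2;
    }
    return std::min(delay, max_ms);
}

// With the example values (1000 ms initial, 30000 ms cap), the fifth retry waits 16 s.
static_assert(retry_delay_ms(4, 1000, 30000) == 16000);
```

Under these assumptions, `retries: 5` with the example values gives delays of 1, 2, 4, 8, and 16 seconds, so the 30-second cap only comes into play with a higher retry count.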
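### CEF field mapping
SigmaWatch ships with 37 default CEF-to-Sigma field mappings; the table below shows 20 of them. Unmapped fields pass through unchanged, so Sigma rules can reference non-standard CEF extensions.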
| CEF Key | Sigma Field | Category |
|---------|-------------|----------|
| `src` | `SourceIP` | Network |
| `dst` | `DestinationIP` | Network |
| `spt` | `SourcePort` | Network |
| `dpt` | `DestinationPort` | Network |
| `proto` | `Protocol` | Network |
| `app` | `ApplicationProtocol` | Network |
| `in` | `ReceivedBytes` | Network |
| `out` | `SentBytes` | Network |
| `suser` | `SourceUserName` | Users |
| `duser` | `DestinationUserName` | Users |
| `shost` | `SourceHostName` | Hosts |
| `dhost` | `DestinationHostName` | Hosts |
| `sproc` | `SourceProcessName` | Processes |
| `dproc` | `DestinationProcessName` | Processes |
| `fname` | `FileName` | Files |
| `filePath` | `FilePath` | Files |
| `act` | `DeviceAction` | Event |
| `msg` | `Message` | Event |
| `cat` | `DeviceEventCategory` | Event |
| `rt` | `ReceiptTime` | Timestamp |
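Custom mappings can be added or overridden through `sigma.field_mapping` in the configuration file. For the full table of 37 mappings, processing examples, vendor-specific extensions, and best practices, see the [CEF field mapping reference](docs/CEF_FIELD_MAPPING.md). See also the [Configuration reference](docs/CONFIGURATION.md#default-cef-field-mapping) and the [Technical specification](docs/TECHNICAL_SPECIFICATION.md).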
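The mapping rules described in this section (defaults, configuration overrides, pass-through for unmapped keys) fit in a few lines. This is an illustrative sketch only: `FieldMapper` and `Fields` are hypothetical names, values are simplified to strings, and if two CEF keys map to the same Sigma field this sketch keeps whichever it inserts first.

```cpp
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical, simplified types; not SigmaWatch's actual field_mapper API.
using Fields = std::unordered_map<std::string, std::string>;

class FieldMapper {
public:
    // Start from the built-in defaults, then apply configuration overrides.
    // An override replaces a default for the same CEF key or adds a new key.
    FieldMapper(Fields defaults, const Fields& overrides)
        : mapping_(std::move(defaults)) {
        for (const auto& [cef_key, sigma_field] : overrides) {
            mapping_[cef_key] = sigma_field;
        }
    }

    // Rename mapped keys; keys without a mapping pass through unchanged.
    Fields apply(const Fields& cef_event) const {
        Fields out;
        for (const auto& [key, value] : cef_event) {
            const auto it = mapping_.find(key);
            out.emplace(it != mapping_.end() ? it->second : key, value);
        }
        return out;
    }

private:
    Fields mapping_;  // CEF key -> Sigma field name
};
```

For example, with defaults `src -> SourceIP` and an override `dst -> DstAddr`, an event `{src, dst, vendorExt}` comes out as `{SourceIP, DstAddr, vendorExt}`.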
### Incident output format
When a Sigma rule matches, SigmaWatch emits a CEF JSON incident:
```
{
  "deviceVendor": "Ambrella",
  "deviceProduct": "SigmaWatch",
  "deviceVersion": "0.1.0",
  "deviceEventClassID": "sigma:",
  "name": "Outbound Connection to C2 Server",
  "severity": 8,
  "rt": "2026-02-22T10:15:30.245Z",
  "msg": "Sigma detection: Outbound Connection to C2 Server [high]",
  "sigma_rule_id": "b8a4b143-3f23-4a18-b4c5-2d5e1a4d2fb0",
  "sigma_rule_title": "Outbound Connection to C2 Server",
  "sigma_level": "high",
  "sigma_status": "stable",
  "sigma_tags": "attack.command_and_control,attack.t1071",
  "sigma_references": "https://attack.mitre.org/techniques/T1071/",
  "sigma_matched_fields": "DestinationIP,DestinationPort,Protocol",
  "sigma_source_topic": "security-events",
  "sigma_source_partition": 2,
  "sigma_source_offset": 987654,
  "sigma_service_host": "detector-01",
  "sourceEvent": { ... }
}
```
**Severity mapping (Sigma level -> CEF severity):**
| Sigma Level | CEF Severity |
|-------------|-------------|
| informational | 2 |
| low | 3 |
| medium | 5 |
| high | 8 |
| critical | 10 |
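The table translates directly into a lookup. A minimal sketch, assuming a hypothetical `cef_severity` helper (not a function from the SigmaWatch sources) and assuming that an unrecognized level falls back to 0, which the table does not specify:

```cpp
#include <string_view>

// Map a Sigma level to a CEF severity, following the table above.
// The fallback value 0 for unknown levels is an assumption of this sketch.
constexpr int cef_severity(std::string_view sigma_level) {
    if (sigma_level == "informational") return 2;
    if (sigma_level == "low") return 3;
    if (sigma_level == "medium") return 5;
    if (sigma_level == "high") return 8;
    if (sigma_level == "critical") return 10;
    return 0;
}

// Matches the example incident: "sigma_level": "high" -> "severity": 8.
static_assert(cef_severity("high") == 8);
```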
## Signals
| Signal | Action |
|--------|--------|
| `SIGTERM` / `SIGINT` | Graceful shutdown (drains in-flight messages) |
| `SIGHUP` | Hot-reloads Sigma rules without a restart (Unix only) |
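A common way to implement this signal table is to have the handlers set flags that the main loop polls between batches. The sketch below follows that pattern using only the standard `<csignal>` facilities; `install_signal_handlers` and `keep_running` are hypothetical names, and whether SigmaWatch is structured this way is an assumption.

```cpp
#include <csignal>

// Flags written by the handlers. volatile std::sig_atomic_t is the type the
// standard allows a signal handler to write safely.
volatile std::sig_atomic_t g_shutdown_requested = 0;
volatile std::sig_atomic_t g_reload_requested = 0;

extern "C" void on_shutdown_signal(int) { g_shutdown_requested = 1; }
extern "C" void on_reload_signal(int) { g_reload_requested = 1; }

// Register the handlers once at startup.
void install_signal_handlers() {
    std::signal(SIGTERM, on_shutdown_signal);
    std::signal(SIGINT, on_shutdown_signal);
#ifdef SIGHUP  // not defined on Windows, hence "Unix only"
    std::signal(SIGHUP, on_reload_signal);
#endif
}

// Call between poll batches. Runs a pending reload outside the handler,
// then reports whether the loop should continue.
bool keep_running(void (*reload_rules)()) {
    if (g_reload_requested) {
        g_reload_requested = 0;
        reload_rules();
    }
    return !g_shutdown_requested;
}
```

Keeping the handlers down to a single flag write means the actual work (reloading rules, draining the producer) happens in normal program context, where it is safe to allocate memory and take locks.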
## Architecture
```
sigmawatch/
├── include/sigmawatch/         # Public headers
│   ├── cli.hpp                 # CLI argument parsing
│   ├── config.hpp              # Configuration (JSON/YAML)
│   ├── consumer.hpp            # Kafka consumer
│   ├── detector.hpp            # Sigma rule loading and detection
│   ├── field_mapper.hpp        # CEF -> Sigma field mapping
│   ├── health.hpp              # Health monitoring
│   ├── incident.hpp            # Incident formatting
│   ├── json_parser.hpp         # JSON parsing (RapidJSON -> Value)
│   ├── json_serializer.hpp     # JSON serialization (Value -> string)
│   ├── logger.hpp              # Structured logging
│   ├── metrics.hpp             # Metrics collection
│   ├── processor.hpp           # Message processing pipeline
│   ├── producer.hpp            # Kafka producer
│   ├── service.hpp             # Service controller (main loop)
│   ├── signal_handler.hpp      # Signal handling
│   └── version.hpp             # Version constants
├── src/sigmawatch/             # Implementation
│   ├── main.cpp                # Entry point
│   └── *.cpp
└── tests/sigmawatch/           # Tests (GoogleTest)
    ├── test_config.cpp
    ├── test_field_mapper.cpp
    ├── test_incident.cpp
    ├── test_detector.cpp
    ├── test_processor.cpp
    ├── test_logger.cpp
    ├── test_coverage.cpp
    ├── test_integration.cpp
    ├── test_fixture_data.cpp
    ├── test_benchmark.cpp
    └── fixtures/               # Test data (CEF events, Sigma rules, configs)
```
## Tech stack
| Component | Technology | Version |
|-----------|-----------|---------|
| Language | C++17 | -- |
| Kafka client | librdkafka | 2.6.1 |
| JSON parsing | RapidJSON | 1.x |
| YAML/config | yaml-cpp | 0.8.0 |
| Sigma engine | Chainsaw tau | -- |
| Testing | GoogleTest | 1.15.2 |
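## Documentation
- [Configuration reference](docs/CONFIGURATION.md) -- all parameters, defaults, validation rules, examples
- [CEF field mapping reference](docs/CEF_FIELD_MAPPING.md) -- mapping table, processing examples, vendor extensions, best practices
- [Deployment guide](docs/DEPLOYMENT.md) -- deployment instructions for Docker, systemd, and Kubernetes
- [Technical specification](docs/TECHNICAL_SPECIFICATION.md) -- requirements, architecture, data formats
- [Changelog](CHANGELOG.md) -- release history
- [Security policy](SECURITY.md) -- vulnerability reporting
## Author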
**Yan Bubenok**
- Email: yan@bubenok.com
- GitHub: [@iBubenok](https://github.com/iBubenok)
## License
Proprietary software.
Copyright (c) 2026 Ambrella LLC. All rights reserved.
See [LICENSE](LICENSE) for details.