iBubenok/sigmawatch

GitHub: iBubenok/sigmawatch

基于 Sigma 规则的 Kafka 事件流实时威胁检测服务,为 CEF 格式安全事件提供流式检测能力。

Stars: 0 | Forks: 0

# SigmaWatch [![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/03/d985d23d93192919.svg)](https://github.com/iBubenok/sigmawatch/actions/workflows/ci.yml) [![Release](https://img.shields.io/github/v/release/iBubenok/sigmawatch)](https://github.com/iBubenok/sigmawatch/releases/latest) [![License](https://img.shields.io/badge/license-proprietary-blue)](LICENSE) [![Platform](https://img.shields.io/badge/platform-linux%20%7C%20macos%20%7C%20windows-lightgrey)](https://github.com/iBubenok/sigmawatch/releases/latest) 由 [Sigma rules](https://sigmahq.io/) 驱动的实时威胁检测服务,专为 Apache Kafka 事件流设计。 SigmaWatch 是 [Ambrella](https://umbrella-sis.ru/) **U-SIEM** SOC 平台的组件之一。它从 Kafka 输入 topic 消费 CEF JSON 事件,根据加载的 Sigma 检测规则分析每个事件,并将结构化事件发布到输出 Kafka topic。 ## 功能特性 - **流式 Sigma 检测** -- 实时将 Sigma 规则应用于 CEF JSON 事件 - **Apache Kafka 集成** -- 从输入 topic 消费,向输出 topic 生成事件 - **CEF 字段映射** -- 可配置的 CEF 键到 Sigma 字段名称的重映射(含 37 个默认映射) - **事件富化** -- 输出包含 `sigma_*` 元数据、源事件和 Kafka 坐标的 CEF JSON 事件 - **JSON 和 YAML 配置** -- 根据文件扩展名自动检测格式 - **热重载** -- 无需重启服务即可通过 `SIGHUP` 重新加载 Sigma 规则(Unix) - **结构化日志** -- 输出到 stderr 的 JSON 或文本格式,具有可配置的日志级别 - **指标** -- 周期性输出 JSON 计数器到 stdout(已消费/已生产消息、检测、错误) - **健康监控** -- 基于文件的健康标记,用于存活探针(systemd, Kubernetes) - **优雅停机** -- 在 `SIGTERM`/`SIGINT` 信号下排空传输中的消息,具有可配置超时 - **带退避的重试** -- 针对生产失败指数退避(可配置重试次数、延迟) - **背压机制** -- 当生产者输出队列已满时暂停消费者 - **跨平台** -- 可在 Linux、macOS 和 Windows 上构建和运行 ## 快速开始 ### 前置条件 - C++17 编译器 (GCC 10+, Clang 12+, MSVC 2019+) - CMake 3.16+ - Apache Kafka broker(运行时) ### 构建 ``` # 克隆仓库 git clone https://github.com/iBubenok/sigmawatch.git cd sigmawatch # 配置并构建 cmake -S cpp -B build -DCMAKE_BUILD_TYPE=Release cmake --build build -j # 运行测试 cd build && ctest --output-on-failure ``` 如果系统中未找到 librdkafka,它将通过 CMake FetchContent 自动获取。若要使用系统安装的版本: ``` # Linux sudo apt install librdkafka-dev # macOS brew install librdkafka ``` ### 运行 1. 准备配置文件(参见下方 [配置](#configuration) 部分)。 2. 确保 Sigma 规则文件在 `sigma.rules_path` 指定的目录中可用。 3. 启动服务: ``` ./build/sigmawatch --config config.yaml ``` ### 命令行界面 ``` Usage: sigmawatch --config [options] sigmawatch --version sigmawatch --help Options: -c, --config Path to configuration file (JSON/YAML) -v, --version Print version and exit -h, --help Print this help and exit ``` ## 工作原理 ``` Kafka input topic (CEF JSON) | v JSON Parser (RapidJSON) | v CEF Field Mapper (src -> SourceIP, dst -> DestinationIP, ...) | v Sigma Engine (tau::solve) | +-- no match --> commit offset, next message | +-- match ----> for each triggered rule: | v Incident Formatter (CEF JSON with sigma_* fields) | v Kafka output topic | commit offset ``` 每个输入事件都是包含 CEF 键的扁平 JSON 对象。SigmaWatch 将 CEF 键重新映射为 Sigma 字段名称,评估所有加载的 Sigma 规则,并为每条触发的规则生成一个事件。原始事件保存在每个事件的 `sourceEvent` 字段中。 ## 配置 SigmaWatch 从 JSON 或 YAML 文件读取配置。格式通过扩展名(`.json`, `.yaml`, `.yml`)自动检测。有关每个参数的详细参考,请参阅 [配置参考](docs/CONFIGURATION.md)。 ### 最简配置 (YAML) ``` kafka: bootstrap_servers: localhost:9092 consumer: group_id: sigmawatch-group topics: - security-events producer: topic: sigma-incidents sigma: rules_path: /etc/sigmawatch/rules ``` ### 完整配置参考 ``` kafka: bootstrap_servers: localhost:9092 # Kafka broker address reconnect_backoff_ms: 100 # Initial reconnect delay (ms) reconnect_backoff_max_ms: 10000 # Max reconnect delay (ms) consumer: group_id: sigmawatch-group # Consumer group ID topics: # Input topics to consume from - security-events auto_offset_reset: earliest # earliest | latest | none enable_auto_commit: false # Manual commit after produce (at-least-once) session_timeout_ms: 30000 # Session timeout max_poll_interval_ms: 300000 # Max poll interval producer: topic: sigma-incidents # Output topic for incidents acks: all # all | -1 | 1 | 0 retries: 5 # Max retry attempts on produce failure retry_backoff_ms: 1000 # Initial retry delay (ms) max_retry_backoff_ms: 30000 # Max retry delay (ms) security: protocol: PLAINTEXT # PLAINTEXT | SASL_PLAINTEXT | SSL | SASL_SSL # sasl_mechanism: PLAIN # sasl_username: user # sasl_password: *** # ssl_ca_location: /path/to/ca.pem sigma: rules_path: /etc/sigmawatch/rules # Directory with Sigma .yml/.yaml rules reload_signal: SIGHUP # Signal to trigger hot-reload (Unix only) filter: levels: # Only load rules with these levels - critical - high - medium statuses: # Only load rules with these statuses - stable - test field_mapping: # Override/extend default CEF -> Sigma mapping src: SourceIP dst: DestinationIP # custom_field: SigmaFieldName service: batch_size: 100 # Messages per poll batch batch_timeout_ms: 1000 # Poll timeout (ms) output_buffer_size: 10000 # Max producer queue size (back-pressure threshold) graceful_shutdown_timeout_ms: 30000 # Drain timeout on shutdown (ms) health_file: /var/run/sigmawatch.ready # Health marker file path logging: level: info # trace | debug | info | warn | error format: json # json | text output: stderr # stderr | stdout metrics: enabled: true # Enable periodic metrics output interval_seconds: 60 # Metrics reporting interval output: stdout # stdout | stderr ``` ### CEF 字段映射 SigmaWatch 内置 37 个默认 CEF 到 Sigma 字段映射。未映射的字段将原样传递,允许 Sigma 规则引用非标准 CEF 扩展。 | CEF Key | Sigma Field | Category | |---------|-------------|----------| | `src` | `SourceIP` | Network | | `dst` | `DestinationIP` | Network | | `spt` | `SourcePort` | Network | | `dpt` | `DestinationPort` | Network | | `proto` | `Protocol` | Network | | `app` | `ApplicationProtocol` | Network | | `in` | `ReceivedBytes` | Network | | `out` | `SentBytes` | Network | | `suser` | `SourceUserName` | Users | | `duser` | `DestinationUserName` | Users | | `shost` | `SourceHostName` | Hosts | | `dhost` | `DestinationHostName` | Hosts | | `sproc` | `SourceProcessName` | Processes | | `dproc` | `DestinationProcessName` | Processes | | `fname` | `FileName` | Files | | `filePath` | `FilePath` | Files | | `act` | `DeviceAction` | Event | | `msg` | `Message` | Event | | `cat` | `DeviceEventCategory` | Event | | `rt` | `ReceiptTime` | Timestamp | 可以通过配置文件中的 `sigma.field_mapping` 添加或覆盖自定义映射。有关完整的 37 条映射表、处理示例、供应商特定扩展和最佳实践,请参阅 [CEF 字段映射参考](docs/CEF_FIELD_MAPPING.md)。另请参阅 [配置参考](docs/CONFIGURATION.md#default-cef-field-mapping) 和 [技术规范](docs/TECHNICAL_SPECIFICATION.md)。 ### 事件输出格式 当 Sigma 规则匹配时,SigmaWatch 会生成一个 CEF JSON 事件: ``` { "deviceVendor": "Ambrella", "deviceProduct": "SigmaWatch", "deviceVersion": "0.1.0", "deviceEventClassID": "sigma:", "name": "Outbound Connection to C2 Server", "severity": 8, "rt": "2026-02-22T10:15:30.245Z", "msg": "Sigma detection: Outbound Connection to C2 Server [high]", "sigma_rule_id": "b8a4b143-3f23-4a18-b4c5-2d5e1a4d2fb0", "sigma_rule_title": "Outbound Connection to C2 Server", "sigma_level": "high", "sigma_status": "stable", "sigma_tags": "attack.command_and_control,attack.t1071", "sigma_references": "https://attack.mitre.org/techniques/T1071/", "sigma_matched_fields": "DestinationIP,DestinationPort,Protocol", "sigma_source_topic": "security-events", "sigma_source_partition": 2, "sigma_source_offset": 987654, "sigma_service_host": "detector-01", "sourceEvent": { ... } } ``` **严重性映射(Sigma level -> CEF severity):** | Sigma Level | CEF Severity | |-------------|-------------| | informational | 2 | | low | 3 | | medium | 5 | | high | 8 | | critical | 10 | ## 信号 | Signal | Action | |--------|--------| | `SIGTERM` / `SIGINT` | 优雅停机(排空传输中的消息) | | `SIGHUP` | 热重载 Sigma 规则无需重启(仅限 Unix) | ## 架构 ``` sigmawatch/ ├── include/sigmawatch/ # Public headers │ ├── cli.hpp # CLI argument parsing │ ├── config.hpp # Configuration (JSON/YAML) │ ├── consumer.hpp # Kafka consumer │ ├── detector.hpp # Sigma rule loading and detection │ ├── field_mapper.hpp # CEF -> Sigma field mapping │ ├── health.hpp # Health monitoring │ ├── incident.hpp # Incident formatting │ ├── json_parser.hpp # JSON parsing (RapidJSON -> Value) │ ├── json_serializer.hpp # JSON serialization (Value -> string) │ ├── logger.hpp # Structured logging │ ├── metrics.hpp # Metrics collection │ ├── processor.hpp # Message processing pipeline │ ├── producer.hpp # Kafka producer │ ├── service.hpp # Service controller (main loop) │ ├── signal_handler.hpp # Signal handling │ └── version.hpp # Version constants ├── src/sigmawatch/ # Implementation │ ├── main.cpp # Entry point │ └── *.cpp └── tests/sigmawatch/ # Tests (GoogleTest) ├── test_config.cpp ├── test_field_mapper.cpp ├── test_incident.cpp ├── test_detector.cpp ├── test_processor.cpp ├── test_logger.cpp ├── test_coverage.cpp ├── test_integration.cpp ├── test_fixture_data.cpp ├── test_benchmark.cpp └── fixtures/ # Test data (CEF events, Sigma rules, configs) ``` ## 技术栈 | Component | Technology | Version | |-----------|-----------|---------| | Language | C++17 | -- | | Kafka client | librdkafka | 2.6.1 | | JSON parsing | RapidJSON | 1.x | | YAML/config | yaml-cpp | 0.8.0 | | Sigma engine | Chainsaw tau | -- | | Testing | GoogleTest | 1.15.2 | ## 文档 - [配置参考](docs/CONFIGURATION.md) -- 所有参数、默认值、验证规则、示例 - [CEF 字段映射参考](docs/CEF_FIELD_MAPPING.md) -- 映射表、处理示例、供应商扩展、最佳实践 - [部署指南](docs/DEPLOYMENT.md) -- Docker, systemd, Kubernetes 部署说明 - [技术规范](docs/TECHNICAL_SPECIFICATION.md) -- 需求、架构、数据格式 - [更新日志](CHANGELOG.md) -- 发布历史 - [安全策略](SECURITY.md) -- 漏洞报告 ## 作者 **Yan Bubenok** - Email: yan@bubenok.com - GitHub: [@iBubenok](https://github.com/iBubenok) ## 许可证 专有软件。 Copyright (c) 2026 Ambrella LLC. All rights reserved. 详情请参阅 [LICENSE](LICENSE)。
标签:AMSI绕过, ApacheKafka, CEF, Kafka消费者, MacOS, PB级数据处理, Sigma规则, SOC平台, U-SIEM, 事件流处理, 云计算, 威胁检测, 子域名暴力破解, 安全数据管道, 安全运维, 流式计算, 目标导入, 网络安全, 自动化检测, 规则引擎, 软件成分分析, 隐私保护