socprime/detectflow-matchnode

GitHub: socprime/detectflow-matchnode

基于 Apache Flink 和 Sigma 规则构建的实时威胁检测匹配节点,从 Kafka 消费事件流并输出带 MITRE ATT&CK 标签的检测结果。

Stars: 7 | Forks: 1

# DetectFlow 匹配节点 [![Version](https://img.shields.io/badge/version-0.9.2-blue.svg)](VERSION) [![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg)](https://www.python.org/downloads/release/python-3120/) [![FastAPI](https://img.shields.io/badge/FastAPI-0.121.3-green.svg)](https://fastapi.tiangolo.com/) 使用 Apache Flink 和 Sigma 规则进行实时威胁检测。消费来自 Kafka 的事件流,应用 Sigma 规则,并使用 MITRE ATT&CK 技术和规则元数据标记匹配的事件。 本项目是 SOC Prime DetectFlow OSS 的一个组件。有关更多详细信息和说明,请参阅其 [README](https://github.com/socprime/detectflow-main)。 ## 功能特性 - **流处理** — 通过 Apache Flink(Python DataStream API)实时处理事件。 - **Sigma 规则** — 支持 Sigma 规则。 - **Kafka** — 输入事件 topic、规则 topic(broadcast)、输出检测 topic;可选的按规则 metrics topic;SASL/SSL 认证。 - **Checkpointing** — 状态持久化和故障后的恢复。 ## 系统架构 - **JobManager** — 管理作业、checkpoint 和 REST API。 - **TaskManager** — 运行事件匹配作业。 ## Pipeline(数据流) - **事件**:一个或多个 topic 被联合,然后按键(hash/computer/round_robin)进行分发,以分布到各个并行实例中。 - **规则**:从 `earliest` 读取一个 topic,反序列化并广播到所有 Sigma Matcher 分区。 - **Sigma Matcher**:在窗口中(按键)缓冲事件,在计时器触发时加载规则,解析 Sigma,应用字段映射和匹配逻辑。 ## 项目结构 ``` ├── app/ │ ├── main.py # Entry point, CLI, job invocation │ ├── config/ # Settings, logging │ ├── connectors/ # Kafka source/sink │ ├── domain/ # Rules, filters, logsources, sigma_matcher │ ├── jobs/ # sigma_detection — Flink pipeline build │ └── operators/ # sigma_broadcast — KeyedBroadcastProcessFunction ├── lib/ # Flink Kafka connector JAR ├── docker-compose.yml # JobManager + TaskManager(s) ├── Dockerfile ├── pyproject.toml └── .env.example ``` ## 环境要求 - Python 3.10+ - Apache Flink 2.2(包含在容器中) - Kafka(输入 topics、规则 topic、输出 topic) - 依赖项:见 `pyproject.toml`(apache-flink, confluent-kafka, polars, PyYAML, orjson, pydantic, structlog, schema-parser 等) ## 配置说明 将 `.env.example` 复制到 `.env` 并填写: - **Kafka**: `KAFKA_BOOTSTRAP_SERVERS`、认证(SASL/SSL)、`KAFKA_INPUT_TOPICS`、`KAFKA_OUTPUT_TOPIC`、`KAFKA_RULES_TOPIC`,可选 `KAFKA_METRICS_TOPIC`。 - **Job**: `JOB_ID`(多租户)、`OUTPUT_MODE`(matched_only / all_events)、`APPLY_PARSER_TO_OUTPUT_EVENTS`。 - **Flink**: `FLINK_PARALLELISM`、checkpoint(间隔、超时)、`STATE_BACKEND`(推荐 rocksdb)、`CHECKPOINT_PATH`。 - **Keying**: `KEYING_STRATEGY`(hash/computer/round_robin)、`KEY_GROUPS_PER_TASK`,可选 `MAX_PARALLELISM` 用于自动伸缩。 - **RocksDB**: 缓冲区大小、块缓存、压缩(当状态后端为 rocksdb 时)。 - **Watermarks**: `ENABLE_WATERMARKS`、乱序度和空闲超时(可选)。 - **Logging**: `LOG_LEVEL`、`LOG_FORMAT`(json/console)。 详情请参阅 `.env.example` 中的注释。 ## 运行 **Docker Compose**(JobManager + TaskManager): ``` cp .env.example .env ``` ### Docker ``` docker build -t detectflow-matchnode . docker run -p 8000:8000 --env-file .env detectflow-matchnode ``` 作业通过 `standalone-job.sh` 提交,参数为 `--pyModule app.main`。Web UI: http://localhost:8081。 **本地**(单进程,用于开发): ``` uv sync # 在 .env 中设置 Kafka 和 topics python -m app.main --job-id local ``` ## 多租户 每个作业由 `JOB_ID` 标识。来自 `KAFKA_RULES_TOPIC` 的规则通过匹配 `job_id` 的标签/属性进行过滤,因此一个集群可以服务多个租户或场景。输入/输出 topics 可以通过 CLI(`--input-topics`、`--output-topic`)或环境变量设置。
标签:AMSI绕过, Apache Flink, AV绕过, Cloudflare, DetectFlow, EDR, FastAPI, Kafka, MITRE ATT&CK, Python, Sigma 规则, SOC Prime, SonarQube插件, 代码规范检查, 便携式工具, 域名分析, 威胁情报, 威胁检测, 安全编排与自动化, 安全运营中心, 开发工具, 开发者工具, 异常检测, 恶意代码分类, 数据流, 无后门, 流处理, 流式计算, 网络安全, 网络映射, 脆弱性评估, 请求拦截, 逆向工具, 隐私保护