socprime/detectflow-matchnode
GitHub: socprime/detectflow-matchnode
基于 Apache Flink 和 Sigma 规则构建的实时威胁检测匹配节点,从 Kafka 消费事件流并输出带 MITRE ATT&CK 标签的检测结果。
Stars: 7 | Forks: 1
# DetectFlow 匹配节点
[](VERSION)
[](https://www.python.org/downloads/release/python-3120/)
[](https://fastapi.tiangolo.com/)
使用 Apache Flink 和 Sigma 规则进行实时威胁检测。消费来自 Kafka 的事件流,应用 Sigma 规则,并使用 MITRE ATT&CK 技术和规则元数据标记匹配的事件。
本项目是 SOC Prime DetectFlow OSS 的一个组件。有关更多详细信息和说明,请参阅其 [README](https://github.com/socprime/detectflow-main)。
## 功能特性
- **流处理** — 通过 Apache Flink(Python DataStream API)实时处理事件。
- **Sigma 规则** — 支持 Sigma 规则。
- **Kafka** — 输入事件 topic、规则 topic(broadcast)、输出检测 topic;可选的按规则 metrics topic;SASL/SSL 认证。
- **Checkpointing** — 状态持久化和故障后的恢复。
## 系统架构
- **JobManager** — 管理作业、checkpoint 和 REST API。
- **TaskManager** — 运行事件匹配作业。
## Pipeline(数据流)
- **事件**:一个或多个 topic 被联合,然后按键(hash/computer/round_robin)进行分发,以分布到各个并行实例中。
- **规则**:从 `earliest` 读取一个 topic,反序列化并广播到所有 Sigma Matcher 分区。
- **Sigma Matcher**:在窗口中(按键)缓冲事件,在计时器触发时加载规则,解析 Sigma,应用字段映射和匹配逻辑。
## 项目结构
```
├── app/
│ ├── main.py # Entry point, CLI, job invocation
│ ├── config/ # Settings, logging
│ ├── connectors/ # Kafka source/sink
│ ├── domain/ # Rules, filters, logsources, sigma_matcher
│ ├── jobs/ # sigma_detection — Flink pipeline build
│ └── operators/ # sigma_broadcast — KeyedBroadcastProcessFunction
├── lib/ # Flink Kafka connector JAR
├── docker-compose.yml # JobManager + TaskManager(s)
├── Dockerfile
├── pyproject.toml
└── .env.example
```
## 环境要求
- Python 3.10+
- Apache Flink 2.2(包含在容器中)
- Kafka(输入 topics、规则 topic、输出 topic)
- 依赖项:见 `pyproject.toml`(apache-flink, confluent-kafka, polars, PyYAML, orjson, pydantic, structlog, schema-parser 等)
## 配置说明
将 `.env.example` 复制到 `.env` 并填写:
- **Kafka**: `KAFKA_BOOTSTRAP_SERVERS`、认证(SASL/SSL)、`KAFKA_INPUT_TOPICS`、`KAFKA_OUTPUT_TOPIC`、`KAFKA_RULES_TOPIC`,可选 `KAFKA_METRICS_TOPIC`。
- **Job**: `JOB_ID`(多租户)、`OUTPUT_MODE`(matched_only / all_events)、`APPLY_PARSER_TO_OUTPUT_EVENTS`。
- **Flink**: `FLINK_PARALLELISM`、checkpoint(间隔、超时)、`STATE_BACKEND`(推荐 rocksdb)、`CHECKPOINT_PATH`。
- **Keying**: `KEYING_STRATEGY`(hash/computer/round_robin)、`KEY_GROUPS_PER_TASK`,可选 `MAX_PARALLELISM` 用于自动伸缩。
- **RocksDB**: 缓冲区大小、块缓存、压缩(当状态后端为 rocksdb 时)。
- **Watermarks**: `ENABLE_WATERMARKS`、乱序度和空闲超时(可选)。
- **Logging**: `LOG_LEVEL`、`LOG_FORMAT`(json/console)。
详情请参阅 `.env.example` 中的注释。
## 运行
**Docker Compose**(JobManager + TaskManager):
```
cp .env.example .env
```
### Docker
```
docker build -t detectflow-matchnode .
docker run -p 8000:8000 --env-file .env detectflow-matchnode
```
作业通过 `standalone-job.sh` 提交,参数为 `--pyModule app.main`。Web UI: http://localhost:8081。
**本地**(单进程,用于开发):
```
uv sync
# 在 .env 中设置 Kafka 和 topics
python -m app.main --job-id local
```
## 多租户
每个作业由 `JOB_ID` 标识。来自 `KAFKA_RULES_TOPIC` 的规则通过匹配 `job_id` 的标签/属性进行过滤,因此一个集群可以服务多个租户或场景。输入/输出 topics 可以通过 CLI(`--input-topics`、`--output-topic`)或环境变量设置。
标签:AMSI绕过, Apache Flink, AV绕过, Cloudflare, DetectFlow, EDR, FastAPI, Kafka, MITRE ATT&CK, Python, Sigma 规则, SOC Prime, SonarQube插件, 代码规范检查, 便携式工具, 域名分析, 威胁情报, 威胁检测, 安全编排与自动化, 安全运营中心, 开发工具, 开发者工具, 异常检测, 恶意代码分类, 数据流, 无后门, 流处理, 流式计算, 网络安全, 网络映射, 脆弱性评估, 请求拦截, 逆向工具, 隐私保护