socprime/detectflow-backend

GitHub: socprime/detectflow-backend

基于 Apache Flink 和 Kafka 的实时检测流水线后端,支持 Sigma 规则热重载和 Kubernetes 原生部署。

Stars: 9 | Forks: 2

# DetectFlow 后端 [![Version](https://img.shields.io/badge/version-0.9.4-blue.svg)](VERSION) [![Python 3.12](https://img.shields.io/badge/python-3.12-blue.svg)](https://www.python.org/downloads/release/python-3120/) [![FastAPI](https://img.shields.io/badge/FastAPI-0.121.3-green.svg)](https://fastapi.tiangolo.com/) 用于管理带有 Sigma 检测规则的实时 Pipeline 的 Backend API,基于 Apache Flink 和 Kafka 构建。 本项目是 SOC Prime DetectFlow OSS 的一个组件。有关更多详细信息和说明,请参阅其 [README](https://github.com/socprime/detectflow-main)。 ## 概述 本服务提供 REST API 用于: - 创建和管理处理来自 Kafka 的事件的 Pipeline - 应用 Sigma 检测规则进行安全事件标记 - 具有 SSE 流传输的实时仪表板 - SOC Prime Platform 集成(需要 API 密钥) - GitHub 公共开源仓库,包括 SigmaHQ、Microsoft、Splunk 和 Elastic,用于规则同步 ## 功能特性 - **Pipeline 管理** - 创建、配置和监控基于 Flink 的 ETL Pipeline - **实时仪表板** - 用于实时指标和 Pipeline 状态的 SSE 流传输 - **检测规则** - Sigma 规则管理 - **日志源配置** - 解析器脚本和字段映射 - **AI 辅助映射** - 使用 AI 提示生成字段映射 - **JWT 认证** - 访问令牌 + 刷新令牌轮换以确保安全 - **数据库迁移** - 使用 Alembic 进行 Schema 版本控制 - **Kubernetes 原生** - 自动管理 FlinkDeployment CRD - **Flink 健康监控** - 自动跟踪 Pipeline 状态 ## 快速开始 ### 前置条件 - Python 3.12+ - PostgreSQL 14+ - Kafka 集群 - 带有 Flink Operator 的 Kubernetes 集群 ### 安装 ``` # 克隆仓库 git clone cd admin-panel-backend # 安装依赖 uv sync # 配置环境 cp .env.example .env # 根据你的设置编辑 .env # 初始化数据库 uv run python init_database.py # 运行数据库 migrations uv run alembic upgrade head # 运行服务器 uv run python server.py ``` ## API 文档 ### 认证 所有端点(`/health` 除外)都需要 JWT Bearer 令牌认证。 ``` # 登录 curl -X POST http://localhost:8000/api/v1/auth/login \ -H "Content-Type: application/json" \ -d '{"email": "admin@example.com", "password": "password"}' # 使用 token curl http://localhost:8000/api/v1/pipeline \ -H "Authorization: Bearer " ``` ## 配置 ### 环境变量 ``` # 数据库 DATABASE_URL=postgresql+asyncpg://user:pass@localhost:5432/etl_admin # (required) PostgreSQL connection string # Kafka KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # (required) Kafka broker addresses KAFKA_AUTH_METHOD=PLAINTEXT # (optional, default: PLAINTEXT) Authentication method (PLAINTEXT, SASL, SSL) KAFKA_API_KEY= # (optional) For SASL authentication KAFKA_API_SECRET= # (optional) For SASL authentication KAFKA_SSL_CA_LOCATION= # (optional) Path to CA certificate for SSL KAFKA_SSL_CERTIFICATE_LOCATION= # (optional) Path to client certificate for SSL KAFKA_SSL_KEY_LOCATION= # (optional) Path to client private key for SSL KAFKA_SSL_CHECK_HOSTNAME=true # (optional, default: true) Enable SSL hostname verification KAFKA_SIGMA_RULES_TOPIC=sigma-rules # (optional, default: sigma-rules) Topic for rules, filters, parsers (compacted) KAFKA_METRICS_TOPIC=rule-statistics # (optional, default: rule-statistics) Topic for pipeline metrics KAFKA_METRICS_CONSUMER_GROUP=admin-panel-metrics # (optional, default: admin-panel-metrics) KAFKA_ACTIVITY_TOPIC=etl-activity # (optional, default: etl-activity) Topic for activity/audit logs KAFKA_ACTIVITY_CONSUMER_GROUP=admin-panel-activity # (optional, default: admin-panel-activity) KAFKA_DEFAULT_PARTITIONS=1 # (optional, default: 1) Default partitions for new topics KAFKA_DEFAULT_REPLICATION_FACTOR=2 # (optional, default: 2) Default replication factor for new topics # Kubernetes KUBERNETES_NAMESPACE=security # (optional, default: security) K8s namespace for Flink deployments FLINK_IMAGE=flink-sigma-detector:latest # (optional, default: flink-sigma-detector:latest) IMAGE_PULL_POLICY=Always # (optional, default: Always) Image pull policy (Always, IfNotPresent, Never) # Flink 资源 FLINK_TASKMANAGER_CPU=2.0 # (optional, default: 2.0) CPU cores per TaskManager FLINK_TASKMANAGER_MEMORY_GB=4 # (optional, default: 4) Memory per TaskManager in GB FLINK_TASKMANAGER_SLOTS=4 # (optional, default: 4) Number of slots per TaskManager FLINK_JOBMANAGER_CPU=1.0 # (optional, default: 1.0) CPU cores for JobManager FLINK_JOBMANAGER_MEMORY_GB=2 # (optional, default: 2) Memory for JobManager in GB FLINK_METRICS_POLL_INTERVAL=5.0 # (optional, default: 5.0) Flink metrics polling interval (seconds) AUTOSCALER_QUOTA_CPU= # (optional) CPU quota per pipeline; if unset, auto-calculated from TaskManager AUTOSCALER_QUOTA_MEMORY_GB= # (optional) Memory quota per pipeline in GB; if unset, auto-calculated # Dashboard DASHBOARD_BROADCAST_INTERVAL_SECONDS=2.0 # (optional, default: 2.0) SSE broadcast interval (seconds) AUDIT_LOGS_RETENTION_DAYS=30 # (optional, default: 30) Audit logs retention period (days) # 同步 ENABLE_AUTO_SYNC=true # (optional, default: true) Enable automatic repository sync SYNC_API_REPOS_INTERVAL_MINUTES=5 # (optional, default: 5) Sync interval in minutes SYNC_API_REPOS_TIMEOUT_SECONDS=600 # (optional, default: 600) Sync operation timeout (seconds) # Auth (带 refresh token 轮换的 JWT) JWT_SECRET_KEY=your-secret-key # (required in production) Secret key for access tokens JWT_REFRESH_SECRET_KEY=your-refresh-secret-key # (required in production) Secret key for refresh tokens JWT_ALGORITHM=HS256 # (optional, default: HS256) JWT signing algorithm JWT_ACCESS_TOKEN_EXPIRE_MINUTES=15 # (optional, default: 15) Access token expiration (minutes) JWT_REFRESH_TOKEN_EXPIRE_DAYS=7 # (optional, default: 7) Refresh token expiration (days) # TDM API TDM_API_BASE_URL=https://api.tdm.socprime.com # (optional, default: https://api.tdm.socprime.com) SOCPrime TDM API base URL TDM_HOSTNAME=tdm.socprime.com # (optional, default: tdm.socprime.com) # 日志 LOG_LEVEL=INFO # (optional, default: INFO) Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) ``` 有关完整的配置参考,请参阅 `.env.example`。 ## 开发 ### 项目结构 ``` admin-panel-backend/ ├── apps/ │ ├── core/ # Core modules (auth, database, schemas) │ ├── routers/ # API endpoints │ ├── managers/ # Business logic orchestrators │ ├── modules/ # External integrations (Kafka, PostgreSQL) │ ├── clients/ # External API clients (TDM) │ └── services/ # Background services ├── k8s/ # Kubernetes manifests ├── server.py # Application entry point └── init_database.py # Database initialization ``` ### 运行测试 ``` uv run pytest ``` ### 数据库迁移 ``` # 应用所有 migrations uv run alembic upgrade head # 创建新 migration uv run alembic revision --autogenerate -m "description" # 回滚最后一个 migration uv run alembic downgrade -1 ``` ## 部署 ### Docker ``` docker build -t detectflow-backend . docker run -p 8000:8000 --env-file .env detectflow-backend ``` ## 架构 ``` ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │ Frontend │────▶│ Admin Panel │────▶│ PostgreSQL │ └─────────────┘ │ API │ └─────────────┘ └──────┬───────┘ │ ┌────────────┼────────────┐ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Kafka │ │ K8s │ │ SOCPrime │ │ Metrics │ │ Flink │ │ TDM API │ └──────────┘ └──────────┘ └──────────┘ ``` ## Pipeline 工作原理 ### Pipeline 生命周期 ``` 1. CREATE PIPELINE │ ├─► Save config to PostgreSQL │ └─► If enabled=true: └─► Create FlinkDeployment CRD in Kubernetes └─► Flink Operator spawns JobManager + TaskManagers 2. FLINK JOB STARTS │ ├─► Consume events from source_topic (Kafka) │ ├─► Consume rules/filters/parsers from sigma-rules topic (compacted) │ ├─► Apply Sigma detection rules to events │ └─► Write tagged events to destination_topic 3. REAL-TIME UPDATES (Hot-Reload) │ ├─► Update rule/filter/parser │ └─► Admin Panel publishes to sigma-rules topic └─► Flink picks up changes WITHOUT restart ``` ### 热重载 vs 重启 | 变更类型 | 操作 | 停机时间 | |-------------|--------|----------| | 规则(添加/更新/删除) | Kafka 发布 | 无(热重载) | | 过滤器 | Kafka 发布 | 无(热重载) | | 日志源(解析器/映射) | Kafka 发布 | 无(热重载) | | 自定义字段 | Kafka 发布 | 无(热重载) | | 源/目的 Topic | 重建 FlinkDeployment | 约 30 秒 | | 并行度 | 重建 FlinkDeployment | 约 30 秒 | | 启用/禁用 | 创建/删除 FlinkDeployment | 约 30 秒 | ### Kafka Topics | Topic | 用途 | 配置 | |-------|---------|--------| | `{source_topic}` | 输入事件 | 常规 | | `{destination_topic}` | 已标记事件输出 | 常规 | | `sigma-rules` | 规则、过滤器、解析器 | 压缩,无限期保留 | | `sigma-matcher-metrics` | Pipeline 指标 | 常规,7 天保留 | ### 指标流 ``` Flink Job Admin Panel │ │ │ metrics (every 30s) │ ├───────────────────────────►│ Kafka Consumer │ sigma-matcher-metrics │ │ │ │ ▼ │ │ Dashboard Service │ │ │ │ │ ▼ │ │ SSE Stream ──► Frontend │ │ │ │ │ ▼ │ │ PostgreSQL (history) ``` **关键指标:** - `window_total_events` - 每个窗口处理的事件数 - `window_matched_events` - 匹配规则的事件数 - `window_match_rate_percent` - 检测率 - `on_timer_duration_seconds` - 处理延迟 ### 资源管理 Kubernetes 在多个层级强制执行资源限制: 1. **ResourceQuota** - Namespace 级别的硬性限制 2. **LimitRange** - 每个容器的默认值 3. **Flink Autoscaler** - 每个 Pipeline 的配额 ## 技术栈 - **框架**: FastAPI 0.121+ - **数据库**: PostgreSQL + SQLAlchemy 2.0 (AsyncIO) - **消息队列**: Apache Kafka - **流处理**: Apache Flink - **容器编排**: Kubernetes - **认证**: JWT (PyJWT) **版本**: 0.9.4
标签:AI 映射, Alembic, AMSI绕过, Apache Flink, Apache Kafka, AV绕过, CVE扫描, DevSecOps, ETL 管道, FastAPI, JWT 认证, Log Parsing, PMD, PostgreSQL, Python, REST API, Sigma Rule, SSE, 上游代理, 云端安全, 代码规范检查, 后端开发, 威胁检测, 子域名突变, 安全信息与事件管理, 安全运营, 实时流处理, 扫描框架, 搜索引擎爬取, 无后门, 检测规则, 流式计算, 测试用例, 网络安全, 网络资产发现, 请求拦截, 软件成分分析, 逆向工具, 隐私保护