ssa1004/security-log-search

GitHub: ssa1004/security-log-search

面向企業安全運營的多租戶SIEM後端平台,實現多源日誌歸一化、雙引擎搜索聚合、Sigma規則實時告警與合規審計。

Stars: 0 | Forks: 0

# 安全日志搜索 这是一个 SIEM 形式的后端平台,用于收集、规范化、搜索和分析大规模安全日志。 将来自各种源(防火墙 / EDR / 系统 / 应用日志)的原始事件(raw event)规范化为 ECS(Elastic Common Schema,安全与观测日志标准)或 OCSF(Open Cybersecurity Schema Framework,供应商中立的安全日志标准),然后通过 Kafka 双重输出(dual sink)至 OpenSearch(全文搜索)+ ClickHouse(大规模聚合)。使用 Apache Flink 实时评估关联规则(correlation rule)并触发警报。 详细的架构决策请参考 [docs/adr/](docs/adr/) 中的 15 份 ADR(架构决策记录)—— 涵盖 ECS / OCSF、ClickHouse + OpenSearch 双重输出、Kafka + Flink 关联、多租户 4 层隔离、ISMS-P 控制措施映射、Sigma 规则导入、CloudTrail / K8s audit 适配器、运维仪表板等。 ## 处理流程 ``` sequenceDiagram autonumber participant Src as 방화벽 / EDR / Syslog participant API as REST Ingest API participant Norm as ECS / OCSF Mapper participant K as Kafka (events.normalized) participant OS as OpenSearch participant CH as ClickHouse participant Flink as Flink Job participant DB as PostgreSQL (control plane) participant Op as 운영자 Src->>API: POST /events (raw, idempotency-key) API->>Norm: 정규화 + tenant 검증 Norm->>K: publish events.normalized par 듀얼 sink K-->>OS: index events-{tenant}-write alias and K-->>CH: insert into events_raw (tenant_id 포함) end K-->>Flink: source Flink->>Flink: KeyedProcessFunction + MapState (sliding window) Flink->>K: alerts.fired (rule matched) K-->>DB: INSERT alerts row + audit_entries Op->>API: POST /search (Lucene query + filter + facet) API->>OS: query events-{tenant}-read alias (tenantId 강제) Op->>API: GET /stats (시계열 집계) API->>CH: SELECT ... FROM events_5m_mv (사전집계 MV) ``` ## 模块结构 ``` graph LR domain["security-domain
LogEvent (ECS) + AlertRule + Tenant + AuditEntry"] app["security-application
11 use case + port"] in["security-adapter-in
REST + Kafka consumer"] out["security-adapter-out
JPA + Kafka producer + OpenSearch + ClickHouse"] streaming["security-streaming
Flink job (별도 jar, Spring 미포함)"] boot["security-bootstrap
Boot main + Flyway"] e2e["e2e-tests
Testcontainers"] in --> app out --> app boot --> in boot --> out streaming --> domain e2e --> boot ``` | 模块 | 职责 | |---|---| | `security-domain` | LogEvent (ECS)、OCSF mapper、AlertRule、Tenant、AuditEntry、Severity 等无外部依赖的领域模型 | | `security-application` | 11 个用例(use case)+ 输入/输出端口定义 | | `security-adapter-out` | JPA 控制平面(tenants / alert_rules / alerts / audit_entries),Kafka 生产者(events.normalized + alerts.fired),OpenSearch Java 客户端,ClickHouse JDBC | | `security-adapter-in` | REST API(ingest / search / stats / alert-rules / alerts / admin / audit / tenants),Kafka 消费者(alerts.fired) | | `security-streaming` | Apache Flink 作业 —— 使用 KeyedProcessFunction + MapState + broadcast state 评估规则,Kafka source / sink | | `security-bootstrap` | Spring Boot 主程序,application.yml(配置文件 profile 为 dev / prod),Flyway 数据库迁移,OpenSearch / ClickHouse 初始 schema 应用 | | `e2e-tests` | 基于 Testcontainers(Postgres + Kafka + OpenSearch + ClickHouse)的集成测试场景 | ## 11 个用例 1. **`IngestLogEventUseCase`** — `POST /api/v1/events` — 原始日志 → ECS / OCSF 规范化 → Kafka 发布。 使用 Idempotency-Key 请求头防止重复(通过 Postgres 的 `idempotency_keys` 表)。 2. **`SearchLogEventsUseCase`** — `POST /api/v1/search` — OpenSearch 查询(Lucene 查询字符串 + 过滤器 + 聚合分面)+ 游标分页。自动注入 tenantId。 3. **`AggregateLogStatsUseCase`** — `GET /api/v1/stats` — ClickHouse 查询(5 分钟 / 1 小时 / 1 天的聚合桶,Top-N,百分位数 p95 / p99)。 4. **`DefineAlertRuleUseCase`** — `POST /api/v1/alert-rules` — 警报规则的 CRUD,持久化至 PostgreSQL。 5. **`EvaluateAlertUseCase`** — Flink 作业评估后向 Kafka 发布 `alerts.fired` 主题 + Spring 侧消费者执行 INSERT。 6. **`ListAlertsUseCase`** — `GET /api/v1/alerts` — 基于时间线查询警报。 7. **`ManageOpenSearchIndexUseCase`** — 管理端点(admin endpoint)—— 索引创建 / 别名交换 / ILM 策略 / 触发滚动(rollover)。 8. **`QueryAuditLogUseCase`** — `GET /api/v1/audit` — 满足 ISMS-P 要求。审计谁在何时进行了何种搜索 / 规则更改 / 警报处理。 9. **`OnboardTenantUseCase`** — `POST /api/v1/tenants` — 新租户注册时自动创建 OpenSearch 别名 + 自动配置 ClickHouse 行策略。 10. **`ImportSigmaRuleUseCase`** — `POST /api/v1/sigma-rules` — SigmaHQ YAML(单个 / 多文档) → `AlertRule` 转换 + 返回不支持的转换限制(`mappingNotes`)(ADR-0013)。 11. **`ListImportedSigmaRulesUseCase`** — `GET /api/v1/sigma-rules` — 查询已导入的 Sigma 原始文件及转换结果。用于审查已导入规则是否过时。 ## 多租户隔离 — 4 层架构 1. **OpenSearch 索引命名**: `events-{tenant}-{yyyy.MM.dd}-{seq}`。读取别名(`events-{tenant}-read`)仅指向该租户的索引。 2. **ClickHouse 行策略**: 强制添加 `WHERE tenant_id = currentSetting('tenant_id')` 条件。 3. **JWT 声明**: 所有请求必须包含 `tenant_id` 声明,应用层会将其自动注入到查询中。 4. **查询重写**: SearchService 内部强制将租户过滤器以 AND 方式拼接到用户发送的查询中 —— 用户无法绕过。 ``` sequenceDiagram autonumber participant U as 운영자 (acme) participant API as REST API participant Sec as SecurityFilterChain participant Svc as SearchService participant OS as OpenSearch participant CH as ClickHouse U->>API: POST /search { q: "*" } + JWT API->>Sec: JWT 검증 + tenant claim 추출 Note over Sec: claim.tenant_id = "acme" Sec->>Svc: OperatorContext(tenant=acme) 주입 Svc->>Svc: query rewrite — q AND tenantId:acme par OpenSearch 경로 Svc->>OS: GET events-acme-read/_search Note over OS: alias 가 acme 인덱스만 가리킴 (1) and ClickHouse 경로 Svc->>CH: SET tenant_id='acme'; SELECT ... Note over CH: Row Policy 가 tenant_id 일치 행만 (2) end Note over U,CH: 다른 tenant (globex) 인덱스 / 행은
4 layer 모두에서 차단됨 ``` 详情请参阅 [ADR-0007](docs/adr/0007-multi-tenant-isolation.md)。 ## 技术栈 - **语言**: Java 21(启用虚拟线程) - **框架**: Spring Boot 3.4 - **存储**: - PostgreSQL 16(控制平面: tenants / alert_rules / alerts / audit_entries / idempotency) - OpenSearch 2.x(全文搜索,运维人员临时查询) - ClickHouse 24.x(大规模聚合 / 时间序列) - **消息队列**: Apache Kafka(幂等生产者 + Spring Kafka 消费者) - **流处理**: Apache Flink 1.18(KeyedProcessFunction + MapState + Broadcast State) - **弹性机制**: Resilience4j(针对 OpenSearch / ClickHouse 的 Bulkhead + 熔断器 + 重试) - **可观测性**: Micrometer + Prometheus - **API 文档**: springdoc-openapi (Swagger UI) - **构建 / CI**: Gradle 8, GitHub Actions, Docker 多阶段构建, Helm + ArgoCD - **测试**: JUnit 5 + Mockito + Testcontainers,Flink 关联测试通过直接调用 ProcessFunction 进行(由于 Flink 1.18 + Java 17+ record 序列化问题,LocalExecutionEnvironment 集成测试计划在升级至 1.19 后恢复) ## 快速开始 ### 仅运行单元测试 ``` ./gradlew test ``` ### Testcontainers 集成测试(需要 Docker) ``` ./gradlew :e2e-tests:integrationTest ``` ### 使用 Docker Compose 一键启动依赖和应用 ``` cd infrastructure/docker docker compose up -d # postgres + kafka + opensearch + clickhouse + flink docker compose --profile app up # 위 + 앱 컨테이너 ``` 默认端点: - 应用: - Swagger UI: - Actuator: - OpenSearch Dashboards: - Flink Web UI: ### 独立运行 Flink 作业(模拟生产环境) ``` ./gradlew :security-streaming:jar # 将构建的 jar 提交到 Flink 集群 flink run -c com.example.security.streaming.AlertCorrelationJob \ security-streaming/build/libs/security-streaming-0.1.0.jar ``` ### Sigma 规则导入 → AlertRule → Flink → Alert 演示 一次性导入 `scripts/sample-sigma-rules/` 目录中的 4 个 SigmaHQ 格式规则(暴力破解、端口扫描、可疑 PowerShell、非工作时间管理员登录),并触发暴力破解事件,演示从发生到生成警报的完整流程。 ``` ./scripts/seed_demo_data.sh # 1) tenant 'globex' 추가 + 기본 룰 1건 ./scripts/import_sigma_demo.sh # 2) Sigma 4개 import + 트리거 이벤트 curl -s 'http://localhost:8080/api/v1/alerts?tenantId=acme' | jq # 3) 알람 확인 ``` `scripts/sample-sigma-rules/` 中的 YAML 会被 [SampleSigmaRulesIntegrationTest](security-application/src/test/java/com/example/security/application/sigma/SampleSigmaRulesIntegrationTest.java) 每次进行解析和转换验证,因此如果映射器发生变化,演示也会随之失败,确保不会过时。 ## ISMS-P 控制措施映射 有关在本系统中如何实现 ISMS-P 认证要求的映射,请参阅 [ADR-0010](docs/adr/0010-isms-p-control-mapping.md)。核心内容: - 2.5(用户标识 / 身份验证) → JWT Resource Server + 租户声明 - 2.7(加密) → 静态加密(PostgreSQL TDE / ClickHouse / OpenSearch encryption-at-rest) + 传输中加密(TLS 1.3 + Kafka mTLS) - 2.9(审计) → `audit_entries` 仅追加 + Kafka SIEM sink + 保留 5 年 - 2.10(事件响应) → Flink 关联规则 + 警报工作流 ## ADR 索引 | ADR | 标题 | |---|---| | [0001](docs/adr/0001-hexagonal-architecture.md) | 六边形架构(Hexagonal architecture) + 模块拆分(security-streaming 独立 jar) | | [0002](docs/adr/0002-ecs-vs-ocsf.md) | ECS vs OCSF — 为什么两者都要映射 | | [0003](docs/adr/0003-dual-sink-opensearch-clickhouse.md) | OpenSearch + ClickHouse 双重输出 | | [0004](docs/adr/0004-flink-vs-kafka-streams.md) | Kafka 收集 + Flink 流处理(对比 Kafka Streams) | | [0005](docs/adr/0005-clickhouse-schema.md) | ClickHouse Schema — MergeTree + 按月分区 + 物化视图 | | [0006](docs/adr/0006-opensearch-ilm-alias.md) | OpenSearch ILM + 别名交换 + hot/warm/cold 分层 | | [0007](docs/adr/0007-multi-tenant-isolation.md) | 多租户隔离 — 4 层架构 | | [0008](docs/adr/0008-alert-rule-engine.md) | 警报规则引擎 — Flink CEP + broadcast state 热重载 | | [0009](docs/adr/0009-backpressure.md) | 背压 — Kafka consumer 轮询 + Flink 自身机制 | | [0010](docs/adr/0010-isms-p-control-mapping.md) | ISMS-P 控制措施映射 | | [0011](docs/adr/0011-audit-log-append-only.md) | 审计日志 — 仅追加的 PostgreSQL + 保留 5 年 | | [0012](docs/adr/0012-pii-masking-retention.md) | PII 脱敏 + 保留策略 | | [0013](docs/adr/0013-sigma-rule-import.md) | Sigma 规则导入 → AlertRule 转换 | | [0014](docs/adr/0014-source-adapter-cloudtrail-k8s.md) | 来源映射器拆分 — CloudTrail / K8s audit → ECS | | [0015](docs/adr/0015-observability-dashboards.md) | 运维仪表板 — 基于 RED + USE 模型 | ## 运维指南(简述) - **OpenSearch ILM**: `events-*-write` 别名在大小达到 50GB 或时间达到 30 天时自动触发滚动。hot(7天) → warm(30天) → cold(90天) → delete(1年)。 - **ClickHouse 保留策略**: `events_raw` 按 `toYYYYMM(timestamp)` 分区。每月 1 日凌晨自动 DROP 超过 13 个月的分区。 - **审计保留策略**: `audit_entries` 保留 5 年(ISMS-P 建议)。迁移至单独的归档冷存储。 - **警报处理**: 运维人员通过 `POST /api/v1/alerts/{id}/ack` 确认处理 → 自动记录至 audit_entries。 ### 运维手册 针对故障 / 异常情况的响应步骤已按场景整理在 `docs/runbook/` 目录中: - [`ingest-throughput-drop.md`](docs/runbook/ingest-throughput-drop.md) — 数据摄入速率降至平时水平的 50% 以下 - [`flink-job-not-progressing.md`](docs/runbook/flink-job-not-progressing.md) — Flink 关联作业出现延迟积压 / checkpoint 失败 - [`alert-storm.md`](docs/runbook/alert-storm.md) — 警报激增(判定真实事故 vs 误报 / 规则静音程序) ## 部署 ### Helm chart 生产 / 预发布环境的部署通过 `infrastructure/helm/security-log-search/` 中的 Helm chart 进行(Helm 3.x)。按环境覆盖配置使用 `values-dev.yaml` / `values-staging.yaml` / `values-prod.yaml`。 ``` cd infrastructure/helm/security-log-search # dev — replica 1,禁用 NetworkPolicy / HPA / Ingress helm install slq . --namespace security-log-search --create-namespace \ --values values-dev.yaml # prod — replica 3,HPA (cpu 70%, min 2 max 10),启用 Ingress TLS、NetworkPolicy helm install slq . --namespace security-log-search --create-namespace \ --values values-prod.yaml ``` chart 创建的资源: - Deployment(优雅关闭 — preStop sleep + Spring `server.shutdown=graceful`) - Service (ClusterIP) - ConfigMap(非机密环境变量) + Secret(占位符,生产环境建议使用 SealedSecret / ExternalSecret) - ServiceAccount + Role + RoleBinding(仅读取 configmap / secret —— 符合 ISMS-P 最小权限原则) - HPA(生产环境) / PodDisruptionBudget - 面向普通用户的 Ingress(`/api/v1/{events,search,alerts,sigma-rules,audit,stats}`)+ 独立的管理员主机(`/api/v1/admin`,IP 白名单) - NetworkPolicy(仅允许 postgres / kafka / opensearch / clickhouse / redis 的出站流量) - ServiceMonitor (Prometheus Operator) 详细的配置项 / 环境差异 / SIEM 特性(多租户 / 管理员拆分 / 假设外部集群)请参阅 [infrastructure/helm-log-search/README.md](infrastructure/helm/security-log-search/README.md)。 ### GitOps (ArgoCD) - `infrastructure/argocd/applicationset.yaml` — 自动生成 dev / staging / prod 3 个 Application - 详细使用方法请参阅 [infrastructure/argocd/README.md](infrastructure/argocd/README.md) ## 未来改进 - 基于 ML 的异常检测(目前仅支持基于规则,后续引入无监督基线) - Sigma 规则源自动同步(目前为手动导入 —— 后续计划定期拉取并验证 SigmaHQ 官方仓库的流水线) - 增加源适配器:Microsoft Graph Security、Okta system log、Crowdstrike Falcon stream (目前支持 syslog / firewall / EDR / CloudTrail / K8s audit —— 参见 ADR-0014) - 评估并引入 ClickHouse projection / aggregating MergeTree - 使用 Flink Kubernetes Operator (apache/flink-kubernetes-operator) 管理流处理作业 - 升级至 Flink 1.19+ —— 恢复 `LocalExecutionEnvironment` 集成测试(record 序列化问题已解决) ## Portfolio 套件集成 本仓库可以独立运行,但它也是由 8 个仓库相互紧密结合而成的系统组合的一部分。 整体架构图请参阅个人资料 README —— 。 | 仓库 | 角色 | 与本仓库的关系 | |---|---|---| | [auth-service](https://github.com/ssa1004/auth-service) | OIDC / JWT 签发,暴露 JWK Set | 本仓库验证传入请求的 JWT (issuer-uri / JWK Set) | | [notification-hub](https://github.com/ssa1004/notification-hub) | 多渠道通知(电子邮件 / SMS / push / Slack) | 消费本仓库的 `alerts.fired` Kafka topic → 向运维人员发散。反之,hub 的发送结果(`notification.delivered`)由本仓库收集用于审计 | | [search-service](https://github.com/ssa1004/search-service) | 通用领域搜索(商品 / 文档) | 与本仓库独立 —— search-service 的审计日志由本仓库收集 | | [billing-platform](https://github.com/ssa1004/billing-platform) | 支付 / 结算领域 | 本仓库收集 billing 的应用审计日志 | | [resell-orderbook](https://github.com/ssa1004/resell-orderbook) | 转售订单簿 | 本仓库收集匹配引擎的应用日志 | | [gpu-job-orchestrator](https://github.com/ssa1004/gpu-job-orchestrator) | GPU 训练 job 调度 | 本仓库收集 K8s audit 日志并映射为 ECS (ADR-0014) | | [mini-shop-observability](https://github.com/ssa1004/mini-shop-observability) | OTel / Prometheus / Loki 实验场 | 共享可观测性技术栈 —— 本仓库的 Grafana 仪表板采用相同模式 | | **security-log-search** | 本仓库 — SIEM 收集 / 搜索 / 警报 | — | 本仓库的集成点分为三个方向: 1. **传入的认证** —— 使用 auth-service 的 JWK Set 验证 JWT。声明中的 `tenant_id` 作为查询重写的主要隔离键。 2. **传出的警报** —— 当 Sigma 规则匹配或阈值规则评估成功时,Flink 作业向 `alerts.fired` Kafka topic 发布消息。notification-hub 消费该消息并向运维人员通过各渠道发送。 3. **传入的审计** —— 通过 ingest API 接收其他组合服务的应用日志 / K8s audit / CloudTrail 原始事件。notification-hub 的发送结果也会作为安全事件收集,运维人员可以搜索通知遗漏 / 失败的趋势。 ### 跨仓库序列 — JWT 验证 + 搜索 ``` sequenceDiagram autonumber participant Caller as 다른 service
(billing / search / ...) participant Auth as auth-service participant API as security-log-search
(REST API) participant Sec as SecurityFilterChain participant Svc as SearchService participant OS as OpenSearch participant CH as ClickHouse Caller->>Auth: POST /oauth2/token (client_credentials) Auth-->>Caller: access_token (tenant_id claim 포함) Caller->>API: POST /api/v1/search + Bearer JWT API->>Sec: JWT 검증 (auth-service JWK Set) Sec->>Sec: claim.tenant_id 추출 Sec->>Svc: OperatorContext(tenant=acme) 주입 Svc->>Svc: query rewrite — q AND tenantId:acme par OpenSearch Svc->>OS: GET events-acme-read/_search and ClickHouse Svc->>CH: SET tenant_id='acme'; SELECT ... end Svc-->>API: hits + facets API-->>Caller: 200 OK (다른 tenant 데이터 0건 보장) ``` ### 跨仓库序列 — Sigma 匹配 → 警报 → notification-hub ``` sequenceDiagram autonumber participant Src as 수집 source participant API as security-log-search
ingest API participant K as Kafka
events.normalized / alerts.fired participant Flink as Flink correlation job participant Hub as notification-hub
(consumer) participant Op as 운영자 Src->>API: POST /api/v1/events (raw) API->>K: publish events.normalized K-->>Flink: source Flink->>Flink: Sigma 변환 룰 + threshold 평가
(KeyedProcessFunction + MapState) Flink->>K: publish alerts.fired par 본 repo consumer K-->>API: INSERT alerts row + audit_entries and 외부 consumer K-->>Hub: alerts.fired consume Hub->>Op: 채널별 발송 (이메일 / Slack / SMS) Hub->>K: notification.delivered (결과) K-->>API: ingest 로 다시 수집 → audit end ``` ### 集成演示(模拟) 无需启动全部组合项目,只需使用 **mock auth-service + mock notification-hub** 即可在单台主机上验证本仓库的集成点: ``` docker compose -f infrastructure/docker/docker-compose.integration.yml up -d ./scripts/integration-demo.sh ``` 详细步骤和验证重点请参阅 [scripts/integration-demo.sh](scripts/integration-demo.sh) 的头部注释。 ## 手动 GitHub push 在没有 `gh` CLI 的环境中,可以使用以下方式 push。 ``` git remote add origin https://github.com/ssa1004/security-log-search.git git branch -M main git push -u origin main ```
标签:Apache Flink, ClickHouse, CloudTrail, ECS, EDR, HTTP/HTTPS抓包, ISMS-P, K8s审计日志, Kafka, OCSF, PostgreSQL, Sigma规则, SonarQube插件, Syslog, Terraform, 人工智能安全, 代码规范检查, 全文本检索, 合规性, 后台面板检测, 四层隔离, 域名枚举, 大数据聚合, 子域名变形, 安全信息与事件管理, 安全分析引擎, 安全日志收集, 安全运营平台, 实时关联分析, 微服务架构, 态势感知, 搜索引擎爬取, 数据正则化, 无线安全, 日志搜索, 日志标准化, 流处理, 测试用例, 目标导入, 网络安全, 网络安全审计, 脆弱性评估, 请求拦截, 软件成分分析, 防火墙日志, 隐私保护