jaypark81/k8s-detection-pipeline

GitHub: jaypark81/k8s-detection-pipeline

一套面向 AWS EKS 的 Kubernetes 安全事件采集与富化管道,融合 Falco、Tetragon 和审计日志,通过统一的 ECS 规范将运行时安全事件送入 Elastic SIEM 进行检测分析。

Stars: 0 | Forks: 0

![Python](https://img.shields.io/badge/Python-3.12-blue) ![Terraform](https://img.shields.io/badge/IaC-Terraform-purple) ![ECS](https://img.shields.io/badge/Schema-ECS%208.x-orange) # K8S-DETECTION-PIPELINE 一个 Kubernetes 原生的安全检测管道,通过 Kafka 收集、丰富 Falco、Tetragon 和工作负载日志中的运行时安全事件,并将其发送到 Elasticsearch。 ## 架构 ### 管道 1 — 运行时事件收集 (Fluent Bit) ``` ┌─────────────────────────────────────────────────────────────────────┐ │ EKS Cluster │ │ │ │ ┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │ │ │ Pod Security │ │ Falco │ │ Tetragon │ │ │ │ Annotator │ │ (DaemonSet) │ │ (DaemonSet) │ │ │ │ │ │ │ │ │ │ │ │ Watches pods │ │ syscall + K8s │ │ eBPF process │ │ │ │ Patches security│ │ audit events │ │ tracing │ │ │ │ annotations │ │ → stdout (JSON) │ │ → stdout (JSON) │ │ │ └────────┬────────┘ └────────┬─────────┘ └────────┬─────────┘ │ │ │ security.k8s.io/* │ │ │ │ │ ▼ ▼ │ │ │ ┌──────────────────────────────────────────┐ │ │ └─────────►│ Fluent Bit (DaemonSet) │ │ │ │ │ │ │ │ INPUT: tail /var/log/containers/*.log │ │ │ │ FILTER: kubernetes (metadata + annots) │ │ │ │ FILTER: record_modifier (source_type) │ │ │ │ OUTPUT: Kafka │ │ │ └────────────────────┬─────────────────────┘ │ └───────────────────────────────────────────│─────────────────────────┘ │ ▼ ┌────────────────────────┐ │ AWS MSK │ │ │ │ siem-falco │ │ siem-tetragon │ │ siem-k8s │ └───────────┬────────────┘ │ ▼ ┌────────────────────────┐ │ Logstash │ │ │ │ 0200_filter_falco │ │ 0201_filter_tetragon │ │ 0202_filter_k8s │ └───────────┬────────────┘ │ ▼ ┌────────────────────────┐ │ Elasticsearch │ │ │ │ logs-falco-siem │ │ logs-tetragon-siem │ │ logs-kubernetes-siem │ └────────────────────────┘ ``` ### 管道 2 — K8s 审计日志收集 (CloudWatch → Lambda) ``` ┌─────────────────────────────────────────────────────────────────────┐ │ EKS Cluster │ │ │ │ kube-apiserver ──► CloudWatch Logs (/aws/eks//cluster) │ │ │ └─────────────────────────────┬───────────────────────────────────────┘ │ Subscription Filter (push-based) ▼ ┌────────────────────────┐ │ Lambda │ │ cloudwatch-to-kafka │ │ │ │ - IRSA (no static │ │ credentials) │ │ - VPC-attached │ │ - injects source_type │ └───────────┬────────────┘ │ ▼ ┌────────────────────────┐ │ AWS MSK │ │ │ │ siem-k8s-audit │ └───────────┬────────────┘ │ ▼ ┌────────────────────────┐ │ Logstash │ │ │ │ 0203_filter_k8s_audit │ └───────────┬────────────┘ │ ▼ ┌────────────────────────┐ │ Elasticsearch │ │ │ │ logs-kubernetes-siem │ └────────────────────────┘ ``` ## 组件 ### Pod 安全注解器 EKS 不暴露控制平面,因此 API server 审计日志通过 CloudWatch Logs 发送,而不是直接收集。这意味着像 `kubectl exec`、secret 枚举和 RBAC 更改这样的审计日志事件在到达时没有 pod 级别的安全上下文。您无法仅从审计日志中判断所涉及的 pod 是否具有特权、是否有 hostPath 挂载,或者是否自动挂载了 ServiceAccount token。 Pod 安全注解器通过作为 watch 控制器运行解决了这个问题,该控制器根据每个 pod 的规约不断将 `security.k8s.io/*` 注解修补到每个 pod 上。因为 Fluent Bit 在收集容器日志时会读取这些注解,并且审计日志管道通过 pod 身份关联事件,所以像运行时日志、Falco 警报、Tetragon 追踪和审计日志这样的每个事件最终都会携带相同的安全上下文,而无需在查询时进行任何关联操作。 **添加的注解:** | 注解 | 描述 | |---|---| | `security.k8s.io/privileged` | 是否有任何容器以特权模式运行 | | `security.k8s.io/host-pid` | hostPID 已启用 | | `security.k8s.io/host-network` | hostNetwork 已启用 | | `security.k8s.io/host-ipc` | hostIPC 已启用 | | `security.k8s.io/host-path` | 存在 hostPath 卷 | | `security.k8s.io/host-path-mounts` | hostPath 挂载路径 | | `security.k8s.io/host-path-sensitive` | 敏感路径 (/etc, /proc 等) | | `security.k8s.io/capabilities-added` | 新增的 Linux capabilities | | `security.k8s.io/allow-privilege-escalation` | allowPrivilegeEscalation | | `security.k8s.io/run-as-non-root` | runAsNonRoot | | `security.k8s.io/run-as-user` | runAsUser | | `security.k8s.io/automount-sa-token` | automountServiceAccountToken | | `security.k8s.io/network-policy-applied` | NetworkPolicy 覆盖此 pod | | `security.k8s.io/service-account` | ServiceAccount 名称 | | `security.k8s.io/container-ports` | 声明的容器端口 | | `security.k8s.io/image-tag-latest` | 使用 :latest 标签 | | `security.k8s.io/image-pull-policy` | 镜像拉取策略 | ### Fluent Bit DaemonSet 负责从每个节点收集容器日志。 - **输入**:对 `/var/log/containers/*.log` 执行 `tail` — 为 Falco、Tetragon 和工作负载日志设置独立的输入源 - **过滤器**:`kubernetes` — 使用 pod 标签、注解(包括 security.k8s.io/*)、命名空间、节点信息进行丰富 - **过滤器**:`record_modifier` — 添加 `cluster_name` 和 `source_type` - **输出**:Kafka(MSK,在内网中未启用身份验证) ### K8s 审计日志管道 EKS API server 审计日志通过 CloudWatch Logs 收集,并使用 Lambda 函数转发到 Kafka,然后由 Logstash 将其标准化为 ECS 格式。 **收集流程:** ``` kube-apiserver → CloudWatch Logs (/aws/eks//cluster) → Lambda (Subscription Filter, push-based, real-time) → MSK Kafka (siem-k8s-audit) → Logstash (0203_filter_k8s_audit.conf) → Elasticsearch (siem-k8s-audit) ``` **基础设施:** - Lambda 在配置了 IRSA 的 VPC 内运行 — 无需静态凭证 - CloudWatch Subscription Filter 实时触发 Lambda(无轮询) - 通过 Terraform 进行部署和管理 **ECS 字段映射:** | ECS 字段 | 源字段 | |---|---| | `event.action` | `verb` | | `event.outcome` | `responseStatus.code` (2xx → success, 4xx → failure) | | `user.name` | `user.username` | | `source.ip` | `sourceIPs` | | `url.path` | `requestURI` | | `kubernetes.audit.objectRef.resource` | `objectRef.resource` | | `kubernetes.audit.objectRef.subresource` | `objectRef.subresource` | | `kubernetes.audit.objectRef.namespace` | `objectRef.namespace` | | `kubernetes.audit.responseStatus.code` | `responseStatus.code` | | `kubernetes.audit.authorization.decision` | `annotations.authorization.k8s.io/decision` | | `kubernetes.audit.authorization.reason` | `annotations.authorization.k8s.io/reason` | ### Logstash 管道 | 文件 | 角色 | |---|---| | `0000_input.conf` | 所有 siem-* topic 的 Kafka 消费者 | | `0200_filter_falco.conf` | Falco 警报的 ECS 映射 — 映射至 `rule.*`、`process.*`、`event.kind: alert` | | `0201_filter_tetragon.conf` | Tetragon 事件的 ECS 映射 — 映射至 `process.*`,事件类型路由 | | `0202_filter_k8s.conf` | K8s 工作负载日志的 ECS 映射 — 将 `security.k8s.io/*` 提升为 `kubernetes.security.*` | | `0203_filter_k8s_audit.conf` | K8s 审计日志的 ECS 映射 — 映射至 `event.action`、`user.name`、`kubernetes.audit.*` | | `9999_output.conf` | 路由至 Elasticsearch Data Streams 或常规索引 | ### ECS 字段映射 所有事件均映射至 [Elastic Common Schema (ECS) 8.x](https://www.elastic.co/guide/en/ecs/current/index.html): ``` host.name ← kubernetes.host container.name ← kubernetes.container_name container.image.name ← kubernetes.container_image container.id ← kubernetes.docker_id orchestrator.cluster.name ← cluster_name orchestrator.namespace ← kubernetes.namespace_name orchestrator.resource.name ← kubernetes.pod_name orchestrator.resource.ip ← kubernetes.pod_ip orchestrator.type → "kubernetes" kubernetes.security.* ← security.k8s.io/* annotations ``` ## Elasticsearch Data Streams | Data Stream | 来源 | ILM | |---|---|---| | `logs-falco-siem` | Falco 警报 | siem policy | | `logs-tetragon-siem` | Tetragon 进程事件 | siem policy | | `logs-kubernetes-siem` | K8s 工作负载日志 | siem policy | | `siem-k8s-audit` | K8s API server 审计日志 | siem policy | ## 目录结构 ``` K8S-DETECTION-PIPELINE/ ├── falco/ │ └── values.yaml # Helm values (json_output: true) ├── fluent-bit/ │ ├── configmap.yaml # Pipeline config (inputs/filters/outputs) │ ├── daemonset.yaml # DaemonSet + env vars │ └── rbac.yaml # ServiceAccount + ClusterRole ├── pod-security-annotator/ │ ├── pod_security_annotator.py # Python watch controller │ ├── requirements.txt │ ├── Dockerfile │ └── k8s/ │ └── manifests.yaml # RBAC + Deployment ├── lambda/ │ └── cloudwatch_to_kafka/ │ ├── main.py # CloudWatch → Kafka forwarder │ ├── requirements.txt │ └── builder.sh # Lambda layer build script └── logstash/ └── pipeline/ ├── 0000_input.conf ├── 0200_filter_falco.conf ├── 0201_filter_tetragon.conf ├── 0202_filter_k8s.conf ├── 0203_filter_k8s_audit.conf └── 9999_output.conf ``` ## 检测用例 由于每个日志事件中都丰富了安全注解,在 Kibana/Elastic SIEM 中执行以下查询将变得轻而易举: ``` # Privileged pod 执行了 shell kubernetes.security.privileged: true AND rule.name: *shell* # 在没有 NetworkPolicy 的 pod 中执行进程 kubernetes.security.network_policy_applied: false AND event.module: tetragon # 来自具有敏感 host 挂载的 pod 的 Falco 警报 kubernetes.security.host_path_sensitive: true AND event.module: falco # 自动挂载 SA token + 以 root 运行 kubernetes.security.automount_sa_token: true AND kubernetes.security.run_as_non_root: false AND event.module: falco # 通过 kubectl 枚举 Secret kubernetes.audit.objectRef.resource: secrets AND event.action: (get OR list) AND event.outcome: success # Pod exec (kubectl exec) kubernetes.audit.objectRef.resource: pods AND kubernetes.audit.objectRef.subresource: exec # 创建 ClusterRoleBinding event.action: create AND kubernetes.audit.objectRef.resource: clusterrolebindings ``` ## Sigma 检测规则 与针对容器的 MITRE ATT&CK 相映射的平台无关检测规则。 有关 Secret 枚举、Pod Exec 和 ClusterRoleBinding 创建规则,请参见 [`sigma-rules/`](./sigma-rules/)。 ## 许可证 版权所有 2026 jaypark81 根据 Apache License 2.0 版本授权。 详情请参阅 LICENSE。
标签:AMSI绕过, Chrome Headless, EC2, ECS, ECS架构, EKS, Elasticsearch, Falco, Fluent Bit, IaC, K8s运行时安全, Kafka, Kubernetes安全, Python, Sigma规则, SonarQube插件, Terraform, Web截图, 事件流处理, 内容过滤, 威胁检测, 安全信息与事件管理, 安全检测管道, 安全运营, 容器安全, 扫描框架, 搜索引擎爬取, 敏感词过滤, 数据处理, 无后门, 日志收集, 目标导入, 请求拦截