dc401/elk-tide-generator

GitHub: dc401/elk-tide-generator

基于大模型驱动的威胁情报转Elasticsearch检测规则自动化平台,具备多级验证、集成测试和自愈式优化能力。

Stars: 0 | Forks: 0

# Elasticsearch 检测 Agent - 自动化 SIEM 检测工程 生产级自动化检测工程解决方案,可将 CTI 情报 → Elasticsearch 检测规则,并具备全面的多级验证、TTP 对齐测试和自愈式优化功能。 截至 1.23 版本,Google ADK 不支持我想要的多区域负载均衡。因此使用了原生 gemini 调用 :( - 设计与核心代码:Dennis Chow dchow[AT]xtecsystems.com 由 Claude Sonnnet 4.5 优化 - 版本 1.0 2026 年 2 月 9 日 ## 功能特性 - **原生 Elasticsearch 格式:** 检测规则采用 Elasticsearch Detection API 格式(Lucene 查询 + ECS 字段) - **多级智能优化:** 在验证、集成测试和 LLM 评判阶段自动修复规则 - **TTP 意图验证:** 基于 Gemini 的验证确保测试 Payload 与真实攻击模式匹配 - **端到端测试:** 单命令流水线测试,附带全面报告 - **自动化测试:** 3 级验证 + 使用临时 Elasticsearch (Docker) 进行集成测试 - **实证 LLM 评判:** 基于实际 SIEM 测试结果的质量评估 - **YAML 优先 I/O:** 所有规则和输出均采用 YAML(对 LLM 和人类更友好) - **安全加固:** OWASP LLM Top 10 防护、输入验证、输出清理 - **仅 GitHub 基础设施:** 无需持久化云资源 ## 自动区域轮换 **配额管理:** 系统自动在 8 个 GCP Vertex AI 区域之间轮换,以分配 API 配额使用并避免耗尽。 **区域:** - us-central1 - global - us-south1 - us-east5 - us-west1 - us-east1 - us-east4 - us-west4 **工作原理:** - 根据当前 UTC 小时自动选择区域 - 每小时轮换(24 小时循环覆盖所有区域 3 次) - 整个流水线运行期间使用同一区域 - 无需手动配置 - 相比单区域拥有 8 倍配额容量 **示例:** ``` Hour 00, 08, 16 UTC → us-central1 Hour 01, 09, 17 UTC → global Hour 02, 10, 18 UTC → us-south1 ...and so on ``` 这意味着您每天可以多次运行流水线而不会达到配额限制,因为每小时使用不同区域的配额池。 ## 架构 ``` CTI Files → Detection Agent → Elasticsearch Detection Rules (YAML) ↓ STAGE 1: Validation (with auto-refinement) ├─ Lucene syntax check ├─ ECS field validation (1990 fields) ├─ Iterative field research (3 attempts) └─ LLM schema validator (research-backed) ↓ STAGE 2: Integration Testing (with auto-refinement) ├─ Deploy to ephemeral Elasticsearch (Docker) ├─ Ingest TP/FN/FP/TN test payloads ├─ Calculate precision/recall metrics └─ Smart decision: Fix QUERY or TEST CASES ↓ STAGE 3: LLM Judge (with auto-refinement) ├─ Empirical evaluation (actual test results) ├─ Quality scoring (≥0.75 threshold) └─ Deployment decision: APPROVE/REFINE/REJECT ↓ STAGE 3.5: TTP Intent Validation (optional) ├─ Validate test payload realism ├─ Check command syntax & TTP alignment └─ Research-backed recommendations ↓ Staging → Human Review → Production ``` ## 先决条件 - Python 3.11+(推荐 3.12 或 3.13) - 启用了 Vertex AI 的 GCP 帐户(用于 Gemini API) - GitHub 帐户(用于 CI/CD) - git CLI - gh CLI (GitHub CLI) - gcloud CLI ([安装指南](https://cloud.google.com/sdk/docs/install)) - Docker(用于使用 Elasticsearch 进行集成测试) ### GitHub 仓库权限 **重要:** 为了自动创建 PR,您必须启用 GitHub Actions 权限: 1. 在您的仓库中进入 **Settings → Actions → General** 2. 滚动到 **Workflow permissions** 3. 启用:**"Allow GitHub Actions to create and approve pull requests"** 4. 点击 **Save** 如果没有此设置,工作流在尝试自动创建审查 PR 时将失败。 ## 快速设置 ### 选项 1:自动引导(推荐) ``` #clone and navigate to project git clone https://github.com/dc401/adk-tide-generator.git cd adk-tide-generator #run interactive bootstrap chmod +x scripts/bootstrap.sh ./scripts/bootstrap.sh ``` 引导脚本将: 1. 设置 Python 虚拟环境 2. 安装依赖项 3. 配置 GCP 凭证 4. 设置 GitHub secrets 5. 验证安装 ### 选项 2:手动设置 ``` #1. create virtual environment python3.11 -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate #2. install dependencies pip install --upgrade pip pip install -r requirements.txt #3. configure GCP export GOOGLE_CLOUD_PROJECT="your-project-id" gcloud auth application-default login #note: region is auto-selected via round-robin rotation (no manual config needed) #4. add CTI files mkdir -p cti_src cp your-threat-intel.pdf cti_src/ #5. test CTI loading python run_agent.py --test-cti #6. generate detection rules python run_agent.py --interactive ``` ## 使用方法 ### 交互模式(本地) ``` python run_agent.py --interactive ``` 提示您输入: - CTI 文件夹位置 - 输出目录 - GCP 项目 ID - 生成前确认 ### 非交互模式(CI/CD) ``` python run_agent.py \ --cti-folder cti_src \ --output generated \ --project YOUR_GCP_PROJECT #note: --location is optional; defaults to auto-selected region in CI/CD # 或用于本地开发的 'global' ``` ### 带自愈优化(默认) ``` #refinement enabled by default (max 3 iterations per rule) python run_agent.py --cti-folder cti_src --output generated ``` ### 禁用优化(用于测试) ``` python run_agent.py --cti-folder cti_src --output generated --no-refinement ``` ### 质量驱动重试(带 Elasticsearch 的本地开发) 基于测试的优化,根据集成测试结果自动改进规则: ``` #start elasticsearch first (Docker) docker run -d --name elasticsearch \ -e "discovery.type=single-node" \ -e "xpack.security.enabled=false" \ -p 9200:9200 \ docker.elastic.co/elasticsearch/elasticsearch:8.12.0 #run with quality retry (requires ES running) python run_agent.py \ --cti-folder cti_src \ --output generated \ --quality-retry \ --max-iterations 3 \ --precision-threshold 0.60 \ --recall-threshold 0.70 #cleanup docker stop elasticsearch && docker rm elasticsearch ``` **工作原理:** 1. 从 CTI 生成规则 2. 运行集成测试(TP/FN/FP/TN 评估) 3. 检查质量指标(精确率 ≥60%,召回率 ≥70%) 4. 如果失败:分析失败原因并带反馈重新生成 5. 重复最多 max iterations(默认值:3) **要求:** - Elasticsearch 在本地端口 9200 上运行 - Docker(用于临时 ES 容器) - 专为本地开发设计,非 CI/CD ## 端到端测试 ### 运行完整流水线(推荐) ``` #test everything: generation → integration → TTP validation → summary gh workflow run end-to-end-test.yml #watch progress gh run watch ``` 有关详细用法和配置选项,请参阅 [END_TO_END_TEST.md](END_TO_END_TEST.md)。 ### 重用现有规则 ``` #skip generation, test existing artifacts gh workflow run end-to-end-test.yml \ -f skip_generation=true \ -f existing_run_id= ``` ### 仅快速集成测试 ``` #disable TTP validation for faster testing gh workflow run end-to-end-test.yml \ -f run_ttp_validator=false ``` ## 验证与测试 ### 阶段 1:预集成验证 ``` python scripts/validate_rules.py \ --rules-dir generated/detection_rules \ --staging-dir generated/staging \ --project YOUR_GCP_PROJECT ``` 验证: - Lucene 语法(确定性) - ECS 字段存在性(1990 个字段) - ECS 字段研究(如果无效则尝试 3 次) - YAML → JSON 转换 - LLM schema 验证(基于研究) 如果发生失败,每个规则最多自动优化 3 次。 ### 阶段 2:集成测试 ``` python scripts/execute_detection_tests.py \ --rules-dir generated/detection_rules \ --es-url http://localhost:9200 ``` 测试: - 将规则部署到 Elasticsearch(Docker 容器) - 摄入 TP/FN/FP/TN 测试 Payload - 计算精确率/召回率/F1/准确率 - 智能决策:修复查询 OR 测试用例 质量阈值: - 精确率 ≥ 0.60(最多 40% 误报) - 召回率 ≥ 0.70(至少捕获 70% 的攻击) ### 阶段 3:LLM 评判评估 ``` python scripts/run_llm_judge.py \ --rules-dir generated/detection_rules \ --test-results integration_test_results.yml \ --project YOUR_GCP_PROJECT ``` 评估: - 基于实际测试结果的质量评分(≥0.75 阈值) - 部署建议(APPROVE/REFINE/REJECT) - 如有需要,根据评判反馈进行优化 ### 阶段 3.5:TTP 意图验证 ``` python scripts/test_ttp_validator.py generated/detection_rules ``` 验证: - 命令语法真实性(在真实攻击中是否可行?) - TTP 一致性(Payload 是否符合 MITRE 技术?) - 字段值真实性(日志值是否真实?) - 规避技术有效性(FN 案例) 输出: - 有效/无效测试用例计数 - 高/中/低置信度分数 - 针对无效案例的基于研究的建议 ## 多级优化系统 系统在每个失败阶段自动优化规则: ### 验证优化 - **触发条件:** Lucene 语法错误、ECS 字段错误、schema 违规 - **修复方式:** 更正运算符、研究 ECS 字段、修复 MITRE 引用 - **尝试次数:** 每个规则最多 3 次 ### 集成测试优化 - **触发条件:** 精确率 < 0.60 或 召回率 < 0.70 - **智能决策:** 分析是需要修复查询还是测试用例 - **修复方式:** 优化查询逻辑 或 更新测试 Payload - **尝试次数:** 每个规则最多 2 次 ### 评判优化 - **触发条件:** 部署决策 = REFINE - **修复方式:** 应用评判的具体建议 - **尝试次数:** 每个规则最多 2 次 ## 项目结构 ``` adk-tide-generator/ ├── detection_agent/ #core detection generation logic │ ├── agent.py #main detection agent (5 stages) │ ├── prompts/ #LLM prompts (external files) │ │ ├── detection_generator.md #rule generation with ECS research │ │ ├── validator.md #rule validation with research │ │ ├── security_scan.md #OWASP LLM Top 10 protection │ │ └── ttp_validator_prompt.md #TTP intent validation guide │ ├── schemas/ #Pydantic schemas │ │ ├── detection_rule.py #ES Detection Rule schema │ │ └── ecs_flat.yml #1990 ECS fields │ └── tools/ #custom tools │ ├── load_cti_files.py #CTI file loader (PDF/DOCX/TXT/MD) │ ├── validate_lucene.py #Lucene syntax validator │ ├── validate_ecs_fields.py #ECS field validator │ ├── research_ecs_field.py #ECS field research tool │ ├── iterative_validator.py #3-attempt validation wrapper │ └── ttp_intent_validator.py #TTP test payload validator │ ├── scripts/ #validation and testing scripts │ ├── bootstrap.sh #interactive setup script │ ├── validate_rules.py #3-stage validation with refinement │ ├── execute_detection_tests.py #ES integration testing │ ├── integration_test_ci.py #CI integration wrapper │ ├── run_llm_judge.py #empirical LLM judge with refinement │ ├── test_ttp_validator.py #TTP validator testing │ ├── demo_ttp_validation.py #TTP validator demonstration │ ├── stage_passing_rules.py #stage validated rules with UIDs │ ├── create_review_pr.py #automated PR creation │ ├── deploy_local_demo.sh #local deployment demo │ ├── cleanup_staging.sh #clean temp artifacts │ ├── setup-gcp.sh #GCP setup helper │ └── setup-github-secrets.sh #GitHub secrets helper │ ├── .github/workflows/ #CI/CD workflows │ ├── end-to-end-test.yml #master orchestration (6-12 min) │ ├── generate-detections.yml #rule generation (3-4 min) │ ├── integration-test-simple.yml #integration testing (1-2 min) │ ├── llm-judge.yml #LLM quality evaluation │ ├── mock-deploy.yml #mock SIEM deployment │ └── cleanup-stale-artifacts.yml #artifact cleanup │ ├── cti_src/ #CTI input files │ └── sample_cti.md #example CTI file │ ├── generated/ #agent outputs (gitignored) │ ├── detection_rules/ #generated YAML rules │ ├── cti_context.yml #CTI analysis context │ └── staging/ #temp validation artifacts │ ├── production_rules/ #human-approved rules (deployed) │ └── *.yml #production detection rules │ ├── archived_rules/ #deployment history │ └── batch_*_deployed_*/ #audit trail with metadata │ ├── run_agent.py #CLI entry point ├── requirements.txt #Python dependencies ├── README.md #this file ├── END_TO_END_TEST.md #end-to-end testing guide ├── TESTING_GUIDE.md #testing documentation └── ARCHITECTURE_ELASTICSEARCH_NATIVE.md #technical architecture ``` ## GitHub Actions 工作流 ### 工作流编排 ``` ┌──────────────────────────────────────────────────────────────┐ │ END-TO-END TEST (Master Orchestration) │ │ .github/workflows/end-to-end-test.yml │ └──────────────────────────────────────────────────────────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ GENERATE │ │ INTEGRATION │ │ TTP VALIDATE │ │ (reuses) │ │ TEST │ │ (optional) │ │ │ │ (inline) │ │ (inline) │ │ generate- │ │ │ │ │ │ detections │ │ Downloads │ │ Downloads │ │ .yml via │ │ artifacts │ │ artifacts │ │ workflow_ │ │ │ │ │ │ call │ │ Starts │ │ Runs TTP │ │ │ │ Docker ES │ │ validator │ │ Uploads: │ │ │ │ │ │ detection- │ │ Uploads: │ │ Uploads: │ │ rules │ │ test-results │ │ ttp-report │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ │ └──────────────────┼──────────────────┘ ▼ ┌──────────────┐ │ SUMMARY │ │ (inline) │ │ │ │ Downloads │ │ all artifacts│ │ │ │ Generates │ │ report │ │ │ │ Uploads: │ │ pipeline- │ │ summary │ └──────────────┘ ``` ### 工作流描述 #### 1. 端到端测试(主编排) **文件:** `.github/workflows/end-to-end-test.yml` **触发方式:** 手动触发 **运行时间:** 6-12 分钟 **编排内容:** - 通过 `workflow_call` 调用 `generate-detections.yml`(可复用工作流) - 内联运行集成测试(以避免循环依赖) - 内联运行 TTP 验证(可选) - 将所有结果汇总到摘要报告中 **生成的产物:** - `detection-rules`(来自生成阶段) - `integration-test-results`(测试指标) - `ttp-validation-report`(测试用例验证) - `pipeline-summary`(汇总报告) **用法:** ``` gh workflow run end-to-end-test.yml gh workflow run end-to-end-test.yml -f skip_generation=true -f existing_run_id=123456 ``` 有关详细文档,请参阅 [END_TO_END_TEST.md](END_TO_END_TEST.md)。 #### 2. 生成检测规则 **文件:** `.github/workflows/generate-detections.yml` **触发方式:** 手动触发、workflow_call(来自 end-to-end-test)或推送到 main **运行时间:** 3-5 分钟(多个 CTI 文件超时 15 分钟) **触发时机:** - **手动:** `gh workflow run generate-detections.yml` - **自动:** 推送到 main 且 `cti_src/**`、`detection_agent/**` 或 `run_agent.py` 发生变更 - **来自编排:** 通过 `workflow_call` 被 `end-to-end-test.yml` 调用 **工作流步骤:** 1. 清理陈旧产物 2. 安装依赖项 3. 认证到 GCP 4. 验证 CTI 文件(安全检查) 5. 运行带有迭代优化的检测 Agent 6. 验证规则生成 7. 上传 `detection-rules` 产物 #### 3. 集成测试 - Elasticsearch **文件:** `.github/workflows/integration-test-simple.yml` **触发方式:** 带 `artifact_run_id` 参数的手动触发 **运行时间:** 1-2 分钟 **目的:** 独立集成测试(可独立运行) **用法:** ``` gh workflow run integration-test-simple.yml -f artifact_run_id= ``` **功能:** - 从指定运行下载 `detection-rules` 产物 - 启动临时 Elasticsearch 8.12.0 (Docker) - 部署规则并摄入测试 Payload - 计算精确率、召回率、F1、准确率 - 上传 `integration-test-results` 产物 **注意:** 端到端测试内联运行此逻辑,而不是调用此工作流,以避免依赖性问题。 #### 4. 模拟 SIEM 部署 **文件:** `.github/workflows/mock-deploy.yml` **触发方式:** PR 合并到 main(带有暂存规则) **运行时间:** 2-3 分钟 **目的:** 演示人工审查后的生产部署。 **工作流:** 1. 创建带有暂存规则的 PR → 人工审查 2. PR 批准并合并 → 触发此工作流 3. 部署到临时 Elasticsearch(模拟生产环境) 4. 将批准的规则移动到 `production_rules/` 5. 归档部署及审计跟踪 #### 5. 清理陈旧产物 **文件:** `.github/workflows/cleanup-stale-artifacts.yml` **触发方式:** 每周计划(周日 UTC 凌晨 2 点)或手动触发 **运行时间:** <1 分钟 **目的:** 防止产物混乱和配额问题。 **清理内容:** - 未链接到开放 PR 的生成产物 - 会话结果(临时执行日志) - 暂存目录 **受保护:** - `production_rules/`(人工批准的规则) - 链接到开放 PR 的文件 - 来自近期工作流运行的产物 ## 安全特性 ### OWASP LLM Top 10 防护 - 扫描 CTI 中的 Prompt 注入模式 - 阻止越狱尝试 - 检测数据投毒 - 验证输出以防止操纵 ### 输入验证 - 文件大小限制(每个文件最大 50MB) - 路径遍历防护 - 仅允许特定文件扩展名(.pdf, .txt, .md, .docx) - 可疑文件名检测 - 每次运行最多 50 个文件(防止 DoS) ### 输出清理 - 从生成的规则中移除注入模式 - 写入前验证 Lucene 语法 - 带研究依据的 ECS 字段验证 - 带研究的 Schema 验证 ## 成本优化 ### 模型选择 - **Gemini 2.5 Flash:** 规则生成(快速、便宜,$0.000075/1K 输入 Token) - **Gemini 2.5 Pro:** 验证、评判、TTP 验证(准确,$0.00125/1K 输入 Token) ### 配额管理 - Agent 间延迟:3.0s - 激进的退避重试 - 带指数退避的会话级重试 - 每个规则最多 3 次验证迭代 - 每个规则最多 2 次集成测试迭代 - 每个规则最多 2 次评判迭代 ### Token 效率 - 外部 Prompt(无内联重复) - 截断输出(120k 字符限制) - 段间状态修剪 - 字段研究缓存 ## 测试 ### 单元测试 ``` #test CTI loading python run_agent.py --test-cti #test validation only python scripts/validate_rules.py --rules-dir generated/detection_rules ``` ### 集成测试 ``` #test with Docker Elasticsearch docker run -d --name elasticsearch \ -p 9200:9200 \ -e "discovery.type=single-node" \ -e "xpack.security.enabled=false" \ docker.elastic.co/elasticsearch/elasticsearch:8.12.0 python scripts/execute_detection_tests.py \ --rules-dir generated/detection_rules \ --es-url http://localhost:9200 ``` ### 端到端测试 ``` #complete pipeline gh workflow run end-to-end-test.yml #watch progress gh run watch #download results gh run download ``` ### TTP 验证器测试 ``` #validate test payloads for production rules export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project) python scripts/test_ttp_validator.py production_rules/ ``` ## 故障排除 ### GitHub Actions PR 创建失败 **错误:** `GitHub Actions is not permitted to create or approve pull requests` **原因:** 仓库工作流权限未配置 **修复:** 1. 在您的仓库中进入 **Settings → Actions → General** 2. 滚动到 **Workflow permissions** 3. 启用:**"Allow GitHub Actions to create and approve pull requests"** 4. 点击 **Save** 5. 重新运行失败的工作流作业 这是一次性配置。启用后,所有未来的工作流都将能够自动创建 PR。 ### GCP 认证错误 ``` #re-authenticate gcloud auth application-default login #verify project gcloud config get-value project #set project if not configured export GOOGLE_CLOUD_PROJECT="your-project-id" #note: region auto-selected via round-robin (no GOOGLE_CLOUD_LOCATION needed) ``` ### 配额耗尽 **注意:** 系统现在自动在 8 个区域之间轮换以避免配额耗尽。如果您仍然遇到配额限制: ``` #check quota usage gcloud alpha services quota list \ --service=aiplatform.googleapis.com \ --filter="metric.type:aiplatform.googleapis.com/quota/generate_content_requests_per_minute_per_project_per_region" #increase delay between agents (edit detection_agent/agent.py) INTER_AGENT_DELAY = 5.0 # increase from 3.0 ``` ### 规则生成失败 ``` #enable verbose logging export PYTHONUNBUFFERED=1 python run_agent.py --interactive #disable refinement for debugging python run_agent.py --interactive --no-refinement #check session results cat session_results/detection_session_*.json | jq . ``` ### 集成测试失败 ``` #check Elasticsearch status curl http://localhost:9200/_cluster/health #manually restart Docker container docker restart elasticsearch #check Elasticsearch logs docker logs elasticsearch #check version compatibility (must be 8.12.0) grep "elasticsearch==" requirements.txt ``` ### Elasticsearch 版本不匹配 ``` #symptom: "Accept version must be either version 8 or 7, but found 9" #fix: ensure requirements.txt has correct versions elasticsearch==8.12.0 elastic-transport==8.15.1 ``` ### TTP 验证错误 ``` #check GCP authentication gcloud auth application-default login #verify project is set echo $GOOGLE_CLOUD_PROJECT #run with explicit project python scripts/test_ttp_validator.py \ --project YOUR_PROJECT_ID \ generated/detection_rules/ ``` ## 质量指标 ### 目标指标 - **精确率:** ≥ 0.60(最多 40% 误报) - **召回率:** ≥ 0.70(至少捕获 70% 的攻击) - **LLM 质量评分:** ≥ 0.75 - **TTP 验证:** 100% 有效测试用例 规则通过以下方式验证: - Lucene 语法验证 - ECS 字段验证(1990 个字段) - 使用 Elasticsearch 进行集成测试 - TTP 意图验证 - LLM 质量评判评估 - 部署前的人工审查 ## 许可证 MIT 许可证 - 详见 LICENSE 文件 ## 致谢 构建使用了: - Google Gemini 2.5 (Flash & Pro) via Vertex AI - Elasticsearch Detection Rule API (8.12.0) - ECS (Elastic Common Schema) 8.11 - luqum (Lucene 查询解析器) - Pydantic (schema 验证) - GitHub Actions (CI/CD) - Docker(临时测试基础设施)
标签:Docker, ECS, Elasticsearch, FTP漏洞扫描, SIEM规则管理, Sigma规则, Terraform, Vertex AI, YAML, 威胁情报, 安全库, 安全编排, 安全防御评估, 开发者工具, 攻击模拟, 目标导入, 网络安全, 规则自动化, 请求拦截, 逆向工具, 隐私保护, 驱动签名利用