dc401/elk-tide-generator
GitHub: dc401/elk-tide-generator
基于大模型驱动的威胁情报转Elasticsearch检测规则自动化平台,具备多级验证、集成测试和自愈式优化能力。
Stars: 0 | Forks: 0
# Elasticsearch 检测 Agent - 自动化 SIEM 检测工程
生产级自动化检测工程解决方案,可将 CTI 情报 → Elasticsearch 检测规则,并具备全面的多级验证、TTP 对齐测试和自愈式优化功能。
截至 1.23 版本,Google ADK 不支持我想要的多区域负载均衡。因此使用了原生 gemini 调用 :(
- 设计与核心代码:Dennis Chow dchow[AT]xtecsystems.com 由 Claude Sonnnet 4.5 优化
- 版本 1.0 2026 年 2 月 9 日
## 功能特性
- **原生 Elasticsearch 格式:** 检测规则采用 Elasticsearch Detection API 格式(Lucene 查询 + ECS 字段)
- **多级智能优化:** 在验证、集成测试和 LLM 评判阶段自动修复规则
- **TTP 意图验证:** 基于 Gemini 的验证确保测试 Payload 与真实攻击模式匹配
- **端到端测试:** 单命令流水线测试,附带全面报告
- **自动化测试:** 3 级验证 + 使用临时 Elasticsearch (Docker) 进行集成测试
- **实证 LLM 评判:** 基于实际 SIEM 测试结果的质量评估
- **YAML 优先 I/O:** 所有规则和输出均采用 YAML(对 LLM 和人类更友好)
- **安全加固:** OWASP LLM Top 10 防护、输入验证、输出清理
- **仅 GitHub 基础设施:** 无需持久化云资源
## 自动区域轮换
**配额管理:** 系统自动在 8 个 GCP Vertex AI 区域之间轮换,以分配 API 配额使用并避免耗尽。
**区域:**
- us-central1
- global
- us-south1
- us-east5
- us-west1
- us-east1
- us-east4
- us-west4
**工作原理:**
- 根据当前 UTC 小时自动选择区域
- 每小时轮换(24 小时循环覆盖所有区域 3 次)
- 整个流水线运行期间使用同一区域
- 无需手动配置
- 相比单区域拥有 8 倍配额容量
**示例:**
```
Hour 00, 08, 16 UTC → us-central1
Hour 01, 09, 17 UTC → global
Hour 02, 10, 18 UTC → us-south1
...and so on
```
这意味着您每天可以多次运行流水线而不会达到配额限制,因为每小时使用不同区域的配额池。
## 架构
```
CTI Files → Detection Agent → Elasticsearch Detection Rules (YAML)
↓
STAGE 1: Validation (with auto-refinement)
├─ Lucene syntax check
├─ ECS field validation (1990 fields)
├─ Iterative field research (3 attempts)
└─ LLM schema validator (research-backed)
↓
STAGE 2: Integration Testing (with auto-refinement)
├─ Deploy to ephemeral Elasticsearch (Docker)
├─ Ingest TP/FN/FP/TN test payloads
├─ Calculate precision/recall metrics
└─ Smart decision: Fix QUERY or TEST CASES
↓
STAGE 3: LLM Judge (with auto-refinement)
├─ Empirical evaluation (actual test results)
├─ Quality scoring (≥0.75 threshold)
└─ Deployment decision: APPROVE/REFINE/REJECT
↓
STAGE 3.5: TTP Intent Validation (optional)
├─ Validate test payload realism
├─ Check command syntax & TTP alignment
└─ Research-backed recommendations
↓
Staging → Human Review → Production
```
## 先决条件
- Python 3.11+(推荐 3.12 或 3.13)
- 启用了 Vertex AI 的 GCP 帐户(用于 Gemini API)
- GitHub 帐户(用于 CI/CD)
- git CLI
- gh CLI (GitHub CLI)
- gcloud CLI ([安装指南](https://cloud.google.com/sdk/docs/install))
- Docker(用于使用 Elasticsearch 进行集成测试)
### GitHub 仓库权限
**重要:** 为了自动创建 PR,您必须启用 GitHub Actions 权限:
1. 在您的仓库中进入 **Settings → Actions → General**
2. 滚动到 **Workflow permissions**
3. 启用:**"Allow GitHub Actions to create and approve pull requests"**
4. 点击 **Save**
如果没有此设置,工作流在尝试自动创建审查 PR 时将失败。
## 快速设置
### 选项 1:自动引导(推荐)
```
#clone and navigate to project
git clone https://github.com/dc401/adk-tide-generator.git
cd adk-tide-generator
#run interactive bootstrap
chmod +x scripts/bootstrap.sh
./scripts/bootstrap.sh
```
引导脚本将:
1. 设置 Python 虚拟环境
2. 安装依赖项
3. 配置 GCP 凭证
4. 设置 GitHub secrets
5. 验证安装
### 选项 2:手动设置
```
#1. create virtual environment
python3.11 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
#2. install dependencies
pip install --upgrade pip
pip install -r requirements.txt
#3. configure GCP
export GOOGLE_CLOUD_PROJECT="your-project-id"
gcloud auth application-default login
#note: region is auto-selected via round-robin rotation (no manual config needed)
#4. add CTI files
mkdir -p cti_src
cp your-threat-intel.pdf cti_src/
#5. test CTI loading
python run_agent.py --test-cti
#6. generate detection rules
python run_agent.py --interactive
```
## 使用方法
### 交互模式(本地)
```
python run_agent.py --interactive
```
提示您输入:
- CTI 文件夹位置
- 输出目录
- GCP 项目 ID
- 生成前确认
### 非交互模式(CI/CD)
```
python run_agent.py \
--cti-folder cti_src \
--output generated \
--project YOUR_GCP_PROJECT
#note: --location is optional; defaults to auto-selected region in CI/CD
# 或用于本地开发的 'global'
```
### 带自愈优化(默认)
```
#refinement enabled by default (max 3 iterations per rule)
python run_agent.py --cti-folder cti_src --output generated
```
### 禁用优化(用于测试)
```
python run_agent.py --cti-folder cti_src --output generated --no-refinement
```
### 质量驱动重试(带 Elasticsearch 的本地开发)
基于测试的优化,根据集成测试结果自动改进规则:
```
#start elasticsearch first (Docker)
docker run -d --name elasticsearch \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
-p 9200:9200 \
docker.elastic.co/elasticsearch/elasticsearch:8.12.0
#run with quality retry (requires ES running)
python run_agent.py \
--cti-folder cti_src \
--output generated \
--quality-retry \
--max-iterations 3 \
--precision-threshold 0.60 \
--recall-threshold 0.70
#cleanup
docker stop elasticsearch && docker rm elasticsearch
```
**工作原理:**
1. 从 CTI 生成规则
2. 运行集成测试(TP/FN/FP/TN 评估)
3. 检查质量指标(精确率 ≥60%,召回率 ≥70%)
4. 如果失败:分析失败原因并带反馈重新生成
5. 重复最多 max iterations(默认值:3)
**要求:**
- Elasticsearch 在本地端口 9200 上运行
- Docker(用于临时 ES 容器)
- 专为本地开发设计,非 CI/CD
## 端到端测试
### 运行完整流水线(推荐)
```
#test everything: generation → integration → TTP validation → summary
gh workflow run end-to-end-test.yml
#watch progress
gh run watch
```
有关详细用法和配置选项,请参阅 [END_TO_END_TEST.md](END_TO_END_TEST.md)。
### 重用现有规则
```
#skip generation, test existing artifacts
gh workflow run end-to-end-test.yml \
-f skip_generation=true \
-f existing_run_id=
```
### 仅快速集成测试
```
#disable TTP validation for faster testing
gh workflow run end-to-end-test.yml \
-f run_ttp_validator=false
```
## 验证与测试
### 阶段 1:预集成验证
```
python scripts/validate_rules.py \
--rules-dir generated/detection_rules \
--staging-dir generated/staging \
--project YOUR_GCP_PROJECT
```
验证:
- Lucene 语法(确定性)
- ECS 字段存在性(1990 个字段)
- ECS 字段研究(如果无效则尝试 3 次)
- YAML → JSON 转换
- LLM schema 验证(基于研究)
如果发生失败,每个规则最多自动优化 3 次。
### 阶段 2:集成测试
```
python scripts/execute_detection_tests.py \
--rules-dir generated/detection_rules \
--es-url http://localhost:9200
```
测试:
- 将规则部署到 Elasticsearch(Docker 容器)
- 摄入 TP/FN/FP/TN 测试 Payload
- 计算精确率/召回率/F1/准确率
- 智能决策:修复查询 OR 测试用例
质量阈值:
- 精确率 ≥ 0.60(最多 40% 误报)
- 召回率 ≥ 0.70(至少捕获 70% 的攻击)
### 阶段 3:LLM 评判评估
```
python scripts/run_llm_judge.py \
--rules-dir generated/detection_rules \
--test-results integration_test_results.yml \
--project YOUR_GCP_PROJECT
```
评估:
- 基于实际测试结果的质量评分(≥0.75 阈值)
- 部署建议(APPROVE/REFINE/REJECT)
- 如有需要,根据评判反馈进行优化
### 阶段 3.5:TTP 意图验证
```
python scripts/test_ttp_validator.py generated/detection_rules
```
验证:
- 命令语法真实性(在真实攻击中是否可行?)
- TTP 一致性(Payload 是否符合 MITRE 技术?)
- 字段值真实性(日志值是否真实?)
- 规避技术有效性(FN 案例)
输出:
- 有效/无效测试用例计数
- 高/中/低置信度分数
- 针对无效案例的基于研究的建议
## 多级优化系统
系统在每个失败阶段自动优化规则:
### 验证优化
- **触发条件:** Lucene 语法错误、ECS 字段错误、schema 违规
- **修复方式:** 更正运算符、研究 ECS 字段、修复 MITRE 引用
- **尝试次数:** 每个规则最多 3 次
### 集成测试优化
- **触发条件:** 精确率 < 0.60 或 召回率 < 0.70
- **智能决策:** 分析是需要修复查询还是测试用例
- **修复方式:** 优化查询逻辑 或 更新测试 Payload
- **尝试次数:** 每个规则最多 2 次
### 评判优化
- **触发条件:** 部署决策 = REFINE
- **修复方式:** 应用评判的具体建议
- **尝试次数:** 每个规则最多 2 次
## 项目结构
```
adk-tide-generator/
├── detection_agent/ #core detection generation logic
│ ├── agent.py #main detection agent (5 stages)
│ ├── prompts/ #LLM prompts (external files)
│ │ ├── detection_generator.md #rule generation with ECS research
│ │ ├── validator.md #rule validation with research
│ │ ├── security_scan.md #OWASP LLM Top 10 protection
│ │ └── ttp_validator_prompt.md #TTP intent validation guide
│ ├── schemas/ #Pydantic schemas
│ │ ├── detection_rule.py #ES Detection Rule schema
│ │ └── ecs_flat.yml #1990 ECS fields
│ └── tools/ #custom tools
│ ├── load_cti_files.py #CTI file loader (PDF/DOCX/TXT/MD)
│ ├── validate_lucene.py #Lucene syntax validator
│ ├── validate_ecs_fields.py #ECS field validator
│ ├── research_ecs_field.py #ECS field research tool
│ ├── iterative_validator.py #3-attempt validation wrapper
│ └── ttp_intent_validator.py #TTP test payload validator
│
├── scripts/ #validation and testing scripts
│ ├── bootstrap.sh #interactive setup script
│ ├── validate_rules.py #3-stage validation with refinement
│ ├── execute_detection_tests.py #ES integration testing
│ ├── integration_test_ci.py #CI integration wrapper
│ ├── run_llm_judge.py #empirical LLM judge with refinement
│ ├── test_ttp_validator.py #TTP validator testing
│ ├── demo_ttp_validation.py #TTP validator demonstration
│ ├── stage_passing_rules.py #stage validated rules with UIDs
│ ├── create_review_pr.py #automated PR creation
│ ├── deploy_local_demo.sh #local deployment demo
│ ├── cleanup_staging.sh #clean temp artifacts
│ ├── setup-gcp.sh #GCP setup helper
│ └── setup-github-secrets.sh #GitHub secrets helper
│
├── .github/workflows/ #CI/CD workflows
│ ├── end-to-end-test.yml #master orchestration (6-12 min)
│ ├── generate-detections.yml #rule generation (3-4 min)
│ ├── integration-test-simple.yml #integration testing (1-2 min)
│ ├── llm-judge.yml #LLM quality evaluation
│ ├── mock-deploy.yml #mock SIEM deployment
│ └── cleanup-stale-artifacts.yml #artifact cleanup
│
├── cti_src/ #CTI input files
│ └── sample_cti.md #example CTI file
│
├── generated/ #agent outputs (gitignored)
│ ├── detection_rules/ #generated YAML rules
│ ├── cti_context.yml #CTI analysis context
│ └── staging/ #temp validation artifacts
│
├── production_rules/ #human-approved rules (deployed)
│ └── *.yml #production detection rules
│
├── archived_rules/ #deployment history
│ └── batch_*_deployed_*/ #audit trail with metadata
│
├── run_agent.py #CLI entry point
├── requirements.txt #Python dependencies
├── README.md #this file
├── END_TO_END_TEST.md #end-to-end testing guide
├── TESTING_GUIDE.md #testing documentation
└── ARCHITECTURE_ELASTICSEARCH_NATIVE.md #technical architecture
```
## GitHub Actions 工作流
### 工作流编排
```
┌──────────────────────────────────────────────────────────────┐
│ END-TO-END TEST (Master Orchestration) │
│ .github/workflows/end-to-end-test.yml │
└──────────────────────────────────────────────────────────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ GENERATE │ │ INTEGRATION │ │ TTP VALIDATE │
│ (reuses) │ │ TEST │ │ (optional) │
│ │ │ (inline) │ │ (inline) │
│ generate- │ │ │ │ │
│ detections │ │ Downloads │ │ Downloads │
│ .yml via │ │ artifacts │ │ artifacts │
│ workflow_ │ │ │ │ │
│ call │ │ Starts │ │ Runs TTP │
│ │ │ Docker ES │ │ validator │
│ Uploads: │ │ │ │ │
│ detection- │ │ Uploads: │ │ Uploads: │
│ rules │ │ test-results │ │ ttp-report │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└──────────────────┼──────────────────┘
▼
┌──────────────┐
│ SUMMARY │
│ (inline) │
│ │
│ Downloads │
│ all artifacts│
│ │
│ Generates │
│ report │
│ │
│ Uploads: │
│ pipeline- │
│ summary │
└──────────────┘
```
### 工作流描述
#### 1. 端到端测试(主编排)
**文件:** `.github/workflows/end-to-end-test.yml`
**触发方式:** 手动触发
**运行时间:** 6-12 分钟
**编排内容:**
- 通过 `workflow_call` 调用 `generate-detections.yml`(可复用工作流)
- 内联运行集成测试(以避免循环依赖)
- 内联运行 TTP 验证(可选)
- 将所有结果汇总到摘要报告中
**生成的产物:**
- `detection-rules`(来自生成阶段)
- `integration-test-results`(测试指标)
- `ttp-validation-report`(测试用例验证)
- `pipeline-summary`(汇总报告)
**用法:**
```
gh workflow run end-to-end-test.yml
gh workflow run end-to-end-test.yml -f skip_generation=true -f existing_run_id=123456
```
有关详细文档,请参阅 [END_TO_END_TEST.md](END_TO_END_TEST.md)。
#### 2. 生成检测规则
**文件:** `.github/workflows/generate-detections.yml`
**触发方式:** 手动触发、workflow_call(来自 end-to-end-test)或推送到 main
**运行时间:** 3-5 分钟(多个 CTI 文件超时 15 分钟)
**触发时机:**
- **手动:** `gh workflow run generate-detections.yml`
- **自动:** 推送到 main 且 `cti_src/**`、`detection_agent/**` 或 `run_agent.py` 发生变更
- **来自编排:** 通过 `workflow_call` 被 `end-to-end-test.yml` 调用
**工作流步骤:**
1. 清理陈旧产物
2. 安装依赖项
3. 认证到 GCP
4. 验证 CTI 文件(安全检查)
5. 运行带有迭代优化的检测 Agent
6. 验证规则生成
7. 上传 `detection-rules` 产物
#### 3. 集成测试 - Elasticsearch
**文件:** `.github/workflows/integration-test-simple.yml`
**触发方式:** 带 `artifact_run_id` 参数的手动触发
**运行时间:** 1-2 分钟
**目的:** 独立集成测试(可独立运行)
**用法:**
```
gh workflow run integration-test-simple.yml -f artifact_run_id=
```
**功能:**
- 从指定运行下载 `detection-rules` 产物
- 启动临时 Elasticsearch 8.12.0 (Docker)
- 部署规则并摄入测试 Payload
- 计算精确率、召回率、F1、准确率
- 上传 `integration-test-results` 产物
**注意:** 端到端测试内联运行此逻辑,而不是调用此工作流,以避免依赖性问题。
#### 4. 模拟 SIEM 部署
**文件:** `.github/workflows/mock-deploy.yml`
**触发方式:** PR 合并到 main(带有暂存规则)
**运行时间:** 2-3 分钟
**目的:** 演示人工审查后的生产部署。
**工作流:**
1. 创建带有暂存规则的 PR → 人工审查
2. PR 批准并合并 → 触发此工作流
3. 部署到临时 Elasticsearch(模拟生产环境)
4. 将批准的规则移动到 `production_rules/`
5. 归档部署及审计跟踪
#### 5. 清理陈旧产物
**文件:** `.github/workflows/cleanup-stale-artifacts.yml`
**触发方式:** 每周计划(周日 UTC 凌晨 2 点)或手动触发
**运行时间:** <1 分钟
**目的:** 防止产物混乱和配额问题。
**清理内容:**
- 未链接到开放 PR 的生成产物
- 会话结果(临时执行日志)
- 暂存目录
**受保护:**
- `production_rules/`(人工批准的规则)
- 链接到开放 PR 的文件
- 来自近期工作流运行的产物
## 安全特性
### OWASP LLM Top 10 防护
- 扫描 CTI 中的 Prompt 注入模式
- 阻止越狱尝试
- 检测数据投毒
- 验证输出以防止操纵
### 输入验证
- 文件大小限制(每个文件最大 50MB)
- 路径遍历防护
- 仅允许特定文件扩展名(.pdf, .txt, .md, .docx)
- 可疑文件名检测
- 每次运行最多 50 个文件(防止 DoS)
### 输出清理
- 从生成的规则中移除注入模式
- 写入前验证 Lucene 语法
- 带研究依据的 ECS 字段验证
- 带研究的 Schema 验证
## 成本优化
### 模型选择
- **Gemini 2.5 Flash:** 规则生成(快速、便宜,$0.000075/1K 输入 Token)
- **Gemini 2.5 Pro:** 验证、评判、TTP 验证(准确,$0.00125/1K 输入 Token)
### 配额管理
- Agent 间延迟:3.0s
- 激进的退避重试
- 带指数退避的会话级重试
- 每个规则最多 3 次验证迭代
- 每个规则最多 2 次集成测试迭代
- 每个规则最多 2 次评判迭代
### Token 效率
- 外部 Prompt(无内联重复)
- 截断输出(120k 字符限制)
- 段间状态修剪
- 字段研究缓存
## 测试
### 单元测试
```
#test CTI loading
python run_agent.py --test-cti
#test validation only
python scripts/validate_rules.py --rules-dir generated/detection_rules
```
### 集成测试
```
#test with Docker Elasticsearch
docker run -d --name elasticsearch \
-p 9200:9200 \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
docker.elastic.co/elasticsearch/elasticsearch:8.12.0
python scripts/execute_detection_tests.py \
--rules-dir generated/detection_rules \
--es-url http://localhost:9200
```
### 端到端测试
```
#complete pipeline
gh workflow run end-to-end-test.yml
#watch progress
gh run watch
#download results
gh run download
```
### TTP 验证器测试
```
#validate test payloads for production rules
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
python scripts/test_ttp_validator.py production_rules/
```
## 故障排除
### GitHub Actions PR 创建失败
**错误:** `GitHub Actions is not permitted to create or approve pull requests`
**原因:** 仓库工作流权限未配置
**修复:**
1. 在您的仓库中进入 **Settings → Actions → General**
2. 滚动到 **Workflow permissions**
3. 启用:**"Allow GitHub Actions to create and approve pull requests"**
4. 点击 **Save**
5. 重新运行失败的工作流作业
这是一次性配置。启用后,所有未来的工作流都将能够自动创建 PR。
### GCP 认证错误
```
#re-authenticate
gcloud auth application-default login
#verify project
gcloud config get-value project
#set project if not configured
export GOOGLE_CLOUD_PROJECT="your-project-id"
#note: region auto-selected via round-robin (no GOOGLE_CLOUD_LOCATION needed)
```
### 配额耗尽
**注意:** 系统现在自动在 8 个区域之间轮换以避免配额耗尽。如果您仍然遇到配额限制:
```
#check quota usage
gcloud alpha services quota list \
--service=aiplatform.googleapis.com \
--filter="metric.type:aiplatform.googleapis.com/quota/generate_content_requests_per_minute_per_project_per_region"
#increase delay between agents (edit detection_agent/agent.py)
INTER_AGENT_DELAY = 5.0 # increase from 3.0
```
### 规则生成失败
```
#enable verbose logging
export PYTHONUNBUFFERED=1
python run_agent.py --interactive
#disable refinement for debugging
python run_agent.py --interactive --no-refinement
#check session results
cat session_results/detection_session_*.json | jq .
```
### 集成测试失败
```
#check Elasticsearch status
curl http://localhost:9200/_cluster/health
#manually restart Docker container
docker restart elasticsearch
#check Elasticsearch logs
docker logs elasticsearch
#check version compatibility (must be 8.12.0)
grep "elasticsearch==" requirements.txt
```
### Elasticsearch 版本不匹配
```
#symptom: "Accept version must be either version 8 or 7, but found 9"
#fix: ensure requirements.txt has correct versions
elasticsearch==8.12.0
elastic-transport==8.15.1
```
### TTP 验证错误
```
#check GCP authentication
gcloud auth application-default login
#verify project is set
echo $GOOGLE_CLOUD_PROJECT
#run with explicit project
python scripts/test_ttp_validator.py \
--project YOUR_PROJECT_ID \
generated/detection_rules/
```
## 质量指标
### 目标指标
- **精确率:** ≥ 0.60(最多 40% 误报)
- **召回率:** ≥ 0.70(至少捕获 70% 的攻击)
- **LLM 质量评分:** ≥ 0.75
- **TTP 验证:** 100% 有效测试用例
规则通过以下方式验证:
- Lucene 语法验证
- ECS 字段验证(1990 个字段)
- 使用 Elasticsearch 进行集成测试
- TTP 意图验证
- LLM 质量评判评估
- 部署前的人工审查
## 许可证
MIT 许可证 - 详见 LICENSE 文件
## 致谢
构建使用了:
- Google Gemini 2.5 (Flash & Pro) via Vertex AI
- Elasticsearch Detection Rule API (8.12.0)
- ECS (Elastic Common Schema) 8.11
- luqum (Lucene 查询解析器)
- Pydantic (schema 验证)
- GitHub Actions (CI/CD)
- Docker(临时测试基础设施)
标签:Docker, ECS, Elasticsearch, FTP漏洞扫描, SIEM规则管理, Sigma规则, Terraform, Vertex AI, YAML, 威胁情报, 安全库, 安全编排, 安全防御评估, 开发者工具, 攻击模拟, 目标导入, 网络安全, 规则自动化, 请求拦截, 逆向工具, 隐私保护, 驱动签名利用