elastic/workflows
GitHub: elastic/workflows
Elastic 官方维护的工作流模板库,提供 57 个覆盖安全、可观测性和搜索场景的预构建 YAML 工作流。
Stars: 40 | Forks: 4
Elastic Workflow Library
针对 Elastic 平台的工作流精选集合,涵盖安全、可观测性和搜索示例。
## 目录 - [概述](#overview) - [快速入门](#quick-start) - [仓库结构](#repository-structure) - [工作流分类](#workflow-categories) - [Workflow Schema](#workflow-schema) - [核心概念](#key-concepts) - [触发器](#triggers) - [变量语法](#variable-syntax) - [Liquid 模板](#liquid-templating) - [错误处理](#error-handling) - [导入 Workflows](#importing-workflows) - [示例](#examples) - [贡献](#contributing) - [许可证](#license) ## 概述 本仓库包含 **57 个 workflows**,专为 Elastic Workflows 设计,这是用于自动化 Elastic Stack 操作的平台功能。这些 workflows 涵盖了广泛的用例: | 类别 | 描述 | |----------|-------------| | **安全** | 威胁检测、事件响应、丰富信息和搜寻 | | **可观测性** | 监控、日志分析和根因分析 | | **搜索** | Elasticsearch 查询、ES\|QL、语义搜索 | | **集成** | Splunk、Slack、Jenkins、JIRA、Caldera 等 | | **AI Agents** | 代理工作流和 AI 驱动的自动化 | | **数据** | ETL、摄取和文档管理 | ### 什么是 Elastic Workflows? Elastic Workflows 提供了一种基于 YAML 的声明式方法,用于自动化 Elastic 平台上的操作。它们与以下组件原生集成: - **Elasticsearch** - 使用 ES|QL 和 DSL 查询、聚合和索引数据 - **Kibana** - 创建案例、管理警报、与安全和可观测性功能交互 - **外部系统** - Splunk、Slack、Jenkins、JIRA 以及任何 HTTP API - **AI/ML** - 与语言模型集成,用于智能分析和代理 ### 关键特性 - **声明式 YAML** - 定义你想要什么,而不是怎么做 - **触发器** - 手动、定时或警报驱动 - **可扩展** - 连接任何 HTTP API 或 Elastic 功能 - **版本控制** - 将 workflows 作为代码存储,在 Git 中跟踪更改 - **可共享** - 在环境之间导入/导出 workflows ## 快速入门 ### 1. 浏览 Workflows 探索按用例组织的 [`workflows/`](./workflows) 目录: ``` workflows/ ├── security/ # Security operations │ ├── detection/ # Alert management, threat detection │ ├── response/ # Incident response, case management │ ├── enrichment/ # Threat intel, IP/hash lookups │ └── hunting/ # Threat hunting queries ├── integrations/ # Third-party integrations │ ├── splunk/ # Splunk queries and enrichment │ ├── slack/ # Channel management, notifications │ ├── jenkins/ # CI/CD automation │ ├── jira/ # Ticket management │ ├── caldera/ # Adversary emulation │ ├── firebase/ # Authentication │ └── snowflake/ # Data warehouse queries ├── search/ # Search and query workflows ├── observability/ # Monitoring and analysis ├── ai-agents/ # AI-powered automation ├── data/ # ETL and data management ├── utilities/ # Common utility workflows └── examples/ # Demo and getting-started ``` ### 2. 检查和自定义 每个 workflow 都包含解释每个部分的内联注释: ``` # ============================================================================= # Workflow: IP Reputation Check # Category: security/enrichment # # Assess the reputation of a given IP address using threat intelligence # ============================================================================= name: IP Reputation Check # CONSTANTS - Update these values for your environment consts: abuseipdb_api_key: YOUR-API-KEY-HERE # Get from AbuseIPDB # INPUTS - Parameters provided at runtime inputs: - name: ip_address type: string required: true ``` ### 3. 导入 Kibana **选项 A: Kibana UI** 1. 在 Kibana 中导航到 **Management → Workflows** 2. 点击 **Create workflow** 3. 粘贴 YAML 内容 4. 保存并测试 **选项 B: API 导入** ``` curl -X POST "https://your-kibana-url/api/workflows" \ -H "kbn-xsrf: true" \ -H "x-elastic-internal-origin: Kibana" \ -H "Content-Type: application/json" \ -H "Authorization: ApiKey YOUR_API_KEY" \ -d '{"yaml": "'"$(cat workflows/security/enrichment/ip-reputation-check.yaml)"'"}' ``` 有关详细说明,请参阅 [docs/importing.md](./docs/importing.md)。 ## 仓库结构 ``` elastic-workflows/ ├── README.md # This file ├── CONTRIBUTING.md # Contribution guidelines ├── LICENSE.txt # Apache 2.0 license ├── workflows/ # All workflow YAML files │ ├── security/ # Security operations │ │ ├── detection/ # Threat detection workflows │ │ ├── response/ # Incident response workflows │ │ ├── enrichment/ # Enrichment workflows │ │ └── hunting/ # Threat hunting workflows │ ├── integrations/ # Third-party integrations │ │ ├── splunk/ │ │ ├── slack/ │ │ ├── jenkins/ │ │ ├── jira/ │ │ ├── caldera/ │ │ ├── firebase/ │ │ └── snowflake/ │ ├── search/ # Search workflows │ ├── observability/ # Observability workflows │ ├── ai-agents/ # AI agent workflows │ ├── data/ # Data/ETL workflows │ ├── utilities/ # Utility workflows │ └── examples/ # Demo workflows └── docs/ # Extended documentation ├── schema.md # Complete YAML schema reference ├── concepts.md # Workflow concepts explained └── importing.md # Import instructions ``` ## 工作流分类 ### 安全 用于安全操作、威胁检测和事件响应的 workflows。 | 类别 | 数量 | 描述 | |----------|-------|-------------| | [security/detection](./workflows/security/detection/) | 8 | 警报管理、威胁检测、规则执行 | | [security/enrichment](./workflows/security/enrichment/) | 5 | VirusTotal、IP 信誉、威胁情报查询 | | [security/response](./workflows/security/response/) | 4 | 事件响应、分类和案例管理 | ### 集成 用于连接 Elastic 与外部系统的 workflows。 | 类别 | 数量 | 描述 | |----------|-------|-------------| | [integrations/splunk](./workflows/integrations/splunk/) | 5 | Splunk 查询、丰富信息、数据检索 | | [integrations/caldera](./workflows/integrations/caldera/) | 4 | MITRE Caldera 对手模拟 | | [integrations/slack](./workflows/integrations/slack/) | 3 | 频道创建、用户管理、通知 | | [integrations/firebase](./workflows/integrations/firebase/) | 2 | Firebase 身份验证 | | [integrations/jenkins](./workflows/integrations/jenkins/) | 1 | CI/CD 构建自动化 | | [integrations/jira](./workflows/integrations/jira/) | 1 | 工单创建 | | [integrations/snowflake](./workflows/integrations/snowflake/) | 1 | 数据仓库查询 | ### 平台功能 | 类别 | 数量 | 描述 | |----------|-------|-------------| | [search](./workflows/search/) | 4 | ES\|QL、语义搜索、网络搜索 | | [observability](./workflows/observability/) | 1 | 监控、日志分析、AI 驱动的可观测性 | | [ai-agents](./workflows/ai-agents/) | 2 | AI 代理调用和自动化 | | [data](./workflows/data/) | 3 | ETL、摄取、文档管理 | | [utilities](./workflows/utilities/) | 11 | 常用操作和辅助工具 | | [examples](./workflows/examples/) | 2 | 入门演示 | ### 精选 Workflows | Workflow | 类别 | 描述 | |----------|----------|-------------| | [IP 信誉检查](./workflows/security/enrichment/ip-reputation-check.yaml) | 安全 | 对照 AbuseIPDB 和地理位置检查 IP | | [哈希威胁检查](./workflows/security/detection/hash-threat-check.yaml) | 安全 | VirusTotal 文件哈希分析 | | [Splunk 查询](./workflows/integrations/splunk/splunk-query.yaml) | 集成 | 执行 Splunk 搜索 | | [创建 Slack 频道](./workflows/integrations/slack/create-slack-channel.yaml) | 集成 | 自动创建 Slack 频道 | | [语义知识搜索](./workflows/search/semantic-knowledge-search.yaml) | 搜索 | AI 驱动的语义搜索 | | [AD 自动分类](./workflows/security/response/ad-automated-triaging.yaml) | 安全 | 自动安全警报分类 workflow | ## Workflow Schema 每个 workflow 都遵循一致的 YAML schema: ``` # Required fields name: "Workflow Name" # Human-readable name steps: # At least one step required - name: "Step Name" # Step identifier type: "action.type" # Action to perform with: # Action parameters key: value # Optional fields description: "What this does" # Detailed description tags: # Categories for organization - observability - search triggers: # How the workflow is invoked - type: scheduled with: every: "1d" # Daily consts: # Reusable constants api_key: "value" inputs: # Runtime parameters - name: query type: string required: true ``` ### 常用操作类型 | 操作 | 描述 | 用例 | |--------|-------------|----------| | `http` | HTTP 请求 | API 调用、webhook | | `elasticsearch.search` | 搜索 ES 索引 | 数据检索 | | `elasticsearch.index` | 索引文档 | 数据存储 | | `kibana.cases` | 案例管理 | 事件响应 | | `kibana.alert` | 警报操作 | 检测 | | `console` | 日志输出 | 调试 | | `foreach` | 遍历数组 | 批处理 | 有关完整的 schema 参考,请参阅 [docs/schema.md](./docs/schema.md)。 ## 核心概念 ### 触发器 Workflows 支持多种触发器类型: ``` # Manual (on-demand) triggers: - type: manual # Scheduled (simple interval) triggers: - type: scheduled with: every: "6h" # Every 6 hours # Alert-driven triggers: - type: alert ``` ### 变量语法 使用双花括号引用值: ``` # Constants url: "{{ consts.api_url }}/endpoint" # Inputs query: "host.ip: {{ inputs.target_ip }}" # Step outputs message: "Found {{ steps.search.output.hits.total }} results" ``` ### Liquid 模板 Workflows 支持 [Liquid](https://shopify.github.io/liquid/) 模板以生成动态内容。使用过滤器内联转换数据。 #### 常用过滤器 | 过滤器 | 描述 | 示例 | |--------|-------------|---------| | `json` | 转换为 JSON 字符串 | `{{ object \| json }}` | | `json_parse` | 将 JSON 字符串解析为对象 | `{{ json_string \| json_parse }}` | | `size` | 获取数组长度或字符串长度 | `{{ items \| size }}` | | `first` / `last` | 获取数组的第一项/最后一项 | `{{ items \| first }}` | | `map` | 从数组中提取属性 | `{{ users \| map: "name" }}` | | `where` | 按属性过滤数组 | `{{ items \| where: "status", "active" }}` | | `where_exp` | 使用表达式过滤 | `{{ items \| where_exp: "item.price > 100" }}` | | `join` | 将数组连接为字符串 | `{{ tags \| join: ", " }}` | | `split` | 将字符串分割为数组 | `{{ csv \| split: "," }}` | | `default` | 默认值 | `{{ name \| default: "Unknown" }}` | | `date` | 格式化日期 | `{{ "now" \| date: "%Y-%m-%d" }}` | | `upcase` / `downcase` | 更改大小写 | `{{ text \| upcase }}` | | `strip` | 移除空白字符 | `{{ text \| strip }}` | | `replace` | 替换子字符串 | `{{ text \| replace: "old", "new" }}` | | `truncate` | 截断字符串 | `{{ text \| truncate: 50 }}` | | `base64_encode` / `base64_decode` | Base64 编码 | `{{ text \| base64_encode }}` | | `url_encode` / `url_decode` | URL 编码 | `{{ text \| url_encode }}` | #### 数组操作 ``` # Filter products where price > 100 {{ products | where_exp: "item.price > 100" }} # Find first matching item {{ products | find: "type", "book" }} # Check if any item matches {{ products | has: "category", "electronics" }} # Remove items matching condition {{ products | reject_exp: "item.stock == 0" }} # Sort by property {{ products | sort: "name" }} # Get unique values {{ items | uniq }} # Concatenate arrays {{ array1 | concat: array2 }} ``` #### 字符串操作 ``` # Format message with data message: "Alert: {{ event.rule.name | upcase }} on {{ event.host.name }}" # Build URL with encoding url: "https://api.example.com/search?q={{ query | url_encode }}" # Extract substring short_hash: "{{ file.hash.sha256 | slice: 0, 8 }}" # Default values for missing data user: "{{ event.user.name | default: 'unknown' }}" ``` #### 控制流 使用 Liquid 标签进行条件逻辑和循环: ``` message: | {%- if steps.search.output.hits.total > 0 -%} Found {{ steps.search.output.hits.total }} results {%- else -%} No results found {%- endif -%} ``` ``` # Loop over items message: | {%- for alert in event.alerts -%} - {{ alert.rule.name }}: {{ alert.severity }} {%- endfor -%} ``` ``` # Assign variables message: | {%- assign severity = event.alerts[0].severity -%} {%- case severity -%} {%- when "critical" -%} 🔴 CRITICAL: Immediate action required {%- when "high" -%} 🟠 HIGH: Investigate promptly {%- else -%} 🟢 Normal priority {%- endcase -%} ```所有支持的过滤器 (点击展开)
**Math**: `abs`, `at_least`, `at_most`, `ceil`, `divided_by`, `floor`, `minus`, `modulo`, `plus`, `round`, `times` **String**: `append`, `capitalize`, `downcase`, `escape`, `lstrip`, `prepend`, `remove`, `remove_first`, `remove_last`, `replace`, `replace_first`, `replace_last`, `rstrip`, `slice`, `split`, `strip`, `strip_html`, `strip_newlines`, `truncate`, `truncatewords`, `upcase` **Array**: `compact`, `concat`, `first`, `group_by`, `group_by_exp`, `join`, `last`, `map`, `pop`, `push`, `reverse`, `shift`, `size`, `sort`, `sort_natural`, `uniq`, `unshift`, `where`, `where_exp`, `find`, `find_exp`, `has`, `has_exp`, `reject`, `reject_exp` **Date**: `date`, `date_to_long_string`, `date_to_rfc822`, `date_to_string`, `date_to_xmlschema` **Encoding**: `base64_decode`, `base64_encode`, `cgi_escape`, `uri_escape`, `url_decode`, `url_encode`, `xml_escape`, `json`, `json_parse` **Utility**: `default`, `escape_once`, `normalize_whitespace`, `number_of_words`, `slugify`, `array_to_sentence_string`标签:API集成, Elastic Security, Elastic Stack, FTP漏洞扫描, IT运维, Liquid, Socks5代理, YAML, 云计算, 可观测性, 响应行动, 安全库, 安全编排, 工作流, 搜索引擎, 模板库, 流量重放, 网络调试, 自动化, 规则引擎, 越狱测试, 运维监控