marez8505/SOARPlaybook

GitHub: marez8505/SOARPlaybook

一个基于 Python 的安全编排自动化与响应剧本执行引擎,内置六套事件响应剧本和模拟执行能力,用于学习、原型验证和调试 SOAR 工作流。

Stars: 0 | Forks: 0

# SOARPlaybook **安全编排、自动化与响应 (SOAR) 剧本引擎** [![Python](https://img.shields.io/badge/python-3.8%2B-blue)](https://python.org) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE) [![Tests](https://img.shields.io/badge/tests-133%20passed-brightgreen)](#running-tests) 一个专业的、达到作品集级别的 SOAR 剧本执行引擎的 Python 实现。展示了与 Cortex XSOAR、Splunk SOAR 和 Swimlane 平台职位相关的 SOC 自动化工程技能。 ## 目录 - [什么是 SOAR?](#what-is-soar) - [本项目的作用](#what-this-project-does) - [安装说明](#installation) - [快速开始](#quick-start) - [CLI 参考](#cli-reference) - [Playbook YAML 格式](#playbook-yaml-format) - [内置剧本](#included-playbooks) - [MITRE ATT&CK 覆盖范围](#mitre-attck-coverage) - [Action 模块](#action-modules) - [引擎架构](#engine-architecture) - [示例执行输出](#sample-execution-output) - [真实 SOAR 平台映射](#real-soar-platform-mapping) - [运行测试](#running-tests) - [项目结构](#project-structure) - [作者](#author) ## 什么是 SOAR? **安全编排、自动化与响应 (SOAR)** 是一类安全技术,它能够: - **编排** — 将不同的安全工具(SIEM、EDR、Firewalls、Ticketing)连接到统一的工作流中 - **自动化** — 无需人工干预即可执行重复的事件响应任务(IOC lookups、host isolation、firewall blocking) - **响应** — 通过**剧本** 实现标准化、可重复的事件响应——**剧本**是结构化的决策树,可引导分析师和自动化操作完成 IR 生命周期的每一步 ### 剧本驱动的事件响应 传统的 IR 是临时的:分析师收到警报,手动检查多个工具,并即兴做出响应。这种方式缓慢、不一致且无法扩展。 SOAR 剧本通过将*整个响应工作流*定义为代码来解决这个问题: ``` Alert triggers → Playbook starts → Automated enrichment → Decision branch → Action taken → Report generated ``` 现在每个主要的 SOC 都在使用 SOAR 平台。了解如何构建和操作剧本是以下岗位的核心技能: - SOC Analyst (Tier 1/2/3) - Detection Engineer - Security Automation Engineer - Incident Responder - SecOps Platform Engineer ## 本项目的作用 SOARPlaybook 实现了一个完整的剧本执行引擎,它能够: 1. **解析 YAML 剧本定义**,包含完整的结构验证 2. **使用递归状态机逐步执行剧本** 3. **使用安全的 Jinja2 + 表达式评估器评估决策分支** 4. **分发到 action 模块** — 与真实安全工具的模拟集成 5. **管理执行状态** — 变量、输出、分支历史记录 6. **生成完整的审计日志** — 每个步骤都记录有时间戳、输入、输出和持续时间 7. **从执行日志生成 HTML 事件报告** 8. **支持模拟模式** — 在没有真实副作用的情况下 dry-run 任何剧本 9. **处理错误和重试** — on_error 路由和可配置的重试逻辑 ## 安装说明 ``` git clone https://github.com/edwardmarez/soarplaybook cd soarplaybook pip install -r requirements.txt # 或安装为 package pip install -e . ``` **要求:** Python 3.8+, PyYAML, Jinja2, colorama, tabulate ## 快速开始 ### 运行交互式演示 ``` python -m soarplaybook demo ``` 这将执行钓鱼响应剧本,使用真实预填充的变量,展示逐步执行输出,并生成 HTML 事件报告。 ### 列出所有可用的剧本 ``` python -m soarplaybook list ``` ### 模拟勒索软件事件响应 ``` python -m soarplaybook simulate --playbook ransomware_response.yml ``` ### 使用真实变量运行钓鱼响应 ``` python -m soarplaybook run \ --playbook phishing_response.yml \ --var email_sender=attacker@evil.com \ --var email_subject="Urgent: Verify Your Account" \ --var suspicious_url=http://evil.com/login \ --var attachment_hash=e10adc3949ba59abbe56e057f20f883e ``` ### 验证剧本 ``` python -m soarplaybook validate --playbook malware_detection.yml ``` ### 从保存的执行日志生成 HTML 报告 ``` python -m soarplaybook report --execution-log ./logs/exec_2024-01-15.json --format html ``` ## CLI 参考 ``` soarplaybook [options] Commands: run Execute a playbook with real action dispatch simulate Dry-run a playbook (simulation mode — no real side effects) validate Validate a playbook YAML for structural correctness list List all available playbooks with descriptions report Generate HTML or text report from a saved execution log demo Run an interactive demonstration (phishing response) run / simulate options: --playbook, -p PATH Path to playbook YAML (required) --var, -v KEY=VALUE Variable override (repeatable) --log-path PATH Where to save execution log JSON --quiet, -q Suppress step-by-step console output validate options: --playbook, -p PATH Path to playbook YAML (required) list options: --playbooks-dir, -d DIR Directory to scan (default: soarplaybook/playbooks/) report options: --execution-log PATH Path to execution log JSON (required) --format, -f FORMAT Output format: html or text (default: html) --output, -o PATH Output file path ``` ## Playbook YAML 格式 剧本定义为具有结构化 schema 的 YAML 文件。这是一个带有完整注释的示例: ``` playbook: name: "Phishing Email Response" # Human-readable name description: "Automated response..." # Description of what this playbook does author: "Edward Marez" # Playbook author version: "1.0" # Playbook version severity: "medium" # Default severity: critical/high/medium/low tags: # Searchable tags - phishing - email-security mitre_attack: # MITRE ATT&CK technique IDs - T1566.001 # Phishing: Spearphishing Attachment - T1566.002 # Phishing: Spearphishing Link variables: # Playbook-scoped variables (defaults) email_sender: "" affected_users: [] reputation_score: 0 is_malicious: false steps: # Action step — executes an action module - id: "step_1" name: "Extract Email IOCs" type: "action" # action | decision | parallel | manual action: "enrichment.extract_email_iocs" # namespace.method inputs: # Jinja2 {{ variable }} substitution email_sender: "{{ email_sender }}" email_subject: "{{ email_subject }}" outputs: # Named outputs → promoted to variables - urls_found - sender_domain - has_indicators next: "step_2" # Next step ID (null = end) on_error: "step_error" # Jump here on failure (optional) retry: 2 # Retry count on failure (optional) timeout_seconds: 30 # Timeout per step (optional) # Decision step — evaluates a condition and branches - id: "step_2" name: "Evaluate Threat Level" type: "decision" condition: "{{ is_malicious == True or reputation_score > 70 }}" on_true: "step_3_block" # Step ID if condition is true on_false: "step_3_monitor" # Step ID if condition is false # Parallel step — runs multiple steps concurrently - id: "step_parallel" name: "Parallel Enrichment" type: "parallel" parallel_steps: - "enrich_ip" - "enrich_domain" - "check_hash" next: "step_report" # Manual step — simulates analyst decision point - id: "step_manual" name: "Analyst Approval Required" type: "manual" prompt: "Approve host isolation for {{ hostname }}?" next: "step_isolate" ``` ### 变量替换 所有的 `inputs` 字段都支持 Jinja2 `{{ variable }}` 语法。变量来源为: 1. 从剧本中的 `variables:` 部分初始化 2. 被 `--var` CLI 参数覆盖 3. 当步骤产生 `outputs` 时自动更新 示例 — 如果 `step_1` 声明了 `outputs: [sender_domain]`,那么 `step_2` 就可以在任何输入字段中使用 `{{ sender_domain }}`。 ### 条件表达式 决策步骤支持丰富的布尔表达式: ``` # 比较运算符 condition: "{{ score > 70 }}" condition: "{{ score >= 50 and score < 90 }}" condition: "{{ verdict == 'malware' }}" # 布尔逻辑 condition: "{{ is_malicious or is_newly_registered }}" condition: "{{ not is_malicious and threat_feeds_matched == 0 }}" # 列表成员 condition: "{{ 'admin' in affected_accounts }}" # 多条件(真实 SOAR 模式) condition: "{{ is_malicious == True or reputation_score > 70 or vss_deleted == True }}" ``` ## 内置剧本 所有六个剧本均已全面实现,包含 8–15 个步骤、决策分支和逼真的 IR 工作流。 ### 1. 钓鱼邮件响应 (`phishing_response.yml`) **严重性:** 中等 → 升级为高 **步骤:** 13 **工作流:** 提取 IOCs → 检查发件人域名信誉 → WHOIS lookup → 检查 URL → 决策 (是否恶意?) → 阻断域名或创建监控工单 → 检查附件哈希 → 通知用户 → 创建工单 → 生成 HTML 报告 **应对场景:** 鱼叉式网络钓鱼邮件、恶意附件、凭证收集链接 ### 2. 恶意软件检测响应 (`malware_detection.yml`) **严重性:** 高 **步骤:** 12 **工作流:** 通过 threat intel 验证哈希 → 关联活动 → 查找资产关键性 → 决策 (高可信度恶意软件?) → 隔离主机或全面扫描 → 终止恶意进程 → 阻断 C2 IP → 收取证物 → 评估横向移动风险 → 升级给 IR 团队 → 生成报告 **应对场景:** EDR/AV 检测、进程注入、凭证转储 ### 3. 暴力破解攻击响应 (`brute_force_response.yml`) **严重性:** 中等 **步骤:** 11 **工作流:** 检查攻击者 IP 信誉 → 地理位置 → 获取目标用户上下文 → 决策 (阻断或限速?) → 阻断 IP 或创建限速规则 → 决策 (凭证是否泄露?) → 强制重置密码或创建工单 → 决策 (是否为特权账户?) → 紧急升级或标准工单 → 通知用户 **应对场景:** 密码喷洒、凭证填充、VPN/RDP 暴力破解 ### 4. 数据泄露响应 (`data_exfiltration.yml`) **严重性:** 严重 **步骤:** 13 **工作流:** 验证目标 IP 和域名信誉 → 目标地理位置 → 获取用户上下文 → 查找源资产 → 决策 (确认泄露?) → 阻断出站流量并隔离主机或创建调查工单 → 收集取证证据 (浏览器历史记录、云同步、USB 日志) → 创建严重工单 → 通知管理层和法务部门 → 生成取证报告 **应对场景:** DLP 告警、未经授权的云上传、内部数据窃取、C2 隐蔽外发 ### 5. 勒索软件事件响应 (`ransomware_response.yml`) **严重性:** 严重 (紧急) **步骤:** 12 **工作流:** 立即隔离主机 → 阻断 C2 IP → DNS sinkhole C2 域名 → 验证勒索软件哈希 → 关联已知活动 → 收集取证证据 → 决策 (备份是否受损?) → 紧急升级给 CISO 或创建备份恢复工单 → 创建事件工单 → 通知所有受影响用户 → 升级给 IR 团队 → 生成紧急报告 **应对场景:** 活跃的勒索软件、防止横向传播、备份完整性、VSS 删除 ### 6. 内部威胁响应 (`insider_threat.yml`) **严重性:** 高 (敏感) **步骤:** 12 **工作流:** 隐蔽的 AD/HR 用户查找 → 资产上下文 → Threat intel 关联 → 决策 (高风险用户?) → 隐蔽取证收集或监控工单 → 决策 (是否有泄露迹象?) → DLP IP 信誉检查 → 决策 (离职/特权风险?) → 禁用账户或加强监控 → 创建受限调查工单 → 生成机密报告 **应对场景:** UEBA 告警、未经授权的数据访问、非工作时间异常、即将离职的员工 ## MITRE ATT&CK 覆盖范围 | 剧本 | 涵盖的技术 | |---|---| | Phishing Response | T1566.001, T1566.002, T1598.003, T1204.002 | | Malware Detection | T1059.001, T1059.003, T1055, T1547.001, T1486, T1003.001, T1021.002 | | Brute Force | T1110.001, T1110.002, T1110.003, T1110.004, T1078, T1021 | | Data Exfiltration | T1041, T1048, T1567, T1005, T1560, T1071, T1030 | | Ransomware | T1486, T1490, T1489, T1059.001, T1047, T1021.002, T1078, T1070.001, T1562.001 | | Insider Threat | T1078, T1005, T1039, T1025, T1567.002, T1052, T1560 | **涵盖的独特技术总数:28+** ## Action 模块 所有的 action 模块都是模拟的,但在架构上与真实的 SOAR 集成完全一致。每个模块都会: - 返回逼真的、结构化的响应数据 - 记录带有时间戳和输入/输出的每一个 action - 支持 `failure_rate` 参数,用于测试错误处理路径 - 打印在真实环境中将会执行的操作 (在模拟模式下) - 具有可配置的 `simulation=True/False` 模式 | 模块 | 方法 | 真实集成 | |---|---|---| | `ioc_lookup` | `check_ip`, `check_domain`, `check_hash`, `check_url` | VirusTotal, AlienVault OTX, IBM X-Force | | `email_alert` | `notify_users`, `notify_soc`, `notify_management` | Microsoft 365, SendGrid, SMTP | | `firewall` | `block_ip`, `block_domain`, `unblock_ip`, `create_rule` | Palo Alto PAN-OS, Cisco ASA, FortiGate | | `endpoint` | `isolate`, `scan`, `collect_forensics`, `kill_process`, `reset_credentials` | CrowdStrike Falcon, Microsoft Defender for Endpoint | | `ticket` | `create`, `update`, `escalate`, `close` | ServiceNow, Jira, PagerDuty | | `enrichment` | `extract_email_iocs`, `geolocate_ip`, `whois_lookup`, `threat_intel`, `user_context`, `asset_lookup` | Active Directory, MaxMind GeoIP, MISP | | `documentation` | `generate_report`, `generate_summary`, `timeline` | 内部 (HTML 报告生成) | ### 添加自定义 Action ``` from soarplaybook.actions.base_action import BaseAction class MyCustomAction(BaseAction): def my_method(self, target: str = "", **kwargs): self._sim_delay(100, 500) self._sim_banner(f"Running custom action on {target}") # ... your logic here return {"result": "success", "target": target} # 在 soarplaybook/actions/__init__.py 中注册: # "custom.my_method": (MyCustomAction, "my_method"), ``` ## 引擎架构 ``` CLI (main.py) │ ▼ PlaybookParser ← Loads and validates YAML playbook definitions │ ▼ PlaybookEngine ← Walks the step graph, dispatches actions │ ├── DecisionEngine ← Evaluates {{ conditions }}, routes branches │ ├── StateManager ← Tracks variables, step records, audit log │ └── Action Registry ← Maps "namespace.method" → (ActionClass, method) │ ├── IOCLookupAction ├── EmailAlertAction ├── FirewallAction ├── EndpointAction ├── TicketAction ├── EnrichmentAction └── DocumentationAction ReportGenerator ← Produces HTML/text reports from saved execution logs ``` ### 执行流程 ``` 1. PlaybookParser.load("phishing_response.yml") └── Parse YAML → validate structure → normalize → return playbook_def 2. PlaybookEngine.run(playbook_def, variables) └── Create StateManager (init variables, audit log) └── Begin execution loop at first step 3. For each step: a. Substitute {{ variables }} in inputs (DecisionEngine) b. If type=action: dispatch to action module c. If type=decision: evaluate condition → choose branch d. If type=parallel: ThreadPoolExecutor, wait for all e. If type=manual: auto-approve (in sim mode) f. Promote outputs → StateManager.variables g. Advance to next step ID 4. On completion: └── StateManager.complete_execution() └── Save execution log JSON └── Print summary 5. ReportGenerator.generate("html") └── Load execution log └── Render HTML report with timeline, variables, recommendations ``` ## 示例执行输出 ``` ╔══════════════════════════════════════════════╗ Playbook : Phishing Email Response Severity : [MEDIUM] Exec ID : exec-a3f2b891c4d1 Incident : INC-20240315-142301-3f9a ╚══════════════════════════════════════════════╝ ► Step: Extract Email Indicators of Compromise [step_1] ⚡ Executing: enrichment.extract_email_iocs [SIM] · sender_domain: evil.com · urls_found: ['http://evil.com/login'] · has_indicators: True ✓ Completed in 142ms ► Step: Check Sender Domain Reputation [step_2] ⚡ Executing: ioc_lookup.check_domain [SIM] · reputation_score: 99 · is_malicious: True · category: Phishing ✓ Completed in 217ms ► Step: Evaluate Threat Level [step_5] ? Condition: {{ is_malicious == True or reputation_score > 70 or is_newly_registered == True }} → Result: TRUE → step_6_block ✓ Completed in 0ms ► Step: Block Malicious Sender Domain [step_6_block] ⚡ Executing: firewall.block_domain [SIM] · block_success: True · rule_id: DNS-BLOCK-10003 · sinkhole_ip: 100.64.0.1 ✓ Completed in 389ms ... ══════════════════════════════════════════════ EXECUTION SUMMARY ══════════════════════════════════════════════ Incident ID : INC-20240315-142301-3f9a Status : COMPLETED Steps Done : 11 / 11 Duration : 2.14s MITRE : T1566.001, T1566.002, T1598.003, T1204.002 ══════════════════════════════════════════════ ``` ## 真实 SOAR 平台映射 本引擎反映了生产级 SOAR 平台的架构和概念: | SOARPlaybook 概念 | Cortex XSOAR | Splunk SOAR | Swimlane | |---|---|---|---| | `playbook_engine.py` | Playbook runner | Playbook runner | Workflow engine | | `decision_engine.py` | Condition tasks | Decision blocks | Decision shapes | | `state_manager.py` | Context/incident | Container | Application record | | `actions/__init__.py` | App registry | App catalog | Plugin registry | | `ioc_lookup.py` | VirusTotal app | VirusTotal app | VT integration | | `firewall_action.py` | PAN-OS app | Palo Alto app | Firewall plugin | | `endpoint_action.py` | CrowdStrike app | CS Falcon app | EDR plugin | | `ticket_action.py` | ServiceNow app | JIRA app | Ticketing plugin | | Execution log JSON | War room | Event log | Audit trail | | HTML report | Incident report | Investigation report | Case report | ### 与生产级平台的主要区别 | 功能 | SOARPlaybook | 生产级 SOAR | |---|---|---| | Action 执行 | 模拟 / 本地 | 对外部服务的真实 API 调用 | | 剧本编辑器 | YAML 文本文件 | 可视化拖放图形编辑器 | | 身份验证 | N/A | OAuth, API keys, credential vault | | 扩展性 | 单进程 | 分布式任务 worker | | 用户管理 N/A | 具有 SOC 角色的 RBAC | | 集成 | 7 个模拟集成 | 300–1000+ 个商业集成 | ## 运行测试 ``` # 运行完整 test suite PYTHONPATH=. python -m pytest tests/ -v # 运行特定测试文件 PYTHONPATH=. python -m pytest tests/test_engine.py -v # 带 coverage 运行 pip install pytest-cov PYTHONPATH=. python -m pytest tests/ --cov=soarplaybook --cov-report=term-missing ``` **测试覆盖率:** | 文件 | 测试用例 | 覆盖范围 | |---|---|---| | `test_playbook_parser.py` | 20 | YAML 加载、验证、规范化、内置剧本 | | `test_decision_engine.py` | 35 | 条件评估、分支、变量替换 | | `test_actions.py` | 49 | 所有 7 个 action 模块、注册表、失败模式、模拟占位符 | | `test_engine.py` | 29 | StateManager, PlaybookEngine, 端到端剧本执行 | **总计:133 个测试,100% 通过** ## 项目结构 ``` SOARPlaybook/ ├── soarplaybook/ │ ├── __init__.py # Package exports │ ├── __main__.py # python -m soarplaybook entry point │ ├── main.py # CLI: run, validate, list, simulate, report, demo │ ├── report_generator.py # Standalone report generator from execution logs │ ├── utils.py # Shared utilities (logging, timestamps, colors) │ ├── engine/ │ │ ├── __init__.py │ │ ├── playbook_parser.py # YAML loading, validation, normalization │ │ ├── playbook_engine.py # Main execution loop, step dispatch │ │ ├── decision_engine.py # Condition evaluation, variable substitution │ │ └── state_manager.py # Execution state, audit log, persistence │ ├── actions/ │ │ ├── __init__.py # Action registry (namespace.method → class) │ │ ├── base_action.py # Abstract base with simulation/logging helpers │ │ ├── ioc_lookup.py # IP, domain, hash, URL reputation lookups │ │ ├── email_alert.py # User, SOC, and management notifications │ │ ├── firewall_action.py # Block/unblock IP, domain, create rules │ │ ├── endpoint_action.py # Isolate, scan, forensics, kill process, reset creds │ │ ├── ticket_action.py # Create, update, escalate, close tickets │ │ ├── enrichment_action.py # Email IOC extraction, GeoIP, WHOIS, LDAP, CMDB │ │ └── documentation_action.py # HTML/text report generation, timelines │ └── playbooks/ │ ├── phishing_response.yml │ ├── malware_detection.yml │ ├── brute_force_response.yml │ ├── data_exfiltration.yml │ ├── ransomware_response.yml │ └── insider_threat.yml ├── tests/ │ ├── __init__.py │ ├── test_playbook_parser.py # 20 parser tests │ ├── test_decision_engine.py # 35 decision/condition tests │ ├── test_actions.py # 47 action module tests │ └── test_engine.py # 29 engine/state integration tests ├── logs/ # Execution logs (auto-created) ├── reports/ # HTML reports (auto-created) ├── requirements.txt ├── setup.py ├── LICENSE └── README.md ``` ## 作者 **Edward Marez** 旨在展示 SOAR 平台工程技能,包括: - 剧本设计和事件响应自动化 - Python 软件工程 (OOP, ABC, threading, Jinja2) - 安全运营概念 (IOC enrichment, containment, forensics) - MITRE ATT&CK 框架映射 - ITSM 集成模式 (ServiceNow, Jira) - EDR 平台概念 (CrowdStrike Falcon, Microsoft Defender) ## 许可证 MIT License — 详情请参阅 [LICENSE](LICENSE)。
标签:CLI, Cloudflare, Cortex XSOAR, DAST, FTP漏洞扫描, IR, MITRE ATT&CK, MIT协议, Python, SOAR, SOC自动化, Splunk SOAR, Swimlane, WiFi技术, 剧本引擎, 单元测试, 安全响应, 安全工程, 安全编排, 安全运营, 开源, 恶意代码分类, 恶意软件分析, 扫描框架, 投资组合项目, 无后门, 网络安全, 逆向工具, 隐私保护