Berlin2003/openenv-soc

GitHub: Berlin2003/openenv-soc

基于OpenEnv框架的SOC智能体仿真环境,用于训练和评估AI代理执行日志分析、警报研判及自动化响应等安全运营任务。

Stars: 0 | Forks: 0

## title: OpenEnv SOC emoji: 🛡️ colorFrom: blue colorTo: indigo sdk: docker pinned: false # 🛡️ OpenEnv-SOC: Autonomous Security Operations Center Analyst

OpenEnv HF Space Docker Python Domain

## 目录 1. [动机](#motivation) 2. [Agent 的功能](#what-the-agent-does) 3. [环境架构](#environment-architecture) 4. [动作空间](#action-space) 5. [观察空间](#observation-space) 6. [任务](#tasks) - [任务 1 — 钓鱼误报 (简单)](#task-1--phishing-false-positive-easy) - [任务 2 — 协调的凭证填充 (中等)](#task-2--coordinated-credential-stuffing-medium) - [任务 3 — 勒索软件横向移动 4 跳 (困难)](#task-3--ransomware-lateral-movement-4-hop-hard) 7. [奖励函数](#reward-function) 8. [评分器规范](#grader-specifications) 9. [基线分数](#baseline-scores) 10. [项目结构](#project-structure) 11. [设置与使用](#setup--usage) - [本地开发](#local-development) - [Docker](#docker) - [Hugging Face Space](#hugging-face-space) - [Python 客户端 API](#python-client-api) 12. [环境变量](#environment-variables) 13. [OpenEnv 规范合规性](#openenv-spec-compliance) ## 动机 安全运营中心 (SOC) 每天处理 **数千条安全警报**。Tier-1 和 Tier-2 分析师将 60–80% 的时间花在重复的分类任务上 —— 查询日志、查找威胁情报,并决定是阻止、隔离还是忽略。这导致了: - **警报疲劳** —— 分析师会遗漏埋藏在噪音中的真实威胁 - **缓慢的平均响应时间 (MTTR)** —— 攻击者有数小时或数天的时间进行横向移动 - **高分析师倦怠率** —— SOC 职位的年流失率超过 30% **OpenEnv-SOC** 是第一个在现实的 SOC 分类工作流上训练和评估自主 AI Agent 的 OpenEnv 环境。该环境旨在衡量 Agent 是否能够: 1. 阅读模棱两可、有噪音的警报(而不仅仅是基于关键词的模式匹配) 2. 通过查询 3 个数据源中的带时间戳日志来**主动调查** 3. 导航故意植入的**红鲱鱼 (red-herring) 指标**,以惩罚在没有证据的情况下采取行动的 Agent 4. 应用**精确、有针对性的补救措施** (block/isolate/revoke) 而不造成附带损害 5. 处理跨越 4 台内部主机、多个 IP 和受损服务账户的**多跳攻击链** 该环境填补了一个真正的空白:针对 OpenEnv-SOC 训练的 SOC 分类 Agent 可以在现实的 incident response 任务上进行评估,范围从琐碎的误报分类到复杂的勒索软件遏制。 ## Agent 的功能 该 Agent 充当 **Tier-2 SOC 分析师**。在每一个 episode 中,它: 1. 接收一个**初始安全警报**,其中包含警报 ID、严重性级别、描述和受影响的主机 2. 查询可用的数据源以收集证据: - `firewall` 日志(带时间戳的入站/出站流量) - `endpoint` 日志(每个主机的进程执行、文件活动、登录事件) - `activedirectory` 日志(认证成功/失败、VPN 事件) - `threat_intel` 数据库(IP 信誉、域名判定、恶意软件哈希) 3. 根据发现的情况采取补救行动: - `block_ip` —— 将 IP 添加到外围防火墙阻止列表中 - `isolate_host` —— 将主机与企业网络隔离 - `revoke_session` —— 终止用户或服务账户的所有活动会话 4. 用摘要和关于它是**误报**还是**确认事件**的声明来关闭警报 该环境奖励基于证据的精确行动,并惩罚**过度反应**(阻止安全的 IP,隔离未受影响的主机)和**调查不足**(在没有收集证据的情况下关闭案例)。 ## 环境架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ AI Agent (LLM) │ │ Uses: query_logs / query_threat_intel / block_ip / │ │ isolate_host / revoke_session / close_alert │ └───────────────────┬─────────────────────────────────────────┘ │ HTTP (JSON / Pydantic) ▼ ┌─────────────────────────────────────────────────────────────┐ │ FastAPI Server (server/app.py) │ │ POST /reset POST /step GET /state GET /health │ └───────────────────┬─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ SOCEnvironment (server/environment.py) │ │ · OpenEnv-compliant episode management │ │ · Shaped step-level reward function │ │ · Task-specific graders (_grade_easy / medium / hard) │ └───────────────────┬─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ MockCorporateNetwork (server/mock_network.py) │ │ · Timestamped log database across 3 sources │ │ · Threat intelligence lookup (ThreatDB) │ │ · Stateful: isolated_hosts, blocked_ips, revoked_sessions │ │ · Red-herring indicators in every task │ └─────────────────────────────────────────────────────────────┘ ``` **设计原则:** - **部分可观测性** —— Agent 只能看到初始警报。它必须主动拉取日志来收集证据。 - **带时间戳的、现实的日志** —— 所有日志条目都带有 ISO 8601 时间戳、主机名和反映现实 SIEM 输出的结构化字段。 - **确定性评分** —— 场景完全设定了种子,没有随机性。相同的输入产生相同的分数。 - **塑形奖励** —— 对信息性动作的连续每步奖励,对红鲱鱼动作的惩罚。最终的 episode 分数仅为评分器输出(总是在 `[0.0, 1.0]` 之间)。 - **对过度反应的惩罚** —— 阻止安全的 IP、隔离未涉及的主机或吊销干净的账户会降低分数。 ## 动作空间 所有动作都表示为单个 `SOCAction` Pydantic 模型,带有一个必需的 `action_type` 判别器字段。只需提供与所选动作类型相关的参数。 ``` class SOCAction(Action): action_type: Literal[ "query_logs", "query_threat_intel", "block_ip", "isolate_host", "revoke_session", "close_alert" ] # Parameters (supply only those needed for the chosen action_type): source: Optional[Literal["firewall", "endpoint", "activedirectory"]] query: Optional[str] # keyword to search in logs indicator: Optional[str] # IP / domain / filename for threat lookup ip_address: Optional[str] # IP to block hostname: Optional[str] # host to isolate username: Optional[str] # user / service account to revoke alert_id: Optional[str] # required for close_alert resolution_summary: Optional[str] # human-readable closure note is_false_positive: Optional[bool] # True = FP, False = confirmed incident ``` | `action_type` | 必需参数 | 描述 | |---|---|---| | `query_logs` | `source`, `query` | 在所选源中关键字搜索带时间戳的日志。返回带有时间戳的匹配日志行。 | | `query_threat_intel` | `indicator` | 在威胁情报数据库中查找 IP、域名、文件名或用户名。返回判定和上下文。 | | `block_ip` | `ip_address` | 将 IP 添加到外围防火墙 DROP 列表。在 episode 的其余时间内持续有效。 | | `isolate_host` | `hostname` | 将主机与企业网络隔离。在 episode 的其余时间内持续有效。 | | `revoke_session` | `username` | 终止用户或服务账户的所有活动会话。在 episode 的其余时间内持续有效。 | | `close_alert` | `alert_id`, `resolution_summary`, `is_false_positive` | **终止动作** —— 结束 episode 并触发评分器。 | ## 观察空间 每次 `reset()` 和 `step()` 都会返回一个 `SOCObservation`: ``` class Alert(BaseModel): alert_id: str severity: str # "Low" | "Medium" | "High" | "Critical" description: str # Human-readable incident description source_ip: Optional[str] # Attacker IP if known at alert time target_host: Optional[str] # Affected internal host class NetworkStatus(BaseModel): isolated_hosts: List[str] # Hosts currently quarantined blocked_ips: List[str] # IPs currently on the firewall blocklist revoked_sessions: List[str] # Users/accounts with sessions revoked class SOCObservation(Observation): open_alerts: List[Alert] # Active, unresolved alerts network_status: NetworkStatus # Current remediation state last_action_result: str # Timestamped output of the last action step_count: int # Steps taken so far this episode reward: float # Step-level reward signal done: bool # True when episode is over ``` **状态 (只读,通过 `GET /state`):** ``` class SOCState(State): task_name: str step_count: int max_steps: int episode_score: float # Always in [0.0, 1.0] — grader output only done: bool isolated_hosts: List[str] blocked_ips: List[str] revoked_sessions: List[str] ``` ## 任务 ### 任务 1 — 钓鱼误报 (`easy`) **最大步数:** 15 | **严重性:** 中等 **场景:** 员工 `alice.chen@corp.com` 报告了一个可疑的 Slack 链接:`login.secure-oauth.com/sso?redirect=dashboard`。她没有点击它。Helpdesk 创建了一个工单。环境包含 12 条带时间戳的日志条目,显示: - 每天早上有 50 多位同事通过此域名成功进行身份验证 - 防火墙确认其解析为 Cloudflare CDN 节点 `104.18.22.187` - ActiveDirectory 显示多个用户通过此域名的 SSO 认证成功 - 存在一个**红鲱鱼 typosquat 域名** `login.secure0auth.com` 在 ThreatDB 中且是恶意的 —— Agent 必须不要将其与真实的 SSO URL 混淆 **预期的 Agent 行为:** 1. 查询任何日志源或威胁情报以收集证据 2. 将该域名识别为合法的企业 SSO 3. 在不采取任何补救措施的情况下将警报作为**误报**关闭 **容易让 naive Agent 掉进的陷阱:** 在没有调查的情况下立即阻止或隔离的 Agent,或者将 `secure-oauth.com` 与 `secure0auth.com` (typosquat) 混淆的 Agent。 ### 任务 2 — 协调的凭证填充 (`medium`) **最大步数:** 15 | **严重性:** 高 **场景:** SIEM 规则 `VPN-BRUTE-001` 触发:8 分钟内来自 **3 个攻击者 IP** 的 147 次组合 VPN 登录失败。两个账户 (`j.smith`, `m.rodriguez`) 已被成功攻破(MFA 被绕过)。`j.smith` 的会话随后被用于通过 SMB 从 `file-server-02` 读取 820MB 数据 —— 这是一个异常的数据量。 **攻击者 IP(均为恶意):** - `104.22.33.44` — AbuseIPDB 98/100,已确认的僵尸网络 - `185.156.72.10` — AbuseIPDB 94/100,Storm-1044 威胁行为者 - `91.108.4.201` — AbuseIPDB 91/100,Tor 出口中继 **红鲱鱼:** `203.0.113.5` 出现在 VPN 健康检查日志中。ThreatDB 确认它是内部监控探针使用的 RFC 5737 文档 IP。阻止它会导致惩罚。 **预期的 Agent 行为:** 1. 查询 ActiveDirectory 日志以查找失败 + 成功的登录 2. 查询威胁情报以确认所有 3 个攻击者 IP 都是恶意的(并且 `203.0.113.5` 是安全的) 3. 阻止所有 3 个恶意 IP 4. 吊销两个受损用户的会话 5. 作为确认事件关闭 ### 任务 3 — 勒索软件横向移动 4 跳 (`hard`) **最大步数:** 10 | **严重性:** 严重 **场景:** CrowdStrike Falcon 在 `marketing-laptop-02` 上阻止了 `Invoke-Crypt.ps1` (LockBit 4.0 变体)。初始警报未知的是,勒索软件已经通过网络通过**4 跳攻击链**进行了横向移动: ``` marketing-laptop-02 └─[credential dump: svc-backup, svc-monitor] └─► file-server-02 (SMB, 14,822 files renamed to *.locked) └─► backup-server (SSH, backup catalog wiped) └─► db-server-main (PostgreSQL, 2.1M customer records exfiltrated) ``` 两个 C2 IP 正在主动窃取数据: - `185.220.101.52` — 活跃的 LockBit C2 服务器 (14.7GB 出站) - `91.92.248.77` — Tor 守卫中继 / 辅助 C2 通道 (2.4GB 出站) 两个服务账户已被攻破: - `svc-backup` — 用于 SMB 和 SSH 横向移动 - `svc-monitor` — 用于向 PostgreSQL 数据库进行身份验证 **红鲱鱼:** Nagios 监控服务器 `10.0.5.22` 连接到所有主机进行健康检查。其流量模式看起来与横向移动相似。它不是恶意的。 **预期的 Agent 行为:** 1. 调查 `marketing-laptop-02` 上的端点日志以查找凭证转储 2. 通过防火墙 + 端点日志跟踪攻击链 3. 隔离所有 4 台主机:`marketing-laptop-02`、`file-server-02`、`backup-server`、`db-server-main` 4. 阻止两个 C2 IP:`185.220.101.52`、`91.92.248.77` 5. 吊销两个服务账户:`svc-backup`、`svc-monitor` 6. 作为确认事件关闭 —— 在 **8 步内**完成以获得速度奖励 ## 奖励函数 每步奖励在整个轨迹中提供连续信号。**最终的 `episode_score`** 专门设置为 `close_alert` 时的评分器输出,并且总是在 `[0.0, 1.0]` 之间。 | 事件 | 步骤奖励 | |---|---| | 日志查询返回相关的匹配条目 | `+0.05` | | 日志查询未返回匹配项 | `−0.02` | | 命中威胁情报 (ThreatDB 中的已知指标) | `+0.05` | | 未命中威胁情报 (未知指标) | `+0.02` | | 阻止已知安全 / 红鲱鱼 IP | `−0.15` | | 在 `easy` 任务期间隔离任何主机 | `−0.15` | | 在 `easy` 任务期间吊销任何会话 | `−0.10` | | 所有其他补救行动 | `+0.05` | | `close_alert` (终止) | `= 评分器分数` | ## 评分器规范 所有评分器都是**确定性**的 —— 给定相同的动作序列,分数总是相同的。分数通过 `max(0.0, min(1.0, score))` 限制在 `[0.0, 1.0]` 之间。 ### 简单评分器 (`_grade_easy`) | 条件 | 分数 | |---|---| | 声明 `is_false_positive = True` | `+0.50` | | 关闭前至少采取 1 个调查步骤 | `+0.30` | | 没有隔离主机 且 没有吊销会话 | `+0.20` | | 每错误隔离的主机 (过度反应) | `−0.25` | | 每个错误吊销的会话 (过度反应) | `−0.15` | | **完美分数** | **1.00** | ### 中等评分器 (`_grade_medium`) | 条件 | 分数 | |---|---| | 声明 `is_false_positive = False` | `+0.10` | | 每个正确阻止的恶意 IP (最多 ×3) | `每个 +0.20` → 最多 `+0.60` | | 每个正确吊销的受损账户会话 (最多 ×2) | `每个 +0.15` → 最多 `+0.30` | | 每个错误 / 红鲱鱼 IP 被阻止 | `−0.10` | | 每个无辜用户会话被吊销 | `−0.15` | | **完美分数** | **1.00** | ### 困难评分器 (`_grade_hard`) | 条件 | 分数 | |---|---| | 声明 `is_false_positive = False` | `+0.05` | | 每个正确隔离的主机 (最多 ×4) | `每个 +0.125` → 最多 `+0.50` | | 每个正确阻止的 C2 IP (最多 ×2) | `每个 +0.10` → 最多 `+0.20` | | 每个正确吊销的服务账户 (最多 ×2) | `每个 +0.10` → 最多 `+0.20` | | 速度奖励:**在 ≤8 步内满足所有目标** | `+0.05` | | 每个错误隔离的主机 / 错误阻止的 IP / 错误吊销的账户 | `−0.10` | | **完美分数 (含速度奖励)** | **1.00** | | **完美分数 (不含速度奖励)** | **0.95** | ## 基线分数 以下分数是基于评分器设计和任务难度对 `gpt-4o-mini` 的**估计预期性能**。这些估计反映了: - 简单:Agent 可能会识别 SSO 域名并将其作为误报关闭,但可能会跳过一个调查步骤 - 中等:Agent 经常遗漏 3 个攻击者 IP 中的一个或混淆红鲱鱼 IP,并且可能无法追踪两个受损用户 - 困难:Agent 很少在步数预算内追踪完整的 4 跳链;通常只发现 1–2 台主机 | 任务 | 模型 | 估计分数 | 评分器最大值 | |---|---|---|---| | `easy` | gpt-4o-mini | ~0.80 | 1.00 | | `medium` | gpt-4o-mini | ~0.55 | 1.00 | | `hard` | gpt-4o-mini | ~0.25 | 1.00 | | **平均** | | **~0.53** | | 这些估计作为**难度校准参考**。前沿模型 (GPT-4o, Claude 3.5 Sonnet) 预计得分会显著更高,尤其是在中等任务上。困难任务旨在挑战即使是前沿模型 —— 4 跳链需要大多数 LLM 不会自然执行的系统性日志遍历。 ## 项目结构 ``` soc/ ├── inference.py # Baseline OpenAI agent script (root — required by spec) ├── client.py # Async + sync HTTP client for the environment ├── models.py # Pydantic models: SOCAction, SOCObservation, SOCState ├── openenv.yaml # Environment manifest (OpenEnv spec) ├── requirements.txt # Runtime dependencies ├── pyproject.toml # Installable package metadata ├── Dockerfile # Container definition for HF Spaces └── server/ ├── __init__.py ├── app.py # FastAPI server: /reset /step /state /health ├── environment.py # Episode management, reward shaping, graders └── mock_network.py # Deterministic corporate network simulation ``` ## 设置与使用 ### 前置条件 - Python 3.11+ - `pip install -r requirements.txt` ### 本地开发 ``` # 1. 安装 dependencies pip install -r requirements.txt # 2. 启动 environment server (terminal 1) uvicorn server.app:app --host 0.0.0.0 --port 7860 --reload # 3. 验证 server is healthy curl http://localhost:7860/health # → {"status": "healthy", "environment": "OpenEnv-SOC", "version": "2.0.0"} # 4. 运行 baseline inference agent (terminal 2) export HF_TOKEN="your_token_or_openai_key" export API_BASE_URL="https://api.openai.com/v1" export MODEL_NAME="gpt-4o-mini" python inference.py --env-url http://localhost:7860 ``` **预期输出:** ``` Connecting to API: https://api.openai.com/v1 (Model: gpt-4o-mini) ══════════════════════════════════════════════════════════════ TASK: EASY ══════════════════════════════════════════════════════════════ [STEP] Agent calls: query_threat_intel({"indicator": "login.secure-oauth.com"}) ← ThreatDB [login.secure-oauth.com]: SAFE — Verified internal SSO portal ... ← step_reward=0.050 done=False episode_score=0.000 [STEP] Agent calls: close_alert({"alert_id": "ALT-001", ...}) ← Alert ALT-001 closed as FALSE POSITIVE. Grader episode score: 0.800 📊 Final Episode Score: 0.800 ══════════════════════════════════════════════════════════════ BASELINE RESULTS (model=gpt-4o-mini) ══════════════════════════════════════════════════════════════ easy 0.800 |████████████████ | medium 0.550 |███████████ | hard 0.250 |█████ | avg 0.533 ══════════════════════════════════════════════════════════════ ``` ### Docker ``` # Build the image docker build -t openenv-soc . # Run the server docker run -p 7860:7860 openenv-soc # Run inference against the containerised server (from host) export HF_TOKEN="your_token" export API_BASE_URL="https://api.openai.com/v1" export MODEL_NAME="gpt-4o-mini" python inference.py --env-url http://localhost:7860 ``` ### Hugging Face Space 该环境部署为 Hugging Face Space。要直接使用它: ``` # Point the inference script at the HF Space URL export HF_TOKEN="your_hf_token" export API_BASE_URL="https://router.huggingface.co/v1" export MODEL_NAME="meta-llama/Llama-3.3-70B-Instruct" python inference.py --env-url https://Berlin2003-openenv-soc.hf.space ``` ### Python 客户端 API 使用 `SOCEnvClient` 在训练循环中进行编程访问: ``` import asyncio from client import SOCEnvClient from models import SOCAction async def run(): async with SOCEnvClient("http://localhost:7860", task="hard") as env: obs = await env.reset() print(f"Alert: {obs.open_alerts[0].description}") print(f"Severity: {obs.open_alerts[0].severity}") # Step 1: investigate the initial host result = await env.step(SOCAction( action_type="query_logs", source="endpoint", query="Invoke-Crypt" )) print(result.observation.last_action_result) print(f"Step reward: {result.reward}") # Step 2: check threat intel on the payload result = await env.step(SOCAction( action_type="query_threat_intel", indicator="Invoke-Crypt.ps1" )) print(result.observation.last_action_result) # ... continue investigation and remediation ... state = await env.state() print(f"Final episode score: {state.episode_score}") asyncio.run(run()) ``` **同步包装器** (不需要 async): ``` from client import SOCEnvClient from models import SOCAction with SOCEnvClient("http://localhost:7860", task="easy").sync() as env: obs = env.reset() result = env.step(SOCAction( action_type="query_threat_intel", indicator="login.secure-oauth.com" )) print(result.observation.last_action_result) ``` ## 环境变量 | 变量 | 必需 | 描述 | 示例 | |---|---|---|---| | `HF_TOKEN` | ✅ 是 | Hugging Face API 令牌(如果使用 `api.openai.com`,则为 OpenAI 密钥) | `hf_...` | | `API_BASE_URL` | ✅ 是 | LLM API 基础 URL (OpenAI 兼容) | `https://router.huggingface.co/v1` | | `MODEL_NAME` | ✅ 是 | 用于推理的模型标识符 | `gpt-4o-mini` | 如果缺少其中任何一个,推理脚本将以清晰的错误消息退出。 ## OpenEnv 规范合规性 | 要求 | 实现 | |---|---| | `typed Action model` | `SOCAction(Action)` —— 带有 `action_type` 判别器的 Pydantic 模型 | | `typed Observation model` | `SOCObservation(Observation)` —— 包括 `reward`、`done`、`metadata` | | `typed State model` | `SOCState(State)` —— 只读 episode 元数据 | | `reset()` endpoint | `POST /reset` —— 接受 `task` 和 `episode_id`,返回干净的 `SOCObservation` | | `step()` endpoint | `POST /step` —— 接受 `SOCAction`,返回 `{observation, reward, done, info}` | | `state()` endpoint | `GET /state` —— 返回当前的 `SOCState`,其中 `episode_score` ∈ `[0.0, 1.0]` | | `openenv.yaml` | 存在于 repo 根目录,包含 `name`、`version`、`description`、`entrypoint`、`tags`、`tasks` | | `health` endpoint | `GET /health` —— 返回 `{"status": "healthy", "environment": "OpenEnv-SOC", "version": "2.0.0"}` | | 3+ 个带评分器的任务 | `easy`、`medium`、`hard` —— 所有评分器返回 `[0.0, 1.0]` 之间的分数 | | 确定性评分 | 完全设定种子的 `MockCorporateNetwork` —— 没有随机性 | | 容器化部署 | repo 根目录中的 `Dockerfile` | ``` # Validate locally (requires openenv-core installed) openenv validate --config openenv.yaml --url http://localhost:7860 ``` *OpenEnv-SOC v2.0.0 — 为 OpenEnv 社区竞赛构建*
标签:AI安全, Apex, BurpSuite集成, Chat Copilot, Docker, HuggingFace, OpenEnv, Python, 仿真环境, 凭证填充, 勒索软件, 告警研判, 安全运营中心, 安全防御评估, 强化学习, 无后门, 机器学习, 横向移动, 沙箱, 编程规范, 网络安全, 网络安全运营, 网络映射, 网络调试, 自动化, 请求拦截, 逆向工具, 钓鱼检测, 隐私保护, 隔离环境