brad-eck/ocsf-pipeline

GitHub: brad-eck/ocsf-pipeline

一个Python安全日志管道,将原始日志标准化为OCSF格式并运行YAML检测规则,实现Detection-as-code。

Stars: 0 | Forks: 0

# ocsf-pipeline 一个 Python pipeline,用于摄取安全日志事件,将其标准化为 [Open Cybersecurity Schema Framework (OCSF)](https://schema.ocsf.io) v1.1.0, 对其运行检测规则,并将标记的事件路由到输出 目标。 作为一个作品集项目构建,展示了在安全数据工程、 OCSF 标准化和 detection-as-code 方面的技能 —— 这些是可组合 SIEM 平台的核心领域。 **支持的来源:** AWS CloudTrail (Phase 1)。可以通过在 `normalizer/sources/` 下实现标准化器并在 `normalizer/mapper.py` 中注册来添加其他来源。 ## 目录 - [概述](#overview) - [架构](#architecture) - [项目结构](#project-structure) - [OCSF 映射](#ocsf-mapping) - [快速开始](#quick-start) - [用法](#usage) - [示例日志事件](#sample-log-events) - [检测规则](#detection-rules) - [开发阶段](#development-phases) ## 概述 此 pipeline 将原始安全日志事件转换为 OCSF 标准化事件, 使其模式一致,并可与任何兼容 OCSF 的 SIEM 或 分析平台互操作。 **为什么选择 OCSF?** OCSF 是一个开放标准(由 AWS, Splunk, CrowdStrike 等支持), 定义了安全遥测的通用模式。标准化为 OCSF 意味着 检测规则、查询和仪表板只需编写一次,即可应用于 来自任何来源的数据。 **当前状态:Phase 2 已完成。** 针对 AWS CloudTrail 的摄取、OCSF 标准化和基于 YAML 的检测已完全实现。路由 (Phase 3) 已存根并准备构建。 ## 架构 ``` Log Events (local JSON) │ ▼ ┌───────────────┐ │ Ingestion │ ingestion/reader.py │ reader │ Reads .json files or directories, yields raw event dicts └───────┬───────┘ │ ▼ ┌───────────────┐ │ Normalizer │ normalizer/mapper.py (dispatcher) │ mapper │ Routes events to the source-specific OCSF normalizer, │ │ builds fully-populated OCSF event dicts └───────┬───────┘ │ normalizer/sources/cloudtrail.py (CloudTrail → OCSF) │ normalizer/sources/.py (future sources) ▼ ┌───────────────┐ │ Detection │ detection/engine.py │ Engine │ Evaluates OCSF events against YAML-defined rules, │ │ annotates matches with rule metadata and MITRE tags └───────┬───────┘ │ ▼ ┌───────────────┐ │ Router │ router/output.py [Phase 3 — stub] │ output │ Sends events to stdout, NDJSON file, or webhook └───────────────┘ ``` ## 项目结构 ``` ocsf-pipeline/ ├── ingestion/ │ ├── __init__.py │ └── reader.py # File and directory ingestion ├── normalizer/ │ ├── __init__.py │ ├── mapper.py # Source-type dispatcher │ └── sources/ │ ├── __init__.py │ └── cloudtrail.py # CloudTrail → OCSF field mapping ├── detection/ │ ├── __init__.py │ ├── engine.py # Rule evaluation engine │ └── rules/ │ └── sample_rules.yaml # YAML detection rules ├── router/ │ ├── __init__.py │ └── output.py # Output destinations (Phase 3) ├── sample_logs/ │ └── sample_cloudtrail.json # 5 synthetic CloudTrail events ├── main.py # Pipeline entry point ├── requirements.txt └── README.md ``` ## OCSF 映射 ### CloudTrail 源 事件根据 CloudTrail `eventName` 和 `eventSource` 路由到三个 OCSF 类之一: | CloudTrail event type | OCSF class | class_uid | category_uid | |---|---|---|---| | `ConsoleLogin` | Authentication | 3002 | 3 — Identity & Access Management | | IAM API calls (`iam.amazonaws.com`) | IAM Change | 3006 | 3 — Identity & Access Management | | All other API calls | API Activity | 6003 | 6 — Application Activity | ### 字段映射参考 #### 通用字段(所有类) | OCSF field | CloudTrail source | |---|---| | `time` | `eventTime` (converted to epoch milliseconds) | | `metadata.uid` | `eventID` | | `metadata.original_time` | `eventTime` | | `metadata.product.name` | `"AWS CloudTrail"` (static) | | `metadata.product.feature.name` | `eventSource` | | `actor.user.name` | `userIdentity.userName` → `arn` → `principalId` | | `actor.user.uid` | `userIdentity.principalId` | | `actor.user.type_id` | `userIdentity.type` (see table below) | | `actor.user.account.uid` | `userIdentity.accountId` | | `src_endpoint.ip` | `sourceIPAddress` | | `cloud.provider` | `"AWS"` (static) | | `cloud.region` | `awsRegion` | | `cloud.account.uid` | `userIdentity.accountId` | | `status_id` / `status` | `errorCode` present → Failure(2), else Success(1) | | `raw_data` | Full original event as a JSON string | #### `actor.user.type_id` 映射 | CloudTrail `userIdentity.type` | OCSF `type_id` | OCSF `type` | |---|---|---| | `Root` | 2 | Admin | | `IAMUser` | 1 | User | | `AssumedRole` | 99 | Other | | `FederatedUser` | 99 | Other | | `AWSService` | 3 | System | | `AWSAccount` | 3 | System | #### `activity_id` 映射 (API Activity & IAM Change) 衍生自 `eventName` 的前缀: | `eventName` prefix | `activity_id` | `activity_name` | |---|---|---| | `Create` | 1 | Create | | `Get`, `Describe`, `List` | 2 | Read | | `Put`, `Update`, `Modify`, `Enable`, `Disable`, `Stop`, `Start` | 3 | Update | | `Delete`, `Remove`, `Detach`, `Deregister` | 4 | Delete | | _(no match)_ | 99 | Other | #### 严重性分配 | Condition | `severity_id` | `severity` | |---|---|---| | `errorCode` is `AccessDenied` or `UnauthorizedOperation` | 3 | Medium | | `eventName` in high-impact set¹ | 3 | Medium | | All other events | 1 | Informational | ¹ 高影响事件:`StopLogging`, `DeleteTrail`, `DeleteUser`, `DeleteRole`, `DeleteAccessKey`, `AttachUserPolicy`, `AttachRolePolicy` #### Authentication 特定字段 (class_uid 3002) | OCSF field | CloudTrail source | |---|---| | `is_mfa` | `additionalEventData.MFAUsed == "Yes"` | | `status` | `responseElements.ConsoleLogin` | | `http_request.user_agent` | `userAgent` | | `http_request.url` | `additionalEventData.LoginTo` | | `user` (top-level) | `userIdentity` (duplicated per OCSF Authentication spec) | #### API 字段 (class_uid 3006 and 6003) | OCSF field | CloudTrail source | |---|---| | `api.service.name` | `eventSource` | | `api.operation` | `eventName` | | `api.version` | `eventVersion` | | `api.request.uid` | `requestID` | | `api.request.data` | `requestParameters` | | `api.response.code` | `200` (success) or `400` (error) | | `api.response.error` | `errorCode` | | `api.response.error_message` | `errorMessage` | | `api.response.data` | `responseElements` | ## 快速开始 **依赖:** Python 3.10+ 和 `pyyaml` (`pip install pyyaml`)。 **克隆并运行:** ``` git clone cd ocsf-pipeline pip install -r requirements.txt python3 main.py ``` 这将读取 `sample_logs/sample_cloudtrail.json`,将所有 5 个事件标准化为 OCSF,评估检测规则,并将每个事件作为 JSON 打印到 stdout。 触发规则的事件包含一个带有规则元数据的 `detections` 字段。 ## 用法 ### 针对包含的示例日志运行 ``` python3 main.py ``` ### 针对特定文件运行 ``` python3 main.py path/to/events.json ``` ### 针对日志文件目录运行 ``` python3 main.py path/to/logs/ ``` 目录中的所有 `.json` 文件按文件名排序顺序处理。 ### 指定日志源类型 ``` python3 main.py path/to/events.json --source-type cloudtrail ``` `--source-type` 默认为 `cloudtrail`。随着新来源的添加,其 注册名称将在此处成为有效值。 ### 使用自定义规则文件 ``` python3 main.py --rules path/to/rules.yaml ``` `--rules` 默认为 `detection/rules/sample_rules.yaml`。 ### 使用 `jq` 过滤输出 仅美化打印 Authentication 事件: ``` python3 main.py | jq 'select(.class_uid == 3002)' ``` 仅显示未使用 MFA 的事件: ``` python3 main.py | jq 'select(.is_mfa == false)' ``` 提取所有 IAM 更改的操作者和操作: ``` python3 main.py | jq 'select(.class_uid == 3006) | {actor: .actor.user.name, op: .api.operation}' ``` 显示所有 Medium 严重性事件: ``` python3 main.py | jq 'select(.severity == "Medium")' ``` 仅显示触发了检测规则的事件: ``` python3 main.py | jq 'select(.detections | length > 0)' ``` 显示每个告警的规则 ID、严重性和操作者: ``` python3 main.py | jq 'select(.detections | length > 0) | {actor: .actor.user.name, detections: .detections}' ``` 将标准化输出保存到文件: ``` python3 main.py > normalized_events.ndjson ``` ### 将标准化器用作库 ``` from ingestion.reader import read_events from normalizer.mapper import normalize for raw_event in read_events("sample_logs/sample_cloudtrail.json", source_type="cloudtrail"): ocsf_event = normalize(raw_event, source_type="cloudtrail") print(ocsf_event["class_name"], ocsf_event["actor"]["user"]["name"]) ``` ## 示例日志事件 `sample_logs/sample_cloudtrail.json` 包含五个合成 CloudTrail 事件, 涵盖常见的高信号场景: | # | `eventName` | Identity | Notable detail | → OCSF class | |---|---|---|---|---| | 1 | `ConsoleLogin` | Root | MFA used | Authentication (3002) | | 2 | `AttachUserPolicy` | IAMUser `devops-admin` | Attaches `AdministratorAccess` to `junior-dev` | IAM Change (3006) | | 3 | `StopLogging` | IAMUser `ops-engineer` | Disables the main trail | API Activity (6003) | | 4 | `ConsoleLogin` | IAMUser `contractor-jsmith` | **No MFA** | Authentication (3002) | | 5 | `CreateAccessKey` | IAMUser `ops-engineer` | Creates key for `ci-deploy-user` | IAM Change (3006) | 所有事件使用 RFC 5737 / RFC 3849 保留 IP 范围 (`203.0.113.0/24`, `198.51.100.0/24`) 和占位符 AWS 账户/密钥 ID —— 对于公开 仓库是安全的。 ## 检测规则 检测规则在 YAML 中定义并存储在 `detection/rules/` 中。 引擎评估标准化的 OCSF 字段,因此规则是与源无关的 —— 无论事件来自 CloudTrail、CSPM 工具还是任何其他 OCSF 生产者,同一规则都有效。 **规则模式:** ``` - id: CT-001 name: Root Account Console Login description: Detects any AWS Console login using the root account. severity: High ocsf_class: [3002] # Only evaluated against Authentication events conditions: - field: actor.user.type_id op: eq value: 2 # Admin (Root) - field: activity_id op: eq value: 1 # Logon tags: [T1078.004, initial-access] ``` 规则使用点表示法字段路径指向标准化的 OCSF 事件。规则内的 条件为 AND 逻辑;多个规则独立评估 (OR)。 省略 `ocsf_class` 会将规则应用于所有事件类。 **支持的操作符:** `eq`, `neq`, `contains`, `startswith`, `in`, `exists` **匹配输出:** 触发的规则作为 `detections` 列表附加到事件,每个条目包含 `rule_id`, `rule_name`, `severity`, 和 `tags`。 `detection/rules/sample_rules.yaml` 包含三个活动规则: - **CT-001** (High) — Root account console login - **CT-002** (Medium) — Console login without MFA - **CT-003** (High) — CloudTrail logging disabled ## 添加新来源 1. 创建 `normalizer/sources/.py`,包含一个 `normalize(event: dict) -> dict` 函数。 2. 在 `normalizer/mapper.py` 中注册它: from normalizer.sources. import normalize as _normalize_ _SOURCE_NORMALIZERS = { "cloudtrail": _normalize_cloudtrail, "": _normalize_, } 3. 如果来源使用非标准 JSON 容器格式,在 `ingestion/reader.py` 中添加一个 `_unwrap` 分支。 4. 使用 `--source-type ` 运行。 ## 开发阶段 ### 阶段 1 — Ingestion & Normalization ✅ - Source-agnostic ingestion with pluggable source-type unwrapping - OCSF normalization for Authentication (3002), IAM Change (3006), API Activity (6003) via CloudTrail source - Synthetic sample logs covering 5 high-signal event types - Severity and status mapping ### 阶段 2 — Detection Engine ✅ - YAML rule loader (`pyyaml`) - OCSF field path evaluation with dot-notation - Supported operators: `eq`, `neq`, `contains`, `startswith`, `in`, `exists` - Rule match annotations attached to output events as `detections` - MITRE ATT&CK tagging ### 阶段 3 — Routing & Infrastructure(计划中) - Output destinations: stdout (pretty/NDJSON), local file append, HTTP webhook - Additional source ingestion support (`boto3` for S3, etc.) - Docker containerization - Optional: scheduled polling, SQS trigger
标签:AMSI绕过, AWS CloudTrail, OCSF, Python, YAML规则, 威胁检测, 安全遥测, 数据工程, 数据管道, 无后门, 日志处理, 日志归一化, 检测即代码, 组合式SIEM, 网络安全, 软件工程, 逆向工具, 隐私保护