brad-eck/ocsf-pipeline
GitHub: brad-eck/ocsf-pipeline
一个Python安全日志管道,将原始日志标准化为OCSF格式并运行YAML检测规则,实现Detection-as-code。
Stars: 0 | Forks: 0
# ocsf-pipeline
一个 Python pipeline,用于摄取安全日志事件,将其标准化为
[Open Cybersecurity Schema Framework (OCSF)](https://schema.ocsf.io) v1.1.0,
对其运行检测规则,并将标记的事件路由到输出
目标。
作为一个作品集项目构建,展示了在安全数据工程、
OCSF 标准化和 detection-as-code 方面的技能 —— 这些是可组合 SIEM
平台的核心领域。
**支持的来源:** AWS CloudTrail (Phase 1)。可以通过在 `normalizer/sources/` 下实现标准化器并在 `normalizer/mapper.py` 中注册来添加其他来源。
## 目录
- [概述](#overview)
- [架构](#architecture)
- [项目结构](#project-structure)
- [OCSF 映射](#ocsf-mapping)
- [快速开始](#quick-start)
- [用法](#usage)
- [示例日志事件](#sample-log-events)
- [检测规则](#detection-rules)
- [开发阶段](#development-phases)
## 概述
此 pipeline 将原始安全日志事件转换为 OCSF 标准化事件,
使其模式一致,并可与任何兼容 OCSF 的 SIEM 或
分析平台互操作。
**为什么选择 OCSF?**
OCSF 是一个开放标准(由 AWS, Splunk, CrowdStrike 等支持),
定义了安全遥测的通用模式。标准化为 OCSF 意味着
检测规则、查询和仪表板只需编写一次,即可应用于
来自任何来源的数据。
**当前状态:Phase 2 已完成。**
针对 AWS CloudTrail 的摄取、OCSF 标准化和基于 YAML 的检测已完全实现。路由 (Phase 3) 已存根并准备构建。
## 架构
```
Log Events (local JSON)
│
▼
┌───────────────┐
│ Ingestion │ ingestion/reader.py
│ reader │ Reads .json files or directories, yields raw event dicts
└───────┬───────┘
│
▼
┌───────────────┐
│ Normalizer │ normalizer/mapper.py (dispatcher)
│ mapper │ Routes events to the source-specific OCSF normalizer,
│ │ builds fully-populated OCSF event dicts
└───────┬───────┘
│ normalizer/sources/cloudtrail.py (CloudTrail → OCSF)
│ normalizer/sources/.py (future sources)
▼
┌───────────────┐
│ Detection │ detection/engine.py
│ Engine │ Evaluates OCSF events against YAML-defined rules,
│ │ annotates matches with rule metadata and MITRE tags
└───────┬───────┘
│
▼
┌───────────────┐
│ Router │ router/output.py [Phase 3 — stub]
│ output │ Sends events to stdout, NDJSON file, or webhook
└───────────────┘
```
## 项目结构
```
ocsf-pipeline/
├── ingestion/
│ ├── __init__.py
│ └── reader.py # File and directory ingestion
├── normalizer/
│ ├── __init__.py
│ ├── mapper.py # Source-type dispatcher
│ └── sources/
│ ├── __init__.py
│ └── cloudtrail.py # CloudTrail → OCSF field mapping
├── detection/
│ ├── __init__.py
│ ├── engine.py # Rule evaluation engine
│ └── rules/
│ └── sample_rules.yaml # YAML detection rules
├── router/
│ ├── __init__.py
│ └── output.py # Output destinations (Phase 3)
├── sample_logs/
│ └── sample_cloudtrail.json # 5 synthetic CloudTrail events
├── main.py # Pipeline entry point
├── requirements.txt
└── README.md
```
## OCSF 映射
### CloudTrail 源
事件根据 CloudTrail `eventName` 和 `eventSource` 路由到三个 OCSF 类之一:
| CloudTrail event type | OCSF class | class_uid | category_uid |
|---|---|---|---|
| `ConsoleLogin` | Authentication | 3002 | 3 — Identity & Access Management |
| IAM API calls (`iam.amazonaws.com`) | IAM Change | 3006 | 3 — Identity & Access Management |
| All other API calls | API Activity | 6003 | 6 — Application Activity |
### 字段映射参考
#### 通用字段(所有类)
| OCSF field | CloudTrail source |
|---|---|
| `time` | `eventTime` (converted to epoch milliseconds) |
| `metadata.uid` | `eventID` |
| `metadata.original_time` | `eventTime` |
| `metadata.product.name` | `"AWS CloudTrail"` (static) |
| `metadata.product.feature.name` | `eventSource` |
| `actor.user.name` | `userIdentity.userName` → `arn` → `principalId` |
| `actor.user.uid` | `userIdentity.principalId` |
| `actor.user.type_id` | `userIdentity.type` (see table below) |
| `actor.user.account.uid` | `userIdentity.accountId` |
| `src_endpoint.ip` | `sourceIPAddress` |
| `cloud.provider` | `"AWS"` (static) |
| `cloud.region` | `awsRegion` |
| `cloud.account.uid` | `userIdentity.accountId` |
| `status_id` / `status` | `errorCode` present → Failure(2), else Success(1) |
| `raw_data` | Full original event as a JSON string |
#### `actor.user.type_id` 映射
| CloudTrail `userIdentity.type` | OCSF `type_id` | OCSF `type` |
|---|---|---|
| `Root` | 2 | Admin |
| `IAMUser` | 1 | User |
| `AssumedRole` | 99 | Other |
| `FederatedUser` | 99 | Other |
| `AWSService` | 3 | System |
| `AWSAccount` | 3 | System |
#### `activity_id` 映射 (API Activity & IAM Change)
衍生自 `eventName` 的前缀:
| `eventName` prefix | `activity_id` | `activity_name` |
|---|---|---|
| `Create` | 1 | Create |
| `Get`, `Describe`, `List` | 2 | Read |
| `Put`, `Update`, `Modify`, `Enable`, `Disable`, `Stop`, `Start` | 3 | Update |
| `Delete`, `Remove`, `Detach`, `Deregister` | 4 | Delete |
| _(no match)_ | 99 | Other |
#### 严重性分配
| Condition | `severity_id` | `severity` |
|---|---|---|
| `errorCode` is `AccessDenied` or `UnauthorizedOperation` | 3 | Medium |
| `eventName` in high-impact set¹ | 3 | Medium |
| All other events | 1 | Informational |
¹ 高影响事件:`StopLogging`, `DeleteTrail`, `DeleteUser`, `DeleteRole`,
`DeleteAccessKey`, `AttachUserPolicy`, `AttachRolePolicy`
#### Authentication 特定字段 (class_uid 3002)
| OCSF field | CloudTrail source |
|---|---|
| `is_mfa` | `additionalEventData.MFAUsed == "Yes"` |
| `status` | `responseElements.ConsoleLogin` |
| `http_request.user_agent` | `userAgent` |
| `http_request.url` | `additionalEventData.LoginTo` |
| `user` (top-level) | `userIdentity` (duplicated per OCSF Authentication spec) |
#### API 字段 (class_uid 3006 and 6003)
| OCSF field | CloudTrail source |
|---|---|
| `api.service.name` | `eventSource` |
| `api.operation` | `eventName` |
| `api.version` | `eventVersion` |
| `api.request.uid` | `requestID` |
| `api.request.data` | `requestParameters` |
| `api.response.code` | `200` (success) or `400` (error) |
| `api.response.error` | `errorCode` |
| `api.response.error_message` | `errorMessage` |
| `api.response.data` | `responseElements` |
## 快速开始
**依赖:** Python 3.10+ 和 `pyyaml` (`pip install pyyaml`)。
**克隆并运行:**
```
git clone
cd ocsf-pipeline
pip install -r requirements.txt
python3 main.py
```
这将读取 `sample_logs/sample_cloudtrail.json`,将所有 5 个事件标准化为
OCSF,评估检测规则,并将每个事件作为 JSON 打印到 stdout。
触发规则的事件包含一个带有规则元数据的 `detections` 字段。
## 用法
### 针对包含的示例日志运行
```
python3 main.py
```
### 针对特定文件运行
```
python3 main.py path/to/events.json
```
### 针对日志文件目录运行
```
python3 main.py path/to/logs/
```
目录中的所有 `.json` 文件按文件名排序顺序处理。
### 指定日志源类型
```
python3 main.py path/to/events.json --source-type cloudtrail
```
`--source-type` 默认为 `cloudtrail`。随着新来源的添加,其
注册名称将在此处成为有效值。
### 使用自定义规则文件
```
python3 main.py --rules path/to/rules.yaml
```
`--rules` 默认为 `detection/rules/sample_rules.yaml`。
### 使用 `jq` 过滤输出
仅美化打印 Authentication 事件:
```
python3 main.py | jq 'select(.class_uid == 3002)'
```
仅显示未使用 MFA 的事件:
```
python3 main.py | jq 'select(.is_mfa == false)'
```
提取所有 IAM 更改的操作者和操作:
```
python3 main.py | jq 'select(.class_uid == 3006) | {actor: .actor.user.name, op: .api.operation}'
```
显示所有 Medium 严重性事件:
```
python3 main.py | jq 'select(.severity == "Medium")'
```
仅显示触发了检测规则的事件:
```
python3 main.py | jq 'select(.detections | length > 0)'
```
显示每个告警的规则 ID、严重性和操作者:
```
python3 main.py | jq 'select(.detections | length > 0) | {actor: .actor.user.name, detections: .detections}'
```
将标准化输出保存到文件:
```
python3 main.py > normalized_events.ndjson
```
### 将标准化器用作库
```
from ingestion.reader import read_events
from normalizer.mapper import normalize
for raw_event in read_events("sample_logs/sample_cloudtrail.json", source_type="cloudtrail"):
ocsf_event = normalize(raw_event, source_type="cloudtrail")
print(ocsf_event["class_name"], ocsf_event["actor"]["user"]["name"])
```
## 示例日志事件
`sample_logs/sample_cloudtrail.json` 包含五个合成 CloudTrail 事件,
涵盖常见的高信号场景:
| # | `eventName` | Identity | Notable detail | → OCSF class |
|---|---|---|---|---|
| 1 | `ConsoleLogin` | Root | MFA used | Authentication (3002) |
| 2 | `AttachUserPolicy` | IAMUser `devops-admin` | Attaches `AdministratorAccess` to `junior-dev` | IAM Change (3006) |
| 3 | `StopLogging` | IAMUser `ops-engineer` | Disables the main trail | API Activity (6003) |
| 4 | `ConsoleLogin` | IAMUser `contractor-jsmith` | **No MFA** | Authentication (3002) |
| 5 | `CreateAccessKey` | IAMUser `ops-engineer` | Creates key for `ci-deploy-user` | IAM Change (3006) |
所有事件使用 RFC 5737 / RFC 3849 保留 IP 范围 (`203.0.113.0/24`,
`198.51.100.0/24`) 和占位符 AWS 账户/密钥 ID —— 对于公开
仓库是安全的。
## 检测规则
检测规则在 YAML 中定义并存储在 `detection/rules/` 中。
引擎评估标准化的 OCSF 字段,因此规则是与源无关的 ——
无论事件来自 CloudTrail、CSPM
工具还是任何其他 OCSF 生产者,同一规则都有效。
**规则模式:**
```
- id: CT-001
name: Root Account Console Login
description: Detects any AWS Console login using the root account.
severity: High
ocsf_class: [3002] # Only evaluated against Authentication events
conditions:
- field: actor.user.type_id
op: eq
value: 2 # Admin (Root)
- field: activity_id
op: eq
value: 1 # Logon
tags: [T1078.004, initial-access]
```
规则使用点表示法字段路径指向标准化的 OCSF 事件。规则内的
条件为 AND 逻辑;多个规则独立评估 (OR)。
省略 `ocsf_class` 会将规则应用于所有事件类。
**支持的操作符:** `eq`, `neq`, `contains`, `startswith`, `in`, `exists`
**匹配输出:** 触发的规则作为 `detections`
列表附加到事件,每个条目包含 `rule_id`, `rule_name`, `severity`, 和 `tags`。
`detection/rules/sample_rules.yaml` 包含三个活动规则:
- **CT-001** (High) — Root account console login
- **CT-002** (Medium) — Console login without MFA
- **CT-003** (High) — CloudTrail logging disabled
## 添加新来源
1. 创建 `normalizer/sources/.py`,包含一个 `normalize(event: dict) -> dict` 函数。
2. 在 `normalizer/mapper.py` 中注册它:
from normalizer.sources. import normalize as _normalize_
_SOURCE_NORMALIZERS = {
"cloudtrail": _normalize_cloudtrail,
"": _normalize_,
}
3. 如果来源使用非标准 JSON 容器格式,在 `ingestion/reader.py` 中添加一个 `_unwrap` 分支。
4. 使用 `--source-type ` 运行。
## 开发阶段
### 阶段 1 — Ingestion & Normalization ✅
- Source-agnostic ingestion with pluggable source-type unwrapping
- OCSF normalization for Authentication (3002), IAM Change (3006), API Activity (6003) via CloudTrail source
- Synthetic sample logs covering 5 high-signal event types
- Severity and status mapping
### 阶段 2 — Detection Engine ✅
- YAML rule loader (`pyyaml`)
- OCSF field path evaluation with dot-notation
- Supported operators: `eq`, `neq`, `contains`, `startswith`, `in`, `exists`
- Rule match annotations attached to output events as `detections`
- MITRE ATT&CK tagging
### 阶段 3 — Routing & Infrastructure(计划中)
- Output destinations: stdout (pretty/NDJSON), local file append, HTTP webhook
- Additional source ingestion support (`boto3` for S3, etc.)
- Docker containerization
- Optional: scheduled polling, SQS trigger
标签:AMSI绕过, AWS CloudTrail, OCSF, Python, YAML规则, 威胁检测, 安全遥测, 数据工程, 数据管道, 无后门, 日志处理, 日志归一化, 检测即代码, 组合式SIEM, 网络安全, 软件工程, 逆向工具, 隐私保护