riparino/observe-logging-service
GitHub: riparino/observe-logging-service
通过 Azure Function 定时采集 Observe 平台审计日志并自动转发至 Microsoft Sentinel 自定义表,实现跨平台安全审计的集中化管理。
Stars: 1 | Forks: 0
# Observe 审计日志 → Microsoft Sentinel
一个 Azure Function worker,通过 Azure Monitor Logs Ingestion API 将 Observe Inc. 的审计日志摄取到 Microsoft Sentinel 自定义表(`ObserveAuditLogs_CL`)中。
## 架构
```
Observe Export Query API
→ Azure Function (timer-triggered, every 5 min)
→ Transform to Sentinel schema
→ Azure Monitor Logs Ingestion API (DCE + DCR)
→ Log Analytics custom table (ObserveAuditLogs_CL)
→ Microsoft Sentinel
```
## 前置条件
- Python 3.11+
- 启用了 Log Analytics 工作区和 Microsoft Sentinel 的 Azure 订阅
- 具有 API 访问权限的 Observe 租户(建议使用服务账户 token)
- 安装了 Azure Functions 扩展的 VS Code(用于部署)
## 引导(首次设置)
`bootstrap/` 文件夹包含一个自动化脚本,用于部署所有 Azure 基础设施。
### 1. 填写参数
```
cd bootstrap
vim params.env # Fill in ALL required values
```
| 参数 | 在哪里找到它 |
|-----------|-----------------|
| `AZURE_SUBSCRIPTION_ID` | `az account show --query id -o tsv` |
| `LOG_ANALYTICS_WORKSPACE_NAME` | Azure Portal → Log Analytics → 你的工作区 |
| `OBSERVE_CUSTOMER_ID` | Observe UI → Settings → Workspace(数字 ID) |
| `OBSERVE_API_TOKEN` | Observe UI → Settings → API Tokens → 创建服务账户 |
| `OBSERVE_AUDIT_DATASET_ID` | Observe UI → Datasets → Audit Events(URL 中的 ID) |
| `ADMIN_ALLOWED_IPS` | 你用于访问 Key Vault 的办公室/VPN 出口 IP |
### 2. 运行引导
```
./bootstrap.sh
```
这将创建:
| 资源 | 用途 |
|----------|---------|
| 资源组 | 所有资源的容器 |
| 存储账户 | 检查点、隔离 blob、Functions 运行时 |
| Key Vault | Observe API 密钥(防火墙锁定,`defaultAction: Deny`) |
| App Insights | 监控和日志记录 |
| Function App | 定时器触发的摄取 worker(Linux,Python 3.11) |
| 数据收集端点 | 日志摄取 API 端点 |
| 数据收集规则 | 将数据路由到自定义表 |
| 自定义表 | Log Analytics 中的 `ObserveAuditLogs_CL` |
| RBAC 角色 | 用于上述所有资源的托管标识权限 |
### 3. 部署函数代码
选择一种方式:
- **VS Code:** `Ctrl+Shift+P` → *Azure Functions: Deploy to Function App* → 选择你的 function app
- **Azure DevOps:** 针对你的 function app 执行 Zip 部署任务
- **CLI:** `func azure functionapp publish --python`
### Azure Policy 警告
企业订阅通常具有可能阻止操作的策略:
| 被阻止的操作 | 症状 | 解决方案 |
|---------------|---------|------------|
| Key Vault IP 规则更改 | 添加 IP 时出现 `RequestDisallowedByPolicy` | 请求豁免 `Microsoft.KeyVault/vaults/write` |
| 没有私有端点的存储 | 在存储账户处部署失败 | 添加私有端点或获取豁免 |
| 缺少必需标签 | 部署失败 | 在 Bicep 参数中添加标签或获取豁免 |
引导脚本会打印警告并在可能的情况下继续执行。你可能需要在获取豁免后手动运行特定步骤。
### 重新运行引导
该脚本是幂等的 —— 可以安全地重新运行。它将跳过现有资源,使用当前值覆盖 KV 密钥,并重新应用所有 Function App 设置。
### 卸载
```
az group delete --name --yes --no-wait
# 自定义表位于 Sentinel 资源组中 — 如需可单独删除:
az monitor log-analytics workspace table delete \
--resource-group \
--workspace-name \
--name ObserveAuditLogs_CL
```
## 安全性
- Key Vault 部署时设置为 `defaultAction: Deny` —— 默认无公共访问权限
- 存储账户部署时设置为 `allowBlobPublicAccess: false`
- Function App 使用系统分配的托管标识(代码中不存储凭证)
- Observe 密钥存储在 Key Vault 中,通过 `@Microsoft.KeyVault(...)` 应用设置引用进行访问
- 只有 Function App 的出站 IP 被加入 Key Vault 防火墙的白名单
- Key Vault 启用了清除保护
## 本地开发
```
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
pip install pytest
cp local.settings.sample.json local.settings.json
# 使用您的值填充 local.settings.json
```
### 运行测试
```
pytest tests/ -v
```
### Dry-Run(查询 Observe 但不上传)
```
python scripts/run_once.py \
--start 2026-05-07T17:00:00Z \
--end 2026-05-07T17:30:00Z \
--dry-run
```
### 上传至 Sentinel
```
python scripts/run_once.py \
--start 2026-05-07T17:00:00Z \
--end 2026-05-07T17:30:00Z \
--upload
```
### 使用 Azure Functions Core Tools 在本地运行
```
func start
```
## 部署(代码更新)
基础设施设置完成后,部署代码更改:
- **VS Code:** `Ctrl+Shift+P` → *Azure Functions: Deploy to Function App* → 选择你的 function app
- **Azure DevOps:** Zip 部署任务(`.funcignore` 控制排除的文件)
- **CLI:** `func azure functionapp publish --python`
## 配置
所有配置均通过环境变量完成。在 Azure 中,密钥使用 Key Vault 引用。
有关完整的变量列表,请参见 `local.settings.sample.json`。
### 核心设置
| 变量 | 描述 |
|----------|-------------|
| `OBSERVE_CUSTOMER_ID` | Observe 客户/租户 ID |
| `OBSERVE_API_TOKEN` | Observe API token(在 Azure 中为 Key Vault 引用) |
| `OBSERVE_AUDIT_DATASET_ID` | Observe 审计事件数据集 ID |
| `LOGS_INGESTION_ENDPOINT` | DCE 日志摄取 URL |
| `DCR_IMMUTABLE_ID` | 数据收集规则不可变 ID |
| `STORAGE_TABLE_ENDPOINT` | Azure Table Storage 端点(检查点) |
| `STORAGE_BLOB_ENDPOINT` | Azure Blob Storage 端点(隔离) |
## 基础设施 (Bicep)
`infra/` 中的模板 —— 由引导脚本自动部署。
| 文件 | 部署内容 |
|------|-----------------|
| `main.bicep` | 存储账户、Key Vault、App Insights、Function App |
| `dcr.bicep` | 数据收集端点 + 数据收集规则 |
| `role-assignments.bicep` | RBAC:KV Secrets User、Monitoring Metrics Publisher、Storage Contributor |
| `table.kql` | 自定义表的参考 schema |
所有模板均已参数化 —— 资源名称是通过传入参数设定的,而非硬编码。
## Sentinel 表结构
`ObserveAuditLogs_CL` 表:
| 列 | 类型 | 描述 |
|--------|------|-------------|
| TimeGenerated | datetime | 事件时间戳(来自 Observe) |
| ObserveTime | datetime | Observe 摄取时间 |
| Operation | string | 操作(登录、读取、创建、更新、删除、llm_invocation) |
| ObjectKind | string | 目标类型(dataset、monitor、dashboard 等) |
| ObjectId | string | 目标资源 ID |
| ObjectName | string | 目标资源显示名称 |
| UserId | string | Observe 用户 ID |
| UserEmail | string | 用户邮箱地址 |
| UserDisplayName | string | 用户显示名称(如可用) |
| LoginType | string | 身份验证方法(saml、password、api_key) |
| LoginStatus | string | 登录结果(成功、失败) |
| Success | boolean | 操作是否成功 |
| SourceIp | string | 客户端 IP 地址 |
| UserAgent | string | HTTP User-Agent |
| Referer | string | HTTP Referer |
| EventUid | string | 确定性事件 UID(用于去重) |
| ObserveCustomerId | string | Observe 租户 ID |
| ObserveTenant | string | Observe 租户显示名称 |
| IngestedAt | datetime | 该记录被摄取到 Sentinel 的时间 |
| RawFields | dynamic | 来自 Observe 的完整原始 FIELDS 对象 |
| RawExtra | dynamic | 来自 Observe 的完整原始 EXTRA 对象 |
| RawEvent | dynamic | 用于取证分析的完整原始行 |
## 项目结构
```
observe-logger-service/
├── function_app.py # Azure Function entry point (timer trigger)
├── host.json # Function host configuration
├── requirements.txt # Python dependencies
├── .funcignore # Files excluded from deployment
├── local.settings.sample.json # Template for local dev settings
│
├── observe_audit_ingestor/ # Core application code
│ ├── config.py # Environment variable loader
│ ├── observe_client.py # Observe Export Query API client
│ ├── transform.py # Row transformation (Observe → Sentinel schema)
│ ├── sentinel_client.py # Azure Monitor Logs Ingestion API client
│ ├── checkpoint.py # Azure Table Storage checkpoint
│ ├── quarantine.py # Blob storage for malformed rows
│ ├── models.py # Data models
│ └── log.py # Logging utilities
│
├── bootstrap/ # First-time infrastructure setup
│ ├── params.env # Parameters to fill in
│ └── bootstrap.sh # Automated setup script
│
├── infra/ # Bicep IaC templates
│ ├── main.bicep # Storage, Key Vault, Function App
│ ├── dcr.bicep # Data Collection Endpoint + Rule
│ ├── role-assignments.bicep # RBAC for managed identity
│ └── table.kql # Custom table schema reference
│
├── scripts/ # Development utilities
│ ├── run_once.py # One-shot query + optional upload
│ └── discover_observe_dataset.py # Dataset discovery helper
│
└── tests/ # Unit tests
├── test_transform.py
├── test_observe_client.py
└── test_sentinel_client.py
```
标签:API集成, Azure Function, Azure Monitor, DCE, DCR, Log Analytics, Microsoft Sentinel, Observe Inc, PB级数据处理, Python, 云端安全, 企业安全, 可观测性, 命令控制, 安全运维, 定时触发器, 审计日志, 数据接入, 数据采集, 无后门, 日志收集, 日志转换, 构建工具, 网络资产管理