riparino/observe-logging-service

GitHub: riparino/observe-logging-service

通过 Azure Function 定时采集 Observe 平台审计日志并自动转发至 Microsoft Sentinel 自定义表,实现跨平台安全审计的集中化管理。

Stars: 1 | Forks: 0

# Observe 审计日志 → Microsoft Sentinel 一个 Azure Function worker,通过 Azure Monitor Logs Ingestion API 将 Observe Inc. 的审计日志摄取到 Microsoft Sentinel 自定义表(`ObserveAuditLogs_CL`)中。 ## 架构 ``` Observe Export Query API → Azure Function (timer-triggered, every 5 min) → Transform to Sentinel schema → Azure Monitor Logs Ingestion API (DCE + DCR) → Log Analytics custom table (ObserveAuditLogs_CL) → Microsoft Sentinel ``` ## 前置条件 - Python 3.11+ - 启用了 Log Analytics 工作区和 Microsoft Sentinel 的 Azure 订阅 - 具有 API 访问权限的 Observe 租户(建议使用服务账户 token) - 安装了 Azure Functions 扩展的 VS Code(用于部署) ## 引导(首次设置) `bootstrap/` 文件夹包含一个自动化脚本,用于部署所有 Azure 基础设施。 ### 1. 填写参数 ``` cd bootstrap vim params.env # Fill in ALL required values ``` | 参数 | 在哪里找到它 | |-----------|-----------------| | `AZURE_SUBSCRIPTION_ID` | `az account show --query id -o tsv` | | `LOG_ANALYTICS_WORKSPACE_NAME` | Azure Portal → Log Analytics → 你的工作区 | | `OBSERVE_CUSTOMER_ID` | Observe UI → Settings → Workspace(数字 ID) | | `OBSERVE_API_TOKEN` | Observe UI → Settings → API Tokens → 创建服务账户 | | `OBSERVE_AUDIT_DATASET_ID` | Observe UI → Datasets → Audit Events(URL 中的 ID) | | `ADMIN_ALLOWED_IPS` | 你用于访问 Key Vault 的办公室/VPN 出口 IP | ### 2. 运行引导 ``` ./bootstrap.sh ``` 这将创建: | 资源 | 用途 | |----------|---------| | 资源组 | 所有资源的容器 | | 存储账户 | 检查点、隔离 blob、Functions 运行时 | | Key Vault | Observe API 密钥(防火墙锁定,`defaultAction: Deny`) | | App Insights | 监控和日志记录 | | Function App | 定时器触发的摄取 worker(Linux,Python 3.11) | | 数据收集端点 | 日志摄取 API 端点 | | 数据收集规则 | 将数据路由到自定义表 | | 自定义表 | Log Analytics 中的 `ObserveAuditLogs_CL` | | RBAC 角色 | 用于上述所有资源的托管标识权限 | ### 3. 部署函数代码 选择一种方式: - **VS Code:** `Ctrl+Shift+P` → *Azure Functions: Deploy to Function App* → 选择你的 function app - **Azure DevOps:** 针对你的 function app 执行 Zip 部署任务 - **CLI:** `func azure functionapp publish --python` ### Azure Policy 警告 企业订阅通常具有可能阻止操作的策略: | 被阻止的操作 | 症状 | 解决方案 | |---------------|---------|------------| | Key Vault IP 规则更改 | 添加 IP 时出现 `RequestDisallowedByPolicy` | 请求豁免 `Microsoft.KeyVault/vaults/write` | | 没有私有端点的存储 | 在存储账户处部署失败 | 添加私有端点或获取豁免 | | 缺少必需标签 | 部署失败 | 在 Bicep 参数中添加标签或获取豁免 | 引导脚本会打印警告并在可能的情况下继续执行。你可能需要在获取豁免后手动运行特定步骤。 ### 重新运行引导 该脚本是幂等的 —— 可以安全地重新运行。它将跳过现有资源,使用当前值覆盖 KV 密钥,并重新应用所有 Function App 设置。 ### 卸载 ``` az group delete --name --yes --no-wait # 自定义表位于 Sentinel 资源组中 — 如需可单独删除: az monitor log-analytics workspace table delete \ --resource-group \ --workspace-name \ --name ObserveAuditLogs_CL ``` ## 安全性 - Key Vault 部署时设置为 `defaultAction: Deny` —— 默认无公共访问权限 - 存储账户部署时设置为 `allowBlobPublicAccess: false` - Function App 使用系统分配的托管标识(代码中不存储凭证) - Observe 密钥存储在 Key Vault 中,通过 `@Microsoft.KeyVault(...)` 应用设置引用进行访问 - 只有 Function App 的出站 IP 被加入 Key Vault 防火墙的白名单 - Key Vault 启用了清除保护 ## 本地开发 ``` python -m venv .venv source .venv/bin/activate pip install -r requirements.txt pip install pytest cp local.settings.sample.json local.settings.json # 使用您的值填充 local.settings.json ``` ### 运行测试 ``` pytest tests/ -v ``` ### Dry-Run(查询 Observe 但不上传) ``` python scripts/run_once.py \ --start 2026-05-07T17:00:00Z \ --end 2026-05-07T17:30:00Z \ --dry-run ``` ### 上传至 Sentinel ``` python scripts/run_once.py \ --start 2026-05-07T17:00:00Z \ --end 2026-05-07T17:30:00Z \ --upload ``` ### 使用 Azure Functions Core Tools 在本地运行 ``` func start ``` ## 部署(代码更新) 基础设施设置完成后,部署代码更改: - **VS Code:** `Ctrl+Shift+P` → *Azure Functions: Deploy to Function App* → 选择你的 function app - **Azure DevOps:** Zip 部署任务(`.funcignore` 控制排除的文件) - **CLI:** `func azure functionapp publish --python` ## 配置 所有配置均通过环境变量完成。在 Azure 中,密钥使用 Key Vault 引用。 有关完整的变量列表,请参见 `local.settings.sample.json`。 ### 核心设置 | 变量 | 描述 | |----------|-------------| | `OBSERVE_CUSTOMER_ID` | Observe 客户/租户 ID | | `OBSERVE_API_TOKEN` | Observe API token(在 Azure 中为 Key Vault 引用) | | `OBSERVE_AUDIT_DATASET_ID` | Observe 审计事件数据集 ID | | `LOGS_INGESTION_ENDPOINT` | DCE 日志摄取 URL | | `DCR_IMMUTABLE_ID` | 数据收集规则不可变 ID | | `STORAGE_TABLE_ENDPOINT` | Azure Table Storage 端点(检查点) | | `STORAGE_BLOB_ENDPOINT` | Azure Blob Storage 端点(隔离) | ## 基础设施 (Bicep) `infra/` 中的模板 —— 由引导脚本自动部署。 | 文件 | 部署内容 | |------|-----------------| | `main.bicep` | 存储账户、Key Vault、App Insights、Function App | | `dcr.bicep` | 数据收集端点 + 数据收集规则 | | `role-assignments.bicep` | RBAC:KV Secrets User、Monitoring Metrics Publisher、Storage Contributor | | `table.kql` | 自定义表的参考 schema | 所有模板均已参数化 —— 资源名称是通过传入参数设定的,而非硬编码。 ## Sentinel 表结构 `ObserveAuditLogs_CL` 表: | 列 | 类型 | 描述 | |--------|------|-------------| | TimeGenerated | datetime | 事件时间戳(来自 Observe) | | ObserveTime | datetime | Observe 摄取时间 | | Operation | string | 操作(登录、读取、创建、更新、删除、llm_invocation) | | ObjectKind | string | 目标类型(dataset、monitor、dashboard 等) | | ObjectId | string | 目标资源 ID | | ObjectName | string | 目标资源显示名称 | | UserId | string | Observe 用户 ID | | UserEmail | string | 用户邮箱地址 | | UserDisplayName | string | 用户显示名称(如可用) | | LoginType | string | 身份验证方法(saml、password、api_key) | | LoginStatus | string | 登录结果(成功、失败) | | Success | boolean | 操作是否成功 | | SourceIp | string | 客户端 IP 地址 | | UserAgent | string | HTTP User-Agent | | Referer | string | HTTP Referer | | EventUid | string | 确定性事件 UID(用于去重) | | ObserveCustomerId | string | Observe 租户 ID | | ObserveTenant | string | Observe 租户显示名称 | | IngestedAt | datetime | 该记录被摄取到 Sentinel 的时间 | | RawFields | dynamic | 来自 Observe 的完整原始 FIELDS 对象 | | RawExtra | dynamic | 来自 Observe 的完整原始 EXTRA 对象 | | RawEvent | dynamic | 用于取证分析的完整原始行 | ## 项目结构 ``` observe-logger-service/ ├── function_app.py # Azure Function entry point (timer trigger) ├── host.json # Function host configuration ├── requirements.txt # Python dependencies ├── .funcignore # Files excluded from deployment ├── local.settings.sample.json # Template for local dev settings │ ├── observe_audit_ingestor/ # Core application code │ ├── config.py # Environment variable loader │ ├── observe_client.py # Observe Export Query API client │ ├── transform.py # Row transformation (Observe → Sentinel schema) │ ├── sentinel_client.py # Azure Monitor Logs Ingestion API client │ ├── checkpoint.py # Azure Table Storage checkpoint │ ├── quarantine.py # Blob storage for malformed rows │ ├── models.py # Data models │ └── log.py # Logging utilities │ ├── bootstrap/ # First-time infrastructure setup │ ├── params.env # Parameters to fill in │ └── bootstrap.sh # Automated setup script │ ├── infra/ # Bicep IaC templates │ ├── main.bicep # Storage, Key Vault, Function App │ ├── dcr.bicep # Data Collection Endpoint + Rule │ ├── role-assignments.bicep # RBAC for managed identity │ └── table.kql # Custom table schema reference │ ├── scripts/ # Development utilities │ ├── run_once.py # One-shot query + optional upload │ └── discover_observe_dataset.py # Dataset discovery helper │ └── tests/ # Unit tests ├── test_transform.py ├── test_observe_client.py └── test_sentinel_client.py ```
标签:API集成, Azure Function, Azure Monitor, DCE, DCR, Log Analytics, Microsoft Sentinel, Observe Inc, PB级数据处理, Python, 云端安全, 企业安全, 可观测性, 命令控制, 安全运维, 定时触发器, 审计日志, 数据接入, 数据采集, 无后门, 日志收集, 日志转换, 构建工具, 网络资产管理