google/secops-wrapper

GitHub: google/secops-wrapper

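Google's official Python SDK that wraps the SecOps/Chronicle SIEM APIs, providing a single programmatic interface for security operations tasks such as log ingestion, UDM search, and alert and case management.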

Stars: 72 | Forks: 38

# Google SecOps SDK for Python

[![PyPI version](https://img.shields.io/pypi/v/secops.svg)](https://pypi.org/project/secops/)

A Python SDK for interacting with Google Security Operations products, currently supporting Chronicle/SecOps SIEM. It wraps the API for common use cases, including UDM searches, entity lookups, IoCs, alert management, case management, and detection rule management.

## Prerequisites

Follow these steps to make sure your environment is properly configured:

1. **Configure a Google Cloud project for Google SecOps**
   - Your Google Cloud project must be linked to your Google SecOps instance.
   - The Chronicle API must be enabled in your Google Cloud project.
   - The project used for authentication must be the same project that was set up during your SecOps onboarding.
   - For detailed instructions, see [Configure a Google Cloud project for Google SecOps](https://cloud.google.com/chronicle/docs/onboard/configure-cloud-project).

2. **Set up IAM permissions**
   - The service account or user credentials you use must have the appropriate permissions.
   - The recommended predefined role is **Chronicle API Admin** (`roles/chronicle.admin`).
   - For more granular access control, you can create a custom role with specific permissions.
   - For detailed permission information, see [Access control using IAM](https://cloud.google.com/chronicle/docs/onboard/configure-feature-access).

3. **Required information**
   - Your Chronicle instance ID (customer_id)
   - Your Google Cloud project number (project_id)
   - Your preferred region (for example, "us", "europe", "asia")

## Installation

```
pip install secops
```

## Command Line Interface

The SDK also provides a comprehensive command-line interface (CLI) for interacting with Google Security Operations products directly from your terminal:

```
# Save your credentials
secops config set --customer-id "your-instance-id" --project-id "your-project-id" --region "us"

# Now use commands without specifying credentials each time
secops search --query "metadata.event_type = \"NETWORK_CONNECTION\""
```

For detailed CLI documentation and examples, see the [CLI documentation](https://github.com/google/secops-wrapper/blob/main/CLI.md).

## Authentication

The SDK supports two main authentication methods:

### 1. Application Default Credentials (ADC)

This is the simplest and recommended way to authenticate the SDK. Application Default Credentials provide a consistent authentication method that works across different Google Cloud environments and local development.

There are several ways to use ADC:

#### a. Using the `gcloud` CLI (recommended for local development)

```
# Login and set up application-default credentials
gcloud auth application-default login
```

Then in your code:

```
from secops import SecOpsClient

# Initialize with default credentials - no explicit configuration needed
client = SecOpsClient()
```

#### b. Using an environment variable

Set the environment variable that points to your service account key:

```
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
```

Then in your code:

```
from secops import SecOpsClient

# Initialize with default credentials - will automatically use the credentials file
client = SecOpsClient()
```

#### c. Google Cloud environment (automatic)

When running on Google Cloud services (Compute Engine, Cloud Functions, Cloud Run, etc.), ADC works automatically without any configuration:

```
from secops import SecOpsClient

# Initialize with default credentials - will automatically use the service account
# assigned to your Google Cloud resource
client = SecOpsClient()
```

ADC automatically tries the following authentication methods in order:

1. The `GOOGLE_APPLICATION_CREDENTIALS` environment variable
2. Google Cloud SDK credentials (set by `gcloud auth application-default login`)
3. Service account credentials provided by Google Cloud
4. Local service account impersonation credentials

### 2. Service Account Authentication

For more explicit control, you can authenticate with a service account created in the Google Cloud project associated with Google SecOps.

**Important note on permissions:**

* The service account must be granted the appropriate Identity and Access Management (IAM) role to interact with the Google SecOps (Chronicle) API. The recommended predefined role is **Chronicle API Admin** (`roles/chronicle.admin`). Alternatively, if your security policy requires more granular control, you can create a custom IAM role containing the specific permissions needed for the operations you intend to use (for example, `chronicle.instances.get`, `chronicle.events.create`, `chronicle.rules.list`, etc.).

Once the service account has been granted the right permissions, you can authenticate with it in two ways:

#### a. Using a service account JSON file

```
from secops import SecOpsClient

# Initialize with service account JSON file
client = SecOpsClient(service_account_path="/path/to/service-account.json")
```

#### b. Using a service account info dictionary

If you prefer to manage credentials programmatically without a file, you can create a dictionary containing the contents of the service account key.

```
from secops import SecOpsClient

# Service account details as a dictionary
service_account_info = {
    "type": "service_account",
    "project_id": "your-project-id",
    "private_key_id": "key-id",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...",
    "client_email": "service-account@project.iam.gserviceaccount.com",
    "client_id": "client-id",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/..."
}

# Initialize with service account info
client = SecOpsClient(service_account_info=service_account_info)
```

### Impersonate Service Account

Both [Application Default Credentials](#1-application-default-credentials-adc) and [Service Account Authentication](#2-service-account-authentication) support impersonating a service account through the `impersonate_service_account` parameter, configured as follows:

```
from secops import SecOpsClient

# Initialize with default credentials and impersonate service account
client = SecOpsClient(impersonate_service_account="secops@test-project.iam.gserviceaccount.com")
```

### Retry Configuration

The SDK provides built-in retry functionality that automatically handles transient errors such as rate limiting (429), server errors (500, 502, 503, 504), and network issues. You can customize the retry behavior when initializing the client:

```
from secops import SecOpsClient
from secops.auth import RetryConfig

# Define retry configurations
retry_config = RetryConfig(
    total=3,  # Maximum number of retries (default: 5)
    retry_status_codes=[429, 500, 502, 503, 504],  # HTTP status codes to retry
    allowed_methods=["GET", "DELETE"],  # HTTP methods to retry
    backoff_factor=0.5  # Backoff factor (default: 0.3)
)

# Initialize with custom retry config
client = SecOpsClient(retry_config=retry_config)

# Disable retry completely by marking retry config as False
client = SecOpsClient(retry_config=False)
```

## Using the Chronicle API

### Initializing the Chronicle Client

After creating a SecOpsClient, you need to initialize the Chronicle-specific client:

```
# Initialize Chronicle client
chronicle = client.chronicle(
    customer_id="your-chronicle-instance-id",  # Your Chronicle instance ID
    project_id="your-project-id",  # Your GCP project ID
    region="us"  # Chronicle API region
)
```

[View available regions](https://github.com/google/secops-wrapper/blob/main/regions.md)

#### API Versioning

The SDK supports flexible API version selection:

- **Default version**: Set `default_api_version` during client initialization (defaults to `v1alpha`)
- **Per-method override**: Many methods accept an `api_version` parameter to override the default for a specific call

**Supported API versions:**

- `v1` - Stable production API
- `v1beta` - Beta API with newer features
- `v1alpha` - Alpha API with experimental features

**Example of a per-method version override:**

```
from secops.chronicle.models import APIVersion

# Client defaults to v1alpha
chronicle = client.chronicle(
    customer_id="your-chronicle-instance-id",
    project_id="your-project-id",
    region="us",
    default_api_version="v1alpha"
)

# Use v1 for a specific rule operation
rule = chronicle.get_rule(
    rule_id="ru_12345678-1234-1234-1234-123456789abc",
    api_version=APIVersion.V1  # Override to use v1 for this call
)
```

### Log Ingestion

Ingest raw logs directly into Chronicle:

```
from datetime import datetime, timezone
import json

# Create a sample log (this is an OKTA log)
current_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
okta_log = {
    "actor": {
        "alternateId": "mark.taylor@cymbal-investments.org",
        "displayName": "Mark Taylor",
        "id": "00u4j7xcb5N6zfiRP5d8",
        "type": "User"
    },
    "client": {
        "userAgent": {
            "rawUserAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
            "os": "Windows 10",
            "browser": "CHROME"
        },
        "ipAddress": "96.6.127.53",
        "geographicalContext": {
            "city": "New York",
            "state": "New York",
            "country": "United States",
            "postalCode": "10118",
            "geolocation": {"lat": 40.7123, "lon": -74.0068}
        }
    },
    "displayMessage": "Max sign in attempts exceeded",
    "eventType": "user.account.lock",
    "outcome": {"result": "FAILURE", "reason": "LOCKED_OUT"},
    "published": "2025-06-19T21:51:50.116Z",
    "securityContext": {
        "asNumber": 20940,
        "asOrg": "akamai technologies inc.",
        "isp": "akamai international b.v.",
        "domain": "akamaitechnologies.com",
        "isProxy": False
    },
    "severity": "DEBUG",
    "legacyEventType": "core.user_auth.account_locked",
    "uuid": "5b90a94a-d7ba-11ea-834a-85c24a1b2121",
    "version": "0"
    # ... additional OKTA log fields may be included
}

# Ingest a single log using the default forwarder
result = chronicle.ingest_log(
    log_type="OKTA",  # Chronicle log type
    log_message=json.dumps(okta_log)  # JSON string of the log
)
print(f"Operation: {result.get('operation')}")

# Batch ingestion: Ingest multiple logs in a single request
batch_logs = [
    json.dumps({"actor": {"displayName": "User 1"}, "eventType": "user.session.start"}),
    json.dumps({"actor": {"displayName": "User 2"}, "eventType": "user.session.start"}),
    json.dumps({"actor": {"displayName": "User 3"}, "eventType": "user.session.start"})
]

# Ingest multiple logs in a single API call
batch_result = chronicle.ingest_log(
    log_type="OKTA",
    log_message=batch_logs  # List of log message strings
)
print(f"Batch operation: {batch_result.get('operation')}")

# Add custom labels to your logs
labeled_result = chronicle.ingest_log(
    log_type="OKTA",
    log_message=json.dumps(okta_log),
    labels={"environment": "production", "app": "web-portal", "team": "security"}
)
```

The SDK also supports non-JSON log formats. Here is an XML example for Windows Event logs:

```
# Create a Windows Event XML log
# Element names follow the standard Windows Event schema (assumed layout);
# attribute-bearing elements and Data "Name" attributes are omitted.
xml_content = """<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>
  <System>
    <EventID>4624</EventID>
    <Version>1</Version>
    <Level>0</Level>
    <Task>12544</Task>
    <Opcode>0</Opcode>
    <Keywords>0x8020000000000000</Keywords>
    <EventRecordID>202117513</EventRecordID>
    <Channel>Security</Channel>
    <Computer>WIN-SERVER.xyz.net</Computer>
  </System>
  <EventData>
    <Data>S-1-0-0</Data>
    <Data>-</Data>
    <Data>svcUser</Data>
    <Data>CLIENT-PC</Data>
    <Data>3</Data>
  </EventData>
</Event>"""

# Ingest the XML log - no json.dumps() needed for XML
result = chronicle.ingest_log(
    log_type="WINEVTLOG_XML",  # Windows Event Log XML format
    log_message=xml_content  # Raw XML content
)
print(f"Operation: {result.get('operation')}")
```

The SDK supports all log types available in Chronicle. You can:

1. View available log types:

```
# Get all available log types
log_types = chronicle.get_all_log_types()
for lt in log_types[:5]:  # Show first 5
    print(f"{lt.id}: {lt.description}")

# Fetch only first 50 log types (single page)
log_types_page = chronicle.get_all_log_types(page_size=50)

# Fetch specific page using token
log_types_next = chronicle.get_all_log_types(
    page_size=50,
    page_token="next_page_token"
)
```

2. Search for specific log types:

```
# Search for log types related to firewalls
firewall_types = chronicle.search_log_types("firewall")
for lt in firewall_types:
    print(f"{lt.id}: {lt.description}")
```

3. Validate log types:

```
# Check if a log type is valid
if chronicle.is_valid_log_type("OKTA"):
    print("Valid log type")
else:
    print("Invalid log type")
```

4. Classify logs to predict the log type:

```
# Classify a raw log to determine its type
okta_log = '{"eventType": "user.session.start", "actor": {"alternateId": "user@example.com"}}'
predictions = chronicle.classify_logs(log_data=okta_log)

# Display predictions sorted by confidence score
for prediction in predictions:
    print(f"Log Type: {prediction['logType']}, Score: {prediction['score']}")
```

5. Use a custom forwarder:

```
# Create or get a custom forwarder
forwarder = chronicle.get_or_create_forwarder(display_name="MyCustomForwarder")
forwarder_id = forwarder["name"].split("/")[-1]

# Use the custom forwarder for log ingestion
# (windows_log is assumed to be a dict defined elsewhere)
result = chronicle.ingest_log(
    log_type="WINDOWS",
    log_message=json.dumps(windows_log),
    forwarder_id=forwarder_id
)
```

### Forwarder Management

Chronicle log forwarders are essential for handling log ingestion with specific configurations. The SDK provides comprehensive methods for creating and managing forwarders:

#### Create a new forwarder

```
# Create a basic forwarder with just a display name
forwarder = chronicle.create_forwarder(display_name="MyAppForwarder")

# Create a forwarder with optional configuration
forwarder = chronicle.create_forwarder(
    display_name="ProductionForwarder",
    metadata={"labels": {"env": "prod"}},
    upload_compression=True,  # Enable upload compression for efficiency
    enable_server=False,  # Server functionality disabled
    http_settings={
        "port": 8080,
        "host": "192.168.0.100",
        "routeSettings": {
            "availableStatusCode": 200,
            "readyStatusCode": 200,
            "unreadyStatusCode": 500
        }
    }
)

print(f"Created forwarder with ID: {forwarder['name'].split('/')[-1]}")
```

#### List all forwarders

Retrieve all forwarders in your Chronicle environment, with pagination support:

```
# Get the default page size (50)
forwarders = chronicle.list_forwarders()

# Get forwarders with custom page size
forwarders = chronicle.list_forwarders(page_size=100)

# Process the forwarders
for forwarder in forwarders.get("forwarders", []):
    forwarder_id = forwarder.get("name", "").split("/")[-1]
    display_name = forwarder.get("displayName", "")
    create_time = forwarder.get("createTime", "")
    print(f"Forwarder ID: {forwarder_id}, Name: {display_name}, Created: {create_time}")
```

#### Get forwarder details

Retrieve details about a specific forwarder using its ID:

```
# Get a specific forwarder using its ID
forwarder_id = "1234567890"
forwarder = chronicle.get_forwarder(forwarder_id=forwarder_id)

# Access forwarder properties
display_name = forwarder.get("displayName", "")
metadata = forwarder.get("metadata", {})
server_enabled = forwarder.get("enableServer", False)

print(f"Forwarder {display_name} details:")
print(f"  Metadata: {metadata}")
print(f"  Server enabled: {server_enabled}")
```

#### Get or create a forwarder

Retrieve an existing forwarder by display name, or create a new one if it does not exist:

```
# Try to find a forwarder with the specified display name
# If not found, create a new one with that display name
forwarder = chronicle.get_or_create_forwarder(display_name="ApplicationLogForwarder")

# Extract the forwarder ID for use in log ingestion
forwarder_id = forwarder["name"].split("/")[-1]
```

#### Update a forwarder

Update an existing forwarder's configuration with specific properties:

```
# Update a forwarder with new properties
forwarder = chronicle.update_forwarder(
    forwarder_id="1234567890",
    display_name="UpdatedForwarderName",
    metadata={"labels": {"env": "prod"}},
    upload_compression=True
)

# Update specific fields using update mask
forwarder = chronicle.update_forwarder(
    forwarder_id="1234567890",
    display_name="ProdForwarder",
    update_mask=["display_name"]
)

print(f"Updated forwarder: {forwarder['name']}")
```

#### Delete a forwarder

Delete an existing forwarder by its ID:

```
# Delete a forwarder by ID
chronicle.delete_forwarder(forwarder_id="1234567890")
print("Forwarder deleted successfully")
```

### Log Processing Pipelines

Chronicle log processing pipelines let you transform, filter, and enrich log data before it is stored in Chronicle. Common use cases include removing empty key-value pairs, redacting sensitive data, adding ingestion labels, filtering logs by field values, and extracting host information. Pipelines can be associated with log types (with an optional collector ID) and feeds, giving you flexible control over your data ingestion workflow.

The SDK provides comprehensive methods for managing pipelines, associating streams, testing configurations, and fetching sample logs.

#### List pipelines

Retrieve all log processing pipelines in your Chronicle instance:

```
# Get all pipelines
result = chronicle.list_log_processing_pipelines()
pipelines = result.get("logProcessingPipelines", [])

for pipeline in pipelines:
    pipeline_id = pipeline["name"].split("/")[-1]
    print(f"Pipeline: {pipeline['displayName']} (ID: {pipeline_id})")

# List with pagination
result = chronicle.list_log_processing_pipelines(
    page_size=50,
    page_token="next_page_token"
)
```

#### Get pipeline details

Retrieve details about a specific pipeline:

```
# Get pipeline by ID
pipeline_id = "1234567890"
pipeline = chronicle.get_log_processing_pipeline(pipeline_id)

print(f"Name: {pipeline['displayName']}")
print(f"Description: {pipeline.get('description', 'N/A')}")
print(f"Processors: {len(pipeline.get('processors', []))}")
```

#### Create a pipeline
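Pipeline definitions are plain dictionaries, so a small helper can assemble them before they are passed to `create_log_processing_pipeline` or `test_pipeline`. The following is a minimal sketch, assuming the field names used by the pipeline examples in this section; `build_filter_pipeline` is a hypothetical helper and not part of the SDK.

```python
def build_filter_pipeline(display_name, patterns, description=""):
    """Return a pipeline config dict with a single regex filter processor.

    Hypothetical helper: the keys mirror the pipeline examples in this README.
    """
    if not patterns:
        raise ValueError("at least one pattern is required")
    config = {
        "displayName": display_name,
        "processors": [
            {
                "filterProcessor": {
                    "include": {
                        "logMatchType": "REGEXP",
                        "logBodies": list(patterns),
                    },
                    "errorMode": "IGNORE",
                }
            }
        ],
    }
    # Only include the description when one was supplied
    if description:
        config["description"] = description
    return config


pipeline_config = build_filter_pipeline(
    "My Custom Pipeline", [".*error.*", ".*warning.*"]
)
```

Keeping the structure in one function avoids repeating the nested `filterProcessor` layout each time a pipeline is created or tested.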
Create a new log processing pipeline with processors:

```
# Define pipeline configuration
pipeline_config = {
    "displayName": "My Custom Pipeline",
    "description": "Filters and transforms application logs",
    "processors": [
        {
            "filterProcessor": {
                "include": {
                    "logMatchType": "REGEXP",
                    "logBodies": [".*error.*", ".*warning.*"],
                },
                "errorMode": "IGNORE",
            }
        }
    ],
    "customMetadata": [
        {"key": "environment", "value": "production"},
        {"key": "team", "value": "security"}
    ]
}

# Create the pipeline (server generates ID)
created_pipeline = chronicle.create_log_processing_pipeline(
    pipeline=pipeline_config
)

pipeline_id = created_pipeline["name"].split("/")[-1]
print(f"Created pipeline with ID: {pipeline_id}")
```

#### Update a pipeline

Update an existing pipeline's configuration:

```
# Get the existing pipeline first
pipeline = chronicle.get_log_processing_pipeline(pipeline_id)

# Update specific fields
updated_config = {
    "name": pipeline["name"],
    "description": "Updated description",
    "processors": pipeline["processors"]
}

# Patch with update mask
updated_pipeline = chronicle.update_log_processing_pipeline(
    pipeline_id=pipeline_id,
    pipeline=updated_config,
    update_mask="description"
)

print(f"Updated: {updated_pipeline['displayName']}")
```

#### Delete a pipeline

Delete an existing pipeline:

```
# Delete by ID
chronicle.delete_log_processing_pipeline(pipeline_id)
print("Pipeline deleted successfully")

# Delete with etag for concurrency control
chronicle.delete_log_processing_pipeline(
    pipeline_id=pipeline_id,
    etag="etag_value"
)
```

#### Associate streams with a pipeline

Associate log streams (by log type or feed) with a pipeline:

```
# Associate by log type
streams = [
    {"logType": "WINEVTLOG"},
    {"logType": "LINUX"}
]

chronicle.associate_streams(
    pipeline_id=pipeline_id,
    streams=streams
)
print("Streams associated successfully")

# Associate by feed ID
feed_streams = [
    {"feed": "feed-uuid-1"},
    {"feed": "feed-uuid-2"}
]

chronicle.associate_streams(
    pipeline_id=pipeline_id,
    streams=feed_streams
)
```

#### Dissociate streams from a pipeline

Remove stream associations from a pipeline:

```
# Dissociate streams
streams = [{"logType": "WINEVTLOG"}]

chronicle.dissociate_streams(
    pipeline_id=pipeline_id,
    streams=streams
)
print("Streams dissociated successfully")
```

#### Fetch the associated pipeline

Find the pipeline associated with a specific stream:

```
# Find pipeline for a log type
stream_query = {"logType": "WINEVTLOG"}
associated = chronicle.fetch_associated_pipeline(stream=stream_query)

if associated:
    print(f"Associated pipeline: {associated['name']}")
else:
    print("No pipeline associated with this stream")

# Find pipeline for a feed
feed_query = {"feed": "feed-uuid"}
associated = chronicle.fetch_associated_pipeline(stream=feed_query)
```

#### Fetch sample logs

Retrieve sample logs for specific streams:

```
# Fetch sample logs for log types
streams = [
    {"logType": "WINEVTLOG"},
    {"logType": "LINUX"}
]

result = chronicle.fetch_sample_logs_by_streams(
    streams=streams,
    sample_logs_count=10
)

for log in result.get("logs", []):
    print(f"Log: {log}")
```

#### Test a pipeline

Test a pipeline configuration against sample logs before deploying it:

```
import base64
from datetime import datetime, timezone

# Define pipeline to test
pipeline_config = {
    "displayName": "Test Pipeline",
    "processors": [
        {
            "filterProcessor": {
                "include": {
                    "logMatchType": "REGEXP",
                    "logBodies": [".*"],
                },
                "errorMode": "IGNORE",
            }
        }
    ]
}

# Create test logs with base64-encoded data
current_time = datetime.now(timezone.utc).isoformat()
log_data = base64.b64encode(b"Sample log entry").decode("utf-8")

input_logs = [
    {
        "data": log_data,
        "logEntryTime": current_time,
        "collectionTime": current_time,
    }
]

# Test the pipeline
result = chronicle.test_pipeline(
    pipeline=pipeline_config,
    input_logs=input_logs
)

print(f"Processed {len(result.get('logs', []))} logs")
for processed_log in result.get("logs", []):
    print(f"Result: {processed_log}")
```

5. Use custom timestamps:

```
from datetime import datetime, timedelta, timezone

# Define custom timestamps
log_entry_time = datetime.now(timezone.utc) - timedelta(hours=1)
collection_time = datetime.now(timezone.utc)

result = chronicle.ingest_log(
    log_type="OKTA",
    log_message=json.dumps(okta_log),
    log_entry_time=log_entry_time,  # When the log was generated
    collection_time=collection_time  # When the log was collected
)
```

Ingest UDM events directly into Chronicle:

```
import uuid
from datetime import datetime, timezone

# Generate a unique ID
event_id = str(uuid.uuid4())

# Get current time in ISO 8601 format
current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

# Create a UDM event for a network connection
network_event = {
    "metadata": {
        "id": event_id,
        "event_timestamp": current_time,
        "event_type": "NETWORK_CONNECTION",
        "product_name": "My Security Product",
        "vendor_name": "My Company"
    },
    "principal": {
        "hostname": "workstation-1",
        "ip": "192.168.1.100",
        "port": 12345
    },
    "target": {
        "ip": "203.0.113.10",
        "port": 443
    },
    "network": {
        "application_protocol": "HTTPS",
        "direction": "OUTBOUND"
    }
}

# Ingest a single UDM event
result = chronicle.ingest_udm(udm_events=network_event)
print(f"Ingested event with ID: {event_id}")

# Create a second event
process_event = {
    "metadata": {
        # No ID - one will be auto-generated
        "event_timestamp": current_time,
        "event_type": "PROCESS_LAUNCH",
        "product_name": "My Security Product",
        "vendor_name": "My Company"
    },
    "principal": {
        "hostname": "workstation-1",
        "process": {
            "command_line": "ping 8.8.8.8",
            "pid": 1234
        },
        "user": {
            "userid": "user123"
        }
    }
}

# Ingest multiple UDM events in a single call
result = chronicle.ingest_udm(udm_events=[network_event, process_event])
print("Multiple events ingested successfully")
```

Import entities into Chronicle:

```
# Create a sample entity
entity = {
    "metadata": {
        "collected_timestamp": "2025-01-01T00:00:00Z",
        "vendor_name": "TestVendor",
        "product_name": "TestProduct",
        "entity_type": "USER",
    },
    "entity": {
        "user": {
            "userid": "testuser",
        }
    },
}

# Import a single entity
result = chronicle.import_entities(entities=entity, log_type="TEST_LOG_TYPE")
print(f"Imported entity: {result}")

# Import multiple entities
entity2 = {
    "metadata": {
        "collected_timestamp": "2025-01-01T00:00:00Z",
        "vendor_name": "TestVendor",
        "product_name": "TestProduct",
        "entity_type": "ASSET",
    },
    "entity": {
        "asset": {
            "hostname": "testhost",
        }
    },
}
entities = [entity, entity2]
result = chronicle.import_entities(entities=entities, log_type="TEST_LOG_TYPE")
print(f"Imported entities: {result}")
```

### Data Export

You can export Chronicle logs to Google Cloud Storage using the Data Export API:

```
from datetime import datetime, timedelta, timezone

# Set time range for export
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=1)  # Last 24 hours

# Get available log types for export
available_log_types = chronicle.fetch_available_log_types(
    start_time=start_time,
    end_time=end_time
)

# Print available log types
for log_type in available_log_types["available_log_types"]:
    print(f"{log_type.display_name} ({log_type.log_type.split('/')[-1]})")
    print(f"  Available from {log_type.start_time} to {log_type.end_time}")

# Create a data export for a single log type (legacy method)
export = chronicle.create_data_export(
    gcs_bucket="projects/my-project/buckets/my-export-bucket",
    start_time=start_time,
    end_time=end_time,
    log_type="GCP_DNS"  # Single log type to export
)

# Create a data export for multiple log types
export_multiple = chronicle.create_data_export(
    gcs_bucket="projects/my-project/buckets/my-export-bucket",
    start_time=start_time,
    end_time=end_time,
    log_types=["WINDOWS", "LINUX", "GCP_DNS"]  # Multiple log types to export
)

# Get the export ID
export_id = export["name"].split("/")[-1]
print(f"Created export with ID: {export_id}")
print(f"Status: {export['data_export_status']['stage']}")

# List recent exports
recent_exports = chronicle.list_data_export(page_size=10)
print(f"Found {len(recent_exports.get('dataExports', []))} recent exports")

# Print details of recent exports
for item in recent_exports.get("dataExports", []):
    item_id = item["name"].split("/")[-1]
    if "dataExportStatus" in item:
        status = item["dataExportStatus"]["stage"]
    else:
        status = item["data_export_status"]["stage"]
    print(f"Export ID: {item_id}, Status: {status}")

# Check export status
status = chronicle.get_data_export(export_id)

# Update an export that is in IN_QUEUE state
if status.get("dataExportStatus", {}).get("stage") == "IN_QUEUE":
    # Update with a new start time
    updated_start = start_time + timedelta(hours=2)
    update_result = chronicle.update_data_export(
        data_export_id=export_id,
        start_time=updated_start,
        # Optionally update other parameters like end_time, gcs_bucket, or log_types
    )
    print("Export updated successfully")

# Cancel an export if needed
if status.get("dataExportStatus", {}).get("stage") in ["IN_QUEUE", "PROCESSING"]:
    cancelled = chronicle.cancel_data_export(export_id)
    print(f"Export has been cancelled. New status: {cancelled['data_export_status']['stage']}")

# Export all log types at once
export_all = chronicle.create_data_export(
    gcs_bucket="projects/my-project/buckets/my-export-bucket",
    start_time=start_time,
    end_time=end_time,
    export_all_logs=True
)

print(f"Created export for all logs. Status: {export_all['data_export_status']['stage']}")
```

The Data Export API supports:

- Exporting one, multiple, or all log types to Google Cloud Storage
- Listing recent exports and filtering results
- Checking export status and progress
- Updating exports that are in the queue
- Cancelling exports in progress
- Fetching the log types available for a specific time range

If you encounter any issues with the Data Export functionality, please submit them to our issue tracker with detailed information and steps to reproduce.

### Basic UDM Search

Search for network connection events:

```
from datetime import datetime, timedelta, timezone

# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)  # Last 24 hours

# Perform UDM search
results = chronicle.search_udm(
    query="""
    metadata.event_type = "NETWORK_CONNECTION"
    ip != ""
    """,
    start_time=start_time,
    end_time=end_time,
    max_events=5
)

# Example response:
{
    "events": [
        {
            "name": "projects/my-project/locations/us/instances/my-instance/events/encoded-event-id",
            "udm": {
                "metadata": {
                    "eventTimestamp": "2024-02-09T10:30:00Z",
                    "eventType": "NETWORK_CONNECTION"
                },
                "target": {
                    "ip": ["192.168.1.100"],
                    "port": 443
                },
                "principal": {
                    "hostname": "workstation-1"
                }
            }
        }
    ],
    "total_events": 1,
    "more_data_available": False
}
```

### UDM Search View

Retrieve UDM search results with additional contextual information, including detection data:

```
from datetime import datetime, timedelta, timezone

# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)  # Last 24 hours

# Fetch UDM search view results
results = chronicle.fetch_udm_search_view(
    query='metadata.event_type = "NETWORK_CONNECTION"',
    start_time=start_time,
    end_time=end_time,
    max_events=5,  # Limit to 5 events
    max_detections=10,  # Get up to 10 detections
    snapshot_query='feedback_summary.status = "OPEN"',  # Filter for open alerts
    case_insensitive=True  # Case-insensitive search
)
```

### Fetch UDM Field Values

Search for ingested UDM field values that match a query:

```
# Search for fields containing "source"
results = chronicle.find_udm_field_values(
    query="source",
    page_size=10
)

# Example response:
{
    "valueMatches": [
        {
            "fieldPath": "metadata.ingestion_labels.key",
            "value": "source",
            "ingestionTime": "2025-08-18T08:00:11.670673Z",
            "matchEnd": 6
        },
        {
            "fieldPath": "additional.fields.key",
            "value": "source",
            "ingestionTime": "2025-02-18T19:45:01.811426Z",
            "matchEnd": 6
        }
    ],
    "fieldMatches": [
        {"fieldPath": "about.labels.value"},
        {"fieldPath": "additional.fields.value.string_value"}
    ],
    "fieldMatchRegex": "source"
}
```

### Raw Log Search

Search raw logs in Chronicle using the query language:

```
from datetime import datetime, timedelta, timezone

# Set time range for search
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)

results = chronicle.search_raw_logs(
    query='raw != "authentication"',
    start_time=start_time,
    end_time=end_time,
    snapshot_query='status = "success"',
    max_aggregations_per_field=100,
    page_size=20
)
```

### Statistics Queries

Get network connection statistics grouped by hostname:

```
stats = chronicle.get_stats(
    query="""metadata.event_type = "NETWORK_CONNECTION"
match:
    target.hostname
outcome:
    $count = count(metadata.id)
order:
    $count desc""",
    start_time=start_time,
    end_time=end_time,
    max_events=1000,
    max_values=10,
    timeout=180
)

# Example response:
{
    "columns": ["hostname", "count"],
    "rows": [
        {"hostname": "server-1", "count": 1500},
        {"hostname": "server-2", "count": 1200}
    ],
    "total_rows": 2
}
```

### CSV Export

Export specific fields in CSV format:

```
csv_data = chronicle.fetch_udm_search_csv(
    query='metadata.event_type = "NETWORK_CONNECTION"',
    start_time=start_time,
    end_time=end_time,
    fields=["timestamp", "user", "hostname", "process name"]
)

# Example response:
"""
metadata.eventTimestamp,principal.hostname,target.ip,target.port
2024-02-09T10:30:00Z,workstation-1,192.168.1.100,443
2024-02-09T10:31:00Z,workstation-2,192.168.1.101,80
"""
```

### Query Validation

Validate a UDM query before executing it:

```
query = 'target.ip != "" and principal.hostname = "test-host"'
validation = chronicle.validate_query(query)

# Example response:
{
    "isValid": True,
    "queryType": "QUERY_TYPE_UDM_QUERY",
    "suggestedFields": [
        "target.ip",
        "principal.hostname"
    ]
}
```

### Natural Language Search

Search for events using natural language instead of UDM query syntax:

```
from datetime import datetime, timedelta, timezone

# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)  # Last 24 hours
# Option 1: Translate natural language to UDM query
udm_query = chronicle.translate_nl_to_udm("show me network connections")
print(f"Translated query: {udm_query}")
# Example output: 'metadata.event_type="NETWORK_CONNECTION"'

# Then run the query manually if needed
results = chronicle.search_udm(
    query=udm_query,
    start_time=start_time,
    end_time=end_time
)

# Option 2: Perform complete search with natural language
results = chronicle.nl_search(
    text="show me failed login attempts",
    start_time=start_time,
    end_time=end_time,
    max_events=100
)

# Example response (same format as search_udm):
{
    "events": [
        {
            "event": {
                "metadata": {
                    "eventTimestamp": "2024-02-09T10:30:00Z",
                    "eventType": "USER_LOGIN"
                },
                "principal": {
                    "user": {
                        "userid": "jdoe"
                    }
                },
                "securityResult": {
                    "action": "BLOCK",
                    "summary": "Failed login attempt"
                }
            }
        }
    ],
    "total_events": 1
}
```

The natural language search feature supports a variety of query patterns:

- "Show me network connections"
- "Find suspicious processes"
- "Show login failures in the last hour"
- "Display connections to IP address 192.168.1.100"

If the natural language text cannot be converted into a valid UDM query, an `APIError` is raised with a message indicating that no valid query could be generated.

### Entity Summary

Get detailed information about a specific entity, such as an IP address, domain, or file hash. The function automatically detects the entity type from the value provided and fetches a comprehensive summary that includes related entities, alerts, timeline, prevalence, and more.

```
# IP address summary
ip_summary = chronicle.summarize_entity(
    value="8.8.8.8",
    start_time=start_time,
    end_time=end_time
)

# Domain summary
domain_summary = chronicle.summarize_entity(
    value="google.com",
    start_time=start_time,
    end_time=end_time
)

# File hash summary (SHA256)
file_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
file_summary = chronicle.summarize_entity(
    value=file_hash,
    start_time=start_time,
    end_time=end_time
)

# Optionally hint the preferred type if auto-detection might be ambiguous
user_summary = chronicle.summarize_entity(
    value="jdoe",
    start_time=start_time,
    end_time=end_time,
    preferred_entity_type="USER"
)

# Example response structure (EntitySummary object):
# Access attributes like: ip_summary.primary_entity, ip_summary.related_entities,
# ip_summary.alert_counts, ip_summary.timeline, ip_summary.prevalence, etc.

# Example fields within the EntitySummary object:
# primary_entity: {
#     "name": "entities/...",
#     "metadata": {
#         "entityType": "ASSET",  # Or FILE, DOMAIN_NAME, USER, etc.
#         "interval": { "startTime": "...", "endTime": "..." }
#     },
#     "metric": { "firstSeen": "...", "lastSeen": "..." },
#     "entity": {  # Contains specific details like 'asset', 'file', 'domain'
#         "asset": { "ip": ["8.8.8.8"] }
#     }
# }
# related_entities: [ { ... similar to primary_entity ... } ]
# alert_counts: [ { "rule": "Rule Name", "count": 5 } ]
# timeline: { "buckets": [ { "alertCount": 1, "eventCount": 10 } ], "bucketSize": "3600s" }
# prevalence: [ { "prevalenceTime": "...", "count": 100 } ]
# file_metadata_and_properties: {  # Only for FILE entities
#     "metadata": [ { "key": "...", "value": "..." } ],
#     "properties": [ { "title": "...", "properties": [ { "key": "...", "value": "..." } ] } ]
# }
```

### List IoCs (Indicators of Compromise)

Retrieve IoCs that match ingested events:

```
iocs = chronicle.list_iocs(
    start_time=start_time,
    end_time=end_time,
    max_matches=1000,
    add_mandiant_attributes=True,
    prioritized_only=False
)

# Process the results
for ioc in iocs['matches']:
    ioc_type = next(iter(ioc['artifactIndicator'].keys()))
    ioc_value = next(iter(ioc['artifactIndicator'].values()))
    print(f"IoC Type: {ioc_type}, Value: {ioc_value}")
    print(f"Sources: {', '.join(ioc['sources'])}")
```

The IoC response includes:

- The indicator itself (domain, IP, hash, etc.)
- Sources and categories
- Affected assets in your environment
- First and last seen timestamps
- Confidence scores and severity ratings
- Associated threat actors and malware families (with Mandiant attributes)

### Alerts and Case Management

Retrieve alerts and their associated cases:

```
# Get non-closed alerts
alerts = chronicle.get_alerts(
    start_time=start_time,
    end_time=end_time,
    snapshot_query='feedback_summary.status != "CLOSED"',
    max_alerts=100
)

# Get alerts from the response
alert_list = alerts.get('alerts', {}).get('alerts', [])

# Extract case IDs from alerts
case_ids = {alert.get('caseName') for alert in alert_list if alert.get('caseName')}

# Get case details using the batch API
if case_ids:
    cases = chronicle.get_cases(list(case_ids))

    # Process cases
    for case in cases.cases:
        print(f"Case: {case.display_name}")
        print(f"Priority: {case.priority}")
        print(f"Status: {case.status}")
        print(f"Stage: {case.stage}")

        # Access SOAR platform information if available
        if case.soar_platform_info:
            print(f"SOAR Case ID: {case.soar_platform_info.case_id}")
            print(f"SOAR Platform: {case.soar_platform_info.platform_type}")
```

The alert response includes:

- Progress and completion status
- Alert counts (baseline and filtered)
- Alert details (rule information, detection details, etc.)
- Case associations

You can filter alerts using the snapshot query parameter with fields such as:

- `detection.rule_name`
- `detection.alert_state`
- `feedback_summary.verdict`
- `feedback_summary.priority`
- `feedback_summary.status`

### Case Management Helpers

The `CaseList` class provides helper methods for working with cases:

```
# Get details for specific cases (uses the batch API)
cases = chronicle.get_cases(["case-id-1", "case-id-2"])

# Filter cases by priority
high_priority = cases.filter_by_priority("PRIORITY_HIGH")

# Filter cases by status
open_cases = cases.filter_by_status("STATUS_OPEN")

# Look up a specific case
case = cases.get_case("case-id-1")
```

### Case Management

Chronicle provides comprehensive case management capabilities for tracking and managing security investigations. The SDK supports listing, retrieving, and updating cases, as well as performing bulk operations on them.

#### List cases

Retrieve cases with optional filtering and pagination:

```
# List all cases with default pagination
result = chronicle.list_cases(page_size=50)
for case_data in result["cases"]:
    case_id = case_data["name"].split("/")[-1]
    print(f"Case {case_id}: {case_data['displayName']}")

# List with filtering
open_cases = chronicle.list_cases(
    page_size=100,
    filter_query='status = "OPENED"',
    order_by="createTime desc"
)

# Get cases as a flat list instead of paginated dict
cases_list = chronicle.list_cases(page_size=50, as_list=True)
for case in cases_list:
    print(f"{case['displayName']}: {case['priority']}")
```

#### Get case details

Retrieve details about a specific case:

```
# Get case by ID
case = chronicle.get_case("12345")
print(f"Case: {case.display_name}")
print(f"Priority: {case.priority}")
print(f"Status: {case.status}")
print(f"Stage: {case.stage}")

# Get case with expanded fields
case_expanded = chronicle.get_case("12345", expand="tags,products")
```

#### Update a case
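Because `update_mask` names the fields being patched, it can be derived from the keys of the partial update instead of being typed by hand. The following is a minimal sketch, assuming the mask is a comma-separated list of top-level field names as in the `patch_case` examples in this section; `build_update_mask` is a hypothetical helper and not an SDK function.

```python
def build_update_mask(case_data):
    """Join the top-level keys of a partial update into a comma-separated mask.

    Hypothetical helper: assumes every key in case_data should be updated.
    """
    if not case_data:
        raise ValueError("case_data must contain at least one field")
    # Dicts preserve insertion order, so the mask follows the order of the keys
    return ",".join(case_data)


case_data = {"priority": "PRIORITY_MEDIUM", "stage": "Investigation"}
update_mask = build_update_mask(case_data)
print(update_mask)  # priority,stage
```

Deriving the mask this way keeps it in sync with the payload, so a field added to `case_data` is not silently left out of the update.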
Update case fields with a partial update:

```
# Update case priority
updated_case = chronicle.patch_case(
    case_name="12345",
    case_data={"priority": "PRIORITY_HIGH"},
    update_mask="priority"
)

# Update multiple fields
updated_case = chronicle.patch_case(
    case_name="12345",
    case_data={
        "priority": "PRIORITY_MEDIUM",
        "stage": "Investigation"
    },
    update_mask="priority,stage"
)
```

#### Merging Cases

Merge multiple cases into a single target case:

```
# Merge source cases into target case
result = chronicle.merge_cases(
    case_ids=[12345, 67890],
    case_to_merge_with=11111
)

if result.get("isRequestValid"):
    print(f"Cases merged into case {result['newCaseId']}")
else:
    print(f"Merge failed: {result.get('errors')}")
```

#### Bulk Operations

Perform an operation on multiple cases at once:

```
# Bulk add tags
chronicle.execute_bulk_add_tag(
    case_ids=[12345, 67890],
    tags=["phishing", "high-priority"]
)

# Bulk assign cases
chronicle.execute_bulk_assign(
    case_ids=[12345, 67890],
    username="@SecurityTeam"
)

# Bulk change priority
chronicle.execute_bulk_change_priority(
    case_ids=[12345, 67890],
    priority="PRIORITY_HIGH"
)

# Bulk change stage
chronicle.execute_bulk_change_stage(
    case_ids=[12345, 67890],
    stage="Remediation"
)

# Bulk close cases
chronicle.execute_bulk_close(
    case_ids=[12345, 67890],
    close_reason="NOT_MALICIOUS",
    root_cause="False positive - benign activity",
    close_comment="Verified with asset owner"
)

# Bulk reopen cases
chronicle.execute_bulk_reopen(
    case_ids=[12345, 67890],
    reopen_comment="New evidence discovered"
)
```

### Investigation Management

Chronicle investigations provide automated analysis and recommendations for alerts and cases. The SDK provides methods to list, retrieve, and trigger investigations, and to fetch the investigations associated with an alert or case.

#### Listing Investigations

Retrieve all investigations in your Chronicle instance:

```
# List all investigations
result = chronicle.list_investigations()
investigations = result.get("investigations", [])

for inv in investigations:
    print(f"Investigation: {inv['displayName']}")
    print(f"  Status: {inv.get('status', 'N/A')}")
    print(f"  Verdict: {inv.get('verdict', 'N/A')}")

# List with pagination
result = chronicle.list_investigations(page_size=50, page_token="token")
```

#### Getting Investigation Details

Retrieve a specific investigation by its ID:

```
# Get investigation by ID
investigation = chronicle.get_investigation(investigation_id="inv_123")
print(f"Name: {investigation['displayName']}")
print(f"Status: {investigation.get('status')}")
print(f"Verdict: {investigation.get('verdict')}")
print(f"Confidence: {investigation.get('confidence')}")
```

#### Triggering an Investigation for an Alert

Create a new investigation for a specific alert:

```
# Trigger investigation for an alert
investigation = chronicle.trigger_investigation(alert_id="alert_123")
print(f"Investigation created: {investigation['name']}")
print(f"Status: {investigation.get('status')}")
print(f"Trigger type: {investigation.get('triggerType')}")
```

#### Fetching Associated Investigations

Retrieve the investigations associated with alerts or cases:

```
from secops.chronicle import DetectionType

# Fetch investigations for specific alerts
result = chronicle.fetch_associated_investigations(
    detection_type=DetectionType.ALERT,
    alert_ids=["alert_123", "alert_456"],
    association_limit_per_detection=5
)

# Process associations
associations_list = result.get("associationsList", {})
for alert_id, data in associations_list.items():
    investigations = data.get("investigations", [])
    print(f"Alert {alert_id}: {len(investigations)} investigation(s)")
    for inv in investigations:
        print(f"  - {inv['displayName']}: {inv.get('verdict', 'N/A')}")

# Fetch investigations for cases
case_result = chronicle.fetch_associated_investigations(
    detection_type=DetectionType.CASE,
    case_ids=["case_123"],
    association_limit_per_detection=3
)

# You can also use string values for detection_type
result = chronicle.fetch_associated_investigations(
    detection_type="ALERT",  # or "DETECTION_TYPE_ALERT"
    alert_ids=["alert_123"]
)
```

### Generating UDM Key/Value Mappings

Chronicle can generate a UDM key/value mapping for a given raw log.

```
mapping = chronicle.generate_udm_key_value_mappings(
    log_format="JSON",
    log='{"events":[{"id":"123","user":"test_user","source_ip":"192.168.1.10"}]}',
    use_array_bracket_notation=True,
    compress_array_fields=False,
)

print(f"Generated UDM key/value mapping: {mapping}")
```

```
# Generate UDM key-value mapping
udm_mapping = chronicle.generate_udm_mapping(log_type="WINDOWS_AD")
print(udm_mapping)
```

## Parser Management

Chronicle parsers process raw log data and normalize it into Chronicle's Unified Data Model (UDM) format. Parsers convert various log formats (JSON, XML, CEF, and so on) into a standardized structure, which enables consistent querying and analysis across data sources.

The SDK provides comprehensive support for managing Chronicle parsers:

### Creating Parsers

Create a new parser:

```
# Raw string so the regex backslashes reach the parser unchanged
parser_text = r"""
filter {
    mutate {
        replace => {
            "event1.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
            "event1.idm.read_only_udm.metadata.vendor_name" => "ACME Labs"
        }
    }
    grok {
        match => {
            "message" => ["^(?P<_firstWord>[^\s]+)\s.*$"]
        }
        on_error => "_grok_message_failed"
    }
    if ![_grok_message_failed] {
        mutate {
            replace => {
                "event1.idm.read_only_udm.metadata.description" => "%{_firstWord}"
            }
        }
    }
    mutate {
        merge => {
            "@output" => "event1"
        }
    }
}
"""

log_type = "WINDOWS_AD"

# Create the parser
parser = chronicle.create_parser(
    log_type=log_type,
    parser_code=parser_text,
    validated_on_empty_logs=True  # Whether to validate parser on empty logs
)
parser_id = parser.get("name", "").split("/")[-1]
print(f"Parser ID: {parser_id}")
```

### Managing Parsers

Retrieve, list, copy, activate/deactivate, and delete parsers:

```
# List all parsers (returns complete list)
parsers = chronicle.list_parsers()
for parser in parsers:
    parser_id = parser.get("name", "").split("/")[-1]
    state = parser.get("state")
    print(f"Parser ID: {parser_id}, State: {state}")

# Manual pagination: get raw API response with nextPageToken
response = chronicle.list_parsers(page_size=50)
parsers = response.get("parsers", [])
next_token = response.get("nextPageToken")
# Use next_token for subsequent calls:
# response = chronicle.list_parsers(page_size=50, page_token=next_token)

log_type = "WINDOWS_AD"

# Get specific parser
parser = chronicle.get_parser(log_type=log_type, id=parser_id)
print(f"Parser content: {parser.get('text')}")

# Activate/Deactivate parser
chronicle.activate_parser(log_type=log_type, id=parser_id)
chronicle.deactivate_parser(log_type=log_type, id=parser_id)

# Copy an existing parser as a starting point
copied_parser = chronicle.copy_parser(log_type=log_type, id="pa_existing_parser")

# Delete parser
chronicle.delete_parser(log_type=log_type, id=parser_id)

# Force delete an active parser
chronicle.delete_parser(log_type=log_type, id=parser_id, force=True)

# Activate a release candidate parser
chronicle.activate_release_candidate_parser(log_type=log_type, id="pa_release_candidate")
```

### Running a Parser Against Sample Logs

Run a parser on one or more sample logs:

```
# Sample parser code that extracts fields from logs
# (raw string so the regex backslashes reach the parser unchanged)
parser_text = r"""
filter {
    mutate {
        replace => {
            "event1.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
            "event1.idm.read_only_udm.metadata.vendor_name" => "ACME Labs"
        }
    }
    grok {
        match => {
            "message" => ["^(?P<_firstWord>[^\s]+)\s.*$"]
        }
        on_error => "_grok_message_failed"
    }
    if ![_grok_message_failed] {
        mutate {
            replace => {
                "event1.idm.read_only_udm.metadata.description" => "%{_firstWord}"
            }
        }
    }
    mutate {
        merge => {
            "@output" => "event1"
        }
    }
}
"""

log_type = "WINDOWS_AD"

# Sample log entries to test
sample_logs = [
    '{"message": "ERROR: Failed authentication attempt"}',
    '{"message": "WARNING: Suspicious activity detected"}',
    '{"message": "INFO: User logged in successfully"}'
]

# Run parser evaluation
result = chronicle.run_parser(
    log_type=log_type,
    parser_code=parser_text,
    parser_extension_code=None,  # Optional parser extension
    logs=sample_logs,
    statedump_allowed=False  # Enable if using statedump filters
)

# Check the results
if "runParserResults" in result:
    for i, parser_result in enumerate(result["runParserResults"]):
        print(f"\nLog {i+1} parsing result:")
        if "parsedEvents" in parser_result:
            print(f"  Parsed events: {parser_result['parsedEvents']}")
        if "errors" in parser_result:
            print(f"  Errors: {parser_result['errors']}")

# Run parser with statedump for debugging
# Statedump provides internal parser state useful for troubleshooting
result_with_statedump = chronicle.run_parser(
    log_type=log_type,
    parser_code=parser_text,
    parser_extension_code=None,
    logs=sample_logs,
    statedump_allowed=True,  # Enable statedump in parser output
    parse_statedump=True  # Parse statedump string into
    # structured format
)

# Check statedump results (useful for parser debugging)
if "runParserResults" in result_with_statedump:
    for i, parser_result in enumerate(result_with_statedump["runParserResults"]):
        if "statedumpResults" in parser_result:
            for dump in parser_result["statedumpResults"]:
                statedump = dump.get("statedumpResult", {})
                print(f"\nParser state for log {i+1}:")
                print(f"  Info: {statedump.get('info', '')}")
                print(f"  State: {statedump.get('state', {})}")
```

The `run_parser` function validates its input thoroughly:

- Checks that a log type and parser code are provided
- Ensures that logs are provided as a list of strings
- Enforces size limits (10 MB per log, 50 MB in total, at most 1,000 logs)
- Provides detailed error messages for different failure scenarios

### Complete Parser Workflow Example

The following complete example retrieves a parser, runs it against a log, and extracts the parsed UDM event:

```
# Step 1: List and retrieve an OKTA parser
parsers = chronicle.list_parsers(log_type="OKTA")
parser_id = parsers[0]["name"].split("/")[-1]
parser_details = chronicle.get_parser(log_type="OKTA", id=parser_id)

# Extract and decode parser code
import base64
import json  # used below to serialize the sample log
parser_code = base64.b64decode(parser_details["cbn"]).decode('utf-8')

# Step 2: Run the parser against a sample log
okta_log = {
    "actor": {"alternateId": "user@example.com", "displayName": "Test User"},
    "eventType": "user.account.lock",
    "outcome": {"result": "FAILURE", "reason": "LOCKED_OUT"},
    "published": "2025-06-19T21:51:50.116Z"
    # ...
    # other OKTA log fields
}

result = chronicle.run_parser(
    log_type="OKTA",
    parser_code=parser_code,
    parser_extension_code=None,
    logs=[json.dumps(okta_log)]
)

# Step 3: Extract and ingest the parsed UDM event
if result["runParserResults"][0]["parsedEvents"]:
    # parsedEvents is a dict with 'events' key containing the actual events list
    parsed_events_data = result["runParserResults"][0]["parsedEvents"]
    if isinstance(parsed_events_data, dict) and "events" in parsed_events_data:
        events = parsed_events_data["events"]
        if events and len(events) > 0:
            # Extract the first event
            if "event" in events[0]:
                udm_event = events[0]["event"]
            else:
                udm_event = events[0]

            # Ingest the parsed UDM event back into Chronicle
            ingest_result = chronicle.ingest_udm(udm_events=udm_event)
            print(f"UDM event ingested: {ingest_result}")
```

This workflow is useful for:

- Testing parsers before deployment
- Understanding how logs are transformed into UDM format
- Reprocessing logs with an updated parser
- Debugging parsing issues

## Parser Extensions

Parser extensions provide a flexible way to extend the capabilities of existing default (or custom) parsers without replacing them. Extensions let you customize the parser pipeline by adding new parsing logic, extracting and transforming fields, and updating or removing UDM field mappings.

The SDK provides comprehensive support for managing Chronicle parser extensions:

### Listing Parser Extensions

List the parser extensions for a log type:

```
log_type = "OKTA"

extensions = chronicle.list_parser_extensions(log_type)
print(f"Found {len(extensions['parserExtensions'])} parser extensions for log type: {log_type}")
```

### Creating a Parser Extension

Create a new parser extension from a CBN snippet, a field extractor, or dynamic parsing:

```
log_type = "OKTA"

field_extractor = {
    "extractors": [
        {
            "preconditionPath": "severity",
            "preconditionValue": "Info",
            "preconditionOp": "EQUALS",
            "fieldPath": "displayMessage",
            "destinationPath": "udm.metadata.description",
        }
    ],
    "logFormat": "JSON",
    "appendRepeatedFields": True,
}

chronicle.create_parser_extension(log_type, field_extractor=field_extractor)
```

### Getting a Parser Extension

Get the details of a parser extension:

```
log_type = "OKTA"
extension_id = "1234567890"

extension = chronicle.get_parser_extension(log_type, extension_id)
print(extension)
```

### Activating a Parser Extension

Activate a parser extension:

```
log_type = "OKTA"
extension_id = "1234567890"

chronicle.activate_parser_extension(log_type, extension_id)
```

### Deleting a Parser Extension

Delete a parser extension:

```
log_type = "OKTA"
extension_id = "1234567890"

chronicle.delete_parser_extension(log_type, extension_id)
```

## Watchlist Management

### Creating a Watchlist

Create a new watchlist:

```
watchlist = chronicle.create_watchlist(
    name="my_watchlist",
    display_name="my_watchlist",
    multiplying_factor=1.5,
    description="My new watchlist"
)
```

### Updating a Watchlist

Update a watchlist by ID:

```
updated_watchlist = chronicle.update_watchlist(
    watchlist_id="abc-123-def",
    display_name="Updated Watchlist Name",
    description="Updated description",
    multiplying_factor=2.0,
    entity_population_mechanism={"manual": {}},
    watchlist_user_preferences={"pinned": True}
)
```

### Deleting a Watchlist

Delete a watchlist by ID:

```
chronicle.delete_watchlist("abc-123-def", force=True)
```

### Getting a Watchlist

Get a watchlist by ID:

```
watchlist = chronicle.get_watchlist("abc-123-def")
```

### Listing All Watchlists

List all watchlists:

```
# List watchlists (returns dict with pagination metadata)
watchlists = chronicle.list_watchlists()
for watchlist in watchlists.get("watchlists", []):
    print(f"Watchlist: {watchlist.get('displayName')}")

# List watchlists as a direct list (automatically fetches all pages)
watchlists = chronicle.list_watchlists(as_list=True)
for watchlist in watchlists:
    print(f"Watchlist: {watchlist.get('displayName')}")
```

## Rule Management

The SDK provides comprehensive support for managing Chronicle detection rules:

### Creating Rules

Create a new detection rule using YARA-L 2.0 syntax:

```
rule_text = """
rule simple_network_rule {
    meta:
        description = "Example rule to detect network connections"
        author = "SecOps SDK Example"
        severity = "Medium"
        priority = "Medium"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
        $e.principal.hostname != ""
    condition:
        $e
}
"""

# Create the rule
rule = chronicle.create_rule(rule_text)
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}")
```

### Managing Rules

Retrieve, list, update, enable/disable, and delete rules:

```
# List all rules
rules = chronicle.list_rules()
for rule in rules.get("rules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    enabled = rule.get("deployment", {}).get("enabled", False)
    print(f"Rule ID: {rule_id}, Enabled: {enabled}")

# List paginated rules
# using the REVISION_METADATA_ONLY view
rules = chronicle.list_rules(view="REVISION_METADATA_ONLY", page_size=50)
print(f"Fetched {len(rules.get('rules', []))} rules")

# Get specific rule
rule = chronicle.get_rule(rule_id)
print(f"Rule content: {rule.get('text')}")

# Update rule
updated_rule = chronicle.update_rule(rule_id, updated_rule_text)

# Enable/disable rule
deployment = chronicle.enable_rule(rule_id, enabled=True)  # Enable
deployment = chronicle.enable_rule(rule_id, enabled=False)  # Disable

# Delete rule
chronicle.delete_rule(rule_id)
```

### Rule Deployment

Manage a rule's deployment (enabled, alerting, and archived state, plus run frequency):

```
# Get current deployment for a rule
deployment = chronicle.get_rule_deployment(rule_id)

# List deployments (paginated)
page = chronicle.list_rule_deployments(page_size=10)

# List deployments with filter
filtered = chronicle.list_rule_deployments(filter_query="enabled=true")

# Update deployment fields (partial updates supported)
chronicle.update_rule_deployment(
    rule_id=rule_id,
    enabled=True,          # continuously execute
    alerting=False,        # detections do not generate alerts
    run_frequency="LIVE"   # LIVE | HOURLY | DAILY
)

# Archive a rule (must set enabled to False when archived=True)
chronicle.update_rule_deployment(
    rule_id=rule_id,
    archived=True
)
```

### Searching Rules

Search for rules using regular expressions:

```
# Search for rules containing specific patterns
results = chronicle.search_rules("suspicious process")
for rule in results.get("rules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    print(f"Rule ID: {rule_id}, contains: 'suspicious process'")

# Find rules mentioning a specific MITRE technique
mitre_rules = chronicle.search_rules("T1055")
print(f"Found {len(mitre_rules.get('rules', []))} rules mentioning T1055 technique")
```

### Testing Rules

Test a rule against historical data to validate it before deployment:

```
from datetime import datetime, timedelta, timezone

# Define time range for testing
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)  # Test against last 7 days

# Rule to test
rule_text = """
rule test_rule {
    meta:
        description = "Test rule for validation"
        author = "Test Author"
        severity = "Low"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
    condition:
        $e
}
"""

# Test the rule
test_results = chronicle.run_rule_test(
    rule_text=rule_text,
    start_time=start_time,
    end_time=end_time,
    max_results=100
)

# Process streaming results
detection_count = 0
for result in test_results:
    result_type = result.get("type")
    if result_type == "progress":
        # Progress update
        percent_done = result.get("percentDone", 0)
        print(f"Progress: {percent_done}%")
    elif result_type == "detection":
        # Detection result
        detection_count += 1
        detection = result.get("detection", {})
        print(f"Detection {detection_count}:")

        # Process detection details
        if "rule_id" in detection:
            print(f"  Rule ID: {detection['rule_id']}")
        if "data" in detection:
            print(f"  Data: {detection['data']}")
    elif result_type == "error":
        # Error information
        print(f"Error: {result.get('message', 'Unknown error')}")

print(f"Finished testing. Found {detection_count} detection(s).")
```

To extract only the UDM events for programmatic processing:

```
udm_events = []
for result in chronicle.run_rule_test(rule_text, start_time, end_time, max_results=100):
    if result.get("type") == "detection":
        detection = result.get("detection", {})
        result_events = detection.get("resultEvents", {})

        for var_name, var_data in result_events.items():
            event_samples = var_data.get("eventSamples", [])
            for sample in event_samples:
                event = sample.get("event")
                if event:
                    udm_events.append(event)

# Process the UDM events
for event in udm_events:
    # Process each UDM event
    metadata = event.get("metadata", {})
    print(f"Event type: {metadata.get('eventType')}")
```

### Retrohunts

Run a rule against historical data to find past matches:

```
from datetime import datetime, timedelta, timezone

# Set time range for retrohunt
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)  # Search past 7 days

# Create retrohunt
retrohunt = chronicle.create_retrohunt(rule_id, start_time, end_time)
operation_id = retrohunt.get("name", "").split("/")[-1]

# Check retrohunt status
retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
state = retrohunt_status.get("state", "")

# List retrohunts for a rule
retrohunts = chronicle.list_retrohunts(rule_id)
```

### Detections and Errors

Monitor rule detections and execution errors:

```
from datetime import datetime, timedelta, timezone

# Look back over the past 7 days (start_time must precede end_time)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)

# List detections for a rule
detections = chronicle.list_detections(
    rule_id=rule_id,
    start_time=start_time,
    end_time=end_time,
    list_basis="CREATED_TIME"
)
for detection in detections.get("detections", []):
    detection_id = detection.get("id", "")
    event_time = detection.get("eventTime", "")
    alerting = detection.get("alertState", "") == "ALERTING"
    print(f"Detection: {detection_id}, Time: {event_time}, Alerting: {alerting}")

# List execution errors for a rule
errors = chronicle.list_errors(rule_id)
for error in errors.get("ruleExecutionErrors", []):
    error_message = error.get("error_message", "")
    create_time = error.get("create_time", "")
    print(f"Error: {error_message}, Time: {create_time}")
```

### Rule Alerts

Search for alerts generated by rules:

```
# Set time range for alert search
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)  # Search past 7 days

# Search for rule alerts
alerts_response = chronicle.search_rule_alerts(
    start_time=start_time,
    end_time=end_time,
    page_size=10
)

# The API returns a nested structure where alerts are grouped by rule
# Extract and process all alerts from this structure
all_alerts = []
too_many_alerts = alerts_response.get('tooManyAlerts', False)

# Process the nested response structure - alerts are grouped by rule
for rule_alert in alerts_response.get('ruleAlerts', []):
    # Extract rule metadata
    rule_metadata = rule_alert.get('ruleMetadata', {})
    rule_id = rule_metadata.get('properties', {}).get('ruleId', 'Unknown')
    rule_name = rule_metadata.get('properties', {}).get('name', 'Unknown')

    # Get alerts for this rule
    rule_alerts = rule_alert.get('alerts', [])

    # Process
    # each alert
    for alert in rule_alerts:
        # Extract important fields
        alert_id = alert.get("id", "")
        detection_time = alert.get("detectionTimestamp", "")
        commit_time = alert.get("commitTimestamp", "")
        alerting_type = alert.get("alertingType", "")

        print(f"Alert ID: {alert_id}")
        print(f"Rule ID: {rule_id}")
        print(f"Rule Name: {rule_name}")
        print(f"Detection Time: {detection_time}")

        # Extract events from the alert
        if 'resultEvents' in alert:
            for var_name, event_data in alert.get('resultEvents', {}).items():
                if 'eventSamples' in event_data:
                    for sample in event_data.get('eventSamples', []):
                        if 'event' in sample:
                            event = sample['event']
                            # Process event data
                            event_type = event.get('metadata', {}).get('eventType', 'Unknown')
                            print(f"Event Type: {event_type}")
```

If `tooManyAlerts` is True in the response, consider narrowing your search with a smaller time window or more specific filters.

### Curated Rule Sets

Query curated rules:

```
# List all curated rules (returns dict with pagination metadata)
result = chronicle.list_curated_rules()
for rule in result.get("curatedRules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    display_name = rule.get("description")
    description = rule.get("description")
    print(f"Rule: {display_name}, Description: {description}")

# List all curated rules as a direct list
rules = chronicle.list_curated_rules(as_list=True)
for rule in rules:
    rule_id = rule.get("name", "").split("/")[-1]
    display_name = rule.get("description")
    print(f"Rule: {display_name}")

# Get a curated rule
rule = chronicle.get_curated_rule("ur_ttp_lol_Atbroker")

# Get a curated rule set by display name
# NOTE: This is a linear scan of all curated rules which may be inefficient for large rule sets.
rule_set = chronicle.get_curated_rule_by_name("Atbroker.exe Abuse")
```

Search for detections from curated rules:

```
from datetime import datetime, timedelta, timezone
from secops.chronicle.models import AlertState, ListBasis

# Search for detections from a specific curated rule
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)

result = chronicle.search_curated_detections(
    rule_id="ur_ttp_GCP_MassSecretDeletion",
    start_time=start_time,
    end_time=end_time,
    list_basis=ListBasis.DETECTION_TIME,
    alert_state=AlertState.ALERTING,
    page_size=100
)

detections = result.get("curatedDetections", [])
print(f"Found {len(detections)} detections")

# Check if more results are available
if "nextPageToken" in result:
    # Retrieve next page
    next_result = chronicle.search_curated_detections(
        rule_id="ur_ttp_GCP_MassSecretDeletion",
        start_time=start_time,
        end_time=end_time,
        list_basis=ListBasis.DETECTION_TIME,
        page_token=result["nextPageToken"],
        page_size=100
    )
```

Query curated rule sets:

```
# List all curated rule sets (returns dict with pagination metadata)
result = chronicle.list_curated_rule_sets()
for rule_set in result.get("curatedRuleSets", []):
    rule_set_id = rule_set.get("name", "").split("/")[-1]
    display_name = rule_set.get("displayName")
    print(f"Rule Set: {display_name}, ID: {rule_set_id}")

# List all curated rule sets as a direct list
rule_sets = chronicle.list_curated_rule_sets(as_list=True)
for rule_set in rule_sets:
    rule_set_id = rule_set.get("name", "").split("/")[-1]
    display_name = rule_set.get("displayName")
    print(f"Rule Set: {display_name}, ID: {rule_set_id}")

# Get a curated rule set by ID
rule_set = chronicle.get_curated_rule_set("00ad672e-ebb3-0dd1-2a4d-99bd7c5e5f93")
```

Query curated rule set categories:

```
# List all curated rule set categories (returns dict with pagination metadata)
result = chronicle.list_curated_rule_set_categories()
for rule_set_category in result.get("curatedRuleSetCategories", []):
    rule_set_category_id = rule_set_category.get("name", "").split("/")[-1]
    display_name = rule_set_category.get("displayName")
    print(f"Rule Set Category: {display_name}, ID: {rule_set_category_id}")

# List all curated rule set categories as a direct list
rule_set_categories = chronicle.list_curated_rule_set_categories(as_list=True)
for rule_set_category in rule_set_categories:
    rule_set_category_id = rule_set_category.get("name", "").split("/")[-1]
    display_name = rule_set_category.get("displayName")
    print(f"Rule Set Category: {display_name}, ID: {rule_set_category_id}")

# Get a curated rule set category by ID
rule_set_category = chronicle.get_curated_rule_set_category("110fa43d-7165-2355-1985-a63b7cdf90e8")
```

Manage curated rule set deployments (turn alerting on or off, at precise or broad precision, for curated rule sets):

```
# List all curated rule set deployments (returns dict with pagination metadata)
result = chronicle.list_curated_rule_set_deployments()
for rs_deployment in result.get("curatedRuleSetDeployments", []):
    rule_set_id = rs_deployment.get("name", "").split("/")[-3]
    category_id = rs_deployment.get("name", "").split("/")[-5]
    deployment_status = rs_deployment.get("name", "").split("/")[-1]
    display_name = rs_deployment.get("displayName")
    alerting = rs_deployment.get("alerting", False)
    print(
        f"Rule Set: {display_name},",
        f"Rule Set ID: {rule_set_id},",
        f"Category ID: {category_id},",
        f"Precision: {deployment_status},",
        f"Alerting: {alerting}",
    )

# List all curated rule set deployments as a direct list
rule_set_deployments = chronicle.list_curated_rule_set_deployments(as_list=True)
for rs_deployment in rule_set_deployments:
    rule_set_id = rs_deployment.get("name", "").split("/")[-3]
    display_name = rs_deployment.get("displayName")
    print(f"Rule Set: {display_name}, ID: {rule_set_id}")

# Get curated rule set deployment by ID
rule_set_deployment = chronicle.get_curated_rule_set_deployment("00ad672e-ebb3-0dd1-2a4d-99bd7c5e5f93")

# Get curated rule set deployment by rule set display name
# NOTE: This is a linear scan of all curated rules which may be inefficient for large rule sets.
rule_set_deployment = chronicle.get_curated_rule_set_deployment_by_name("Azure - Network")

# Update multiple curated rule set deployments
# Define deployments for rule sets
deployments = [
    {
        "category_id": "category-uuid",
        "rule_set_id": "ruleset-uuid",
        "precision": "broad",
        "enabled": True,
        "alerting": False
    }
]

chronicle.batch_update_curated_rule_set_deployments(deployments)

# Update a single curated rule set deployment
chronicle.update_curated_rule_set_deployment(
    category_id="category-uuid",
    rule_set_id="ruleset-uuid",
    precision="broad",
    enabled=True,
    alerting=False
)
```

### Rule Validation

Validate a YARA-L 2.0 rule before creating or updating it:

```
# Example rule
rule_text = """
rule test_rule {
    meta:
        description = "Test rule for validation"
        author = "Test Author"
        severity = "Low"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
    condition:
        $e
}
"""

# Validate the rule
result = chronicle.validate_rule(rule_text)

if result.success:
    print("Rule is valid")
else:
    print(f"Rule is invalid: {result.message}")
    if result.position:
        print(f"Error at line {result.position['startLine']}, column {result.position['startColumn']}")
```

### Rule Exclusions

Rule exclusions let you exclude specific events from triggering detections in Chronicle. They are useful for filtering out known false positives or for keeping test and development traffic out of production detections.

```
from secops.chronicle.rule_exclusion import RuleExclusionType
from datetime import datetime, timedelta

# Create a new rule exclusion
exclusion = chronicle.create_rule_exclusion(
    display_name="Exclusion Display Name",
    refinement_type=RuleExclusionType.DETECTION_EXCLUSION,
    query='(domain = "google.com")'
)

# Get exclusion id from name
exclusion_id = exclusion["name"].split("/")[-1]

# Get details of a specific rule exclusion
exclusion_details = chronicle.get_rule_exclusion(exclusion_id)
print(f"Exclusion: {exclusion_details.get('display_name')}")
print(f"Query: {exclusion_details.get('query')}")

# List all rule exclusions with pagination
exclusions = chronicle.list_rule_exclusions(page_size=10)
for exclusion in exclusions.get("findingsRefinements", []):
    print(f"- {exclusion.get('display_name')}: {exclusion.get('name')}")

# Update an existing exclusion
updated = chronicle.patch_rule_exclusion(
    exclusion_id=exclusion_id,
    display_name="Updated Exclusion",
    query='(ip = "8.8.8.8")',
    update_mask="display_name,query"
)

# Manage deployment settings
chronicle.update_rule_exclusion_deployment(
    exclusion_id,
    enabled=True,
    archived=False,
    detection_exclusion_application={
        "curatedRules": [],
        "curatedRuleSets": [],
        "rules": [],
    }
)

# Compute rule exclusion Activity for provided time period
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=7)

activity = chronicle.compute_rule_exclusion_activity(
    exclusion_id,
    start_time=start_time,
    end_time=end_time
)
```

### Featured Content Rules

Featured content rules are pre-built detection rules available in the Chronicle Content Hub. These curated rules help you deploy detections quickly without writing custom rules.

```
# List all featured content rules
rules = chronicle.list_featured_content_rules()
for rule in rules.get("featuredContentRules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    content_metadata = rule.get("contentMetadata", {})
    display_name = content_metadata.get("displayName", "Unknown")
    severity = rule.get("severity", "UNSPECIFIED")
    print(f"Rule: {display_name} [{rule_id}] - Severity: {severity}")

# List with pagination
result = chronicle.list_featured_content_rules(page_size=10)
rules = result.get("featuredContentRules", [])
next_page_token = result.get("nextPageToken")

if next_page_token:
    next_page = chronicle.list_featured_content_rules(
        page_size=10,
        page_token=next_page_token
    )

# Filter list
filtered_rules = chronicle.list_featured_content_rules(
    filter_expression=(
        'category_name:"Threat Detection" AND '
        'rule_precision:"Precise"'
    )
)
```

## Data Tables and Reference Lists

Chronicle provides two ways to manage and reference structured data in detection rules: data tables and reference lists. You can use them to maintain lists of trusted or suspicious entities, mappings of contextual information, or any other structured data used for detection.

### Data Tables

Data tables are collections of structured data with defined columns and data types. They can be referenced in detection rules to enrich your detections with additional context.

#### Creating Data Tables

```
from secops.chronicle.data_table import DataTableColumnType

# Create a data table with different column types
data_table = chronicle.create_data_table(
    name="suspicious_ips",
    description="Known suspicious IP addresses with context",
    header={
        "ip_address": DataTableColumnType.CIDR,
        # Alternately, you can map a column to an entity proto field
        # See: https://cloud.google.com/chronicle/docs/investigation/data-tables#map_column_names_to_entity_fields_optional
        # "ip_address": "entity.asset.ip"
        "port": DataTableColumnType.NUMBER,
        "severity": DataTableColumnType.STRING,
        "description": DataTableColumnType.STRING
    },
    # Optional: Set additional column options (valid options: repeatedValues, keyColumns)
    column_options={"ip_address": {"repeatedValues": True}},
    # Optional: Add initial rows
    rows=[
        ["192.168.1.100", 3232, "High", "Scanning activity"],
        ["10.0.0.5", 9000, "Medium", "Suspicious login attempts"]
    ]
)

print(f"Created table: {data_table['name']}")
```

#### Managing Data Tables

```
# List all data tables
tables = chronicle.list_data_tables()
for table in tables:
    table_id = table["name"].split("/")[-1]
    print(f"Table: {table_id}, Created: {table.get('createTime')}")

# Get a specific data table's details
table_details = chronicle.get_data_table("suspicious_ips")
print(f"Column count: {len(table_details.get('columnInfo', []))}")

# Update a data table's properties
updated_table = chronicle.update_data_table(
    "suspicious_ips",
    description="Updated description for suspicious IPs",
    row_time_to_live="72h",  # Set TTL for rows to 72 hours
    update_mask=["description", "row_time_to_live"]
)
print(f"Updated data table: {updated_table['name']}")

# Add rows to a data table
chronicle.create_data_table_rows(
    "suspicious_ips",
    [
        ["172.16.0.1", "Low", "Unusual outbound connection"],
        ["192.168.2.200", "Critical", "Data exfiltration attempt"]
    ]
)

# List rows in a data table
rows = chronicle.list_data_table_rows("suspicious_ips")
for row in rows:
    row_id = row["name"].split("/")[-1]
    values = row.get("values", [])
    print(f"Row {row_id}: {values}")

# Delete specific rows by ID
row_ids = [rows[0]["name"].split("/")[-1], rows[1]["name"].split("/")[-1]]
chronicle.delete_data_table_rows("suspicious_ips", row_ids)

# Replace all rows in a data table with new rows
chronicle.replace_data_table_rows(
    name="suspicious_ips",  # Data table Name
    rows=[
        ["192.168.100.1", "Critical", "Active scanning"],
        ["10.1.1.5", "High", "Brute force attempts"],
        ["172.16.5.10", "Medium", "Suspicious traffic"]
    ]
)

# Bulk update rows in a data table
row_updates = [
    {
        # Full resource name
        "name": "projects/my-project/locations/us/instances/my-instance/dataTables/suspicious_ips/dataTableRows/row123",
        "values": ["192.168.100.1", "Critical", "Updated description"]
    },
    {
        # Full resource name
        "name": "projects/my-project/locations/us/instances/my-instance/dataTables/suspicious_ips/dataTableRows/row456",
        "values": ["10.1.1.5", "High", "Updated brute force info"],
        "update_mask": "values"  # Optional: only update values field
    }
]

# Execute bulk update
chronicle.update_data_table_rows(
    name="suspicious_ips",
    row_updates=row_updates
)

# Delete a data table
chronicle.delete_data_table("suspicious_ips", force=True)  # force=True deletes even if it has rows
```

### Reference Lists

Reference lists are simple lists of values (strings, CIDR blocks, or regex patterns) that can be referenced in detection rules. They are useful for maintaining allowlists, blocklists, or any other categorized sets of values.

#### Creating Reference Lists

```
from secops.chronicle.reference_list import ReferenceListSyntaxType, ReferenceListView

# Create a reference list with string entries
string_list = chronicle.create_reference_list(
    name="admin_accounts",
    description="Administrative user accounts",
    entries=["admin", "administrator", "root", "system"],
    syntax_type=ReferenceListSyntaxType.STRING
)

print(f"Created reference list: {string_list['name']}")

# Create a reference list with CIDR entries
cidr_list = chronicle.create_reference_list(
    name="trusted_networks",
    description="Internal network ranges",
    entries=["10.0.0.0/8", "192.168.0.0/16", "172.16.0.0/12"],
    syntax_type=ReferenceListSyntaxType.CIDR
)

# Create a reference list with regex patterns
regex_list = chronicle.create_reference_list(
    name="email_patterns",
    description="Email patterns to watch for",
    entries=[".*@suspicious\\.com", "malicious_.*@.*\\.org"],
    syntax_type=ReferenceListSyntaxType.REGEX
)
```

#### Managing Reference Lists

```
# List all reference lists (basic view without entries)
lists = chronicle.list_reference_lists(view=ReferenceListView.BASIC)
for ref_list in lists:
    list_id = ref_list["name"].split("/")[-1]
    print(f"List: {list_id}, Description: {ref_list.get('description')}")

# Get a specific reference list including all entries
admin_list = chronicle.get_reference_list("admin_accounts", view=ReferenceListView.FULL)
entries = [entry.get("value") for entry in admin_list.get("entries", [])]
print(f"Admin accounts: {entries}")

# Update reference list entries
chronicle.update_reference_list(
    name="admin_accounts",
    entries=["admin", "administrator", "root", "system", "superuser"]
)

# Update reference list description
chronicle.update_reference_list(
    name="admin_accounts",
    description="Updated administrative user accounts list"
)
```

### Using in YARA-L Rules

Both data tables and reference lists can be referenced in YARA-L detection rules.

#### Using Data Tables in Rules

```
rule detect_with_data_table {
    meta:
        description = "Detect connections to suspicious IPs"
        author = "SecOps SDK Example"
        severity = "Medium"
        yara_version = "YL2.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
        $e.target.ip != ""
        $lookup in data_table.suspicious_ips
        $lookup.ip_address = $e.target.ip
        $severity = $lookup.severity
    condition:
        $e and $lookup and $severity = "High"
}
```

#### Using Reference Lists in Rules

```
rule detect_with_reference_list {
    meta:
        description = "Detect admin account usage from untrusted networks"
        author = "SecOps SDK Example"
        severity = "High"
        yara_version = "YL2.0"
    events:
        $login.metadata.event_type = "USER_LOGIN"
        $login.principal.user.userid in reference_list.admin_accounts
        not $login.principal.ip in reference_list.trusted_networks
    condition:
        $login
}
```

## Gemini AI

You can use Chronicle's Gemini AI to get security insights, generate detection rules, explain security concepts, and more:

```
# Query Gemini with a security question
response = chronicle.gemini("What is Windows event ID 4625?")

# Get text content (combines TEXT blocks and stripped HTML content)
text_explanation = response.get_text_content()
print("Explanation:", text_explanation)

# Work with different content blocks
for block in response.blocks:
    print(f"Block type: {block.block_type}")
    if block.block_type == "TEXT":
        print("Text content:", block.content)
    elif block.block_type == "CODE":
        print(f"Code ({block.title}):", block.content)
    elif block.block_type == "HTML":
        print("HTML content (with tags):", block.content)

# Get all code blocks
code_blocks = response.get_code_blocks()
for code_block in code_blocks:
    print(f"Code block ({code_block.title}):", code_block.content)

# Get all HTML blocks (with HTML tags preserved)
html_blocks = response.get_html_blocks()
for html_block in html_blocks:
    print("HTML block (with tags):", html_block.content)

# Check for references
if response.references:
    print(f"Found {len(response.references)} references")

# Check for suggested actions
for action in response.suggested_actions:
    print(f"Suggested action: {action.display_text} ({action.action_type})")
    if action.navigation:
        print(f"Action URI: {action.navigation.target_uri}")
```

### Response Content Methods

The `GeminiResponse` class provides several methods for working with response content:

- `get_text_content()`: Returns a single string combining all TEXT blocks with the text of HTML blocks after their HTML tags are stripped
- `get_code_blocks()`: Returns the list of blocks with `block_type == "CODE"`
- `get_html_blocks()`: Returns the list of blocks with `block_type == "HTML"` (HTML tags preserved)
- `get_raw_response()`: Returns the complete, unprocessed API response as a dictionary

These methods let you handle each type of content in a structured way.

### Accessing the Raw API Response

For advanced use cases or debugging, you can access the raw API response:

```
import json

# Get the complete raw API response
response = chronicle.gemini("What is Windows event ID 4625?")
raw_response = response.get_raw_response()

# Now you can access any part of the original JSON structure
print(json.dumps(raw_response, indent=2))

# Example of navigating the raw response structure
if "responses" in raw_response:
    for resp in raw_response["responses"]:
        if "blocks" in resp:
            print(f"Found {len(resp['blocks'])} blocks in raw response")
```

This gives you direct access to the original API response format, which is useful for reaching advanced features or for troubleshooting.

### Manual Opt-In

If your account has sufficient permissions, you can manually opt in before using
Gemini:

```
# Manually opt-in to Gemini
opt_success = chronicle.opt_in_to_gemini()
if opt_success:
    print("Successfully opted in to Gemini")
else:
    print("Unable to opt-in due to permission issues")

# Then use Gemini as normal
response = chronicle.gemini("What is Windows event ID 4625?")
```

This is useful in environments where you want explicit control over when the opt-in happens.

### Generating Detection Rules

Chronicle Gemini can generate YARA-L rules for detection:

```
# Generate a rule to detect potential security issues
rule_response = chronicle.gemini("Write a rule to detect powershell downloading a file called gdp.zip")

# Extract the generated rule(s)
code_blocks = rule_response.get_code_blocks()
if code_blocks:
    rule = code_blocks[0].content
    print("Generated rule:", rule)

# Check for rule editor action
for action in rule_response.suggested_actions:
    if action.display_text == "Open in Rule Editor" and action.action_type == "NAVIGATION":
        rule_editor_url = action.navigation.target_uri
        print("Rule can be opened in editor:", rule_editor_url)
```

### Getting Threat Intelligence Information

Get detailed information about malware, threat actors, files, and vulnerabilities:

```
# Ask about a CVE
cve_response = chronicle.gemini("tell me about CVE-2021-44228")

# Get the explanation
cve_explanation = cve_response.get_text_content()
print("CVE explanation:", cve_explanation)
```

### Maintaining Conversation Context

You can maintain conversation context by reusing the same conversation ID:

```
# Start a conversation
initial_response = chronicle.gemini("What is a DDoS attack?")

# Get the conversation ID from the response
# Name format: .../conversations/{id}/messages/{id}
conversation_id = initial_response.name.split('/')[-3]

# Ask a follow-up question in the same conversation context
followup_response = chronicle.gemini(
    "What are the most common mitigation techniques?",
    conversation_id=conversation_id
)

# Gemini will remember the context of the previous question about DDoS
```

## Feed Management

Feeds are used to ingest data into Chronicle. The SDK provides methods for managing feeds.

```
import json

# List existing feeds
feeds = chronicle.list_feeds()
print(f"Found {len(feeds)} feeds")

# Create a new feed
feed_details = {
    "logType":
        "projects/your-project-id/locations/us/instances/your-chronicle-instance-id/logTypes/WINEVTLOG",
    "feedSourceType": "HTTP",
    "httpSettings": {
        "uri": "https://example.com/example_feed",
        "sourceType": "FILES",
    },
    "labels": {"environment": "production", "created_by": "secops_sdk"}
}

created_feed = chronicle.create_feed(
    display_name="My New Feed",
    details=feed_details
)

# Get feed ID from name
feed_id = created_feed["name"].split("/")[-1]
print(f"Feed created with ID: {feed_id}")

# Get feed details
feed_details = chronicle.get_feed(feed_id)
print(f"Feed state: {feed_details.get('state')}")

# Update feed
updated_details = {
    "httpSettings": {
        "uri": "https://example.com/updated_feed_url",
        "sourceType": "FILES"
    },
    "labels": {"environment": "production", "updated": "true"}
}

updated_feed = chronicle.update_feed(
    feed_id=feed_id,
    display_name="Updated Feed Name",
    details=updated_details
)

# Disable feed
disabled_feed = chronicle.disable_feed(feed_id)
print(f"Feed disabled. State: {disabled_feed.get('state')}")

# Enable feed
enabled_feed = chronicle.enable_feed(feed_id)
print(f"Feed enabled. State: {enabled_feed.get('state')}")

# Generate secret for feed (for supported feed types)
try:
    secret_result = chronicle.generate_secret(feed_id)
    print(f"Generated secret: {secret_result.get('secret')}")
except Exception as e:
    print(f"Error generating secret for feed: {e}")

# Delete feed
chronicle.delete_feed(feed_id)
print("Feed deleted successfully")
```

The Feed API supports different feed types, such as HTTP, HTTPS Push, and S3 bucket data sources. Each feed type has its own configuration options, which are specified in the `details` dictionary.

## Chronicle Dashboard

The Chronicle Dashboard API provides methods for managing native dashboards and dashboard charts in Chronicle.

### Create Dashboard

```
# Create a dashboard
dashboard = chronicle.create_dashboard(
    display_name="Security Overview",
    description="Dashboard showing security metrics",
    access_type="PRIVATE"  # "PRIVATE" or "PUBLIC"
)
dashboard_id = dashboard["name"].split("/")[-1]
print(f"Created dashboard with ID: {dashboard_id}")
```

### Get Specific Dashboard Details

```
# Get a specific dashboard
dashboard = chronicle.get_dashboard(
    dashboard_id="dashboard-id-here",
    view="FULL"  # Optional: "BASIC" or "FULL"
)
print(f"Dashboard Details: {dashboard}")
```

### List Dashboards

```
dashboards = chronicle.list_dashboards()
for dashboard in dashboards.get("nativeDashboards", []):
    print(f"- {dashboard.get('displayName')}")

# List dashboards with pagination (first page)
dashboards = chronicle.list_dashboards(page_size=10)
for dashboard in dashboards.get("nativeDashboards", []):
    print(f"- {dashboard.get('displayName')}")

# Get next page if available
if "nextPageToken" in dashboards:
    next_page = chronicle.list_dashboards(
        page_size=10,
        page_token=dashboards["nextPageToken"]
    )
```

### Update Existing Dashboard Details

```
filters = [
    {
        "id": "GlobalTimeFilter",
        "dataSource": "GLOBAL",
        "filterOperatorAndFieldValues": [
            {"filterOperator": "PAST", "fieldValues": ["7", "DAY"]}
        ],
        "displayName": "Global Time Filter",
        "chartIds": [],
        "isStandardTimeRangeFilter": True,
        "isStandardTimeRangeFilterEnabled": True,
    }
]
charts = [
    {
        "dashboardChart": "projects//locations//instances//dashboardCharts/",
        "chartLayout": {"startX": 0, "spanX": 48, "startY": 0, "spanY": 26},
        "filtersIds": ["GlobalTimeFilter"],
    }
]

# Update a dashboard
updated_dashboard = chronicle.update_dashboard(
    dashboard_id="dashboard-id-here",
    display_name="Updated Security Dashboard",
    filters=filters,
    charts=charts,
)
print(f"Updated dashboard: {updated_dashboard['displayName']}")
```

### Duplicate Existing Dashboard

```
# Duplicate a dashboard
duplicate = chronicle.duplicate_dashboard(
    dashboard_id="dashboard-id-here",
    display_name="Copy of Security Dashboard",
    access_type="PRIVATE"
)
duplicate_id = duplicate["name"].split("/")[-1]
print(f"Created duplicate dashboard with ID: {duplicate_id}")
```

### Import Dashboard

Import a dashboard from a JSON file.

```
import os
from secops.chronicle import client

# Assumes the CHRONICLE_SA_KEY environment variable is set with service account JSON
chronicle_client = client.Client()

# Dashboard definition to import (contents elided)
dashboard = {
    "dashboard": {...},
    "dashboardCharts": [...],
    "dashboardQueries": [...]
}

# Import the dashboard
try:
    new_dashboard = chronicle_client.import_dashboard(dashboard)
    print(new_dashboard)
except Exception as e:
    print(f"An error occurred: {e}")
```

### Export Dashboards

Export dashboards as a dictionary.

```
# Export a dashboard
dashboards = chronicle.export_dashboard(dashboard_names=[""])
```

### Add Chart to Existing Dashboard

```
# Define chart configuration
query = """
metadata.event_type = "NETWORK_DNS"
match:
  principal.hostname
outcome:
  $dns_query_count = count(metadata.id)
order:
  principal.hostname asc
"""

chart_layout = {
    "startX": 0,
    "spanX": 12,
    "startY": 0,
    "spanY": 8
}

chart_datasource = {
    "dataSources": ["UDM"]
}

interval = {
    "relativeTime": {
        "timeUnit": "DAY",
        "startTimeVal": "1"
    }
}

# Add chart to dashboard
chart = chronicle.add_chart(
    dashboard_id="dashboard-id-here",
    display_name="DNS Query Metrics",
    query=query,
    chart_layout=chart_layout,
    chart_datasource=chart_datasource,
    interval=interval,
    tile_type="VISUALIZATION"  # Option: "VISUALIZATION" or "BUTTON"
)
```

### Get Dashboard Chart Details

```
# Get dashboard chart details
dashboard_chart = chronicle.get_chart(
    chart_id="chart-id-here"
)
print(f"Dashboard Chart Details: {dashboard_chart}")
```

### Edit Dashboard Chart

```
# Dashboard Query updated details
updated_dashboard_query = {
    "name": "project//instance//dashboardQueries/",
    "query": 'metadata.event_type = "USER_LOGIN"\nmatch:\n principal.user.userid\noutcome:\n $logon_count = count(metadata.id)\norder:\n $logon_count desc\nlimit: 10',
    "input": {"relativeTime": {"timeUnit": "DAY", "startTimeVal": "1"}},
    "etag": "123456",  # Latest etag from server
}

# Dashboard Chart updated details
updated_dashboard_chart = {
    "name": "project//instance//dashboardCharts/",
    "display_name": "Updated chart display Name",
    "description": "Updated chart description",
    "etag": "12345466",  # Latest etag from server
    "visualization": {},
    "chart_datasource": {"data_sources": []}
}

updated_chart = chronicle.edit_chart(
    dashboard_id="dashboard-id-here",
    dashboard_chart=updated_dashboard_chart,
    dashboard_query=updated_dashboard_query
)
print(f"Updated dashboard chart: {updated_chart}")
```

### Remove Chart from Existing Dashboard

```
# Remove chart from dashboard
chronicle.remove_chart(
    dashboard_id="dashboard-id-here",
    chart_id="chart-id-here"
)
```

### Delete Existing Dashboard

```
# Delete a dashboard
chronicle.delete_dashboard(dashboard_id="dashboard-id-here")
print("Dashboard deleted successfully")
```

## Dashboard Query

The Chronicle Dashboard Query API provides methods to execute a dashboard query without creating a dashboard, and to retrieve the details of an existing dashboard query.

### Execute Dashboard Query

```
# Define query and time interval
query = """
metadata.event_type = "USER_LOGIN"
match:
  principal.user.userid
outcome:
  $logon_count = count(metadata.id)
order:
  $logon_count desc
limit: 10
"""

interval = {
    "relativeTime": {
        "timeUnit": "DAY",
        "startTimeVal": "7"
    }
}

# Execute the query
results = chronicle.execute_dashboard_query(
    query=query,
    interval=interval
)

# Process results
for result in results.get("results", []):
    print(result)
```

### Get Dashboard Query Details

```
# Get dashboard query details
dashboard_query = chronicle.get_dashboard_query(
    query_id="query-id-here"
)
print(f"Dashboard Query Details: {dashboard_query}")
```

## Error Handling

The SDK defines several custom exceptions:

```
from secops.exceptions import SecOpsError, AuthenticationError, APIError

try:
    results = chronicle.search_udm(...)
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except APIError as e:
    print(f"API request failed: {e}")
except SecOpsError as e:
    print(f"General error: {e}")
```

## Value Type Detection

When you use the `summarize_entity` function, the SDK automatically detects the most common entity types:

- IP addresses (IPv4 and IPv6)
- MD5/SHA1/SHA256 hashes
- Domain names
- Email addresses
- MAC addresses
- Hostnames

This detection happens inside `summarize_entity`, which simplifies its use: you only need to provide the `value` argument.

```
# The SDK automatically determines how to query for these values
ip_summary = chronicle.summarize_entity(value="192.168.1.100", ...)
domain_summary = chronicle.summarize_entity(value="example.com", ...)
hash_summary = chronicle.summarize_entity(value="e17dd4eef8b4978673791ef4672f4f6a", ...)
```

If automatic detection could be ambiguous (for example, a string that might be either a username or a hostname), you can optionally pass a `preferred_entity_type` hint to `summarize_entity`.

## License

This project is licensed under the Apache License 2.0 - [see the LICENSE file for details.](https://github.com/google/secops-wrapper/blob/main/LICENSE)