GoogleCloudPlatform/df-ml-anomaly-detection

GitHub: GoogleCloudPlatform/df-ml-anomaly-detection

基于谷歌云组件的实时异常检测参考方案,通过机器学习分析 Netflow 日志与金融交易,用于识别网络安全威胁及欺诈活动。

Stars: 192 | Forks: 49

# 使用 Google Cloud Stream Analytics 和 AI 服务进行实时异常检测 此仓库提供了一个 Cloud Dataflow 流水线参考实现,该流水线与 BigQuery ML、Cloud AI Platform 和 AutoML(即将推出!)集成,作为实时 AI 模式的一部分执行异常检测用例。它包含以下实时异常检测用例的参考实现: 1. 在 netflow 日志中发现异常行为,以识别电信用例中的网络安全威胁。 2. 发现异常交易,以识别金融服务用例中的欺诈活动。 ## 目录 * [Netflow 日志中的异常检测](#anomaly-detection-in-netflow-log). * [参考架构](#anomaly-detection-reference-architecture-using-bqml). * [快速开始](#quick-start). * [了解更多](#learn-more-about-this-solution). * [使用 Dataflow 向 Pub/Sub 生成模拟数据](#test). * [使用 BQ ML 训练和归一化数据](#create-a-k-means-model-using-bq-ml ) * [使用 Dataflow 进行特征提取](#feature-extraction-after-aggregation). * [使用 Dataflow 进行实时异常值检测](#find-the-outliers). * [使用 Cloud DLP 对敏感数据 (IMSI) 进行去标识化](#dlp-integration). * [Looker 集成](#looker-integration). * [金融交易中的异常检测](#anomaly-detection-in-financial-transactions). * [参考架构](#anomaly-detection-reference-architecture-using-cloud-ai). * [构建与运行](#build--run-1) * [测试](#test-1) ## Netflow 日志中的异常检测 本节包含一个基于 ML 的网络异常检测解决方案的参考实现,使用了 Pub/Sub、Dataflow、BQML 和 Cloud DLP。它使用 BQML 中易于使用的内置 K-Means 聚类模型来训练和归一化 netflow 日志数据。实现的关键部分使用 Dataflow 进行特征提取和实时异常值检测,该部分已经过测试,可处理超过 20TB 的数据(250k 消息/秒)。最后,它还使用 Cloud DLP 对 IMSI(国际移动用户识别码)进行标记化处理,因为流式 Dataflow 流水线从 Pub/Sub 接入数百万条 netflow 日志。 对于许多客户而言,保护其内部网络免受恶意软件和安全威胁的侵害至关重要。随着恶意软件环境的不断变化以及物联网 (IoT) 和机器对机器 (M2M) 活动的爆发,现有的基于签名的恶意软件检测解决方案已不再足够。此概念验证 (PoC) 重点介绍了一个基于 ML 的网络异常检测解决方案,使用 PubSub、Dataflow、BQ ML 和 DLP 来检测订阅者设备上的移动恶意软件以及无线网络中的可疑行为。 此解决方案实现了下面重点介绍的参考架构。您将执行一个 dataflow streaming pipeline 来处理来自 GCS 和/或 PubSub 的 netflow 日志,以实时发现 netflow 日志中的异常值。此解决方案还使用了通过 BQ-ML 创建的内置 K-Means 聚类模型。要查看指导您实现此解决方案的分步教程,请参阅 [使用 Dataflow、BigQuery ML 和 Cloud Data Loss Prevention 构建安全的异常检测解决方案](https://cloud.google.com/solutions/building-anomaly-detection-dataflow-bigqueryml-dlp)。 总之,您可以使用此解决方案演示以下 3 个用例: 1. 通过使用 Dataflow/Beam 实现大规模流式分析。(特征提取和在线预测)。 2. 通过使用 BQ ML K-Means 聚类创建模型,让机器学习变得简单易行。 3. 通过使用 Cloud DLP 基于加密的标记化来保护敏感信息,例如:“IMSI(国际移动用户识别码)”。 ## 使用 BQML 的异常检测参考架构 ![ref_arch](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/2132078625214227.png) ## 快速开始 [![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection.git) ### 启用 API ``` gcloud services enable storage_component gcloud services enable dataflow gcloud services enable cloudbuild.googleapis.com gcloud config set project ``` ### 访问 Cloud Build 服务帐号 ``` export PROJECT_NUMBER=$(gcloud projects list --filter=${PROJECT_ID} --format="value(PROJECT_NUMBER)") gcloud projects add-iam-policy-binding ${PROJECT_ID} --member serviceAccount:$PROJECT_NUMBER@cloudbuild.gserviceaccount.com --role roles/editor gcloud projects add-iam-policy-binding ${PROJECT_ID} --member serviceAccount:$PROJECT_NUMBER@cloudbuild.gserviceaccount.com --role roles/storage.objectAdmin ``` #### 导出所需参数 ``` export DATASET=bq-dataset-name export SUBSCRIPTION_ID=subscription_id export TOPIC_ID=topic_id export DATA_STORAGE_BUCKET=${PROJECT_ID}-data-storage-bucket ``` 您还可以导出 DLP 模板和批处理大小以在流水线中启用 DLP 转换 * 批处理大小以字节为单位,允许的最大值小于 520KB/payload ``` export DEID_TEMPLATE=projects/{id}/deidentifyTemplates/{template_id} export BATCH_SIZE = 350000 ``` #### 触发 Cloud Build 脚本 ``` gcloud builds submit scripts/. --config scripts/cloud-build-demo.yaml --substitutions \ _DATASET=$DATASET,\ _DATA_STORAGE_BUCKET=$DATA_STORAGE_BUCKET,\ _SUBSCRIPTION_ID=${SUBSCRIPTION_ID},\ _TOPIC_ID=${TOPIC_ID},\ _API_KEY=$(gcloud auth print-access-token) ``` #### (可选)使用 flex template 触发流水线 如果您已经拥有所有其他资源(如 BigQuery 表、PubSub 主题和订阅者、GCS 存储桶)或之前已创建,您可以使用以下命令通过公共镜像触发流水线。这对于运行流水线进行实时演示可能会有所帮助。 生成 10k 消息/秒的随机 netflow 日志数据: ``` gcloud beta dataflow flex-template run data-generator --project= --region= --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_data_generator_template.json --parameters=autoscalingAlgorithm="NONE",numWorkers=5,maxNumWorkers=5,workerMachineType=n1-standard-4,qps=10000,schemaLocation=gs://df-ml-anomaly-detection-mock-data/schema/next-demo-schema.json,eventType=net-flow-log,topic=projects//topics/events ``` 生成 1k 消息/秒的随机异常值数据: ``` gcloud beta dataflow flex-template run data-generator --project= --region= --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_data_generator_template.json --parameters=autoscalingAlgorithm="NONE",numWorkers=5,maxNumWorkers=5,workerMachineType=n1-standard-4,qps=1000,schemaLocation=gs://df-ml-anomaly-detection-mock-data/schema/next-demo-schema-outlier.json,eventType=net-flow-log,topic=projects//topics/ ``` 触发异常检测流水线: ``` gcloud beta dataflow flex-template run "anomaly-detection" --project=--region=us-central1 --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_secure_log_aggr_template.json --parameters=autoscalingAlgorithm="NONE",numWorkers=20,maxNumWorkers=20,workerMachineType=n1-highmem-4,subscriberId=projects//subscriptions/events-sub,tableSpec=:.cluster_model_data,batchFrequency=10,customGcsTempLocation=gs://df-tmp-data/temp,tempLocation=gs://df-temp-data/temp,clusterQuery=gs:///normalized_cluster_data.sql,outlierTableSpec=:.outlier_data,inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,enbaleStreamingEngine=true,windowInterval=5,writeMethod=FILE_LOADS,streaming=true ``` ### 在 PubSub 主题中生成一些模拟数据(1k 事件/秒) ``` gradle run -DmainClass=com.google.solutions.df.log.aggregations.StreamingBenchmark \ -Pargs="--streaming --runner=DataflowRunner --project=${PROJECT_ID} --autoscalingAlgorithm=NONE --workerMachineType=n1-standard-4 --numWorkers=3 --maxNumWorkers=3 --qps=1000 --schemaLocation=gs://df-ml-anomaly-detection-mock-data/schema/netflow_log_json_schema.json --eventType=netflow --topic=${TOPIC_ID} --region=us-central1" ``` ### 发布具有异常 tx 和 rx 字节的异常值 ``` gcloud pubsub topics publish events --message "{\"subscriberId\": \"00000000000000000\", \ \"srcIP\": \"12.0.9.4\", \ \"dstIP\": \"12.0.1.3\", \ \"srcPort\": 5000, \ \"dstPort\": 3000, \ \"txBytes\": 150000, \ \"rxBytes\": 40000, \ \"startTime\": 1570276550, \ \"endTime\": 1570276550, \ \"tcpFlag\": 0, \ \"protocolName\": \"tcp\", \ \"protocolNumber\": 0}" ``` ### 清理 请从 UI 手动停止/取消 dataflow 流水线。 ## 了解有关此解决方案的更多信息 ### 示例输入日志数据和聚合后的输出 示例输入数据 ``` { \"subscriberId\": \"100\", \"srcIP\": \"12.0.9.4", \"dstIP\": \"12.0.1.2\", \"srcPort\": 5000, \"dstPort\": 3000, \"txBytes\": 15, \"rxBytes\": 40, \"startTime\": 1570276550, \"endTime\": 1570276559, \"tcpFlag\": 0, \"protocolName\": \"tcp\", \"protocolNumber\": 0 }, { \"subscriberId\": \"100\", \"srcIP\": \"12.0.9.4\", \"dstIP\": \"12.0.1.2\", \"srcPort\": 5000, \"dstPort\": 3000, \"txBytes\": 10, \"rxBytes\": 40, \"startTime\": 1570276650, \"endTime\": 11570276750,, \"tcpFlag\": 0, \"protocolName\": \"tcp\", \"protocolNumber\": 0 } ``` ### 聚合后的特征提取 1. 添加处理时间戳。 2. 按目标子网和 subscriberId 分组 3. 近似唯一 IP 的数量 4. 近似唯一端口的数量 5. 唯一记录的数量 6. txBytes 的最大值、最小值、平均值 7. rxBytes 的最大值、最小值、平均值 8. duration 的最大值、最小值、平均值 ``` { "transaction_time": "2019-10-27 23:22:17.848000", "subscriber_id": "100", "dst_subnet": "12.0.1.2/22", "number_of_unique_ips": "1", "number_of_unique_ports": "1", "number_of_records": "2", "max_tx_bytes": "15", "min_tx_bytes": "10", "avg_tx_bytes": "12.5", "max_rx_bytes": "40", "min_rx_bytes": "40", "avg_rx_bytes": "40.0", "max_duration": "100", "min_duration": "9", "avg_duration": "54.5" } ``` ### 使用 Beam Schema 推断进行特征提取 ``` Group.byFieldNames("subscriberId", "dstSubnet") .aggregateField( "srcIP", new ApproximateUnique.ApproximateUniqueCombineFn( SAMPLE_SIZE, StringUtf8Coder.of()), "number_of_unique_ips") .aggregateField( "srcPort", new ApproximateUnique.ApproximateUniqueCombineFn( SAMPLE_SIZE, VarIntCoder.of()), "number_of_unique_ports") .aggregateField("srcIP", Count.combineFn(), "number_of_records") .aggregateField("txBytes", new AvgCombineFn(), "avg_tx_bytes") .aggregateField("txBytes", Max.ofIntegers(), "max_tx_bytes") .aggregateField("txBytes", Min.ofIntegers(), "min_tx_bytes") .aggregateField("rxBytes", new AvgCombineFn(), "avg_rx_bytes") .aggregateField("rxBytes", Max.ofIntegers(), "max_rx_bytes") .aggregateField("rxBytes", Min.ofIntegers(), "min_rx_bytes") .aggregateField("duration",new AvgCombineFn(), "avg_duration") .aggregateField("duration", Max.ofIntegers(), "max_duration") .aggregateField("duration", Min.ofIntegers(), "min_duration")); ``` ### 使用 BQ ML 创建 K-Means 模型 请使用 json schema (aggr_log_table_schema.json) 在 BQ 中创建表。 Cluster_model_data 表按 'ingestion timestamp' 分区,并按 dst_subnet 和 subscriber_id 聚类。 ``` CREATE or REPLACE TABLE network_logs.train_data as (select * from {dataset_name}.cluster_model_data where _PARTITIONDATE between 'date_from' AND 'date_to'; --> create model CREATE OR REPLACE {dataset_name}.log_cluster options(model_type='kmeans', num_clusters=4, standardize_features = true) AS select * except (transaction_time, subscriber_id, number_of_unique_ips, number_of_unique_ports, dst_subnet) from network_logs.train_data; ``` ### 使用 BQ 存储过程归一化数据 1. 在训练数据集上进行预测,以获取每条记录到质心的最近距离。 2. 计算每个点的标准差 (STD DEV) 以进行归一化 3. 将它们存储在 dataflow 流水线可以用作侧输入 的表中 ``` with centroid_details AS ( select centroid_id,array_agg(struct(feature as name, round(numerical_value,1) as value) order by centroid_id) AS cluster from ML.CENTROIDS(model network_logs.log_cluster_2) group by centroid_id), cluster as (select centroid_details.centroid_id as centroid_id, (select value from unnest(cluster) where name = 'number_of_records') AS number_of_records, (select value from unnest(cluster) where name = 'max_tx_bytes') AS max_tx_bytes, (select value from unnest(cluster) where name = 'min_tx_bytes') AS min_tx_bytes, (select value from unnest(cluster) where name = 'avg_tx_bytes') AS avg_tx_bytes, (select value from unnest(cluster) where name = 'max_rx_bytes') AS max_rx_bytes, (select value from unnest(cluster) where name = 'min_rx_bytes') AS min_rx_bytes, (select value from unnest(cluster) where name = 'avg_rx_bytes') AS avg_rx_bytes, (select value from unnest(cluster) where name = 'max_duration') AS max_duration, (select value from unnest(cluster) where name = 'min_duration') AS min_duration, (select value from unnest(cluster) where name = 'avg_duration') AS avg_duration from centroid_details order by centroid_id asc), predict as (select * from ML.PREDICT(model {dataset_name}.log_cluster_2, (select * from network_logs.train_data))) select c.centroid_id as centroid_id, (stddev((p.number_of_records-c.number_of_records) +(p.max_tx_bytes-c.max_tx_bytes) +(p.min_tx_bytes-c.min_tx_bytes) +(p.avg_tx_bytes-c.min_tx_bytes) +(p.max_rx_bytes-c.max_rx_bytes) +(p.min_rx_bytes-c.min_rx_bytes) +(p.avg_rx_bytes-c.min_rx_bytes) +(p.max_duration-c.max_duration) +(p.min_duration-c.min_duration) +(p.avg_duration-c.avg_duration)))as normalized_dest, any_value(c.number_of_records) as number_of_records, any_value(c.max_tx_bytes) as max_tx_bytes, any_value(c.min_tx_bytes) as min_tx_bytes , any_value(c.avg_tx_bytes) as avg_tx_bytes, any_value(c.max_rx_bytes) as max_rx_bytes, any_value(c.min_tx_bytes) as min_rx_bytes, any_value(c.avg_rx_bytes) as avg_rx_bytes, any_value(c.avg_duration) as avg_duration, any_value(c.max_duration) as max_duration, any_value(c.min_duration) as min_duration from predict as p inner join cluster as c on c.centroid_id = p.centroid_id group by c.centroid_id); ``` ### 查找异常值 1. 查找到质心的最近距离。 2. 计算输入向量和质心向量之间的标准差 (STD DEV) 3. 查找 Z 分数(样本中的值与平均值之间的差值,再除以标准差) 4. 分数为 2(比平均值高 2 个标准差即为异常值)。 ![outlier](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/0098023e8f214228.png) ## 开始之前(如果不使用 Cloud Build 脚本,则为可选步骤) ``` gcloud services enable dataflow gcloud services enable big query gcloud services enable storage_component ``` ## 创建 BigQuery 数据集和表 数据集 ``` bq --location=US mk -d \ --description "Network Logs Dataset \ ``` 聚合数据表 ``` bq mk -t --schema aggr_log_table_schema.json \ --time_partitioning_type=DAY \ --clustering_fields=dst_subnet, subscriber_id \ --description "Network Log Partition Table" \ --label myorg:prod \ :.cluster_model_data ``` 异常值表 ``` bq mk -t --schema outlier_table_schema.json \ --label myorg:prod \ :.network_logs.outlier_data ``` ## 构建与运行 构建 ``` gradle spotlessApply -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline gradle build -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline gcloud auth configure-docker gradle jib --image=gcr.io/${PROJECT_ID}/df-ml-anomaly-detection:latest -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline ``` 运行 ``` gcloud beta dataflow flex-template run "anomaly-detection" \ --project=${PROJECT_ID} \ --region=us-central1 \ --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \ --parameters=autoscalingAlgorithm="NONE",\ numWorkers=5,\ maxNumWorkers=5,\ workerMachineType=n1-highmem-4,\ subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\ tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\ batchFrequency=2,\ customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\ tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\ clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\ outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\ inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\ workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\ diskSizeGb=5,\ windowInterval=10,\ writeMethod=FILE_LOADS,\ streaming=true ``` ## 测试 以 250k 消息/秒的速度发布模拟日志数据 用于负载测试的 Schema: ``` { "subscriberId": "{{long(1111111111,9999999999)}}", "srcIP": "{{ipv4()}}", "dstIP": "{{subnet()}}", "srcPort": {{integer(1000,5000)}}, "dstPort": {{integer(1000,5000)}}, "txBytes": {{integer(10,1000)}}, "rxBytes": {{integer(10,1000)}}, "startTime": {{starttime()}}, "endTime": {{endtime()}}, "tcpFlag": {{integer(0,65)}}, "protocolName": "{{random("tcp","udp","http")}}", "protocolNumber": {{integer(0,1)}} } ``` 运行: ``` export PROJECT_ID=$(gcloud config get-value project) export TOPIC_ID=topic-id gcloud pubsub topics create $TOPIC_ID gcloud builds submit . --machine-type=n1-highcpu-8 --config scripts/cloud-build-data-generator.yaml --substitutions _TOPIC_ID=${TOPIC_ID} ``` 异常值测试 ``` gcloud pubsub topics publish \ --message "{\"subscriberId\": \"demo1\",\"srcIP\": \"12.0.9.4\",\"dstIP\": \"12.0.1.3\",\"srcPort\": 5000,\"dstPort\": 3000,\"txBytes\": 150000,\"rxBytes\": 40000,\"startTime\": 1570276550,\"endTime\": 1570276550,\"tcpFlag\": 0,\"protocolName\": \"tcp\",\"protocolNumber\": 0}" gcloud pubsub topics publish \ --message "{\"subscriberId\": \"demo1\",\"srcIP\": \"12.0.9.4\",\"dstIP\": \"12.0.1.3\",\"srcPort\": 5000,\"dstPort\": 3000,\"txBytes\": 15000000,\"rxBytes\": 4000000,\"startTime\": 1570276550,\"endTime\": 1570276550,\"tcpFlag\": 0,\"protocolName\": \"tcp\",\"protocolNumber\": 0}" ``` 特征提取测试 ``` gcloud pubsub topics publish \ --message "{\"subscriberId\": \"100\",\"srcIP\": \"12.0.9.4\",\"dstIP\": \"12.0.1.2\",\"srcPort\": 5000,\"dstPort\": 3000,\"txBytes\": 10,\"rxBytes\": 40,\"startTime\": 1570276550,\"endTime\": 1570276559,\"tcpFlag\": 0,\"protocolName\": \"tcp\",\"protocolNumber\": 0}" gcloud pubsub topics publish \ --message "{\"subscriberId\": \"100\",\"srcIP\": \"13.0.9.4\",\"dstIP\": \"12.0.1.2\",\"srcPort\": 5001,\"dstPort\": 3000,\"txBytes\": 15,\"rxBytes\": 40,\"startTime\": 1570276650,\"endTime\": 1570276750,\"tcpFlag\": 0,\"protocolName\": \"tcp\",\"protocolNumber\": 0}" OUTPUT: INFO: row value Row:[2, 2, 2, 12.5, 15, 10, 50] ``` ``` gcloud pubsub topics publish \ --message "{\"subscriberId\": \"100\",\"srcIP\": \"12.0.9.4\",\"dstIP\": \"12.0.1.2\",\"srcPort\": 5000,\"dstPort\": 3000,\"txBytes\": 10,\"rxBytes\": 40,\"startTime\": 1570276550,\"endTime\": 1570276550,\"tcpFlag\": 0,\"protocolName\": \"tcp\",\"protocolNumber\": 0}" gcloud pubsub topics publish \ --message "{\"subscriberId\": \"100\",\"srcIP\": \"12.0.9.4\",\"dstIP\": \"12.0.1.2\",\"srcPort\": 5000,\"dstPort\": 3000,\"txBytes\": 15,\"rxBytes\": 40,\"startTime\": 1570276550,\"endTime\": 1570276550,\"tcpFlag\": 0,\"protocolName\": \"tcp\",\"protocolNumber\": 0}" OUTPUT INFO: row value Row:[1, 1, 2, 12.5, 15, 10, 0] ``` ## 250k 消息/秒下的流水线性能 流水线 DAG(待办事项:使用更新的流水线 DAG 进行更改) ![dag](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/9931b4bcd5214230.png) 消息速率 ![msg_rate_](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/1797892b9c214231.png) ![dag](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/90980f0c04214232.png) 确认消息速率 ![ack_msg](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/b4f1dc4987214233.png) CPU 利用率 ![cpu](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/6b2b5aa662214235.png) 系统延迟 ![latency](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/edb9e7cb7f214236.png) ### 使用 BQ-ML 进行 K-Means 聚类(模型评估) ![ref_arch](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/973f82eb4b214237.png) ![ref_arch](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/65b764cc15214238.png) ![ref_arch](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/8cb748f0e3214239.png) ![ref_arch](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/ee181cd8e8214241.png) ### DLP 集成 为了保护日志中的任何敏感数据,您可以在数据存储到 BigQuery 之前使用 Cloud DLP 进行检查和去标识化。这是我们参考架构中的一个可选集成。要启用它,请按照以下步骤操作: * 更新 scripts/deid_imei_number.json 中的 JSON 文件,以添加适用于您的用例的去标识化转换。下面的屏幕截图使用了基于加密的确定性转换来对 IMSI 号码进行去标识化。要了解如何使用 DLP 转换,请参阅此 [指南](https://cloud.google.com/solutions/de-identification-re-identification-pii-using-cloud-dlp)。 ``` { "deidentifyTemplate":{ "displayName":"Config to DeIdentify IMEI Number", "description":"IMEI Number masking transformation", "deidentifyConfig":{ "recordTransformations":{ "fieldTransformations":[ { "fields":[ { "name":"subscriber_id" } ], "primitiveTransformation":{ "cryptoDeterministicConfig":{ "cryptoKey":{ } }, "surrogateInfoType":{ "name":"IMSI_TOKEN" } } } } ] } } }, "templateId":"dlp-deid-subid" } ``` * 运行此脚本 (deid_template.sh) 在您的项目中创建模板。 ``` PROJECT_ID=$(gcloud config get-value project) DEID_CONFIG="@deid_imei_number.json" DEID_TEMPLATE_OUTPUT="deid-template.json" API_KEY=$(gcloud auth print-access-token) API_ROOT_URL="https://dlp.googleapis.com" DEID_TEMPLATE_API="${API_ROOT_URL}/v2/projects/${PROJECT_ID}/deidentifyTemplates" curl -X POST -H "Content-Type: application/json" \ -H "Authorization: Bearer ${API_KEY}" \ "${DEID_TEMPLATE_API}"`` \ -d "${DEID_CONFIG}"\ -o "${DEID_TEMPLATE_OUTPUT}" ``` * 将 DLP 模板名称传递给 Dataflow 流水线。 ``` --deidTemplateName=projects//deidentifyTemplates/dlp-deid-subid" ``` 注意:请查看此 [repo](https://github.com/GoogleCloudPlatform/dlp-dataflow-deidentification) 以了解有关端到端数据标记化解决方案的更多信息。 ![ref_arch](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/26a5e155da214242.png) 如果您从 DAG 中点击 DLP 转换,您将看到以下子转换: ![ref_arch](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/cb77a82b4d214243.png) ### Looker 集成 除了特征表和异常值表之外,您还可以在 BigQuery 表中包含原始 netflow 日志数据。这包括从模拟 IP 派生的地理位置数据,如国家、城市、纬度、经度。流水线使用 [MaxMind-GeoIP2 Java API](https://maxmind.github.io/GeoIP2-java/)。 #### 创建一个表来存储原始日志 netflow 数据 ``` bq mk -t --schema src/main/resources/netflow_log_raw_datat.json \ --time_partitioning_type=DAY \ --clustering_fields="geoCountry,geoCity" \ --description "Raw Netflow Log Data" \ ${PROJECT_ID}:${DATASET_NAME}.netflow_log_data ``` #### Looker 仓库 您可以利用此 [开源代码](https://github.com/llooker/anomaly_detection_network_logs/tree/master) 为您的用例导入 Looker 仪表板和底层的 LookML。 #### 在流水线中传递参数。如果未传递,流水线假定不需要存储原始日志数据。特征和异常值数据将按预期存储。 ``` --logTableSpec=:.netflow_log_data ``` ![log_data_dag](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/e57d599674214244.png) ![log_data_dag](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/dd061428af214245.png) ## 金融交易中的异常检测 本节包含一个使用 Dataflow 和 Cloud AI 查找欺诈交易的参考实现。第一步是使用 [Kaggle 数据集](https://www.kaggle.com/ntnu-testimon/paysim1) 创建 TensorFlow 提升树 模型,并将模型部署在 Cloud AI 中进行在线预测。然后使用下面参考架构中重点介绍的 Dataflow 流水线对在线预测请求进行微批处理。最后,交易数据存储在一个名为 transactions 的 BigQuery 表中,异常值数据存储在一个名为 fraud_prediction 的表中。可以通过 transactionId 列连接两个表。默认情况下,概率设置为 0.99 (99%) 以识别欺诈交易。 要查看指导您实现此解决方案的分步教程,请参阅 [使用 AI Platform、Dataflow 和 BigQuery 检测金融交易中的异常](https://cloud.google.com/solutions/detecting-anomalies-in-financial-transactions)。 ## 使用 Cloud AI 的异常检测参考架构 ![ref_arch](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/2d694f7b0d214246.png) ## 使用 Flex Template 触发流水线 您可以使用以下命令通过公共镜像作为 Dataflow flex template 触发流水线 ``` gcloud beta dataflow flex-template run "anomaly-detection-finserv" --project= --region= --template-file-gcs-location=gs://df-ml-anomaly-detection-mock-data/dataflow-flex-template/dynamic_template_finserv_fraud_detection.json --parameters=autoscalingAlgorithm="NONE",numWorkers=30,maxNumWorkers=30,workerMachineType=n1-highmem-8,subscriberId=projects//subscriptions/,tableSpec=project:dataset.transactions,outlierTableSpec=project:dataset.fraud_prediction,tempLocation=gs:///temp,inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json,modelId=,versionId=,keyRange=1024,batchSize=500000 ``` ## 构建与运行 1. 请遵循此 [代码实验室](https://codelabs.developers.google.com/codelabs/fraud-detection-ai-explanations/#7) 在 Cloud AI 中构建和部署 [TensorFlow 模型](https://www.tensorflow.org/api_docs/python/tf/estimator/BoostedTreesClassifier)。另外,请注意笔记本中的更改,即添加 transactionId 作为服务函数的一部分。更新后的笔记本可以在[这里](https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection/blob/finserv-fraud-detection-tf-cloud-ai/fraud-detection-notebook/credit-card-fraud-detection-v1.ipynb)找到。 2. 使用以下命令在 BigQuery 中创建数据集和两个表。 ``` bq --location=US mk -d \ --description "FinServ Transaction Dataset \ ``` ``` bq mk -t --schema src/main/resources/transactions.json \ --label myorg:prod \ :.transactions ``` ``` bq mk -t --schema src/main/resources/fraud_prediction.json \ --label myorg:prod \ :.fraud_prediction ``` 3. 通过传递所有必需参数来触发 Dataflow 流水线。流水线使用 state 和 timer api 对预测 API 的调用进行微批处理。 ``` gradle run -DmainClass=com.google.solutions.df.log.aggregations.FraudDetectionFinServTranPipeline -Pargs="--runner=DataflowRunner --project=> \ --inputFilePattern=gs://df-ml-anomaly-detection-mock-data/finserv_fraud_detection/fraud_data_kaggle.json \ --subscriberId=projects//subscriptions/ \ --modelId= \ --versionId= \ --outlierTableSpec=:.fraud_prediction \ --tableSpec=:.transactions \ --keyRange=1024 \ --autoscalingAlgorithm=NONE \ --numWorkers=30 \ --maxNumWorkers=30 \ --workerMachineType=n1-highmem-8 \ --batchSize=500000 \ --region=us-central1 \ --gcpTempLocation=gs://df-temp-data/temp" ``` ## 测试 1. 从 GCS 存储桶读取完整的 kaggle 数据集 (6M+) 并在 PubSub 主题中发布一些随机模拟数据 ![ingest_data](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/e1bdb24a5d214248.png). 完整 DAG ![ingest_data](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/3291573b3b214249.png). 2. 具有 500KB payload/请求的预测转换 ![predict_data](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/3da4923721214250.png). ![batch_data](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/ab703f6421214251.png). 3. BigQuery 验证 ![predict_data](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/1ee4873f16214253.png). ![trans_schema](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/a07d4c2576214254.png). ![predict_schema](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/4198525f53214255.png). 4. Cloud AI 中的预测 API ![predict_data](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/56350591b2214256.png). ![trans_schema](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/5adcb4956b214258.png). ![predict_schema](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/53962b658f214259.png). 4. Dataflow UI 中的自定义计数器 ![custom_counter](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/081837d677214300.png).
标签:AMSI绕过, Apex, AutoML, BigQuery ML, BQML, Caido项目解析, Cloud Dataflow, Cloud DLP, Google Cloud, IMSI去标识化, K-Means聚类, Looker, Netflow, Pub/Sub, 交易异常, 人工智能, 参考架构, 域名枚举, 大数据, 威胁检测, 实时异常检测, 数据脱敏, 无监督学习, 机器学习, 欺诈检测, 流式处理, 特征提取, 用户模式Hook绕过, 电信行业, 目录扫描, 网络安全, 谷歌云, 金融服务, 金融风控, 隐私保护