thamer-ali/cybersecurity-_detection-_using-ai-

GitHub: thamer-ali/cybersecurity-_detection-_using-ai-

评估通用大语言模型在零样本条件下基于原始 NetFlow 特征进行网络入侵检测的分类能力,提供完整的分布式数据处理与多模型对比评估实验框架。

Stars: 0 | Forks: 0

# 基于大型语言模型的智能威胁检测 ## 在基于 Apache Spark 的分布式计算环境中 **博士研究 — 格拉纳达大学** **候选人:** Thamer Aldraiwa **导师:** Prof. Juan Carlos Gámez Granados & Prof. Antonio Mora García **日期:** 2026年5月 ## 概述 本代码仓库包含了使用 UGR'16 数据集评估预训练大型语言模型(LLM)在网络入侵检测中表现的完整实验 pipeline。 本研究旨在探讨通用 LLM —— **Claude**、**ChatGPT**、**Gemini** 和 **DeepSeek** —— 是否能在无需任何特定领域训练的情况下,仅使用原始 NetFlow 特征将网络流分类为**攻击**或**正常**。 ## 数据集 | 属性 | 值 | |---|---| | 数据集 | UGR'16 — 格拉纳达大学网络入侵数据集 | | 来源 | Maciá-Fernández et al. (2018) | | 下载链接 | https://nesg.ugr.es/nesg-ugr16/download/attack/august/week4/august_week4_csv.tar.gz | | 原始 CSV 大小 | 77.6 GB | | Parquet 大小 | 14 GB (311 个文件,Snappy 压缩) | | 总记录数 | 862,526,861 条网络流记录 | | 标签类别 | background, anomaly-spam, blacklist, anomaly-sshscan | ## 代码仓库结构 ``` ├── ugr16_pipeline_v9.py # Diagnostic pipeline — revealed class imbalance ├── ugr16_pipeline_v10.py # Production pipeline — stratified sampling ├── ugr16_llm_eval_v2.py # LLM evaluation script v2 ├── Parquet Conversion/ # CSV to Parquet conversion script ├── create_sample_5000_3000_1000_... # Early sampling prototype (reference only) ├── UGR16 Spark Sampling — Processing Report # v10 processing report ├── samples/ # JSONL sample files (v10 output) │ ├── sample_1000_api.jsonl # 1,000 records — no label — sent to LLM │ ├── sample_3000_api.jsonl # 3,000 records — no label — sent to LLM │ ├── sample_5000_api.jsonl # 4,994 records — no label — sent to LLM │ ├── sample_1000_eval.jsonl # 1,000 records — with label — for metrics │ ├── sample_3000_eval.jsonl # 3,000 records — with label — for metrics │ └── sample_5000_eval.jsonl # 4,994 records — with label — for metrics └── README.md ``` ## 基础设施 | 属性 | 值 | |---|---| | 云平台 | Microsoft Azure | | 虚拟机类型 | Standard_D8s_v3 | | CPU | 8 vCPU | | RAM | 64 GB | | 存储 | 4× NVMe SSD 通过 LVM 组合 = /data 目录下 1.8 TB | | 操作系统 | Ubuntu 24.04 LTS | | Java | OpenJDK 11 | | Apache Spark | 3.5.1 with Hadoop 3 | | Python | 3.12.3 | ## Pipeline 版本 | 版本 | 描述 | 状态 | |---|---|---| | v1–v6 | Schema 验证、错误处理、多标签规则、类型转换修复 | 开发中 | | v7 | toDF() 重命名 + when() 内部的数值类型转换 — 修复零可疑样本 bug | 已修复 | | v8 | RULES 移至 Spark 启动后,对 protocol 使用 upper() — 修复 AssertionError | 已修复 | | **v9** | 6 项论文补充。纯随机抽样 → 4,999 个 background,1 个 attack | **诊断中** | | **v10** | 分层抽样。保留所有 v9 补充。4,700 + 290 + 4 = 4,994 条记录 | **生产环境** | ## 步骤 1 — 环境设置 ``` # 连接到 VM ssh thamer@74.162.89.167 # 安装 Java sudo apt update && sudo apt install openjdk-11-jdk -y java -version # 下载并安装 Apache Spark 3.5.1 cd /opt && sudo wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz sudo tar -xvzf spark-3.5.1-bin-hadoop3.tgz sudo mv spark-3.5.1-bin-hadoop3 spark # 测试 Spark /opt/spark/bin/pyspark # 安装 AzCopy wget https://aka.ms/downloadazcopy-v10-linux -O azcopy.tar.gz tar -xvf azcopy.tar.gz sudo cp ./azcopy_linux_amd64_*/azcopy /usr/bin/ sudo chmod +x /usr/bin/azcopy azcopy --version ``` ## 步骤 2 — 存储设置 (LVM — 1.8 TB) ``` # 将 4 块 NVMe SSD 组合为一个逻辑卷 sudo pvcreate /dev/nvme1n1 /dev/nvme2n1 /dev/nvme3n1 /dev/nvme4n1 sudo vgcreate vg_data /dev/nvme1n1 /dev/nvme2n1 /dev/nvme3n1 /dev/nvme4n1 sudo lvcreate -l 100%FREE -n lv_data vg_data # 格式化并挂载 sudo mkfs.xfs /dev/vg_data/lv_data sudo mkdir -p /data sudo mount /dev/vg_data/lv_data /data sudo chown -R thamer:thamer /data sudo chmod 775 /data # 使挂载在重启后保持持久 sudo blkid /dev/vg_data/lv_data echo 'UUID= /data xfs defaults,nofail 0 2' | sudo tee -a /etc/fstab sudo mount -a # 验证 — 应显示 1.8 TB df -h /data ``` ## 步骤 3 — 数据集获取 UGR'16 数据集是通过 **Azure Data Factory V2** 下载的,因为 UGR'16 服务器使用的 HTTP 重定向阻止了直接的 wget/curl 访问。 ``` # 使用 AzCopy 从 Azure Blob Storage 传输到 VM azcopy copy "" "/data/august_week4_csv.tar.gz" # 在后台解压 nohup tar -xvzf /data/august_week4_csv.tar.gz -C /data/ > extract.log 2>&1 & tail -f extract.log # 验证解压 ls -lh /data du -sh /data/august.week4.csv.uniqblacklistremoved # → ~77.6 GB # 删除压缩文件以释放空间 rm -f /data/august_week4_csv.tar.gz ``` ## 步骤 4 — CSV 转换为 Parquet ``` # 运行转换脚本 /opt/spark/bin/spark-submit /data/convert_to_parquet.py # 验证输出 du -sh /data/parquet # → ~14 GB find /data/parquet -name "*.parquet" | wc -l # → 311 files ls /data/parquet/_SUCCESS # → confirms completion # 如需要,修复嵌套目录结构 mv /data/parquet/august_week4/august_week4/* /data/parquet/ rmdir /data/parquet/august_week4/august_week4 ``` **为什么使用 Parquet?** Parquet 是一种列式二进制格式,比 CSV 提供 5.5 倍的压缩率,保留 schema,并通过分区级别的并行处理显著提高 Spark 的读取性能。 ## 步骤 5 — 检查 Schema ``` /opt/spark/bin/pyspark --driver-memory 10g --executor-memory 42g ``` ``` df_raw = spark.read.parquet("/data/parquet") print("Column count:", len(df_raw.columns)) # → 13 print("Column names:", df_raw.columns) # → ['_c0','_c1',...,'_c12'] for name, dtype in df_raw.dtypes: print(f"{name} -> {dtype}") # ALL → string df_raw.show(3, truncate=False) ``` **已确认的 schema — 所有 13 列均为 string 类型:** | 列名 | 语义名称 | 转换为 | 备注 | |---|---|---|---| | _c0 | timestamp | string | YYYY-MM-DD HH:MM:SS | | _c1 | duration | double | 流持续时间(秒) | | _c2 | src_ip | string | 源 IP | | _c3 | dst_ip | string | 目标 IP | | _c4 | src_port | int | 源端口 | | _c5 | dst_port | int | 目标端口 | | _c6 | protocol | string | TCP/UDP/ICMP/IPv6... | | _c7 | flags | string | TCP 标志 | | _c8 | fwd_pkts | int | 前向数据包 | | _c9 | bwd_pkts | int | 后向数据包 | | _c10 | total_pkts | long | 总数据包 (long 而非 int — 溢出风险) | | _c11 | total_bytes | long | 总字节数 (long 而非 int — 溢出风险) | | _c12 | label | string | background/blacklist/anomaly-spam/anomaly-sshscan | ## 步骤 6 — 运行 Pipeline (v10) ``` /opt/spark/bin/spark-submit \ --driver-memory 10g \ --executor-memory 42g \ /data/ugr16_pipeline_v10.py ``` **运行时间:** 在 8-CPU Azure 虚拟机上处理 862,526,861 条记录耗时 5.25 分钟。 ### v10 功能详解 — 逐步说明 | 步骤 | 操作 | 结果 | |---|---|---| | 1 | 加载 Parquet,验证 13 列,通过 toDF() 重命名 | 加载了 862,526,861 条记录 | | 2a | 完整数据集标签分布 | 记录了基线类别平衡情况 | | 2b | 规则贡献计数 — 1 次扫描,4 条规则 | 识别出 108,139,105 条可疑记录 | | 2c | 通过 concat_ws() 进行可疑记录选择 | 为每条记录生成多标签标记 | | 2d | 可疑样本池标签分布 | 确认 99.98% 为 background | | 2e | 规则重叠分布 | 记录了 9 种规则组合 | | 3 | SHA-256 record_id — null 安全 | 每条记录的唯一稳定 ID | | 4 | **分层抽样** — 与 v9 相比的关键变化 | 4,700 + 290 + 4 = 4,994 条记录 | | 5 | 嵌套子样本 1000 ⊂ 3000 ⊂ 5000 | 三种可比样本大小 | | 6 | 时间戳范围 | 2016-08-22 至 2016-08-29 | | 7 | 构建 API 和评估列集 | 标签从 API 文件中排除 | | 8 | 写入 6 个 JSONL 文件 | 所有行数已验证 | | 9 | 完整性检查 | record_id 唯一性通过 | | 10 | 处理报告 JSON + TXT | 记录了完整统计信息 | ### 为什么从 v9 升级到 v10 (分层抽样) v9 对可疑样本池使用了纯随机抽样: ``` Suspect pool: 99.98% background, 0.02% blacklist, 0.00% sshscan Random draw of 5,000 → 4,999 background, 1 blacklist, 0 sshscan ``` 由于只有 1 条攻击记录,无法计算 Precision/Recall/F1。v10 修复了这个问题: ``` # 独立对每个类进行采样 bg_sample = bg_df.orderBy(rand(seed=42)).limit(4700) # background bl_sample = bl_df.orderBy(rand(seed=42)).limit(290) # blacklist ssh_sample = ssh_df # all 4 sshscan sample_5000 = bg_sample.union(bl_sample).union(ssh_sample) ``` ### v10 样本组成 | 类别 | 数量 | % | 评估 | |---|---|---|---| | background | 4,700 | 94.11% | 有效基线 | | blacklist | 290 | 5.81% | 有效 — 足以计算指标 | | anomaly-sshscan | 4 | 0.08% | 仅作观察 | | anomaly-spam | 0 | 0% | 可疑样本池中不存在 | | **总计** | **4,994** | **100%** | **有效** | ### 嵌套样本标签分布 | 样本 | 总计 | background | blacklist | sshscan | CI (95%) | |---|---|---|---|---|---| | sample_1000 | 1,000 | 927 (92.7%) | 73 (7.3%) | 0 | ±3.10% | | sample_3000 | 3,000 | 2,812 (93.7%) | 185 (6.2%) | 3 (0.1%) | ±1.79% | | sample_5000 | 4,994 | 4,700 (94.1%) | 290 (5.8%) | 4 (0.08%) | ±1.39% | ## 步骤 7 — 运行 LLM 评估 (v2) ### 将 API 密钥设置为环境变量 ``` export ANTHROPIC_API_KEY="sk-ant-..." export OPENAI_API_KEY="sk-..." export GEMINI_API_KEY="AI..." export DEEPSEEK_API_KEY="sk-..." ``` ### 在 sample_1000 上运行评估(推荐首选) ``` python3 /data/ugr16_llm_eval_v2.py \ --sample-dir /data/llm_samples_v10 \ --output-dir /data/llm_eval_results_v2 \ --sample-size 1000 \ --models chatgpt gemini deepseek claude ``` ### eval v2 的主要功能 | 功能 | 描述 | |---|---| | 无硬编码密钥 | 所有 API 密钥均从环境变量中读取 | | 恢复模式 | 在第 500 条记录处崩溃了?重新启动 — 自动跳过已处理的记录 | | 重试 + 退避 | 在遇到速率限制和服务器错误时进行 5 次带指数退避的重试 | | 结构化解析 | 期望 JSON 响应 — 比子字符串匹配更安全 | | 增量写入 | 每处理完一条记录即保存预测结果 — 崩溃时不会丢失数据 | | 7 项指标 | Precision, Recall, Specificity, F1, FPR, FNR, Accuracy | ### 使用的 Prompt(4 个模型完全相同) ``` You are a network security analyst specialising in intrusion detection. Classify the following network flow record as either attack or normal. Network flow JSON: {record_json} Field meanings: ... Required JSON format: {"label":"attack","confidence":0.0,"reason":"short reason"} ``` ### 标签映射 ``` background → normal blacklist → attack anomaly-sshscan → attack ``` ### 计算的指标 | 指标 | 公式 | 是否为主要指标? | |---|---|---| | Precision | TP / (TP + FP) | ✓ 主要 | | Recall / Sensitivity | TP / (TP + FN) | ✓ 主要 | | F1-score | 2×P×R / (P+R) | ✓ 主要 | | False Positive Rate | FP / (FP + TN) | ✓ 主要 | | Specificity | TN / (TN + FP) | 次要 | | False Negative Rate | FN / (FN + TP) | 次要 | | Accuracy | (TP+TN) / total | 仅作参考 | ## 输出文件 ### Pipeline 输出 (v10) ``` /data/llm_samples_v10/ ├── sample_1000_api.jsonl # 1,000 lines — no label ├── sample_3000_api.jsonl # 3,000 lines — no label ├── sample_5000_api.jsonl # 4,994 lines — no label ├── sample_1000_eval.jsonl # 1,000 lines — with label ├── sample_3000_eval.jsonl # 3,000 lines — with label ├── sample_5000_eval.jsonl # 4,994 lines — with label ├── processing_report.json # Full statistics └── processing_report.txt # Human-readable report ``` ### 评估输出 (v2) ``` /data/llm_eval_results_v2/ ├── predictions_1000_claude.jsonl ├── predictions_1000_chatgpt.jsonl ├── predictions_1000_gemini.jsonl ├── predictions_1000_deepseek.jsonl ├── eval_report_1000.json ├── eval_report_1000.txt └── errors_1000.jsonl ``` ## 数据集统计 ### 完整数据集标签分布 | 标签 | 数量 | % | |---|---|---| | background | 854,170,414 | 99.03% | | anomaly-spam | 5,287,316 | 0.61% | | blacklist | 3,069,118 | 0.36% | | anomaly-sshscan | 12 | 0.00% | ### 可疑选择规则贡献 | 规则 | 条件 | 匹配数 | 占总量百分比 | |---|---|---|---| | sensitive_port | dst_port in {22,23,445,3389} | 95,972,228 | 11.13% | | icmp_traffic | upper(protocol) == 'ICMP' | 10,606,661 | 1.23% | | high_packet_count | total_pkts > 1,000 | 1,376,391 | 0.16% | | high_bytes | total_bytes > 1,000,000 | 867,298 | 0.10% | ## 评估的模型 | 模型 | 提供商 | 版本 | API | |---|---|---|---| | Claude | Anthropic | claude-3-haiku-20240307 | api.anthropic.com | | ChatGPT | OpenAI | gpt-4o-mini | api.openai.com | | Gemini | Google | gemini-1.5-flash-latest | generativelanguage.googleapis.com | | DeepSeek | DeepSeek | deepseek-chat | api.deepseek.com | ## 声明的局限性 1. **时间分层:** 随机抽样未采用时间窗口分层。样本时间跨度为 2016-08-22 至 2016-08-29。声明为对外部有效性的威胁。 2. **anomaly-spam 缺失:** 垃圾流不会触发四条可疑规则中的任何一条。垃圾邮件检测被排除在评估范围之外。 3. **anomaly-sshscan:** 可疑样本池中仅有 4 条记录。无法对该类别得出统计结论。 4. **阈值选择:** 阈值(1,000 pkts, 1 MB)是作为设计决策选定的。敏感性分析属于未来的工作。 5. **规则主导:** sensitive_port 捕获了 88.74% 的可疑记录。四规则设计实际上等效于单规则运作。 6. **少数类过采样:** blacklist 和 sshscan 的代表性相对于自然发生率偏高。结果反映的是评估性能,而非真实世界的检测率。 参考文献 Maciá-Fernández, G., Camacho, J., Magán-Carrión, R., García-Teodoro, P., & Therón, R. (2018). UGR'16: A new dataset for the evaluation of cyclostationarity-based network IDSs. *Computers & Security*, 73, 411–424. https://doi.org/10.1016/j.cose.2017.11.004 ## 联系方式 **Thamer Aldraiwa** 博士候选人 — 格拉纳达大学 Email: thamer@correo.ugr.es
标签:AMSI绕过, Apache Spark, Apex, ChatGPT, CISA项目, Claude, CVE检测, DDoS攻击检测, DeepSeek, DLL 劫持, Gemini, IP 地址批量处理, ITDR, LLM, NetFlow, Parquet, Petitpotam, PE 加载器, Promptflow, TruffleHog, UGR'16, Unmanaged PE, 人工智能, 分布式计算, 大数据处理, 大语言模型, 威胁检测, 学术论文, 安全研究社区, 异常处理, 异常检测, 机器学习, 深度学习, 熵值分析, 用户模式Hook绕过, 社会工程学检测, 网络安全, 网络流量分析, 逆向工具, 隐私保护, 零样本学习