lillianting0318/threathunt-gpt

GitHub: lillianting0318/threathunt-gpt

基于 RAG 和 LLM 的 MITRE ATT&CK 威胁狩猎平台,为 SOC 分析师提供从威胁情报查询到 YARA/KQL/SPL 检测规则自动生成的一站式工作流。

Stars: 0 | Forks: 0

# ThreatHunt-GPT ![Python](https://img.shields.io/badge/Python-3.11-3776AB?style=flat&logo=python&logoColor=white) ![Streamlit](https://img.shields.io/badge/Streamlit-1.55-FF4B4B?style=flat&logo=streamlit&logoColor=white) ![MITRE ATT&CK](https://img.shields.io/badge/MITRE_ATT%26CK®-Enterprise_+_ICS-red?style=flat) ![LLM](https://img.shields.io/badge/LLM-Groq_llama--3.3--70b-F55036?style=flat) ## 目录 - [概述](#overview) - [架构](#architecture) - [快速开始](#quick-start) - [Groq API 密钥设置](#groq-api-key-setup) - [使用方法](#how-to-use) - [日志分诊代理](#log-triage-agent) - [多组织比较](#multi-group-comparison) - [扩展 CTI 数据](#extending-cti-data) - [替换 LLM](#swapping-the-llm) - [OpenClaw / 多代理集成](#openclaw--multi-agent-integration) - [项目结构](#project-structure) - [支持的威胁组织](#supported-threat-groups) ## 概述 ThreatHunt-GPT 是一个网络安全威胁情报平台,结合了 RAG(检索增强生成)与 MITRE ATT&CK®,帮助 SOC 分析师和威胁狩猎人员快速识别、分析和响应威胁。 | 功能 | 描述 | |---|---| | **威胁组织查询** | 输入任意 APT 名称以检索 CTI 摘要和映射的 MITRE TTP | | **YARA 规则生成** | 按战术类别自动生成可部署的 YARA 检测规则 | | **KQL 查询** | 开箱即用的 Microsoft Sentinel / Defender for Endpoint 狩猎查询 | | **SPL 查询** | Splunk Enterprise Security 狩猎查询 | | **AI 狩猎摘要** | Groq `llama-3.3-70b` 生成结构化威胁评估 | | **多组织比较** | 跨多个 APT 组织的并排 TTP 重叠分析 | | **日志分诊代理** | 分析 Windows 事件日志并生成威胁级别决策报告 | ## 架构 ``` ┌─────────────────────────────────────────────────────────┐ │ Streamlit UI (app.py) │ └────────────┬──────────────────────────┬─────────────────┘ │ │ ▼ ▼ ┌────────────────────┐ ┌───────────────────────────┐ │ pipeline.py │ │ log_triage_pipeline.py │ │ (Threat Hunt) │ │ (Log Triage Agent) │ └──┬──────┬──────┬───┘ └──┬────────────────────────┘ │ │ │ │ ▼ ▼ ▼ ▼ CTI TTP Answer LogProcessor Agent Agent Agent (log_processor.py) │ │ │ │ └──────┴──────┘ │ │ ▼ ▼ MITRE TTP Enrichment RAGEngine (rag_engine.py) │ ┌───────┴────────┐ │ │ FAISS Index MITRE ATT&CK (vector search) 823 techniques Enterprise + ICS ``` ### 3 代理威胁狩猎管道 ``` User Query │ ▼ [Agent 1 — CTI Agent] Retrieves threat group summary from CTI knowledge base │ Uses exact match → word-boundary regex → FAISS semantic search ▼ [Agent 2 — TTP Agent] Maps group to MITRE technique IDs │ Enriches each TTP with full name + description from ATT&CK ▼ [Agent 3 — Answer Agent] Calls Groq llama-3.3-70b for threat assessment │ Generates YARA rules, KQL queries, SPL queries ▼ Output: CTI Summary · TTP Matrix · YARA · KQL · SPL ``` ### 日志分诊管道 ``` Select Scenario │ ▼ simulate_logs() Generates Windows Event Logs (4625, 4688, 1102, ...) │ ▼ LogProcessor Maps each Event ID → MITRE TTP │ Enriches TTPs via RAGEngine ▼ _calculate_threat_level() Applies 7 escalation rules → High / Medium / Low │ ▼ _infer_attack_chain() Reconstructs kill chain in chronological order │ ▼ _generate_soc_recs() Produces tiered SOC recommendations + TTP-specific controls │ ▼ Decision Report (dict) Fully JSON-serializable, ready for downstream agents ``` ## 快速开始 ### 环境要求 - Python 3.11+ - macOS / Linux / Windows - 约 500MB 磁盘空间(首次运行时缓存模型权重) ### 安装说明 ``` # 克隆或解压项目 cd threathunt-gpt # 安装依赖 pip install -r requirements.txt # 设置你的 Groq API 密钥(见下一节) export GROQ_API_KEY="gsk_your_key_here" # 启动 streamlit run app.py ``` 在浏览器中打开 `http://localhost:8501`。 ## Groq API 密钥设置 **获取免费密钥:** [https://console.groq.com](https://console.groq.com) — 无需信用卡。 **选项 A — .env 文件(推荐)** 在项目根目录创建一个 `.env` 文件: ``` GROQ_API_KEY=gsk_your_key_here ``` **选项 B — 环境变量** ``` export GROQ_API_KEY="gsk_your_key_here" ``` 如果未设置密钥,AI 狩猎摘要选项卡将显示设置说明,而不是崩溃。 ## 使用方法 ### 威胁组织查询 1. 在 **AVAILABLE GROUPS** 侧边栏列表中点击任意组织,或在搜索栏中输入名称 2. 点击 **⟩ HUNT** 3. 结果将显示在五个选项卡中: | 选项卡 | 内容 | |---|---| | CTI Summary | 威胁组织概况 + AI 生成的狩猎评估 | | TTP Matrix | 所有映射的 MITRE 技术及其描述和链接 | | YARA Rules | 根据战术模板生成的检测规则 | | KQL Queries | 每种战术的 Microsoft Sentinel 狩猎查询 | | SPL Queries | 每种战术的 Splunk ES 狩猎查询 | ### 搜索提示 - 精确名称:`APT28` - 部分名称:`lazarus` → 匹配 `Lazarus Group` - 自然语言:`What techniques does FIN7 use?` - 匹配引擎使用 **词边界正则表达式**,以防止 APT1 匹配到 APT12 ## 日志分诊代理 位于侧边栏的 **◈ LOG TRIAGE** 下。 ### 可用场景 | 场景 | 模拟的事件 ID | 描述 | |---|---|---| | `brute_force` | 4625 ×5, 4624, 4672 | 凭据暴力破解后成功登录 | | `lateral_movement` | 4624, 4648, 4672, 4688 | 使用显式凭据的网络登录及进程创建 | | `persistence` | 4698, 7045, 4688 | 计划任务和服务安装 | | `defense_evasion` | 1102, 4719, 4688, 4103 | 日志清除 + 审核策略更改 + PowerShell | | `kerberoasting` | 4768, 4769 ×3, 4776 | 重复的 Kerberos 服务票证请求 | | `full_attack` | 包含以上所有 | 完整的多阶段攻击链 | ### 输出选项卡 | 选项卡 | 内容 | |---|---| | SOC Report | 威胁级别标识 + 触发的升级规则 + 分层行动项 | | Attack Chain | 按时间顺序的杀伤链重建 | | TTP Mapping | 所有识别出的 MITRE 技术及描述 | | IOC Summary | 可疑账户、源 IP、受影响主机 | | Event Log | 事件 ID 细分及其严重性和发生次数 | ### 威胁级别评分 | 规则 | 分数加成 | |---|---| | 安全日志被清除 (Event 1102) | +20 | | 3 个或更多高严重性事件 | +15 | | Kerberoasting 模式 (≥3 × Event 4769) | +15 | | 暴力破解模式 (≥5 × Event 4625) | +10 | | 横向移动后的权限提升 | +10 | | 检测到持久化机制 | +8 | | 触发 PowerShell 脚本块日志记录 | +8 | 最终得分 ≥ 30 → **高** · ≥ 12 → **中** · < 12 → **低** ### 使用真实日志数据 将 `simulate_logs()` 替换为您自己的解析器: ``` from agents.log_processor import LogEvent, triage_logs from datetime import datetime # 从 EVTX、SIEM API 或任何来源解析 events = [ LogEvent( event_id=4625, timestamp=datetime.utcnow(), computer="WIN-DC01", account="jsmith", source_ip="10.0.0.55", description="Failed logon attempt", ), # ... more events ] triaged = triage_logs(events, rag=rag_instance) ``` ## 多组织比较 1. 在侧边栏中点击一个组织以选中它 2. 点击 **+ ADD** 将其添加到比较列表 3. 对其他组织重复此操作(需要 2 个或更多) 4. 点击 **COMPARE** ### 比较选项卡 | 选项卡 | 内容 | |---|---| | TTP Overlap | 所有组织共享的技术(黄色)+ 每个组织的独有技术 | | CTI Summaries | 并排的情报摘要 | | TTP Breakdown | 每个组织的完整技术列表,共享的 TTP 标有 ⚠ | 对于双组织比较,三列韦恩图样式视图显示:仅组织 A · 共享 · 仅组织 B。 ## 扩展 CTI 数据 通过在 `threat_intel_examples/` 中创建 JSON 文件来添加新的威胁组织: ``` { "title": "APT99", "summary": "APT99 is a financially motivated group targeting...", "techniques": ["T1059.001", "T1071.001", "T1566.001", "T1486"] } ``` 然后重建 FAISS 索引: ``` python faiss_builder.py ``` 新组织将自动出现在侧边栏中。 ## 替换 LLM Answer Agent 位于 `agents/answer_agent.py`。将 `_call_llm()` 替换为任何 LLM: ### Ollama (本地,完全免费) ``` ollama pull mistral ``` ``` import requests def _call_llm(prompt: str) -> str: r = requests.post("http://localhost:11434/api/generate", json={ "model": "mistral", "prompt": prompt, "stream": False, }) return r.json()["response"] ``` ### OpenAI ``` from openai import OpenAI client = OpenAI(api_key="sk-...") def _call_llm(prompt: str) -> str: r = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], max_tokens=600, ) return r.choices[0].message.content ``` ### Anthropic Claude ``` import anthropic client = anthropic.Anthropic(api_key="sk-ant-...") def _call_llm(prompt: str) -> str: r = client.messages.create( model="claude-sonnet-4-5", max_tokens=600, messages=[{"role": "user", "content": prompt}], ) return r.content[0].text ``` ## OpenClaw / 多代理集成 所有管道输出都是纯 Python 字典 —— 完全支持 JSON 序列化且与框架无关。 ``` # 仅导入你需要的内容 from pipeline import run_pipeline, run_log_triage_pipeline from agents.log_processor import LogEvent, triage_logs from rag_engine import RAGEngine rag = RAGEngine( faiss_path="combined_index.faiss", metadata_path="combined_metadata.json", mitre_path="combined_mitre.json", ) # --- 威胁狩猎 --- result = run_pipeline(group_name="APT28", rag=rag) # result["threat_level"] → "高" # result["cti"]["summary"] → str # result["ttp"]["ttps"] → List[Dict] # result["answer"]["yara_rule"] → str # result["answer"]["kql_query"] → str # result["answer"]["hunt_summary"] → str # --- 日志分诊 --- report = run_log_triage_pipeline(scenario="full_attack", rag=rag) # report["threat_level"] → "高" # report["escalation_rules_triggered"] → List[str] # report["attack_chain"] → List[Dict] # report["soc_recommendations"]["base_actions"] → List[str] # report["mitre_ttps"] → List[Dict] # report["iocs"] → Dict # --- 自定义日志事件 --- from agents.log_processor import simulate_logs events = simulate_logs(scenario="kerberoasting") triaged = triage_logs(events, rag=rag) for t in triaged: print(t.to_dict()) # JSON-serializable ``` ### 稳定接口(可安全依赖) | 函数 | 位置 | 签名 | |---|---|---| | `run_pipeline()` | `pipeline.py` | `(group_name, rag, llm=None) → Dict` | | `run_log_triage_pipeline()` | `pipeline.py` | `(scenario, custom_events, rag) → Dict` | | `triage_logs()` | `agents/log_processor.py` | `(events, rag=None) → List[TriagedLog]` | | `simulate_logs()` | `agents/log_processor.py` | `(scenario, count, custom_event_ids) → List[LogEvent]` | | `rag.find_group()` | `rag_engine.py` | `(query) → Optional[str]` | | `rag.enrich_ttps()` | `rag_engine.py` | `(ttp_ids) → List[Dict]` | ## 项目结构 ``` threathunt-gpt/ │ ├── app.py # Streamlit UI — all pages and routing ├── pipeline.py # Pipeline orchestrator (re-exports all entry points) ├── rag_engine.py # RAGEngine class — FAISS search + MITRE enrichment ├── log_triage_pipeline.py # Log Triage pipeline — scoring, chain, report builder │ ├── agents/ │ ├── __init__.py │ ├── cti_agent.py # Agent 1 — CTI summary retrieval │ ├── ttp_agent.py # Agent 2 — TTP enrichment │ ├── answer_agent.py # Agent 3 — Groq LLM + YARA/KQL/SPL generation │ └── log_processor.py # Log Triage Agent — Event ID → TTP mapping │ ├── combined_index.faiss # FAISS vector index (semantic search) ├── combined_metadata.json # CTI knowledge base (APT summaries + TTP lists) ├── combined_mitre.json # MITRE ATT&CK techniques (823 Enterprise + ICS) │ ├── requirements.txt └── README.md ``` ## 支持的威胁组织 知识库中目前包含 24 个威胁组织: | 组织 | 类型 | 来源 | |---|---|---| | APT1 | 国家级 | 中国 (PLA 61398 部队) | | APT3 | 国家级 | 中国 (MSS) | | APT12 | 国家级 | 中国 | | APT16 | 国家级 | 中国 | | APT17 | 国家级 | 中国 | | APT18 | 国家级 | 中国 | | APT19 | 国家级 | 中国 | | APT28 | 国家级 | 俄罗斯 (GRU) | | APT29 | 国家级 | 俄罗斯 (SVR) | | APT30 | 国家级 | 中国 | | APT32 | 国家级 | 越南 | | APT33 | 国家级 | 伊朗 | | APT-C-23 | 国家级 | 中东 | | APT-C-36 | 国家级 | 南美洲 | | ALLANITE | 国家级 | 俄罗斯 (疑似) | | Agrius | 国家级 | 伊朗 (MOIS) | | Ajax Security Team | 国家级 | 伊朗 | | Akira | 网络犯罪 | 未知 | | Andariel | 国家级 | 朝鲜 (Lazarus 子组织) | | Aoqin Dragon | 国家级 | 中国 (疑似) | | Cobalt Group | 网络犯罪 | 未知 | | FIN7 | 网络犯罪 | 未知 | | Lazarus Group | 国家级 | 朝鲜 | | admin@338 | 国家级 | 中国 | 要添加更多组织,请参阅[扩展 CTI 数据](#extending-cti-data)。 ## 依赖项 | 包 | 用途 | |---|---| | `streamlit` | Web UI | | `faiss-cpu` | 向量相似性搜索 | | `sentence-transformers` | 文本嵌入模型 (all-MiniLM-L6-v2) | | `groq` | LLM API 客户端 | | `python-dotenv` | 环境变量加载 | | `numpy` | 向量操作 | ## 已知限制 - **ICS 技术**:T0xxx 技术由硬编码查找表覆盖(93 种技术)。要获取完整的 ICS 覆盖范围,请下载 MITRE ICS ATT&CK STIX 包并重建索引。 - **CTI 数据**:知识库包含 24 个组织。实际生产部署应与 MITRE ATT&CK TAXII feeds 或商业 CTI 平台集成。 - **日志模拟**:日志分诊代理生成合成日志。要在生产环境中使用,请将 `simulate_logs()` 替换为真实的 EVTX 解析器或 SIEM API 连接器。 - **YARA/KQL/SPL 规则**:根据战术模板生成。在部署到生产环境之前,请务必进行审查和调优。 ## 许可证 仅用于研究和教育目的。MITRE ATT&CK® 是 The MITRE Corporation 的注册商标。
标签:AI安全分析, APT攻击, Cloudflare, Defender for Endpoint, DLL 劫持, DNS信息、DNS暴力破解, KQL查询, Kubernetes, Llama-3.3-70b, Microsoft Sentinel, MITRE ATT&CK, Python, RAG, SOC分析师, SPL查询, Streamlit, ThreatHunt-GPT, Windows事件日志, YARA规则, 大语言模型, 威胁情报, 威胁评估, 子域名变形, 安全运营中心, 开发者工具, 无后门, 无线安全, 日志分类, 检测规则生成, 检索增强生成, 网络安全, 网络安全审计, 网络映射, 自动化响应, 访问控制, 逆向工具, 隐私保护