bommakantirishi1-lab/nl-threat-hunter
GitHub: bommakantirishi1-lab/nl-threat-hunter
一款基于本地大模型将自然语言转化为 KQL/EQL 查询的威胁狩猎辅助工具,旨在降低安全分析的语法门槛。
Stars: 0 | Forks: 0
# 步骤 3:核心代码实现
## 概述
我们将构建 MVP:UI 输入 → LLM 翻译为 KQL/EQL → 在数据上模拟执行 → 丰富信息 + 展示。
## 核心文件实现
### src/translator.py (LLM 翻译模块)
```
import ollama
from langchain.prompts import PromptTemplate
def translate_to_query(nl_query, target_lang='KQL'):
# Support KQL or EQL
prompt_template = PromptTemplate.from_template(
"Translate this natural language threat hunt to {lang} query: '{query}'. "
"Output ONLY the query string, no explanations. Ensure it's valid syntax."
)
prompt = prompt_template.format(lang=target_lang, query=nl_query)
response = ollama.generate(model='llama2', prompt=prompt)
return response['response'].strip() # Clean output
```
### src/hunter.py (在模拟数据上执行查询)
```
import pandas as pd
def execute_hunt(query, data_path='data/sample_logs.json'):
# Load simulated data (real: API to Sentinel/Falcon)
df = pd.read_json(data_path)
# Mock execution: Filter df based on parsed query (parse query string here)
# For simplicity, assume query like "process_name == 'powershell.exe' AND ip LIKE '%russia%'"
# Use eval or safer parsing—crude example:
try:
results = df.query(query) # Pandas query simulates
except:
results = pd.DataFrame() # Handle bad queries
return results
```
### src/enricher.py (TI 丰富信息)
```
import requests
from dotenv import load_dotenv
import os
load_dotenv()
def enrich_ioc(ioc, type='ip'):
# IOC = IP/hash/etc
if type == 'ip':
url = f"https://www.abuseipdb.com/check/{ioc}/json?key={os.getenv('ABUSEIPDB_KEY')}"
response = requests.get(url)
return response.json() if response.ok else {}
# Add VT for hashes/files: Similar requests.get
```
### src/app.py (Streamlit UI—使用 `streamlit run app.py` 运行)
```
import streamlit as st
from translator import translate_to_query
from hunter import execute_hunt
from enricher import enrich_ioc
import plotly.express as px
st.title("NL Threat Hunter")
nl_query = st.text_input("Enter hunt in English (e.g., 'Suspicious logins from Russia last day')")
lang = st.selectbox("Query Language", ["KQL", "EQL"])
if st.button("Hunt"):
if not nl_query:
st.error("Input something, idiot.")
return
query = translate_to_query(nl_query, lang)
st.write(f"Generated Query: {query}")
results = execute_hunt(query) # Add real data path
if results.empty:
st.warning("No threats found—or your query sucks.")
else:
st.dataframe(results)
# Enrich example IP from results
if 'ip' in results.columns:
sample_ip = results['ip'].iloc[0]
enrichment = enrich_ioc(sample_ip)
st.json(enrichment)
# Viz: Plot attack chains if NetworkX integrated
fig = px.bar(results, x='timestamp', y='risk_score')
st.plotly_chart(fig)
# MITRE map: Hardcode or API fetch
st.write("Mapped to MITRE T1078 (e.g.)")
```
## 关键实现要点
1. **translator.py**: 系统的核心。使用 Ollama (本地 LLM) 将自然语言翻译为 KQL/EQL 查询。
2. **hunter.py**: 使用 pandas 在模拟/真实数据上执行翻译后的查询。
3. **enricher.py**: 与威胁情报 API (VirusTotal, AbuseIPDB) 集成以进行 IOC 丰富。
4. **app.py**: Streamlit UI,将所有内容整合在一起以实现用户友好的交互。
## 数据与测试
1. 生成 `data/sample_logs.json`: 创建 100+ 条具有逼真遥测数据的条目。
2. `tests/test_translator.py` 中的单元测试: 使用 pytest 验证查询生成。
3. 手动测试: 输入查询,验证输出,修复 LLM 幻觉。
4. 指标: 跟踪准确率 (目标是在 100+ 次测试搜寻中达到 85%+)。
## 后续步骤
1. 实现上述代码片段。
2. 使用示例日志设置数据目录。
3. 为每个模块创建单元测试。
4. 使用各种查询测试 Streamlit 应用程序。
5. 部署并分享以供审查。
## 概述
# 步骤 4:数据生成与测试
## 1. 生成样本数据
**文件:** `data/sample_logs.json`
创建 100+ 条条目的真实模拟安全日志。示例结构:
```
[
{
"timestamp": "2026-03-14T10:30:00Z",
"ip": "192.168.1.100",
"process": "powershell.exe",
"risk_score": 80,
"event_type": "suspicious_execution"
},
{
"timestamp": "2026-03-14T11:45:00Z",
"ip": "10.0.0.50",
"process": "cmd.exe",
"risk_score": 65,
"event_type": "command_execution"
}
]
```
**生成脚本** (generate_logs.py):
```
import json
import random
from datetime import datetime, timedelta
processes = ["powershell.exe", "cmd.exe", "svchost.exe", "rundll32.exe", "explorer.exe"]
ips = [f"192.168.1.{i}" for i in range(1, 255)] + [f"10.0.0.{i}" for i in range(1, 255)]
event_types = ["suspicious_execution", "command_execution", "network_connection", "registry_mod"]
logs = []
base_time = datetime.now() - timedelta(days=1)
for i in range(100):
logs.append({
"timestamp": (base_time + timedelta(minutes=i*15)).isoformat() + "Z",
"ip": random.choice(ips),
"process": random.choice(processes),
"risk_score": random.randint(30, 100),
"event_type": random.choice(event_types)
})
with open('data/sample_logs.json', 'w') as f:
json.dump(logs, f, indent=2)
print("Generated 100 sample logs in data/sample_logs.json")
```
运行: `python generate_logs.py`
## 2. 单元测试
**文件:** `tests/test_translator.py`
使用 pytest 测试 LLM 翻译功能:
```
import pytest
from src.translator import translate_to_query
def test_translate_powershell():
"""Test translation of PowerShell hunting query"""
result = translate_to_query("hunt for suspicious powershell executions")
assert "powershell" in result.lower() or "process" in result.lower()
def test_translate_network():
"""Test translation of network hunting query"""
result = translate_to_query("find unusual network connections")
assert "network" in result.lower() or "connection" in result.lower() or "ip" in result.lower()
def test_translate_output_format():
"""Test that output is a valid string"""
result = translate_to_query("test query")
assert isinstance(result, str)
assert len(result) > 0
```
运行: `pytest tests/test_translator.py -v`
## 3. 手动测试
### 测试用例 (20+ 个样本查询):
1. 输入: "Hunt for lateral movement from suspicious IPs"
- 预期输出: 带有 IP 过滤和横向移动模式的 KQL/EQL 查询
2. 输入: "Find processes with network connections to unknown domains"
- 预期输出: 基于网络连接和域模式进行过滤的查询
3. 输入: "Detect privilege escalation attempts"
- 预期输出: 匹配权限提升 TTP 的查询
4. 输入: "Search for malware signatures in log files"
- 预期输出: 基于恶意软件指标进行过滤的查询
5. 输入: "Find command injection attempts"
- 预期输出: 匹配命令注入模式的查询
### 测试流程:
1. 运行 Streamlit 应用程序: `streamlit run src/app.py`
2. 在文本框中输入查询
3. 选择查询语言 (KQL 或 EQL)
4. 点击 "Hunt"
5. 验证:
- 生成的查询语法正确
- 结果与预期输出匹配
- MITRE 映射适当
- 丰富信息数据正确加载
### 修复 LLM 幻觉:
添加 few-shot 示例以提高准确率:
```
FEW_SHOT_EXAMPLES = """
Example 1:
Input: "find powershell"
Output: process_name == "powershell.exe"
Example 2:
Input: "suspicious from russia"
Output: origin_country == "Russia" AND risk_score > 70
Example 3:
Input: "network to c2"
Output: destination_ip IN ('malicious_ips_list') AND protocol IN ('tcp', 'udp')
"""
```
## 4. 指标与准确率
### 跟踪指标:
**文件:** `metrics.json`
```
{
"total_tests": 50,
"successful_translations": 43,
"accuracy": "86%",
"average_response_time_ms": 250,
"false_positives": 2,
"false_negatives": 5,
"test_date": "2026-03-14"
}
```
### 准确率计算:
```
def calculate_accuracy(correct_queries, total_queries):
return (correct_queries / total_queries) * 100
# 50 题中答对 43 题 = 86% 准确率
accuracy = calculate_accuracy(43, 50)
print(f"Query Translation Accuracy: {accuracy}%")
```
### 目标指标:
- **查询准确率:** >= 85% (100 次搜寻中有 85+ 次翻译正确)
- **响应时间:** 每次查询 < 500ms
- **误报率:** < 5%
- **漏报率:** < 10%
## 5. 验证清单
- [ ] 所有 100 条样本日志已生成于 `data/sample_logs.json`
- [ ] 所有单元测试通过: `pytest tests/ -v`
- [ ] 完成了 20+ 个样本查询的手动测试
- [ ] 指标已计算并记录
- [ ] 准确率 >= 85%
- [ ] 响应时间 < 500ms
- [ ] LLM 幻觉已识别并修复
- [ ] 结果已使用 VT/AbuseIPDB 数据进行丰富
- [ ] MITRE 映射已验证
- [ ] README 已更新指标
## 后续步骤
1. 完成所有测试和验证
2. 记录发现的任何失败或问题
3. 迭代 LLM 提示词以提高准确率
4. 进入步骤 5:高级功能与优化
5. 部署并展示结果
# 步骤 5:高级功能与增强
一旦核心功能正常工作,请考虑添加以下增强功能:
### MITRE ATT&CK 集成
- 使用 NetworkX 可视化攻击链,并将威胁搜寻结果映射到 MITRE ATT&CK 技术
- 从 MITRE ATT&CK API 获取技术元数据,为检测到的活动提供上下文
### 生产环境集成
- 使用 Microsoft Sentinel API (配合 MSAL 身份验证) 替换模拟数据,接入真实的 SIEM/EDR 集成
- 实现 azure-monitor-query 库用于查询生产遥测数据
### 错误处理与可靠性
- 为 LLM 失败实施回退机制 (针对常见搜寻模式采用基于规则的查询生成)
- 在执行前添加输入验证和查询语法检查
### 安全加固
- 使用适当的库 (例如 bleach) 清理所有用户输入
- 加密敏感环境变量和 API 密钥
- 为 Web 界面实施适当的身份验证和授权
### 部署
- 创建 Dockerfile 用于容器化部署
- 配置部署到云平台 (Heroku, AWS 或 Azure)
# 步骤 6:文档与部署
### README 文档
创建全面的文档,包括:
- **项目概述**: 清晰描述目的和能力
- **设置说明**:
pip install -r requirements.txt
ollama run llama2
- **使用指南**: UI 实际操作的截图和示例
- **架构图**: 系统组件的可视化表示 (使用 draw.io 或基于文本的图表)
- **性能指标**: 记录准确性指标 (例如,“在 100 次测试搜寻中达到 85% 的查询翻译准确率”)
- **未来路线图**: 计划与真实 EDR 系统集成以及 ISO 27001 合规控制
### 部署流程
1. 创建 Heroku 应用程序或等效的云平台
2. 为 API 密钥和敏感数据配置环境变量
3. 将代码推送到部署平台
4. 使用实时演示链接更新 README: `Demo: https://nl-threat-hunter.herokuapp.com`
# 步骤 7:项目完成与后续步骤
### 最终任务
1. **版本控制**: 使用描述性消息提交所有代码更改
git add .
git commit -m "Complete natural language threat hunter implementation"
git push origin main
2. **仓库共享**: 将你的 GitHub 仓库设为公开以展示你的工作
3. **简历更新**: 将此项目添加到你的专业简历中:
- "Developed natural language threat hunting tool using Ollama/Streamlit"
- "Implemented LLM-based translation of plain English queries to KQL/EQL with 85% accuracy"
- "Integrated threat intelligence enrichment via VirusTotal and AbuseIPDB APIs"
4. **合规文档**: 记录与安全框架的一致性
- 将功能映射到 ISO 27001 控制 (例如,A.13 用于通信安全)
- 为查询执行日志创建审计跟踪文档
### 推荐的后续步骤
- 针对多样化的威胁搜寻场景进行全面测试
- 收集 SOC 分析师的反馈并迭代用户界面
- 探索与更多 SIEM 平台 (Splunk, Elastic) 的集成
- 实施高级功能,如自动化剧本生成
- 考虑为开源威胁搜寻社区做出贡献
标签:AbuseIPDB, AI 安全, AI风险缓解, CrowdStrike, DLL 劫持, EDR, EQL, IOC 富化, KQL, Kubernetes, LangChain, Llama 2, LLM, LLM评估, MVP, NLP, Ollama, Python, Sentinel, Streamlit, Unmanaged PE, 代码生成, 大语言模型, 威胁情报, 开发者工具, 无后门, 查询转换, 渗透测试工具, 网络安全, 脆弱性评估, 访问控制, 轻量级, 逆向工具, 隐私保护