vityasyyy/cskg
GitHub: vityasyyy/cskg
基于LLM从安全新闻等非结构化数据中自动提取威胁情报实体与关系,映射STIX 2.1本体构建可查询的网络安全知识图谱。
Stars: 1 | Forks: 5
# 基于非结构化数据的网络安全知识图谱 (CSKG)
[](#quick-start)
[](https://www.python.org/)
[](http://docs.oasis-open.org/cti/ns/stix#)
[](https://virtuoso.openlinksw.com/)
**在线端点(运行时):**
- **REST API 文档** → `http://localhost:8000/docs`
- **SPARQL 控制台** → `http://localhost:8890/sparql`
## 1. 什么是 CSKG?
想象一下每天阅读几十篇网络安全博客,并试图记住哪个威胁行为者使用了哪种恶意软件以及他们的目标是谁。CSKG 可以自动完成这项工作。
这是一个基于安全新闻(RSS 订阅源、报告和文章)构建的**动态知识图谱**。LLM 会阅读每篇文章,提取诸如威胁行为者和漏洞之类的实体,将它们映射到正式的网络安全本体(STIX),并将其作为链接数据存储在图数据库中。其结果是一个可查询、持续更新的全球威胁环境全景图。
**简而言之:** 它将新闻文章转化为结构化数据库,您可以向其提出诸如“目前哪些威胁行为者正在针对医疗保健行业?”或“哪些恶意软件与 CVE-2025-XXXX 有关?”等问题。
## 2. 系统概览
```
flowchart LR
subgraph Internet
A1[TheHackerNews]
A2[BleepingComputer]
A3[KrebsOnSecurity]
A4[FortiGuardLabs]
end
subgraph Docker
R[(Redis)]
P[Producer
scraper.py] E[Extractor
extractor_worker.py] B[Graph Builder
builder_worker.py] V[(Virtuoso
Triple Store)] API[FastAPI Server
api_server.py] S[Summary Worker
graph_eval_worker.py] end A1 & A2 & A3 & A4 -->|RSS Feeds| P P -->|articles_queue| R R -->|articles_queue| E E -->|Gemini LLM| E E -->|extractions_queue| R R -->|extractions_queue| B B -->|SPARQL INSERT| V API <-->|SPARQL queries| V S -.->|POST /query| API S -.->|daily report| ./reports/ ``` **服务(6 个 Docker 容器):** | 服务 | 目的 | 关键技术 | |---|---|---| | `producer` | 每 5 分钟抓取一次 RSS 订阅源,去重,将文章加入队列 | `feedparser`, `BeautifulSoup`, `redis` | | `extractor` | 从队列中获取文章,运行 LLM 提取实体/关系 | `LangChain`, `Gemini 2.0 Flash Lite`, `Pydantic` | | `graph_builder` | 获取提取结果,构建 RDF 三元组,插入 Virtuoso | `rdflib`, `SPARQLWrapper` | | `api` | 用于查询图谱的 REST API | `FastAPI`, `SPARQLWrapper` | | `summary` | 每 24 小时运行一次:查询图谱,生成战略简报 | `LangChain`, `Gemini` | | `virtuoso` | 持久化 RDF 三元组存储 | `Virtuoso Open Source` | | `redis` | producer → extractor → builder 之间的消息队列 | `Redis 7` | ## 3. 单篇文章的处理流程 让我们追踪一下,当一篇标题为“深入剖析 DragonForce 勒索软件及其与 Scattered Spider 的关联”的新文章出现在 BleepingComputer 上时会发生什么。 ### 步骤 1:抓取 `producer` 获取 RSS 订阅源,剥离 HTML,并推送到 JSON 文章: ``` { "title": "Deep dive into DragonForce ransomware...", "link": "https://www.bleepingcomputer.com/...", "published": "Wed, 03 Dec 2025 10:05:15 -0500", "content": "DragonForce is a ransomware group that uses Scattered Spider..." } ``` ### 步骤 2:提取 `extractor` 将文章文本发送给 Gemini。LLM 返回结构化数据: ``` { "threat_actors": ["DragonForce", "Scattered Spider"], "malware": [], "attack_patterns": ["initial access", "social engineering"], "vulnerabilities": [], "indicators": [], "relations": [ {"subject": "DragonForce", "relation": "uses", "object": "Scattered Spider"}, {"subject": "Scattered Spider", "relation": "uses", "object": "social engineering"}, {"subject": "Scattered Spider", "relation": "uses", "object": "initial access"} ] } ``` ### 步骤 3:构建图谱 `graph_builder` 将此结果转换为 RDF 三元组并将其插入 Virtuoso: ``` @prefix stix: .
@prefix cskg: .
@prefix rdfs: .
a stix:Report ;
stix:mentions cskg:dragonforce ,
cskg:scatteredspider ,
cskg:socialengineering ,
cskg:initialaccess .
cskg:dragonforce a stix:ThreatActor .
cskg:scatteredspider a stix:ThreatActor ;
rdfs:label "Scattered Spider" ;
stix:uses cskg:socialengineering ,
cskg:initialaccess .
```
### 步骤 4:查询
您现在可以通过 REST API 或 SPARQL 查询实时图谱:
```
SELECT * WHERE {
?actor a stix:ThreatActor ;
rdfs:label "DragonForce" .
?actor stix:uses ?tool .
?tool rdfs:label ?tool_name .
}
```
## 4. 快速入门
### 前置条件
- Docker & Docker Compose
- 拥有 Gemini 访问权限的 Google API 密钥
### 运行
```
# 1. Clone
git clone
cd
# 2. 添加你的 API key
echo "GOOGLE_API_KEY=your_key_here" > .env
# 3. 启动所有服务
make up
# 或: docker compose up --build -d
# 4. 检查状态
make status
# → {"status": "online", "total_triples": 1234}
# 5. 查看实时日志
make logs
```
### 便捷目标
| 命令 | 作用 |
|---|---|
| `make up` | 构建并启动所有服务 |
| `make down` | 停止所有服务 |
| `make logs` | 跟踪工作进程日志 |
| `make status` | API 的健康检查 |
| `make report` | 构建 LaTeX 报告 |
| `make report-clean` | 删除 LaTeX 构建产物 |
| `make dump` | 将整个图谱导出为 `cskg_full_dump.ttl` |
| `make clean` | 删除构建产物和导出文件 |
### 访问入口
| 服务 | URL | 用途 |
|---|---|---|
| REST API | `http://localhost:8000/docs` | POST /query 的 Swagger UI |
| SPARQL UI | `http://localhost:8890/sparql` | Virtuoso 直接控制台 |
| 报告 | `./reports/daily_brief_*.md` | 自动生成的每日简报 |
## 5. 服务架构
```
flowchart TB
subgraph Input[" "]
RSS[4 RSS Feeds]
end
subgraph Queue["Message Queue (Redis)"]
AQ[articles_queue]
EQ[extractions_queue]
SU[seen_urls SET]
end
subgraph Workers["Workers"]
PR[Producer
every 5 min] EX[Extractor
event-driven] GB[Graph Builder
event-driven] end subgraph Storage["Storage"] VV[(Virtuoso
Named Graph)] end subgraph Output["Output"] API[FastAPI
Port 8000] REP[Daily Report
./reports/] end RSS --> PR PR -->|push| AQ PR -->|sadd| SU AQ -->|blpop| EX EX -->|push| EQ EQ -->|blpop| GB GB -->|SPARQL INSERT| VV VV <-->|SPARQL SELECT| API SUM[Summary Worker
every 24h] API -->|POST /query| SUM SUM -->|write| REP ``` **所有工作进程共享同一个 Docker 镜像。** `compose.yml` 为每个服务覆盖了 `command:`。队列模式使用 Redis 的 `BLPOP`(阻塞式弹出),因此工作进程是事件驱动的并且能高效消费。 ## 6. 知识图谱(面向专家) ### 本体 CSKG 使用 **STIX 2.1** 作为其主要本体,并辅以自定义命名空间 `http://group2.org/cskg/`。 **映射的实体类型:** | 提取类型 | STIX 类 | |---|---| | 威胁行为者 | `stix:ThreatActor` | | 恶意软件 | `stix:Malware` | | 漏洞 | `stix:Vulnerability` | | 指标 | `stix:Indicator` | | 攻击模式 | `stix:AttackPattern` | | 报告 | `stix:Report` | **关系映射(16 种谓语动词):** ``` RELATIONSHIP_MAP = { "uses": STIX.uses, "targets": STIX.targets, "exploits": STIX.exploits, "mitigates": STIX.mitigates, "attributed_to": STIX.attributed_to, "variant_of": STIX.variant_of, "located_in": STIX.located_in, "impersonates": STIX.impersonates, "reports": STIX.reports, "patched": STIX.patched, "resolved": STIX.resolved, "disrupted": STIX.disrupted, "aligned_with": STIX.aligned_with, "observes": STIX.observes, "has_similarities_with": STIX.has_similarities_with, "propagated_via": STIX.propagated_via, } ``` ### 双 URI 实体消解 为了处理实体名称的变体(例如 `DragonForce` 与 `dragonforce`),同时保留原始标签,流水线为每个实体创建**两个节点**: - **规范 URI**(`safe_uri`):标准化、小写、仅限字母数字。用于所有关系。 - **带标签 URI**(`unsafe_uri`):保留原始文本作为 `rdfs:label`。通过 `owl:sameAs` 链接到规范 URI。 示例: ``` cskg:dragonforce a stix:ThreatActor . cskg:DragonForce a stix:ThreatActor ; rdfs:label "DragonForce" ; owl:sameAs cskg:dragonforce . ``` ### 外部 KG 链接:SEPSES CVE 当漏洞被检测为 CVE ID 时,流水线会链接到 **SEPSES CVE 知识图谱**,而不是创建本地节点: ``` cve_match = re.search(r"(CVE-\d{4}-\d{4,})", vuln, re.IGNORECASE) if cve_match: cve_id = cve_match.group(1).upper() vuln_uri = SEPSES_CVE[cve_id] # https://w3id.org/sepses/resource/cve/CVE-XXXX-XXXX ``` 这使得查询能够同时涵盖 CSKG 和来自 SEPSES 的丰富外部 CVE 数据。 ## 7. 查询图谱 ### 通过 REST API ``` curl -X POST http://localhost:8000/query \ -H "Content-Type: application/json" \ -d '{"query": "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10"}' ``` ### 通过 SPARQL 控制台 打开 `http://localhost:8890/sparql` 并直接在命名图 `` 上运行查询。
### 示例用例
#### 用例 1:威胁行为者画像
**问题:** “根据最近的报告,威胁行为者‘Konni’使用了哪些恶意软件和攻击模式?”
```
PREFIX cskg:
PREFIX stix:
PREFIX rdfs:
SELECT DISTINCT ?malware_label ?pattern_label
WHERE {
GRAPH {
# Find the Konni threat actor
?actor a stix:ThreatActor ;
rdfs:label "Konni" .
# Find malware it uses
OPTIONAL {
?actor stix:uses ?malware .
?malware a stix:Malware ;
rdfs:label ?malware_label .
}
# Find attack patterns it uses
OPTIONAL {
?actor stix:uses ?pattern .
?pattern a stix:AttackPattern ;
rdfs:label ?pattern_label .
}
}
}
```
#### 用例 2:漏洞影响评估
**问题:** “我们存在‘CVE-2025-12480’漏洞。哪些威胁行为者正在积极利用它?”
```
PREFIX cskg:
PREFIX stix:
PREFIX rdfs:
PREFIX sepses:
SELECT DISTINCT ?actor_label
WHERE {
GRAPH {
# Find the CVE (using its linked SEPSES URI)
BIND(sepses:CVE-2025-12480 AS ?cve)
?cve a stix:Vulnerability .
# Find any threat actor that exploits it
?actor stix:exploits ?cve ;
a stix:ThreatActor ;
rdfs:label ?actor_label .
}
}
```
#### 用例 3:事件响应与分诊
**问题:** “我们在日志中发现了指标‘GlassWorm’。它是什么,有哪些报告提到了它?”
```
PREFIX cskg:
PREFIX stix:
PREFIX rdfs:
SELECT ?entity_label ?entity_type ?report_url
WHERE {
GRAPH {
# Find the entity by its label
?entity rdfs:label "GlassWorm" ;
a ?entity_type ;
rdfs:label ?entity_label .
# Find the report that mentions it
?report stix:mentions ?entity ;
a stix:Report .
# Get the URL of the report
BIND(IRI(str(?report)) as ?report_url)
# Filter for only STIX types
FILTER(CONTAINS(str(?entity_type), "stix"))
}
}
```
## 8. 每日情报报告
**summary worker**(`graph_eval_worker.py`)每 24 小时运行一次。它查询图谱中所有活跃的威胁行为者、他们的恶意软件和目标,然后使用 Gemini 生成一份**战略威胁环境评估**。
**输出:** 以 Markdown 格式保存的报告,存储在 `./reports/daily_brief_YYYY-MM-DD.md` 中。
典型的报告结构:
1. **执行摘要** — 观察到的主要趋势
2. **主要威胁行为者** — 活跃组织及其能力
3. **工具趋势** — 最常观察到的恶意软件/技术
4. **战略评估** — 对防御的影响
5. **建议** — 可操作的缓解措施
**查看最新报告:**
```
ls -lt reports/daily_brief_*.md | head -1
```
## 9. 架构说明
### 为什么选择 Redis 队列?
我们没有在服务之间使用直接的 HTTP 调用,而是使用了带有 `BLPOP` 的 Redis `LIST` 以实现松耦合。这意味着:
- 工作进程可以崩溃并重启,而不会丢失任务。
- `producer` 和 `extractor` 以不同的速度运行,而不会互相阻塞。
- 可以通过运行多个 extractor/builder 实例来水平扩展新的工作进程实例。
### 为什么选择 STIX?
STIX(Structured Threat Information Expression,结构化威胁信息表达式)是业界标准的威胁情报本体。通过将我们的 LLM 提取结果直接映射到 STIX 类,我们的图谱可以立即与现有的 TI 工具(MISP, OpenCTI, TAXII 服务器)兼容。
### 共享 Docker 镜像
所有 Python 服务共享一个 Dockerfile。`compose.yml` 通过 `command:` 选择要运行的模块。这保持了镜像的小体积,并简化了本地开发——只需更改文件并重启服务即可。
### 数据库初始化
在首次启动时,Virtuoso 会自动运行 `pipeline/virtuoso-scripts/init.sql`,以授予对命名图 `http://group2.org/cskg` 的 `SPARQL_FED`、`SPARQL_UPDATE` 和读写权限。
## 10. 导出与备份
### 导出完整图谱
```
make dump
# 或: python3 server/cskg_dump.py
```
此操作将保存 `cskg_full_dump.ttl`——这是实时图谱的完整 Turtle 转储。该转储文件已被 gitignore 忽略;您可以随时从运行中的 Virtuoso 实例重新构建它。
### 将转储加载到 Virtuoso
您可以通过位于 `http://localhost:8890/conductor` 的 Virtuoso SQL 控制台或通过 `isql` 加载 `.ttl` 文件:
```
DB.DBA.TTLP(file_to_string_output('/path/to/cskg_full_dump.ttl'), '', 'http://group2.org/cskg', 0);
```
## 许可证
Apache-2.0 许可证
scraper.py] E[Extractor
extractor_worker.py] B[Graph Builder
builder_worker.py] V[(Virtuoso
Triple Store)] API[FastAPI Server
api_server.py] S[Summary Worker
graph_eval_worker.py] end A1 & A2 & A3 & A4 -->|RSS Feeds| P P -->|articles_queue| R R -->|articles_queue| E E -->|Gemini LLM| E E -->|extractions_queue| R R -->|extractions_queue| B B -->|SPARQL INSERT| V API <-->|SPARQL queries| V S -.->|POST /query| API S -.->|daily report| ./reports/ ``` **服务(6 个 Docker 容器):** | 服务 | 目的 | 关键技术 | |---|---|---| | `producer` | 每 5 分钟抓取一次 RSS 订阅源,去重,将文章加入队列 | `feedparser`, `BeautifulSoup`, `redis` | | `extractor` | 从队列中获取文章,运行 LLM 提取实体/关系 | `LangChain`, `Gemini 2.0 Flash Lite`, `Pydantic` | | `graph_builder` | 获取提取结果,构建 RDF 三元组,插入 Virtuoso | `rdflib`, `SPARQLWrapper` | | `api` | 用于查询图谱的 REST API | `FastAPI`, `SPARQLWrapper` | | `summary` | 每 24 小时运行一次:查询图谱,生成战略简报 | `LangChain`, `Gemini` | | `virtuoso` | 持久化 RDF 三元组存储 | `Virtuoso Open Source` | | `redis` | producer → extractor → builder 之间的消息队列 | `Redis 7` | ## 3. 单篇文章的处理流程 让我们追踪一下,当一篇标题为“深入剖析 DragonForce 勒索软件及其与 Scattered Spider 的关联”的新文章出现在 BleepingComputer 上时会发生什么。 ### 步骤 1:抓取 `producer` 获取 RSS 订阅源,剥离 HTML,并推送到 JSON 文章: ``` { "title": "Deep dive into DragonForce ransomware...", "link": "https://www.bleepingcomputer.com/...", "published": "Wed, 03 Dec 2025 10:05:15 -0500", "content": "DragonForce is a ransomware group that uses Scattered Spider..." } ``` ### 步骤 2:提取 `extractor` 将文章文本发送给 Gemini。LLM 返回结构化数据: ``` { "threat_actors": ["DragonForce", "Scattered Spider"], "malware": [], "attack_patterns": ["initial access", "social engineering"], "vulnerabilities": [], "indicators": [], "relations": [ {"subject": "DragonForce", "relation": "uses", "object": "Scattered Spider"}, {"subject": "Scattered Spider", "relation": "uses", "object": "social engineering"}, {"subject": "Scattered Spider", "relation": "uses", "object": "initial access"} ] } ``` ### 步骤 3:构建图谱 `graph_builder` 将此结果转换为 RDF 三元组并将其插入 Virtuoso: ``` @prefix stix:
every 5 min] EX[Extractor
event-driven] GB[Graph Builder
event-driven] end subgraph Storage["Storage"] VV[(Virtuoso
Named Graph)] end subgraph Output["Output"] API[FastAPI
Port 8000] REP[Daily Report
./reports/] end RSS --> PR PR -->|push| AQ PR -->|sadd| SU AQ -->|blpop| EX EX -->|push| EQ EQ -->|blpop| GB GB -->|SPARQL INSERT| VV VV <-->|SPARQL SELECT| API SUM[Summary Worker
every 24h] API -->|POST /query| SUM SUM -->|write| REP ``` **所有工作进程共享同一个 Docker 镜像。** `compose.yml` 为每个服务覆盖了 `command:`。队列模式使用 Redis 的 `BLPOP`(阻塞式弹出),因此工作进程是事件驱动的并且能高效消费。 ## 6. 知识图谱(面向专家) ### 本体 CSKG 使用 **STIX 2.1** 作为其主要本体,并辅以自定义命名空间 `http://group2.org/cskg/`。 **映射的实体类型:** | 提取类型 | STIX 类 | |---|---| | 威胁行为者 | `stix:ThreatActor` | | 恶意软件 | `stix:Malware` | | 漏洞 | `stix:Vulnerability` | | 指标 | `stix:Indicator` | | 攻击模式 | `stix:AttackPattern` | | 报告 | `stix:Report` | **关系映射(16 种谓语动词):** ``` RELATIONSHIP_MAP = { "uses": STIX.uses, "targets": STIX.targets, "exploits": STIX.exploits, "mitigates": STIX.mitigates, "attributed_to": STIX.attributed_to, "variant_of": STIX.variant_of, "located_in": STIX.located_in, "impersonates": STIX.impersonates, "reports": STIX.reports, "patched": STIX.patched, "resolved": STIX.resolved, "disrupted": STIX.disrupted, "aligned_with": STIX.aligned_with, "observes": STIX.observes, "has_similarities_with": STIX.has_similarities_with, "propagated_via": STIX.propagated_via, } ``` ### 双 URI 实体消解 为了处理实体名称的变体(例如 `DragonForce` 与 `dragonforce`),同时保留原始标签,流水线为每个实体创建**两个节点**: - **规范 URI**(`safe_uri`):标准化、小写、仅限字母数字。用于所有关系。 - **带标签 URI**(`unsafe_uri`):保留原始文本作为 `rdfs:label`。通过 `owl:sameAs` 链接到规范 URI。 示例: ``` cskg:dragonforce a stix:ThreatActor . cskg:DragonForce a stix:ThreatActor ; rdfs:label "DragonForce" ; owl:sameAs cskg:dragonforce . ``` ### 外部 KG 链接:SEPSES CVE 当漏洞被检测为 CVE ID 时,流水线会链接到 **SEPSES CVE 知识图谱**,而不是创建本地节点: ``` cve_match = re.search(r"(CVE-\d{4}-\d{4,})", vuln, re.IGNORECASE) if cve_match: cve_id = cve_match.group(1).upper() vuln_uri = SEPSES_CVE[cve_id] # https://w3id.org/sepses/resource/cve/CVE-XXXX-XXXX ``` 这使得查询能够同时涵盖 CSKG 和来自 SEPSES 的丰富外部 CVE 数据。 ## 7. 查询图谱 ### 通过 REST API ``` curl -X POST http://localhost:8000/query \ -H "Content-Type: application/json" \ -d '{"query": "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10"}' ``` ### 通过 SPARQL 控制台 打开 `http://localhost:8890/sparql` 并直接在命名图 `
标签:AI安全, AV绕过, Chat Copilot, CVE, DLL 劫持, Docker, FastAPI, Gemini, GPT, Linked Data, LLM, Python, Redis, REST API, RSS订阅, Splunk, STIX 2.1, STIX本体映射, Terrascan, Unmanaged PE, Virtuoso, 三元组存储, 关联数据, 后端开发, 大语言模型, 威胁情报, 威胁行为者, 安全数据处理, 安全知识图谱, 安全防御评估, 密码管理, 开发者工具, 恶意软件, 情报自动化, 搜索引擎查询, 数字签名, 新闻文章解析, 无后门, 消息队列, 漏洞管理, 生成式AI, 知识图谱构建, 系统架构, 索引, 网络威胁情报, 网络安全, 网络安全监控, 网络测绘, 自动化威胁情报提取, 请求拦截, 逆向工具, 隐私保护, 非结构化数据, 驱动开发