EdgeGuard
边缘基础设施上基于图增强 xAI 的威胁情报
IICT-BAS + Ratio1 | 由 ResilMesh - 第二次公开征集 资助
本项目通过 ResilMesh 第二次公开征集 获得了资金支持,
由欧盟“Horizon Europe”研究与创新计划资助。
## 🤔 EdgeGuard 是什么?
想象一下你公司的计算机网络是一座城市。EdgeGuard 就像是拥有一套**智能安全系统**,它可以:
1. 从多个外部源**收集**威胁情报(11 种活跃收集器类型 + 2 个行业占位符 —— 见 `docs/DATA_SOURCES.md`)
2. 将所有信息**暂存**到 MISP —— 这是唯一的真实数据源 —— 包含完整的出处和时间戳
3. **构建**一个 Neo4j 知识图谱,展示威胁、攻击者和行业之间的关系
4. 自动**丰富**图谱:衰减过期 IOC 的置信度,将攻击者归组到攻击活动中,校准边权重
5. 按行业(医疗保健、能源、金融)进行**筛选**,让你只看到与你所在行业相关的信息
6. 与 ResilMesh/CRUSOE **集成**,进行网络拓扑关联
### 工作流
```
External sources (11 active feeds + 2 sector placeholders)
│
▼
Collectors ──────────────────────────────────────────────────────────────────┐
(OTX, NVD, CISA, MITRE, VirusTotal, AbuseIPDB, │
ThreatFox, URLhaus, Feodo, SSL BL, CyberCure, …) │
│ │
▼ │
MISP ◄────── Single Source of Truth ──────── all data lands here first │
(tagged: zone, source, TLP, confidence) │
│ │
▼ │
STIX 2.1 conversion (primary) / direct fallback │
│ │
▼ │
Neo4j Knowledge Graph │
(Indicators, CVEs, Actors, Techniques, │
Malware, Campaigns — all cross-linked) │
│ │
▼ │
Enrichment Jobs (post-sync) │
• IOC Confidence Decay (stale indicators retire automatically) │
• Campaign Node Builder (ThreatActor → Malware → IOC chains) │
• Co-occurrence Calibration (confidence weighted by event tightness) │
│ │
▼ │
FastAPI REST API (port 8000) ────────────────────────────────────────────────┘
GraphQL API (port 4001) ← ISIM-compatible; ResilMesh queries EdgeGuard
+ NATS real-time alerts the same way it queries ISIM
+ ResilMesh/CRUSOE integration
```
**两种运行模式:**
- **基线模式**(Baseline,运行一次):完整的历史数据加载 —— 每个行业最多 24 个月的数据,默认**每个数据源**不限条目(`BASELINE_COLLECTION_LIMIT=0`),需手动触发
- **增量模式**(Incremental,定时执行):2-3 天的时间窗口,默认每个数据源拉取 **200** 条目(`EDGEGUARD_INCREMENTAL_LIMIT`),通过 Airflow 定时调度运行
**限制是按阶段划分的**(每个数据源的基线上限 ≠ MISP→Neo4j 拉取数量 ≠ Neo4j 内存分块)。详情请参阅此 README 后文的 **[docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)** 以及 **§ 收集与同步限制** 章节。
## 🎯 为什么选择 EdgeGuard?
| 问题 | EdgeGuard 的解决方案 |
|---------|-------------------|
| 威胁数据过多 | 按行业筛选(医疗保健 / 能源 / 金融 / 全球) |
| 难以看清关联 | 图数据库映射了 攻击者 → 技术 → CVE → IOC 的关系 |
| 过期指标充斥图谱 | IOC 置信度衰减会自动淘汰旧的 IOC |
| 无法判断哪些情报可信 | 由来源数量和事件紧密度校准置信度 |
| 多个互不连通的数据源 | 所有信息流都通过 MISP 作为唯一的真实来源 |
| 需要与现有平台集成 | 完全兼容 ResilMesh/CRUSOE 数据模型 |
### 确定性的摄取与链接(而非模糊图匹配)
EdgeGuard **有意**采用**确定性的身份与合并规则**(在约定的键上使用 Neo4j **`MERGE`**,采用**精确的**标识符如 **CVE** 和 **MITRE `mitre_id`**,以及文档中规定的范围共现),而不是**模糊的**字符串/图推理(例如用于生产实体链接的子串 **`CONTAINS`**)。这种权衡有利于在威胁情报中实现**可审计性**、**可重复的**同步以及**更少的假阳性边**(错误的 攻击者 ↔ 技术 ↔ IOC 链接具有极高的影响力)。关键边(例如 **`USES`**)的关系逻辑在 **[docs/ARCHITECTURE.md](docs/ARCHITECTURE.md)** 和 **[docs/KNOWLEDGE_GRAPH.md](docs/KNOWLEDGE_GRAPH.md)** 中有详细说明;合并行为、数据出处以及**精确与模糊匹配**的历史记录详见 **[docs/DATA_QUALITY.md](docs/DATA_QUALITY.md)**。
## 🏢 适用人群?
- **医院** → 检测对患者系统的攻击
- **能源公司** → 监控电网安全
- **银行** → 发现金融欺诈
- 任何具有**边缘基础设施的组织**
## 🚀 快速开始
### 前置条件
- Docker + Docker Compose(推荐 —— 包含 Neo4j,**`airflow_postgres`**,Airflow,REST API,GraphQL)
- **或者** **Python 3.12+**(用于 pip/venv 安装;`pyproject.toml` / CI 与此版本匹配)+ Neo4j + MISP。对于使用 PostgreSQL 元数据库的 **pip 安装的 Airflow**,请使用 **`pip install ".[airflow]"`** 或 **`apache-airflow[postgres]~=2.11`**(见 `requirements.txt`)。*注意:Airflow 2.11 上游支持 Python 3.11+;EdgeGuard 统一使用 3.12+。*
### 选项 A —— 一键安装(Docker,推荐)
```
git clone https://github.com/Kopanov/EdgeGuard-Knowledge-Graph.git
cd EdgeGuard-Knowledge-Graph
./install.sh # detects Docker, copies .env, builds image, starts stack
```
`install.sh` 会处理所有事情:检查 Docker,复制 `.env.example → .env`,构建镜像,并运行 `docker compose up -d`。完成后,请使用你的 API 密钥编辑 `.env` 并重新启动。
**安装后验证(在任何安装方法后运行此命令):**
```
python src/edgeguard.py preflight # comprehensive readiness check (env vars, APIs, Neo4j, MISP, Airflow, disk)
python src/edgeguard.py doctor # checks MISP, Neo4j, Airflow, NATS, schema, API keys, version compatibility
python src/edgeguard.py stats # quick dashboard: node counts, last sync, pipeline runs
```
`preflight` 会运行 7 个检查类别(环境变量、API 连通性、Neo4j schema、MISP 健康、Airflow、磁盘、熔断器),退出码 0 表示就绪,1 表示发现问题。`stats` 会显示系统中当前的数据概况。
**EdgeGuard CLI 快速参考:**
| 命令 | 用途 |
|---------|---------|
| `edgeguard preflight` | 运行前准备检查(7 个类别,CI 环境使用 `--strict`) |
| `edgeguard stats` | 快速面板:节点计数、上次同步、流水线运行情况 |
| `edgeguard stats --full` | + 按区域、来源和 MISP 事件/区域的细分 |
| `edgeguard stats --by-zone` | 每个区域的节点计数(显示多区域重叠) |
| `edgeguard stats --by-source` | 每个来源的节点计数(nvd, otx, cisa 等) |
| `edgeguard stats --misp` | 按来源和区域统计的 MISP 事件/属性计数 |
| `edgeguard dag status` | Airflow DAG 运行状态(带颜色标识,`--state running` 过滤器) |
| `edgeguard dag kill` | 强制失败卡住的 DAG 运行(保留检查点,支持 `--dry-run`) |
| `edgeguard checkpoint status` | 每个数据源的基线进度 + 增量游标 |
| `edgeguard checkpoint clear` | 清除基线状态(默认保留增量状态) |
| `edgeguard clear neo4j` | 删除 Neo4j 中的所有图谱数据(保留约束) |
| `edgeguard clear misp` | 从 MISP 中删除所有 EdgeGuard 事件 |
| `edgeguard clear all` | 完全重置:Neo4j + MISP + 检查点(`--force` 跳过确认) |
| `edgeguard doctor` | 诊断连通性(MISP、Neo4j、Airflow、NATS、schema) |
| `edgeguard heal` | 自动修复(重置熔断器、清除锁、重试) |
| `edgeguard validate` | 配置验证(速率限制、密码、Neo4j schema) |
| `edgeguard monitor` | 实时健康状态显示 |
| `edgeguard version` | CalVer 版本号 + git SHA |
所有命令均支持 `--help`。数据命令支持 `--json` 以便自动化处理。
**接下来(推荐的阅读顺序):** [docs/SETUP_GUIDE.md](docs/SETUP_GUIDE.md)(完成连接) → [docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md)(运行 DAG) → [docs/BASELINE_SMOKE_TEST.md](docs/BASELINE_SMOKE_TEST.md)(首次基线运行)。见下文 **§ 推荐阅读顺序**。
如果你偏好明确的步骤,也可以使用 `make`:
```
make install # same as ./install.sh
make update # git pull (ff-only) + auto: Docker Compose or pip path (see install.sh --help)
make start # docker compose up -d (if already installed)
make stop # docker compose down
make logs # tail all service logs
make monitoring # add Prometheus + Grafana
make help # list all available targets
```
启动的服务(`docker compose up -d`):
```
Neo4j Browser → http://localhost:7474
Airflow UI → http://localhost:8082 (8082 avoids ResilMesh Temporal conflict)
REST API → http://localhost:8000/health
GraphQL API → http://localhost:4001/graphql
Graph Explorer → open docs/visualization.html in browser, connect to http://localhost:8000
airflow_postgres → PostgreSQL for Airflow metadata (Docker network only, not exposed)
```
可选的监控栈(`make monitoring` 或 `docker compose -f docker-compose.monitoring.yml up -d`):
```
Prometheus → http://localhost:9090
Grafana → http://localhost:3000 (set GRAFANA_ADMIN_PASSWORD in .env)
AlertManager → http://localhost:9093
Metrics endpoint → http://localhost:8001/metrics (EdgeGuard pipeline metrics)
```
Airflow 的调度器/UI 状态使用 **`airflow_postgres`**;如果需要,可以在 `.env` 中使用 **`AIRFLOW_POSTGRES_*`** 覆盖凭据。**`airflow`** 服务镜像由 **`Dockerfile.airflow`** **构建**(包含 EdgeGuard Python 依赖,如 **neo4j** / **pymisp**);在依赖或 **`src/`** 逻辑更改后,请运行 **`docker compose build airflow`**。**`docker-compose.yml`** `x-common-env` 将 `.env` 中的 MISP→Neo4j 调优参数(**`EDGEGUARD_NEO4J_SYNC_CHUNK_SIZE`**、**`EDGEGUARD_REL_BATCH_SIZE`**、**`EDGEGUARD_DEBUG_GC`**、**`EDGEGUARD_MISP_EVENT_SEARCH`** 等)转发到 **airflow** / **api** / **graphql**。运维和故障排除:[`docs/AIRFLOW_DAGS.md`](docs/AIRFLOW_DAGS.md)。
### 选项 B —— Python / pip(无 Docker)
```
git clone https://github.com/Kopanov/EdgeGuard-Knowledge-Graph.git
cd EdgeGuard-Knowledge-Graph
./install.sh --python # creates .venv, pip installs all extras
# 或等效方式:
make install-py # same thing via Makefile
# 或手动方式:
pip install ".[api,graphql,monitoring]"
cp .env.example .env # fill in MISP_URL, MISP_API_KEY, NEO4J_PASSWORD
python src/health_check.py # verify connectivity
```
用于开发(添加 ruff、mypy、pytest):
```
./install.sh --python --dev
# 或:
make install-dev
make ci # lint + type-check + tests in one command
```
**已经克隆了?** 拉取最新代码并刷新(当 `docker` + compose v2 + `docker-compose.yml` 可用时自动选择 **Docker Compose**;否则选择 **pip / .venv**):
```
./install.sh --update # auto (Docker or pip)
./install.sh --update --docker # force Docker only (fail if unavailable)
./install.sh --update --python # force pip / .venv
./install.sh --update --python --dev # include dev extras
```
### 选项 C —— Conda 环境
```
conda create -n edgeguard python=3.12
conda activate edgeguard
git clone https://github.com/Kopanov/EdgeGuard-Knowledge-Graph.git
cd EdgeGuard-Knowledge-Graph
pip install ".[api,graphql,monitoring]"
cp .env.example .env # fill in MISP_URL, MISP_API_KEY, NEO4J_PASSWORD
python src/edgeguard.py doctor # validate setup
```
这要求 **MISP 和 Neo4j 在外部运行**(作为系统服务、Docker 容器或远程实例均可)。在 `.env` 中通过 `MISP_URL` 和 `NEO4J_URI` 指定它们的地址。完整的 conda/venv/边缘设置详情请参阅 [`docs/ENVIRONMENTS.md`](docs/ENVIRONMENTS.md)。
更详细的运维说明(CLI 表、路径):[`docs/SETUP_GUIDE.md`](docs/SETUP_GUIDE.md)。
### 使用 direnv 自动激活环境(日常开发推荐)
[direnv](https://direnv.net) 可以让你在 `cd` 进入项目时自动激活 `.venv` 和 `.env` —— 离开时自动取消激活。不需要手动执行 `source .venv/bin/activate`。
```
# 一次性设置(在你的机器上执行一次)
brew install direnv # macOS; or: apt install direnv
echo 'eval "$(direnv hook zsh)"' >> ~/.zshrc # or bash / fish
source ~/.zshrc
# 在项目目录内 — 允许 direnv 一次
cd EdgeGuard-Knowledge-Graph
direnv allow
# 从现在起:cd 进入 → 自动加载 venv + .env。cd 退出 → 停用。
```
在执行 `direnv allow` 后,`python` 将指向 `.venv/bin/python`,所有 `.env` 变量都会被导出,并且 `src/` 会被加入到 `PYTHONPATH` 中 —— 在后续的任何终端会话中均无需手动操作。
### 基于 Airflow DAG 的工作流(生产环境推荐)
EdgeGuard 在 `dags/edgeguard_pipeline.py` 中内置了 **6** 个主要 DAG(1 个基线 + 5 个增量/同步调度)。可选的 **指标** DAG 位于 `dags/edgeguard_metrics_server.py`(`edgeguard_metrics_server`、`edgeguard_metrics_server_scheduled`)。部署完成后:
```
# 1. 启动堆栈(包括 Airflow + airflow_postgres)
docker compose up -d
# 然后打开 Airflow UI(见上文的“已启动服务”)。仅 Pip 安装 Airflow:pip install ".[airflow]"
# (安装 apache-airflow[postgres])并根据 Airflow DB 文档设置 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN。
# 2. 触发一次 baseline DAG(完整历史加载)
# Airflow UI → DAGs → edgeguard_baseline → 触发 ▶
# 可选择先配置这些 Airflow 变量:
# BASELINE_DAYS = 730 (2 年历史)
# BASELINE_COLLECTION_LIMIT = 0 (0 = 无限制)
# 快速 7 天冒烟测试:在 .env 中设置 EDGEGUARD_BASELINE_DAYS=7 和
# EDGEGUARD_BASELINE_COLLECTION_LIMIT=1000,重启 airflow,然后触发。
# 参见 docs/BASELINE_SMOKE_TEST.md
# 3. Incremental cron DAG 在 baseline 后自动启动:
# edgeguard_pipeline → 每 30 分钟 (OTX)
# edgeguard_medium_freq → 每 4 小时 (CISA, VirusTotal)
# edgeguard_low_freq → 每 8 小时 (NVD)
# edgeguard_daily → 每天凌晨 2 点 (MITRE, AbuseIPDB, feeds)
# edgeguard_neo4j_sync → 每 3 天 (MISP → Neo4j + enrichment)
```
### 手动 / 独立命令
```
cd src
# 检查系统健康状态
python health_check.py
# 运行 MISP → Neo4j 同步(增量 — 最近 3 天)
python run_misp_to_neo4j.py
# 运行完整历史同步
python run_misp_to_neo4j.py --full
# 手动运行同步后 enrichment 任务
python enrichment_jobs.py
# 重建跨来源图关系
python build_relationships.py
# 通过 CLI 检查系统健康状态
python edgeguard.py doctor
# 自动修复常见问题
python edgeguard.py heal
# 从 git 拉取最新代码并重新安装(从克隆根目录;等同于 ./install.sh --update)
edgeguard update # auto: Docker Compose if available, else pip
edgeguard --update # synonym for the above
edgeguard update --docker # force Docker only
edgeguard update --python # force pip / .venv
edgeguard version # release CalVer + git short SHA (no .env)
```
版本号使用**日历版本控制**(`pyproject.toml` 中的 `YYYY.M.D`)。详见 [`docs/VERSIONING.md`](docs/VERSIONING.md)。
## 📊 核心功能
### 1. 多区域威胁检测
- **医疗保健**(24 个月数据)
- **能源**(24 个月数据)
- **金融**(24 个月数据)
- **全球**(24 个月数据)
- 条目可以具有**多个区域**(例如,`['finance', 'healthcare']`)
### 2. 以 MISP 作为唯一真实来源
所有收集器首先只写入 MISP。Neo4j 仅从 MISP 中获取数据——从不直接写入。这带来了:
- 为每条情报提供完整的审计轨迹和出处
- 能够随时从 MISP 重放、审计或重新同步图谱
- 在原始收集和图谱丰富之间实现清晰分离
### 3. 同步后丰富流水线
在每次 MISP → Neo4j 同步之后,会自动运行 **四个** 丰富作业(`enrichment_jobs.run_all_enrichment_jobs`):
| 作业 | 功能描述 |
|-----|-------------|
| **IOC 置信度衰减** | 对过期的指标/漏洞进行分层衰减;当最后更新时间**超过 365 天**时予以淘汰(`decay_ioc_confidence`) |
| **攻击活动节点构建器** | 从 ThreatActor → Malware → Indicator 链中创建 `Campaign` 节点(`build_campaign_nodes`) |
| **共现校准** | 根据 MISP 事件大小,调整 `misp_cooccurrence` / `misp_cor` 中带有 `source_id` 的 `INDICATES`/`EXPLOITS` 边的权重(`calibrate_cooccurrence_confidence`) |
| **漏洞↔CVE 桥接** | 在 `Vulnerability` 和 `CVE` 之间建立幂等的 `REFERS_TO` 关系(`bridge_vulnerability_cve`) |
### 4. ResilMesh 互操作性
EdgeGuard 映射了 ResilMesh/CRUSOE 的数据模型 —— `CVE`、`CVSSv2/v31/v40`、`Vulnerability`、`Host`、`SoftwareVersion`、`ThreatActor`、`Technique`、`Campaign` 节点均与上游 schema 保持一致。所有 EdgeGuard 管理的节点都带有 `edgeguard_managed = true` 属性,以便在共享图谱中进行干净过滤。完整的对齐细节和正在进行的工作项请参阅 [`docs/RESILMESH_INTEROPERABILITY.md`](docs/RESILMESH_INTEROPERABILITY.md)。
### 5. GraphQL API(兼容 ISIM,端口 4001)
EdgeGuard 在 **端口 4001** 上暴露了一个基于 Strawberry/FastAPI 的 GraphQL 端点 —— 这与 ResilMesh 部署中 ISIM 使用的端口相同。这意味着 ResilMesh 可以像查询 ISIM 一样查询 EdgeGuard 数据,无需特殊的集成工作。
```
# 示例:医疗保健行业中的严重漏洞
query {
vulnerabilities(filter: { zone: "healthcare", minCvss: 9.0 }) {
cveId description status severity
}
}
# 示例:具有完整 CVE 链的活跃指标
query {
indicators(filter: { zone: "energy", activeOnly: true }) {
value indicatorType confidenceScore
}
cve(cveId: "CVE-2024-12345") {
cvssV31 { baseScore attackVector }
}
}
```
所有节点类型均可用:`CVE`、`Vulnerability`、`Indicator`、`ThreatActor`、`Malware`、`Technique`、`Tactic`、`Campaign`。完整的类型覆盖表请参阅 [`docs/ARCHITECTURE.md`](docs/ARCHITECTURE.md)。
### 6. 数据来源(13 个槽位 —— 见 `docs/DATA_SOURCES.md`)
| 来源 | 类型 | 提供的内容 |
|--------|------|------------------|
| **AlienVault OTX** | 威胁情报 | 实时指标脉冲、恶意软件哈希、IP、域名 |
| **CISA KEV** | 漏洞 | 已知被利用漏洞目录(美国政府) |
| **MITRE ATT&CK** | 战术与技术 | 企业攻击矩阵、威胁攻击者画像 |
| **NVD** | 漏洞 | CVE 数据库、严重性评分 (CVSS) |
| **MISP** | 威胁共享 | 枢纽 —— 所有收集器首先汇聚于此(你的部署实例) |
| **VirusTotal** | 恶意软件分析 | 文件/哈希信誉、扫描结果(70多种杀毒引擎);主收集器为 `vt_collector.py`,丰富路径为 `virustotal_collector.py` |
| **AbuseIPDB** | 威胁情报 | IP 信誉和滥用报告 |
| **Feodo Tracker** | 威胁情报 | 银行木马 C&C 服务器 IP |
| **SSL Blacklist** | 威胁情报 | 恶意 SSL 证书指纹 |
| **URLhaus** | 威胁情报 | 恶意 URL 数据库 |
| **CyberCure** | 威胁情报 | IP、URL 和哈希源 |
| **ThreatFox** | 威胁情报 | 带有恶意软件家族上下文的 IOC |
| **能源 / 医疗保健源** | 威胁情报 | 占位收集器(需要 ISAC / 成员资格;默认未激活) |
**各项对你的意义:**
- *CISA KEV* = “这些是黑客实际使用的最危险的漏洞”
- *MITRE ATT&CK* = “这些是黑客用于攻击的技术”
- *VirusTotal* = “我们用 70 多种工具扫描了这个文件,这是它们发现的结果”
- *AbuseIPDB* = “此 IP 因不良行为被举报”
**按行业划分的数据保留期:**
- 医疗保健:24 个月
- 能源:24 个月
- 金融:24 个月
- 全球:24 个月
### 7. 图谱知识库
- 节点:`Indicator`、`CVE`、`Vulnerability`、`Malware`、`ThreatActor`、`Technique`、`Tactic`、`Campaign`
- 关系:`INDICATES`、`EXPLOITS`、`USES`、`USES_TECHNIQUE`、`ATTRIBUTED_TO`、`TARGETS`、`AFFECTS`、`IN_TACTIC`、`IS_SAME_AS`、`REFERS_TO`、`PART_OF`、`RUNS`、`HAS_CVSS_*`
- 所有边均带有 `confidence_score`、`sources[]`、`imported_at`、`match_type`
- 包含置信度评分的完整关系 schema 请参阅 [docs/KNOWLEDGE_GRAPH.md](docs/KNOWLEDGE_GRAPH.md)。
### 8. 实时告警
- 用于即时通知的 NATS 集成
- 基于区域的主题路由(`resilmesh.threats.zone.healthcare.*`)
- 多区域威胁检测
### 9. 生产级就绪的 CLI
- `doctor` —— 诊断连接和配置问题
- `heal` —— 自动修复常见问题
- `validate` —— 检查配置
- `monitor` —— 健康状态面板
### 10. 交互式图谱浏览器
EdgeGuard 内置了一个基于 **Cytoscape.js** 的浏览器端图谱可视化工具。它连接到 FastAPI 后端(`GET /graph/explore`)并渲染来自 Neo4j 的实时数据 —— 无需额外软件。
| 视图 | 展示内容 |
|------|---------------|
| **恶意软件** | 恶意软件家族 → 指标 → 行业(层级结构) |
| **攻击者** | 威胁攻击者 → ATT&CK 技术 → 战术 |
| **指标** | 按区域分组和着色的 IOC |
| **CVEs** | 按 CVSS 调整大小的漏洞;CISA KEV 条目以红色高亮显示 |
**使用方法:** 在浏览器中打开 `docs/visualization.html`,输入 API URL(`http://localhost:8000`)和你的 API 密钥,然后点击 Connect。完整文档请参阅 [`docs/GRAPH_EXPLORER.md`](docs/GRAPH_EXPLORER.md)。
## 📁 项目结构
```
EdgeGuard/
├── src/
│ ├── config.py # Zone keywords, sector limits, get_effective_limit()
│ ├── neo4j_client.py # Graph MERGE logic, CVSS sub-nodes, relationship methods
│ ├── run_misp_to_neo4j.py # MISP → Neo4j (attribute parse; Airflow path). Optional STIX for CLI/export
│ ├── run_pipeline.py # CLI: run collectors → MISP; optional direct Neo4j path
│ ├── build_relationships.py # Graph edges: exact / MITRE-ID (USES actor+malware→technique), co-occurrence INDICATES, …
│ ├── enrichment_jobs.py # IOC decay, Campaign nodes, Vulnerability-CVE bridge
│ ├── query_api.py # FastAPI REST API — sector-filtered queries (port 8000)
│ ├── graphql_api.py # GraphQL API — ISIM-compatible endpoint (port 4001)
│ ├── graphql_schema.py # Strawberry type definitions (CVE, Indicator, Actor …)
│ ├── health_check.py # MISP + Neo4j connectivity checks
│ ├── resilience.py # Circuit breakers and retry logic
│ ├── edgeguard.py # CLI: preflight / stats / dag / checkpoint / doctor / heal / validate / monitor
│ ├── airflow_client.py # Airflow REST API wrapper (used by CLI dag commands)
│ └── collectors/ # One module per data source
│ ├── otx_collector.py
│ ├── nvd_collector.py # Extracts CVSSv2, CVSSv31, CVSSv40
│ ├── cisa_collector.py
│ ├── mitre_collector.py
│ ├── vt_collector.py
│ ├── abuseipdb_collector.py
│ ├── global_feed_collector.py # ThreatFox, URLhaus, CyberCure
│ ├── finance_feed_collector.py # Feodo, SSL Blacklist
│ └── misp_writer.py
├── dags/
│ ├── edgeguard_pipeline.py # 6 DAGs: baseline, high/med/low/daily collectors, Neo4j sync
│ └── edgeguard_metrics_server.py # Optional Prometheus metrics DAG(s)
├── tests/ # Pytest test suite (8+ tests, CI-gated at 30% coverage)
├── docs/ # Full documentation set (architecture, collectors, Airflow, …)
├── docker-compose.yml # Full stack: Neo4j + airflow_postgres + Airflow + REST + GraphQL
├── docker-compose.monitoring.yml # Prometheus + Grafana overlay
├── install.sh # One-command installer (Docker path or --python pip path)
├── Makefile # Shortcuts: make install / update / start / test / lint / health
└── .env.example # Annotated variable reference
```
## 📖 推荐阅读顺序(面向新运维人员)
请在克隆仓库**之后**使用此路径 —— 它与产品的运行方式一致(安装 → Airflow → 首次基线运行):
| 顺序 | 文档 | 目的 |
|-------|-----------|---------|
| **1** | **[docs/SETUP_GUIDE.md](docs/SETUP_GUIDE.md)** | **从这里开始** —— 完整的安装指南(Docker Compose 或 venv)、`.env` 配置、MISP/Neo4j 连接、健康检查 |
| **2** | **[docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md)** | 运行和调试 DAG(CLI、修改 `.env` 后的重启、故障排除) |
| **3** | **[docs/BASELINE_SMOKE_TEST.md](docs/BASELINE_SMOKE_TEST.md)** | 安全地执行首次 **`edgeguard_baseline`** 运行(短时间窗口 / 限制) |
然后,当像 **200** 与 **1000** 这样的数字让你感到困惑时,可以打开 **[docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)**;需要了解 MISP 网络/同步发现机制时,请阅读 **[docs/MISP_SOURCES.md](docs/MISP_SOURCES.md)**。如果同步**在 Neo4j 中成功**但 Airflow 任务显示为**红色**或被**僵尸终结**,请阅读 **[docs/HEARTBEAT.md](docs/HEARTBEAT.md)**。
**完整索引 + “可跳过内容”:** [docs/DOCUMENTATION_AUDIT.md](docs/DOCUMENTATION_AUDIT.md)。
**快速参考:** 端口 → 见上方 **§ 启动的服务**。CLI → 见 **§ edgeguard CLI**。限制 → 见 [docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)。OOM/僵尸进程 → 见 [docs/HEARTBEAT.md](docs/HEARTBEAT.md)。
本 README 保持为**简短的**入门指引(快速开始、端口、环境变量)。深入的上手指南请参阅 **SETUP_GUIDE**。
## 📚 文档
### 入门指南
| 文档 | 描述 |
|----------|-------------|
| [docs/SETUP_GUIDE.md](docs/SETUP_GUIDE.md) | **从这里开始** —— 完整安装(Docker Compose、venv 或 conda)、`.env` 配置、MISP/Neo4j 连接 |
| [docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md) | 运行和调试 Airflow DAG —— CLI、重启、故障排除 |
| [docs/BASELINE_SMOKE_TEST.md](docs/BASELINE_SMOKE_TEST.md) | 安全地执行首次基线运行 —— 短时间窗口、小规模限制 |
| [docs/API_KEYS_SETUP.md](docs/API_KEYS_SETUP.md) | 获取每个 API 密钥的分步说明 |
| [docs/ENVIRONMENTS.md](docs/ENVIRONMENTS.md) | 环境标志(`EDGEGUARD_ENV`)、Python 3.12+、conda/venv 设置 |
| [docs/DOCKER_SETUP_GUIDE.md](docs/DOCKER_SETUP_GUIDE.md) | 高内存调优、全新状态、特定工作站的设置 |
### 参考资料
| 文档 | 描述 |
|----------|-------------|
| [docs/TECHNICAL_SPEC.md](docs/TECHNICAL_SPEC.md) | **规范化 Schema** —— 完整的节点/关系 Cypher 规范、属性类型、约束 |
| [docs/COLLECTORS.md](docs/COLLECTORS.md) | 每个收集器的行为、区域检测、丰富字段、输出示例 |
| [docs/DATA_SOURCES.md](docs/DATA_SOURCES.md) | 所有 13 个数据源 —— 库存、分类、数据量 |
| [docs/DATA_SOURCES_RATE_LIMITS.md](docs/DATA_SOURCES_RATE_LIMITS.md) | API 速率限制、成本、供应商端点参考 |
| [docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md) | 不同的限制何时适用 —— 基线 vs 增量 vs 同步 vs 分块 |
| [docs/NEO4J_SAMPLE_QUERIES.md](docs/NEO4J_SAMPLE_QUERIES.md) | 适用于 Neo4j Browser 的 Cypher 查询示例 |
### 架构与设计
| 文档 | 描述 |
|----------|-------------|
| [docs/ARCHITECTURE.md](docs/ARCHITECTURE.md) | 流水线数据流、区域检测、分块策略 |
| [docs/ARCHITECTURE_DIAGRAMS.md](docs/ARCHITECTURE_DIAGRAMS.md) | **可视化图表**(Mermaid) —— 系统概览、图谱 Schema、流水线流程、重试逻辑、OOM 保护 |
| [docs/KNOWLEDGE_GRAPH.md](docs/KNOWLEDGE_GRAPH.md) | 区域/行业架构、多区域支持、查询示例 |
| [docs/METHODOLOGY.md](docs/METHODOLOGY.md) | 行业分类算法 —— 关键词、评分、阈值 |
| [docs/DATA_QUALITY.md](docs/DATA_QUALITY.md) | 确定性 MERGE 策略、精确匹配规则 |
### 运维
| 文档 | 描述 |
|----------|-------------|
| [docs/HEARTBEAT.md](docs/HEARTBEAT.md) | Airflow 僵尸任务、OOM/SIGKILL、心跳超时 |
| [docs/PROMETHEUS_SETUP.md](docs/PROMETHEUS_SETUP.md) | Prometheus + Grafana 监控栈 |
| [docs/MISP_SOURCES.md](docs/MISP_SOURCES.md) | MISP 作为唯一真实来源、事件发现、元数据前缀 |
| [docs/RESILIENCE_CONFIG.md](docs/RESILIENCE_CONFIG.md) | 熔断器、重试逻辑、超时调优 |
| [docs/SECRETS_MANAGEMENT.md](docs/SECRETS_MANAGEMENT.md) | 凭据处理、环境变量、密钥轮换 |
### ResilMesh 集成
| 文档 | 描述 |
|----------|-------------|
| [docs/RESILMESH_INTEROPERABILITY.md](docs/RESILMESH_INTEROPERABILITY.md) | 集成契约 —— EdgeGuard 生成什么,ResilMesh 消费什么 |
| [docs/RESILMESH_INTEGRATION_GUIDE.md](docs/RESILMESH_INTEGRATION_GUIDE.md) | NATS 消息传递、Neo4j 桥接、告警丰富流程 |
| [docs/RESILMESH_INTEGRATION_TESTING.md](docs/RESILMESH_INTEGRATION_TESTING.md) | 集成验证和测试程序 |
### 工具与可视化
| 文档 | 描述 |
|----------|-------------|
| [docs/GRAPH_EXPLORER.md](docs/GRAPH_EXPLORER.md) | 交互式 Cytoscape.js 图谱浏览器 —— 视图、API、扩展 |
| [docs/DEMO.md](docs/DEMO.md) | 模拟 ResilMesh / NATS 的演示场景 |
### 项目
| 文档 | 描述 |
|----------|-------------|
| [docs/CONTRIBUTING.md](docs/CONTRIBUTING.md) | 检查清单 —— 代码检查、测试、CI 要求 |
| [docs/VERSIONING.md](docs/VERSIONING.md) | CalVer 策略(`YYYY.M.D`) |
| [docs/PRODUCTION_READINESS.md](docs/PRODUCTION_READINESS.md) | 组件状态和上线检查清单 |
| [docs/DEPLOYMENT_READINESS_CHECKLIST.md](docs/DEPLOYMENT_READINESS_CHECKLIST.md) | 有序的部署门禁(`make deploy-check`) |
| [docs/DOCUMENTATION_AUDIT.md](docs/DOCUMENTATION_AUDIT.md) | 文档 ↔ 代码可追溯性表 |
## 🔒 安全
- API 密钥存储在环境变量中(从不硬编码)
- 所有 Cypher 查询均采用**参数化**形式 —— 无字符串插值,可防注入
- SSL/TLS:见下文 **[SSL/TLS (MISP vs Neo4j)](#ssltls-misp-https-vs-neo4j-bolt)** —— **`EDGEGUARD_SSL_VERIFY`** 控制的是指向 MISP 及收集器的 **HTTPS** 请求,**而非** Neo4j Bolt 信任设置
- 凭据已加入 gitignore;Docker 镜像绝不内置密钥
- Prometheus 指标端点默认仅绑定到**本地回环地址**(`127.0.0.1:8001`)
- Airflow 运行在端口 8082 —— 刻意避开了 ResilMesh Temporal(8080)
- 所有外部服务调用均配备熔断器
## 📝 许可证
MIT 许可证 - 详见 LICENSE 文件。
## 🤝 贡献
1. 派生该仓库
2. 创建特性分支
3. 在发起 PR 之前运行 **`make lint`** 和 **`make test`**(或参考 CI 配置)
4. 提交拉取请求 —— **[`docs/CONTRIBUTING.md`](docs/CONTRIBUTING.md)** 包含了完整的检查清单(Ruff、Mypy、pytest、Docker 构建)
**EdgeGuard** —— 边缘基础设施上感知行业的威胁情报系统,为 ResilMesh 互操作性而构建。
## ⚖️ 收集与同步限制
当文档/代码中的 **`1000`** 或 **`2000`** 看似矛盾时,请参考此表。
| 阶段 | 典型上限 | 备注 |
|-------|----------------|-------|
| **基线收集器**(Airflow `edgeguard_baseline`,`run_pipeline --baseline`) | **`BASELINE_COLLECTION_LIMIT`** / **`EDGEGUARD_BASELINE_COLLECTION_LIMIT`** —— 单个外部数据源的最大**条目数**(OTX、NVD、CISA、MITRE、feeds 等)。**`0`** = 不限。 | **不会**配置 MISP→Neo4j 同步。基线 DAG **不会**运行 **`MISPCollector`**。 |
| **增量收集器**(cron 定时 DAG) | **`EDGEGUARD_INCREMENTAL_LIMIT`**(默认每个数据源 **200** 条),可使用 **`EDGEGUARD_MAX_ENTRIES`** 覆盖 | 每次计划运行时每个数据源的条目上限 |
| **MISP → Neo4j 同步** (`run_misp_to_neo4j`) | **事件列表:** 通过 **`GET /events/index`**(随后请求 **`/events`**)带分页获取(每页 **500** 行,最多 **100** 页)+ 客户端过滤(从 **`EDGEGUARD_MISP_EVENT_SEARCH`** 获取的 **`info`** 子串,默认为 **EdgeGuard**,或 **`org.name` == EdgeGuard**)。**回退机制:** PyMISP **`restSearch`** + **`limit: 1000`**。**单事件处理:** 解析 → 去重 → **跨条目边构建** → 节点合并 → 关系批量写入。**Neo4j 内存:** **`EDGEGUARD_NEO4J_SYNC_CHUNK_SIZE`** / **`EDGEGUARD_REL_BATCH_SIZE`**。 | 索引页大小 ≠ Neo4j 合并分块 **1000** —— 详见 [COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md) |
| **`MISPCollector`**(可选 / 默认基线步骤 2 中未包含) | **`/events`** 索引上限为 **`min(3×limit, 2000)`** 或 **2000**;每个事件 **500** 个属性 | 采用增量风格的 **`resolve_collection_limit(..., baseline=False)`** —— **而非**基线 Airflow Variable |
**完整详情:** [docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)。
## 📋 环境变量
| 变量 | 描述 | 默认值 |
|----------|-------------|---------|
| `MISP_URL` | MISP 服务器 URL | `https://localhost:8443` |
| `MISP_API_KEY` | MISP API 认证密钥 | *(必填)* |
| `EDGEGUARD_MISP_EVENT_SEARCH` | 过滤索引结果时匹配 **`Event.info`** 的子字符串(并传递给 **`restSearch`** 回退机制)。必须与标题(如 `EdgeGuard-…`)保持一致 | `EdgeGuard` |
| `EDGEGUARD_MISP_HTTP_HOST` | 当 MISP vhost 与 URL 主机名不同时使用的可选 HTTP `Host` 头 | *(未设置)* |
| `NEO4J_URI` | Neo4j 连接 URI(Compose **`x-common-env`** 默认为 **`bolt://neo4j:7687`**;宿主机脚本通常使用 **`bolt://localhost:7687`**) | 参见 **`.env.example`** |
| `NEO4J_USER` | Neo4j 用户名 | `neo4j` |
| `NEO4J_PASSWORD` | Neo4j 密码 | *(必填,无默认值)* |
| `NEO4J_DATABASE` | Neo4j 数据库名称 | `neo4j` |
| `EDGEGUARD_SSL_VERIFY` | 验证 **HTTPS** 的 TLS 证书(适用于 MISP API、收集器、MISP→Neo4j 同步的 `requests` / PyMISP)。未设置或缺失时默认为 **`true`**。如果未设置或为空,**`SSL_VERIFY`** 将作为相同的标志被读取(`true`/`false`)。**不会**使用 **`SSL_CERT_VERIFY`** —— 请设置 **`EDGEGUARD_SSL_VERIFY`**。 | `true` |
| `OTX_API_KEY` | AlienVault OTX API 密钥 | *(可选)* |
| `NVD_API_KEY` | NVD API 密钥 | *(可选)* |
| `VIRUSTOTAL_API_KEY` | VirusTotal API 密钥 | *(可选)* |
| `ABUSEIPDB_API_KEY` | AbuseIPDB API 密钥(免费版:每天 1000 次请求) | *(可选)* |
| `THREATFOX_API_KEY` | Abuse.ch ThreatFox API 密钥([auth.abuse.ch](https://auth.abuse.ch/))。无此密钥时,**`collect_threatfox`** 会跳过(如果任务在无密钥的情况下被强制调用 API,将返回 HTTP 401)。 | *(可选)* |
| `EDGEGUARD_COLLECT_SOURCES` | 可选的逗号分隔收集器允许列表(例如 `otx,nvd,cisa,mitre`)。未设置 = 启用全部(受可选密钥跳过限制)。详见 [docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md)。 | *(未设置)* |
| `EDGEGUARD_ZONE_DETECT_THRESHOLD` | 在 **`detect_zones_from_text`** 中,行业生效所需的最低加权得分(默认 **1.5**)。调高可执行更严格的标记。 | `1.5` |
| `EDGEGUARD_ZONE_ITEM_THRESHOLD` | 在 **`detect_zones_from_item`** 中的最低综合得分(默认 **1.5**)。 | `1.5` |
| `EDGEGUARD_MISP_PREFETCH_EXISTING_ATTRS` | 在执行 **`push_items`** 之前,加载目标 MISP 事件上已存在的 **`type`+`value`** 并跳过重复项(适用于重新运行、重叠的增量窗口)。 | `true` |
| `EDGEGUARD_OTX_INCREMENTAL_LOOKBACK_DAYS` | 首次 OTX 增量运行(无检查点)时:**`modified_since`** = 当前时间减去此天数。 | `3` |
| `EDGEGUARD_OTX_INCREMENTAL_OVERLAP_SEC` | 查询 **`modified_since`** 时从存储的 OTX 游标中减去的秒数(用于处理时钟偏移/遗漏的边缘脉冲)。 | `300` |
| `EDGEGUARD_OTX_INCREMENTAL_MAX_PAGES` | 每次增量运行时的最大 OTX API 页数(每页随后延迟 **2s**)。 | `25` |
| `EDGEGUARD_MITRE_CONDITIONAL_GET` | MITRE STIX 包:在计划运行时发送 **`If-None-Match`**;若返回 **HTTP 304** 则跳过重新解析/推送。基线运行总是全量下载。 | `true` |
| `EDGEGUARD_NEO4J_SYNC_CHUNK_SIZE` | **MISP→Neo4j 同步** (`run_misp_to_neo4j`):每次 Python 分块最大**解析图谱条目**合并数(限制 Airflow worker 在 Neo4j 写入期间的 **RAM** 使用)。**不是** MISP **事件索引**页面大小(代码中为每页 **500**)或 **`restSearch` `limit: 1000`** 回退机制 —— 详见 [COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)。默认为 **`500`**。如果在解析后出现 **OOM** / **SIGKILL**,请调低(例如 `250`)。**`0`** 或 **`all`** = **单次处理**(OOM 风险极高)。 | `500` |
| `EDGEGUARD_REL_BATCH_SIZE` | 每次 **`create_misp_relationships_batch`** UNWIND 往返请求的最大**关系定义**数(包括内嵌和跨条目边)。如果在处理巨大事件时遇到 Neo4j **事务超时**,请调低此值。 | `2000` |
| `EDGEGUARD_DEBUG_GC` | 如果设置为**真值**,则在每个 Neo4j **节点**合并分块后运行 **`gc.collect()`**(仅用于诊断;在资源紧凑的 worker 上可能会**增加** RSS 峰值 —— 除非进行性能分析,否则在生产环境中不要设置)。 | *(未设置)* |
| `EDGEGUARD_INCREMENTAL_LIMIT` | 常规 cron 运行中**每个数据源**的最大条目数。`0` = 不限。 | `200` |
| `EDGEGUARD_MAX_ENTRIES` | 硬性全局覆盖 —— 非零时覆盖 `INCREMENTAL_LIMIT`。保持为 `0` 以使用按模式的默认值。 | `0` |
| `EDGEGUARD_ENABLE_METRICS` | 在端口 8001 上启动 Prometheus 指标服务 | `false` |
| `EDGEGUARD_METRICS_HOST` | 指标端点的绑定地址 | `127.0.0.1` |
| `EDGEGUARD_GRAPHQL_PORT` | GraphQL API 的端口 | `4001` |
| `EDGEGUARD_GRAPHQL_PLAYGROUND` | 启用 GraphiQL 浏览器资源管理器 | `false`(默认),仅在开发环境设置为 `true` |
| `AIRFLOW__WEBSERVER__WEB_SERVER_PORT` | Airflow UI 端口 —— 设置为 8082 以避免与 ResilMesh Temporal 冲突 | `8082` |
| `AIRFLOW_POSTGRES_USER` | **仅限 Docker Compose** —— `airflow_postgres` 的 PostgreSQL 用户(用于 Airflow 元数据) | `airflow` |
| `AIRFLOW_POSTGRES_PASSWORD` | **仅限 Docker Compose** —— 该角色的密码(在生产环境中请使用强密码) | `airflow` |
| `AIRFLOW_POSTGRES_DB` | **仅限 Docker Compose** —— 数据库名称 | `airflow` |
| `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` | 裸机 / 自定义 Airflow:用于元数据的 SQLAlchemy URI(PostgreSQL 或 MySQL)。Compose 会自动设置此项。 | *(Compose:由 `docker-compose.yml` 设置)* |
| `EDGEGUARD_MISP_BATCH_THROTTLE_SEC` | 每次向 MISP 推送 500 个属性的批次之间的暂停时间(秒)。防止大型事件导致内存耗尽。 | `5.0` |
| `EDGEGUARD_MISP_EVENT_FETCH_THROTTLE_SEC` | 同步期间获取连续 MISP 之间的暂停时间(秒)。 | `2.0` |
| `EDGEGUARD_MAX_EVENT_ATTRIBUTES` | MISP→Neo4j 同步:属性计数超过此值的事件将**推迟**到同步运行结束时处理(优先处理较小的事件)。防止在关键数据落库前大型事件导致 worker 发生 OOM。设置为 `0` 可禁用。 | `50000` |
| `EDGEGUARD_DEV_MODE` | 启用控制台跟踪输出(仅限开发) | *(未设置)* |
**MISP→Neo4j 内存:** 大规模的首次同步需要足够的 RAM 来容纳解析后的条目;**单事件** 处理 + Python 端的节点分块 + `merge_*_batch` UNWINDs 保持了峰值使用量的上限。如果同步在 Neo4j 写入期间**以退出码 -9 终止**,请检查 **Docker cgroup** 限制:Compose 默认为 **`airflow`** 服务设置了 **`AIRFLOW_MEMORY_LIMIT=4g`** —— 可在 **`.env`** 中调高,或降低 **`EDGEGUARD_NEO4J_SYNC_CHUNK_SIZE`** / **`EDGEGUARD_REL_BATCH_SIZE`**。详见 [docs/SETUP_GUIDE.md](docs/SETUP_GUIDE.md)、[docs/HEARTBEAT.md](docs/HEARTBEAT.md)、[docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md)。除非你了解 OOM 权衡的后果,否则**千万不要**将分块大小设置为 **`0`** / **`all`**。
**MISP Apache 调优(大型基线):** MISP 默认的 Apache `MaxRequestWorkers 150` 在流水线推送大型事件(例如约 95K 个 NVD 属性)时可能会导致内存耗尽。请在 MISP 容器内(`/etc/apache2/mods-enabled/mpm_prefork.conf`)将其**降低至 25**。流水线也会自动节流写入 —— 详见 **`EDGEGUARD_MISP_BATCH_THROTTLE_SEC`**(批次之间默认 5 秒)和 **`EDGEGUARD_MISP_EVENT_FETCH_THROTTLE_SEC`**(事件获取之间默认 2 秒)。完整细节请参阅 [docs/DOCKER_SETUP_GUIDE.md](docs/DOCKER_SETUP_GUIDE.md) § *MISP 性能调优*。
**基线收集**由 Airflow Variables 控制(而非环境变量)。这些上限适用于基线 DAG 中的**每个外部收集器**(OTX、NVD 等) —— **不**适用于 MISP→Neo4j 同步的 MISP 事件搜索(详见 [COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md))。
| Airflow Variable | 描述 | 默认值 |
|-----------------|-------------|---------|
| `BASELINE_COLLECTION_LIMIT` | 基线运行时**单个外部数据源**的条目数。`0` = 不限。 | `0` |
| `BASELINE_DAYS` | 获取多少天的历史记录(NVD、OTX)。 | `730` |
| `EDGEGUARD_BASELINE_DAYS` | *(可选)* 设置后,在基线 DAG 中覆盖 `BASELINE_DAYS`(例如,冒烟测试设为 `7`)。 | *(未设置)* |
| `EDGEGUARD_BASELINE_COLLECTION_LIMIT` | *(可选)* 设置后,在基线 DAG 中覆盖 `BASELINE_COLLECTION_LIMIT`。 | *(未设置)* |
### SSL/TLS:MISP (HTTPS) vs Neo4j (Bolt)
| 层级 | 使用 TLS 的对象 | 在开发环境中如何放宽验证 |
|--------|----------------|-----------------------------------|
| **MISP 与 HTTP 收集器** | `requests`、PyMISP、`MISPHealthCheck` | 设置 **`EDGEGUARD_SSL_VERIFY=false`**(例如使用自签名证书的内部 MISP)。**生产环境:请保持 `true`** 并安装合适的 CA 或信任库。 |
| **MISP → Neo4j 同步** | 所有 MISP HTTP 调用均使用相同的 **`EDGEGUARD_SSL_VERIFY`**(`run_misp_to_neo4j.py` 从此标志设置同步会话的 `verify` 参数)。 | 如果同步任务在 **Airflow** 下运行,请在 **调度器和 worker**(与任务相同的运行环境)上设置该变量,而不仅仅是在收集器容器上 —— 否则 Tier1 可能通过,而 **`full_neo4j_sync`** 会在 MISP 健康检查时失败。 |
| **Neo4j 驱动** | 基于 **`NEO4J_URI`** 的 Bolt 连接 —— **独立于** `EDGEGUARD_SSL_VERIFY` | 使用 URI schemes:**`bolt://`** / **`neo4j://`**(典型本地/LAN),**`bolt+s://`** / **`neo4j+s://`**(TLS + 系统 CA),**`bolt+ssc://`** / **`neo4j+ssc://`**(TLS,根据驱动语义信任自签名证书)。自签名 Neo4j **不能**通过关闭 `EDGEGUARD_SSL_VERIFY` 来解决。 |
更多细节:**`docs/AIRFLOW_DAGS.md`**(故障排除)、**`src/config.py`**(`SSL_VERIFY`)。
### .env 文件示例
```
# MISP 配置
export MISP_URL="https://misp.example.com:8443"
export MISP_API_KEY="your-misp-api-key-here"
# Neo4j 配置
export NEO4J_URI="bolt://localhost:7687"
export NEO4J_USER="neo4j"
export NEO4J_PASSWORD="your-secure-password"
# 仅限开发:使用自签名 HTTPS 的内部 MISP(在 Airflow worker 上也要设置,用于 MISP→Neo4j 同步)
# export EDGEGUARD_SSL_VERIFY=false
# 可选:外部威胁 feeds
export OTX_API_KEY="your-otx-key"
export NVD_API_KEY="your-nvd-key"
# Docker Compose — Airflow 元数据 Postgres 的可选覆盖(默认:airflow/airflow/airflow)
# AIRFLOW_POSTGRES_USER=airflow
# AIRFLOW_POSTGRES_PASSWORD=change-me-in-production
# AIRFLOW_POSTGRES_DB=airflow
```
## 🏥 健康检查
EdgeGuard 提供了一个全面的健康检查模块来监控系统状态。
### 用法
```
# 运行健康检查
python src/health_check.py
```
### 编程式用法
```
from health_check import health_check, format_health_status
# 获取健康状态
status = health_check()
print(status)
# {
# 'misp_healthy': True,
# 'neo4j_healthy': True,
# 'misp_details': {...},
# 'neo4j_details': {...},
# 'last_pipeline_run': '2024-01-15T10:30:00',
# 'node_counts': {...},
# 'timestamp': '2024-01-15T12:00:00',
# 'overall_healthy': True
# }
# 格式化为可读字符串
print(format_health_status(status))
```
### HTTP APIs (FastAPI)
| 服务 | URL | 行为 |
|---------|-----|----------|
| **REST** (`query_api`) | `GET /health` | 始终返回 **HTTP 200**。响应体包含:`status`(`ok` / `degraded`)、`neo4j_connected`(通过 `Neo4jClient.health_check()` 测试 Neo4j **ping + APOC**)。 |
| **GraphQL** (`graphql_api`) | `GET /health` | 仅当 Neo4j 健康时返回 **HTTP 200**(相同检查)。如果驱动缺失或 `health_check()` 失败则返回 **HTTP 503**(适用于严格的负载均衡器 / k8s 探针)。 |
REST API 上的其他路由在运行查询之前,会使用快速的 **`Neo4jClient.is_connected()`**(驱动存在)作为前置守卫。
### 健康检查输出(`health_check.py`)
- **MISP 状态**:连接健康状况和响应时间
- **Neo4j 状态**:连接健康状况、版本和响应时间
- **节点计数**:图谱中所有节点类型的当前计数
- **上次流水线运行**:上次成功同步的时间戳
## 📊 指标
指标模块用于跟踪流水线的执行统计数据。
### 用法
```
# 查看指标摘要
python src/metrics.py
```
### 编程式用法
```
from metrics import PipelineMetrics
# 初始化指标
metrics = PipelineMetrics()
# 开始追踪一次运行
metrics.start_run()
# ... 运行你的 pipeline ...
# 记录完成
metrics.end_run(
success=True,
nodes_created=150,
relationships_created=75,
events_processed=10,
indicators_synced=100,
vulnerabilities_synced=50
)
# 获取摘要
summary = metrics.get_summary()
print(f"Success rate: {summary['success_rate_percent']}%")
# 获取近期运行
recent = metrics.get_recent_runs(5)
```
### 跟踪指标
| 指标 | 描述 |
|--------|-------------|
| 总运行次数 | 流水线执行的总次数 |
| 成功/失败计数 | 成功与失败运行的细分 |
| 成功率 | 成功运行的百分比 |
| 持续时间 | 每次运行耗费的时间 |
| 创建的节点数 | 每次运行创建的节点数量 |
| 创建的关系数 | 每次运行创建的关系数量 |
| 处理的事件数 | 每次运行处理的 MISP 事件数量 |
| 按类型划分 | 指标、漏洞、恶意软件、攻击者、技术 |
### 指标存储
指标会持久化到 `data/metrics.json` 中,以便在重启后保留。
## 📈 当前状态
EdgeGuard v2026.3.28 已达到**生产测试就绪**状态。完整的流水线已在 Docker (Mac Mini) 上验证通过,所有 CI 绿灯,BugBot 检查通过。
### ✅ 已实现
- **11 个活跃的源收集器**,包含丰富的字段(+ 可选的 VirusTotal 丰富模块,+ 行业源占位符):OTX、NVD(CVSSv2/v31/v40 + CISA KEV + 版本约束)、CISA、MITRE ATT&CK(技术 + 攻击者 + 恶意软件 + **工具**)、VirusTotal(YARA 规则、Sigma 规则、沙箱判定)、AbuseIPDB(滥用类别)、ThreatFox、URLhaus(url_status、reporter)、CyberCure、Feodo(last_online)、SSL Blacklist
- **以 MISP 作为唯一真实来源**:所有收集器首先写入 MISP 并带有完整的出处标签;使用 OTX_META/TF_META/NVD_META 注释前缀进行元数据往返
- **Neo4j 知识图谱**:具备生产就绪能力的图谱,支持 MERGE 去重、13 种确定性关系类型、针对 CVE 和 Malware 节点的跨源 IS_SAME_AS 去重
- **ResilMesh Schema 对齐**:`CVE`、`Vulnerability`(带有 `status: LIST`)、`CVSSv2/v30/v31/v40`(双向)、`Tool`、`REFERS_TO` 桥接、所有节点类型上的 `edgeguard_managed`(包括 13 种 ResilMesh 资产层方法)、IP.tag 作为 LIST —— 均与官方 ResilMesh 数据模型匹配
- **STIX 2.1 流水线**:主转换路径,带有直接的 MISP 回退机制
- **基线 + 增量模式**:一次性深度历史加载和计划的 2-3 天增量更新;针对并发 Airflow worker 的检查点文件锁
- **6 个主要 Airflow DAG** 位于 `edgeguard_pipeline.py` 中(+ 可选指标 DAG);UI 位于端口 **8082**(故意避开 Temporal);基线和增量之间的同步冲突保护机制
- **同步后丰富**:IOC 置信度衰减、Campaign 节点物化(包含对已淘汰指标的清理)、Vulnerability↔CVE 桥接、共现置信度校准
- **多区域行业过滤**:医疗保健、能源、金融、全球 —— 151 个加权关键词及负向排除项,Sector 节点创建中的区域验证白名单
- **GraphQL API**(端口 4001):Strawberry/FastAPI 带有 CORS;所有节点类型(包括 Tool)均可查询;解析了 14 个丰富字段;可通过 `EDGEGUARD_GRAPHQL_PLAYGROUND=true` 选择启用 GraphiQL
- **FastAPI REST API**(端口 8000):按行业过滤、限速、带认证的端点;`/graph/explore` 包含 4 个视图(恶意软件、攻击者、指标、CVE)
- **交互式图谱浏览器**:基于浏览器的 Cytoscape.js 可视化工具,使用实时 Neo4j 数据,支持 CISA KEV 高亮、搜索、区域过滤([docs/GRAPH_EXPLORER.md](docs/GRAPH_EXPLORER.md))
- **全栈 Docker Compose**:Neo4j + Airflow + REST API + GraphQL 均在一个 `docker compose up -d` 中提供;也支持带有外部 MISP+Neo4j 的 conda/venv/裸机部署
- **CI/CD**:代码检查、类型检查、pytest(161 个测试,30% 覆盖率门禁)、Docker 构建、pip-audit、BugBot —— 全部绿灯
- **健康检查 + 指标**:MISP(带有 PyMISP 版本兼容性检测)、Neo4j(带有 30 秒超时)、Airflow、NATS;Prometheus/Grafana 监控栈
- **生产级 CLI**(16 个命令):`preflight`(8 类就绪检查)、`stats --full`(按区域/来源的节点计数 + MISP 细分)、`dag status/kill`(Airflow 运行监控 + 卡住运行的恢复)、`checkpoint status/clear`(基线进度 + 增量游标)、`doctor`、`heal`、`validate`、`monitor`、`version`
- **熔断器 + 重试**:修复了 HALF_OPEN 死锁、单调时间、所有外部服务调用的弹性模式
- **UTC 感知时间戳**:24 个文件中的所有 70 多个 datetime 实例均使用 `timezone.utc`(兼容 Python 3.12)
### 🚧 进行中 / 计划中
- **SoftwareVersion 桥接**:`(Vulnerability)-[:IN]->(SoftwareVersion)-[:ON]->(Host)` —— NVD 的 `version_constraints` 现已收集并存储;CPE 到 SoftwareVersion 的映射待定(参见 [`docs/RESILMESH_INTEROPERABILITY.md`](docs/RESILMESH_INTEROPERABILITY.md) §4.4)
- **ISIM GraphQL Schema 扩展**:EdgeGuard 类型(Indicator、ThreatActor、Malware、Technique、Campaign、Tool)通过端口 4001 暴露 —— 计划与 ResilMesh 团队进行正式的 ISIM Schema 合并
- **Neo4j 角色分离**:目前使用共享凭据;目标状态是使用 Neo4j Enterprise 角色或将 GraphQL访问层
- **实时 NATS 告警**:演示已实现(`demo/mock_resilmesh_publisher.py`),生产加固正在进行中
- **医疗保健和能源行业特定的源收集器**:HC3、E-ISAC —— 需要 ISAC 成员资格;占位收集器已就绪
_最后更新:2026-03-28_