Kopanov/EdgeGuard-Knowledge-Graph

GitHub: Kopanov/EdgeGuard-Knowledge-Graph

面向边缘基础设施的图增强威胁情报平台,通过多源采集、MISP 归一化和 Neo4j 知识图谱实现行业定制的威胁关联分析。

Stars: 0 | Forks: 0

EdgeGuard Logo

EdgeGuard

边缘基础设施上基于图增强 xAI 的威胁情报

CI Python 3.12+ Neo4j 5.27 MIT License Version

IICT-BAS + Ratio1 | 由 ResilMesh - 第二次公开征集 资助

本项目通过 ResilMesh 第二次公开征集 获得了资金支持,
由欧盟“Horizon Europe”研究与创新计划资助。

## 🤔 EdgeGuard 是什么? 想象一下你公司的计算机网络是一座城市。EdgeGuard 就像是拥有一套**智能安全系统**,它可以: 1. 从多个外部源**收集**威胁情报(11 种活跃收集器类型 + 2 个行业占位符 —— 见 `docs/DATA_SOURCES.md`) 2. 将所有信息**暂存**到 MISP —— 这是唯一的真实数据源 —— 包含完整的出处和时间戳 3. **构建**一个 Neo4j 知识图谱,展示威胁、攻击者和行业之间的关系 4. 自动**丰富**图谱:衰减过期 IOC 的置信度,将攻击者归组到攻击活动中,校准边权重 5. 按行业(医疗保健、能源、金融)进行**筛选**,让你只看到与你所在行业相关的信息 6. 与 ResilMesh/CRUSOE **集成**,进行网络拓扑关联 ### 工作流 ``` External sources (11 active feeds + 2 sector placeholders) │ ▼ Collectors ──────────────────────────────────────────────────────────────────┐ (OTX, NVD, CISA, MITRE, VirusTotal, AbuseIPDB, │ ThreatFox, URLhaus, Feodo, SSL BL, CyberCure, …) │ │ │ ▼ │ MISP ◄────── Single Source of Truth ──────── all data lands here first │ (tagged: zone, source, TLP, confidence) │ │ │ ▼ │ STIX 2.1 conversion (primary) / direct fallback │ │ │ ▼ │ Neo4j Knowledge Graph │ (Indicators, CVEs, Actors, Techniques, │ Malware, Campaigns — all cross-linked) │ │ │ ▼ │ Enrichment Jobs (post-sync) │ • IOC Confidence Decay (stale indicators retire automatically) │ • Campaign Node Builder (ThreatActor → Malware → IOC chains) │ • Co-occurrence Calibration (confidence weighted by event tightness) │ │ │ ▼ │ FastAPI REST API (port 8000) ────────────────────────────────────────────────┘ GraphQL API (port 4001) ← ISIM-compatible; ResilMesh queries EdgeGuard + NATS real-time alerts the same way it queries ISIM + ResilMesh/CRUSOE integration ``` **两种运行模式:** - **基线模式**(Baseline,运行一次):完整的历史数据加载 —— 每个行业最多 24 个月的数据,默认**每个数据源**不限条目(`BASELINE_COLLECTION_LIMIT=0`),需手动触发 - **增量模式**(Incremental,定时执行):2-3 天的时间窗口,默认每个数据源拉取 **200** 条目(`EDGEGUARD_INCREMENTAL_LIMIT`),通过 Airflow 定时调度运行 **限制是按阶段划分的**(每个数据源的基线上限 ≠ MISP→Neo4j 拉取数量 ≠ Neo4j 内存分块)。详情请参阅此 README 后文的 **[docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)** 以及 **§ 收集与同步限制** 章节。 ## 🎯 为什么选择 EdgeGuard? | 问题 | EdgeGuard 的解决方案 | |---------|-------------------| | 威胁数据过多 | 按行业筛选(医疗保健 / 能源 / 金融 / 全球) | | 难以看清关联 | 图数据库映射了 攻击者 → 技术 → CVE → IOC 的关系 | | 过期指标充斥图谱 | IOC 置信度衰减会自动淘汰旧的 IOC | | 无法判断哪些情报可信 | 由来源数量和事件紧密度校准置信度 | | 多个互不连通的数据源 | 所有信息流都通过 MISP 作为唯一的真实来源 | | 需要与现有平台集成 | 完全兼容 ResilMesh/CRUSOE 数据模型 | ### 确定性的摄取与链接(而非模糊图匹配) EdgeGuard **有意**采用**确定性的身份与合并规则**(在约定的键上使用 Neo4j **`MERGE`**,采用**精确的**标识符如 **CVE** 和 **MITRE `mitre_id`**,以及文档中规定的范围共现),而不是**模糊的**字符串/图推理(例如用于生产实体链接的子串 **`CONTAINS`**)。这种权衡有利于在威胁情报中实现**可审计性**、**可重复的**同步以及**更少的假阳性边**(错误的 攻击者 ↔ 技术 ↔ IOC 链接具有极高的影响力)。关键边(例如 **`USES`**)的关系逻辑在 **[docs/ARCHITECTURE.md](docs/ARCHITECTURE.md)** 和 **[docs/KNOWLEDGE_GRAPH.md](docs/KNOWLEDGE_GRAPH.md)** 中有详细说明;合并行为、数据出处以及**精确与模糊匹配**的历史记录详见 **[docs/DATA_QUALITY.md](docs/DATA_QUALITY.md)**。 ## 🏢 适用人群? - **医院** → 检测对患者系统的攻击 - **能源公司** → 监控电网安全 - **银行** → 发现金融欺诈 - 任何具有**边缘基础设施的组织** ## 🚀 快速开始 ### 前置条件 - Docker + Docker Compose(推荐 —— 包含 Neo4j,**`airflow_postgres`**,Airflow,REST API,GraphQL) - **或者** **Python 3.12+**(用于 pip/venv 安装;`pyproject.toml` / CI 与此版本匹配)+ Neo4j + MISP。对于使用 PostgreSQL 元数据库的 **pip 安装的 Airflow**,请使用 **`pip install ".[airflow]"`** 或 **`apache-airflow[postgres]~=2.11`**(见 `requirements.txt`)。*注意:Airflow 2.11 上游支持 Python 3.11+;EdgeGuard 统一使用 3.12+。* ### 选项 A —— 一键安装(Docker,推荐) ``` git clone https://github.com/Kopanov/EdgeGuard-Knowledge-Graph.git cd EdgeGuard-Knowledge-Graph ./install.sh # detects Docker, copies .env, builds image, starts stack ``` `install.sh` 会处理所有事情:检查 Docker,复制 `.env.example → .env`,构建镜像,并运行 `docker compose up -d`。完成后,请使用你的 API 密钥编辑 `.env` 并重新启动。 **安装后验证(在任何安装方法后运行此命令):** ``` python src/edgeguard.py preflight # comprehensive readiness check (env vars, APIs, Neo4j, MISP, Airflow, disk) python src/edgeguard.py doctor # checks MISP, Neo4j, Airflow, NATS, schema, API keys, version compatibility python src/edgeguard.py stats # quick dashboard: node counts, last sync, pipeline runs ``` `preflight` 会运行 7 个检查类别(环境变量、API 连通性、Neo4j schema、MISP 健康、Airflow、磁盘、熔断器),退出码 0 表示就绪,1 表示发现问题。`stats` 会显示系统中当前的数据概况。 **EdgeGuard CLI 快速参考:** | 命令 | 用途 | |---------|---------| | `edgeguard preflight` | 运行前准备检查(7 个类别,CI 环境使用 `--strict`) | | `edgeguard stats` | 快速面板:节点计数、上次同步、流水线运行情况 | | `edgeguard stats --full` | + 按区域、来源和 MISP 事件/区域的细分 | | `edgeguard stats --by-zone` | 每个区域的节点计数(显示多区域重叠) | | `edgeguard stats --by-source` | 每个来源的节点计数(nvd, otx, cisa 等) | | `edgeguard stats --misp` | 按来源和区域统计的 MISP 事件/属性计数 | | `edgeguard dag status` | Airflow DAG 运行状态(带颜色标识,`--state running` 过滤器) | | `edgeguard dag kill` | 强制失败卡住的 DAG 运行(保留检查点,支持 `--dry-run`) | | `edgeguard checkpoint status` | 每个数据源的基线进度 + 增量游标 | | `edgeguard checkpoint clear` | 清除基线状态(默认保留增量状态) | | `edgeguard clear neo4j` | 删除 Neo4j 中的所有图谱数据(保留约束) | | `edgeguard clear misp` | 从 MISP 中删除所有 EdgeGuard 事件 | | `edgeguard clear all` | 完全重置:Neo4j + MISP + 检查点(`--force` 跳过确认) | | `edgeguard doctor` | 诊断连通性(MISP、Neo4j、Airflow、NATS、schema) | | `edgeguard heal` | 自动修复(重置熔断器、清除锁、重试) | | `edgeguard validate` | 配置验证(速率限制、密码、Neo4j schema) | | `edgeguard monitor` | 实时健康状态显示 | | `edgeguard version` | CalVer 版本号 + git SHA | 所有命令均支持 `--help`。数据命令支持 `--json` 以便自动化处理。 **接下来(推荐的阅读顺序):** [docs/SETUP_GUIDE.md](docs/SETUP_GUIDE.md)(完成连接) → [docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md)(运行 DAG) → [docs/BASELINE_SMOKE_TEST.md](docs/BASELINE_SMOKE_TEST.md)(首次基线运行)。见下文 **§ 推荐阅读顺序**。 如果你偏好明确的步骤,也可以使用 `make`: ``` make install # same as ./install.sh make update # git pull (ff-only) + auto: Docker Compose or pip path (see install.sh --help) make start # docker compose up -d (if already installed) make stop # docker compose down make logs # tail all service logs make monitoring # add Prometheus + Grafana make help # list all available targets ``` 启动的服务(`docker compose up -d`): ``` Neo4j Browser → http://localhost:7474 Airflow UI → http://localhost:8082 (8082 avoids ResilMesh Temporal conflict) REST API → http://localhost:8000/health GraphQL API → http://localhost:4001/graphql Graph Explorer → open docs/visualization.html in browser, connect to http://localhost:8000 airflow_postgres → PostgreSQL for Airflow metadata (Docker network only, not exposed) ``` 可选的监控栈(`make monitoring` 或 `docker compose -f docker-compose.monitoring.yml up -d`): ``` Prometheus → http://localhost:9090 Grafana → http://localhost:3000 (set GRAFANA_ADMIN_PASSWORD in .env) AlertManager → http://localhost:9093 Metrics endpoint → http://localhost:8001/metrics (EdgeGuard pipeline metrics) ``` Airflow 的调度器/UI 状态使用 **`airflow_postgres`**;如果需要,可以在 `.env` 中使用 **`AIRFLOW_POSTGRES_*`** 覆盖凭据。**`airflow`** 服务镜像由 **`Dockerfile.airflow`** **构建**(包含 EdgeGuard Python 依赖,如 **neo4j** / **pymisp**);在依赖或 **`src/`** 逻辑更改后,请运行 **`docker compose build airflow`**。**`docker-compose.yml`** `x-common-env` 将 `.env` 中的 MISP→Neo4j 调优参数(**`EDGEGUARD_NEO4J_SYNC_CHUNK_SIZE`**、**`EDGEGUARD_REL_BATCH_SIZE`**、**`EDGEGUARD_DEBUG_GC`**、**`EDGEGUARD_MISP_EVENT_SEARCH`** 等)转发到 **airflow** / **api** / **graphql**。运维和故障排除:[`docs/AIRFLOW_DAGS.md`](docs/AIRFLOW_DAGS.md)。 ### 选项 B —— Python / pip(无 Docker) ``` git clone https://github.com/Kopanov/EdgeGuard-Knowledge-Graph.git cd EdgeGuard-Knowledge-Graph ./install.sh --python # creates .venv, pip installs all extras # 或等效方式: make install-py # same thing via Makefile # 或手动方式: pip install ".[api,graphql,monitoring]" cp .env.example .env # fill in MISP_URL, MISP_API_KEY, NEO4J_PASSWORD python src/health_check.py # verify connectivity ``` 用于开发(添加 ruff、mypy、pytest): ``` ./install.sh --python --dev # 或: make install-dev make ci # lint + type-check + tests in one command ``` **已经克隆了?** 拉取最新代码并刷新(当 `docker` + compose v2 + `docker-compose.yml` 可用时自动选择 **Docker Compose**;否则选择 **pip / .venv**): ``` ./install.sh --update # auto (Docker or pip) ./install.sh --update --docker # force Docker only (fail if unavailable) ./install.sh --update --python # force pip / .venv ./install.sh --update --python --dev # include dev extras ``` ### 选项 C —— Conda 环境 ``` conda create -n edgeguard python=3.12 conda activate edgeguard git clone https://github.com/Kopanov/EdgeGuard-Knowledge-Graph.git cd EdgeGuard-Knowledge-Graph pip install ".[api,graphql,monitoring]" cp .env.example .env # fill in MISP_URL, MISP_API_KEY, NEO4J_PASSWORD python src/edgeguard.py doctor # validate setup ``` 这要求 **MISP 和 Neo4j 在外部运行**(作为系统服务、Docker 容器或远程实例均可)。在 `.env` 中通过 `MISP_URL` 和 `NEO4J_URI` 指定它们的地址。完整的 conda/venv/边缘设置详情请参阅 [`docs/ENVIRONMENTS.md`](docs/ENVIRONMENTS.md)。 更详细的运维说明(CLI 表、路径):[`docs/SETUP_GUIDE.md`](docs/SETUP_GUIDE.md)。 ### 使用 direnv 自动激活环境(日常开发推荐) [direnv](https://direnv.net) 可以让你在 `cd` 进入项目时自动激活 `.venv` 和 `.env` —— 离开时自动取消激活。不需要手动执行 `source .venv/bin/activate`。 ``` # 一次性设置(在你的机器上执行一次) brew install direnv # macOS; or: apt install direnv echo 'eval "$(direnv hook zsh)"' >> ~/.zshrc # or bash / fish source ~/.zshrc # 在项目目录内 — 允许 direnv 一次 cd EdgeGuard-Knowledge-Graph direnv allow # 从现在起:cd 进入 → 自动加载 venv + .env。cd 退出 → 停用。 ``` 在执行 `direnv allow` 后,`python` 将指向 `.venv/bin/python`,所有 `.env` 变量都会被导出,并且 `src/` 会被加入到 `PYTHONPATH` 中 —— 在后续的任何终端会话中均无需手动操作。 ### 基于 Airflow DAG 的工作流(生产环境推荐) EdgeGuard 在 `dags/edgeguard_pipeline.py` 中内置了 **6** 个主要 DAG(1 个基线 + 5 个增量/同步调度)。可选的 **指标** DAG 位于 `dags/edgeguard_metrics_server.py`(`edgeguard_metrics_server`、`edgeguard_metrics_server_scheduled`)。部署完成后: ``` # 1. 启动堆栈(包括 Airflow + airflow_postgres) docker compose up -d # 然后打开 Airflow UI(见上文的“已启动服务”)。仅 Pip 安装 Airflow:pip install ".[airflow]" # (安装 apache-airflow[postgres])并根据 Airflow DB 文档设置 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN。 # 2. 触发一次 baseline DAG(完整历史加载) # Airflow UI → DAGs → edgeguard_baseline → 触发 ▶ # 可选择先配置这些 Airflow 变量: # BASELINE_DAYS = 730 (2 年历史) # BASELINE_COLLECTION_LIMIT = 0 (0 = 无限制) # 快速 7 天冒烟测试:在 .env 中设置 EDGEGUARD_BASELINE_DAYS=7 和 # EDGEGUARD_BASELINE_COLLECTION_LIMIT=1000,重启 airflow,然后触发。 # 参见 docs/BASELINE_SMOKE_TEST.md # 3. Incremental cron DAG 在 baseline 后自动启动: # edgeguard_pipeline → 每 30 分钟 (OTX) # edgeguard_medium_freq → 每 4 小时 (CISA, VirusTotal) # edgeguard_low_freq → 每 8 小时 (NVD) # edgeguard_daily → 每天凌晨 2 点 (MITRE, AbuseIPDB, feeds) # edgeguard_neo4j_sync → 每 3 天 (MISP → Neo4j + enrichment) ``` ### 手动 / 独立命令 ``` cd src # 检查系统健康状态 python health_check.py # 运行 MISP → Neo4j 同步(增量 — 最近 3 天) python run_misp_to_neo4j.py # 运行完整历史同步 python run_misp_to_neo4j.py --full # 手动运行同步后 enrichment 任务 python enrichment_jobs.py # 重建跨来源图关系 python build_relationships.py # 通过 CLI 检查系统健康状态 python edgeguard.py doctor # 自动修复常见问题 python edgeguard.py heal # 从 git 拉取最新代码并重新安装(从克隆根目录;等同于 ./install.sh --update) edgeguard update # auto: Docker Compose if available, else pip edgeguard --update # synonym for the above edgeguard update --docker # force Docker only edgeguard update --python # force pip / .venv edgeguard version # release CalVer + git short SHA (no .env) ``` 版本号使用**日历版本控制**(`pyproject.toml` 中的 `YYYY.M.D`)。详见 [`docs/VERSIONING.md`](docs/VERSIONING.md)。 ## 📊 核心功能 ### 1. 多区域威胁检测 - **医疗保健**(24 个月数据) - **能源**(24 个月数据) - **金融**(24 个月数据) - **全球**(24 个月数据) - 条目可以具有**多个区域**(例如,`['finance', 'healthcare']`) ### 2. 以 MISP 作为唯一真实来源 所有收集器首先只写入 MISP。Neo4j 仅从 MISP 中获取数据——从不直接写入。这带来了: - 为每条情报提供完整的审计轨迹和出处 - 能够随时从 MISP 重放、审计或重新同步图谱 - 在原始收集和图谱丰富之间实现清晰分离 ### 3. 同步后丰富流水线 在每次 MISP → Neo4j 同步之后,会自动运行 **四个** 丰富作业(`enrichment_jobs.run_all_enrichment_jobs`): | 作业 | 功能描述 | |-----|-------------| | **IOC 置信度衰减** | 对过期的指标/漏洞进行分层衰减;当最后更新时间**超过 365 天**时予以淘汰(`decay_ioc_confidence`) | | **攻击活动节点构建器** | 从 ThreatActor → Malware → Indicator 链中创建 `Campaign` 节点(`build_campaign_nodes`) | | **共现校准** | 根据 MISP 事件大小,调整 `misp_cooccurrence` / `misp_cor` 中带有 `source_id` 的 `INDICATES`/`EXPLOITS` 边的权重(`calibrate_cooccurrence_confidence`) | | **漏洞↔CVE 桥接** | 在 `Vulnerability` 和 `CVE` 之间建立幂等的 `REFERS_TO` 关系(`bridge_vulnerability_cve`) | ### 4. ResilMesh 互操作性 EdgeGuard 映射了 ResilMesh/CRUSOE 的数据模型 —— `CVE`、`CVSSv2/v31/v40`、`Vulnerability`、`Host`、`SoftwareVersion`、`ThreatActor`、`Technique`、`Campaign` 节点均与上游 schema 保持一致。所有 EdgeGuard 管理的节点都带有 `edgeguard_managed = true` 属性,以便在共享图谱中进行干净过滤。完整的对齐细节和正在进行的工作项请参阅 [`docs/RESILMESH_INTEROPERABILITY.md`](docs/RESILMESH_INTEROPERABILITY.md)。 ### 5. GraphQL API(兼容 ISIM,端口 4001) EdgeGuard 在 **端口 4001** 上暴露了一个基于 Strawberry/FastAPI 的 GraphQL 端点 —— 这与 ResilMesh 部署中 ISIM 使用的端口相同。这意味着 ResilMesh 可以像查询 ISIM 一样查询 EdgeGuard 数据,无需特殊的集成工作。 ``` # 示例:医疗保健行业中的严重漏洞 query { vulnerabilities(filter: { zone: "healthcare", minCvss: 9.0 }) { cveId description status severity } } # 示例:具有完整 CVE 链的活跃指标 query { indicators(filter: { zone: "energy", activeOnly: true }) { value indicatorType confidenceScore } cve(cveId: "CVE-2024-12345") { cvssV31 { baseScore attackVector } } } ``` 所有节点类型均可用:`CVE`、`Vulnerability`、`Indicator`、`ThreatActor`、`Malware`、`Technique`、`Tactic`、`Campaign`。完整的类型覆盖表请参阅 [`docs/ARCHITECTURE.md`](docs/ARCHITECTURE.md)。 ### 6. 数据来源(13 个槽位 —— 见 `docs/DATA_SOURCES.md`) | 来源 | 类型 | 提供的内容 | |--------|------|------------------| | **AlienVault OTX** | 威胁情报 | 实时指标脉冲、恶意软件哈希、IP、域名 | | **CISA KEV** | 漏洞 | 已知被利用漏洞目录(美国政府) | | **MITRE ATT&CK** | 战术与技术 | 企业攻击矩阵、威胁攻击者画像 | | **NVD** | 漏洞 | CVE 数据库、严重性评分 (CVSS) | | **MISP** | 威胁共享 | 枢纽 —— 所有收集器首先汇聚于此(你的部署实例) | | **VirusTotal** | 恶意软件分析 | 文件/哈希信誉、扫描结果(70多种杀毒引擎);主收集器为 `vt_collector.py`,丰富路径为 `virustotal_collector.py` | | **AbuseIPDB** | 威胁情报 | IP 信誉和滥用报告 | | **Feodo Tracker** | 威胁情报 | 银行木马 C&C 服务器 IP | | **SSL Blacklist** | 威胁情报 | 恶意 SSL 证书指纹 | | **URLhaus** | 威胁情报 | 恶意 URL 数据库 | | **CyberCure** | 威胁情报 | IP、URL 和哈希源 | | **ThreatFox** | 威胁情报 | 带有恶意软件家族上下文的 IOC | | **能源 / 医疗保健源** | 威胁情报 | 占位收集器(需要 ISAC / 成员资格;默认未激活) | **各项对你的意义:** - *CISA KEV* = “这些是黑客实际使用的最危险的漏洞” - *MITRE ATT&CK* = “这些是黑客用于攻击的技术” - *VirusTotal* = “我们用 70 多种工具扫描了这个文件,这是它们发现的结果” - *AbuseIPDB* = “此 IP 因不良行为被举报” **按行业划分的数据保留期:** - 医疗保健:24 个月 - 能源:24 个月 - 金融:24 个月 - 全球:24 个月 ### 7. 图谱知识库 - 节点:`Indicator`、`CVE`、`Vulnerability`、`Malware`、`ThreatActor`、`Technique`、`Tactic`、`Campaign` - 关系:`INDICATES`、`EXPLOITS`、`USES`、`USES_TECHNIQUE`、`ATTRIBUTED_TO`、`TARGETS`、`AFFECTS`、`IN_TACTIC`、`IS_SAME_AS`、`REFERS_TO`、`PART_OF`、`RUNS`、`HAS_CVSS_*` - 所有边均带有 `confidence_score`、`sources[]`、`imported_at`、`match_type` - 包含置信度评分的完整关系 schema 请参阅 [docs/KNOWLEDGE_GRAPH.md](docs/KNOWLEDGE_GRAPH.md)。 ### 8. 实时告警 - 用于即时通知的 NATS 集成 - 基于区域的主题路由(`resilmesh.threats.zone.healthcare.*`) - 多区域威胁检测 ### 9. 生产级就绪的 CLI - `doctor` —— 诊断连接和配置问题 - `heal` —— 自动修复常见问题 - `validate` —— 检查配置 - `monitor` —— 健康状态面板 ### 10. 交互式图谱浏览器 EdgeGuard 内置了一个基于 **Cytoscape.js** 的浏览器端图谱可视化工具。它连接到 FastAPI 后端(`GET /graph/explore`)并渲染来自 Neo4j 的实时数据 —— 无需额外软件。 | 视图 | 展示内容 | |------|---------------| | **恶意软件** | 恶意软件家族 → 指标 → 行业(层级结构) | | **攻击者** | 威胁攻击者 → ATT&CK 技术 → 战术 | | **指标** | 按区域分组和着色的 IOC | | **CVEs** | 按 CVSS 调整大小的漏洞;CISA KEV 条目以红色高亮显示 | **使用方法:** 在浏览器中打开 `docs/visualization.html`,输入 API URL(`http://localhost:8000`)和你的 API 密钥,然后点击 Connect。完整文档请参阅 [`docs/GRAPH_EXPLORER.md`](docs/GRAPH_EXPLORER.md)。 ## 📁 项目结构 ``` EdgeGuard/ ├── src/ │ ├── config.py # Zone keywords, sector limits, get_effective_limit() │ ├── neo4j_client.py # Graph MERGE logic, CVSS sub-nodes, relationship methods │ ├── run_misp_to_neo4j.py # MISP → Neo4j (attribute parse; Airflow path). Optional STIX for CLI/export │ ├── run_pipeline.py # CLI: run collectors → MISP; optional direct Neo4j path │ ├── build_relationships.py # Graph edges: exact / MITRE-ID (USES actor+malware→technique), co-occurrence INDICATES, … │ ├── enrichment_jobs.py # IOC decay, Campaign nodes, Vulnerability-CVE bridge │ ├── query_api.py # FastAPI REST API — sector-filtered queries (port 8000) │ ├── graphql_api.py # GraphQL API — ISIM-compatible endpoint (port 4001) │ ├── graphql_schema.py # Strawberry type definitions (CVE, Indicator, Actor …) │ ├── health_check.py # MISP + Neo4j connectivity checks │ ├── resilience.py # Circuit breakers and retry logic │ ├── edgeguard.py # CLI: preflight / stats / dag / checkpoint / doctor / heal / validate / monitor │ ├── airflow_client.py # Airflow REST API wrapper (used by CLI dag commands) │ └── collectors/ # One module per data source │ ├── otx_collector.py │ ├── nvd_collector.py # Extracts CVSSv2, CVSSv31, CVSSv40 │ ├── cisa_collector.py │ ├── mitre_collector.py │ ├── vt_collector.py │ ├── abuseipdb_collector.py │ ├── global_feed_collector.py # ThreatFox, URLhaus, CyberCure │ ├── finance_feed_collector.py # Feodo, SSL Blacklist │ └── misp_writer.py ├── dags/ │ ├── edgeguard_pipeline.py # 6 DAGs: baseline, high/med/low/daily collectors, Neo4j sync │ └── edgeguard_metrics_server.py # Optional Prometheus metrics DAG(s) ├── tests/ # Pytest test suite (8+ tests, CI-gated at 30% coverage) ├── docs/ # Full documentation set (architecture, collectors, Airflow, …) ├── docker-compose.yml # Full stack: Neo4j + airflow_postgres + Airflow + REST + GraphQL ├── docker-compose.monitoring.yml # Prometheus + Grafana overlay ├── install.sh # One-command installer (Docker path or --python pip path) ├── Makefile # Shortcuts: make install / update / start / test / lint / health └── .env.example # Annotated variable reference ``` ## 📖 推荐阅读顺序(面向新运维人员) 请在克隆仓库**之后**使用此路径 —— 它与产品的运行方式一致(安装 → Airflow → 首次基线运行): | 顺序 | 文档 | 目的 | |-------|-----------|---------| | **1** | **[docs/SETUP_GUIDE.md](docs/SETUP_GUIDE.md)** | **从这里开始** —— 完整的安装指南(Docker Compose 或 venv)、`.env` 配置、MISP/Neo4j 连接、健康检查 | | **2** | **[docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md)** | 运行和调试 DAG(CLI、修改 `.env` 后的重启、故障排除) | | **3** | **[docs/BASELINE_SMOKE_TEST.md](docs/BASELINE_SMOKE_TEST.md)** | 安全地执行首次 **`edgeguard_baseline`** 运行(短时间窗口 / 限制) | 然后,当像 **200** 与 **1000** 这样的数字让你感到困惑时,可以打开 **[docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)**;需要了解 MISP 网络/同步发现机制时,请阅读 **[docs/MISP_SOURCES.md](docs/MISP_SOURCES.md)**。如果同步**在 Neo4j 中成功**但 Airflow 任务显示为**红色**或被**僵尸终结**,请阅读 **[docs/HEARTBEAT.md](docs/HEARTBEAT.md)**。 **完整索引 + “可跳过内容”:** [docs/DOCUMENTATION_AUDIT.md](docs/DOCUMENTATION_AUDIT.md)。 **快速参考:** 端口 → 见上方 **§ 启动的服务**。CLI → 见 **§ edgeguard CLI**。限制 → 见 [docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)。OOM/僵尸进程 → 见 [docs/HEARTBEAT.md](docs/HEARTBEAT.md)。 本 README 保持为**简短的**入门指引(快速开始、端口、环境变量)。深入的上手指南请参阅 **SETUP_GUIDE**。 ## 📚 文档 ### 入门指南 | 文档 | 描述 | |----------|-------------| | [docs/SETUP_GUIDE.md](docs/SETUP_GUIDE.md) | **从这里开始** —— 完整安装(Docker Compose、venv 或 conda)、`.env` 配置、MISP/Neo4j 连接 | | [docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md) | 运行和调试 Airflow DAG —— CLI、重启、故障排除 | | [docs/BASELINE_SMOKE_TEST.md](docs/BASELINE_SMOKE_TEST.md) | 安全地执行首次基线运行 —— 短时间窗口、小规模限制 | | [docs/API_KEYS_SETUP.md](docs/API_KEYS_SETUP.md) | 获取每个 API 密钥的分步说明 | | [docs/ENVIRONMENTS.md](docs/ENVIRONMENTS.md) | 环境标志(`EDGEGUARD_ENV`)、Python 3.12+、conda/venv 设置 | | [docs/DOCKER_SETUP_GUIDE.md](docs/DOCKER_SETUP_GUIDE.md) | 高内存调优、全新状态、特定工作站的设置 | ### 参考资料 | 文档 | 描述 | |----------|-------------| | [docs/TECHNICAL_SPEC.md](docs/TECHNICAL_SPEC.md) | **规范化 Schema** —— 完整的节点/关系 Cypher 规范、属性类型、约束 | | [docs/COLLECTORS.md](docs/COLLECTORS.md) | 每个收集器的行为、区域检测、丰富字段、输出示例 | | [docs/DATA_SOURCES.md](docs/DATA_SOURCES.md) | 所有 13 个数据源 —— 库存、分类、数据量 | | [docs/DATA_SOURCES_RATE_LIMITS.md](docs/DATA_SOURCES_RATE_LIMITS.md) | API 速率限制、成本、供应商端点参考 | | [docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md) | 不同的限制何时适用 —— 基线 vs 增量 vs 同步 vs 分块 | | [docs/NEO4J_SAMPLE_QUERIES.md](docs/NEO4J_SAMPLE_QUERIES.md) | 适用于 Neo4j Browser 的 Cypher 查询示例 | ### 架构与设计 | 文档 | 描述 | |----------|-------------| | [docs/ARCHITECTURE.md](docs/ARCHITECTURE.md) | 流水线数据流、区域检测、分块策略 | | [docs/ARCHITECTURE_DIAGRAMS.md](docs/ARCHITECTURE_DIAGRAMS.md) | **可视化图表**(Mermaid) —— 系统概览、图谱 Schema、流水线流程、重试逻辑、OOM 保护 | | [docs/KNOWLEDGE_GRAPH.md](docs/KNOWLEDGE_GRAPH.md) | 区域/行业架构、多区域支持、查询示例 | | [docs/METHODOLOGY.md](docs/METHODOLOGY.md) | 行业分类算法 —— 关键词、评分、阈值 | | [docs/DATA_QUALITY.md](docs/DATA_QUALITY.md) | 确定性 MERGE 策略、精确匹配规则 | ### 运维 | 文档 | 描述 | |----------|-------------| | [docs/HEARTBEAT.md](docs/HEARTBEAT.md) | Airflow 僵尸任务、OOM/SIGKILL、心跳超时 | | [docs/PROMETHEUS_SETUP.md](docs/PROMETHEUS_SETUP.md) | Prometheus + Grafana 监控栈 | | [docs/MISP_SOURCES.md](docs/MISP_SOURCES.md) | MISP 作为唯一真实来源、事件发现、元数据前缀 | | [docs/RESILIENCE_CONFIG.md](docs/RESILIENCE_CONFIG.md) | 熔断器、重试逻辑、超时调优 | | [docs/SECRETS_MANAGEMENT.md](docs/SECRETS_MANAGEMENT.md) | 凭据处理、环境变量、密钥轮换 | ### ResilMesh 集成 | 文档 | 描述 | |----------|-------------| | [docs/RESILMESH_INTEROPERABILITY.md](docs/RESILMESH_INTEROPERABILITY.md) | 集成契约 —— EdgeGuard 生成什么,ResilMesh 消费什么 | | [docs/RESILMESH_INTEGRATION_GUIDE.md](docs/RESILMESH_INTEGRATION_GUIDE.md) | NATS 消息传递、Neo4j 桥接、告警丰富流程 | | [docs/RESILMESH_INTEGRATION_TESTING.md](docs/RESILMESH_INTEGRATION_TESTING.md) | 集成验证和测试程序 | ### 工具与可视化 | 文档 | 描述 | |----------|-------------| | [docs/GRAPH_EXPLORER.md](docs/GRAPH_EXPLORER.md) | 交互式 Cytoscape.js 图谱浏览器 —— 视图、API、扩展 | | [docs/DEMO.md](docs/DEMO.md) | 模拟 ResilMesh / NATS 的演示场景 | ### 项目 | 文档 | 描述 | |----------|-------------| | [docs/CONTRIBUTING.md](docs/CONTRIBUTING.md) | 检查清单 —— 代码检查、测试、CI 要求 | | [docs/VERSIONING.md](docs/VERSIONING.md) | CalVer 策略(`YYYY.M.D`) | | [docs/PRODUCTION_READINESS.md](docs/PRODUCTION_READINESS.md) | 组件状态和上线检查清单 | | [docs/DEPLOYMENT_READINESS_CHECKLIST.md](docs/DEPLOYMENT_READINESS_CHECKLIST.md) | 有序的部署门禁(`make deploy-check`) | | [docs/DOCUMENTATION_AUDIT.md](docs/DOCUMENTATION_AUDIT.md) | 文档 ↔ 代码可追溯性表 | ## 🔒 安全 - API 密钥存储在环境变量中(从不硬编码) - 所有 Cypher 查询均采用**参数化**形式 —— 无字符串插值,可防注入 - SSL/TLS:见下文 **[SSL/TLS (MISP vs Neo4j)](#ssltls-misp-https-vs-neo4j-bolt)** —— **`EDGEGUARD_SSL_VERIFY`** 控制的是指向 MISP 及收集器的 **HTTPS** 请求,**而非** Neo4j Bolt 信任设置 - 凭据已加入 gitignore;Docker 镜像绝不内置密钥 - Prometheus 指标端点默认仅绑定到**本地回环地址**(`127.0.0.1:8001`) - Airflow 运行在端口 8082 —— 刻意避开了 ResilMesh Temporal(8080) - 所有外部服务调用均配备熔断器 ## 📝 许可证 MIT 许可证 - 详见 LICENSE 文件。 ## 🤝 贡献 1. 派生该仓库 2. 创建特性分支 3. 在发起 PR 之前运行 **`make lint`** 和 **`make test`**(或参考 CI 配置) 4. 提交拉取请求 —— **[`docs/CONTRIBUTING.md`](docs/CONTRIBUTING.md)** 包含了完整的检查清单(Ruff、Mypy、pytest、Docker 构建) **EdgeGuard** —— 边缘基础设施上感知行业的威胁情报系统,为 ResilMesh 互操作性而构建。 ## ⚖️ 收集与同步限制 当文档/代码中的 **`1000`** 或 **`2000`** 看似矛盾时,请参考此表。 | 阶段 | 典型上限 | 备注 | |-------|----------------|-------| | **基线收集器**(Airflow `edgeguard_baseline`,`run_pipeline --baseline`) | **`BASELINE_COLLECTION_LIMIT`** / **`EDGEGUARD_BASELINE_COLLECTION_LIMIT`** —— 单个外部数据源的最大**条目数**(OTX、NVD、CISA、MITRE、feeds 等)。**`0`** = 不限。 | **不会**配置 MISP→Neo4j 同步。基线 DAG **不会**运行 **`MISPCollector`**。 | | **增量收集器**(cron 定时 DAG) | **`EDGEGUARD_INCREMENTAL_LIMIT`**(默认每个数据源 **200** 条),可使用 **`EDGEGUARD_MAX_ENTRIES`** 覆盖 | 每次计划运行时每个数据源的条目上限 | | **MISP → Neo4j 同步** (`run_misp_to_neo4j`) | **事件列表:** 通过 **`GET /events/index`**(随后请求 **`/events`**)带分页获取(每页 **500** 行,最多 **100** 页)+ 客户端过滤(从 **`EDGEGUARD_MISP_EVENT_SEARCH`** 获取的 **`info`** 子串,默认为 **EdgeGuard**,或 **`org.name` == EdgeGuard**)。**回退机制:** PyMISP **`restSearch`** + **`limit: 1000`**。**单事件处理:** 解析 → 去重 → **跨条目边构建** → 节点合并 → 关系批量写入。**Neo4j 内存:** **`EDGEGUARD_NEO4J_SYNC_CHUNK_SIZE`** / **`EDGEGUARD_REL_BATCH_SIZE`**。 | 索引页大小 ≠ Neo4j 合并分块 **1000** —— 详见 [COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md) | | **`MISPCollector`**(可选 / 默认基线步骤 2 中未包含) | **`/events`** 索引上限为 **`min(3×limit, 2000)`** 或 **2000**;每个事件 **500** 个属性 | 采用增量风格的 **`resolve_collection_limit(..., baseline=False)`** —— **而非**基线 Airflow Variable | **完整详情:** [docs/COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)。 ## 📋 环境变量 | 变量 | 描述 | 默认值 | |----------|-------------|---------| | `MISP_URL` | MISP 服务器 URL | `https://localhost:8443` | | `MISP_API_KEY` | MISP API 认证密钥 | *(必填)* | | `EDGEGUARD_MISP_EVENT_SEARCH` | 过滤索引结果时匹配 **`Event.info`** 的子字符串(并传递给 **`restSearch`** 回退机制)。必须与标题(如 `EdgeGuard-…`)保持一致 | `EdgeGuard` | | `EDGEGUARD_MISP_HTTP_HOST` | 当 MISP vhost 与 URL 主机名不同时使用的可选 HTTP `Host` 头 | *(未设置)* | | `NEO4J_URI` | Neo4j 连接 URI(Compose **`x-common-env`** 默认为 **`bolt://neo4j:7687`**;宿主机脚本通常使用 **`bolt://localhost:7687`**) | 参见 **`.env.example`** | | `NEO4J_USER` | Neo4j 用户名 | `neo4j` | | `NEO4J_PASSWORD` | Neo4j 密码 | *(必填,无默认值)* | | `NEO4J_DATABASE` | Neo4j 数据库名称 | `neo4j` | | `EDGEGUARD_SSL_VERIFY` | 验证 **HTTPS** 的 TLS 证书(适用于 MISP API、收集器、MISP→Neo4j 同步的 `requests` / PyMISP)。未设置或缺失时默认为 **`true`**。如果未设置或为空,**`SSL_VERIFY`** 将作为相同的标志被读取(`true`/`false`)。**不会**使用 **`SSL_CERT_VERIFY`** —— 请设置 **`EDGEGUARD_SSL_VERIFY`**。 | `true` | | `OTX_API_KEY` | AlienVault OTX API 密钥 | *(可选)* | | `NVD_API_KEY` | NVD API 密钥 | *(可选)* | | `VIRUSTOTAL_API_KEY` | VirusTotal API 密钥 | *(可选)* | | `ABUSEIPDB_API_KEY` | AbuseIPDB API 密钥(免费版:每天 1000 次请求) | *(可选)* | | `THREATFOX_API_KEY` | Abuse.ch ThreatFox API 密钥([auth.abuse.ch](https://auth.abuse.ch/))。无此密钥时,**`collect_threatfox`** 会跳过(如果任务在无密钥的情况下被强制调用 API,将返回 HTTP 401)。 | *(可选)* | | `EDGEGUARD_COLLECT_SOURCES` | 可选的逗号分隔收集器允许列表(例如 `otx,nvd,cisa,mitre`)。未设置 = 启用全部(受可选密钥跳过限制)。详见 [docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md)。 | *(未设置)* | | `EDGEGUARD_ZONE_DETECT_THRESHOLD` | 在 **`detect_zones_from_text`** 中,行业生效所需的最低加权得分(默认 **1.5**)。调高可执行更严格的标记。 | `1.5` | | `EDGEGUARD_ZONE_ITEM_THRESHOLD` | 在 **`detect_zones_from_item`** 中的最低综合得分(默认 **1.5**)。 | `1.5` | | `EDGEGUARD_MISP_PREFETCH_EXISTING_ATTRS` | 在执行 **`push_items`** 之前,加载目标 MISP 事件上已存在的 **`type`+`value`** 并跳过重复项(适用于重新运行、重叠的增量窗口)。 | `true` | | `EDGEGUARD_OTX_INCREMENTAL_LOOKBACK_DAYS` | 首次 OTX 增量运行(无检查点)时:**`modified_since`** = 当前时间减去此天数。 | `3` | | `EDGEGUARD_OTX_INCREMENTAL_OVERLAP_SEC` | 查询 **`modified_since`** 时从存储的 OTX 游标中减去的秒数(用于处理时钟偏移/遗漏的边缘脉冲)。 | `300` | | `EDGEGUARD_OTX_INCREMENTAL_MAX_PAGES` | 每次增量运行时的最大 OTX API 页数(每页随后延迟 **2s**)。 | `25` | | `EDGEGUARD_MITRE_CONDITIONAL_GET` | MITRE STIX 包:在计划运行时发送 **`If-None-Match`**;若返回 **HTTP 304** 则跳过重新解析/推送。基线运行总是全量下载。 | `true` | | `EDGEGUARD_NEO4J_SYNC_CHUNK_SIZE` | **MISP→Neo4j 同步** (`run_misp_to_neo4j`):每次 Python 分块最大**解析图谱条目**合并数(限制 Airflow worker 在 Neo4j 写入期间的 **RAM** 使用)。**不是** MISP **事件索引**页面大小(代码中为每页 **500**)或 **`restSearch` `limit: 1000`** 回退机制 —— 详见 [COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md)。默认为 **`500`**。如果在解析后出现 **OOM** / **SIGKILL**,请调低(例如 `250`)。**`0`** 或 **`all`** = **单次处理**(OOM 风险极高)。 | `500` | | `EDGEGUARD_REL_BATCH_SIZE` | 每次 **`create_misp_relationships_batch`** UNWIND 往返请求的最大**关系定义**数(包括内嵌和跨条目边)。如果在处理巨大事件时遇到 Neo4j **事务超时**,请调低此值。 | `2000` | | `EDGEGUARD_DEBUG_GC` | 如果设置为**真值**,则在每个 Neo4j **节点**合并分块后运行 **`gc.collect()`**(仅用于诊断;在资源紧凑的 worker 上可能会**增加** RSS 峰值 —— 除非进行性能分析,否则在生产环境中不要设置)。 | *(未设置)* | | `EDGEGUARD_INCREMENTAL_LIMIT` | 常规 cron 运行中**每个数据源**的最大条目数。`0` = 不限。 | `200` | | `EDGEGUARD_MAX_ENTRIES` | 硬性全局覆盖 —— 非零时覆盖 `INCREMENTAL_LIMIT`。保持为 `0` 以使用按模式的默认值。 | `0` | | `EDGEGUARD_ENABLE_METRICS` | 在端口 8001 上启动 Prometheus 指标服务 | `false` | | `EDGEGUARD_METRICS_HOST` | 指标端点的绑定地址 | `127.0.0.1` | | `EDGEGUARD_GRAPHQL_PORT` | GraphQL API 的端口 | `4001` | | `EDGEGUARD_GRAPHQL_PLAYGROUND` | 启用 GraphiQL 浏览器资源管理器 | `false`(默认),仅在开发环境设置为 `true` | | `AIRFLOW__WEBSERVER__WEB_SERVER_PORT` | Airflow UI 端口 —— 设置为 8082 以避免与 ResilMesh Temporal 冲突 | `8082` | | `AIRFLOW_POSTGRES_USER` | **仅限 Docker Compose** —— `airflow_postgres` 的 PostgreSQL 用户(用于 Airflow 元数据) | `airflow` | | `AIRFLOW_POSTGRES_PASSWORD` | **仅限 Docker Compose** —— 该角色的密码(在生产环境中请使用强密码) | `airflow` | | `AIRFLOW_POSTGRES_DB` | **仅限 Docker Compose** —— 数据库名称 | `airflow` | | `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` | 裸机 / 自定义 Airflow:用于元数据的 SQLAlchemy URI(PostgreSQL 或 MySQL)。Compose 会自动设置此项。 | *(Compose:由 `docker-compose.yml` 设置)* | | `EDGEGUARD_MISP_BATCH_THROTTLE_SEC` | 每次向 MISP 推送 500 个属性的批次之间的暂停时间(秒)。防止大型事件导致内存耗尽。 | `5.0` | | `EDGEGUARD_MISP_EVENT_FETCH_THROTTLE_SEC` | 同步期间获取连续 MISP 之间的暂停时间(秒)。 | `2.0` | | `EDGEGUARD_MAX_EVENT_ATTRIBUTES` | MISP→Neo4j 同步:属性计数超过此值的事件将**推迟**到同步运行结束时处理(优先处理较小的事件)。防止在关键数据落库前大型事件导致 worker 发生 OOM。设置为 `0` 可禁用。 | `50000` | | `EDGEGUARD_DEV_MODE` | 启用控制台跟踪输出(仅限开发) | *(未设置)* | **MISP→Neo4j 内存:** 大规模的首次同步需要足够的 RAM 来容纳解析后的条目;**单事件** 处理 + Python 端的节点分块 + `merge_*_batch` UNWINDs 保持了峰值使用量的上限。如果同步在 Neo4j 写入期间**以退出码 -9 终止**,请检查 **Docker cgroup** 限制:Compose 默认为 **`airflow`** 服务设置了 **`AIRFLOW_MEMORY_LIMIT=4g`** —— 可在 **`.env`** 中调高,或降低 **`EDGEGUARD_NEO4J_SYNC_CHUNK_SIZE`** / **`EDGEGUARD_REL_BATCH_SIZE`**。详见 [docs/SETUP_GUIDE.md](docs/SETUP_GUIDE.md)、[docs/HEARTBEAT.md](docs/HEARTBEAT.md)、[docs/AIRFLOW_DAGS.md](docs/AIRFLOW_DAGS.md)。除非你了解 OOM 权衡的后果,否则**千万不要**将分块大小设置为 **`0`** / **`all`**。 **MISP Apache 调优(大型基线):** MISP 默认的 Apache `MaxRequestWorkers 150` 在流水线推送大型事件(例如约 95K 个 NVD 属性)时可能会导致内存耗尽。请在 MISP 容器内(`/etc/apache2/mods-enabled/mpm_prefork.conf`)将其**降低至 25**。流水线也会自动节流写入 —— 详见 **`EDGEGUARD_MISP_BATCH_THROTTLE_SEC`**(批次之间默认 5 秒)和 **`EDGEGUARD_MISP_EVENT_FETCH_THROTTLE_SEC`**(事件获取之间默认 2 秒)。完整细节请参阅 [docs/DOCKER_SETUP_GUIDE.md](docs/DOCKER_SETUP_GUIDE.md) § *MISP 性能调优*。 **基线收集**由 Airflow Variables 控制(而非环境变量)。这些上限适用于基线 DAG 中的**每个外部收集器**(OTX、NVD 等) —— **不**适用于 MISP→Neo4j 同步的 MISP 事件搜索(详见 [COLLECTION_AND_SYNC_LIMITS.md](docs/COLLECTION_AND_SYNC_LIMITS.md))。 | Airflow Variable | 描述 | 默认值 | |-----------------|-------------|---------| | `BASELINE_COLLECTION_LIMIT` | 基线运行时**单个外部数据源**的条目数。`0` = 不限。 | `0` | | `BASELINE_DAYS` | 获取多少天的历史记录(NVD、OTX)。 | `730` | | `EDGEGUARD_BASELINE_DAYS` | *(可选)* 设置后,在基线 DAG 中覆盖 `BASELINE_DAYS`(例如,冒烟测试设为 `7`)。 | *(未设置)* | | `EDGEGUARD_BASELINE_COLLECTION_LIMIT` | *(可选)* 设置后,在基线 DAG 中覆盖 `BASELINE_COLLECTION_LIMIT`。 | *(未设置)* | ### SSL/TLS:MISP (HTTPS) vs Neo4j (Bolt) | 层级 | 使用 TLS 的对象 | 在开发环境中如何放宽验证 | |--------|----------------|-----------------------------------| | **MISP 与 HTTP 收集器** | `requests`、PyMISP、`MISPHealthCheck` | 设置 **`EDGEGUARD_SSL_VERIFY=false`**(例如使用自签名证书的内部 MISP)。**生产环境:请保持 `true`** 并安装合适的 CA 或信任库。 | | **MISP → Neo4j 同步** | 所有 MISP HTTP 调用均使用相同的 **`EDGEGUARD_SSL_VERIFY`**(`run_misp_to_neo4j.py` 从此标志设置同步会话的 `verify` 参数)。 | 如果同步任务在 **Airflow** 下运行,请在 **调度器和 worker**(与任务相同的运行环境)上设置该变量,而不仅仅是在收集器容器上 —— 否则 Tier1 可能通过,而 **`full_neo4j_sync`** 会在 MISP 健康检查时失败。 | | **Neo4j 驱动** | 基于 **`NEO4J_URI`** 的 Bolt 连接 —— **独立于** `EDGEGUARD_SSL_VERIFY` | 使用 URI schemes:**`bolt://`** / **`neo4j://`**(典型本地/LAN),**`bolt+s://`** / **`neo4j+s://`**(TLS + 系统 CA),**`bolt+ssc://`** / **`neo4j+ssc://`**(TLS,根据驱动语义信任自签名证书)。自签名 Neo4j **不能**通过关闭 `EDGEGUARD_SSL_VERIFY` 来解决。 | 更多细节:**`docs/AIRFLOW_DAGS.md`**(故障排除)、**`src/config.py`**(`SSL_VERIFY`)。 ### .env 文件示例 ``` # MISP 配置 export MISP_URL="https://misp.example.com:8443" export MISP_API_KEY="your-misp-api-key-here" # Neo4j 配置 export NEO4J_URI="bolt://localhost:7687" export NEO4J_USER="neo4j" export NEO4J_PASSWORD="your-secure-password" # 仅限开发:使用自签名 HTTPS 的内部 MISP(在 Airflow worker 上也要设置,用于 MISP→Neo4j 同步) # export EDGEGUARD_SSL_VERIFY=false # 可选:外部威胁 feeds export OTX_API_KEY="your-otx-key" export NVD_API_KEY="your-nvd-key" # Docker Compose — Airflow 元数据 Postgres 的可选覆盖(默认:airflow/airflow/airflow) # AIRFLOW_POSTGRES_USER=airflow # AIRFLOW_POSTGRES_PASSWORD=change-me-in-production # AIRFLOW_POSTGRES_DB=airflow ``` ## 🏥 健康检查 EdgeGuard 提供了一个全面的健康检查模块来监控系统状态。 ### 用法 ``` # 运行健康检查 python src/health_check.py ``` ### 编程式用法 ``` from health_check import health_check, format_health_status # 获取健康状态 status = health_check() print(status) # { # 'misp_healthy': True, # 'neo4j_healthy': True, # 'misp_details': {...}, # 'neo4j_details': {...}, # 'last_pipeline_run': '2024-01-15T10:30:00', # 'node_counts': {...}, # 'timestamp': '2024-01-15T12:00:00', # 'overall_healthy': True # } # 格式化为可读字符串 print(format_health_status(status)) ``` ### HTTP APIs (FastAPI) | 服务 | URL | 行为 | |---------|-----|----------| | **REST** (`query_api`) | `GET /health` | 始终返回 **HTTP 200**。响应体包含:`status`(`ok` / `degraded`)、`neo4j_connected`(通过 `Neo4jClient.health_check()` 测试 Neo4j **ping + APOC**)。 | | **GraphQL** (`graphql_api`) | `GET /health` | 仅当 Neo4j 健康时返回 **HTTP 200**(相同检查)。如果驱动缺失或 `health_check()` 失败则返回 **HTTP 503**(适用于严格的负载均衡器 / k8s 探针)。 | REST API 上的其他路由在运行查询之前,会使用快速的 **`Neo4jClient.is_connected()`**(驱动存在)作为前置守卫。 ### 健康检查输出(`health_check.py`) - **MISP 状态**:连接健康状况和响应时间 - **Neo4j 状态**:连接健康状况、版本和响应时间 - **节点计数**:图谱中所有节点类型的当前计数 - **上次流水线运行**:上次成功同步的时间戳 ## 📊 指标 指标模块用于跟踪流水线的执行统计数据。 ### 用法 ``` # 查看指标摘要 python src/metrics.py ``` ### 编程式用法 ``` from metrics import PipelineMetrics # 初始化指标 metrics = PipelineMetrics() # 开始追踪一次运行 metrics.start_run() # ... 运行你的 pipeline ... # 记录完成 metrics.end_run( success=True, nodes_created=150, relationships_created=75, events_processed=10, indicators_synced=100, vulnerabilities_synced=50 ) # 获取摘要 summary = metrics.get_summary() print(f"Success rate: {summary['success_rate_percent']}%") # 获取近期运行 recent = metrics.get_recent_runs(5) ``` ### 跟踪指标 | 指标 | 描述 | |--------|-------------| | 总运行次数 | 流水线执行的总次数 | | 成功/失败计数 | 成功与失败运行的细分 | | 成功率 | 成功运行的百分比 | | 持续时间 | 每次运行耗费的时间 | | 创建的节点数 | 每次运行创建的节点数量 | | 创建的关系数 | 每次运行创建的关系数量 | | 处理的事件数 | 每次运行处理的 MISP 事件数量 | | 按类型划分 | 指标、漏洞、恶意软件、攻击者、技术 | ### 指标存储 指标会持久化到 `data/metrics.json` 中,以便在重启后保留。 ## 📈 当前状态 EdgeGuard v2026.3.28 已达到**生产测试就绪**状态。完整的流水线已在 Docker (Mac Mini) 上验证通过,所有 CI 绿灯,BugBot 检查通过。 ### ✅ 已实现 - **11 个活跃的源收集器**,包含丰富的字段(+ 可选的 VirusTotal 丰富模块,+ 行业源占位符):OTX、NVD(CVSSv2/v31/v40 + CISA KEV + 版本约束)、CISA、MITRE ATT&CK(技术 + 攻击者 + 恶意软件 + **工具**)、VirusTotal(YARA 规则、Sigma 规则、沙箱判定)、AbuseIPDB(滥用类别)、ThreatFox、URLhaus(url_status、reporter)、CyberCure、Feodo(last_online)、SSL Blacklist - **以 MISP 作为唯一真实来源**:所有收集器首先写入 MISP 并带有完整的出处标签;使用 OTX_META/TF_META/NVD_META 注释前缀进行元数据往返 - **Neo4j 知识图谱**:具备生产就绪能力的图谱,支持 MERGE 去重、13 种确定性关系类型、针对 CVE 和 Malware 节点的跨源 IS_SAME_AS 去重 - **ResilMesh Schema 对齐**:`CVE`、`Vulnerability`(带有 `status: LIST`)、`CVSSv2/v30/v31/v40`(双向)、`Tool`、`REFERS_TO` 桥接、所有节点类型上的 `edgeguard_managed`(包括 13 种 ResilMesh 资产层方法)、IP.tag 作为 LIST —— 均与官方 ResilMesh 数据模型匹配 - **STIX 2.1 流水线**:主转换路径,带有直接的 MISP 回退机制 - **基线 + 增量模式**:一次性深度历史加载和计划的 2-3 天增量更新;针对并发 Airflow worker 的检查点文件锁 - **6 个主要 Airflow DAG** 位于 `edgeguard_pipeline.py` 中(+ 可选指标 DAG);UI 位于端口 **8082**(故意避开 Temporal);基线和增量之间的同步冲突保护机制 - **同步后丰富**:IOC 置信度衰减、Campaign 节点物化(包含对已淘汰指标的清理)、Vulnerability↔CVE 桥接、共现置信度校准 - **多区域行业过滤**:医疗保健、能源、金融、全球 —— 151 个加权关键词及负向排除项,Sector 节点创建中的区域验证白名单 - **GraphQL API**(端口 4001):Strawberry/FastAPI 带有 CORS;所有节点类型(包括 Tool)均可查询;解析了 14 个丰富字段;可通过 `EDGEGUARD_GRAPHQL_PLAYGROUND=true` 选择启用 GraphiQL - **FastAPI REST API**(端口 8000):按行业过滤、限速、带认证的端点;`/graph/explore` 包含 4 个视图(恶意软件、攻击者、指标、CVE) - **交互式图谱浏览器**:基于浏览器的 Cytoscape.js 可视化工具,使用实时 Neo4j 数据,支持 CISA KEV 高亮、搜索、区域过滤([docs/GRAPH_EXPLORER.md](docs/GRAPH_EXPLORER.md)) - **全栈 Docker Compose**:Neo4j + Airflow + REST API + GraphQL 均在一个 `docker compose up -d` 中提供;也支持带有外部 MISP+Neo4j 的 conda/venv/裸机部署 - **CI/CD**:代码检查、类型检查、pytest(161 个测试,30% 覆盖率门禁)、Docker 构建、pip-audit、BugBot —— 全部绿灯 - **健康检查 + 指标**:MISP(带有 PyMISP 版本兼容性检测)、Neo4j(带有 30 秒超时)、Airflow、NATS;Prometheus/Grafana 监控栈 - **生产级 CLI**(16 个命令):`preflight`(8 类就绪检查)、`stats --full`(按区域/来源的节点计数 + MISP 细分)、`dag status/kill`(Airflow 运行监控 + 卡住运行的恢复)、`checkpoint status/clear`(基线进度 + 增量游标)、`doctor`、`heal`、`validate`、`monitor`、`version` - **熔断器 + 重试**:修复了 HALF_OPEN 死锁、单调时间、所有外部服务调用的弹性模式 - **UTC 感知时间戳**:24 个文件中的所有 70 多个 datetime 实例均使用 `timezone.utc`(兼容 Python 3.12) ### 🚧 进行中 / 计划中 - **SoftwareVersion 桥接**:`(Vulnerability)-[:IN]->(SoftwareVersion)-[:ON]->(Host)` —— NVD 的 `version_constraints` 现已收集并存储;CPE 到 SoftwareVersion 的映射待定(参见 [`docs/RESILMESH_INTEROPERABILITY.md`](docs/RESILMESH_INTEROPERABILITY.md) §4.4) - **ISIM GraphQL Schema 扩展**:EdgeGuard 类型(Indicator、ThreatActor、Malware、Technique、Campaign、Tool)通过端口 4001 暴露 —— 计划与 ResilMesh 团队进行正式的 ISIM Schema 合并 - **Neo4j 角色分离**:目前使用共享凭据;目标状态是使用 Neo4j Enterprise 角色或将 GraphQL访问层 - **实时 NATS 告警**:演示已实现(`demo/mock_resilmesh_publisher.py`),生产加固正在进行中 - **医疗保健和能源行业特定的源收集器**:HC3、E-ISAC —— 需要 ISAC 成员资格;占位收集器已就绪 _最后更新:2026-03-28_
标签:CISA项目, FTP漏洞扫描, _IOC管理_, IP 地址批量处理, Neo4j, PE 加载器, Python, ResilMesh, 医疗安全, 可解释人工智能, 威胁情报, 安全编排, 开发者工具, 攻击者画像, 数据丰富, 无后门, 智能安全系统, 欧盟地平线, 知识图谱构建, 网络信息收集, 网络安全, 能源安全, 自动化与响应, 自定义请求头, 行业过滤, 请求拦截, 边缘计算, 金融安全, 隐私保护, 韧性网络