Setounkpe7/threat-intel-api
GitHub: Setounkpe7/threat-intel-api
一个自托管的开源网络威胁情报聚合 API,将 OSINT 数据源按行业配置进行差异化评分,为不同业务场景提供定制化的漏洞风险视图。
Stars: 0 | Forks: 0
# CyberThreat 情报 API
[](https://github.com/Setounkpe7/threat-intel-api/actions/workflows/security.yml)
[](#development)
[](https://www.python.org/downloads/release/python-3120/)
[](Dockerfile)
[](#)
一个 REST API,用于聚合 OSINT 网络威胁情报源,对其进行去重,根据可配置的**行业配置** (sector profiles) 进行评分,并通过版本化端点公开结果。里程碑 1 交付了针对 NVD 的摄取管道 (ingestion pipeline) 以及三个端点。里程碑 2 增加了行业感知评分、按行业划分的仪表板、RSS 源、热重载 YAML 配置、管理端点和速率限制。当前的 DevSecOps 阶段增加了加固容器、CI/CD 安全门禁以及文档化的漏洞披露路径 — 请参阅 [SECURITY.md](SECURITY.md)。
## 本地安装(无 Docker)
需要 Python 3.12+。
```
python3.12 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
cp .env.example .env
# 如果需要,编辑 .env(默认适用于 docker-compose Postgres)
# 对于本地 SQLite 开发(无 Docker),编辑 .env 以使用:
# DATABASE_URL=sqlite+aiosqlite:///./threat_intel.db
alembic upgrade head
uvicorn threat_intel.main:app --reload
```
打开 http://localhost:8000/docs 查看交互式 Swagger UI。
## 使用 Docker 运行
```
cp .env.example .env
docker compose up --build
```
`app` 服务会等待 Postgres 启动,在启动时运行迁移,然后在端口 8000 上通过 uvicorn 提供服务。
```
curl localhost:8000/health
```
## 按需触发摄取运行
默认情况下,进程内调度器每 60 分钟轮询一次 NVD。要强制单次运行(在演示和首次启动技术栈时很有用):
```
docker compose exec app python -m threat_intel.cli.collect_once nvd
```
您应该会看到类似 `inserted=27 updated=0 unchanged=0` 的输出。
## 端点
### 公开
| 方法 | 路径 | 描述 |
|---|---|---|
| GET | `/health` | 服务状态、数据库状态、每个收集器的上次运行情况、统计信息 |
| GET | `/api/v1/threats` | 分页列表,过滤器:`severity`、`since`、`source`、`limit`、`offset` |
| GET | `/api/v1/cve/{cve_id}` | 某个 CVE 的完整详情(包括 `raw_data`) |
| GET | `/api/v1/sectors` | 列出公开的行业配置(`?visibility=all` 需要 `X-Admin-Key`) |
| GET | `/api/v1/sectors/{id}` | 配置详情(没有 `X-Admin-Key` 时,私有配置返回 404) |
| GET | `/api/v1/sectors/{id}/threats` | 某行业的已评分威胁(`?min_score`、`?limit`、`?since`) |
| GET | `/api/v1/sectors/{id}/dashboard` | 24小时热门 / 7天热门 / 聚合统计 |
| GET | `/api/v1/sectors/{id}/feed.rss` | RSS 2.0 源(默认 `min_score=70`) |
| GET | `/api/v1/stats/global` | 用于未来公共仪表板的聚合统计 |
| GET | `/docs` | Swagger UI |
| GET | `/openapi.json` | 机器可读 schema |
### 管理(需要 `X-Admin-Key` 请求头)
| 方法 | 路径 | 描述 |
|---|---|---|
| POST | `/api/v1/admin/reload-profiles` | 重新扫描 `profiles/` 并执行 upsert(返回 added/updated/removed/errors) |
| POST | `/api/v1/admin/rescore-all` | 在后台排队进行全面重新评分(返回 202) |
公共端点限制为每个 IP 每分钟 100 个请求(`RATE_LIMIT_DEFAULT`)。错误以 `application/problem+json`(RFC 7807)格式返回。
## 行业配置
每个 `profiles/public/*.yaml`(以及 `profiles/private/*.yaml`)声明一个行业配置——它是关键词、技术、CWE 优先级、排除项、合规标签和 CVSS 阈值的集合。每个威胁都会根据每个配置进行评分,因此添加 YAML 并进行热重载就足以公开一个新的仪表板、威胁端点和 RSS 源。
有关 schema,请参阅 [profiles/README.md](profiles/README.md);有关带有具体示例的评分算法,请参阅 [docs/SCORING.md](docs/SCORING.md)。
```
# 添加 profile
$EDITOR profiles/public/my-org.yaml
# 热重载而无需重启
curl -X POST http://localhost:8000/api/v1/admin/reload-profiles \
-H "X-Admin-Key: $ADMIN_API_KEY"
# 查询 sector
curl http://localhost:8000/api/v1/sectors/my-org/dashboard | jq
```
### 示例:金融行业的 top-10 严重威胁
```
curl 'http://localhost:8000/api/v1/sectors/finance/threats?min_score=70&limit=10' | jq
```
### 示例:订阅 RSS 源
```
curl 'http://localhost:8000/api/v1/sectors/finance/feed.rss?min_score=70' \
-H 'Accept: application/rss+xml'
```
### 架构
```
flowchart LR
NVD[NVD feed] --> Collector
Collector --> Ingestion["IngestionService
(dedup + upsert)"] Ingestion --> Threat[(threat)] Threat --> ScoringJob[ThreatScoringJob] ProfileLoader[SectorProfileLoader] --> Profiles[(sector_profile)] Profiles --> ScoringJob ScoringJob --> Scores[(threat_sector_score)] Scores --> API[/api/v1/sectors/.../] Threat --> API ``` ## 项目布局 ``` src/threat_intel/ ├── api/ FastAPI routers (health, v1.threats, v1.cve) ├── collectors/ Source adapters (BaseCollector + NVDCollector) ├── analyzers/ Dedup, scoring (M2) ├── services/ Ingestion + query layer ├── schemas/ Pydantic request/response models ├── models/ SQLAlchemy ORM ├── core/ Config, DB engine, logging, exceptions, scheduler ├── cli/ Typer commands for ops/demos └── main.py create_app() + lifespan ``` 测试位于 `tests/unit/` 和 `tests/integration/` 下。迁移在 `alembic/versions/` 中。 ## 添加新的收集器 `BaseCollector` 的核心意义在于,添加新数据源只需一个文件和一次注册。我们来演练一遍: 1. 在 `src/threat_intel/collectors/.py` **创建收集器**。继承 `BaseCollector`,设置 `source_name` 和 `source_kind` 类属性,并实现 `fetch(since)`(返回 `AsyncIterator[RawEvent]`)和 `normalize(raw)`(返回 `ThreatDraft`)。
2. 在 `normalize()` 中**映射数据源的严重性 / CVSS / ID** 到 `ThreatDraft` 字段。对于您没有的内容,保留为 `None` 或空列表——API 模型已经处理了缺失数据。
3. 在 `src/threat_intel/main.py` 中**注册它**:在 lifespan 中实例化它,并将其添加到 `IngestionService` 收集器列表以及 `app.state.collector_names` 中。
4. 如果您想要周期性任务,请在 `core/scheduler.py` 中**调度它**。
5. 在 `tests/unit/test__collector.py` 下**添加单元测试**,使用 `respx` 桩接上游 HTTP 调用。将具有代表性的 payload 保存在 `tests/fixtures/` 下。
去重、持久化、API 暴露和 `/health` 报告都是自动免费获取的——它们均基于 `source_name` + `external_id` 进行关联。
## 开发
```
pytest # full suite
ruff check src tests # lint
ruff format src tests
mypy src # static types
```
### 预提交钩子
每次提交都会运行代码检查、类型检查、SAST (bandit) 和密钥扫描。克隆后在安装时运行一次:
```
pip install pre-commit # or: pipx install pre-commit
pre-commit install # registers .git/hooks/pre-commit
pre-commit run --all-files # one-off run on the whole tree
```
配置位于 [`.pre-commit-config.yaml`](.pre-commit-config.yaml)。钩子包括:ruff(lint + format)、mypy strict、bandit、gitleaks,以及标准的空格 / 私钥 / 大文件防护。
## 安全
| 领域 | 位置 |
|-------------------|----------------------------------------------------------------|
| 披露政策 | [SECURITY.md](SECURITY.md) — 电子邮件联系方式 + 范围 |
| 已实施的控制 | [docs/SECURITY.md](docs/SECURITY.md) — OWASP API Top 10 映射 |
| 本地审计 | `make security-audit` (bandit + pip-audit + semgrep) |
| CI 门禁 | [security.yml](.github/workflows/security.yml) — 7 个作业,合并前必须全部通过 |
| 容器 | 加固的多阶段 Alpine 镜像,非 root uid 1001,约 195 MB |
快速检查:
```
make lint # ruff + ruff-format
make typecheck # mypy strict on src/
make test # pytest with --cov-fail-under=80
make security-audit # bandit + pip-audit + semgrep
make docker-build # build hardened image
make docker-scan # hadolint + trivy (CRITICAL+HIGH)
```
## 里程碑 2 (M2) 明确不包含的范围
NLP 实体提取 (M3)、告警 Webhooks (M4)、STIX/TAXII 导出 (M5)、公共仪表板前端 (M6)、额外的收集器——RSS / GitHub Advisories / OTX(后期)。数据模型和收集器接口的设计旨在确保上述每一项功能的加入都不会破坏现有功能。
(dedup + upsert)"] Ingestion --> Threat[(threat)] Threat --> ScoringJob[ThreatScoringJob] ProfileLoader[SectorProfileLoader] --> Profiles[(sector_profile)] Profiles --> ScoringJob ScoringJob --> Scores[(threat_sector_score)] Scores --> API[/api/v1/sectors/.../] Threat --> API ``` ## 项目布局 ``` src/threat_intel/ ├── api/ FastAPI routers (health, v1.threats, v1.cve) ├── collectors/ Source adapters (BaseCollector + NVDCollector) ├── analyzers/ Dedup, scoring (M2) ├── services/ Ingestion + query layer ├── schemas/ Pydantic request/response models ├── models/ SQLAlchemy ORM ├── core/ Config, DB engine, logging, exceptions, scheduler ├── cli/ Typer commands for ops/demos └── main.py create_app() + lifespan ``` 测试位于 `tests/unit/` 和 `tests/integration/` 下。迁移在 `alembic/versions/` 中。 ## 添加新的收集器 `BaseCollector` 的核心意义在于,添加新数据源只需一个文件和一次注册。我们来演练一遍: 1. 在 `src/threat_intel/collectors/
标签:Alembic, API安全, AV绕过, CI/CD安全, CISA项目, CVE聚合, DevSecOps, Docker, Docker Compose, ESC4, FastAPI, GPT, JSON输出, Llama, NVD, OSINT, PostgreSQL, Python, Python 3.12, REST API, RSS订阅, SQLite, Swagger UI, Uvicorn, Web截图, YAML配置, 上游代理, 安全仪表盘, 安全左移, 安全网关, 安全评分, 安全防御评估, 实时处理, 容器安全, 密码管理, 异步数据库, 数据去重, 无后门, 测试用例, 漏洞披露, 漏洞管理, 热重载配置, 网络威胁情报, 请求拦截, 限流