CognitiveSand/dynamic_white_list
GitHub: CognitiveSand/dynamic_white_list
一个通过多威胁 API 级联验证自动维护 URL 白名单与黑名单的 Python 库。
Stars: 0 | Forks: 0
# dynamic-white-list
一个用于维护网站域名自填充白名单和黑名单的 Python 库。未知域名会自动与多个威胁情报 API 进行比对,分类并缓存,以便未来即时查询。
## 工作原理
```
is_url_in_white_list(url)
|
v
Extract registered domain (tldextract)
|
v
Check in-memory cache ──── found? ──> return classification
|
not found
|
v
Submit to bounded thread pool (singleflight dedup)
|
v
Run threat API cascade (cheap first, expensive last)
|
+-- any API flags malicious ──> blacklist, persist, return False
+-- all APIs say clean ──> whitelist, persist, return True
+-- all APIs rate-limited ──> fail-closed, return False (not persisted, retried later)
```
## 快速开始
```
# 安装
uv add dynamic-white-list
# 或克隆并在本地安装
git clone && cd dynamic_white_list
uv sync --extra dev
```
```
from dynamic_white_list import is_url_in_white_list
# 首次调用:将域名与威胁 API 进行比对(约 1-5 秒)
is_url_in_white_list("https://example.com/page") # True
# 后续调用:即时缓存命中(无锁字典读取)
is_url_in_white_list("https://example.com/other") # True (same domain)
```
无需配置。开箱即用,4 个无密钥威胁 API 已激活。添加 API 密钥到 `.env` 以获得更深入的覆盖。
## API
### `is_url_in_white_list(url: str) -> bool`
唯一的公共方法。线程安全,阻塞式。
- 从任意 URL 格式提取注册域名
- 若域名已知为干净则返回 `True`
- 若域名已知为恶意、未知或验证失败则返回 `False`
- 阻塞调用者直到验证完成(若为新域名)
- 同一域名的并发请求会合并为单个 API 级联调用
### `configure(**kwargs) -> None`
可选。在首次使用前调用以自定义行为。若未调用,则使用合理默认值。
```
from dynamic_white_list import configure
configure(
env_path="/path/to/.env", # default: .env in cwd
data_file="/path/to/domains.json", # default: ~/.dynamic_white_list/domains.json
max_workers=4, # thread pool size for concurrent vetting
request_timeout=10.0, # per-API HTTP timeout in seconds
)
```
## 威胁情报 API
级联优先运行廉价/无密钥 API,仅在早期检查结果不明确时才调用昂贵的 API。任何 API 的恶意标志都会导致列入黑名单。所有干净信号均为列入白名单所必需。
### 第一层 —— 无密钥(始终激活,无需注册)
| API | 类别 | 免费限制 |
|-----|----------|------------|
| [URLhaus](https://urlhaus.abuse.ch/api/) (abuse.ch) | 恶意 URL | 无限(合理使用) |
| [PhishDestroy](https://api.destroy.tools/) | 钓鱼/诈骗阻断列表 | 无限 |
| [PhishStats](https://phishstats.info/) | 钓鱼情报 | 20 请求/分钟 |
| [ThreatMiner](https://www.threatminer.org/) | 恶意软件样本、被动 DNS | 10 请求/分钟 |
### 第二层 —— 需密钥,高配额(添加密钥到 `.env` 以激活)
| API | 类别 | 免费限制 | 环境变量 |
|-----|----------|------------|-------------|
| [Google Safe Browsing](https://developers.google.com/safe-browsing) | 恶意软件、钓鱼 | ~10,000/天 | `GOOGLE_SAFE_BROWSING_API_KEY` |
| [Google Web Risk](https://cloud.google.com/security/products/web-risk) | 企业威胁列表 | 100,000/月 | `GOOGLE_WEB_RISK_API_KEY` |
| [SecLookup](https://www.seclookup.com/) | 域名风险评分 | 50,000/月 | `SECLOOKUP_API_KEY` |
### 第三层 —— 需密钥,高成本(仅在前几层结果不明确时使用)
| API | 类别 | 免费限制 | 环境变量 |
|-----|----------|------------|-------------|
| [VirusTotal](https://www.virustotal.com/) | 多引擎分析(70+) | 500/天,4/分钟 | `VIRUSTOTAL_API_KEY` |
| [urlscan.io](https://urlscan.io/) | 完整 URL 渲染与分析 | 5,000/天 | `URLSCAN_API_KEY` |
## 配置
将 `.env.example` 复制为 `.env` 并填入你拥有的密钥:
```
cp .env.example .env
```
无密钥 API 立即生效。每个有密钥的 API 在密钥存在且非空时自动激活。
## 持久化
已验证的域名会持久化到 JSON 文件(默认为 `~/.dynamic_white_list/domains.json`)。每个条目存储完整的审核追踪:
```
{
"example.com": {
"domain": "example.com",
"classification": "white",
"check_results": [
{
"checker_name": "urlhaus",
"is_malicious": false,
"raw_detail": "not found",
"checked_at": "2026-03-29T12:00:00+00:00"
},
{
"checker_name": "google_safe_browsing",
"is_malicious": false,
"raw_detail": "clean",
"checked_at": "2026-03-29T12:00:01+00:00"
}
],
"first_seen": "2026-03-29T12:00:00+00:00",
"last_checked": "2026-03-29T12:00:01+00:00"
}
}
```
该文件可读、可 grep、可手动编辑。写入方式为原子操作(先写入 `.tmp`,再 `os.replace`)。
## 架构
```
src/dynamic_white_list/
__init__.py Public API surface (2 functions)
_models.py DomainRecord, CheckResult dataclasses
_config.py Config loading from .env
_rate_limiter.py Token bucket (one per API)
_cascade.py ThreatChecker base class + cascade runner
_checkers.py 9 concrete checker implementations
_cache.py In-memory dict + JSON persistence
_vetting.py Singleflight coordinator + thread pool
```
### 设计模式
- **自填充缓存** —— 该库本质上是一个在未命中时从外部预言机获取数据的缓存
- **单一飞行** —— 同一域名的并发请求共享单个 Future;仅运行一次 API 级联
- **责任链** —— 检查器按顺序运行,第一个恶意命中即短路
- **模板方法** —— `ThreatChecker` 基类处理速率限制、超时和错误处理;子类仅实现 `_call_api()` 和 `_parse_response()`
### 并发模型
设计为单个全局实例服务于多个并行调用者(例如 Web 服务器线程)。
| 操作 | 锁 | 竞争 |
|-----------|------|------------|
| 缓存读取(99.9% 的成熟流量) | 无 | 零 —— 锁无 `dict.get()` 在 CPython GIL 下 |
| 单一飞行检查 + 提交 | `VettingCoordinator._lock` | 微秒 |
| 速率限制检查 | `TokenBucket._lock`(每个 API) | 纳秒 |
| 缓存写入 + JSON 持久化 | `DomainCache._lock` | 毫秒,罕见 |
三种锁类型,无嵌套,死锁不可能。
### 故障关闭行为
当所有 API 限流或不可用时,库返回 `False`(拒绝)但**不**持久化该域名。这意味着:
- 调用者受到保护(故障关闭)
- 域名会在下次请求时重试(不会永久误列入黑名单)
- API 配额会随时间自然恢复
## 添加新的威胁 API
1. 在 `_checkers.py` 中编写 `ThreatChecker` 的子类:
```
class MyNewChecker(ThreatChecker):
name = "my_new_api"
requires_key = True
def __init__(self, api_key: str, timeout: float) -> None:
super().__init__(TokenBucket(rate=100.0 / 86400, capacity=5), timeout)
self._api_key = api_key
def _call_api(self, domain: str) -> requests.Response:
return requests.get(
f"https://api.example.com/check/{domain}",
headers={"Authorization": f"Bearer {self._api_key}"},
timeout=self.timeout,
)
def _parse_response(self, response: requests.Response) -> CheckResult:
data = response.json()
if data.get("malicious"):
return CheckResult(self.name, is_malicious=True, raw_detail="flagged")
return CheckResult(self.name, is_malicious=False, raw_detail="clean")
```
2. 在 `_config.py` 中添加环境变量映射:
```
"MY_NEW_API_KEY": "my_new_api",
```
3. 在 `_checkers.py` 中的 `build_checker_list()` 末尾追加:
```
if "my_new_api" in keys:
checkers.append(MyNewChecker(keys["my_new_api"], timeout))
```
## 开发
```
# Setup
uv sync --extra dev
# 运行测试
uv run pytest -v
# 运行特定测试文件
uv run pytest tests/test_cascade.py -v
```
## 依赖
三个运行时依赖,均为成熟且轻量:
- [`tldextract`](https://github.com/john-kurkowski/tldextract) —— 注册域名提取
- [`requests`](https://requests.readthedocs.io/) —— HTTP 客户端
- [`python-dotenv`](https://github.com/theskumar/python-dotenv) —— `.env` 文件加载
## 许可证
MIT
标签:API聚合, SEO安全, tldextract, URL过滤, 单机锁, 去重并发, 域名校验, 威胁情报, 开发者工具, 恶意域名检测, 爬虫防护, 环境配置, 白名单, 线程池, 缓存, 网络防护, 逆向工具, 黑名单