CognitiveSand/dynamic_white_list

GitHub: CognitiveSand/dynamic_white_list

一个通过多威胁 API 级联验证自动维护 URL 白名单与黑名单的 Python 库。

Stars: 0 | Forks: 0

# dynamic-white-list 一个用于维护网站域名自填充白名单和黑名单的 Python 库。未知域名会自动与多个威胁情报 API 进行比对,分类并缓存,以便未来即时查询。 ## 工作原理 ``` is_url_in_white_list(url) | v Extract registered domain (tldextract) | v Check in-memory cache ──── found? ──> return classification | not found | v Submit to bounded thread pool (singleflight dedup) | v Run threat API cascade (cheap first, expensive last) | +-- any API flags malicious ──> blacklist, persist, return False +-- all APIs say clean ──> whitelist, persist, return True +-- all APIs rate-limited ──> fail-closed, return False (not persisted, retried later) ``` ## 快速开始 ``` # 安装 uv add dynamic-white-list # 或克隆并在本地安装 git clone && cd dynamic_white_list uv sync --extra dev ``` ``` from dynamic_white_list import is_url_in_white_list # 首次调用:将域名与威胁 API 进行比对(约 1-5 秒) is_url_in_white_list("https://example.com/page") # True # 后续调用:即时缓存命中(无锁字典读取) is_url_in_white_list("https://example.com/other") # True (same domain) ``` 无需配置。开箱即用,4 个无密钥威胁 API 已激活。添加 API 密钥到 `.env` 以获得更深入的覆盖。 ## API ### `is_url_in_white_list(url: str) -> bool` 唯一的公共方法。线程安全,阻塞式。 - 从任意 URL 格式提取注册域名 - 若域名已知为干净则返回 `True` - 若域名已知为恶意、未知或验证失败则返回 `False` - 阻塞调用者直到验证完成(若为新域名) - 同一域名的并发请求会合并为单个 API 级联调用 ### `configure(**kwargs) -> None` 可选。在首次使用前调用以自定义行为。若未调用,则使用合理默认值。 ``` from dynamic_white_list import configure configure( env_path="/path/to/.env", # default: .env in cwd data_file="/path/to/domains.json", # default: ~/.dynamic_white_list/domains.json max_workers=4, # thread pool size for concurrent vetting request_timeout=10.0, # per-API HTTP timeout in seconds ) ``` ## 威胁情报 API 级联优先运行廉价/无密钥 API,仅在早期检查结果不明确时才调用昂贵的 API。任何 API 的恶意标志都会导致列入黑名单。所有干净信号均为列入白名单所必需。 ### 第一层 —— 无密钥(始终激活,无需注册) | API | 类别 | 免费限制 | |-----|----------|------------| | [URLhaus](https://urlhaus.abuse.ch/api/) (abuse.ch) | 恶意 URL | 无限(合理使用) | | [PhishDestroy](https://api.destroy.tools/) | 钓鱼/诈骗阻断列表 | 无限 | | [PhishStats](https://phishstats.info/) | 钓鱼情报 | 20 请求/分钟 | | [ThreatMiner](https://www.threatminer.org/) | 恶意软件样本、被动 DNS | 10 请求/分钟 | ### 第二层 —— 需密钥,高配额(添加密钥到 `.env` 以激活) | API | 类别 | 免费限制 | 环境变量 | |-----|----------|------------|-------------| | [Google Safe Browsing](https://developers.google.com/safe-browsing) | 恶意软件、钓鱼 | ~10,000/天 | `GOOGLE_SAFE_BROWSING_API_KEY` | | [Google Web Risk](https://cloud.google.com/security/products/web-risk) | 企业威胁列表 | 100,000/月 | `GOOGLE_WEB_RISK_API_KEY` | | [SecLookup](https://www.seclookup.com/) | 域名风险评分 | 50,000/月 | `SECLOOKUP_API_KEY` | ### 第三层 —— 需密钥,高成本(仅在前几层结果不明确时使用) | API | 类别 | 免费限制 | 环境变量 | |-----|----------|------------|-------------| | [VirusTotal](https://www.virustotal.com/) | 多引擎分析(70+) | 500/天,4/分钟 | `VIRUSTOTAL_API_KEY` | | [urlscan.io](https://urlscan.io/) | 完整 URL 渲染与分析 | 5,000/天 | `URLSCAN_API_KEY` | ## 配置 将 `.env.example` 复制为 `.env` 并填入你拥有的密钥: ``` cp .env.example .env ``` 无密钥 API 立即生效。每个有密钥的 API 在密钥存在且非空时自动激活。 ## 持久化 已验证的域名会持久化到 JSON 文件(默认为 `~/.dynamic_white_list/domains.json`)。每个条目存储完整的审核追踪: ``` { "example.com": { "domain": "example.com", "classification": "white", "check_results": [ { "checker_name": "urlhaus", "is_malicious": false, "raw_detail": "not found", "checked_at": "2026-03-29T12:00:00+00:00" }, { "checker_name": "google_safe_browsing", "is_malicious": false, "raw_detail": "clean", "checked_at": "2026-03-29T12:00:01+00:00" } ], "first_seen": "2026-03-29T12:00:00+00:00", "last_checked": "2026-03-29T12:00:01+00:00" } } ``` 该文件可读、可 grep、可手动编辑。写入方式为原子操作(先写入 `.tmp`,再 `os.replace`)。 ## 架构 ``` src/dynamic_white_list/ __init__.py Public API surface (2 functions) _models.py DomainRecord, CheckResult dataclasses _config.py Config loading from .env _rate_limiter.py Token bucket (one per API) _cascade.py ThreatChecker base class + cascade runner _checkers.py 9 concrete checker implementations _cache.py In-memory dict + JSON persistence _vetting.py Singleflight coordinator + thread pool ``` ### 设计模式 - **自填充缓存** —— 该库本质上是一个在未命中时从外部预言机获取数据的缓存 - **单一飞行** —— 同一域名的并发请求共享单个 Future;仅运行一次 API 级联 - **责任链** —— 检查器按顺序运行,第一个恶意命中即短路 - **模板方法** —— `ThreatChecker` 基类处理速率限制、超时和错误处理;子类仅实现 `_call_api()` 和 `_parse_response()` ### 并发模型 设计为单个全局实例服务于多个并行调用者(例如 Web 服务器线程)。 | 操作 | 锁 | 竞争 | |-----------|------|------------| | 缓存读取(99.9% 的成熟流量) | 无 | 零 —— 锁无 `dict.get()` 在 CPython GIL 下 | | 单一飞行检查 + 提交 | `VettingCoordinator._lock` | 微秒 | | 速率限制检查 | `TokenBucket._lock`(每个 API) | 纳秒 | | 缓存写入 + JSON 持久化 | `DomainCache._lock` | 毫秒,罕见 | 三种锁类型,无嵌套,死锁不可能。 ### 故障关闭行为 当所有 API 限流或不可用时,库返回 `False`(拒绝)但**不**持久化该域名。这意味着: - 调用者受到保护(故障关闭) - 域名会在下次请求时重试(不会永久误列入黑名单) - API 配额会随时间自然恢复 ## 添加新的威胁 API 1. 在 `_checkers.py` 中编写 `ThreatChecker` 的子类: ``` class MyNewChecker(ThreatChecker): name = "my_new_api" requires_key = True def __init__(self, api_key: str, timeout: float) -> None: super().__init__(TokenBucket(rate=100.0 / 86400, capacity=5), timeout) self._api_key = api_key def _call_api(self, domain: str) -> requests.Response: return requests.get( f"https://api.example.com/check/{domain}", headers={"Authorization": f"Bearer {self._api_key}"}, timeout=self.timeout, ) def _parse_response(self, response: requests.Response) -> CheckResult: data = response.json() if data.get("malicious"): return CheckResult(self.name, is_malicious=True, raw_detail="flagged") return CheckResult(self.name, is_malicious=False, raw_detail="clean") ``` 2. 在 `_config.py` 中添加环境变量映射: ``` "MY_NEW_API_KEY": "my_new_api", ``` 3. 在 `_checkers.py` 中的 `build_checker_list()` 末尾追加: ``` if "my_new_api" in keys: checkers.append(MyNewChecker(keys["my_new_api"], timeout)) ``` ## 开发 ``` # Setup uv sync --extra dev # 运行测试 uv run pytest -v # 运行特定测试文件 uv run pytest tests/test_cascade.py -v ``` ## 依赖 三个运行时依赖,均为成熟且轻量: - [`tldextract`](https://github.com/john-kurkowski/tldextract) —— 注册域名提取 - [`requests`](https://requests.readthedocs.io/) —— HTTP 客户端 - [`python-dotenv`](https://github.com/theskumar/python-dotenv) —— `.env` 文件加载 ## 许可证 MIT
标签:API聚合, SEO安全, tldextract, URL过滤, 单机锁, 去重并发, 域名校验, 威胁情报, 开发者工具, 恶意域名检测, 爬虫防护, 环境配置, 白名单, 线程池, 缓存, 网络防护, 逆向工具, 黑名单