omarallam-ai/SentinelSync
GitHub: omarallam-ai/SentinelSync
将原始文本中的威胁指标提取、富化、评分并生成可读报告的轻量级安全分析工具。
Stars: 0 | Forks: 0
# SentinelSync
## 概述
SentinelSync 将原始文本 —— 例如网络钓鱼邮件、警报日志、工单记录 —— 转化为结构化、经评分且可解释的威胁情报快照。它提取危害指标(IP、域名、URL、SHA-256 哈希),使用两个公开的威胁情报源对其进行富化,在严格的防护措施下选择性运行安全的 TCP 连接探测,使用确定性规则对所有内容进行评分,并生成整洁的 Markdown 报告。整个流水线在 Windows 上通过 SQLite 本地运行 —— 无需 Docker、无需云服务、无需付费 API。
该项目通过 7 个里程碑构建,旨在展示生产级的工程规范:整洁架构、带有关联 ID 的结构化日志、TTL 缓存、重试/退避机制、多进程处理以及包含安全扫描的完整 CI。
## 功能特性
- **IOC 提取** —— 基于正则表达式从任何原始文本中提取 IP、域名、URL 和 SHA-256 哈希,并对被误识别为域名的文件扩展名(如 `dropper.exe`、`gate.php` 等)进行误报过滤
- **双重富化源** —— 使用 ip-api.com 获取地理位置/ASN 信息(无需密钥),使用 URLhaus abuse.ch 获取恶意 URL/主机情报(主机端点公开)
- **内联富化存储** —— 富化数据直接存储在 IOC 行(`enrichment_json`)中 —— 无需连接查询,无孤立记录
- **确定性评分** —— 可解释的规则产生 0-100 的分数,并附带持久化到数据库的可读原因;相同数据始终产生相同分数
- **安全 TCP 探测** —— 仅限连接的探测,具有严格的 RFC-1918 白名单;默认阻止公共 IP;对端口和工作人员有硬性上限
- **TTL 缓存** —— 所有外部 API 调用均缓存在 SQLite 中,以防止重复查询相同的指标
- **多进程流水线** —— 通过 `multiprocessing.Pool` 进行并行富化和探测,支持优雅的 Ctrl-C 关闭
- **Markdown 报告** —— 带有严重性标签(🔴🟡🟢⚪)的报告,包含每个 IOC 的富化数据、评分原因和探测结果
- **REST API** —— FastAPI 实现,包含关联 ID 中间件、结构化 JSON 日志记录和合理的路由排序
- **完整测试套件** —— 包含单元测试、集成测试、模拟网络调用;测试不触碰真实网络或真实数据库
## 环境要求
- **Python** 3.11+
- **操作系统** Windows 10/11(也适用于 Linux/macOS)
```
typer>=0.12
fastapi>=0.111
uvicorn>=0.30
pydantic>=2.7
pydantic-settings>=2.3
requests>=2.32
beautifulsoup4>=4.12
sqlalchemy>=2.0
```
开发依赖项:`pytest`, `httpx`, `ruff`, `bandit`, `pip-audit`
## 安装说明
```
# 1. 克隆仓库
git clone https://github.com/omarallam-ai/SentinelSync.git
cd sentinelsync
# 2. 创建并激活虚拟环境
python -m venv .venv
.venv\Scripts\activate # Windows
# source .venv/bin/activate # Linux/macOS
# 3. 安装包及所有依赖
pip install -e ".[dev]"
# 4. 创建环境文件
copy .env.example .env
```
`.env.example`:
```
SENTINELSYNC_DB_PATH=./sentinelsync.db
SENTINELSYNC_LOG_LEVEL=INFO
SENTINELSYNC_URLHAUS_AUTH_KEY= # optional — get free key at urlhaus.abuse.ch
```
## 快速开始
```
# 从文本文件提取并存储 IOC
sentinelsync ingest --file sample_alert.txt
# 或者直接粘贴原始文本
sentinelsync ingest --text "Suspicious IP 185.220.101.47 contacted http://malware-cdn.biz/payload.exe"
# 丰富存储的 IOC (ip-api.com + URLhaus host lookup)
sentinelsync enrich --limit 50 --workers 2
# 使用丰富数据 + 探测数据对所有 IOC 评分
sentinelsync score
# 生成 Markdown 威胁情报报告
sentinelsync report --title "Case 2024-001" --out report.md
# 启动 REST API
sentinelsync serve --port 8000
```
**富化和评分后的示例输出:**
```
Scored: 14
High: 1
Medium: 2
Low: 6
```
**报告摘录示例:**
```
| # | Type | Normalized | Score | Enriched | Source |
|---|--------|-------------------------|------------|----------|--------|
| 1 | `url` | `http://malware-cdn.biz`| 60 🟡 MEDIUM | ✓ | cli |
| 2 | `ip` | `185.220.101.47` | 20 🟢 LOW | ✓ | cli |
| 3 | `hash_sha256` | `deadbeef...` | 5 🟢 LOW | – | cli |
```
## REST API
使用 `sentinelsync serve` 启动服务器,然后打开 `http://127.0.0.1:8000/docs` 访问交互式 Swagger UI。
| 方法 | 端点 | 描述 |
|--------|----------|-------------|
| `POST` | `/iocs/ingest` | 从原始文本或文件中提取并存储 IOC |
| `GET` | `/iocs` | 列出已存储的 IOC(按类型、最低分数过滤) |
| `GET` | `/iocs/{id}` | 获取单个 IOC 及其完整富化数据 |
| `POST` | `/iocs/score` | 根据存储的富化信息对所有 IOC 进行评分 |
| `POST` | `/enrich/ip` | 直接富化单个 IP |
| `POST` | `/enrich/urlhaus` | 直接富化单个 URL |
| `POST` | `/probes` | 对 IP 运行安全的 TCP 探测 |
| `POST` | `/reports` | 生成并持久化 Markdown 报告 |
| `GET` | `/reports/{id}` | 检索先前生成的报告 |
| `GET` | `/health` | 健康检查 |
```
# 示例:通过 API 摄取
curl -X POST "http://127.0.0.1:8000/iocs/ingest?text=8.8.8.8%20http://evil.com"
# 示例:获取高于阈值的已评分 IOC
curl "http://127.0.0.1:8000/iocs?min_score=20"
```
## 架构
```
Raw text / file
│
▼
INGEST → EXTRACT (regex + fake-TLD filter)
│
▼
NORMALIZE → DEDUPLICATE → SQLite (iocs table)
│
▼
ENRICH → ip-api.com (geo/ASN) + URLhaus (threat intel)
└─ stored in iocs.enrichment_json (inline, no extra table)
└─ cached in cache_entries (TTL-based)
│
▼
PROBE → TCP connect-only (RFC-1918 only by default)
└─ stored in probes table
│
▼
SCORE → deterministic rules → score + reasons written to iocs row
│
▼
REPORT → Markdown with severity labels, enrichment, probe results
```
**数据库模式(3 张表):**
| 表名 | 用途 |
|-------|---------|
| `iocs` | 指标 + 内联富化信息 + 分数 |
| `cache_entries` | 外部 API 调用的 TTL 缓存 |
| `probes` | TCP 探测结果 |
## 技术亮点
### 安全防护措施
- 默认阻止公共 IP 探测;需通过 `SENTINELSYNC_ALLOW_PUBLIC_PROBES=true` 显式选择启用
- 对工作线程数(最多 8 个)和探测端口(最多 10 个)设有硬性上限 —— 在代码中强制执行,而不仅仅是在文档中说明
- 永远不硬编码凭证;所有密钥均通过 Pydantic Settings 从 `.env` 获取
- 日志注入防护 —— 转义换行符,字段长度限制在 2000 个字符以内
### 工程实践
- **结构化日志** —— JSON 日志行,每个请求/命令包含关联 ID,并传递至多进程工作线程中
- **重试 + 退避** —— 所有 HTTP 调用均使用带有指数退避和 429 Retry-After 处理的 `urllib3.Retry`
- **幂等数据库初始化** —— 在每条涉及数据库的代码路径上均调用 `Base.metadata.create_all()`,因此富化操作可在冷启动下直接运行,无需预先导入
- **富化数据合并** —— `save_enrichment()` 将新数据与现有 `enrichment_json` 合并而非覆盖,因此多次富化过程可累积数据
- **误报过滤** —— 域名正则过滤掉 40 多个看起来像顶级域但实际上不是的文件扩展名(`exe`, `dll`, `php`, `pdf` 等)
- **确定性评分** —— 规则按固定顺序应用;相同的数据库状态始终产生相同的分数;原因与分数一同持久化保存,以供审计追踪
### 测试
```
# 运行所有测试
pytest -q
# 运行覆盖率测试
pytest --cov=app --cov-report=term-missing
# 安全扫描
bandit -r app -q
# 依赖审计
pip-audit
```
- 针对提取、规范化、评分、缓存、流水线编排的单元测试
- 使用带有每个测试独立内存数据库的 `TestClient` 对所有 API 端点进行的集成测试
- 所有网络调用均被模拟 —— 任何测试均不需要互联网访问
- 流水线测试验证多进程行为、工作线程上限以及优雅的 Ctrl-C 处理
## 工程决策
**为什么将富化数据内联存储在 IOC 行而不是单独的表中?**
单独的 `enrichments` 表引入了一个微妙的错误:`upsert_many()` 在遇到重复 IOC 时会静默失败并返回 `ids=[]`,导致外键从未被设置,富化行也从未被保存。将富化数据作为 JSON blob 直接存储在 IOC 行中消除了连接操作,消除了孤立记录问题,并使评分服务简化为 `json.loads(ioc.enrichment_json)` —— 无需查询。
**为什么选择确定性评分而不是机器学习(ML)?**
对于防御性分类工具,可重现性和可审计性比精度更重要。分析师需要知道某个指标*为什么*得了 60 分,而不仅仅是知道它得了分。每条规则都会生成一个人类可读的原因字符串,并与分数一起持久化保存,使结果具有可解释性和可调试性。
**为什么选择多进程而不是异步?**
富化流水线对外部 API 发出阻塞式 HTTP 调用。`asyncio` 将需要兼容异步的 HTTP 客户端和全程使用异步 SQLAlchemy。`multiprocessing.Pool` 允许每个工作线程使用标准的 `requests` 和其自己的 SQLite 会话 —— 这更简单,在 Windows 上更安全,并且对于受 CPU 限制的规范化工作也能获得真正的并行性。
## 项目结构
```
sentinelsync/
├── app/
│ ├── api/
│ │ └── routes.py # FastAPI endpoints
│ ├── cli/
│ │ └── main.py # Typer CLI commands
│ ├── core/
│ │ ├── config.py # Pydantic Settings
│ │ ├── guardrails.py # Probe safety enforcement
│ │ ├── http.py # Hardened HTTP client
│ │ ├── logging.py # Structured JSON logging
│ │ └── scoring.py # Deterministic scoring rules
│ ├── db/
│ │ ├── models.py # SQLAlchemy models (iocs, cache_entries, probes)
│ │ ├── repo.py # Repository pattern
│ │ └── session.py # Engine + session factory
│ ├── domain/
│ │ └── types.py # IOCType enum
│ └── services/
│ ├── cache.py # TTL cache service
│ ├── enrich_ip.py # IP enrichment (ip-api + URLhaus host)
│ ├── enrich_urlhaus.py # URL/domain enrichment (URLhaus)
│ ├── extract.py # IOC extraction with fake-TLD filter
│ ├── ingest.py # Text ingestion pipeline
│ ├── normalize.py # Canonical form normalisation
│ ├── pipeline.py # Multiprocessing orchestration
│ ├── probe_tcp.py # Safe TCP connect probe
│ └── report.py # Markdown report generator
└── tests/
├── unit/ # Pure unit tests (no network, no real DB)
└── test_routes.py # API integration tests
```
## 许可证
MIT 许可证 —— 详情请参阅 [LICENSE](LICENSE)。
## 作者
作为展示 SOC 相关工程能力的作品集项目构建:防御性工具、整洁架构以及 Windows 优先技术栈上的生产级实践。
标签:CLI 工具, DAST, ESC4, GitHub, Hash 校验, HTTP/HTTPS抓包, HTTP工具, IOC提取, IP 地址批量处理, IP 地址查询, OSINT, Python, SHA-256, SOAR, SQLite, TCP 探测, URLhaus, URL 安全检测, 入侵指标, 多进程处理, 威胁情报, 字符串匹配, 开发者工具, 态势感知, 恶意软件分析, 情报丰富化, 无后门, 无线安全, 本地安全工具, 生产级工程, 网络安全, 网络测绘, 自动化 enrichment, 误报过滤, 逆向工具, 钓鱼邮件分析, 隐私保护