omarallam-ai/SentinelSync

GitHub: omarallam-ai/SentinelSync

将原始文本中的威胁指标提取、富化、评分并生成可读报告的轻量级安全分析工具。

Stars: 0 | Forks: 0

# SentinelSync ## 概述 SentinelSync 将原始文本 —— 例如网络钓鱼邮件、警报日志、工单记录 —— 转化为结构化、经评分且可解释的威胁情报快照。它提取危害指标(IP、域名、URL、SHA-256 哈希),使用两个公开的威胁情报源对其进行富化,在严格的防护措施下选择性运行安全的 TCP 连接探测,使用确定性规则对所有内容进行评分,并生成整洁的 Markdown 报告。整个流水线在 Windows 上通过 SQLite 本地运行 —— 无需 Docker、无需云服务、无需付费 API。 该项目通过 7 个里程碑构建,旨在展示生产级的工程规范:整洁架构、带有关联 ID 的结构化日志、TTL 缓存、重试/退避机制、多进程处理以及包含安全扫描的完整 CI。 ## 功能特性 - **IOC 提取** —— 基于正则表达式从任何原始文本中提取 IP、域名、URL 和 SHA-256 哈希,并对被误识别为域名的文件扩展名(如 `dropper.exe`、`gate.php` 等)进行误报过滤 - **双重富化源** —— 使用 ip-api.com 获取地理位置/ASN 信息(无需密钥),使用 URLhaus abuse.ch 获取恶意 URL/主机情报(主机端点公开) - **内联富化存储** —— 富化数据直接存储在 IOC 行(`enrichment_json`)中 —— 无需连接查询,无孤立记录 - **确定性评分** —— 可解释的规则产生 0-100 的分数,并附带持久化到数据库的可读原因;相同数据始终产生相同分数 - **安全 TCP 探测** —— 仅限连接的探测,具有严格的 RFC-1918 白名单;默认阻止公共 IP;对端口和工作人员有硬性上限 - **TTL 缓存** —— 所有外部 API 调用均缓存在 SQLite 中,以防止重复查询相同的指标 - **多进程流水线** —— 通过 `multiprocessing.Pool` 进行并行富化和探测,支持优雅的 Ctrl-C 关闭 - **Markdown 报告** —— 带有严重性标签(🔴🟡🟢⚪)的报告,包含每个 IOC 的富化数据、评分原因和探测结果 - **REST API** —— FastAPI 实现,包含关联 ID 中间件、结构化 JSON 日志记录和合理的路由排序 - **完整测试套件** —— 包含单元测试、集成测试、模拟网络调用;测试不触碰真实网络或真实数据库 ## 环境要求 - **Python** 3.11+ - **操作系统** Windows 10/11(也适用于 Linux/macOS) ``` typer>=0.12 fastapi>=0.111 uvicorn>=0.30 pydantic>=2.7 pydantic-settings>=2.3 requests>=2.32 beautifulsoup4>=4.12 sqlalchemy>=2.0 ``` 开发依赖项:`pytest`, `httpx`, `ruff`, `bandit`, `pip-audit` ## 安装说明 ``` # 1. 克隆仓库 git clone https://github.com/omarallam-ai/SentinelSync.git cd sentinelsync # 2. 创建并激活虚拟环境 python -m venv .venv .venv\Scripts\activate # Windows # source .venv/bin/activate # Linux/macOS # 3. 安装包及所有依赖 pip install -e ".[dev]" # 4. 创建环境文件 copy .env.example .env ``` `.env.example`: ``` SENTINELSYNC_DB_PATH=./sentinelsync.db SENTINELSYNC_LOG_LEVEL=INFO SENTINELSYNC_URLHAUS_AUTH_KEY= # optional — get free key at urlhaus.abuse.ch ``` ## 快速开始 ``` # 从文本文件提取并存储 IOC sentinelsync ingest --file sample_alert.txt # 或者直接粘贴原始文本 sentinelsync ingest --text "Suspicious IP 185.220.101.47 contacted http://malware-cdn.biz/payload.exe" # 丰富存储的 IOC (ip-api.com + URLhaus host lookup) sentinelsync enrich --limit 50 --workers 2 # 使用丰富数据 + 探测数据对所有 IOC 评分 sentinelsync score # 生成 Markdown 威胁情报报告 sentinelsync report --title "Case 2024-001" --out report.md # 启动 REST API sentinelsync serve --port 8000 ``` **富化和评分后的示例输出:** ``` Scored: 14 High: 1 Medium: 2 Low: 6 ``` **报告摘录示例:** ``` | # | Type | Normalized | Score | Enriched | Source | |---|--------|-------------------------|------------|----------|--------| | 1 | `url` | `http://malware-cdn.biz`| 60 🟡 MEDIUM | ✓ | cli | | 2 | `ip` | `185.220.101.47` | 20 🟢 LOW | ✓ | cli | | 3 | `hash_sha256` | `deadbeef...` | 5 🟢 LOW | – | cli | ``` ## REST API 使用 `sentinelsync serve` 启动服务器,然后打开 `http://127.0.0.1:8000/docs` 访问交互式 Swagger UI。 | 方法 | 端点 | 描述 | |--------|----------|-------------| | `POST` | `/iocs/ingest` | 从原始文本或文件中提取并存储 IOC | | `GET` | `/iocs` | 列出已存储的 IOC(按类型、最低分数过滤) | | `GET` | `/iocs/{id}` | 获取单个 IOC 及其完整富化数据 | | `POST` | `/iocs/score` | 根据存储的富化信息对所有 IOC 进行评分 | | `POST` | `/enrich/ip` | 直接富化单个 IP | | `POST` | `/enrich/urlhaus` | 直接富化单个 URL | | `POST` | `/probes` | 对 IP 运行安全的 TCP 探测 | | `POST` | `/reports` | 生成并持久化 Markdown 报告 | | `GET` | `/reports/{id}` | 检索先前生成的报告 | | `GET` | `/health` | 健康检查 | ``` # 示例:通过 API 摄取 curl -X POST "http://127.0.0.1:8000/iocs/ingest?text=8.8.8.8%20http://evil.com" # 示例:获取高于阈值的已评分 IOC curl "http://127.0.0.1:8000/iocs?min_score=20" ``` ## 架构 ``` Raw text / file │ ▼ INGEST → EXTRACT (regex + fake-TLD filter) │ ▼ NORMALIZE → DEDUPLICATE → SQLite (iocs table) │ ▼ ENRICH → ip-api.com (geo/ASN) + URLhaus (threat intel) └─ stored in iocs.enrichment_json (inline, no extra table) └─ cached in cache_entries (TTL-based) │ ▼ PROBE → TCP connect-only (RFC-1918 only by default) └─ stored in probes table │ ▼ SCORE → deterministic rules → score + reasons written to iocs row │ ▼ REPORT → Markdown with severity labels, enrichment, probe results ``` **数据库模式(3 张表):** | 表名 | 用途 | |-------|---------| | `iocs` | 指标 + 内联富化信息 + 分数 | | `cache_entries` | 外部 API 调用的 TTL 缓存 | | `probes` | TCP 探测结果 | ## 技术亮点 ### 安全防护措施 - 默认阻止公共 IP 探测;需通过 `SENTINELSYNC_ALLOW_PUBLIC_PROBES=true` 显式选择启用 - 对工作线程数(最多 8 个)和探测端口(最多 10 个)设有硬性上限 —— 在代码中强制执行,而不仅仅是在文档中说明 - 永远不硬编码凭证;所有密钥均通过 Pydantic Settings 从 `.env` 获取 - 日志注入防护 —— 转义换行符,字段长度限制在 2000 个字符以内 ### 工程实践 - **结构化日志** —— JSON 日志行,每个请求/命令包含关联 ID,并传递至多进程工作线程中 - **重试 + 退避** —— 所有 HTTP 调用均使用带有指数退避和 429 Retry-After 处理的 `urllib3.Retry` - **幂等数据库初始化** —— 在每条涉及数据库的代码路径上均调用 `Base.metadata.create_all()`,因此富化操作可在冷启动下直接运行,无需预先导入 - **富化数据合并** —— `save_enrichment()` 将新数据与现有 `enrichment_json` 合并而非覆盖,因此多次富化过程可累积数据 - **误报过滤** —— 域名正则过滤掉 40 多个看起来像顶级域但实际上不是的文件扩展名(`exe`, `dll`, `php`, `pdf` 等) - **确定性评分** —— 规则按固定顺序应用;相同的数据库状态始终产生相同的分数;原因与分数一同持久化保存,以供审计追踪 ### 测试 ``` # 运行所有测试 pytest -q # 运行覆盖率测试 pytest --cov=app --cov-report=term-missing # 安全扫描 bandit -r app -q # 依赖审计 pip-audit ``` - 针对提取、规范化、评分、缓存、流水线编排的单元测试 - 使用带有每个测试独立内存数据库的 `TestClient` 对所有 API 端点进行的集成测试 - 所有网络调用均被模拟 —— 任何测试均不需要互联网访问 - 流水线测试验证多进程行为、工作线程上限以及优雅的 Ctrl-C 处理 ## 工程决策 **为什么将富化数据内联存储在 IOC 行而不是单独的表中?** 单独的 `enrichments` 表引入了一个微妙的错误:`upsert_many()` 在遇到重复 IOC 时会静默失败并返回 `ids=[]`,导致外键从未被设置,富化行也从未被保存。将富化数据作为 JSON blob 直接存储在 IOC 行中消除了连接操作,消除了孤立记录问题,并使评分服务简化为 `json.loads(ioc.enrichment_json)` —— 无需查询。 **为什么选择确定性评分而不是机器学习(ML)?** 对于防御性分类工具,可重现性和可审计性比精度更重要。分析师需要知道某个指标*为什么*得了 60 分,而不仅仅是知道它得了分。每条规则都会生成一个人类可读的原因字符串,并与分数一起持久化保存,使结果具有可解释性和可调试性。 **为什么选择多进程而不是异步?** 富化流水线对外部 API 发出阻塞式 HTTP 调用。`asyncio` 将需要兼容异步的 HTTP 客户端和全程使用异步 SQLAlchemy。`multiprocessing.Pool` 允许每个工作线程使用标准的 `requests` 和其自己的 SQLite 会话 —— 这更简单,在 Windows 上更安全,并且对于受 CPU 限制的规范化工作也能获得真正的并行性。 ## 项目结构 ``` sentinelsync/ ├── app/ │ ├── api/ │ │ └── routes.py # FastAPI endpoints │ ├── cli/ │ │ └── main.py # Typer CLI commands │ ├── core/ │ │ ├── config.py # Pydantic Settings │ │ ├── guardrails.py # Probe safety enforcement │ │ ├── http.py # Hardened HTTP client │ │ ├── logging.py # Structured JSON logging │ │ └── scoring.py # Deterministic scoring rules │ ├── db/ │ │ ├── models.py # SQLAlchemy models (iocs, cache_entries, probes) │ │ ├── repo.py # Repository pattern │ │ └── session.py # Engine + session factory │ ├── domain/ │ │ └── types.py # IOCType enum │ └── services/ │ ├── cache.py # TTL cache service │ ├── enrich_ip.py # IP enrichment (ip-api + URLhaus host) │ ├── enrich_urlhaus.py # URL/domain enrichment (URLhaus) │ ├── extract.py # IOC extraction with fake-TLD filter │ ├── ingest.py # Text ingestion pipeline │ ├── normalize.py # Canonical form normalisation │ ├── pipeline.py # Multiprocessing orchestration │ ├── probe_tcp.py # Safe TCP connect probe │ └── report.py # Markdown report generator └── tests/ ├── unit/ # Pure unit tests (no network, no real DB) └── test_routes.py # API integration tests ``` ## 许可证 MIT 许可证 —— 详情请参阅 [LICENSE](LICENSE)。 ## 作者 作为展示 SOC 相关工程能力的作品集项目构建:防御性工具、整洁架构以及 Windows 优先技术栈上的生产级实践。
标签:CLI 工具, DAST, ESC4, GitHub, Hash 校验, HTTP/HTTPS抓包, HTTP工具, IOC提取, IP 地址批量处理, IP 地址查询, OSINT, Python, SHA-256, SOAR, SQLite, TCP 探测, URLhaus, URL 安全检测, 入侵指标, 多进程处理, 威胁情报, 字符串匹配, 开发者工具, 态势感知, 恶意软件分析, 情报丰富化, 无后门, 无线安全, 本地安全工具, 生产级工程, 网络安全, 网络测绘, 自动化 enrichment, 误报过滤, 逆向工具, 钓鱼邮件分析, 隐私保护