KANGGAFH/breach-intel

GitHub: KANGGAFH/breach-intel

一套自动化数据泄露情报系统,通过 RSS 采集、NLP 三分类和多信号加权风险评分,将高危事件实时推送至 Telegram 并在 Web 仪表盘可视化展示。

Stars: 0 | Forks: 0

# BreachIntel — 自动化数据泄露情报系统 ``` ██████╗ ██████╗ ███████╗ █████╗ ██████╗██╗ ██╗ ██╔══██╗██╔══██╗██╔════╝██╔══██╗██╔════╝██║ ██║ ██████╔╝██████╔╝█████╗ ███████║██║ ███████║ ██╔══██╗██╔══██╗██╔══╝ ██╔══██║██║ ██╔══██║ ██████╔╝██║ ██║███████╗██║ ██║╚██████╗██║ ██║ ╚═════╝ ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝ INTEL Automated Threat Intelligence · v1.0.0 ``` ## 概述 BreachIntel 是一个**微型数据工程 + AI + 网络安全项目**,旨在演示用于自动化威胁情报收集的生产级流水线。 它会自动执行以下操作: 1. 从 8 个精选的网络安全 RSS 源(Krebs、BleepingComputer、CISA 等)**收集**文章 2. 使用标准化流水线对文本进行**清理**和去重 3. 使用训练好的 NLP 模型将每篇文章**分类**为 `real_breach`、`suspicious` 或 `irrelevant` 4. 使用加权多信号公式(AI + 关键词 + 来源可靠性 + 时效性)对风险进行**评分** 5. 针对高风险项目通过 Telegram **发出警报** 6. 在深色模式终端美学风格的仪表盘上**可视化**所有内容 ## 系统架构 ``` ┌─────────────────────────────────────────────────────────────────┐ │ BreachIntel Pipeline │ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ │ │ RSS Feeds │ │ Text │ │ Rule Engine │ │ │ │ (8 sources) │───▶│ Cleaner │───▶│ Keywords + │ │ │ │ feedparser │ │ BS4 + NLTK │ │ Regex + Blacklist│ │ │ └──────────────┘ └──────────────┘ └────────┬─────────┘ │ │ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌────────▼─────────┐ │ │ │ SQLite DB │ │ Risk Scorer │ │ AI Classifier │ │ │ │ (Articles) │◀───│ Weighted │◀───│ TF-IDF + │ │ │ │ SQLAlchemy │ │ Formula │ │ LogisticRegr. │ │ │ └──────┬───────┘ └──────────────┘ └──────────────────┘ │ │ │ │ │ ┌──────▼───────┐ ┌──────────────┐ │ │ │ Flask API │ │ Telegram │ │ │ │ + Dashboard │ │ Alerter │ │ │ │ REST /api/ │ │ (optional) │ │ │ └──────────────┘ └──────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 信号融合(风险评分公式) ``` Risk Score = (AI_confidence × 0.40) + (keyword_score × 0.25) + (source_trust × 0.20) + (recency_score × 0.15) Risk Levels: CRITICAL ≥ 0.80 HIGH ≥ 0.60 MEDIUM ≥ 0.35 LOW < 0.35 ``` ## 项目结构 ``` breach-intel/ ├── config.py # Central config (env vars + feed list) ├── run.py # Web server entry point ├── requirements.txt ├── .env.example # Template for environment variables ├── .gitignore ├── Dockerfile ├── docker-compose.yml │ ├── src/ │ ├── collectors/ │ │ ├── base.py # Abstract base collector │ │ └── rss_collector.py # RSS/Atom feed collector (pool) │ │ │ ├── processing/ │ │ ├── cleaner.py # Text normalization + deduplication │ │ ├── rule_engine.py # Keyword + regex engine │ │ ├── database.py # SQLAlchemy models (Article) │ │ └── pipeline.py # Main orchestration pipeline │ │ │ ├── models/ │ │ ├── classifier.py # NLP model (TF-IDF + LogReg) │ │ └── risk_scorer.py # Multi-signal risk scorer │ │ │ ├── alerts/ │ │ └── telegram_alert.py # Telegram Bot alerter │ │ │ └── webapp/ │ ├── app.py # Flask app factory │ ├── routes.py # Dashboard routes │ ├── api.py # REST API blueprint │ ├── templates/ │ │ ├── index.html # Main dashboard │ │ └── detail.html # Article detail view │ └── static/ │ ├── css/dashboard.css │ └── js/dashboard.js │ ├── data/ │ ├── training_data.json # 50 labeled samples (real_breach/suspicious/irrelevant) │ └── breach_classifier.joblib # (generated after training) │ ├── scripts/ │ ├── train_model.py # Train/retrain the classifier │ └── run_collector.py # Manual or scheduled pipeline run │ ├── tests/ │ ├── test_cleaner.py │ ├── test_rule_engine.py │ ├── test_risk_scorer.py │ └── test_api.py │ └── notebooks/ └── (exploratory analysis — add your own) ``` ## 快速开始 ### 1. 克隆与设置 ``` git clone https://github.com/your-handle/breach-intel.git cd breach-intel python -m venv venv source venv/bin/activate # Windows: venv\Scripts\activate pip install -r requirements.txt ``` ### 2. 配置环境 ``` cp .env.example .env # 使用你的设置编辑 .env(Telegram 为可选) ``` 关键变量: ``` FLASK_SECRET_KEY=your-random-secret TELEGRAM_BOT_TOKEN=your-bot-token # Optional TELEGRAM_CHAT_ID=your-chat-id # Optional ``` ### 3. 训练模型 ``` python scripts/train_model.py ``` 预期输出: ``` TRAINING COMPLETE Samples used : 50 CV F1 (weighted) : 0.8821 ± 0.0312 Test accuracy : 0.9000 Model saved to : data/breach_classifier.joblib ``` ### 4. 运行仪表盘 ``` python run.py # 打开 http://localhost:5000 ``` ### 5. 运行收集扫描 ``` # 单次运行 python scripts/run_collector.py # 计划任务(每 30 分钟) python scripts/run_collector.py --schedule ``` ## Docker 部署 ``` # 构建并启动所有服务 docker-compose up -d # 查看日志 docker-compose logs -f web docker-compose logs -f collector # 停止 docker-compose down ``` ## API 参考 所有接口均返回 JSON。基础路径:`/api/v1/` | 方法 | 接口 | 描述 | |--------|----------|-------------| | GET | `/api/v1/health` | 健康检查 | | GET | `/api/v1/stats` | 聚合统计 | | GET | `/api/v1/articles` | 文章列表(分页) | | GET | `/api/v1/articles/:id` | 获取单篇文章 | | POST | `/api/v1/trigger-run` | 触发手动扫描 | ### 查询参数 (`/api/v1/articles`) | 参数 | 值 | 描述 | |-------|--------|-------------| | `label` | `real_breach`, `suspicious`, `irrelevant` | 按分类过滤 | | `risk_level` | `CRITICAL`, `HIGH`, `MEDIUM`, `LOW` | 按风险等级过滤 | | `q` | string | 标题全文搜索 | | `page` | integer | 页码(默认:1) | | `per_page` | integer | 每页结果数(最大:100) | ### API 调用示例 ``` # 获取所有严重违规 curl http://localhost:5000/api/v1/articles?risk_level=CRITICAL # 搜索勒索软件 curl http://localhost:5000/api/v1/articles?q=ransomware # 获取统计信息 curl http://localhost:5000/api/v1/stats # 触发扫描(使用 API secret) curl -X POST http://localhost:5000/api/v1/trigger-run \ -H "Authorization: Bearer your-api-secret" ``` ## Telegram 机器人设置 1. 向 [@BotFather](https://t.me/botfather) 发送消息 → `/newbot` → 复制 token 2. 通过向 [@userinfobot](https://t.me/userinfobot) 发送消息获取您的 chat ID 3. 在 `.env` 中设置: TELEGRAM_BOT_TOKEN=1234567890:ABCdef... TELEGRAM_CHAT_ID=987654321 4. 测试连接: python -c " from src.alerts.telegram_alert import TelegramAlerter TelegramAlerter().test_connection() " 警报格式: ``` 🚨 BREACH ALERT — CRITICAL ━━━━━━━━━━━━━━━━━━━━━━ 📌 Title: Major Bank Data Breach Exposes 10M Records 📝 Summary: Customer names, emails and encrypted... 🏷️ Classification: Real Breach 📊 Risk Score: ████████░░ 82% 📰 Source: Krebs on Security 🕐 Published: 2024-03-15 14:32 UTC 🔗 Read Full Article ``` ## 运行测试 ``` # 所有测试 pytest tests/ -v # 带有覆盖率 pytest tests/ -v --cov=src --cov-report=term-missing # 单个模块 pytest tests/test_rule_engine.py -v ``` ## 扩充数据集 默认的 `data/training_data.json` 包含 50 个标注样本。为了提高准确率: 1. **手动标注**:按以下格式将文章添加到 `data/training_data.json`: {"text": "文章标题 + 内容...", "label": "real_breach"} 标签:`real_breach` | `suspicious` | `irrelevant` 2. **批量来源**: - [CISA 已知被利用漏洞目录](https://www.cisa.gov/known-exploited-vulnerabilities-catalog) - [HaveIBeenPwned 数据泄露列表](https://haveibeenpwned.com/PwnedWebsites) - [Verizon DBIR 数据集](https://www.verizon.com/business/resources/reports/dbir/) 3. **重新训练**: python scripts/train_model.py --data data/training_data.json ### 升级至 Transformer 模型 将 `src/models/classifier.py` 中的 TF-IDF 流水线替换为: ``` from transformers import pipeline clf = pipeline( "text-classification", model="jackaduma/SecBERT", # Security-domain BERT truncation=True, max_length=512, ) ``` 注意:需要在 requirements.txt 中包含 `transformers` 和 `torch`。速度会明显变慢,但在处理领域特定语言时准确度更高。 ## 添加新数据源 在 `src/collectors/` 目录下的新文件中实现 `BaseCollector`: ``` from src.collectors.base import BaseCollector, RawArticle class MyCustomCollector(BaseCollector): name = "my_source" reliability = 0.85 def fetch(self) -> list[RawArticle]: # Your collection logic here return [RawArticle(title=..., url=..., ...)] ``` 然后在 `config.py` 中注册它: ``` RSS_FEEDS = [ ..., {"name": "My Source", "url": "https://mysource.com/feed", "reliability": 0.85}, ] ``` ## 环境变量参考 | 变量 | 默认值 | 描述 | |----------|---------|-------------| | `FLASK_SECRET_KEY` | `dev-secret` | Flask session secret | | `FLASK_PORT` | `5000` | Web 服务器端口 | | `FLASK_DEBUG` | `false` | 启用调试模式 | | `DATABASE_URL` | `sqlite:///data/breach_intel.db` | 数据库连接字符串 | | `TELEGRAM_BOT_TOKEN` | — | Telegram 机器人 token | | `TELEGRAM_CHAT_ID` | — | Telegram 聊天/频道 ID | | `FETCH_INTERVAL_MINUTES` | `30` | 收集器调度间隔 | | `MAX_ARTICLES_PER_SOURCE` | `50` | 每个源每次运行的最大文章数 | | `MODEL_PATH` | `data/breach_classifier.joblib` | 训练模型的路径 | | `CONFIDENCE_THRESHOLD` | `0.65` | 信任预测的最低 AI 置信度 | | `LOG_LEVEL` | `INFO` | 日志详细程度 | | `LOG_FILE` | `logs/breach_intel.log` | 日志文件路径 | | `API_SECRET` | — | `/trigger-run` 接口的 Bearer token | ## 未来改进方向 - [ ] **PostgreSQL 支持** — 将 SQLite 替换为生产环境的 PostgreSQL - [ ] **MITRE ATT&CK 标签** — 根据文章内容自动标记战术/技术 - [ ] **CVE 提取** — 使用正则表达式 + NVD API 查找提及的 CVE - [ ] **Twitter/X 监控** — 通过 Twitter API v2 实现(需要账号) - [ ] **Shodan 集成** — 交叉比对暴露的 IP/域名 - [ ] **邮件摘要** — 通过 SendGrid/SMTP 发送每日汇总邮件 - [ ] **Slack 警报** — Telegram 的替代方案 - [ ] **SecBERT 模型** — 用领域特定的 Transformer 替换 TF-IDF - [ ] **威胁行为者追踪** — 针对 APT 组织名称的实体提取 - [ ] **地理热力图** — 使用 D3.js 绘制数据泄露来源的地图 - [ ] **Prometheus 指标** — 暴露 `/metrics` 用于 Grafana 监控 ## 技术栈 | 层级 | 技术 | |-------|-----------| | 语言 | Python 3.11+ | | Web 框架 | Flask 3.0 + SQLAlchemy | | 数据库 | SQLite (开发) / PostgreSQL (生产) | | NLP 模型 | TF-IDF + Logistic Regression (scikit-learn) | | 信息源收集 | feedparser + requests + BeautifulSoup4 | | 数据处理 | pandas + numpy | | 警报 | Telegram Bot API | | 容器化 | Docker + docker-compose | | 测试 | pytest + pytest-cov | | 日志 | colorlog + RotatingFileHandler | ## 许可证 MIT 许可证 — 详见 [LICENSE](LICENSE)。 ## 贡献指南 1. Fork 本仓库 2. 创建一个功能分支:`git checkout -b feat/your-feature` 3. 运行测试:`pytest tests/ -v` 4. 提交 Pull Request *本项目作为一个作品集项目构建,旨在展示数据工程、NLP 与网络安全运营的交汇点。*
标签:Apex, Flask, HTTP/HTTPS抓包, NLP, Python, RSS订阅, scikit-learn, SQLite, Telegram机器人, TF-IDF, XSS, 威胁情报, 安全规则引擎, 实时处理, 密码管理, 开发者工具, 态势感知, 数据去重, 数据工程, 数据清洗, 文本分类, 无后门, 机器学习, 漏洞情报, 网络安全, 网络测绘, 自动抓取, 请求拦截, 逆向工具, 逻辑回归, 隐私保护, 风险评分