KANGGAFH/breach-intel
GitHub: KANGGAFH/breach-intel
一套自动化数据泄露情报系统,通过 RSS 采集、NLP 三分类和多信号加权风险评分,将高危事件实时推送至 Telegram 并在 Web 仪表盘可视化展示。
Stars: 0 | Forks: 0
# BreachIntel — 自动化数据泄露情报系统
```
██████╗ ██████╗ ███████╗ █████╗ ██████╗██╗ ██╗
██╔══██╗██╔══██╗██╔════╝██╔══██╗██╔════╝██║ ██║
██████╔╝██████╔╝█████╗ ███████║██║ ███████║
██╔══██╗██╔══██╗██╔══╝ ██╔══██║██║ ██╔══██║
██████╔╝██║ ██║███████╗██║ ██║╚██████╗██║ ██║
╚═════╝ ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝
INTEL Automated Threat Intelligence · v1.0.0
```
## 概述
BreachIntel 是一个**微型数据工程 + AI + 网络安全项目**,旨在演示用于自动化威胁情报收集的生产级流水线。
它会自动执行以下操作:
1. 从 8 个精选的网络安全 RSS 源(Krebs、BleepingComputer、CISA 等)**收集**文章
2. 使用标准化流水线对文本进行**清理**和去重
3. 使用训练好的 NLP 模型将每篇文章**分类**为 `real_breach`、`suspicious` 或 `irrelevant`
4. 使用加权多信号公式(AI + 关键词 + 来源可靠性 + 时效性)对风险进行**评分**
5. 针对高风险项目通过 Telegram **发出警报**
6. 在深色模式终端美学风格的仪表盘上**可视化**所有内容
## 系统架构
```
┌─────────────────────────────────────────────────────────────────┐
│ BreachIntel Pipeline │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ RSS Feeds │ │ Text │ │ Rule Engine │ │
│ │ (8 sources) │───▶│ Cleaner │───▶│ Keywords + │ │
│ │ feedparser │ │ BS4 + NLTK │ │ Regex + Blacklist│ │
│ └──────────────┘ └──────────────┘ └────────┬─────────┘ │
│ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌────────▼─────────┐ │
│ │ SQLite DB │ │ Risk Scorer │ │ AI Classifier │ │
│ │ (Articles) │◀───│ Weighted │◀───│ TF-IDF + │ │
│ │ SQLAlchemy │ │ Formula │ │ LogisticRegr. │ │
│ └──────┬───────┘ └──────────────┘ └──────────────────┘ │
│ │ │
│ ┌──────▼───────┐ ┌──────────────┐ │
│ │ Flask API │ │ Telegram │ │
│ │ + Dashboard │ │ Alerter │ │
│ │ REST /api/ │ │ (optional) │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 信号融合(风险评分公式)
```
Risk Score = (AI_confidence × 0.40)
+ (keyword_score × 0.25)
+ (source_trust × 0.20)
+ (recency_score × 0.15)
Risk Levels:
CRITICAL ≥ 0.80
HIGH ≥ 0.60
MEDIUM ≥ 0.35
LOW < 0.35
```
## 项目结构
```
breach-intel/
├── config.py # Central config (env vars + feed list)
├── run.py # Web server entry point
├── requirements.txt
├── .env.example # Template for environment variables
├── .gitignore
├── Dockerfile
├── docker-compose.yml
│
├── src/
│ ├── collectors/
│ │ ├── base.py # Abstract base collector
│ │ └── rss_collector.py # RSS/Atom feed collector (pool)
│ │
│ ├── processing/
│ │ ├── cleaner.py # Text normalization + deduplication
│ │ ├── rule_engine.py # Keyword + regex engine
│ │ ├── database.py # SQLAlchemy models (Article)
│ │ └── pipeline.py # Main orchestration pipeline
│ │
│ ├── models/
│ │ ├── classifier.py # NLP model (TF-IDF + LogReg)
│ │ └── risk_scorer.py # Multi-signal risk scorer
│ │
│ ├── alerts/
│ │ └── telegram_alert.py # Telegram Bot alerter
│ │
│ └── webapp/
│ ├── app.py # Flask app factory
│ ├── routes.py # Dashboard routes
│ ├── api.py # REST API blueprint
│ ├── templates/
│ │ ├── index.html # Main dashboard
│ │ └── detail.html # Article detail view
│ └── static/
│ ├── css/dashboard.css
│ └── js/dashboard.js
│
├── data/
│ ├── training_data.json # 50 labeled samples (real_breach/suspicious/irrelevant)
│ └── breach_classifier.joblib # (generated after training)
│
├── scripts/
│ ├── train_model.py # Train/retrain the classifier
│ └── run_collector.py # Manual or scheduled pipeline run
│
├── tests/
│ ├── test_cleaner.py
│ ├── test_rule_engine.py
│ ├── test_risk_scorer.py
│ └── test_api.py
│
└── notebooks/
└── (exploratory analysis — add your own)
```
## 快速开始
### 1. 克隆与设置
```
git clone https://github.com/your-handle/breach-intel.git
cd breach-intel
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
```
### 2. 配置环境
```
cp .env.example .env
# 使用你的设置编辑 .env(Telegram 为可选)
```
关键变量:
```
FLASK_SECRET_KEY=your-random-secret
TELEGRAM_BOT_TOKEN=your-bot-token # Optional
TELEGRAM_CHAT_ID=your-chat-id # Optional
```
### 3. 训练模型
```
python scripts/train_model.py
```
预期输出:
```
TRAINING COMPLETE
Samples used : 50
CV F1 (weighted) : 0.8821 ± 0.0312
Test accuracy : 0.9000
Model saved to : data/breach_classifier.joblib
```
### 4. 运行仪表盘
```
python run.py
# 打开 http://localhost:5000
```
### 5. 运行收集扫描
```
# 单次运行
python scripts/run_collector.py
# 计划任务(每 30 分钟)
python scripts/run_collector.py --schedule
```
## Docker 部署
```
# 构建并启动所有服务
docker-compose up -d
# 查看日志
docker-compose logs -f web
docker-compose logs -f collector
# 停止
docker-compose down
```
## API 参考
所有接口均返回 JSON。基础路径:`/api/v1/`
| 方法 | 接口 | 描述 |
|--------|----------|-------------|
| GET | `/api/v1/health` | 健康检查 |
| GET | `/api/v1/stats` | 聚合统计 |
| GET | `/api/v1/articles` | 文章列表(分页) |
| GET | `/api/v1/articles/:id` | 获取单篇文章 |
| POST | `/api/v1/trigger-run` | 触发手动扫描 |
### 查询参数 (`/api/v1/articles`)
| 参数 | 值 | 描述 |
|-------|--------|-------------|
| `label` | `real_breach`, `suspicious`, `irrelevant` | 按分类过滤 |
| `risk_level` | `CRITICAL`, `HIGH`, `MEDIUM`, `LOW` | 按风险等级过滤 |
| `q` | string | 标题全文搜索 |
| `page` | integer | 页码(默认:1) |
| `per_page` | integer | 每页结果数(最大:100) |
### API 调用示例
```
# 获取所有严重违规
curl http://localhost:5000/api/v1/articles?risk_level=CRITICAL
# 搜索勒索软件
curl http://localhost:5000/api/v1/articles?q=ransomware
# 获取统计信息
curl http://localhost:5000/api/v1/stats
# 触发扫描(使用 API secret)
curl -X POST http://localhost:5000/api/v1/trigger-run \
-H "Authorization: Bearer your-api-secret"
```
## Telegram 机器人设置
1. 向 [@BotFather](https://t.me/botfather) 发送消息 → `/newbot` → 复制 token
2. 通过向 [@userinfobot](https://t.me/userinfobot) 发送消息获取您的 chat ID
3. 在 `.env` 中设置:
TELEGRAM_BOT_TOKEN=1234567890:ABCdef...
TELEGRAM_CHAT_ID=987654321
4. 测试连接:
python -c "
from src.alerts.telegram_alert import TelegramAlerter
TelegramAlerter().test_connection()
"
警报格式:
```
🚨 BREACH ALERT — CRITICAL
━━━━━━━━━━━━━━━━━━━━━━
📌 Title: Major Bank Data Breach Exposes 10M Records
📝 Summary: Customer names, emails and encrypted...
🏷️ Classification: Real Breach
📊 Risk Score: ████████░░ 82%
📰 Source: Krebs on Security
🕐 Published: 2024-03-15 14:32 UTC
🔗 Read Full Article
```
## 运行测试
```
# 所有测试
pytest tests/ -v
# 带有覆盖率
pytest tests/ -v --cov=src --cov-report=term-missing
# 单个模块
pytest tests/test_rule_engine.py -v
```
## 扩充数据集
默认的 `data/training_data.json` 包含 50 个标注样本。为了提高准确率:
1. **手动标注**:按以下格式将文章添加到 `data/training_data.json`:
{"text": "文章标题 + 内容...", "label": "real_breach"}
标签:`real_breach` | `suspicious` | `irrelevant`
2. **批量来源**:
- [CISA 已知被利用漏洞目录](https://www.cisa.gov/known-exploited-vulnerabilities-catalog)
- [HaveIBeenPwned 数据泄露列表](https://haveibeenpwned.com/PwnedWebsites)
- [Verizon DBIR 数据集](https://www.verizon.com/business/resources/reports/dbir/)
3. **重新训练**:
python scripts/train_model.py --data data/training_data.json
### 升级至 Transformer 模型
将 `src/models/classifier.py` 中的 TF-IDF 流水线替换为:
```
from transformers import pipeline
clf = pipeline(
"text-classification",
model="jackaduma/SecBERT", # Security-domain BERT
truncation=True,
max_length=512,
)
```
注意:需要在 requirements.txt 中包含 `transformers` 和 `torch`。速度会明显变慢,但在处理领域特定语言时准确度更高。
## 添加新数据源
在 `src/collectors/` 目录下的新文件中实现 `BaseCollector`:
```
from src.collectors.base import BaseCollector, RawArticle
class MyCustomCollector(BaseCollector):
name = "my_source"
reliability = 0.85
def fetch(self) -> list[RawArticle]:
# Your collection logic here
return [RawArticle(title=..., url=..., ...)]
```
然后在 `config.py` 中注册它:
```
RSS_FEEDS = [
...,
{"name": "My Source", "url": "https://mysource.com/feed", "reliability": 0.85},
]
```
## 环境变量参考
| 变量 | 默认值 | 描述 |
|----------|---------|-------------|
| `FLASK_SECRET_KEY` | `dev-secret` | Flask session secret |
| `FLASK_PORT` | `5000` | Web 服务器端口 |
| `FLASK_DEBUG` | `false` | 启用调试模式 |
| `DATABASE_URL` | `sqlite:///data/breach_intel.db` | 数据库连接字符串 |
| `TELEGRAM_BOT_TOKEN` | — | Telegram 机器人 token |
| `TELEGRAM_CHAT_ID` | — | Telegram 聊天/频道 ID |
| `FETCH_INTERVAL_MINUTES` | `30` | 收集器调度间隔 |
| `MAX_ARTICLES_PER_SOURCE` | `50` | 每个源每次运行的最大文章数 |
| `MODEL_PATH` | `data/breach_classifier.joblib` | 训练模型的路径 |
| `CONFIDENCE_THRESHOLD` | `0.65` | 信任预测的最低 AI 置信度 |
| `LOG_LEVEL` | `INFO` | 日志详细程度 |
| `LOG_FILE` | `logs/breach_intel.log` | 日志文件路径 |
| `API_SECRET` | — | `/trigger-run` 接口的 Bearer token |
## 未来改进方向
- [ ] **PostgreSQL 支持** — 将 SQLite 替换为生产环境的 PostgreSQL
- [ ] **MITRE ATT&CK 标签** — 根据文章内容自动标记战术/技术
- [ ] **CVE 提取** — 使用正则表达式 + NVD API 查找提及的 CVE
- [ ] **Twitter/X 监控** — 通过 Twitter API v2 实现(需要账号)
- [ ] **Shodan 集成** — 交叉比对暴露的 IP/域名
- [ ] **邮件摘要** — 通过 SendGrid/SMTP 发送每日汇总邮件
- [ ] **Slack 警报** — Telegram 的替代方案
- [ ] **SecBERT 模型** — 用领域特定的 Transformer 替换 TF-IDF
- [ ] **威胁行为者追踪** — 针对 APT 组织名称的实体提取
- [ ] **地理热力图** — 使用 D3.js 绘制数据泄露来源的地图
- [ ] **Prometheus 指标** — 暴露 `/metrics` 用于 Grafana 监控
## 技术栈
| 层级 | 技术 |
|-------|-----------|
| 语言 | Python 3.11+ |
| Web 框架 | Flask 3.0 + SQLAlchemy |
| 数据库 | SQLite (开发) / PostgreSQL (生产) |
| NLP 模型 | TF-IDF + Logistic Regression (scikit-learn) |
| 信息源收集 | feedparser + requests + BeautifulSoup4 |
| 数据处理 | pandas + numpy |
| 警报 | Telegram Bot API |
| 容器化 | Docker + docker-compose |
| 测试 | pytest + pytest-cov |
| 日志 | colorlog + RotatingFileHandler |
## 许可证
MIT 许可证 — 详见 [LICENSE](LICENSE)。
## 贡献指南
1. Fork 本仓库
2. 创建一个功能分支:`git checkout -b feat/your-feature`
3. 运行测试:`pytest tests/ -v`
4. 提交 Pull Request
*本项目作为一个作品集项目构建,旨在展示数据工程、NLP 与网络安全运营的交汇点。*
标签:Apex, Flask, HTTP/HTTPS抓包, NLP, Python, RSS订阅, scikit-learn, SQLite, Telegram机器人, TF-IDF, XSS, 威胁情报, 安全规则引擎, 实时处理, 密码管理, 开发者工具, 态势感知, 数据去重, 数据工程, 数据清洗, 文本分类, 无后门, 机器学习, 漏洞情报, 网络安全, 网络测绘, 自动抓取, 请求拦截, 逆向工具, 逻辑回归, 隐私保护, 风险评分