sajansshergill/signal-intelligence-pipeline
GitHub: sajansshergill/signal-intelligence-pipeline
一个端到端的AI安全威胁信号情报数据流水线,从公开来源抓取、分类并评分新兴AI威胁,通过API和仪表板提供可操作的情报。
Stars: 0 | Forks: 0
# AI 威胁信号情报流水线
## 概述
AI 威胁信号情报流水线是一个端到端的数据工程系统,用于监控公开信息源 —— Reddit、HackerNews、arXiv 和 CVE 订阅源 —— 以发现新出现的 AI 安全威胁、越狱技术、prompt 注入模式和模型漏洞讨论。
原始信号经过抓取、去重、按威胁类别分类后,通过 REST API 和 Streamlit 仪表板呈现为可操作的情报。该流水线采用本地等效工具(Kafka → FastAPI worker → DuckDB → Airflow)进行架构,以镜像生产级别的 GCP 部署(Pub/Sub → Cloud Run → CloudSQL → Cloud Composer)。
## 架构
```
┌─────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ Reddit (PRAW) │ HackerNews API │ arXiv RSS │
│ CVE NVD Feed │
└──────────────┬──────────────────────────────────────┘
│ Scrapy + BeautifulSoup crawlers
▼
┌─────────────────────────────────────────────────────┐
│ INGESTION LAYER │
│ Kafka Topic: raw_threat_signals │
│ (GCP equivalent: Cloud Pub/Sub) │
└──────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ PROCESSING WORKERS │
│ FastAPI + Python workers (Cloud Run equivalent) │
│ - Deduplication (SHA-256 content hash) │
│ - PII anonymization + masking │
│ - Threat category tagging │
│ - Severity scoring │
└──────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ STORAGE LAYER │
│ Raw JSON → /data/raw/ (GCS equivalent) │
│ Structured → DuckDB (CloudSQL equivalent) │
└──────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ ORCHESTRATION (Airflow) │
│ Cloud Composer equivalent │
│ DAG: threat_ingestion_pipeline (daily) │
│ DAG: dbt_transformation_pipeline (daily) │
└──────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ TRANSFORMATION LAYER (dbt) │
│ bronze → silver → gold medallion architecture │
└──────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ SERVING LAYER │
│ FastAPI REST endpoints │ Streamlit Dashboard │
└─────────────────────────────────────────────────────┘
```
## dbt Medallion 架构
### Bronze —— 原始数据摄入
| 模型 | 描述 |
|---|---|
| `bronze_reddit_posts` | 来自 r/MachineLearning、r/netsec、r/ArtificialIntelligence 的 Reddit 原始帖子 |
| `bronze_hn_stories` | 匹配威胁关键词的 HackerNews 帖子和评论 |
| `bronze_arxiv_papers` | 标记有对抗性 ML / 安全主题的 arXiv 摘要 |
| `bronze_cve_records` | 与 AI/ML 相关漏洞的 CVE NVD 订阅源记录 |
### Silver —— 清洗与分类
| 模型 | 描述 |
|---|---|
| `silver_threat_signals` | 去重、匿名化并带有威胁类别标签的信号 |
| `silver_entity_mentions` | 每个信号中提取出的模型系列和公司提及 |
| `silver_source_velocity` | 每个来源的每日发帖量 —— 用于峰值检测 |
### Gold —— 情报层
| 模型 | 描述 |
|---|---|
| `gold_threat_signals` | 带有严重性指数的最终丰富信号,通过 API 提供 |
| `gold_emerging_threats` | 过去 7 天内增长速度超过 2 倍的信号 |
| `gold_severity_index` | 综合得分:参与度 × 新颖度 × 复现率 |
| `gold_entity_risk_map` | 在对抗性讨论中出现频率最高的模型系列 |
## 威胁分类体系
信号被归类为以下几个类别:
| 类别 | 示例 |
|---|---|
| `jailbreak` | Prompt 覆盖技术、角色扮演漏洞利用 |
| `prompt_injection` | 通过文档、tool 输出进行的间接注入 |
| `data_poisoning` | 训练数据篡改、后门攻击 |
| `model_extraction` | 基于 API 的模型窃取、蒸馏攻击 |
| `adversarial_inputs` | 对抗样本、图像扰动 |
| `supply_chain` | 恶意的 fine-tune、受损的 checkpoint |
| `infrastructure` | ML 服务栈中的 CVE(TorchServe、Triton 等) |
| `policy_evasion` | 内容过滤器绕过、NSFW 绕过技术 |
## API Endpoint
| 方法 | Endpoint | 描述 |
|---|---|---|
| `GET` | `/threats/latest` | 最近 50 条威胁信号 |
| `GET` | `/threats/{category}` | 按威胁类别筛选的信号 |
| `GET` | `/threats/emerging` | 过去 7 天内速度激增的信号 |
| `GET` | `/threats/severity` | 按严重性指数降序排列的信号 |
| `GET` | `/entities/{name}` | 所有提及特定模型或公司的信号 |
| `GET` | `/health` | 流水线健康状态 + 上次成功运行的时间戳 |
## 技术栈
| 层级 | 工具 | GCP 生产等效方案 |
|---|---|---|
| 抓取 | Scrapy, BeautifulSoup, PRAW | — |
| 摄入 | Apache Kafka (Docker) | Cloud Pub/Sub |
| 处理 | Python worker, FastAPI | Cloud Run |
| 原始存储 | 本地文件系统 (`/data/raw/`) | Cloud Storage (GCS) |
| 数据库 | DuckDB | CloudSQL / Cloud Spanner |
| 编排 | Apache Airflow (Docker) | Cloud Composer |
| 转换 | dbt Core | — |
| 服务 | FastAPI, Streamlit | Cloud Run |
| 测试 | pytest | — |
| 容器化 | Docker, Docker Compose | — |
## 项目结构
```
ai-threat-signal-pipeline/
├── scrappers/
│ ├── reddit_scraper.py # PRAW-based Reddit ingestion
│ ├── hn_scraper.py # HackerNews API scraper
│ ├── arxiv_scraper.py # arXiv RSS feed parser
│ └── cve_scraper.py # NVD CVE feed scraper
├── ingestion/
│ ├── kafka_producer.py # Publishes raw signals to Kafka
│ └── kafka_consumer.py # Consumes and routes to DuckDB
├── processing/
│ ├── deduplicator.py # SHA-256 content hash dedup
│ ├── anonymizer.py # PII masking + anonymization
│ ├── classifier.py # Threat category tagger
│ └── severity_scorer.py # Composite severity index
├── dbt/
│ ├── models/
│ │ ├── bronze/
│ │ ├── silver/
│ │ └── gold/
│ ├── tests/
│ └── dbt_project.yml
├── airflow/
│ ├── dags/
│ │ ├── threat_ingestion_dag.py
│ │ └── dbt_transformation_dag.py
│ └── docker-compose.yml
├── api/
│ └── main.py # FastAPI REST endpoints
├── dashboard/
│ └── app.py # Streamlit threat dashboard
├── tests/
│ ├── test_scrapers.py
│ ├── test_processing.py
│ ├── test_dbt_models.py
│ └── test_api.py
├── data/
│ └── raw/ # Raw JSON landing zone
├── docker-compose.yml # Kafka + Airflow + app stack
├── requirements.txt
└── README.md
```
## 快速开始
### 前置条件
- Python 3.11+
- Docker + Docker Compose
- Reddit API 凭证 (PRAW)
### 设置
```
git clone https://github.com/sajansshergill/ai-threat-signal-pipeline
cd ai-threat-signal-pipeline
# 启动 Kafka + Airflow
docker-compose up -d
# 安装依赖
pip install -r requirements.txt
# 配置凭证
cp .env.example .env
# 添加 REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT
# 对原始摄取的 signals 进行分类和评分
python processing/classifier.py
# 运行 dbt 转换
cd dbt && dbt deps && dbt run && dbt test
# 启动 API 服务器
uvicorn api.main:app --reload
# 启动 dashboard
streamlit run dashboard/app.py
```
### 手动运行抓取器
```
python scrappers/reddit_scraper.py
python scrappers/hn_scraper.py
python scrappers/arxiv_scraper.py
python scrappers/cve_scraper.py
```
## 测试
```
pytest tests/ -v
```
| 测试套件 | 覆盖范围 |
|---|---|
| 抓取器单元测试 | 来源连通性、payload schema 验证 |
| 处理单元测试 | 去重逻辑、匿名化、类别标记 |
| dbt schema 测试 | 所有模型的非空、唯一性、可接受值检查 |
| API 集成测试 | Endpoint 响应代码、payload 结构 |
## 关键工程决策
**为什么选择 Kafka 而不是直接写入数据库?**
将抓取器与处理 worker 解耦。抓取器可以在不阻塞下游转换的情况下应对突发流量 —— 这镜像了生产 GCP 流水线中使用的 Pub/Sub 模式。
**为什么选择 DuckDB?**
零基础设施列式存储,针对文本密集型数据的分析工作负载进行了优化。在生产部署中,只需更改单个连接字符串即可将其替换为 CloudSQL。
**为什么在存储前进行 SHA-256 去重?**
相同的越狱技术会同时在 Reddit、HN 和 Twitter 上出现。在摄入时进行内容哈希去重,可以防止相同的信号夸大下游的严重性得分。
**为什么对威胁数据采用 Medallion 架构?**
Bronze 层保留原始来源以供审计跟踪。Silver 层进行规范化和匿名化。Gold 层仅呈现可操作的情报 —— 这与安全团队消费威胁订阅源的方式保持一致。
## 作者
**Sajan Singh Shergill**
数据科学硕士,Pace University
[linkedin.com/in/sajanshergill](https://linkedin.com/in/sajanshergill) · [sajansshergill.github.io](https://sajansshergill.github.io) · sajansshergill@gmail.com
标签:AI安全, Chat Copilot, GCP, Kubernetes, Splunk, URL抓取, 威胁情报, 安全规则引擎, 开发者工具, 数据工程, 数据抓取, 流式处理, 版权保护, 请求拦截, 软件成分分析, 逆向工具