sajansshergill/signal-intelligence-pipeline

GitHub: sajansshergill/signal-intelligence-pipeline

一个端到端的AI安全威胁信号情报数据流水线,从公开来源抓取、分类并评分新兴AI威胁,通过API和仪表板提供可操作的情报。

Stars: 0 | Forks: 0

# AI 威胁信号情报流水线 ## 概述 AI 威胁信号情报流水线是一个端到端的数据工程系统,用于监控公开信息源 —— Reddit、HackerNews、arXiv 和 CVE 订阅源 —— 以发现新出现的 AI 安全威胁、越狱技术、prompt 注入模式和模型漏洞讨论。 原始信号经过抓取、去重、按威胁类别分类后,通过 REST API 和 Streamlit 仪表板呈现为可操作的情报。该流水线采用本地等效工具(Kafka → FastAPI worker → DuckDB → Airflow)进行架构,以镜像生产级别的 GCP 部署(Pub/Sub → Cloud Run → CloudSQL → Cloud Composer)。 ## 架构 ``` ┌─────────────────────────────────────────────────────┐ │ DATA SOURCES │ │ Reddit (PRAW) │ HackerNews API │ arXiv RSS │ │ CVE NVD Feed │ └──────────────┬──────────────────────────────────────┘ │ Scrapy + BeautifulSoup crawlers ▼ ┌─────────────────────────────────────────────────────┐ │ INGESTION LAYER │ │ Kafka Topic: raw_threat_signals │ │ (GCP equivalent: Cloud Pub/Sub) │ └──────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ PROCESSING WORKERS │ │ FastAPI + Python workers (Cloud Run equivalent) │ │ - Deduplication (SHA-256 content hash) │ │ - PII anonymization + masking │ │ - Threat category tagging │ │ - Severity scoring │ └──────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ STORAGE LAYER │ │ Raw JSON → /data/raw/ (GCS equivalent) │ │ Structured → DuckDB (CloudSQL equivalent) │ └──────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ ORCHESTRATION (Airflow) │ │ Cloud Composer equivalent │ │ DAG: threat_ingestion_pipeline (daily) │ │ DAG: dbt_transformation_pipeline (daily) │ └──────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ TRANSFORMATION LAYER (dbt) │ │ bronze → silver → gold medallion architecture │ └──────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ SERVING LAYER │ │ FastAPI REST endpoints │ Streamlit Dashboard │ └─────────────────────────────────────────────────────┘ ``` ## dbt Medallion 架构 ### Bronze —— 原始数据摄入 | 模型 | 描述 | |---|---| | `bronze_reddit_posts` | 来自 r/MachineLearning、r/netsec、r/ArtificialIntelligence 的 Reddit 原始帖子 | | `bronze_hn_stories` | 匹配威胁关键词的 HackerNews 帖子和评论 | | `bronze_arxiv_papers` | 标记有对抗性 ML / 安全主题的 arXiv 摘要 | | `bronze_cve_records` | 与 AI/ML 相关漏洞的 CVE NVD 订阅源记录 | ### Silver —— 清洗与分类 | 模型 | 描述 | |---|---| | `silver_threat_signals` | 去重、匿名化并带有威胁类别标签的信号 | | `silver_entity_mentions` | 每个信号中提取出的模型系列和公司提及 | | `silver_source_velocity` | 每个来源的每日发帖量 —— 用于峰值检测 | ### Gold —— 情报层 | 模型 | 描述 | |---|---| | `gold_threat_signals` | 带有严重性指数的最终丰富信号,通过 API 提供 | | `gold_emerging_threats` | 过去 7 天内增长速度超过 2 倍的信号 | | `gold_severity_index` | 综合得分:参与度 × 新颖度 × 复现率 | | `gold_entity_risk_map` | 在对抗性讨论中出现频率最高的模型系列 | ## 威胁分类体系 信号被归类为以下几个类别: | 类别 | 示例 | |---|---| | `jailbreak` | Prompt 覆盖技术、角色扮演漏洞利用 | | `prompt_injection` | 通过文档、tool 输出进行的间接注入 | | `data_poisoning` | 训练数据篡改、后门攻击 | | `model_extraction` | 基于 API 的模型窃取、蒸馏攻击 | | `adversarial_inputs` | 对抗样本、图像扰动 | | `supply_chain` | 恶意的 fine-tune、受损的 checkpoint | | `infrastructure` | ML 服务栈中的 CVE(TorchServe、Triton 等) | | `policy_evasion` | 内容过滤器绕过、NSFW 绕过技术 | ## API Endpoint | 方法 | Endpoint | 描述 | |---|---|---| | `GET` | `/threats/latest` | 最近 50 条威胁信号 | | `GET` | `/threats/{category}` | 按威胁类别筛选的信号 | | `GET` | `/threats/emerging` | 过去 7 天内速度激增的信号 | | `GET` | `/threats/severity` | 按严重性指数降序排列的信号 | | `GET` | `/entities/{name}` | 所有提及特定模型或公司的信号 | | `GET` | `/health` | 流水线健康状态 + 上次成功运行的时间戳 | ## 技术栈 | 层级 | 工具 | GCP 生产等效方案 | |---|---|---| | 抓取 | Scrapy, BeautifulSoup, PRAW | — | | 摄入 | Apache Kafka (Docker) | Cloud Pub/Sub | | 处理 | Python worker, FastAPI | Cloud Run | | 原始存储 | 本地文件系统 (`/data/raw/`) | Cloud Storage (GCS) | | 数据库 | DuckDB | CloudSQL / Cloud Spanner | | 编排 | Apache Airflow (Docker) | Cloud Composer | | 转换 | dbt Core | — | | 服务 | FastAPI, Streamlit | Cloud Run | | 测试 | pytest | — | | 容器化 | Docker, Docker Compose | — | ## 项目结构 ``` ai-threat-signal-pipeline/ ├── scrappers/ │ ├── reddit_scraper.py # PRAW-based Reddit ingestion │ ├── hn_scraper.py # HackerNews API scraper │ ├── arxiv_scraper.py # arXiv RSS feed parser │ └── cve_scraper.py # NVD CVE feed scraper ├── ingestion/ │ ├── kafka_producer.py # Publishes raw signals to Kafka │ └── kafka_consumer.py # Consumes and routes to DuckDB ├── processing/ │ ├── deduplicator.py # SHA-256 content hash dedup │ ├── anonymizer.py # PII masking + anonymization │ ├── classifier.py # Threat category tagger │ └── severity_scorer.py # Composite severity index ├── dbt/ │ ├── models/ │ │ ├── bronze/ │ │ ├── silver/ │ │ └── gold/ │ ├── tests/ │ └── dbt_project.yml ├── airflow/ │ ├── dags/ │ │ ├── threat_ingestion_dag.py │ │ └── dbt_transformation_dag.py │ └── docker-compose.yml ├── api/ │ └── main.py # FastAPI REST endpoints ├── dashboard/ │ └── app.py # Streamlit threat dashboard ├── tests/ │ ├── test_scrapers.py │ ├── test_processing.py │ ├── test_dbt_models.py │ └── test_api.py ├── data/ │ └── raw/ # Raw JSON landing zone ├── docker-compose.yml # Kafka + Airflow + app stack ├── requirements.txt └── README.md ``` ## 快速开始 ### 前置条件 - Python 3.11+ - Docker + Docker Compose - Reddit API 凭证 (PRAW) ### 设置 ``` git clone https://github.com/sajansshergill/ai-threat-signal-pipeline cd ai-threat-signal-pipeline # 启动 Kafka + Airflow docker-compose up -d # 安装依赖 pip install -r requirements.txt # 配置凭证 cp .env.example .env # 添加 REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT # 对原始摄取的 signals 进行分类和评分 python processing/classifier.py # 运行 dbt 转换 cd dbt && dbt deps && dbt run && dbt test # 启动 API 服务器 uvicorn api.main:app --reload # 启动 dashboard streamlit run dashboard/app.py ``` ### 手动运行抓取器 ``` python scrappers/reddit_scraper.py python scrappers/hn_scraper.py python scrappers/arxiv_scraper.py python scrappers/cve_scraper.py ``` ## 测试 ``` pytest tests/ -v ``` | 测试套件 | 覆盖范围 | |---|---| | 抓取器单元测试 | 来源连通性、payload schema 验证 | | 处理单元测试 | 去重逻辑、匿名化、类别标记 | | dbt schema 测试 | 所有模型的非空、唯一性、可接受值检查 | | API 集成测试 | Endpoint 响应代码、payload 结构 | ## 关键工程决策 **为什么选择 Kafka 而不是直接写入数据库?** 将抓取器与处理 worker 解耦。抓取器可以在不阻塞下游转换的情况下应对突发流量 —— 这镜像了生产 GCP 流水线中使用的 Pub/Sub 模式。 **为什么选择 DuckDB?** 零基础设施列式存储,针对文本密集型数据的分析工作负载进行了优化。在生产部署中,只需更改单个连接字符串即可将其替换为 CloudSQL。 **为什么在存储前进行 SHA-256 去重?** 相同的越狱技术会同时在 Reddit、HN 和 Twitter 上出现。在摄入时进行内容哈希去重,可以防止相同的信号夸大下游的严重性得分。 **为什么对威胁数据采用 Medallion 架构?** Bronze 层保留原始来源以供审计跟踪。Silver 层进行规范化和匿名化。Gold 层仅呈现可操作的情报 —— 这与安全团队消费威胁订阅源的方式保持一致。 ## 作者 **Sajan Singh Shergill** 数据科学硕士,Pace University [linkedin.com/in/sajanshergill](https://linkedin.com/in/sajanshergill) · [sajansshergill.github.io](https://sajansshergill.github.io) · sajansshergill@gmail.com
标签:AI安全, Chat Copilot, GCP, Kubernetes, Splunk, URL抓取, 威胁情报, 安全规则引擎, 开发者工具, 数据工程, 数据抓取, 流式处理, 版权保护, 请求拦截, 软件成分分析, 逆向工具