k3ijo-miyamoto/ai-threats
GitHub: k3ijo-miyamoto/ai-threats
一个轻量级Python工具,用于自动收集、分类和评估AI相关威胁,并通知相关团队。
Stars: 0 | Forks: 0
# l jargon. Similarly for others.
一款轻量级 Python 工具,用于收集、分类、评估 AI 相关威胁对自身企业的影响并发出通知。
详细的背景和运维设计请参考 [CLAUDE.md](CLAUDE.md)。
## 构成
```
threat_watch/
collectors/ # 収集 (RSS / GitHub Advisory)
classifiers/ # AI分類 (rule-based / Claude) + 自社影響判定
storage/ # SQLite + CSVミラー
notifiers/ # Teams Webhook
reports/ # 週次Markdownレポート
config/
sources.yaml # 収集対象
company_assets.yaml# 自社AIアセット定義
data/ # threat_register.sqlite / .csv (gitignored)
main.py # CLIエントリポイント
```
## 设置
```
# uv 推奨
uv venv
source .venv/bin/activate
uv pip install -r requirements.txt
# pip でも可
# - For "Company Impact", this might be a common phrase, so I can translate it to Chinese.
# - I should translate the entire line, but keep specific terms in English.
# - Final decision for first heading: "AI Threat Watch" is likely a name, so keep it in English. "MVP" is kept in English. So, the translation could be "AI Threat Watch (MVP)" but in Chinese, I might add a translation for clarity. To match examples, I'll keep "AI Threat Watch" in English and translate only if necessary.
cp .env.example .env
# .env を編集: ANTHROPIC_API_KEY, TEAMS_WEBHOOK_URL, GITHUB_TOKEN, THREAT_WATCH_CLASSIFIER
cp config/company_assets.example.yaml config/company_assets.yaml
# config/company_assets.yaml を自社のAI利用状況に合わせて編集
```
## 基本命令
```
# 収集 → 分類 → 影響判定 → SQLite/CSVへ保存
python main.py collect
# 高優先度の未通知アイテムをTeamsへ送信
python main.py notify
# 週次レポート生成 (reports/weekly_report.md)
python main.py report --days 7
# High優先度の各脅威に対し、Claudeで個別の対策を生成
python main.py advise [--limit N]
# 一括実行(collect → advise → notify → report)— 通常はこれを使う
python main.py run
# 統計
python main.py stats
# KEV対応期限が7日以内のアイテムをアラート表示
python main.py alerts --kev-due-within 7
# 月次・四半期レポート
python main.py report --period monthly
python main.py report --period quarterly
# 人間レビュー記録(status / category / priority / note)
python main.py review AI-THREAT-0042 \
--status Reviewing \
--correct-category "Not AI-related" \
--priority Low \
--note "誤検知。実際は IoT 機器の問題で AI 経路なし"
# Webダッシュボード(要 requirements-dashboard.txt)
uv pip install -r requirements-dashboard.txt # or: pip install -r requirements-dashboard.txt
streamlit run dashboard.py
```
## 典型运维流程
### 日常
```
python main.py run
```
这将执行以下操作:
1. 从 8 个信息源获取 RSS/API 数据
2. 仅使用分类器对新项进行分类(通过 fingerprint 排除已存在项)
3. 与 `company_assets.yaml` 对照进行影响判定
4. 为每条 High 优先级威胁使用 Claude 生成个性化应对措施 (`tailored_action`)
5. 将 High 优先级威胁通知到 Slack / Teams
6. 更新周报
### 周度复盘(人工手动进行)
1. 打开 [reports/weekly_report.md](reports/weekly_report.md)
2. 人工审核 “Company Impact = Unknown” 部分
3. 根据需要,在 Excel 等工具中打开 [data/threat_register.csv](data/threat_register.csv) 进行详细检查
## 结果解读
### Priority(优先级)
| 值 | 应对方针 |
|---|---|
| **High** | 立即处理 — 直接影响自研资产 或 严重漏洞 + AI 相关 |
| **Medium** | 在周度复盘中确认 |
| **Low** | 仅纳入报告统计 |
### - In the example 'API Reference' -> 'API 参考', "Reference" is translated to "参考". So, for "AI Threat Watch", "Watch" could be translated to "监视" or "观察". But since it's a specific term, I might keep it as is.
| 值 | 含义 |
|---|---|
| **Yes** | 匹配到 `company_assets.yaml` 中的关键词 — 确定有影响 |
| **Unknown** | 影响不明 — **必须人工分流**(不可视为安全) |
| **No** | 非 AI 相关,且与自研资产无关 |
## 分类器切换
在 `.env` 或环境变量中指定:
```
THREAT_WATCH_CLASSIFIER=claude # Claude API使用(推奨。ANTHROPIC_API_KEY必須)
THREAT_WATCH_CLASSIFIER=rule-based # オフライン動作(精度低、API不要)
```
通过命令行临时覆盖:
```
python main.py collect --classifier rule-based
```
当 ClaudeClassifier 的 API 调用失败时,会自动回退至 rule-based 分类器。
## 自定义
### 添加自研 AI 工具
首先将 [config/company_assets.example.yaml](config/company_assets.example.yaml) 复制为 `config/company_assets.yaml`(运行时文件已在 `.gitignore` 中)。
```
cp config/company_assets.example.yaml config/company_assets.yaml
```
然后编辑 `config/company_assets.yaml`。例如,添加 GitLab Duo:
```
- name: GitLab Duo
category: coding_ai
keywords:
- gitlab duo
- gitlab ai
data_access:
- source_code
risk_level: high
owner: Engineering
```
准确性的关键在于维护好此文件。过于短的 `keywords`(3 个字符以下)容易导致误匹配,应避免使用。
### 添加/排除信息源
编辑 [config/sources.yaml](config/sources.yaml)。通过设置 `enabled: false` 可临时排除。
### 启用 Teams / Slack 通知
在 `.env` 中添加 Incoming Webhook URL(可只配置一个或两个都配置):
```
TEAMS_WEBHOOK_URL=https://outlook.office.com/webhook/...
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/T.../B.../...
```
如果同时配置了两个,则**会向两个服务都发送通知**。未配置的通知目标会自动跳过。
#### 获取 Slack Webhook 步骤
1. 访问 [api.slack.com/apps](https://api.slack.com/apps) → **Create New App** → **From scratch**
2. 左侧菜单找到 **Incoming Webhooks**,将其 **Activate**
3. **Add New Webhook to Workspace** → 选择要接收通知的频道并授权
4. 复制显示的 `https://hooks.slack.com/services/...` 到 `.env` 文件的 `SLACK_WEBHOOK_URL`
#### 获取 Teams Webhook 步骤
1. 在要接收通知的频道中点击 “…” → **Connectors** → **Incoming Webhook** 并 **Configure**
2. 设置名称(例如:`AI Threat Watch`) → **Create**
3. 复制显示的 URL 到 `.env` 文件的 `TEAMS_WEBHOOK_URL`
## 输出字段 (威胁登记表)
| 列 | 说明 |
|---|---|
| threat_id | 格式为 `AI-THREAT-NNNN` 的序列号 |
| collected_at / published_at | 收集时间 / 发布时间 (ISO 8601) |
| source / source_category | 信息源及其分类 |
| title / url / summary | 标题、URL、摘要 |
| category | AI 威胁类别 (如 Prompt Injection 等) |
| severity | Low / Medium / High / Critical |
| company_impact | Yes / No / Unknown |
| affected_asset | 匹配到的自研 AI 资产 |
| reason | 影响判定的理由 |
| recommended_action | 建议措施(基于类别的通用建议) |
| tailored_action | Claude 为 High 优先级威胁生成的个性化应对措施 |
| priority | High / Medium / Low |
| classifier_used | rule-based / claude |
| status | New / Reviewing / Actioned / Closed / FalsePositive |
| notified | Teams/Slack 通知已发送标志 |
| cve_ids | 检测到的 CVE ID(逗号分隔) |
| in_kev | 是否列入 CISA KEV(在野利用漏洞)列表 |
| kev_due_date | 面向美国联邦机构的 KEV 处置期限 |
| epss_max_score | EPSS 分数最大值(0.0-1.0,利用可能性预测) |
| epss_max_cve | 对应 epss_max_score 的 CVE |
| reviewed_at / reviewer / review_note | 人工审核记录 |
| corrected_category | 审核员修正后的类别 |
## 通知规则
`main.py notify` 会发送 `priority = High` 且 `notified = 0` 的记录。
判定逻辑参见 [classifiers/impact_scorer.py](classifiers/impact_scorer.py)。
## CVE 信息丰富化
`collect` 会自动从每条威胁中提取 CVE ID(`CVE-YYYY-NNNN` 格式),并与以下信息进行关联:
- **CISA KEV** (已知被利用漏洞列表) — 在野利用的 CVE 列表,每天缓存一次至 [data/cache/kev.json](data/cache/kev.json)
- **EPSS API** (FIRST.org) — 30 天内的利用可能性分数(0.0-1.0)
如果检测到 `in_kev=True` 或 `epss>=0.7`,则会设置 `impact_scorer` 的 `critical_hit` 标志,并将优先级提升为 High(必须满足 AI 相关前置条件)。
## 人工审核反馈
AI 判定为初步判定,因此提供了审核员可修正的机制:
```
python main.py review AI-THREAT-0042 \
--status FalsePositive \
--correct-category "Not AI-related" \
--priority Low \
--note "Cisco IOSの脆弱性で、AI関連ではない"
```
- `status`: New / Reviewing / Actioned / Closed / FalsePositive
- `corrected_category`: 不修改原始 `category`,记录在单独列中
- `reviewer`: 未指定时使用 `$USER` 环境变量
### 小样本学习
累积的 `corrected_category` 修正将作为后续 Claude 分类的 **few-shot examples** 自动注入(最多最近 5 条)。审核越多,分类结果就越符合本企业的运营判断。
学习目标仅为 `corrected_category != category` 的记录(即存在修正意图的记录)。
## CVE 集群与通知去重
针对同一 CVE 从多个信息源(如 CISA / GitHub Advisory / JVN / IPA 等)作为独立记录获取的问题:
- **通知时自动去重**:`notify` 会跳过已通知过的相同 CVE(可通过 `--no-dedup-cve` 禁用)
- **月度/季度报告**:聚合显示 CVE 集群,表明一个补丁可解决多个记录
## 月度/季度报告
```
python main.py report --period monthly
python main.py report --period quarterly
```
输出示例内容:
- 各类别的**与上一时期对比趋势**(如 ▲25%、▼10% 等)
- Priority / Impact / Status 分布
- **待处理的 High 条目**(status=New/Reviewing)明细
- KEV 命中数量 / EPSS≥0.7 数量
- **主要 CVE 集群**(由多个源报告的同一 CVE)
- Policy Implications 占位符(由 AI Threat Lead 手动填写的框架)
## KEV 处置期限提醒
CISA KEV(在野利用 CVE 列表)设有针对联邦机构的处置期限。在商业环境中也可作为事实上的处置基准参考:
```
python main.py alerts --kev-due-within 7 # 7日以内
python main.py alerts --kev-due-within 30 # 30日以内
```
与勒索软件攻击相关的 CVE 会以 `[RANSOMWARE]` 醒目标示。
## IOC(入侵指标)提取
`collect` 会自动从每条威胁中提取 IOC 并保存到 `iocs_json` 列:
| 类型 | 示例 |
|---|---|
| IPv4 | `45.33.32.156`(排除私有地址/环回地址/文档示例地址) |
| 域名 | `evil-c2.example.test`(排除 github.com 等知名域名) |
| URL | `https://...` / `hxxps://...`(自动规范化 defang 表示) |
| 哈希 | SHA256 / SHA1 / MD5 |
| Bitcoin | 传统地址(以 1 或 3 开头) |
Defang 表示法(如 `evil[.]com`, `hxxps://`, `1.2.3[.]4`)会自动转换回标准格式。
使用 MCP 工具 `search_iocs` 搜索 IOC:
```
"45.33.32.156 が含まれる脅威レコードを探して"
```
计划未来用于 SIEM/EDR 集成时的关联数据。
## 测试
```
uv pip install -r requirements-dev.txt # or: pip install -r requirements-dev.txt
pytest tests/ -v
```
包含 67 项测试。通过 GitHub Actions ([.github/workflows/test.yml](.github/workflows/test.yml)) 在 push/PR 时于 Python 3.11/3.12 环境下自动运行。
## 获取原文正文
当 RSS 的 `summary` 较短(少于 500 字符)时,会使用 `trafilatura` 从 `url` 提取文章正文,以扩充传递给分类器的摘要。
- 对于 GitHub Advisory / Twitter / YouTube 等不适合提取正文的信息源,会自动跳过
- 提取失败时,会使用原始 summary(非致命错误)
- 可通过 `--no-fetch-body` 禁用此功能
## Daily Report Skill(Claude CLI 命令)
[.claude/skills/daily-report/SKILL.md](.claude/skills/daily-report/SKILL.md) 定义了一个技能,可在 Claude CLI 中输入 `/daily-report` 来生成每日简报。
```
cd /home/hacker/Project/threat_watch
claude
> /daily-report
```
其背后执行的操作:
1. 运行 `python main.py run` 进行收集、分类、通知和周报更新
2. 通过 MCP 工具(如 `stats`, `recent_high`, `list_kev_due` 等)获取上下文
3. 比较过去 7 天与过去 8-14 天的各类别数量以检测趋势
4. 在聊天中输出**自然语言简报**(包括待处理 High 项 / KEV 紧急 / 趋势 / 建议行动)
每天早晨只需输入这个 `/daily-report` 命令即可维持运营。
### 使用 cron 实现完全自动化
[scripts/daily_report_cron.sh](scripts/daily_report_cron.sh) 是一个专为 cron 环境(PATH 最小化、不继承 NVM/venv)设计的包装脚本:
```
# crontab -e で追加
0 8 * * * /home/hacker/Project/threat_watch/scripts/daily_report_cron.sh >> /home/hacker/Project/threat_watch/logs/cron.log 2>&1
```
脚本功能:
1. 加载 NVM(激活 Claude CLI 所需的 Node 环境)
2. 激活项目的 `.venv`
3. 执行 `claude --print --max-budget-usd 2 /daily-report`
4. 将输出保存到 `logs/daily-report-YYYY-MM-DD.md`
5. 如果设置了 `SLACK_WEBHOOK_URL`,则发布到 Slack
API 预算上限通过 `--max-budget-usd 2` 设置为默认 $2(实际通常在 $0.05 到 $0.20 之间)。
如果希望将 `SLACK_WEBHOOK_URL` 放在用于 cron 的环境文件中(而不是 `.env`),可以在 crontab 中指定 `BASH_ENV=/path/to/env.sh`,或在脚本开头使用 `source` 命令加载。
## MCP 集成(通过 Claude CLI 用自然语言操作)
[mcp_server.py](mcp_server.py) 通过 Model Context Protocol 公开威胁登记表。可以从 Claude CLI / Claude Desktop / Cursor 等客户端使用自然语言进行查询和记录操作。
### 在 Claude CLI 中启用
项目根目录下已准备好 [.mcp.json](.mcp.json) 文件,只需在此目录启动 Claude CLI,MCP 服务器便会自动启动。
```
cd /home/hacker/Project/threat_watch
claude # 起動時に .mcp.json を読み込む
```
首次使用时会弹出 MCP 服务器授权提示,请予以授权。
### 使用示例(向 Claude 提问自然语言)
```
過去7日のHighアイテムで、KEV該当のうち status=New のものをリストして
```
```
AI-THREAT-0095 を Actioned にして、ノートに「MLflow 3.10.0 にアップグレード済み」と記録して
```
```
CVE-2025-55182 がどのソースから報告されているか教えて
```
```
今月の AI Supply Chain カテゴリの件数は先月比で増えてる?
```
```
KEVで期限が3日以内に切れるアイテムを教えて
```
### 提供的工具
| 工具 | 用途 |
|---|---|
| `query_threats` | 根据 priority/status/category/source/cve/days 等灵活筛选 |
| `get_threat` | 获取指定 threat_id 的一条完整记录 |
| `cve_cluster` | 获取引用同一 CVE 的所有记录 |
| `list_kev_due` | 获取 KEV 处置期限临近列表 |
| `recent_high` | 获取最近的待处理 High 条目(常用组合) |
| `stats` | 获取按类别/优先级的统计汇总 |
| `mark_reviewed` | 记录审核结果(写入操作) |
### 在 Claude Desktop 中设置
```
{
"mcpServers": {
"ai-threat-watch": {
"command": "/絶対パス/threat_watch/.venv/bin/python",
"args": ["/絶対パス/threat_watch/mcp_server.py"]
}
}
}
```
## Streamlit 仪表板
```
uv pip install -r requirements-dashboard.txt # or: pip install -r requirements-dashboard.txt
streamlit run dashboard.py
```
浏览器中可查看:
- KPI:总数 / High 数 / 自研影响 Yes 数 / KEV 命中数 / 待处理 High 数
- 过去 30 天按 Priority 分的活动趋势
- 按类别统计的数量
- KEV 命中条目(含处置期限日历)
- 可按 Priority / Impact / Status / KEV / 时间段筛选的表格
## 故障排除
| 症状 | 解决方法 |
|---|---|
| `new=0 duplicate=N` 频繁出现 | 正常。仅表示无新项,已被 fingerprint 去重。 |
| 特定 RSS 源报警 | 通常是服务器端 XML 格式问题(如 CISA Alerts)。其他源工作正常可忽略。长期有问题则在 sources.yaml 中设为 `enabled: false` |
| Claude API 错误 | 会自动回退至 `rule-based` 分类器。检查 `.env` 中的 API 密钥 |
| 需要重新处理所有数据 | 删除 `data/threat_register.sqlite data/threat_register.csv` 后重新运行 `collect` |
| 误分类较多 | 设置 `THREAT_WATCH_CLASSIFIER=claude` 提高精度。优化 `company_assets.yaml` 中的关键词,考虑单词边界(避免过短的关键词) |
## 自动化
使用 cron 每天早上 8 点运行的示例:
```
0 8 * * * cd /home/hacker/Project/threat_watch && .venv/bin/python main.py run >> /var/log/threat-watch.log 2>&1
```
## 注意事项
- AI 分类和影响判定为**初步判定**。High/Critical/Unknown 条目**必须进行人工审核**。
- 优先参考原始信息。切勿仅凭 RSS 摘要下结论。
- 持续更新 `company_assets.yaml` 是保证准确性的关键。
- **切勿将 Unknown 视为安全**。Unknown 表示“待调查”。
标签:Claude AI分类, CLI命令行工具, CSV数据存储, EPSS丰富, GitHub安全公告, KEV丰富, Kubernetes, MCP服务器, RSS订阅源, SQLite数据库, Teams集成, YAML配置, 人工智能威胁情报, 人工智能安全, 内核监控, 合规性, 周期性报告, 威胁分类, 威胁情报收集, 威胁情报管线, 威胁监控, 安全情报管理, 安全警报, 自公司影响评估, 轻量级软件, 逆向工具, 通知系统