k3ijo-miyamoto/ai-threats

GitHub: k3ijo-miyamoto/ai-threats

一个轻量级Python工具,用于自动收集、分类和评估AI相关威胁,并通知相关团队。

Stars: 0 | Forks: 0

# l jargon. Similarly for others. 一款轻量级 Python 工具,用于收集、分类、评估 AI 相关威胁对自身企业的影响并发出通知。 详细的背景和运维设计请参考 [CLAUDE.md](CLAUDE.md)。 ## 构成 ``` threat_watch/ collectors/ # 収集 (RSS / GitHub Advisory) classifiers/ # AI分類 (rule-based / Claude) + 自社影響判定 storage/ # SQLite + CSVミラー notifiers/ # Teams Webhook reports/ # 週次Markdownレポート config/ sources.yaml # 収集対象 company_assets.yaml# 自社AIアセット定義 data/ # threat_register.sqlite / .csv (gitignored) main.py # CLIエントリポイント ``` ## 设置 ``` # uv 推奨 uv venv source .venv/bin/activate uv pip install -r requirements.txt # pip でも可 # - For "Company Impact", this might be a common phrase, so I can translate it to Chinese. # - I should translate the entire line, but keep specific terms in English. # - Final decision for first heading: "AI Threat Watch" is likely a name, so keep it in English. "MVP" is kept in English. So, the translation could be "AI Threat Watch (MVP)" but in Chinese, I might add a translation for clarity. To match examples, I'll keep "AI Threat Watch" in English and translate only if necessary. cp .env.example .env # .env を編集: ANTHROPIC_API_KEY, TEAMS_WEBHOOK_URL, GITHUB_TOKEN, THREAT_WATCH_CLASSIFIER cp config/company_assets.example.yaml config/company_assets.yaml # config/company_assets.yaml を自社のAI利用状況に合わせて編集 ``` ## 基本命令 ``` # 収集 → 分類 → 影響判定 → SQLite/CSVへ保存 python main.py collect # 高優先度の未通知アイテムをTeamsへ送信 python main.py notify # 週次レポート生成 (reports/weekly_report.md) python main.py report --days 7 # High優先度の各脅威に対し、Claudeで個別の対策を生成 python main.py advise [--limit N] # 一括実行(collect → advise → notify → report)— 通常はこれを使う python main.py run # 統計 python main.py stats # KEV対応期限が7日以内のアイテムをアラート表示 python main.py alerts --kev-due-within 7 # 月次・四半期レポート python main.py report --period monthly python main.py report --period quarterly # 人間レビュー記録(status / category / priority / note) python main.py review AI-THREAT-0042 \ --status Reviewing \ --correct-category "Not AI-related" \ --priority Low \ --note "誤検知。実際は IoT 機器の問題で AI 経路なし" # Webダッシュボード(要 requirements-dashboard.txt) uv pip install -r requirements-dashboard.txt # or: pip install -r requirements-dashboard.txt streamlit run dashboard.py ``` ## 典型运维流程 ### 日常 ``` python main.py run ``` 这将执行以下操作: 1. 从 8 个信息源获取 RSS/API 数据 2. 仅使用分类器对新项进行分类(通过 fingerprint 排除已存在项) 3. 与 `company_assets.yaml` 对照进行影响判定 4. 为每条 High 优先级威胁使用 Claude 生成个性化应对措施 (`tailored_action`) 5. 将 High 优先级威胁通知到 Slack / Teams 6. 更新周报 ### 周度复盘(人工手动进行) 1. 打开 [reports/weekly_report.md](reports/weekly_report.md) 2. 人工审核 “Company Impact = Unknown” 部分 3. 根据需要,在 Excel 等工具中打开 [data/threat_register.csv](data/threat_register.csv) 进行详细检查 ## 结果解读 ### Priority(优先级) | 值 | 应对方针 | |---|---| | **High** | 立即处理 — 直接影响自研资产 或 严重漏洞 + AI 相关 | | **Medium** | 在周度复盘中确认 | | **Low** | 仅纳入报告统计 | ### - In the example 'API Reference' -> 'API 参考', "Reference" is translated to "参考". So, for "AI Threat Watch", "Watch" could be translated to "监视" or "观察". But since it's a specific term, I might keep it as is. | 值 | 含义 | |---|---| | **Yes** | 匹配到 `company_assets.yaml` 中的关键词 — 确定有影响 | | **Unknown** | 影响不明 — **必须人工分流**(不可视为安全) | | **No** | 非 AI 相关,且与自研资产无关 | ## 分类器切换 在 `.env` 或环境变量中指定: ``` THREAT_WATCH_CLASSIFIER=claude # Claude API使用(推奨。ANTHROPIC_API_KEY必須) THREAT_WATCH_CLASSIFIER=rule-based # オフライン動作(精度低、API不要) ``` 通过命令行临时覆盖: ``` python main.py collect --classifier rule-based ``` 当 ClaudeClassifier 的 API 调用失败时,会自动回退至 rule-based 分类器。 ## 自定义 ### 添加自研 AI 工具 首先将 [config/company_assets.example.yaml](config/company_assets.example.yaml) 复制为 `config/company_assets.yaml`(运行时文件已在 `.gitignore` 中)。 ``` cp config/company_assets.example.yaml config/company_assets.yaml ``` 然后编辑 `config/company_assets.yaml`。例如,添加 GitLab Duo: ``` - name: GitLab Duo category: coding_ai keywords: - gitlab duo - gitlab ai data_access: - source_code risk_level: high owner: Engineering ``` 准确性的关键在于维护好此文件。过于短的 `keywords`(3 个字符以下)容易导致误匹配,应避免使用。 ### 添加/排除信息源 编辑 [config/sources.yaml](config/sources.yaml)。通过设置 `enabled: false` 可临时排除。 ### 启用 Teams / Slack 通知 在 `.env` 中添加 Incoming Webhook URL(可只配置一个或两个都配置): ``` TEAMS_WEBHOOK_URL=https://outlook.office.com/webhook/... SLACK_WEBHOOK_URL=https://hooks.slack.com/services/T.../B.../... ``` 如果同时配置了两个,则**会向两个服务都发送通知**。未配置的通知目标会自动跳过。 #### 获取 Slack Webhook 步骤 1. 访问 [api.slack.com/apps](https://api.slack.com/apps) → **Create New App** → **From scratch** 2. 左侧菜单找到 **Incoming Webhooks**,将其 **Activate** 3. **Add New Webhook to Workspace** → 选择要接收通知的频道并授权 4. 复制显示的 `https://hooks.slack.com/services/...` 到 `.env` 文件的 `SLACK_WEBHOOK_URL` #### 获取 Teams Webhook 步骤 1. 在要接收通知的频道中点击 “…” → **Connectors** → **Incoming Webhook** 并 **Configure** 2. 设置名称(例如:`AI Threat Watch`) → **Create** 3. 复制显示的 URL 到 `.env` 文件的 `TEAMS_WEBHOOK_URL` ## 输出字段 (威胁登记表) | 列 | 说明 | |---|---| | threat_id | 格式为 `AI-THREAT-NNNN` 的序列号 | | collected_at / published_at | 收集时间 / 发布时间 (ISO 8601) | | source / source_category | 信息源及其分类 | | title / url / summary | 标题、URL、摘要 | | category | AI 威胁类别 (如 Prompt Injection 等) | | severity | Low / Medium / High / Critical | | company_impact | Yes / No / Unknown | | affected_asset | 匹配到的自研 AI 资产 | | reason | 影响判定的理由 | | recommended_action | 建议措施(基于类别的通用建议) | | tailored_action | Claude 为 High 优先级威胁生成的个性化应对措施 | | priority | High / Medium / Low | | classifier_used | rule-based / claude | | status | New / Reviewing / Actioned / Closed / FalsePositive | | notified | Teams/Slack 通知已发送标志 | | cve_ids | 检测到的 CVE ID(逗号分隔) | | in_kev | 是否列入 CISA KEV(在野利用漏洞)列表 | | kev_due_date | 面向美国联邦机构的 KEV 处置期限 | | epss_max_score | EPSS 分数最大值(0.0-1.0,利用可能性预测) | | epss_max_cve | 对应 epss_max_score 的 CVE | | reviewed_at / reviewer / review_note | 人工审核记录 | | corrected_category | 审核员修正后的类别 | ## 通知规则 `main.py notify` 会发送 `priority = High` 且 `notified = 0` 的记录。 判定逻辑参见 [classifiers/impact_scorer.py](classifiers/impact_scorer.py)。 ## CVE 信息丰富化 `collect` 会自动从每条威胁中提取 CVE ID(`CVE-YYYY-NNNN` 格式),并与以下信息进行关联: - **CISA KEV** (已知被利用漏洞列表) — 在野利用的 CVE 列表,每天缓存一次至 [data/cache/kev.json](data/cache/kev.json) - **EPSS API** (FIRST.org) — 30 天内的利用可能性分数(0.0-1.0) 如果检测到 `in_kev=True` 或 `epss>=0.7`,则会设置 `impact_scorer` 的 `critical_hit` 标志,并将优先级提升为 High(必须满足 AI 相关前置条件)。 ## 人工审核反馈 AI 判定为初步判定,因此提供了审核员可修正的机制: ``` python main.py review AI-THREAT-0042 \ --status FalsePositive \ --correct-category "Not AI-related" \ --priority Low \ --note "Cisco IOSの脆弱性で、AI関連ではない" ``` - `status`: New / Reviewing / Actioned / Closed / FalsePositive - `corrected_category`: 不修改原始 `category`,记录在单独列中 - `reviewer`: 未指定时使用 `$USER` 环境变量 ### 小样本学习 累积的 `corrected_category` 修正将作为后续 Claude 分类的 **few-shot examples** 自动注入(最多最近 5 条)。审核越多,分类结果就越符合本企业的运营判断。 学习目标仅为 `corrected_category != category` 的记录(即存在修正意图的记录)。 ## CVE 集群与通知去重 针对同一 CVE 从多个信息源(如 CISA / GitHub Advisory / JVN / IPA 等)作为独立记录获取的问题: - **通知时自动去重**:`notify` 会跳过已通知过的相同 CVE(可通过 `--no-dedup-cve` 禁用) - **月度/季度报告**:聚合显示 CVE 集群,表明一个补丁可解决多个记录 ## 月度/季度报告 ``` python main.py report --period monthly python main.py report --period quarterly ``` 输出示例内容: - 各类别的**与上一时期对比趋势**(如 ▲25%、▼10% 等) - Priority / Impact / Status 分布 - **待处理的 High 条目**(status=New/Reviewing)明细 - KEV 命中数量 / EPSS≥0.7 数量 - **主要 CVE 集群**(由多个源报告的同一 CVE) - Policy Implications 占位符(由 AI Threat Lead 手动填写的框架) ## KEV 处置期限提醒 CISA KEV(在野利用 CVE 列表)设有针对联邦机构的处置期限。在商业环境中也可作为事实上的处置基准参考: ``` python main.py alerts --kev-due-within 7 # 7日以内 python main.py alerts --kev-due-within 30 # 30日以内 ``` 与勒索软件攻击相关的 CVE 会以 `[RANSOMWARE]` 醒目标示。 ## IOC(入侵指标)提取 `collect` 会自动从每条威胁中提取 IOC 并保存到 `iocs_json` 列: | 类型 | 示例 | |---|---| | IPv4 | `45.33.32.156`(排除私有地址/环回地址/文档示例地址) | | 域名 | `evil-c2.example.test`(排除 github.com 等知名域名) | | URL | `https://...` / `hxxps://...`(自动规范化 defang 表示) | | 哈希 | SHA256 / SHA1 / MD5 | | Bitcoin | 传统地址(以 1 或 3 开头) | Defang 表示法(如 `evil[.]com`, `hxxps://`, `1.2.3[.]4`)会自动转换回标准格式。 使用 MCP 工具 `search_iocs` 搜索 IOC: ``` "45.33.32.156 が含まれる脅威レコードを探して" ``` 计划未来用于 SIEM/EDR 集成时的关联数据。 ## 测试 ``` uv pip install -r requirements-dev.txt # or: pip install -r requirements-dev.txt pytest tests/ -v ``` 包含 67 项测试。通过 GitHub Actions ([.github/workflows/test.yml](.github/workflows/test.yml)) 在 push/PR 时于 Python 3.11/3.12 环境下自动运行。 ## 获取原文正文 当 RSS 的 `summary` 较短(少于 500 字符)时,会使用 `trafilatura` 从 `url` 提取文章正文,以扩充传递给分类器的摘要。 - 对于 GitHub Advisory / Twitter / YouTube 等不适合提取正文的信息源,会自动跳过 - 提取失败时,会使用原始 summary(非致命错误) - 可通过 `--no-fetch-body` 禁用此功能 ## Daily Report Skill(Claude CLI 命令) [.claude/skills/daily-report/SKILL.md](.claude/skills/daily-report/SKILL.md) 定义了一个技能,可在 Claude CLI 中输入 `/daily-report` 来生成每日简报。 ``` cd /home/hacker/Project/threat_watch claude > /daily-report ``` 其背后执行的操作: 1. 运行 `python main.py run` 进行收集、分类、通知和周报更新 2. 通过 MCP 工具(如 `stats`, `recent_high`, `list_kev_due` 等)获取上下文 3. 比较过去 7 天与过去 8-14 天的各类别数量以检测趋势 4. 在聊天中输出**自然语言简报**(包括待处理 High 项 / KEV 紧急 / 趋势 / 建议行动) 每天早晨只需输入这个 `/daily-report` 命令即可维持运营。 ### 使用 cron 实现完全自动化 [scripts/daily_report_cron.sh](scripts/daily_report_cron.sh) 是一个专为 cron 环境(PATH 最小化、不继承 NVM/venv)设计的包装脚本: ``` # crontab -e で追加 0 8 * * * /home/hacker/Project/threat_watch/scripts/daily_report_cron.sh >> /home/hacker/Project/threat_watch/logs/cron.log 2>&1 ``` 脚本功能: 1. 加载 NVM(激活 Claude CLI 所需的 Node 环境) 2. 激活项目的 `.venv` 3. 执行 `claude --print --max-budget-usd 2 /daily-report` 4. 将输出保存到 `logs/daily-report-YYYY-MM-DD.md` 5. 如果设置了 `SLACK_WEBHOOK_URL`,则发布到 Slack API 预算上限通过 `--max-budget-usd 2` 设置为默认 $2(实际通常在 $0.05 到 $0.20 之间)。 如果希望将 `SLACK_WEBHOOK_URL` 放在用于 cron 的环境文件中(而不是 `.env`),可以在 crontab 中指定 `BASH_ENV=/path/to/env.sh`,或在脚本开头使用 `source` 命令加载。 ## MCP 集成(通过 Claude CLI 用自然语言操作) [mcp_server.py](mcp_server.py) 通过 Model Context Protocol 公开威胁登记表。可以从 Claude CLI / Claude Desktop / Cursor 等客户端使用自然语言进行查询和记录操作。 ### 在 Claude CLI 中启用 项目根目录下已准备好 [.mcp.json](.mcp.json) 文件,只需在此目录启动 Claude CLI,MCP 服务器便会自动启动。 ``` cd /home/hacker/Project/threat_watch claude # 起動時に .mcp.json を読み込む ``` 首次使用时会弹出 MCP 服务器授权提示,请予以授权。 ### 使用示例(向 Claude 提问自然语言) ``` 過去7日のHighアイテムで、KEV該当のうち status=New のものをリストして ``` ``` AI-THREAT-0095 を Actioned にして、ノートに「MLflow 3.10.0 にアップグレード済み」と記録して ``` ``` CVE-2025-55182 がどのソースから報告されているか教えて ``` ``` 今月の AI Supply Chain カテゴリの件数は先月比で増えてる? ``` ``` KEVで期限が3日以内に切れるアイテムを教えて ``` ### 提供的工具 | 工具 | 用途 | |---|---| | `query_threats` | 根据 priority/status/category/source/cve/days 等灵活筛选 | | `get_threat` | 获取指定 threat_id 的一条完整记录 | | `cve_cluster` | 获取引用同一 CVE 的所有记录 | | `list_kev_due` | 获取 KEV 处置期限临近列表 | | `recent_high` | 获取最近的待处理 High 条目(常用组合) | | `stats` | 获取按类别/优先级的统计汇总 | | `mark_reviewed` | 记录审核结果(写入操作) | ### 在 Claude Desktop 中设置 ``` { "mcpServers": { "ai-threat-watch": { "command": "/絶対パス/threat_watch/.venv/bin/python", "args": ["/絶対パス/threat_watch/mcp_server.py"] } } } ``` ## Streamlit 仪表板 ``` uv pip install -r requirements-dashboard.txt # or: pip install -r requirements-dashboard.txt streamlit run dashboard.py ``` 浏览器中可查看: - KPI:总数 / High 数 / 自研影响 Yes 数 / KEV 命中数 / 待处理 High 数 - 过去 30 天按 Priority 分的活动趋势 - 按类别统计的数量 - KEV 命中条目(含处置期限日历) - 可按 Priority / Impact / Status / KEV / 时间段筛选的表格 ## 故障排除 | 症状 | 解决方法 | |---|---| | `new=0 duplicate=N` 频繁出现 | 正常。仅表示无新项,已被 fingerprint 去重。 | | 特定 RSS 源报警 | 通常是服务器端 XML 格式问题(如 CISA Alerts)。其他源工作正常可忽略。长期有问题则在 sources.yaml 中设为 `enabled: false` | | Claude API 错误 | 会自动回退至 `rule-based` 分类器。检查 `.env` 中的 API 密钥 | | 需要重新处理所有数据 | 删除 `data/threat_register.sqlite data/threat_register.csv` 后重新运行 `collect` | | 误分类较多 | 设置 `THREAT_WATCH_CLASSIFIER=claude` 提高精度。优化 `company_assets.yaml` 中的关键词,考虑单词边界(避免过短的关键词) | ## 自动化 使用 cron 每天早上 8 点运行的示例: ``` 0 8 * * * cd /home/hacker/Project/threat_watch && .venv/bin/python main.py run >> /var/log/threat-watch.log 2>&1 ``` ## 注意事项 - AI 分类和影响判定为**初步判定**。High/Critical/Unknown 条目**必须进行人工审核**。 - 优先参考原始信息。切勿仅凭 RSS 摘要下结论。 - 持续更新 `company_assets.yaml` 是保证准确性的关键。 - **切勿将 Unknown 视为安全**。Unknown 表示“待调查”。
标签:Claude AI分类, CLI命令行工具, CSV数据存储, EPSS丰富, GitHub安全公告, KEV丰富, Kubernetes, MCP服务器, RSS订阅源, SQLite数据库, Teams集成, YAML配置, 人工智能威胁情报, 人工智能安全, 内核监控, 合规性, 周期性报告, 威胁分类, 威胁情报收集, 威胁情报管线, 威胁监控, 安全情报管理, 安全警报, 自公司影响评估, 轻量级软件, 逆向工具, 通知系统