daemon-blockint-tech/project-raven-d3fend
GitHub: daemon-blockint-tech/project-raven-d3fend
基于多模型智能体编排的防御性安全审计流水线,通过 AI 辩论验证和 D3FEND 本体映射实现从漏洞发现到防御建议的全链路自动化。
Stars: 2 | Forks: 1
# 多模型智能体网络防御
## 概述 此 pipeline 在 Project Raven 中实现了一个 Microsoft 风格的多模型智能体安全系统,利用 OpenRouter SDK 接入 300 多个 AI 模型。该 pipeline 遵循 5 阶段架构: ## 设计原则 (Kelly 的 14 条规则) Raven 建立在 Lockheed Martin Skunk Works 在 Kelly Johnson 领导下被证明有效的原则之上——这是一个传奇的创新团队,以极少的官僚主义在创纪录的时间内交付了 SR-71 Blackbird 等突破性飞机。 | 规则 | 原则 | Raven 应用 | |------|-----------|-------------------| | **1** | **完全控制** | `PipelineOrchestrator` 拥有端到端自主权;阶段之间无微观管理 | | **2** | **小型项目办公室** | 11 个专门的审计器,而非 100 多个通才——小型精英智能体 | | **3** | **严格限制** | 智能体系统增强小型团队;仅需传统 AppSec 人员编制的 10% | | **4** | **简单灵活的图纸** | 模块化的 Python 模块;无需重新架构即可替换审计器 | | **5** | **最少的报告** | 发现结果就是产品——而非过程文档;D3FEND 绑定提供可追溯性 | | **6** | **每月成本审查** | 通过 OpenRouter 进行实时成本跟踪;$10 试用预算与阶段级别限制 | | **7** | **承包商责任** | 智能体拥有其所属的漏洞类别领域;团队之间无工作交接 | | **8** | **精简检查** | 多模型辩论(3-5 个模型)取代人工代码审查周期 | | **9** | **测试授权** | 证明阶段生成 PoC;智能体“试飞”自己的发现 | | **10** | **预先明确规格** | CDP 契约——每项发现最终均终止于 𝒯/𝓜/𝓛;提前约定 D3FEND 映射 | | **11** | **及时的资金** | 通过 OpenRouter 按请求计费;无需为每次扫描跑去找财务 | | **12** | **互信** | 反馈循环(赞成/反对)+ 针对边缘情况的人在回路 | | **13** | **受控访问** | 仅限防御者执行;pipeline 中无攻击性工具 | | **14** | **奖励绩效** | 智能体有效性通过真阳性率衡量,而非消耗的 token | **Raven 的第 15 条规则:** 这些原则验证了 Raven 的方法:**小型、自主、被赋能的智能体,在明确的契约、最少的官僚主义以及对结果的直接问责下运行。** ## 架构 1. **准备** - 导入代码库,构建语言感知索引,生成威胁模型 2. **扫描** - 运行专门的审计智能体以发现漏洞 3. **验证** - 多模型辩论以过滤误报 4. **去重** - 合并语义相同的发现 5. **证明** - 生成并执行 PoC 以证明漏洞 6. **D3F 绑定** - 将发现映射到 D3FEND 对抗措施 ## 架构 ``` project-raven-d3fend/pipeline/ ├── config.yaml # Pipeline configuration ├── orchestrator.py # Main pipeline orchestrator ├── multi_agent_orchestrator/ # 100+ Agent Multi-Model Orchestration │ ├── agent_core.py # EventEmitter agent with hooks │ ├── agent_pool.py # Spawn, route, balance 100+ agents │ ├── model_router.py # Dynamic multi-model allocation │ ├── debate_panel.py # Multi-model debate validation │ ├── enricher_agent.py # D3FEND + ATT&CK enrichment │ ├── triage_agent.py # DevSecOps routing & Patch Tuesday │ ├── hooks.py # Lifecycle hooks & metrics │ ├── config.py # 100+ agent definitions │ └── orchestrator.py # Multi-agent pipeline coordinator ├── models/ # Data models for all stages ├── openrouter_integration/ # OpenRouter SDK wrapper │ ├── client.py │ ├── model_selector.py │ └── cost_tracker.py ├── prepare/ # Stage 1: Prepare │ ├── ingester.py │ ├── indexer.py │ ├── threat_modeler.py │ └── git_analyzer.py ├── scan/ # Stage 2: Scan │ ├── agent_router.py │ └── finding_collector.py ├── validate/ # Stage 3: Validate │ ├── debate_orchestrator.py │ ├── debater.py │ ├── voting.py │ └── confidence_scorer.py ├── dedup/ # Stage 4: Dedup │ ├── embedding_generator.py │ ├── similarity.py │ ├── clustering.py │ └── merger.py ├── prove/ # Stage 5: Prove │ ├── poc_generator.py │ ├── harness_builder.py │ ├── fuzzer.py │ ├── sandbox.py │ └── sanitizer_runner.py ├── d3fend/ # D3FEND + ATT&CK Enrichment │ ├── cwe_mapper.py │ ├── ontology_client.py │ ├── remediation.py │ ├── attack_mapper.py │ ├── d3fend_catalog_loader.py │ ├── cci_loader.py │ └── enrichment_engine.py ├── threat_intel/ # On-Chain Threat Intel │ └── onchain_threat_intel.py ├── cross_language/ # Cross-Language Analysis │ ├── ffi_analyzer.py │ ├── solidity_analyzer.py │ ├── wasm_analyzer.py │ └── taint_tracker.py ├── plugin_synthesis/ # Domain Invariant Extraction │ └── plugin_synthesis_agent.py ├── prove_extension/ # Symbolic Execution & Formal Verification │ └── formal_verification.py ├── eval/ # Benchmark Evaluation Framework │ ├── ground_truth_loader.py │ ├── evaluator.py │ └── report_generator.py ├── feedback/ # Retrospective Feedback Loop │ ├── false_negative_tracker.py │ ├── cve_parser.py │ ├── context_updater.py │ └── feedback_agent.py └── openai_agents_adapter/ # Optional OpenAI Agents SDK Backend ├── auditor_agent.py # Bug-class specialist agents ├── debater_agent.py # Multi-model debate panel └── pipeline_runner.py # Full MDASH pipeline orchestration ``` ## 安装 ### 前置条件 - Python 3.9+ - OpenRouter API key (设置 `OPENROUTER_API_KEY` 环境变量) - Docker (可选,用于 Prove 阶段沙盒化) ### 依赖项 安装 OpenRouter SDK: ``` pip install openrouter ``` 可选:安装 OpenAI Agents SDK 以用于生产级智能体编排、追踪和会话管理: ``` pip install openai-agents ``` 其他 Python 依赖项: - pyyaml - scikit-learn (用于聚类) - numpy (用于相似度计算) ## 配置 编辑 `pipeline/config.yaml` 以自定义: - 每个阶段的模型选择 - 预算限制 - 辩论配置 - 去重阈值 ## 用法 ### 基本用法 ``` from pipeline.orchestrator import PipelineOrchestrator # 初始化 pipeline pipeline = PipelineOrchestrator() # 在 target 上运行 pipeline report = pipeline.run_pipeline( target="https://github.com/example/repo.git", target_type="c-cpp-source", trials=10 ) # 打印报告 print(report.to_markdown()) ``` ### 使用 OpenAI Agents SDK 后端 (可选) 此 pipeline 包含一个可选的 [OpenAI Agents SDK](https://github.com/openai/openai-agents-python) 适配器,提供生产级智能体编排、内置追踪、会话管理以及人在回路支持。 ``` from pipeline.openai_agents_adapter import MDASHAgentsRunner, PipelineConfig # 为 debate 配置多样化 models config = PipelineConfig( auditor_models=["gpt-4o", "claude-3-5-sonnet-20241022"], debater_models=["gpt-4o", "gpt-4o-mini", "claude-3-5-sonnet-20241022"], enable_tracing=True, max_cost_usd=10.0 ) # 运行完整的 MDASH pipeline runner = MDASHAgentsRunner(config) report = runner.run_pipeline( target_path="/path/to/codebase", target_type="c-cpp-source" ) print(f"Found {report['findings_count']} findings") for finding in report['findings']: print(f"- {finding['bug_class']} at {finding['location']}") ``` ### 使用多智能体编排器 (100+ 智能体) 此 pipeline 包含一个生产级多智能体编排器,可跨 7 个 pipeline 阶段生成、路由和管理 100 多个专业智能体,并支持动态模型分配。 ``` import asyncio from pipeline.multi_agent_orchestrator import MultiAgentOrchestrator async def run(): # Initialize with dynamic model allocation orchestrator = MultiAgentOrchestrator( api_key="$OPENROUTER_API_KEY", budget_usd=50.0, max_concurrent=50, debate_panel_size=5, debate_threshold=0.67, ) # Run the full 7-stage pipeline report = await orchestrator.run_pipeline( target="/path/to/codebase", target_type="c-cpp-source", ) print(f"Found {len(report.findings)} findings") print(f"Wall time: {report.wall_time_seconds:.1f}s") print(f"Total cost: ${report.total_cost_usd:.2f}") print(f"Agents spawned: {report.metrics['total_agents']}") for f in report.findings: print(f"- {f['bug_class']} at {f['location']} | " f"owner={f['finding_owner']} | " f"patch_tuesday={f['patch_tuesday_target']}") asyncio.run(run()) ``` **特性:** - **100 多个不同的智能体类** — 每个漏洞类别都有自己的专门审计器 - **动态模型分配** — 前沿模型 (GPT-4o, Claude 3.5) 用于复杂推理,精简模型用于大批量扫描 - **并行异步执行** — 所有智能体均内置速率限制并发运行 - **多模型辩论** — 每项发现由 3-7 个使用不同模型的辩论者进行独立推理 - **DevSecOps 集成** — 每项发现都有负责人、Patch Tuesday 截止日期和 SLA ### 支持的目标类型 - `solana-program` - `evm-contract` - `c-cpp-source` - `rust-source` - `android-apk` - `binary-elf` - `binary-pe` - `linux-kernel-module` - `live-host` - `network-range-own-assets` ### 环境变量 必需项: - `OPENROUTER_API_KEY` - OpenRouter API 密钥 可选项: - `OPENROUTER_DEBUG` - 启用调试日志记录 (设置为 `true`) ## CDP 合规性 此 pipeline 强制执行 Raven 的可辩护预测契约 (CDP): - 每项发现均终止于: - **𝒯** - 工具预言机 (例如,清理器结果) - **𝓜** - 具有校准分数的 ML 检测器 - **𝓛** - 带有证伪测试的评分假设 - 每项发现都包含 D3FEND 技术绑定 - 低置信度项目被标记为“未确认”,而非发现 ## D3FEND + ATT&CK 丰富化 此 pipeline 为每项发现丰富了防御和攻击威胁情报上下文: ### D3FEND 技术映射 - 每项发现都会通过 `D3FENDEnrichmentEngine` 自动映射到相关的 D3FEND 防御技术 - 从 D3FEND 本体 (`ontology_client.py`) 中提取的 CWE 到 D3FEND 的映射 - 生成与 MITRE D3FEND 战术一致的防御建议 ### ATT&CK 技术丰富化 - `ATTACKMapper` 将漏洞类别和 CWE 映射到相关的 MITRE ATT&CK 攻击技术 - 提供威胁行为者上下文:哪些战术会利用此漏洞 - 生成解释现实世界利用场景的威胁叙事 ### 暴露分数调整 - 具有高威胁行为者可能性的发现获得 1.15 倍的优先级提升 - 强大的 D3FEND 防御覆盖可以略微降低暴露程度 - 集成到 `OnChainThreatIntelLayer` 中以进行统一优先级排序 ### 真实数据源 丰富化引擎在可用时加载真实的 MITRE 数据: - **D3FEND Catalog CSV** (`tools/d3fend.csv`) — 270 多种 D3FEND 技术,包含战术、层级和定义。通过 `D3FENDCatalogLoader` 加载。 - **CCI Mappings JSON** (`tools/cci.2022-04-05.json`) — 来自 D3FEND SPARQL 端点的 CCI 到 D3FEND 映射,将 NIST SP 800-53 控制措施链接到防御技术。通过 `CCILoader` 加载。 ``` from pipeline.d3fend import D3FENDEnrichmentEngine # 使用真实数据文件 engine = D3FENDEnrichmentEngine( catalog_path="tools/d3fend.csv", cci_path="tools/cci.2022-04-05.json" ) ``` ### 用法 ``` from pipeline.d3fend import D3FENDEnrichmentEngine from pipeline.models import CandidateFinding, BugClass engine = D3FENDEnrichmentEngine() # 丰富 candidate finding result = engine.enrich_candidate(candidate) print(result.threat_narrative) print([t.id for t in result.d3fend_techniques]) print([t.id for t in result.attack_techniques]) ``` ## 成本管理 此 pipeline 包含成本跟踪和预算强制执行: - 按阶段的成本限制 - 总预算上限 - 超出预算时自动终止 默认预算:$10.00 USD ## 后续步骤 要运行此 pipeline: 1. 设置 `OPENROUTER_API_KEY` 环境变量 2. 安装依赖项 3. 使用您的目标运行编排器 示例: ``` export OPENROUTER_API_KEY="your-api-key-here" python pipeline/orchestrator.py ``` ## 与现有 Raven 的集成 此 pipeline 旨在与现有 Raven 模块集成: - 使用现有的 D3FEND OWL 本体 (`raven/d3fend/`) - 兼容 Raven 技能结构 - 遵循 Raven 仅限防御者的执行策略
标签:AMSI绕过, CDP, D3FEND, DLL 劫持, IP 地址批量处理, LLM, OpenAI, PyRIT, SVG, Unmanaged PE, 人工智能, 内存规避, 多智能体系统, 多模型系统, 大语言模型, 威胁响应, 威胁检测, 安全工程, 安全开源项目, 本体论, 用户模式Hook绕过, 网络安全, 网络安全资助, 自动化防御, 蓝军, 请求拦截, 逆向工具, 防御者, 隐私保护