ledunguit/mcp-dynamic-analysis-server

GitHub: ledunguit/mcp-dynamic-analysis-server

一个基于 MCP 协议的动态分析服务器,通过集成 Valgrind Memcheck 为大模型提供内存漏洞检测能力。

Stars: 0 | Forks: 0

# 动态分析 MCP Server (Valgrind Memcheck) 一个基于 STDIO 的本地 MCP (Model Context Protocol) 服务器,专注于动态分析。此版本自带 Valgrind Memcheck 作为首个工具,代码库结构支持后续添加更多动态分析工具。 ## 概述 - Python 3.11+ - 基于 STDIO 的 MCP server (JSON-RPC) - Valgrind Memcheck 执行 + XML 解析 - 面向 LLM 推理的标准化发现结果 JSON - 在 `runs/` 目录下按运行存储产物 - 使用 pytest 的基础测试 ## 架构 层级: 1. **MCP Interface Layer**:`src/mcp_dynamic_analysis_server/app.py` 2. **Execution Layer**:`core/command_builder.py`、`core/runner.py`、`core/validators.py` 3. **Parsing + Normalization**:`core/parser_memcheck.py`、`core/normalizer.py`、`core/severity.py` 4. **Artifact / Persistence**:`core/artifact_store.py` 工具在 `app.py` 中注册,并可通过额外的动态分析工具扩展。Valgrind Memcheck 当前通过 `valgrind.*` 暴露。 ## 环境要求 ### 选项 A:Docker(推荐,主机无需安装 Valgrind) - Docker + Docker Compose ### 选项 B:主机(高级) - Python 3.11+(推荐使用 pyenv) - 本地安装 Valgrind(Linux) - 用于编译示例二进制文件的 C 编译器(`cc` 或 `clang`) ## 配置 将 `WORKSPACE_ROOT` 设置为允许执行根目录。默认为项目根目录。所有 `target_path`、`cwd` 和抑制文件必须解析到此目录下。 对于 R2 上传,请在 `.env` 中配置凭据(参见 `.env.example`)。 `R2_ENDPOINT` 用于 SDK/预签名,而 `R2_ALLOW_HOSTS` 控制允许下载的主机。如果您为 R2 使用自定义公共域名,请将 `R2_ALLOW_HOSTS` 设置为该域名(可选包含原始 R2 端点主机作为备用)。**仅使用主机名(不带协议)。** 可选:启用 `R2_HEALTHCHECK_ON_STARTUP=true` 以在启动时运行快速 `head_bucket`(通过 `R2_HEALTHCHECK_TIMEOUT_SEC` 设置超时)。失败将记录为警告,不会停止服务器。 ## 安装设置 [仅限主机] ``` pyenv virtualenv 3.12.8 mcp-da pyenv local mcp-da pip install -e .[test] ``` ## 安装 Valgrind [仅限主机] 在 macOS (Homebrew) 上: ``` brew install valgrind ``` 在 Ubuntu/Debian 上: ``` sudo apt-get update && sudo apt-get install -y valgrind ``` ## Docker(在 macOS 上推荐) Valgrind 仅限 Linux。在 macOS 上,使用 Docker 在 Linux 容器内运行服务器和工具。 构建并通过 HTTP 运行 MCP server(Valgrind 在容器内运行): ``` docker compose up --build ``` 服务器监听 `http://localhost:8080`。 产物将通过卷挂载写入主机的 `runs/` 目录。 ## 构建示例易受攻击二进制文件 如果在主机上运行(无 Docker): ``` make -C examples/vulnerable ``` 如果在 Docker 中运行(推荐): ``` docker compose exec -T mcp-da make -C examples/vulnerable ``` 输出: - `examples/vulnerable/bin/invalid_read` - `examples/vulnerable/bin/leak` ## 启动 MCP Server (STDIO) [仅限主机] ``` mcp-da-server ``` 服务器从 STDIN 读取 JSON-RPC 消息,并将响应写入 STDOUT。 ## HTTP API 模式 您也可以将服务器作为简单的 HTTP JSON-RPC API 运行: ``` mcp-da-server --transport http --host 0.0.0.0 --port 8080 ``` 向 `POST /mcp` 发送 JSON-RPC: ``` curl -s http://localhost:8080/mcp \\\n -H 'Content-Type: application/json' \\\n -d '{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\",\"params\":{}}'\n``` Example JSON-RPC (tools/call): ```json { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "valgrind.analyze_memcheck", "arguments": { "target_path": "examples/vulnerable/bin/invalid_read" } } } ``` 响应(结果内容为 JSON 文本): ``` { "jsonrpc": "2.0", "id": 1, "result": { "content": [ { "type": "text", "text": "{\\n \\\"run_id\\\": \\\"...\\\"\\n}" } ] } } ``` ## MCP 客户端/主机配置 示例主机配置(伪代码): ``` { "mcpServers": { "dynamic-analysis": { "command": "mcp-da-server", "args": [] } } } ``` ## 工具 ### valgrind.analyze_memcheck 输入(示例): ``` { "target_path": "examples/vulnerable/bin/invalid_read", "args": [], "cwd": "examples/vulnerable", "timeout_sec": 30, "track_origins": true, "leak_check": "full", "show_leak_kinds": "all", "xml": true, "suppressions": [], "env": {}, "stdin": "", "labels": ["demo"] } ``` 对于远程/公共服务器,您可以将二进制文件上传到 R2 并传递 `artifact_id` 或 `target_url`: ``` { "artifact_id": "abcd1234", "args": [], "cwd": "/app/workdir", "timeout_sec": 60, "track_origins": true, "leak_check": "full", "show_leak_kinds": "all", "xml": true, "suppressions": [], "env": {}, "stdin": "" } ``` 输出(摘要): ``` { "run_id": "20260313-153012-acde1234", "status": "completed", "tool": "memcheck", "exit_code": 42, "timed_out": false, "error_exit_code_triggered": true, "stats": { "finding_count": 1, "high": 1, "medium": 0, "low": 0 }, "top_findings": ["..."], "artifacts": { "run_dir": "runs/20260313-153012-acde1234", "report_path": "runs/20260313-153012-acde1234/normalized_report.json", "xml_path": "runs/20260313-153012-acde1234/valgrind.xml", "log_path": "runs/20260313-153012-acde1234/valgrind.log", "stdout_path": "runs/20260313-153012-acde1234/stdout.txt", "stderr_path": "runs/20260313-153012-acde1234/stderr.txt" } } ``` ### valgrind.get_report 输入: ``` { "run_id": "20260313-153012-acde1234" } ``` 输出:完整的标准化报告 JSON。 ### valgrind.list_findings 输入: ``` { "run_id": "20260313-153012-acde1234", "severity": "high", "kind": "InvalidRead", "file": "src/main.c", "function": "parse_input", "limit": 20 } ``` 输出: ``` { "run_id": "20260313-153012-acde1234", "count": 3, "findings": [] } ``` ### valgrind.compare_runs 输入: ``` { "base_run_id": "20260313-153012-acde1234", "new_run_id": "20260313-153512-acde5678" } ``` 输出: ``` { "base_run_id": "20260313-153012-acde1234", "new_run_id": "20260313-153512-acde5678", "summary": {"fixed": 0, "new": 1, "persistent": 2}, "fixed_findings": [], "new_findings": [], "persistent_findings": [] } ``` ### valgrind.get_raw_artifact 输入: ``` { "run_id": "20260313-153012-acde1234", "artifact_type": "xml" } ``` 输出: ``` { "run_id": "20260313-153012-acde1234", "artifact_type": "xml", "path": "runs/20260313-153012-acde1234/valgrind.xml", "content": "...", "truncated": false, "size_bytes": 12345 } ``` ### artifact.create_upload_url 为二进制文件创建预签名的 R2 上传 URL。 输入: ``` { "filename": "app.bin", "content_type": "application/octet-stream", "size_bytes": 123456, "sha256": "..." } ``` 输出: ``` { "artifact_id": "abcd1234", "upload_url": "https://...presigned-put...", "download_url": "https://...presigned-get...", "expires_in_sec": 900, "key": "uploads/abcd1234/app.bin" } ``` ## 运行产物 每次运行会在 `runs//` 下创建: - `request.json` - `command.txt` - `stdout.txt` - `stderr.txt` - `valgrind.xml` - `valgrind.log` - `normalized_report.json` - `summary.json` - `metadata.json` ## 资源提供者(额外功能) 资源通过以下 URI scheme 暴露: ``` artifact:/// ``` 示例: ``` artifact://20260313-153012-acde1234/xml ``` ## 提示词(额外功能) 服务器暴露了一个 `judge_guidance` 提示词,以帮助 LLM 解释 Memcheck 发现。 ## 演示脚本(额外功能) 运行端到端演示(构建示例、运行 Memcheck 两次、比较): ``` python scripts/demo_end_to_end.py ``` ## 测试 ``` pytest ``` ## 已知限制 - 目前仅实现了 Valgrind Memcheck,但注册表支持添加更多动态分析工具。 - JSON-RPC STDIO 传输要求每行一个 JSON 消息。 - 通过 `valgrind.get_raw_artifact` 或资源读取时,大型产物会被截断。 ## 扩展 添加新的动态分析工具: 1. 在 `src/mcp_dynamic_analysis_server/tools/` 中实现新的工具处理程序。 2. 根据需要在 `core/` 下添加解析器/标准化器模块。 3. 在 `app.py` 中使用新名称和 JSON schema 注册该工具。 ## 需求说明 - Memcheck 强制使用 XML 输出以确保结构化解析。 - 服务器验证 `WORKSPACE_ROOT` 内的可执行路径。 - 产物按运行 ID 隔离。 ## 项目结构(节选) ``` mcp_dynamic_analysis_server/ pyproject.toml README.md .env.example src/mcp_dynamic_analysis_server/ app.py config.py logging_config.py models/ tools/ core/ resources/ prompts/ runs/ examples/ tests/ ```
标签:C/C++, Cloudflare R2, DLL 劫持, DNS 反向解析, Docker, JSON-RPC, MCP服务器, Memcheck, Python, STDIO, Valgrind, 事务性I/O, 云安全监控, 内存检查, 内存泄漏, 大语言模型, 安全推理, 安全规则引擎, 安全防御评估, 无后门, 服务器, 缓冲区溢出, 证据驱动, 请求拦截, 逆向工具, 静态分析