ledunguit/mcp-dynamic-analysis-server
GitHub: ledunguit/mcp-dynamic-analysis-server
一个基于 MCP 协议的动态分析服务器,通过集成 Valgrind Memcheck 为大模型提供内存漏洞检测能力。
Stars: 0 | Forks: 0
# 动态分析 MCP Server (Valgrind Memcheck)
一个基于 STDIO 的本地 MCP (Model Context Protocol) 服务器,专注于动态分析。此版本自带 Valgrind Memcheck 作为首个工具,代码库结构支持后续添加更多动态分析工具。
## 概述
- Python 3.11+
- 基于 STDIO 的 MCP server (JSON-RPC)
- Valgrind Memcheck 执行 + XML 解析
- 面向 LLM 推理的标准化发现结果 JSON
- 在 `runs/` 目录下按运行存储产物
- 使用 pytest 的基础测试
## 架构
层级:
1. **MCP Interface Layer**:`src/mcp_dynamic_analysis_server/app.py`
2. **Execution Layer**:`core/command_builder.py`、`core/runner.py`、`core/validators.py`
3. **Parsing + Normalization**:`core/parser_memcheck.py`、`core/normalizer.py`、`core/severity.py`
4. **Artifact / Persistence**:`core/artifact_store.py`
工具在 `app.py` 中注册,并可通过额外的动态分析工具扩展。Valgrind Memcheck 当前通过 `valgrind.*` 暴露。
## 环境要求
### 选项 A:Docker(推荐,主机无需安装 Valgrind)
- Docker + Docker Compose
### 选项 B:主机(高级)
- Python 3.11+(推荐使用 pyenv)
- 本地安装 Valgrind(Linux)
- 用于编译示例二进制文件的 C 编译器(`cc` 或 `clang`)
## 配置
将 `WORKSPACE_ROOT` 设置为允许执行根目录。默认为项目根目录。所有 `target_path`、`cwd` 和抑制文件必须解析到此目录下。
对于 R2 上传,请在 `.env` 中配置凭据(参见 `.env.example`)。
`R2_ENDPOINT` 用于 SDK/预签名,而 `R2_ALLOW_HOSTS` 控制允许下载的主机。如果您为 R2 使用自定义公共域名,请将 `R2_ALLOW_HOSTS` 设置为该域名(可选包含原始 R2 端点主机作为备用)。**仅使用主机名(不带协议)。**
可选:启用 `R2_HEALTHCHECK_ON_STARTUP=true` 以在启动时运行快速 `head_bucket`(通过 `R2_HEALTHCHECK_TIMEOUT_SEC` 设置超时)。失败将记录为警告,不会停止服务器。
## 安装设置 [仅限主机]
```
pyenv virtualenv 3.12.8 mcp-da
pyenv local mcp-da
pip install -e .[test]
```
## 安装 Valgrind [仅限主机]
在 macOS (Homebrew) 上:
```
brew install valgrind
```
在 Ubuntu/Debian 上:
```
sudo apt-get update && sudo apt-get install -y valgrind
```
## Docker(在 macOS 上推荐)
Valgrind 仅限 Linux。在 macOS 上,使用 Docker 在 Linux 容器内运行服务器和工具。
构建并通过 HTTP 运行 MCP server(Valgrind 在容器内运行):
```
docker compose up --build
```
服务器监听 `http://localhost:8080`。
产物将通过卷挂载写入主机的 `runs/` 目录。
## 构建示例易受攻击二进制文件
如果在主机上运行(无 Docker):
```
make -C examples/vulnerable
```
如果在 Docker 中运行(推荐):
```
docker compose exec -T mcp-da make -C examples/vulnerable
```
输出:
- `examples/vulnerable/bin/invalid_read`
- `examples/vulnerable/bin/leak`
## 启动 MCP Server (STDIO) [仅限主机]
```
mcp-da-server
```
服务器从 STDIN 读取 JSON-RPC 消息,并将响应写入 STDOUT。
## HTTP API 模式
您也可以将服务器作为简单的 HTTP JSON-RPC API 运行:
```
mcp-da-server --transport http --host 0.0.0.0 --port 8080
```
向 `POST /mcp` 发送 JSON-RPC:
```
curl -s http://localhost:8080/mcp \\\n -H 'Content-Type: application/json' \\\n -d '{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\",\"params\":{}}'\n```
Example JSON-RPC (tools/call):
```json
{ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "valgrind.analyze_memcheck", "arguments": { "target_path": "examples/vulnerable/bin/invalid_read" } } }
```
响应(结果内容为 JSON 文本):
```
{ "jsonrpc": "2.0", "id": 1, "result": { "content": [ { "type": "text", "text": "{\\n \\\"run_id\\\": \\\"...\\\"\\n}" } ] } }
```
## MCP 客户端/主机配置
示例主机配置(伪代码):
```
{
"mcpServers": {
"dynamic-analysis": {
"command": "mcp-da-server",
"args": []
}
}
}
```
## 工具
### valgrind.analyze_memcheck
输入(示例):
```
{
"target_path": "examples/vulnerable/bin/invalid_read",
"args": [],
"cwd": "examples/vulnerable",
"timeout_sec": 30,
"track_origins": true,
"leak_check": "full",
"show_leak_kinds": "all",
"xml": true,
"suppressions": [],
"env": {},
"stdin": "",
"labels": ["demo"]
}
```
对于远程/公共服务器,您可以将二进制文件上传到 R2 并传递 `artifact_id` 或 `target_url`:
```
{
"artifact_id": "abcd1234",
"args": [],
"cwd": "/app/workdir",
"timeout_sec": 60,
"track_origins": true,
"leak_check": "full",
"show_leak_kinds": "all",
"xml": true,
"suppressions": [],
"env": {},
"stdin": ""
}
```
输出(摘要):
```
{
"run_id": "20260313-153012-acde1234",
"status": "completed",
"tool": "memcheck",
"exit_code": 42,
"timed_out": false,
"error_exit_code_triggered": true,
"stats": {
"finding_count": 1,
"high": 1,
"medium": 0,
"low": 0
},
"top_findings": ["..."],
"artifacts": {
"run_dir": "runs/20260313-153012-acde1234",
"report_path": "runs/20260313-153012-acde1234/normalized_report.json",
"xml_path": "runs/20260313-153012-acde1234/valgrind.xml",
"log_path": "runs/20260313-153012-acde1234/valgrind.log",
"stdout_path": "runs/20260313-153012-acde1234/stdout.txt",
"stderr_path": "runs/20260313-153012-acde1234/stderr.txt"
}
}
```
### valgrind.get_report
输入:
```
{ "run_id": "20260313-153012-acde1234" }
```
输出:完整的标准化报告 JSON。
### valgrind.list_findings
输入:
```
{
"run_id": "20260313-153012-acde1234",
"severity": "high",
"kind": "InvalidRead",
"file": "src/main.c",
"function": "parse_input",
"limit": 20
}
```
输出:
```
{
"run_id": "20260313-153012-acde1234",
"count": 3,
"findings": []
}
```
### valgrind.compare_runs
输入:
```
{
"base_run_id": "20260313-153012-acde1234",
"new_run_id": "20260313-153512-acde5678"
}
```
输出:
```
{
"base_run_id": "20260313-153012-acde1234",
"new_run_id": "20260313-153512-acde5678",
"summary": {"fixed": 0, "new": 1, "persistent": 2},
"fixed_findings": [],
"new_findings": [],
"persistent_findings": []
}
```
### valgrind.get_raw_artifact
输入:
```
{ "run_id": "20260313-153012-acde1234", "artifact_type": "xml" }
```
输出:
```
{
"run_id": "20260313-153012-acde1234",
"artifact_type": "xml",
"path": "runs/20260313-153012-acde1234/valgrind.xml",
"content": "...",
"truncated": false,
"size_bytes": 12345
}
```
### artifact.create_upload_url
为二进制文件创建预签名的 R2 上传 URL。
输入:
```
{
"filename": "app.bin",
"content_type": "application/octet-stream",
"size_bytes": 123456,
"sha256": "..."
}
```
输出:
```
{
"artifact_id": "abcd1234",
"upload_url": "https://...presigned-put...",
"download_url": "https://...presigned-get...",
"expires_in_sec": 900,
"key": "uploads/abcd1234/app.bin"
}
```
## 运行产物
每次运行会在 `runs//` 下创建:
- `request.json`
- `command.txt`
- `stdout.txt`
- `stderr.txt`
- `valgrind.xml`
- `valgrind.log`
- `normalized_report.json`
- `summary.json`
- `metadata.json`
## 资源提供者(额外功能)
资源通过以下 URI scheme 暴露:
```
artifact:///
```
示例:
```
artifact://20260313-153012-acde1234/xml
```
## 提示词(额外功能)
服务器暴露了一个 `judge_guidance` 提示词,以帮助 LLM 解释 Memcheck 发现。
## 演示脚本(额外功能)
运行端到端演示(构建示例、运行 Memcheck 两次、比较):
```
python scripts/demo_end_to_end.py
```
## 测试
```
pytest
```
## 已知限制
- 目前仅实现了 Valgrind Memcheck,但注册表支持添加更多动态分析工具。
- JSON-RPC STDIO 传输要求每行一个 JSON 消息。
- 通过 `valgrind.get_raw_artifact` 或资源读取时,大型产物会被截断。
## 扩展
添加新的动态分析工具:
1. 在 `src/mcp_dynamic_analysis_server/tools/` 中实现新的工具处理程序。
2. 根据需要在 `core/` 下添加解析器/标准化器模块。
3. 在 `app.py` 中使用新名称和 JSON schema 注册该工具。
## 需求说明
- Memcheck 强制使用 XML 输出以确保结构化解析。
- 服务器验证 `WORKSPACE_ROOT` 内的可执行路径。
- 产物按运行 ID 隔离。
## 项目结构(节选)
```
mcp_dynamic_analysis_server/
pyproject.toml
README.md
.env.example
src/mcp_dynamic_analysis_server/
app.py
config.py
logging_config.py
models/
tools/
core/
resources/
prompts/
runs/
examples/
tests/
```
标签:C/C++, Cloudflare R2, DLL 劫持, DNS 反向解析, Docker, JSON-RPC, MCP服务器, Memcheck, Python, STDIO, Valgrind, 事务性I/O, 云安全监控, 内存检查, 内存泄漏, 大语言模型, 安全推理, 安全规则引擎, 安全防御评估, 无后门, 服务器, 缓冲区溢出, 证据驱动, 请求拦截, 逆向工具, 静态分析