jztchl/ai_security_advisor

GitHub: jztchl/ai_security_advisor

结合Semgrep静态分析与AI大模型推理的代码漏洞检测服务,支持异步处理和实时SSE结果推送。

Stars: 0 | Forks: 0

# ai_security_advisor AI 驱动的代码安全分析器。 该服务暴露一个 FastAPI 端点,接受源代码文件上传,在 SQLAlchemy 支持的数据库中存储“任务”记录,并异步处理文件。 代码**首先使用 Semgrep(静态分析)进行分析**,然后将发现结果**传递给 Mistral AI 进行更高层次的推理、优先级排序和建议**(以 Gemini 作为备用)。 系统还支持 **Server-Sent Events (SSE)**,以在分析任务**完成或失败**时实时通知客户端。 ## 功能特性 * 用于安全分析的文件上传 API (`POST /analyze/`) * 使用 **Semgrep** 进行静态分析 * 使用 **Mistral AI** 进行基于 AI 的增强(以 Gemini 作为备用) * 使用 **Celery + Redis** 进行异步/后台处理 * 任务跟踪存储在 SQL 数据库中(通过 `DATABASE_URL` 支持 SQLite) * 通过 `task_id` 检索结果 * 实时 **SSE 通知**,用于任务状态更新 * 结构化 JSON 输出(严重性、漏洞、建议、评分) ## 架构 / 数据流 ### 高层流程 1. 客户端通过 `POST /analyze/` 上传源代码文件 2. API 存储文件并创建状态为 `pending` 的 `Task` 记录 3. 后台 Worker 处理任务: * 在上传的文件上运行 **Semgrep** * 将 Semgrep 发现结果 + 源上下文传递给 **Mistral AI** * Mistral 生成风险评估、严重性和建议(如果需要则回退到 Gemini) 4. 任务状态更新为 `completed` 或 `failed` 5. 客户端可以: * 通过 REST 获取结果 * 订阅 **SSE** 以获取实时更新 ### 详细流程 * **API 层** `app.py` `apis/analyze_apis.py` * **持久化** `db/database.py` `db/models/tasks.py` * **后台处理** `celery_app.py` `tasks/process_file.py` * **静态分析** `semgrep`(从 Worker 调用) * **AI 集成** `core/ai.py` `core/mistral.py` `core/gemini.py` * **通知** 用于任务状态更新的 SSE 端点 ## 分析管道 ### 1. Semgrep(静态分析) * 首先在上传的源文件上运行 * 检测: * 常见漏洞 * 不安全模式 * 特定语言的安全问题 * 生成结构化发现(规则、位置、严重性) ### 2. Mistral AI(主要 AI 增强) * 接收: * 原始源代码 * Semgrep 发现结果 * 如果 Mistral 不可用,则回退到 Gemini * 执行: * 上下文推理 * 风险优先级排序 * 误报减少 * 可操作的安全建议 * 生成最终 JSON 报告 这种混合方法将**确定性静态分析**与**基于 LLM 的推理**相结合。 ## API 端点 ### 上传并启动分析 ``` POST /analyze/ ``` * Multipart 文件上传 * 创建状态为 `pending` 的新任务 * 将后台处理任务入队 ### 获取分析结果 ``` GET /analyze/results/{task_id} ``` 返回: * `pending` → 分析正在进行中 * `completed` → 完整的分析结果 * `failed` → 任务失败(无结果负载) ### 列出分析任务 ``` GET /analyze/tasks ``` 返回分页的任务元数据(id、filename、status、timestamps)。 ### SSE: 任务状态通知 ``` GET /analyze/events ``` * Server-Sent Events 流 * 在任务状态变更时发出事件: * completed * failed 适用于: * 实时仪表板 * 自动刷新的 UI * 消除前端轮询 ## 结果结构示例 ``` { "analysis_id": "uuid", "overall_severity": "Low", "score": 90, "vulnerabilities": [], "recommendations": [ "Validate and sanitize all user inputs.", "Avoid logging sensitive data." ] } ``` ## 环境要求 * 推荐使用 Python 3.10+ * Redis(Celery 后台处理必需) * Semgrep(CLI 必须可用) * Google Gemini API 密钥 关键 Python 依赖(见 `requirements.txt`): * `fastapi`, `uvicorn` * `celery`, `redis` * `sqlalchemy` * `mistralai` * `google-genai` * `pydantic-settings` * `aiofiles` * `semgrep` **注意:** 依赖项已拆分为 `requirements1.txt` 和 `requirements1.txt2.txt`,以避免 OpenTelemetry 包与其他库之间的依赖冲突。请按顺序安装这两个文件。 ## 配置 (.env) 本项目使用 `pydantic-settings` 并从 `.env` 读取环境变量。 在仓库根目录创建一个 `.env` 文件: ``` DATABASE_URL=sqlite:///./security_advisor.db GEMINI_API_KEY=your_gemini_api_key_here MISTRAL_API_KEY=your_mistral_api_key_here REDIS_URL=redis://localhost:6379/0 REDIS_CELERY_BROKER=redis://localhost:6379/0 ``` ## 为什么采用这种设计 * **Semgrep** 提供快速、确定性的静态分析 * **Mistral AI** 增加上下文感知的推理和优先级排序(以 Gemini 作为备用) * **Celery + Redis** 将 API 响应能力与繁重的分析任务解耦 * **SSE** 实现实时用户体验,无需轮询 * API、分析和存储层之间清晰分离 ## 添加新的 AI 提供商 随时修改系统以使用任何精通编码和漏洞分析的 LLM!该架构专为易于扩展而设计: ### 添加新的 AI 提供商: 1. **创建一个新的 provider 文件** 在 `core/` 中(例如 `core/openai.py`, `core/claude.py`) 2. **实现所需函数**,签名如下: def generate_content_{provider}(instruction: str, context: str) -> dict: # Your LLM API call logic here # Return parsed JSON result or None on failure 3. **将 provider 添加到模型字典** 在 [core/ai.py] 中 model_providers = { "mistral": generate_content_mistral, "gemini": gemini_generate_content, "your_provider": generate_content_your_provider, # Add this } 4. **在配置中添加 API 密钥**: # config.py YOUR_PROVIDER_API_KEY: str 5. **更新 requirements.txt** 添加必要的 SDK ### Provider 结构示例: ``` # core/openai.py from openai import OpenAI from config import settings from utils.to_json import to_json client = OpenAI(api_key=settings.OPENAI_API_KEY) def generate_content_openai(instruction: str, context: str) -> dict: try: content = instruction + "\n\n" + context response = client.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": content}] ) return to_json(response.choices[0].message.content) except Exception as e: logger.error(f"Error generating content: {e}") return None ``` 系统将按顺序自动尝试所有可用的 provider,并使用第一个成功的 provider,从而提供内置的冗余和回退支持。
标签:AI安全, API安全, AV绕过, Celery, Chat Copilot, CISA项目, DevSecOps, DLL 劫持, DNS 反向解析, FastAPI, Gemini, JSON报告, JSON输出, Mistral AI, Python, Redis, SAST, Semgrep, SQLAlchemy, SQLite, SSE, WordPress安全扫描, 上游代理, 云安全监控, 代码安全, 后台任务, 大语言模型, 安全专业人员, 安全合规, 异步处理, 搜索引擎查询, 文件上传, 无后门, 智能体, 服务器推送事件, 漏洞枚举, 盲注攻击, 结构化查询, 网络代理, 自动化安全, 自定义脚本, 逆向工具, 防御框架, 静态分析