mohamednoorulnaseem/AI-Security-Log-Analyst-Agent-

GitHub: mohamednoorulnaseem/AI-Security-Log-Analyst-Agent

一个利用 LangChain ReAct Agent 和向量检索技术自动分析服务器日志、检测多阶段攻击并生成取证报告的 AI 安全分析平台。

Stars: 0 | Forks: 0

# LogSentinel AI:Agentic 安全日志分析师与取证平台 [![Tests](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/f442e745f2100841.svg)](https://github.com/mohamednoorulnaseem/AI-Security-Log-Analyst-Agent-/actions) [![FastAPI](https://img.shields.io/badge/API-FastAPI-009688.svg?style=flat&logo=fastapi&logoColor=white)](https://fastapi.tiangolo.com) [![Streamlit](https://img.shields.io/badge/Frontend-Streamlit-FF4B4B.svg?style=flat&logo=streamlit&logoColor=white)](https://streamlit.io) [![ChromaDB](https://img.shields.io/badge/VectorDB-ChromaDB-yellow.svg?style=flat)](https://www.trychroma.com) [![PostgreSQL](https://img.shields.io/badge/Database-PostgreSQL-336791.svg?style=flat&logo=postgresql&logoColor=white)](https://www.postgresql.org) LogSentinel AI 是一个 AI 驱动的安全分析师 agent,能够读取原始服务器日志、检测异常、分类威胁模式、追踪跨服务器的多阶段攻击,并生成结构化的事件报告。它以即时、可操作的安全洞察和完全透明的取证过程,取代了耗时的手动日志阅读。 该平台从底层构建以确保生产环境的可靠性,实现了基于时间窗口的向量摄取 pipeline、带有自定义取证工具的两阶段 ReAct agent 推理循环、防审计的数据库 schema 以及交互式 Streamlit SOC 仪表板。 ## 架构概述 该平台围绕包含四个不同阶段的微服务栈设计:日志摄取、Agentic 推理、持久化/分发以及可视化交互。 ``` graph TD %% Log Ingestion Layer subgraph Ingestion["1. Log Ingestion & Vector Storage"] RawLogs["Raw Server Logs
(Apache, Auth, Syslog)"] --> Parser["Multi-Format Log Parser
(Auto-detects Format)"] Parser --> Chunker["Chronological Chunker
(15-min Windowing)"] Chunker --> Embedder["OpenAI Embedder
(text-embedding-3-small)"] Embedder --> ChromaDB[("ChromaDB
(Vector Store)")] end %% Agentic Forensics Layer subgraph AgenticReasoning["2. Two-Phase Agentic Analysis"] APIAnalysed["POST /analyse
Request"] --> Agent["SecurityAnalystAgent
(gpt-4o-mini)"] Agent <--> Tools["Agent Tools:
- Semantic Log Search
- Time-Range Filter
- IP Address Correlation
- Attack Timeline Tracer"] Agent --> FormattedData["Phase 2: Structured Formatting
(Schema Binding)"] FormattedData --> Report["Pydantic IncidentReport"] end %% Storage & API Layer subgraph Storage["3. Persistence & Distribution"] Report --> Repo["Async Repository"] Repo --> Postgres[(PostgreSQL)] Repo --> FastAPI["FastAPI Endpoints
(X-API-Key Secure)"] end %% Presentation Layer subgraph UI["4. Presentation & Analytics"] FastAPI --> Dashboard["Streamlit Dashboard
- Interactive Forensic Tabs
- MITRE Stage Timelines
- Collapsible ReAct Traces
- SOC Performance Analytics"] end ChromaDB -.-> Tools ``` ### 核心 Pipeline 执行 1. **基于时间的窗口摄取**:摄取的日志流(支持 Linux `/var/log/auth.log`、系统 syslog RFC 3164 和 Apache Combined 格式)会被解析、验证,并按时间顺序分割为 15 分钟的窗口。 2. **语义向量生成**:数据块通过 LangChain 的 OpenAI embedding 集成(`text-embedding-3-small`)进行处理,并存储到 ChromaDB 向量数据库中。 3. **两阶段 ReAct Agent 推理**: - **第一阶段:信息收集**:agent 运行推理循环(最多 5 次迭代),使用工具查询向量数据库、执行时间范围过滤、追踪 IP 地址,并构建连贯的安全时间线。 - **第二阶段:结构化序列化**:通过 OpenAI 结构化输出,将收集到的证据映射到严格的 Pydantic 模型(`IncidentReport`)中,以确保 API 的确定性。 4. **关系型审计**:生成的报告和详细的执行日志(包括确切的工具调用、推理步骤、延迟和 token 计数)会被保存到 PostgreSQL 数据库中。 5. **交互式仪表板**:安全分析师可以上传文件、运行推理任务、查看实时日志、审计 agent 步骤,并跟踪 SOC 指标(检测率、严重性计数)。 ## 技术特性 * **自动检测日志解析器**:从标准日志格式中提取结构化字段(时间戳、主机、进程、IP、请求路径、状态码)。 * **攻击链追踪器**:分析日志间的相关性,将事件分类为五个关键安全阶段(侦察、访问尝试、初始访问、权限提升和 None)。 * **防审计数据库 Schema**:使用外键覆盖(删除时 `SET NULL`)持久化 agent 运行记录,即使在清除事件记录后也能维护 token 消耗审计日志。 * **REST API 安全性**:通过标头密钥验证(`X-API-Key`)保护 endpoint,并提供一个全面的 `/health` 路由用于验证依赖连接(PostgreSQL、ChromaDB、OpenAI)。 * **Docker 编排**:使用 `docker-compose` 对微服务进行容器化和网络连接。包括在容器启动时通过 Alembic 执行异步数据库迁移升级。 * **集成 Mock 的测试套件**:包含 76 个单元和集成测试,通过率为 100%,可在本地运行而无需外部 API 密钥或数据库连接。 ## 项目结构 ``` AI-Security-Log-Analyst-Agent/ │ ├── alembic/ # Database migration configuration & versions ├── app/ # Application source code │ ├── api/ # FastAPI routes, schemas, and authentication │ │ ├── models/ # Request & Response schemas (Pydantic V2) │ │ └── routes/ # Endpoint controllers (health, logs, analysis) │ ├── agent/ # Security Analyst Agent logic & custom tools │ ├── db/ # SQLAlchemy models, connection pools, repositories │ ├── ingestion/ # File parsers, chunkers, and ChromaDB vector store wrapper │ ├── utils/ # Standardized logging & application utilities │ ├── config.py # Pydantic Settings configuration manager │ └── main.py # FastAPI application entrypoint │ ├── dashboard/ # Streamlit UI dashboard │ ├── components/ # UI CSS injection, metrics styling, and client API │ └── app.py # Dashboard tab configuration & UI layouts │ ├── data/ # Local file databases, log storage, & manifests │ └── logs/ # Ground truth test logs (Apache, Auth, Syslog) │ ├── docker/ # Dockerfiles & container launch scripts │ ├── Dockerfile.app # Production container configuration for FastAPI │ ├── Dockerfile.dashboard # Production container configuration for Streamlit │ └── entrypoint.sh # Launch script executing migrations preceding server start │ ├── scripts/ # Automated toolkits & testing scripts │ ├── generate_logs.py # Planted security threat log generator (500+ lines) │ ├── benchmark.py # LogSentinel AI agent performance benchmark tool │ └── ingest_sample.py # End-to-end vector ingestion testing script │ ├── tests/ # 76-suite pytest unit and integration tests ├── docker-compose.yml # Orchestration configuration mapping all containers ├── pyproject.toml # Code formatting guidelines ├── requirements.txt # Production package dependencies └── README.md # Project handbook & developer documentation ``` ## 环境配置 将 `.env.example` 复制到 `.env` 并配置以下变量: | 变量 | 类型 | 默认值 | 描述 | |---|---|---|---| | `OPENAI_API_KEY` | String | *必填* | 用于 OpenAI embedding 生成和 Agentic 分析的 API key。 | | `OPENAI_MODEL` | String | `gpt-4o-mini` | 用于 ReAct agent 推理的目标 LLM。 | | `OPENAI_EMBEDDING_MODEL`| String | `text-embedding-3-small` | 用于日志块存储的目标 embedding 引擎。 | | `POSTGRES_HOST` | String | `localhost` | 数据库主机(在 Docker 内部使用 `postgres`)。 | | `POSTGRES_PORT` | Integer| `5432` | Postgres 数据库服务端口。 | | `POSTGRES_DB` | String | `logsentinel` | 关系型数据库名称。 | | `POSTGRES_USER` | String | `logsentinel_user` | 数据库用户账号。 | | `POSTGRES_PASSWORD` | String | `change-this-in-production` | 密码凭证。 | | `CHROMA_HOST` | String | `localhost` | ChromaDB 向量主机(在 Docker 内部使用 `chromadb`)。 | | `CHROMA_PORT` | Integer| `8000` | 向量服务端口。 | | `API_KEY` | String | `your-api-key-here` | FastAPI endpoint 的访问 token。 | | `API_BASE_URL` | String | `http://localhost:8080` | 仪表板访问的核心 API 地址。 | ## 快速开始(Docker 部署) 只需一条命令即可启动整个容器化架构。请确保已安装 Docker 和 Docker Compose。 1. **克隆代码仓库**: git clone https://github.com/mohamednoorulnaseem/AI-Security-Log-Analyst-Agent-.git cd AI-Security-Log-Analyst-Agent- 2. **配置环境变量**: 在根目录下创建 `.env` 文件: cp .env.example .env # 打开 .env 并填入你的 OPENAI_API_KEY 及自定义配置详情。 3. **启动所有服务**: docker-compose up --build 4. **验证应用服务**: - **FastAPI 后端 (Swagger API 文档)**: [http://localhost:8080/docs](http://localhost:8080/docs) - **Streamlit 仪表板**: [http://localhost:8501](http://localhost:8501) - **ChromaDB 向量存储**: [http://localhost:8000](http://localhost:8000) ## 本地开发设置 如需在 Docker 之外本地调试组件: 1. **初始化虚拟环境**: python -m venv venv # Windows 下激活: venv\Scripts\activate # Linux/Mac 下激活: source venv/bin/activate 2. **安装依赖项**: pip install -r requirements.txt 3. **数据库设置 (SQLite / PostgreSQL)**: 确保你本地运行了 PostgreSQL,或配置 Alembic 进行迁移。在本地运行迁移: python -c "import alembic.config; alembic.config.main(argv=['upgrade', 'head'])" 4. **独立运行服务器**: - **运行 FastAPI 后端**: uvicorn app.main:main --host 0.0.0.0 --port 8080 --reload - **运行 Streamlit 前端**: streamlit run dashboard/app.py ## 验证与基准测试 LogSentinel AI 具备自动验证和性能评估功能。 ### 自动化测试 使用 pytest 运行完整的 76 项测试套件。该测试套件是隔离的;它会 mock 数据库操作和 API 响应,因此无需激活的 OpenAI API key 或 Postgres 实例即可运行。 ``` python -m pytest tests/ -v ``` ### 取证分析基准测试 使用基准测试工具包针对现实安全事件评估 agent 能力: 1. **生成真实日志文件**: 跨三种日志流生成包含 504 行日志的合成数据集,其中包含正常的后台活动和 **4 个活跃威胁向量**。 python scripts/generate_logs.py 2. **运行 Agent 基准测试**: 针对威胁数据集执行完整的解析、embedding 和推理 pipeline。将输出与真实清单(ground-truth manifests)进行比较以对结果进行评分: python scripts/benchmark.py *(注意:添加 `--mock` 标志即可使用缓存的 mock 模型输出完全离线地运行基准测试套件,从而保护你的 token 预算:`python scripts/benchmark.py --mock`)* #### 基准测试评估结果(实时生产模式) 下表反映了针对实时 LLM(`gemini-2.5-flash`)和本地 embedding(`sentence-transformers/all-MiniLM-L6-v2`)模型运行基准 pipeline 的真实结果: | 攻击场景 | 目标威胁 IP | 真实严重性 | Agent 严重性 | IP 归属 | 延迟 (秒) | 状态 | | :--- | :--- | :--- | :--- | :--- | :--- | :--- | | **SSH 暴力破解** | 203.0.113.42 | CRITICAL | CRITICAL | 匹配 (203.0.113.42) | 40.62s | 已检测 | | **权限提升** | 203.0.113.42 | CRITICAL | CRITICAL | 匹配 (203.0.113.42) | 47.01s | 已检测 | | **端口扫描** | 45.33.32.156 | MEDIUM | MEDIUM | 匹配 (45.33.32.156) | 30.01s | 已检测 | | **SQL 注入尝试** | 198.51.100.77 | HIGH | CRITICAL | 匹配 (198.51.100.77) | 36.24s | 已检测 | **汇总指标:** - **检测率 (TPR)**:100%(检测到了 4/4 个植入的场景) - **类型分类准确率**:100% - **严重性校准**:75% 完全匹配(Agent 将 SQL 注入评级为 `CRITICAL` 而不是目标设定的 `HIGH`,因为 SQLi payload 针对的是高价值数据库 endpoint,并触发了大量的 Web 错误状态码,这是一个合理的升级)。 - **IP 归属准确率**:100% - **平均分析延迟**:38.47s - **评估日志总行数**:504 行 ## 当前局限性 尽管 LogSentinel AI 是一个功能完备的 Agentic 平台,但它仍具有以下限制: * **合成测试数据**:用于基准测试和测试的日志数据集是合成生成的。该平台尚未针对嘈杂的、实时的高吞吐量生产数据流进行过测试。 * **离线 Mock 回退**:为了便于测试和控制 OpenAI API 成本,基准测试的执行依赖于预先录制的模拟响应(离线 mock 模式回退)。 * **单节点部署**:数据库和向量存储在独立的单节点容器内运行,缺乏多副本高可用性、集群状态管理或分布式索引。 * **无实时流处理**:日志摄取围绕批处理文件(`.log`)构建,要求文件在分块和索引之前必须写入磁盘,而不是利用实时的 TCP/UDP 流式 socket 或日志转发器(log shipper)。 ## 有更多时间的话我会改进的地方 * **实时日志转发器集成**:将摄取方式从静态批处理解析转变为流式 agent(例如 Logstash、Filebeat 或 FluentBit),在向量索引之前直接推送到消息队列(如 Redis 或 Kafka)。 * **实时威胁情报源关联**:将动态威胁情报 API(例如 VirusTotal、AbuseIPDB 或 AlienVault OTX)集成到 agent 的工具集中,将日志 IP 地址与实时外部信誉进行关联。 * **NLP/分类器时间线解析**:将攻击链追踪器从基于启发式正则表达式和关键字的分类器升级为微调的 NLP 分类模型,以将任意日志事件与 MITRE ATT&CK 阶段相匹配。 * **分布式向量索引与缓存**:部署分布式向量存储集群并实施语义缓存,以防止在常见重复的服务器流量模式上进行冗余的 LLM 推理循环。 ## 作者 * **Mohamed Noorul Naseem** — AI 工程师
标签:AMSI绕过, AV绕过, FastAPI, Kubernetes, LangChain, Streamlit, 威胁检测, 子域名变形, 安全日志分析, 库, 应急响应, 测试用例, 访问控制, 请求拦截, 轻量级, 逆向工具