AdityaGuhaa/multiagent-soc

GitHub: AdityaGuhaa/multiagent-soc

一个基于 Python 和 CrewAI 的多智能体 SOC 自动化框架,实现日志检测、威胁情报增强与自动响应决策。

Stars: 0 | Forks: 0

# 多智能体安全运营中心(SOC) **一个增强型 AI、基于规则的 SOC 原型**,持续流式传输 Linux 认证日志,使用轻量级检测器检测恶意活动,通过 VirusTotal 智能丰富事件,并借助三 Agent 的 CrewAI 工作流自动做出响应决策。 Screenshot from 2026-04-13 15-37-50 ## 目录 1. [概述](#overview) 2. [架构](#architecture) 3. [快速开始](#getting-started) 4. [配置](#configuration) 5. [运行系统](#running-the-system) 6. [测试](#testing) 7. [扩展 SOC](#extending-the-soc) 8. [故障排查与常见问题](#troubleshooting--faq) 9. [许可证](#license) ## 概述 该仓库实现了针对 Linux 服务器 SSH 和 sudo 活动的**实时安全监控流水线**,结合了: - **基于规则的检测**(失败密码、root 登录尝试、快速重连等) - **使用 CrewAI 的 AI 分类**(三个专用 Agent:日志分析、威胁情报、响应决策) - **自动化缓解**(iptables 阻断、Slack 告警、持久化 JSON Lines 告警日志) 所有组件均用 Python 编写,可在任意现代 Linux 主机上运行。设计有意保持模块化,以便以最小摩擦添加新的检测器、情报源或响应动作。 ## 架构 ``` +-------------------+ +-------------------+ +-------------------+ | LogStreamer | ---> | DetectionEngine | ---> | Detection Event | | (tail‑F semantics)| | (composes )| | dict | +-------------------+ +-------------------+ +-------------------+ | | v v +-------------------+ +-------------------+ | SOCSystem | | AlertLogger | | (orchestrates) | | (JSON‑Lines) | +-------------------+ +-------------------+ | v +-------------------+ +-------------------+ | SOCCrew (CrewAI) | ---> | Slack / Firewall | +-------------------+ +-------------------+ | v +-------------------+ | Persisted Alerts | +-------------------+ ``` - **`main.py`** – 入口点;实例化 `SOCSystem` 并启动后台 AI 分析工作器。 - **`utils/log_streamer.py`** – 提供 `LogStreamer`(实时日志读取)与 `MockLogStreamer`(用于确定性单元测试)。 - **`detection/engine.py`** – 定义五个检测器(`BruteForceDetector`、`InvalidUserDetector`、`RootLoginDetector`、`SudoFailureDetector`、`RapidReconnectDetector`)。`DetectionEngine` 在每行输入上运行所有检测器并聚合结果。 - **`agents/crew.py`** – 构建三个 CrewAI Agent: 1. **日志分析器** – 提取攻击类型、IP、用户名、计数、严重性。 2. **威胁情报** – 接收预取的 VirusTotal 数据(通过 `tools/vt_client.py`)。 3. **SOC 响应器** – 决定 `BLOCK` 或 `ALLOW`,并输出严格 JSON。 - **`tools/firewall.py`** – 对 `iptables` 的轻量封装(当 `FIREWALL_DRY_RUN` 为真时为空操作)。遵循 `config.py` 中的白名单。 - **`integrations/slack_alert.py`** – 将格式化的 Markdown 告警发布到 Slack Webhook。 - **`utils/alert_logger.py`** – 将每个决策写入 `alerts.log`(JSON Lines)并提供查询辅助方法(`filter`、`stats`)。 - **`config.py`** – 集中配置;所有值均可通过同名环境变量覆盖(日志路径、阈值、LLM 模型、VirusTotal 密钥、白名单等)。 ## 快速开始 1. **克隆仓库** git clone https://github.com//multiagent-soc.git cd multiagent-soc 2. **创建虚拟环境**(可选但推荐) python -m venv .venv source .venv/bin/activate 3. **安装 Python 依赖** pip install -r requirements.txt 4. **拉取用于 CrewAI 的 LLM 模型**(默认使用 `mistral:latest`)。若本地使用 Ollama: ollama pull mistral:latest 如需其他模型,请在环境变量或 `config.py` 中调整 `OLLAMA_MODEL`。 ## 配置 所有可调参数位于 `config.py`。可通过同名环境变量覆盖任意值: | 变量 | 用途 | 默认值 | |------|------|--------| | `AUTH_LOG_PATH` | 监控的认证日志路径 | `/var/log/auth.log` | | `ALERT_LOG_PATH` | 告警持久化文件 | `./alerts.log` | | `BRUTE_FORCE_THRESHOLD` | 触发暴力破解检测的失败登录计数 | `5` | | `TIME_WINDOW_SECONDS` | 统计尝试的滑动窗口时长(秒) | `60` | | `OLLAMA_MODEL` | CrewAI 使用的 Ollama 模型标签 | `mistral:latest` | | `OLLAMA_BASE_URL` | Ollama API 基础 URL | `http://localhost:11434` | | `VIRUSTOTAL_API_KEY` | VirusTotal 查询 API 密钥(可选) | `''` | | `IP_WHITELIST` | 永不阻断的 IP 列表(逗号分隔) | `127.0.0.1,::1,10.0.0.1` | | `FIREWALL_DRY_RUN` | `true` 时禁用实际 iptables 变更(开发调试用) | `false` | | `LLM_TEMPERATURE` | 控制 LLM 输出随机性 | `0.1` | | `LLM_MAX_TOKENS` | 每个 Agent 响应的最大 Token 数 | `512` | ## 运行系统 ### 生产模式(真实日志) ``` export FIREWALL_DRY_RUN=false # optional – set to true for safe testing python main.py ``` 系统将: 1. 尾随 `AUTH_LOG_PATH` 指定的文件。 2. 在控制台输出彩色原始日志、检测、AI 决策与防火墙动作。 3. 将结构化告警写入 `ALERT_LOG_PATH`。 4. 若配置了 `SLACK_WEBHOOK_URL`,则发送 Slack 通知。 ### 开发 / 干运行模式 ``` export FIREWALL_DRY_RUN=true python main.py ``` 记录阻断但不实际执行 `iptables` 变更。 ### 使用模拟日志流进行快速演示 在 `SOCSystem` 中替换真实的 `LogStreamer` 为 `MockLogStreamer`(或运行提供的测试框架)。模拟重放一组代表性认证日志行,无需 root 权限即可展示检测与 AI 响应。 ## 测试 仓库包含一个专注于 `AlertLogger` 的单元测试套件,可扩展以覆盖检测器和 Crew 工作流。 ``` pytest ``` *运行单个测试文件* ``` pytest tests/test_soc_system.py ``` 运行需要模拟日志流的测试时,`utils/log_streamer.py` 中的 `MockLogStreamer` 类提供确定性输入。 ## 扩展 SOC ### 添加新检测器 1. 在 `detection/engine.py` 中创建实现 `process(self, line: str) -> Optional[dict]` 方法的类。 2. 在 `DetectionEngine.__init__` 中通过追加到 `_detectors` 列表注册该检测器。 3. 按需定义新的检测类型常量;其余流水线(告警记录器、AI Crew)会自动处理。 ### 添加新 AI 步骤 1. 在 `agents/crew.py` 中定义一个新的 CrewAI `Agent`(类似现有三个)。 2. 创建对应的 `Task` 构造器函数。 3. 在 `tasks` 列表中的启动(kickoff)前插入新任务。 4. 如需解释额外 JSON 字段,扩展 `_parse_decision`。 ### 新增响应动作 - 实现包装器(例如 `tools/email_alert.py`),并在 `_run_ai_analysis` 中 Crew 决策后调用。 - 确保动作遵循白名单与干运行设置。 ## 故障排查与常见问题 - **无检测出现** – 确认 `AUTH_LOG_PATH` 指向包含 SSH 认证日志的文件;使用 `tail -f /var/log/auth.log` 确认新行正在写入。 - **AI 决策始终为 `ALLOW`** – 检查 LLM 模型是否可达(Ollama 运行中)且 `OLLAMA_BASE_URL` 正确;检查 CrewAI 日志是否有错误。 - **防火墙阻断未应用** – 确保进程以足够权限(root)运行,或设置 `FIREWALL_DRY_RUN` 为 `false` 并确认 `iptables` 可用。 - **VirusTotal 增强失败** – 确认 `VIRUSTOTAL_API_KEY` 已设置且配额充足;系统将在密钥缺失时回退到占位信息。 - **Slack 通知缺失** – 在环境变量或 `config.py` 中设置 `SLACK_WEBHOOK_URL`Webhook 必须接受 POST JSON 载荷。 ## 许可证 本项目根据 **MIT 许可证** 授权——详情见 `LICENSE` 文件。
标签:AI辅助安全, CrewAI, FTP漏洞扫描, iptables, Linux日志监控, Python安全工具, Slack告警, SOC自动化, SSH监控, sudo审计, 事件分级, 可扩展安全, 多智能体, 威胁情报, 安全编排, 安全运营中心, 实时检测, 开发者工具, 模块化框架, 流式日志处理, 病毒总览, 网络映射, 自动化响应, 规则检测, 逆向工具