AdityaGuhaa/multiagent-soc
GitHub: AdityaGuhaa/multiagent-soc
一个基于 Python 和 CrewAI 的多智能体 SOC 自动化框架,实现日志检测、威胁情报增强与自动响应决策。
Stars: 0 | Forks: 0
# 多智能体安全运营中心(SOC)
**一个增强型 AI、基于规则的 SOC 原型**,持续流式传输 Linux 认证日志,使用轻量级检测器检测恶意活动,通过 VirusTotal 智能丰富事件,并借助三 Agent 的 CrewAI 工作流自动做出响应决策。
## 目录
1. [概述](#overview)
2. [架构](#architecture)
3. [快速开始](#getting-started)
4. [配置](#configuration)
5. [运行系统](#running-the-system)
6. [测试](#testing)
7. [扩展 SOC](#extending-the-soc)
8. [故障排查与常见问题](#troubleshooting--faq)
9. [许可证](#license)
## 概述
该仓库实现了针对 Linux 服务器 SSH 和 sudo 活动的**实时安全监控流水线**,结合了:
- **基于规则的检测**(失败密码、root 登录尝试、快速重连等)
- **使用 CrewAI 的 AI 分类**(三个专用 Agent:日志分析、威胁情报、响应决策)
- **自动化缓解**(iptables 阻断、Slack 告警、持久化 JSON Lines 告警日志)
所有组件均用 Python 编写,可在任意现代 Linux 主机上运行。设计有意保持模块化,以便以最小摩擦添加新的检测器、情报源或响应动作。
## 架构
```
+-------------------+ +-------------------+ +-------------------+
| LogStreamer | ---> | DetectionEngine | ---> | Detection Event |
| (tail‑F semantics)| | (composes )| | dict |
+-------------------+ +-------------------+ +-------------------+
| |
v v
+-------------------+ +-------------------+
| SOCSystem | | AlertLogger |
| (orchestrates) | | (JSON‑Lines) |
+-------------------+ +-------------------+
|
v
+-------------------+ +-------------------+
| SOCCrew (CrewAI) | ---> | Slack / Firewall |
+-------------------+ +-------------------+
|
v
+-------------------+
| Persisted Alerts |
+-------------------+
```
- **`main.py`** – 入口点;实例化 `SOCSystem` 并启动后台 AI 分析工作器。
- **`utils/log_streamer.py`** – 提供 `LogStreamer`(实时日志读取)与 `MockLogStreamer`(用于确定性单元测试)。
- **`detection/engine.py`** – 定义五个检测器(`BruteForceDetector`、`InvalidUserDetector`、`RootLoginDetector`、`SudoFailureDetector`、`RapidReconnectDetector`)。`DetectionEngine` 在每行输入上运行所有检测器并聚合结果。
- **`agents/crew.py`** – 构建三个 CrewAI Agent:
1. **日志分析器** – 提取攻击类型、IP、用户名、计数、严重性。
2. **威胁情报** – 接收预取的 VirusTotal 数据(通过 `tools/vt_client.py`)。
3. **SOC 响应器** – 决定 `BLOCK` 或 `ALLOW`,并输出严格 JSON。
- **`tools/firewall.py`** – 对 `iptables` 的轻量封装(当 `FIREWALL_DRY_RUN` 为真时为空操作)。遵循 `config.py` 中的白名单。
- **`integrations/slack_alert.py`** – 将格式化的 Markdown 告警发布到 Slack Webhook。
- **`utils/alert_logger.py`** – 将每个决策写入 `alerts.log`(JSON Lines)并提供查询辅助方法(`filter`、`stats`)。
- **`config.py`** – 集中配置;所有值均可通过同名环境变量覆盖(日志路径、阈值、LLM 模型、VirusTotal 密钥、白名单等)。
## 快速开始
1. **克隆仓库**
git clone https://github.com//multiagent-soc.git
cd multiagent-soc
2. **创建虚拟环境**(可选但推荐)
python -m venv .venv
source .venv/bin/activate
3. **安装 Python 依赖**
pip install -r requirements.txt
4. **拉取用于 CrewAI 的 LLM 模型**(默认使用 `mistral:latest`)。若本地使用 Ollama:
ollama pull mistral:latest
如需其他模型,请在环境变量或 `config.py` 中调整 `OLLAMA_MODEL`。
## 配置
所有可调参数位于 `config.py`。可通过同名环境变量覆盖任意值:
| 变量 | 用途 | 默认值 |
|------|------|--------|
| `AUTH_LOG_PATH` | 监控的认证日志路径 | `/var/log/auth.log` |
| `ALERT_LOG_PATH` | 告警持久化文件 | `./alerts.log` |
| `BRUTE_FORCE_THRESHOLD` | 触发暴力破解检测的失败登录计数 | `5` |
| `TIME_WINDOW_SECONDS` | 统计尝试的滑动窗口时长(秒) | `60` |
| `OLLAMA_MODEL` | CrewAI 使用的 Ollama 模型标签 | `mistral:latest` |
| `OLLAMA_BASE_URL` | Ollama API 基础 URL | `http://localhost:11434` |
| `VIRUSTOTAL_API_KEY` | VirusTotal 查询 API 密钥(可选) | `''` |
| `IP_WHITELIST` | 永不阻断的 IP 列表(逗号分隔) | `127.0.0.1,::1,10.0.0.1` |
| `FIREWALL_DRY_RUN` | `true` 时禁用实际 iptables 变更(开发调试用) | `false` |
| `LLM_TEMPERATURE` | 控制 LLM 输出随机性 | `0.1` |
| `LLM_MAX_TOKENS` | 每个 Agent 响应的最大 Token 数 | `512` |
## 运行系统
### 生产模式(真实日志)
```
export FIREWALL_DRY_RUN=false # optional – set to true for safe testing
python main.py
```
系统将:
1. 尾随 `AUTH_LOG_PATH` 指定的文件。
2. 在控制台输出彩色原始日志、检测、AI 决策与防火墙动作。
3. 将结构化告警写入 `ALERT_LOG_PATH`。
4. 若配置了 `SLACK_WEBHOOK_URL`,则发送 Slack 通知。
### 开发 / 干运行模式
```
export FIREWALL_DRY_RUN=true
python main.py
```
记录阻断但不实际执行 `iptables` 变更。
### 使用模拟日志流进行快速演示
在 `SOCSystem` 中替换真实的 `LogStreamer` 为 `MockLogStreamer`(或运行提供的测试框架)。模拟重放一组代表性认证日志行,无需 root 权限即可展示检测与 AI 响应。
## 测试
仓库包含一个专注于 `AlertLogger` 的单元测试套件,可扩展以覆盖检测器和 Crew 工作流。
```
pytest
```
*运行单个测试文件*
```
pytest tests/test_soc_system.py
```
运行需要模拟日志流的测试时,`utils/log_streamer.py` 中的 `MockLogStreamer` 类提供确定性输入。
## 扩展 SOC
### 添加新检测器
1. 在 `detection/engine.py` 中创建实现 `process(self, line: str) -> Optional[dict]` 方法的类。
2. 在 `DetectionEngine.__init__` 中通过追加到 `_detectors` 列表注册该检测器。
3. 按需定义新的检测类型常量;其余流水线(告警记录器、AI Crew)会自动处理。
### 添加新 AI 步骤
1. 在 `agents/crew.py` 中定义一个新的 CrewAI `Agent`(类似现有三个)。
2. 创建对应的 `Task` 构造器函数。
3. 在 `tasks` 列表中的启动(kickoff)前插入新任务。
4. 如需解释额外 JSON 字段,扩展 `_parse_decision`。
### 新增响应动作
- 实现包装器(例如 `tools/email_alert.py`),并在 `_run_ai_analysis` 中 Crew 决策后调用。
- 确保动作遵循白名单与干运行设置。
## 故障排查与常见问题
- **无检测出现** – 确认 `AUTH_LOG_PATH` 指向包含 SSH 认证日志的文件;使用 `tail -f /var/log/auth.log` 确认新行正在写入。
- **AI 决策始终为 `ALLOW`** – 检查 LLM 模型是否可达(Ollama 运行中)且 `OLLAMA_BASE_URL` 正确;检查 CrewAI 日志是否有错误。
- **防火墙阻断未应用** – 确保进程以足够权限(root)运行,或设置 `FIREWALL_DRY_RUN` 为 `false` 并确认 `iptables` 可用。
- **VirusTotal 增强失败** – 确认 `VIRUSTOTAL_API_KEY` 已设置且配额充足;系统将在密钥缺失时回退到占位信息。
- **Slack 通知缺失** – 在环境变量或 `config.py` 中设置 `SLACK_WEBHOOK_URL`Webhook 必须接受 POST JSON 载荷。
## 许可证
本项目根据 **MIT 许可证** 授权——详情见 `LICENSE` 文件。
## 目录
1. [概述](#overview)
2. [架构](#architecture)
3. [快速开始](#getting-started)
4. [配置](#configuration)
5. [运行系统](#running-the-system)
6. [测试](#testing)
7. [扩展 SOC](#extending-the-soc)
8. [故障排查与常见问题](#troubleshooting--faq)
9. [许可证](#license)
## 概述
该仓库实现了针对 Linux 服务器 SSH 和 sudo 活动的**实时安全监控流水线**,结合了:
- **基于规则的检测**(失败密码、root 登录尝试、快速重连等)
- **使用 CrewAI 的 AI 分类**(三个专用 Agent:日志分析、威胁情报、响应决策)
- **自动化缓解**(iptables 阻断、Slack 告警、持久化 JSON Lines 告警日志)
所有组件均用 Python 编写,可在任意现代 Linux 主机上运行。设计有意保持模块化,以便以最小摩擦添加新的检测器、情报源或响应动作。
## 架构
```
+-------------------+ +-------------------+ +-------------------+
| LogStreamer | ---> | DetectionEngine | ---> | Detection Event |
| (tail‑F semantics)| | (composes )| | dict |
+-------------------+ +-------------------+ +-------------------+
| |
v v
+-------------------+ +-------------------+
| SOCSystem | | AlertLogger |
| (orchestrates) | | (JSON‑Lines) |
+-------------------+ +-------------------+
|
v
+-------------------+ +-------------------+
| SOCCrew (CrewAI) | ---> | Slack / Firewall |
+-------------------+ +-------------------+
|
v
+-------------------+
| Persisted Alerts |
+-------------------+
```
- **`main.py`** – 入口点;实例化 `SOCSystem` 并启动后台 AI 分析工作器。
- **`utils/log_streamer.py`** – 提供 `LogStreamer`(实时日志读取)与 `MockLogStreamer`(用于确定性单元测试)。
- **`detection/engine.py`** – 定义五个检测器(`BruteForceDetector`、`InvalidUserDetector`、`RootLoginDetector`、`SudoFailureDetector`、`RapidReconnectDetector`)。`DetectionEngine` 在每行输入上运行所有检测器并聚合结果。
- **`agents/crew.py`** – 构建三个 CrewAI Agent:
1. **日志分析器** – 提取攻击类型、IP、用户名、计数、严重性。
2. **威胁情报** – 接收预取的 VirusTotal 数据(通过 `tools/vt_client.py`)。
3. **SOC 响应器** – 决定 `BLOCK` 或 `ALLOW`,并输出严格 JSON。
- **`tools/firewall.py`** – 对 `iptables` 的轻量封装(当 `FIREWALL_DRY_RUN` 为真时为空操作)。遵循 `config.py` 中的白名单。
- **`integrations/slack_alert.py`** – 将格式化的 Markdown 告警发布到 Slack Webhook。
- **`utils/alert_logger.py`** – 将每个决策写入 `alerts.log`(JSON Lines)并提供查询辅助方法(`filter`、`stats`)。
- **`config.py`** – 集中配置;所有值均可通过同名环境变量覆盖(日志路径、阈值、LLM 模型、VirusTotal 密钥、白名单等)。
## 快速开始
1. **克隆仓库**
git clone https://github.com/标签:AI辅助安全, CrewAI, FTP漏洞扫描, iptables, Linux日志监控, Python安全工具, Slack告警, SOC自动化, SSH监控, sudo审计, 事件分级, 可扩展安全, 多智能体, 威胁情报, 安全编排, 安全运营中心, 实时检测, 开发者工具, 模块化框架, 流式日志处理, 病毒总览, 网络映射, 自动化响应, 规则检测, 逆向工具