one-b1t/Mini-SOAR
GitHub: one-b1t/Mini-SOAR
Mini-SOAR 是一个基于 Logstash、Redis、Python 和 Telegram Bot 的轻量级安全编排自动化系统,用于自动检测安全告警、富化威胁情报并联动边界设备执行 IP 封堵。
Stars: 0 | Forks: 0
# MiniSOAR
MiniSOAR 是一个基于 **Logstash**、**Redis**、**Python** 和 **Telegram Bot** 的轻量级安全编排器。该系统旨在接收安全警报,对攻击者 IP 进行 enrichment,提供决策建议,并协助对 **Imperva**、**Palo Alto** 和 **Akamai** 等边界设备执行缓解流程。
本文档整合了 README、Context 和旧版 Wiki 中的说明,以便将所有关键信息集中在一处。
## 目录
- [工作流程概述](#ringkasan-alur-kerja)
- [主要功能](#fitur-utama)
- [系统架构](#arsitektur-sistem)
- [项目结构](#struktur-project)
- [环境要求](#persyaratan)
- [安装说明](#instalasi)
- [环境配置](#konfigurasi-environment)
- [拦截操作模式](#mode-operasi-blocking)
- [运行服务](#menjalankan-layanan)
- [Telegram Bot 命令](#perintah-telegram-bot)
- [测试与 Mock 流量](#testing-dan-mock-traffic)
- [Machine Learning](#machine-learning)
- [运维与安全注意事项](#catatan-operasional-dan-keamanan)
## 工作流程概述
MiniSOAR 的工作流程如下:
```
Proxy Logs / Raw Logs
↓
Elasticsearch
↓
Logstash Detection Pipeline
↓
Redis Queue
↓
Python Alert Daemon
↓
Enrichment, Event Indexing, ML Recommendation
↓
Telegram Notification / Analyst Action
↓
Mitigation Action ke Security Perimeter
```
简单来说:
1. **Logstash** 从 elasticsearch 读取日志,进行字段标准化,然后检测攻击模式或异常。
2. 有效的警报将作为缓冲队列发送到 **Redis**。
3. **MiniSOAR daemon** 从 Redis 读取事件。
4. Daemon 执行 enrichment、whitelist/bypass 检查、事件索引,以及基于规则/ML 的评分。
5. 警报发送到 **Telegram**,以便分析师查看上下文并采取行动。
6. 如果模式允许,MiniSOAR 可以对边界设备执行自动或半自动的拦截操作。
## 主要功能
- 基于 **Telegram 的 SOAR 工作流**,用于通知和分析师操作。
- **Redis 队列** 作为缓冲,确保检测和通知过程互不阻塞。
- 使用 AbuseIPDB 和 IP API 进行 **Threat intelligence enrichment**。
- **Whitelist 和 bypass list**,防止对内部或受信任的 IP 产生误报。
- **边界映射**,根据域名/网站确定缓解厂商。
- 通过 API 将缓解措施 **集成** 到安全边界。
- 提供 **MANUAL、SEMI 和 AUTO 模式**,灵活调整自动化级别。
- **Mock 模式**,确保测试不会调用真实的边界 API。
- **事件索引至 Elasticsearch**,用于审计、分析师标注和 ML 需求。
- **Machine learning baseline**,根据历史数据提供 block/allow 建议。
- **跨平台路径处理**,兼容 Linux/WSL 以及 Windows 开发主机。
## 系统架构
```
flowchart LR
A[Proxy Log / Raw Logs] --> B[Logstash]
B -->|Parse, normalize, classify| C[Redis Queue]
C -->|Alert payload| D[MiniSOAR Daemon]
D --> E[Enrichment]
D --> F[Elasticsearch Indexing]
D --> G[ML Recommendation]
G --> H[Telegram Bot]
H --> I[Analyst]
H --> J[Mitigation Action]
J --> K[Security Perimeter]
```
### 组件职责
| 组件 | 职责 |
|---|---|
| Logstash | 读取日志,进行字段标准化,检测警报,并将事件推送到 Redis。 |
| Redis | 作为检测 pipeline 和 Python worker 之间的缓冲队列。 |
| MiniSOAR Daemon | 读取事件,进行 enrichment、索引、ML 评分,并广播警报。 |
| Telegram Bot | 显示警报,接收分析师命令,并执行 callback action。 |
| Mitigation Modules | 对安全边界执行 block/unblock。 |
| Elasticsearch | 存储事件、分析师标签以及用于 ML 重新训练的数据。 |
## 项目结构
以下是项目的目录组织结构:
```
MiniSOAR/
├── logstash/ # Konfigurasi pipeline deteksi Logstash
│ ├── 01-detection.conf # Pipeline untuk normalisasi dan deteksi alert
│ ├── 02-alert-redis.conf # Pipeline pengiriman alert ke Redis
│ ├── minisoar-perimeter.yml # Pemetaan target domain ke provider mitigasi
│ └── minisoar-whitelist.yml # Whitelist domain, keyword, dan anti-judi untuk Logstash
├── minisoar/ # Kode utama aplikasi SOAR (modul Python)
│ ├── __init__.py
│ ├── bot.py # Handlers Telegram bot dan interaksi analis
│ ├── config.py # Konfigurasi environment dan variabel SOAR
│ ├── daemon.py # Core consumer loop dari Redis ke Telegram
│ ├── database.py # Pustaka koneksi database (Redis & Elasticsearch)
│ ├── utils.py # Fungsi utilitas (Enrichment, IP validation, dll.)
│ ├── mitigation/ # Integrasi mitigasi WAF dan perimeter keamanan
│ │ ├── __init__.py
│ │ ├── akamai.py # Integrasi Fast Purge / Client List Akamai
│ │ ├── core.py # Orchestrator block/unblock dan commit
│ │ ├── imperva.py # Integrasi API blocking IP Imperva
│ │ └── paloalto.py # Integrasi Address Object/Group Palo Alto
│ └── ml/ # Modul klasifikasi machine learning (Phase 1 & 2)
│ ├── __init__.py
│ ├── export.py # Pustaka export dataset dari Elasticsearch/Redis
│ ├── inference.py # Logika prediksi decision block
│ └── train.py # Pustaka training model klasifikasi baseline
├── scratch/ # Skrip uji coba lokal dan mock data
│ ├── mock_traffic.py # Injeksi data simulasi ke Redis
│ ├── test_imports.py # Verifikasi import modul Python
│ └── test_predict.py # Verifikasi fungsi prediksi ML secara lokal
├── scripts/ # Script wrapper penunjang operasional
│ ├── export_dataset.py # Wrapper pengekspor dataset ML
│ └── train_baseline.py # Wrapper pen-training model baseline ML
├── tests/ # Unit test suite (Pytest)
│ ├── __init__.py
│ ├── test_config.py # Test case untuk parsing config
│ ├── test_database.py # Test case koneksi & operasi DB
│ ├── test_mitigation.py # Test case fungsi mitigasi (mock)
│ ├── test_ml.py # Test case inference fallback
│ └── test_utils.py # Test case helper utilities
├── .gitignore
├── Changelog.md # Catatan riwayat rilis proyek
├── Readme.md # Dokumentasi utama proyek SOAR
├── WIKI.md # Dokumentasi tambahan teknis operasional
├── env.example # Contoh konfigurasi environment variable
└── requirements.txt # Daftar dependensi pustaka Python
```
为了保持运维兼容性,部分旧版 wrapper 仍然可以使用:
| 旧版 Wrapper | 新入口点 |
|---|---|
| `14_redis_telegram_alert.py` | `minisoar.daemon.main()` |
| `09-tele-soar.py` | `minisoar.bot.main()` |
| `scripts/export_dataset.py` | `minisoar.ml.export.main()` |
| `scripts/train_baseline.py` | `minisoar.ml.train.main()` |
## 环境要求
- Python 3.10+
- Redis Server
- Telegram Bot Token 和 Chat ID
- Elasticsearch(可选,但建议用于事件存储和 ML 重新训练)
- 具备 API 配置功能的安全边界设备
## 安装说明
### 1. Windows
打开 PowerShell 或 Command Prompt,进入项目文件夹,然后创建并激活虚拟环境:
```
python -m venv .venv
.venv\Scripts\activate
pip install -r requirements.txt
```
### 2. Linux(现代版 Ubuntu/Debian)
在现代 Linux(Python 3.11+ / PEP 668)上,全局安装 Python 包时会出现 `externally-managed-environment` 错误。您**必须**使用虚拟环境来隔离项目依赖:
```
# 更新并安装系统 venv 依赖(如果尚未安装)
sudo apt update && sudo apt install -y python3-venv python3-full
# 创建 virtual environment
python3 -m venv .venv
# 激活 virtual environment
source .venv/bin/activate
# 安装项目依赖
pip install -r requirements.txt
```
### 3. Logstash(检测基础设施)
MiniSOAR 使用 **Logstash** 从安全日志中检测威胁,并将警报转发至 Redis。Logstash 配置使用了 **Pipeline-to-Pipeline Communication** 系统,将处理过程分为两个阶段:
* **Detection Pipeline ([01-detection.conf]):** 定期从 Elasticsearch 拉取日志,进行字段标准化,并使用 `aggregate` 过滤器进行威胁关联和聚合。
* **Alert Pipeline ([02-alert-redis.conf]):** 通过 Logstash 内部模块接收 detection pipeline 的输出,设定严重程度(*severity*),格式化通知文本,并将其发送到 Redis 队列 `logstash_alert_queue`。
#### A. 安装 Logstash (Debian/Ubuntu)
运行以下命令以安装 Elastic 官方仓库并安装 Logstash:
```
# 1. 安装 Java OpenJDK(Logstash 的先决条件)
sudo apt install -y openjdk-17-jre-headless
# 2. 添加 Elastic GPG Key 和 Repositori
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
# 3. 安装 Logstash
sudo apt update && sudo apt install -y logstash
```
#### B. 配置 Logstash 多重 Pipeline
将配置文件 [01-detection.conf] 和 [02-alert-redis.conf] 复制到您的服务器上(例如放在 `/home/ubuntu/Mini-SOAR-dev/`)。
编辑 `/etc/logstash/pipelines.yml` 文件以映射这两个 pipeline:
```
# /etc/logstash/pipelines.yml
- pipeline.id: detection-pipeline
path.config: "/home/ubuntu/Mini-SOAR-dev/01-detection.conf"
# CRITICAL: Wajib bernilai 1 agar filter 'aggregate' berjalan konsisten pada satu utas (thread).
pipeline.workers: 1
- pipeline.id: alert-pipeline
path.config: "/home/ubuntu/Mini-SOAR-dev/02-alert-redis.conf"
```
#### C. 运行 Logstash 服务
```
# 激活并以 systemd service 运行 Logstash
sudo systemctl daemon-reload
sudo systemctl enable logstash
sudo systemctl start logstash
# 监控 Logstash logs 以验证错误
sudo tail -f /var/log/logstash/logstash-plain.log
```
#### D. 将 MiniSOAR 作为 Systemd Service 运行
为了让告警消费者 daemon 和 Telegram bot 在后台持久运行,并在崩溃或服务器重启后自动启动,请创建以下 systemd service:
##### 1. Alert Daemon 服务 (`minisoar-daemon.service`)
创建一个新的 systemd unit 文件:
```
sudo nano /etc/systemd/system/minisoar-daemon.service
```
粘贴以下配置(请根据您的环境调整 `User` 和 `WorkingDirectory`):
```
[Unit]
Description=MiniSOAR Alert Daemon Service
After=network.target redis-server.service
[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/Mini-SOAR
ExecStart=/home/ubuntu/Mini-SOAR/.venv/bin/python -m minisoar.daemon
Restart=on-failure
RestartSec=5s
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
```
##### 2. Telegram Bot 服务 (`minisoar-bot.service`)
创建一个新的 systemd unit 文件:
```
sudo nano /etc/systemd/system/minisoar-bot.service
```
粘贴以下配置:
```
[Unit]
Description=MiniSOAR Telegram Bot Service
After=network.target
[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/Mini-SOAR
ExecStart=/home/ubuntu/Mini-SOAR/.venv/bin/python -m minisoar.bot
Restart=on-failure
RestartSec=5s
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
```
##### 3. 启用与管理服务
运行以下命令以重新加载 systemd,然后启用并启动这两个服务:
```
# Reload systemd configuration
sudo systemctl daemon-reload
# Enable 开机 auto-start
sudo systemctl enable minisoar-daemon
sudo systemctl enable minisoar-bot
# 立即运行 service
sudo systemctl start minisoar-daemon
sudo systemctl start minisoar-bot
# 检查各个 service 的状态
sudo systemctl status minisoar-daemon
sudo systemctl status minisoar-bot
```
使用以下命令监控服务的活动日志 (stdout/stderr):
```
# 监控 Alert Daemon log
journalctl -u minisoar-daemon -f
# 监控 Telegram Bot log
journalctl -u minisoar-bot -f
```
## 环境配置
在项目根目录下创建 `.env` 文件。请使用 `env.example` 作为参考模板。
```
# === Redis Queue ===
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_KEY=logstash_alert_queue
# === Telegram Bot ===
TELEGRAM_BOT=YOUR_TELEGRAM_BOT_TOKEN
TELEGRAM_CHAT_ID=YOUR_TELEGRAM_CHAT_ID
TELEGRAM_PROCESS_CHAT_ID=YOUR_PROCESS_CHAT_ID
ALLOWED_USERS=123456789,987654321
# === Threat Intelligence ===
ABUSEIPDB_API_KEY=YOUR_ABUSEIPDB_API_KEY
ABUSEIPDB_CACHE_TTL=21600
IPAPI_CACHE_TTL=43200
LOOKUP_TIMEOUT=4
# === Elasticsearch ===
ES_HOSTS=https://YOUR_ELASTICSEARCH_HOST:9200
ES_USER=YOUR_ES_USER
ES_PASS=YOUR_ES_PASS
ES_VERIFY=false
ES_EVENTS_INDEX_PREFIX=minisoar-events
ES_LABELS_INDEX_PREFIX=minisoar-labels
ES_TIMEOUT=6
MINISOAR_EVENT_WINDOW=60
# === Security Perimeter Credentials ===
# Imperva
IMPERVA_BASE_URL=https://YOUR_IMPERVA_BASE_URL
IMPERVA_USERNAME=YOUR_IMPERVA_USERNAME
IMPERVA_PASSWORD=YOUR_IMPERVA_PASSWORD
IMPERVA_GROUP_NAME=Blocked-IP-Addresses
# Palo Alto
PA_HOST=https://YOUR_PA_HOST
PA_API_KEY=YOUR_PA_API_KEY
PA_VSYS=vsys1
PA_GROUP=YOUR_PA_GROUP
PA_ADMIN=YOUR_PA_ADMIN
# Akamai
AKAMAI_BASEURL=https://YOUR_AKAMAI_BASEURL
AKAMAI_LIST_ID=YOUR_AKAMAI_LIST_ID
AKAMAI_CLIENT_TOKEN=YOUR_AKAMAI_CLIENT_TOKEN
AKAMAI_CLIENT_SECRET=YOUR_AKAMAI_CLIENT_SECRET
AKAMAI_ACCESS_TOKEN=YOUR_AKAMAI_ACCESS_TOKEN
AKAMAI_ACCOUNT_SWITCH=
# === Testing / Safety ===
MINISOAR_MOCK=1
# === Blocking Mode ===
MINISOAR_BLOCKING_MODE=MANUAL
```,StartLine:191,TargetContent:
### 支持配置文件
| File | Fungsi |
|---|---|
| `logstash/minisoar-perimeter.yml` | Mapping website/domain ke provider mitigasi. |
| `logstash/minisoar-whitelist.yml` | Whitelist website, keyword, dan anti-judi untuk pendeteksian Logstash. |
| `minisoar-whitelist.txt` | IP/CIDR trusted. Alert tetap dikirim, tetapi tombol block disembunyikan. |
| `minisoar-bypass.txt` | IP/CIDR yang diabaikan total. Alert tidak dikirim ke Telegram. |
---
## Blocking 操作模式
MiniSOAR mendukung tiga mode melalui `MINISOAR_BLOCKING_MODE`.
| Mode | Perilaku | Rekomendasi Penggunaan |
|---|---|---|
| `MANUAL` | Semua alert dikirim ke Telegram. Analis memilih action secara manual. | Paling aman untuk production awal. |
| `SEMI` | AI memberi rekomendasi. Jika confidence tinggi, sistem dapat melakukan auto-block. | Cocok setelah rule dan whitelist stabil. |
| `AUTO` | Prediksi berbahaya langsung diblokir ke perimeter. | Gunakan hanya jika model, whitelist, dan rollback sudah matang. |
> Untuk staging atau demo, aktifkan `MINISOAR_MOCK=1` agar API block/unblock tidak benar-benar dijalankan ke vendor perimeter.
---
## 运行服务
Jalankan dua proses utama berikut di terminal terpisah.
### 1. Alert daemon
Daemon membaca queue Redis, melakukan enrichment, indexing, ML scoring, dan mengirim alert ke Telegram.
```bash
python -m minisoar.daemon
```
或者通过旧版 wrapper 运行:
```
python 14_redis_telegram_alert.py
```
### 2. Telegram bot handler
Bot 接收来自 Telegram 的操作员命令和 callback action。
```
python -m minisoar.bot
```
或者通过旧版 wrapper 运行:
```
python 09-tele-soar.py
```
## Telegram Bot 命令(Imperva、Palo Alto 和 Akamai 案例研究)
| 命令 | 描述 | 平台 |
|---|---|---|
| `/help` | 显示帮助和命令摘要。 | Bot |
| `/blockonimperva ` | 将 IP 添加到 Imperva blocklist。 | Imperva |
| `/unblockonimperva ` | 从 Imperva blocklist 中移除 IP。 | Imperva |
| `/blocklistimperva` | 显示在 Imperva 上被 block 的 IP。 | Imperva |
| `/tracev ` | 在 Imperva 中追踪违规行为。 | Imperva |
| `/blockonpalo ` | 将 IP 添加到 Palo Alto address group。 | Palo Alto |
| `/unblockonpalo ` | 从 Palo Alto address group 中移除 IP。 | Palo Alto |
| `/commitpalo` | 执行 Palo Alto 的 partial commit。 | Palo Alto |
| `/blockonakamai ` | 将 IP 添加到 Akamai Client List。 | Akamai |
| `/unblockonakamai ` | 从 Akamai Client List 中移除 IP。 | Akamai |
| `/blocklistakamai` | 显示在 Akamai 上被 block 的 IP。 | Akamai |
| `/activateakamai` | 将 Akamai 配置激活到 STAGING 和 PRODUCTION。 | Akamai |
| `/activationstatus ` | 检查 Akamai 的激活状态。 | Akamai |
## 测试与 Mock 流量
要在没有真实 Logstash 的情况下测试端到端流程,请将 payload 直接注入到 Redis。
确保 daemon 和 bot 已经运行,然后执行:
```
python -c "import redis, json, datetime; r = redis.StrictRedis(host='127.0.0.1', port=6379); payload = {'alert': {'type': 'alert_webshell_immediate', 'server_name': 'mock-target.com', 'src_ip': '8.8.8.8', 'method': 'POST', 'url': '/api/upload.php', 'status': '200', 'severity': 'high'}, 'tags': ['alert_webshell_immediate'], '@timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat()}; r.lpush('logstash_alert_queue', json.dumps(payload)); print('Mock traffic berhasil dikirim ke Redis!')"
```
如果可用,可以使用以下命令运行更完整的 mock 流量:
```
python scratch/mock_traffic.py
```
## Machine Learning
MiniSOAR 拥有一个简单的 ML pipeline,用于根据历史事件和分析师的标签提供 block/allow 建议。
### 1. 导出数据集
```
python -m minisoar.ml.export
```
或者通过旧版 wrapper 运行:
```
python scripts/export_dataset.py
```
主要输出:`dataset.csv`。
### 2. 训练 baseline 模型
```
python -m minisoar.ml.train
```
或者通过旧版 wrapper 运行:
```
python scripts/train_baseline.py
```
主要输出:`baseline_model.joblib`。
### 3. 重启 daemon
模型在 daemon 启动时加载。重新训练后,请重启 daemon 以使用新模型。
```
# 使用 Ctrl+C 停止 daemon,然后重新运行
python -m minisoar.daemon
```
## 运维与安全注意事项
- 请勿将 `.env` 文件或厂商凭证提交到代码仓库中。
- 在开发、演示或 staging 环境下,请设置 `MINISOAR_MOCK=1`。
- 在启用 `SEMI` 或 `AUTO` 模式之前,请先从 `MINISOAR_BLOCKING_MODE=MANUAL` 开始。
- 确保 `ALLOWED_USERS` 仅包含经过授权的操作员 Telegram user ID。
- 定期审查 whitelist 和 bypass list。
- 在生产环境中,请对 Elasticsearch 使用 TLS 验证。除非是用于实验室/测试,否则请避免使用 `ES_VERIFY=false`。
- 保留操作审计日志,以便可以追溯 block/unblock 活动。
- 对于使用 `aggregate` 过滤器的 Logstash pipeline,请设置 `pipeline.workers: 1`,以确保事件聚合保持一致。
标签:Python, SOAR, 内容过滤, 安全编排与自动化响应, 搜索引擎查询, 无后门, 自动化封禁, 自动化防御, 逆向工具