one-b1t/Mini-SOAR

GitHub: one-b1t/Mini-SOAR

Mini-SOAR 是一个基于 Logstash、Redis、Python 和 Telegram Bot 的轻量级安全编排自动化系统,用于自动检测安全告警、富化威胁情报并联动边界设备执行 IP 封堵。

Stars: 0 | Forks: 0

# MiniSOAR MiniSOAR 是一个基于 **Logstash**、**Redis**、**Python** 和 **Telegram Bot** 的轻量级安全编排器。该系统旨在接收安全警报,对攻击者 IP 进行 enrichment,提供决策建议,并协助对 **Imperva**、**Palo Alto** 和 **Akamai** 等边界设备执行缓解流程。 本文档整合了 README、Context 和旧版 Wiki 中的说明,以便将所有关键信息集中在一处。 ## 目录 - [工作流程概述](#ringkasan-alur-kerja) - [主要功能](#fitur-utama) - [系统架构](#arsitektur-sistem) - [项目结构](#struktur-project) - [环境要求](#persyaratan) - [安装说明](#instalasi) - [环境配置](#konfigurasi-environment) - [拦截操作模式](#mode-operasi-blocking) - [运行服务](#menjalankan-layanan) - [Telegram Bot 命令](#perintah-telegram-bot) - [测试与 Mock 流量](#testing-dan-mock-traffic) - [Machine Learning](#machine-learning) - [运维与安全注意事项](#catatan-operasional-dan-keamanan) ## 工作流程概述 MiniSOAR 的工作流程如下: ``` Proxy Logs / Raw Logs ↓ Elasticsearch ↓ Logstash Detection Pipeline ↓ Redis Queue ↓ Python Alert Daemon ↓ Enrichment, Event Indexing, ML Recommendation ↓ Telegram Notification / Analyst Action ↓ Mitigation Action ke Security Perimeter ``` 简单来说: 1. **Logstash** 从 elasticsearch 读取日志,进行字段标准化,然后检测攻击模式或异常。 2. 有效的警报将作为缓冲队列发送到 **Redis**。 3. **MiniSOAR daemon** 从 Redis 读取事件。 4. Daemon 执行 enrichment、whitelist/bypass 检查、事件索引,以及基于规则/ML 的评分。 5. 警报发送到 **Telegram**,以便分析师查看上下文并采取行动。 6. 如果模式允许,MiniSOAR 可以对边界设备执行自动或半自动的拦截操作。 ## 主要功能 - 基于 **Telegram 的 SOAR 工作流**,用于通知和分析师操作。 - **Redis 队列** 作为缓冲,确保检测和通知过程互不阻塞。 - 使用 AbuseIPDB 和 IP API 进行 **Threat intelligence enrichment**。 - **Whitelist 和 bypass list**,防止对内部或受信任的 IP 产生误报。 - **边界映射**,根据域名/网站确定缓解厂商。 - 通过 API 将缓解措施 **集成** 到安全边界。 - 提供 **MANUAL、SEMI 和 AUTO 模式**,灵活调整自动化级别。 - **Mock 模式**,确保测试不会调用真实的边界 API。 - **事件索引至 Elasticsearch**,用于审计、分析师标注和 ML 需求。 - **Machine learning baseline**,根据历史数据提供 block/allow 建议。 - **跨平台路径处理**,兼容 Linux/WSL 以及 Windows 开发主机。 ## 系统架构 ``` flowchart LR A[Proxy Log / Raw Logs] --> B[Logstash] B -->|Parse, normalize, classify| C[Redis Queue] C -->|Alert payload| D[MiniSOAR Daemon] D --> E[Enrichment] D --> F[Elasticsearch Indexing] D --> G[ML Recommendation] G --> H[Telegram Bot] H --> I[Analyst] H --> J[Mitigation Action] J --> K[Security Perimeter] ``` ### 组件职责 | 组件 | 职责 | |---|---| | Logstash | 读取日志,进行字段标准化,检测警报,并将事件推送到 Redis。 | | Redis | 作为检测 pipeline 和 Python worker 之间的缓冲队列。 | | MiniSOAR Daemon | 读取事件,进行 enrichment、索引、ML 评分,并广播警报。 | | Telegram Bot | 显示警报,接收分析师命令,并执行 callback action。 | | Mitigation Modules | 对安全边界执行 block/unblock。 | | Elasticsearch | 存储事件、分析师标签以及用于 ML 重新训练的数据。 | ## 项目结构 以下是项目的目录组织结构: ``` MiniSOAR/ ├── logstash/ # Konfigurasi pipeline deteksi Logstash │ ├── 01-detection.conf # Pipeline untuk normalisasi dan deteksi alert │ ├── 02-alert-redis.conf # Pipeline pengiriman alert ke Redis │ ├── minisoar-perimeter.yml # Pemetaan target domain ke provider mitigasi │ └── minisoar-whitelist.yml # Whitelist domain, keyword, dan anti-judi untuk Logstash ├── minisoar/ # Kode utama aplikasi SOAR (modul Python) │ ├── __init__.py │ ├── bot.py # Handlers Telegram bot dan interaksi analis │ ├── config.py # Konfigurasi environment dan variabel SOAR │ ├── daemon.py # Core consumer loop dari Redis ke Telegram │ ├── database.py # Pustaka koneksi database (Redis & Elasticsearch) │ ├── utils.py # Fungsi utilitas (Enrichment, IP validation, dll.) │ ├── mitigation/ # Integrasi mitigasi WAF dan perimeter keamanan │ │ ├── __init__.py │ │ ├── akamai.py # Integrasi Fast Purge / Client List Akamai │ │ ├── core.py # Orchestrator block/unblock dan commit │ │ ├── imperva.py # Integrasi API blocking IP Imperva │ │ └── paloalto.py # Integrasi Address Object/Group Palo Alto │ └── ml/ # Modul klasifikasi machine learning (Phase 1 & 2) │ ├── __init__.py │ ├── export.py # Pustaka export dataset dari Elasticsearch/Redis │ ├── inference.py # Logika prediksi decision block │ └── train.py # Pustaka training model klasifikasi baseline ├── scratch/ # Skrip uji coba lokal dan mock data │ ├── mock_traffic.py # Injeksi data simulasi ke Redis │ ├── test_imports.py # Verifikasi import modul Python │ └── test_predict.py # Verifikasi fungsi prediksi ML secara lokal ├── scripts/ # Script wrapper penunjang operasional │ ├── export_dataset.py # Wrapper pengekspor dataset ML │ └── train_baseline.py # Wrapper pen-training model baseline ML ├── tests/ # Unit test suite (Pytest) │ ├── __init__.py │ ├── test_config.py # Test case untuk parsing config │ ├── test_database.py # Test case koneksi & operasi DB │ ├── test_mitigation.py # Test case fungsi mitigasi (mock) │ ├── test_ml.py # Test case inference fallback │ └── test_utils.py # Test case helper utilities ├── .gitignore ├── Changelog.md # Catatan riwayat rilis proyek ├── Readme.md # Dokumentasi utama proyek SOAR ├── WIKI.md # Dokumentasi tambahan teknis operasional ├── env.example # Contoh konfigurasi environment variable └── requirements.txt # Daftar dependensi pustaka Python ``` 为了保持运维兼容性,部分旧版 wrapper 仍然可以使用: | 旧版 Wrapper | 新入口点 | |---|---| | `14_redis_telegram_alert.py` | `minisoar.daemon.main()` | | `09-tele-soar.py` | `minisoar.bot.main()` | | `scripts/export_dataset.py` | `minisoar.ml.export.main()` | | `scripts/train_baseline.py` | `minisoar.ml.train.main()` | ## 环境要求 - Python 3.10+ - Redis Server - Telegram Bot Token 和 Chat ID - Elasticsearch(可选,但建议用于事件存储和 ML 重新训练) - 具备 API 配置功能的安全边界设备 ## 安装说明 ### 1. Windows 打开 PowerShell 或 Command Prompt,进入项目文件夹,然后创建并激活虚拟环境: ``` python -m venv .venv .venv\Scripts\activate pip install -r requirements.txt ``` ### 2. Linux(现代版 Ubuntu/Debian) 在现代 Linux(Python 3.11+ / PEP 668)上,全局安装 Python 包时会出现 `externally-managed-environment` 错误。您**必须**使用虚拟环境来隔离项目依赖: ``` # 更新并安装系统 venv 依赖(如果尚未安装) sudo apt update && sudo apt install -y python3-venv python3-full # 创建 virtual environment python3 -m venv .venv # 激活 virtual environment source .venv/bin/activate # 安装项目依赖 pip install -r requirements.txt ``` ### 3. Logstash(检测基础设施) MiniSOAR 使用 **Logstash** 从安全日志中检测威胁,并将警报转发至 Redis。Logstash 配置使用了 **Pipeline-to-Pipeline Communication** 系统,将处理过程分为两个阶段: * **Detection Pipeline ([01-detection.conf]):** 定期从 Elasticsearch 拉取日志,进行字段标准化,并使用 `aggregate` 过滤器进行威胁关联和聚合。 * **Alert Pipeline ([02-alert-redis.conf]):** 通过 Logstash 内部模块接收 detection pipeline 的输出,设定严重程度(*severity*),格式化通知文本,并将其发送到 Redis 队列 `logstash_alert_queue`。 #### A. 安装 Logstash (Debian/Ubuntu) 运行以下命令以安装 Elastic 官方仓库并安装 Logstash: ``` # 1. 安装 Java OpenJDK(Logstash 的先决条件) sudo apt install -y openjdk-17-jre-headless # 2. 添加 Elastic GPG Key 和 Repositori wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list # 3. 安装 Logstash sudo apt update && sudo apt install -y logstash ``` #### B. 配置 Logstash 多重 Pipeline 将配置文件 [01-detection.conf] 和 [02-alert-redis.conf] 复制到您的服务器上(例如放在 `/home/ubuntu/Mini-SOAR-dev/`)。 编辑 `/etc/logstash/pipelines.yml` 文件以映射这两个 pipeline: ``` # /etc/logstash/pipelines.yml - pipeline.id: detection-pipeline path.config: "/home/ubuntu/Mini-SOAR-dev/01-detection.conf" # CRITICAL: Wajib bernilai 1 agar filter 'aggregate' berjalan konsisten pada satu utas (thread). pipeline.workers: 1 - pipeline.id: alert-pipeline path.config: "/home/ubuntu/Mini-SOAR-dev/02-alert-redis.conf" ``` #### C. 运行 Logstash 服务 ``` # 激活并以 systemd service 运行 Logstash sudo systemctl daemon-reload sudo systemctl enable logstash sudo systemctl start logstash # 监控 Logstash logs 以验证错误 sudo tail -f /var/log/logstash/logstash-plain.log ``` #### D. 将 MiniSOAR 作为 Systemd Service 运行 为了让告警消费者 daemon 和 Telegram bot 在后台持久运行,并在崩溃或服务器重启后自动启动,请创建以下 systemd service: ##### 1. Alert Daemon 服务 (`minisoar-daemon.service`) 创建一个新的 systemd unit 文件: ``` sudo nano /etc/systemd/system/minisoar-daemon.service ``` 粘贴以下配置(请根据您的环境调整 `User` 和 `WorkingDirectory`): ``` [Unit] Description=MiniSOAR Alert Daemon Service After=network.target redis-server.service [Service] Type=simple User=ubuntu WorkingDirectory=/home/ubuntu/Mini-SOAR ExecStart=/home/ubuntu/Mini-SOAR/.venv/bin/python -m minisoar.daemon Restart=on-failure RestartSec=5s Environment=PYTHONUNBUFFERED=1 [Install] WantedBy=multi-user.target ``` ##### 2. Telegram Bot 服务 (`minisoar-bot.service`) 创建一个新的 systemd unit 文件: ``` sudo nano /etc/systemd/system/minisoar-bot.service ``` 粘贴以下配置: ``` [Unit] Description=MiniSOAR Telegram Bot Service After=network.target [Service] Type=simple User=ubuntu WorkingDirectory=/home/ubuntu/Mini-SOAR ExecStart=/home/ubuntu/Mini-SOAR/.venv/bin/python -m minisoar.bot Restart=on-failure RestartSec=5s Environment=PYTHONUNBUFFERED=1 [Install] WantedBy=multi-user.target ``` ##### 3. 启用与管理服务 运行以下命令以重新加载 systemd,然后启用并启动这两个服务: ``` # Reload systemd configuration sudo systemctl daemon-reload # Enable 开机 auto-start sudo systemctl enable minisoar-daemon sudo systemctl enable minisoar-bot # 立即运行 service sudo systemctl start minisoar-daemon sudo systemctl start minisoar-bot # 检查各个 service 的状态 sudo systemctl status minisoar-daemon sudo systemctl status minisoar-bot ``` 使用以下命令监控服务的活动日志 (stdout/stderr): ``` # 监控 Alert Daemon log journalctl -u minisoar-daemon -f # 监控 Telegram Bot log journalctl -u minisoar-bot -f ``` ## 环境配置 在项目根目录下创建 `.env` 文件。请使用 `env.example` 作为参考模板。 ``` # === Redis Queue === REDIS_HOST=127.0.0.1 REDIS_PORT=6379 REDIS_KEY=logstash_alert_queue # === Telegram Bot === TELEGRAM_BOT=YOUR_TELEGRAM_BOT_TOKEN TELEGRAM_CHAT_ID=YOUR_TELEGRAM_CHAT_ID TELEGRAM_PROCESS_CHAT_ID=YOUR_PROCESS_CHAT_ID ALLOWED_USERS=123456789,987654321 # === Threat Intelligence === ABUSEIPDB_API_KEY=YOUR_ABUSEIPDB_API_KEY ABUSEIPDB_CACHE_TTL=21600 IPAPI_CACHE_TTL=43200 LOOKUP_TIMEOUT=4 # === Elasticsearch === ES_HOSTS=https://YOUR_ELASTICSEARCH_HOST:9200 ES_USER=YOUR_ES_USER ES_PASS=YOUR_ES_PASS ES_VERIFY=false ES_EVENTS_INDEX_PREFIX=minisoar-events ES_LABELS_INDEX_PREFIX=minisoar-labels ES_TIMEOUT=6 MINISOAR_EVENT_WINDOW=60 # === Security Perimeter Credentials === # Imperva IMPERVA_BASE_URL=https://YOUR_IMPERVA_BASE_URL IMPERVA_USERNAME=YOUR_IMPERVA_USERNAME IMPERVA_PASSWORD=YOUR_IMPERVA_PASSWORD IMPERVA_GROUP_NAME=Blocked-IP-Addresses # Palo Alto PA_HOST=https://YOUR_PA_HOST PA_API_KEY=YOUR_PA_API_KEY PA_VSYS=vsys1 PA_GROUP=YOUR_PA_GROUP PA_ADMIN=YOUR_PA_ADMIN # Akamai AKAMAI_BASEURL=https://YOUR_AKAMAI_BASEURL AKAMAI_LIST_ID=YOUR_AKAMAI_LIST_ID AKAMAI_CLIENT_TOKEN=YOUR_AKAMAI_CLIENT_TOKEN AKAMAI_CLIENT_SECRET=YOUR_AKAMAI_CLIENT_SECRET AKAMAI_ACCESS_TOKEN=YOUR_AKAMAI_ACCESS_TOKEN AKAMAI_ACCOUNT_SWITCH= # === Testing / Safety === MINISOAR_MOCK=1 # === Blocking Mode === MINISOAR_BLOCKING_MODE=MANUAL ```,StartLine:191,TargetContent: ### 支持配置文件 | File | Fungsi | |---|---| | `logstash/minisoar-perimeter.yml` | Mapping website/domain ke provider mitigasi. | | `logstash/minisoar-whitelist.yml` | Whitelist website, keyword, dan anti-judi untuk pendeteksian Logstash. | | `minisoar-whitelist.txt` | IP/CIDR trusted. Alert tetap dikirim, tetapi tombol block disembunyikan. | | `minisoar-bypass.txt` | IP/CIDR yang diabaikan total. Alert tidak dikirim ke Telegram. | --- ## Blocking 操作模式 MiniSOAR mendukung tiga mode melalui `MINISOAR_BLOCKING_MODE`. | Mode | Perilaku | Rekomendasi Penggunaan | |---|---|---| | `MANUAL` | Semua alert dikirim ke Telegram. Analis memilih action secara manual. | Paling aman untuk production awal. | | `SEMI` | AI memberi rekomendasi. Jika confidence tinggi, sistem dapat melakukan auto-block. | Cocok setelah rule dan whitelist stabil. | | `AUTO` | Prediksi berbahaya langsung diblokir ke perimeter. | Gunakan hanya jika model, whitelist, dan rollback sudah matang. | > Untuk staging atau demo, aktifkan `MINISOAR_MOCK=1` agar API block/unblock tidak benar-benar dijalankan ke vendor perimeter. --- ## 运行服务 Jalankan dua proses utama berikut di terminal terpisah. ### 1. Alert daemon Daemon membaca queue Redis, melakukan enrichment, indexing, ML scoring, dan mengirim alert ke Telegram. ```bash python -m minisoar.daemon ``` 或者通过旧版 wrapper 运行: ``` python 14_redis_telegram_alert.py ``` ### 2. Telegram bot handler Bot 接收来自 Telegram 的操作员命令和 callback action。 ``` python -m minisoar.bot ``` 或者通过旧版 wrapper 运行: ``` python 09-tele-soar.py ``` ## Telegram Bot 命令(Imperva、Palo Alto 和 Akamai 案例研究) | 命令 | 描述 | 平台 | |---|---|---| | `/help` | 显示帮助和命令摘要。 | Bot | | `/blockonimperva ` | 将 IP 添加到 Imperva blocklist。 | Imperva | | `/unblockonimperva ` | 从 Imperva blocklist 中移除 IP。 | Imperva | | `/blocklistimperva` | 显示在 Imperva 上被 block 的 IP。 | Imperva | | `/tracev ` | 在 Imperva 中追踪违规行为。 | Imperva | | `/blockonpalo ` | 将 IP 添加到 Palo Alto address group。 | Palo Alto | | `/unblockonpalo ` | 从 Palo Alto address group 中移除 IP。 | Palo Alto | | `/commitpalo` | 执行 Palo Alto 的 partial commit。 | Palo Alto | | `/blockonakamai ` | 将 IP 添加到 Akamai Client List。 | Akamai | | `/unblockonakamai ` | 从 Akamai Client List 中移除 IP。 | Akamai | | `/blocklistakamai` | 显示在 Akamai 上被 block 的 IP。 | Akamai | | `/activateakamai` | 将 Akamai 配置激活到 STAGING 和 PRODUCTION。 | Akamai | | `/activationstatus ` | 检查 Akamai 的激活状态。 | Akamai | ## 测试与 Mock 流量 要在没有真实 Logstash 的情况下测试端到端流程,请将 payload 直接注入到 Redis。 确保 daemon 和 bot 已经运行,然后执行: ``` python -c "import redis, json, datetime; r = redis.StrictRedis(host='127.0.0.1', port=6379); payload = {'alert': {'type': 'alert_webshell_immediate', 'server_name': 'mock-target.com', 'src_ip': '8.8.8.8', 'method': 'POST', 'url': '/api/upload.php', 'status': '200', 'severity': 'high'}, 'tags': ['alert_webshell_immediate'], '@timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat()}; r.lpush('logstash_alert_queue', json.dumps(payload)); print('Mock traffic berhasil dikirim ke Redis!')" ``` 如果可用,可以使用以下命令运行更完整的 mock 流量: ``` python scratch/mock_traffic.py ``` ## Machine Learning MiniSOAR 拥有一个简单的 ML pipeline,用于根据历史事件和分析师的标签提供 block/allow 建议。 ### 1. 导出数据集 ``` python -m minisoar.ml.export ``` 或者通过旧版 wrapper 运行: ``` python scripts/export_dataset.py ``` 主要输出:`dataset.csv`。 ### 2. 训练 baseline 模型 ``` python -m minisoar.ml.train ``` 或者通过旧版 wrapper 运行: ``` python scripts/train_baseline.py ``` 主要输出:`baseline_model.joblib`。 ### 3. 重启 daemon 模型在 daemon 启动时加载。重新训练后,请重启 daemon 以使用新模型。 ``` # 使用 Ctrl+C 停止 daemon,然后重新运行 python -m minisoar.daemon ``` ## 运维与安全注意事项 - 请勿将 `.env` 文件或厂商凭证提交到代码仓库中。 - 在开发、演示或 staging 环境下,请设置 `MINISOAR_MOCK=1`。 - 在启用 `SEMI` 或 `AUTO` 模式之前,请先从 `MINISOAR_BLOCKING_MODE=MANUAL` 开始。 - 确保 `ALLOWED_USERS` 仅包含经过授权的操作员 Telegram user ID。 - 定期审查 whitelist 和 bypass list。 - 在生产环境中,请对 Elasticsearch 使用 TLS 验证。除非是用于实验室/测试,否则请避免使用 `ES_VERIFY=false`。 - 保留操作审计日志,以便可以追溯 block/unblock 活动。 - 对于使用 `aggregate` 过滤器的 Logstash pipeline,请设置 `pipeline.workers: 1`,以确保事件聚合保持一致。
标签:Python, SOAR, 内容过滤, 安全编排与自动化响应, 搜索引擎查询, 无后门, 自动化封禁, 自动化防御, 逆向工具