RamBeer6/soc-cloud-threat-detection-pipeline

GitHub: RamBeer6/soc-cloud-threat-detection-pipeline

一个面向SOC场景的Python威胁检测管道项目,通过模拟日志生成、规则引擎分析和告警输出,完整展示了从原始事件到可操作发现的检测工程工作流。

Stars: 1 | Forks: 0

# 云威胁检测管道(面向 SOC 的系统) [![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/36f2e3c2d4221014.svg)](https://github.com/RamBeer6/soc-cloud-threat-detection-pipeline/actions/workflows/ci.yml) 一个生产级风格的 Python 项目,模拟真实的安全运营中心(SOC)检测管道。系统生成结构化的身份验证日志,使用基于规则的检测逻辑对其进行分析,发出警报,并打印运营仪表盘摘要。 本仓库专为面向网络安全、DevSecOps、检测工程和 SOC 分析师等职位的 GitHub 作品集而设计。 ## 为什么这个项目很重要 现代 SOC 团队依赖遥测管道将嘈杂的身份验证事件转化为可操作的发现。本项目展示了该工作流背后的实用构建模块: - 真实的日志生成,而非简单的测试输入 - 针对暴力破解行为、可疑 IP 活动、重复访问模式和短窗口激增的检测规则 - 保留分析师上下文的警报机制 - Docker 化执行,便于本地可移植测试 - 易于扩展新检测功能的代码结构 ## 系统功能 该管道执行四个核心功能: 1. 以 JSONL 格式生成身份验证类事件。 2. 通过模块化的检测规则处理事件流。 3. 将警报同时输出到控制台和 `logs/alerts.log`。 4. 生成包含事件和警报统计信息的 CLI 仪表盘。 ## 展示的关键技能 - SOC 思维的检测工程 - 安全工作流的 Python 自动化 - 结构化日志记录和事件规范化 - 具有可配置阈值的基于规则的威胁分析 - 警报转发和轻量级安全集成 - Docker 打包和本地开发者人体工程学体验 - 用于保障管道可靠性的自动化测试 ## 涵盖的威胁场景 - 来自单一 IP 的暴力破解登录尝试 - 来自同一来源的重复访问突发 - 异常的短窗口活动激增 - 来自模拟威胁情报 IP 的流量 - 分析层中的畸形输入处理 ## 架构概览 ``` +-------------------+ +-------------------+ +-------------------+ | Log Generator | ----> | Log Analyzer | ----> | Alert Manager | | JSONL auth events | | Rule-based engine | | Console + file | +-------------------+ +-------------------+ +-------------------+ | v +-------------------+ | CLI Dashboard | | SOC-style summary | +-------------------+ ``` ## 检测逻辑 分析器有意采用离散规则构建,以便将来无需重写整个管道即可添加新的检测功能。 当前检测项: - `brute_force_suspected` 当同一 IP 在短时间窗口内的失败登录次数超过配置的阈值时触发。 - `repeated_access_pattern` 当同一 IP 在短时间间隔内产生大量身份验证事件时触发。 - `unusual_activity_spike` 当来自单一 IP 的活动量在滚动窗口内急剧上升时触发。 - `known_suspicious_ip_activity` 当事件源自模拟威胁情报列表中的 IP 时触发。 默认阈值: - 失败登录:`120` 秒内 `5` 次 - 重复访问:`60` 秒内 `10` 次事件 - 活动激增:`90` 秒内 `15` 次事件 ## 项目结构 ``` Cloud Threat Detection Pipeline (SOC Project)/ |-- docker/ | |-- Dockerfile | `-- docker-compose.yml |-- logs/ | |-- alerts.log | `-- events.jsonl |-- src/ | |-- alerts.py | |-- analyzer.py | |-- detector_rules.py | |-- log_generator.py | |-- main.py | `-- web_dashboard.py |-- static/ | `-- dashboard.css |-- templates/ | `-- dashboard.html |-- README.md `-- requirements.txt ``` ## 日志事件示例 ``` { "timestamp": "2026-04-16T11:41:15.792790+00:00", "ip": "45.95.147.27", "user": "ivan", "action": "login_failed", "country": "CN", "source": "auth-service", "user_agent": "Mozilla/5.0 (X11; Linux x86_64)", "outcome": "failure", "explanation": "Authentication attempt from an IP marked as suspicious", "threat_context": "known_bad_ip" } ``` ## 警报示例 ``` { "timestamp": "2026-04-16T11:42:04.792790+00:00", "ip": "45.95.147.27", "threat_type": "brute_force_suspected", "explanation": "Detected 5 failed login attempts from 45.95.147.27 within 120 seconds.", "severity": "high", "user": "grace", "metadata": { "failed_attempts": 5, "window_seconds": 120, "rule": "failed_login_burst" } } ``` ## 仪表盘输出示例 ``` SOC PIPELINE DASHBOARD ====================== Total events processed : 200 Alerts generated : 24 Top active IPs: - 172.16.4.18: 70 events - 10.0.10.35: 23 events Top suspicious IPs: - 172.16.4.18: 8 alerts - 45.95.147.27: 3 alerts Alert types: - brute_force_suspected: 8 - repeated_access_pattern: 6 - unusual_activity_spike: 6 - known_suspicious_ip_activity: 4 ``` ## 截图 ### Web 仪表盘 ![Dashboard](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/5a036e0c54221017.png) ### CLI 输出 ![CLI Output](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/6ebdc41d1e221019.png) ### 警报日志 ![Alerts Log](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/a2ab4ad341221021.png) ## 如何在本地运行 ### 1. 仅生成示例日志 ``` python src/main.py generate --count 200 --output logs/events.jsonl --seed 42 ``` ### 2. 仅分析现有日志 ``` python src/main.py analyze --input logs/events.jsonl --alerts-output logs/alerts.log ``` ### 3. 运行完整管道 ``` python src/main.py run --count 200 --output logs/events.jsonl --alerts-output logs/alerts.log --seed 42 ``` ### 4. 直接运行单个模块 ``` python src/log_generator.py --count 200 --output logs/events.jsonl --seed 42 python src/analyzer.py --input logs/events.jsonl --alerts-output logs/alerts.log ``` ## Docker 设置 构建并运行完整管道: ``` docker compose -f docker/docker-compose.yml up --build ``` 容器通过挂载卷将生成的事件和警报写入本地 `logs/` 目录。 ## Web 仪表盘 针对当前日志集启动 Flask 仪表盘: ``` python src/main.py dashboard --input logs/events.jsonl --alerts-output logs/alerts.log ``` 然后打开: ``` http://127.0.0.1:5000 ``` 仪表盘显示: - 已处理事件总数 - 已生成警报总数 - 最活跃的 IP - 最可疑的 IP - 警报类型细分 - 最近的持久化警报 ## 警报转发 管道可以选择将警报作为 JSON 转发到 webhook 端点。这有助于模拟与以下组件的集成: - Slack 入站 webhook - 自定义 SOAR 处理程序 - 事件路由服务 - 内部安全自动化端点 显式指定 webhook 运行: ``` python src/main.py run --webhook-url https://example.com/security-alerts ``` 或者通过环境变量定义: ``` export SOC_ALERT_WEBHOOK_URL=https://example.com/security-alerts python src/main.py analyze ``` `/.env.example` 中包含了一个示例环境文件。 ## 测试 运行自动化测试套件: ``` python -m unittest discover -s tests -v ``` 测试验证了: - 日志生成和必填事件字段 - 所有核心警报类型的端到端检测 - 有效 JSON 格式的警报日志持久化 - 畸形事件的输入验证 - webhook 投递行为 - 仪表盘渲染 ## 开发者快捷方式 如果您使用 `make`,本项目包含了常用目标: ``` make install make test make run make dashboard ``` ## GitHub 展示清单 在发布之前,该仓库的结构已经能够支持强大的展示效果: - 带有架构和执行指南的专业 README - Docker 化的可运行演示 - 自动化测试 - 可选的 webhook 集成 - CLI 和 Web 仪表盘视图 - 通过 `.gitignore` 保持干净的仓库卫生 有关可直接粘贴的 GitHub 仓库元数据,请参阅 [docs/github-launch-kit.md](docs/github-launch-kit.md)。 ## 支持文档 - [检测规则参考](docs/detection-rules.md) - [示例事件报告](docs/sample-incident-report.md) ## Linux 友好运行脚本 用于在 Linux 或 macOS 上进行完整演示运行的快捷命令: ``` chmod +x run_pipeline.sh ./run_pipeline.sh ``` ## 使用的技术 - Python 3.12 - JSONL 结构化日志 - 基于规则的检测工程 - Docker 和 Docker Compose - 兼容 Linux 的命令流 ## 许可证 本项目基于 MIT 许可证发布。请参阅 `LICENSE`。 ## 为什么它非常适合安全作品集 本项目展示了直接映射到安全工程和 SOC 工作的技能: - 威胁检测思维 - 安全工作流的 Python 自动化 - 日志解析和事件富化 - 警报设计和运营输出 - 适合未来 SIEM 或云集成的模块化架构 ## 未来增强 更高级版本可能的后续升级: - 从 Kafka、syslog 或 CloudWatch 摄取日志 - 将警报导出到 Slack、电子邮件或工单系统 - 将遥测数据存储在 PostgreSQL 或 Elasticsearch 中 - 使用外部威胁情报源丰富可疑 IP
标签:DevSecOps, DevSecOps项目实践, Docker容器化, IP 地址批量处理, SOC分析师, Webhook转发, 上游代理, 云威胁检测, 仪表盘, 免杀技术, 告警系统, 基于规则的检测, 安全信息与事件管理, 安全工程, 安全运营中心, 开源安全项目, 搜索引擎爬取, 时序数据库, 暴力破解检测, 版权保护, 红队行动, 网络映射, 请求拦截, 身份验证安全, 逆向工具