RamBeer6/soc-cloud-threat-detection-pipeline
GitHub: RamBeer6/soc-cloud-threat-detection-pipeline
一个面向SOC场景的Python威胁检测管道项目,通过模拟日志生成、规则引擎分析和告警输出,完整展示了从原始事件到可操作发现的检测工程工作流。
Stars: 1 | Forks: 0
# 云威胁检测管道(面向 SOC 的系统)
[](https://github.com/RamBeer6/soc-cloud-threat-detection-pipeline/actions/workflows/ci.yml)
一个生产级风格的 Python 项目,模拟真实的安全运营中心(SOC)检测管道。系统生成结构化的身份验证日志,使用基于规则的检测逻辑对其进行分析,发出警报,并打印运营仪表盘摘要。
本仓库专为面向网络安全、DevSecOps、检测工程和 SOC 分析师等职位的 GitHub 作品集而设计。
## 为什么这个项目很重要
现代 SOC 团队依赖遥测管道将嘈杂的身份验证事件转化为可操作的发现。本项目展示了该工作流背后的实用构建模块:
- 真实的日志生成,而非简单的测试输入
- 针对暴力破解行为、可疑 IP 活动、重复访问模式和短窗口激增的检测规则
- 保留分析师上下文的警报机制
- Docker 化执行,便于本地可移植测试
- 易于扩展新检测功能的代码结构
## 系统功能
该管道执行四个核心功能:
1. 以 JSONL 格式生成身份验证类事件。
2. 通过模块化的检测规则处理事件流。
3. 将警报同时输出到控制台和 `logs/alerts.log`。
4. 生成包含事件和警报统计信息的 CLI 仪表盘。
## 展示的关键技能
- SOC 思维的检测工程
- 安全工作流的 Python 自动化
- 结构化日志记录和事件规范化
- 具有可配置阈值的基于规则的威胁分析
- 警报转发和轻量级安全集成
- Docker 打包和本地开发者人体工程学体验
- 用于保障管道可靠性的自动化测试
## 涵盖的威胁场景
- 来自单一 IP 的暴力破解登录尝试
- 来自同一来源的重复访问突发
- 异常的短窗口活动激增
- 来自模拟威胁情报 IP 的流量
- 分析层中的畸形输入处理
## 架构概览
```
+-------------------+ +-------------------+ +-------------------+
| Log Generator | ----> | Log Analyzer | ----> | Alert Manager |
| JSONL auth events | | Rule-based engine | | Console + file |
+-------------------+ +-------------------+ +-------------------+
|
v
+-------------------+
| CLI Dashboard |
| SOC-style summary |
+-------------------+
```
## 检测逻辑
分析器有意采用离散规则构建,以便将来无需重写整个管道即可添加新的检测功能。
当前检测项:
- `brute_force_suspected`
当同一 IP 在短时间窗口内的失败登录次数超过配置的阈值时触发。
- `repeated_access_pattern`
当同一 IP 在短时间间隔内产生大量身份验证事件时触发。
- `unusual_activity_spike`
当来自单一 IP 的活动量在滚动窗口内急剧上升时触发。
- `known_suspicious_ip_activity`
当事件源自模拟威胁情报列表中的 IP 时触发。
默认阈值:
- 失败登录:`120` 秒内 `5` 次
- 重复访问:`60` 秒内 `10` 次事件
- 活动激增:`90` 秒内 `15` 次事件
## 项目结构
```
Cloud Threat Detection Pipeline (SOC Project)/
|-- docker/
| |-- Dockerfile
| `-- docker-compose.yml
|-- logs/
| |-- alerts.log
| `-- events.jsonl
|-- src/
| |-- alerts.py
| |-- analyzer.py
| |-- detector_rules.py
| |-- log_generator.py
| |-- main.py
| `-- web_dashboard.py
|-- static/
| `-- dashboard.css
|-- templates/
| `-- dashboard.html
|-- README.md
`-- requirements.txt
```
## 日志事件示例
```
{
"timestamp": "2026-04-16T11:41:15.792790+00:00",
"ip": "45.95.147.27",
"user": "ivan",
"action": "login_failed",
"country": "CN",
"source": "auth-service",
"user_agent": "Mozilla/5.0 (X11; Linux x86_64)",
"outcome": "failure",
"explanation": "Authentication attempt from an IP marked as suspicious",
"threat_context": "known_bad_ip"
}
```
## 警报示例
```
{
"timestamp": "2026-04-16T11:42:04.792790+00:00",
"ip": "45.95.147.27",
"threat_type": "brute_force_suspected",
"explanation": "Detected 5 failed login attempts from 45.95.147.27 within 120 seconds.",
"severity": "high",
"user": "grace",
"metadata": {
"failed_attempts": 5,
"window_seconds": 120,
"rule": "failed_login_burst"
}
}
```
## 仪表盘输出示例
```
SOC PIPELINE DASHBOARD
======================
Total events processed : 200
Alerts generated : 24
Top active IPs:
- 172.16.4.18: 70 events
- 10.0.10.35: 23 events
Top suspicious IPs:
- 172.16.4.18: 8 alerts
- 45.95.147.27: 3 alerts
Alert types:
- brute_force_suspected: 8
- repeated_access_pattern: 6
- unusual_activity_spike: 6
- known_suspicious_ip_activity: 4
```
## 截图
### Web 仪表盘

### CLI 输出

### 警报日志

## 如何在本地运行
### 1. 仅生成示例日志
```
python src/main.py generate --count 200 --output logs/events.jsonl --seed 42
```
### 2. 仅分析现有日志
```
python src/main.py analyze --input logs/events.jsonl --alerts-output logs/alerts.log
```
### 3. 运行完整管道
```
python src/main.py run --count 200 --output logs/events.jsonl --alerts-output logs/alerts.log --seed 42
```
### 4. 直接运行单个模块
```
python src/log_generator.py --count 200 --output logs/events.jsonl --seed 42
python src/analyzer.py --input logs/events.jsonl --alerts-output logs/alerts.log
```
## Docker 设置
构建并运行完整管道:
```
docker compose -f docker/docker-compose.yml up --build
```
容器通过挂载卷将生成的事件和警报写入本地 `logs/` 目录。
## Web 仪表盘
针对当前日志集启动 Flask 仪表盘:
```
python src/main.py dashboard --input logs/events.jsonl --alerts-output logs/alerts.log
```
然后打开:
```
http://127.0.0.1:5000
```
仪表盘显示:
- 已处理事件总数
- 已生成警报总数
- 最活跃的 IP
- 最可疑的 IP
- 警报类型细分
- 最近的持久化警报
## 警报转发
管道可以选择将警报作为 JSON 转发到 webhook 端点。这有助于模拟与以下组件的集成:
- Slack 入站 webhook
- 自定义 SOAR 处理程序
- 事件路由服务
- 内部安全自动化端点
显式指定 webhook 运行:
```
python src/main.py run --webhook-url https://example.com/security-alerts
```
或者通过环境变量定义:
```
export SOC_ALERT_WEBHOOK_URL=https://example.com/security-alerts
python src/main.py analyze
```
`/.env.example` 中包含了一个示例环境文件。
## 测试
运行自动化测试套件:
```
python -m unittest discover -s tests -v
```
测试验证了:
- 日志生成和必填事件字段
- 所有核心警报类型的端到端检测
- 有效 JSON 格式的警报日志持久化
- 畸形事件的输入验证
- webhook 投递行为
- 仪表盘渲染
## 开发者快捷方式
如果您使用 `make`,本项目包含了常用目标:
```
make install
make test
make run
make dashboard
```
## GitHub 展示清单
在发布之前,该仓库的结构已经能够支持强大的展示效果:
- 带有架构和执行指南的专业 README
- Docker 化的可运行演示
- 自动化测试
- 可选的 webhook 集成
- CLI 和 Web 仪表盘视图
- 通过 `.gitignore` 保持干净的仓库卫生
有关可直接粘贴的 GitHub 仓库元数据,请参阅 [docs/github-launch-kit.md](docs/github-launch-kit.md)。
## 支持文档
- [检测规则参考](docs/detection-rules.md)
- [示例事件报告](docs/sample-incident-report.md)
## Linux 友好运行脚本
用于在 Linux 或 macOS 上进行完整演示运行的快捷命令:
```
chmod +x run_pipeline.sh
./run_pipeline.sh
```
## 使用的技术
- Python 3.12
- JSONL 结构化日志
- 基于规则的检测工程
- Docker 和 Docker Compose
- 兼容 Linux 的命令流
## 许可证
本项目基于 MIT 许可证发布。请参阅 `LICENSE`。
## 为什么它非常适合安全作品集
本项目展示了直接映射到安全工程和 SOC 工作的技能:
- 威胁检测思维
- 安全工作流的 Python 自动化
- 日志解析和事件富化
- 警报设计和运营输出
- 适合未来 SIEM 或云集成的模块化架构
## 未来增强
更高级版本可能的后续升级:
- 从 Kafka、syslog 或 CloudWatch 摄取日志
- 将警报导出到 Slack、电子邮件或工单系统
- 将遥测数据存储在 PostgreSQL 或 Elasticsearch 中
- 使用外部威胁情报源丰富可疑 IP
标签:DevSecOps, DevSecOps项目实践, Docker容器化, IP 地址批量处理, SOC分析师, Webhook转发, 上游代理, 云威胁检测, 仪表盘, 免杀技术, 告警系统, 基于规则的检测, 安全信息与事件管理, 安全工程, 安全运营中心, 开源安全项目, 搜索引擎爬取, 时序数据库, 暴力破解检测, 版权保护, 红队行动, 网络映射, 请求拦截, 身份验证安全, 逆向工具