quinnotis/cloud-security-posture
GitHub: quinnotis/cloud-security-posture
一款基于 Python 和 Streamlit 的云安全态势与威胁检测模拟器,通过分析 CloudTrail 日志识别威胁并提供风险评分。
Stars: 0 | Forks: 0
# 云安全态势与威胁检测模拟器
一个用于检测云环境中威胁的安全监控平台,采用基于规则的检测、风险评分和 MITRE ATT&CK 映射技术。
## 概述
该工具模拟企业级云安全监控能力,通过分析云日志(CloudTrail 风格)来检测安全威胁,包括权限提升、不可能的移动和异常 API 活动等。
### 主要功能
- **多规则检测引擎**:可扩展的基于规则的威胁检测
- **风险评分**:警报的量化风险评估
- **Sigma 规则支持**:行业标准的检测规则格式
- **MITRE ATT&CK 映射**:战术与技术分类
- **交互式仪表板**:实时安全态势可视化
## 架构
```
┌─────────────────────────────────────────────────────────────────┐
│ Cloud Security Posture Monitor │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Cloud Logs │ → │ Detection │ → │ Alerts + │ │
│ │ (CloudTrail) │ │ Engine │ │ Risk Scoring │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Detection Rules │ │
│ │ - Privilege Esc. │ │
│ │ - Impossible Travel │
│ │ - Excessive API │ │
│ │ - Root Usage │ │
│ └──────────────────┘ │
│ │
├─────────────────────────────────────────────────────────────────┤
│ Streamlit Dashboard │
└─────────────────────────────────────────────────────────────────┘
```
## 快速开始
### 安装
```
# Clone repository
git clone https://github.com/quinnotis/cloud-security-posture.git
cd cloud-security-posture
# 创建 virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# 安装 dependencies
pip install -r requirements.txt
```
### 运行检测
```
from src.detector import ThreatDetector
# 初始化 detector
detector = ThreatDetector()
# 分析 CloudTrail logs
alerts = detector.analyze_file("./data/logs/sample_cloudtrail.json")
# 获取 risk summary
summary = detector.get_risk_summary(alerts)
print(f"Risk Score: {summary['overall_risk_score']}")
print(f"Critical Alerts: {summary['critical']}")
# 打印 alerts
for alert in alerts:
print(f"[{alert.severity.upper()}] {alert.rule_name}: {alert.description}")
```
### 运行仪表板
```
streamlit run dashboard/app.py
```
访问地址 http://localhost:8501
## 检测规则
### SEC-001:权限提升尝试
**严重性:** HIGH
检测通过 IAM 策略修改提升权限的尝试。
**检测逻辑:**
- 将 IAM 策略附加到用户/角色
- 自修改权限
- 高风险策略(AdminAccess、PowerUser、`*:*`)
**MITRE 映射:** TA0004 - Privilege Escalation(权限提升)
### SEC-002:不可能的移动
**严重性:** HIGH
检测来自地理上不可能的位置的登录。
**检测逻辑:**
- 比较连续登录的位置
- 计算隐含的移动速度
- 如果速度 > 900 公里/小时(快于商业航班)则发出警报
**MITRE 映射:** TA0001 - Initial Access(初始访问)
### SEC-003:过量 API 调用
**严重性:** MEDIUM
检测表明正在进行枚举或滥用的异常 API 调用量。
**检测逻辑:**
- 跟踪每分钟每用户的 API 调用
- 阈值:每分钟 100 次以上调用
- 高错误率(>50%)表明正在枚举
**MITRE 映射:** TA0007 - Discovery(发现), TA0009 - Collection(收集)
### SEC-004:Root 账户使用
**严重性:** CRITICAL
检测违反最佳实践的 Root 账户使用情况。
**检测逻辑:**
- 来自 Root 身份的任何 API 调用
- 无论操作如何,均为严重严重性
**MITRE 映射:** TA0004 - Privilege Escalation(权限提升)
## 项目结构
```
cloud-security-posture/
├── data/
│ └── logs/ # Cloud log samples
│ └── sample_cloudtrail.json
├── src/
│ ├── detector.py # Main detection engine
│ └── rules/
│ ├── __init__.py
│ └── sigma_rules.py # Sigma rule support
├── dashboard/
│ └── app.py # Streamlit dashboard
├── tests/ # Unit tests
├── requirements.txt
└── README.md
```
## 添加自定义规则
### Python 规则
```
from src.detector import DetectionRule, Alert, ThreatLevel
class CustomRule(DetectionRule):
def __init__(self):
super().__init__(
rule_id="CUSTOM-001",
name="Custom Detection Rule",
severity=ThreatLevel.MEDIUM,
)
self.mitre_tactics = ["TA0001"]
def evaluate(self, events, context):
alerts = []
for event in events:
if self._matches_pattern(event):
alerts.append(Alert(
id=f"ALERT-{event.event_id}",
timestamp=event.timestamp,
rule_id=self.rule_id,
rule_name=self.name,
severity=self.severity,
description="Custom rule triggered",
risk_score=50.0,
))
return alerts
# 添加到 detector
detector = ThreatDetector()
detector.add_rule(CustomRule())
```
### Sigma 规则
```
title: Custom Sigma Rule
id: custom-sigma-001
status: experimental
level: high
description: Detects custom pattern
logsource:
product: aws
service: cloudtrail
detection:
selection:
eventName: SuspiciousAction
condition: selection
```
## 风险评分
警报评分范围为 0-100,基于:
| 因素 | 权重 |
|--------|--------|
| 严重性 | 40% |
| 置信度 | 30% |
| 上下文 | 30% |
**风险等级:**
- 0-30:LOW(低)
- 30-60:MEDIUM(中)
- 60-80:HIGH(高)
- 80-100:CRITICAL(严重)
## 仪表板功能
### 仪表板视图
- 整体风险仪表
- 严重性分布图
- 警报时间线
### 警报视图
- 可筛选的警报表
- 包含证据的警报详情
- 建议措施
### 规则视图
- 活跃的检测规则
- 规则配置
- MITRE 映射
### 分析视图
- 高风险用户 Top 榜
- MITRE ATT&CK 覆盖范围
- 规则有效性
## 示例 CloudTrail 事件
系统处理 CloudTrail 格式的 JSON:
```
{
"Records": [
{
"eventID": "evt-001",
"eventTime": "2024-01-15T14:30:00Z",
"eventSource": "iam.amazonaws.com",
"eventName": "AttachUserPolicy",
"awsRegion": "us-east-1",
"sourceIPAddress": "203.0.113.50",
"userIdentity": {
"type": "IAMUser",
"userName": "admin-user"
}
}
]
}
```
## MITRE ATT&CK 覆盖范围
| 战术 | 技术 | 规则 |
|--------|-----------|------|
| Initial Access(初始访问) | T1078 Valid Accounts(有效账户) | SEC-002 |
| Privilege Escalation(权限提升) | T1098 Account Manipulation(账户操纵) | SEC-001 |
| Privilege Escalation(权限提升) | T1078 Valid Accounts(有效账户) | SEC-004 |
| Discovery(发现) | T1087 Account Discovery(账户发现) | SEC-003 |
| Collection(收集) | T1530 Data from Cloud Storage(来自云存储的数据) | SEC-003 |
## 扩展点
### 添加日志源
- 为新格式扩展 `_normalize_event()` 方法
- 增加对 Azure Activity Logs、GCP Audit Logs 的支持
### 自定义风险评分
- 重写 `calculate_risk_score()` 方法
- 添加上下文因素(资产关键性、用户风险)
### 集成选项
- 通过 webhook 警报进行 SIEM 集成
- SOAR 剧本触发器
- Slack/Teams 通知
## 技术栈
| 组件 | 技术 |
|-----------|------------|
| 检测引擎 | Python, Regex |
| 规则格式 | Sigma (YAML) |
| 数据处理 | Pandas |
| 仪表板 | Streamlit, Plotly |
| 校验 | Pydantic |
## 贡献
1. Fork 该仓库
2. 创建功能分支
3. 为新规则添加测试
4. 提交 pull request
## 许可证
MIT License - 详情请参阅 LICENSE 文件。
使用 Python、Pandas 和 Streamlit 构建。
标签:AMSI绕过, API监控, Cloudflare, CloudTrail, EDR, HTTP/HTTPS抓包, Kubernetes, MITRE ATT&CK, Python, Root账号监控, Sigma规则, Streamlit, 仪表盘, 企业安全, 协议分析, 威胁检测, 安全合规, 安全模拟器, 异常访问, 态势感知, 无后门, 权限提升, 目标导入, 网络代理, 网络资产管理, 脆弱性评估, 访问控制, 逆向工具, 风险评分