miketitus2003-cloud/siem-pipeline
GitHub: miketitus2003-cloud/siem-pipeline
A lightweight, extensible Python SIEM pipeline with multi-format log ingestion, normalization, MITRE ATT&CK detection rules, and a REST API.
# SIEM Pipeline
[CI](https://github.com/miketitus2003-cloud/siem-pipeline/actions/workflows/ci.yml)


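A production-grade **Security Information and Event Management (SIEM) pipeline** built in Python. It ingests JSON, JSONL, and CSV log files, normalizes them into a canonical event schema, evaluates stateful detection rules mapped to MITRE ATT&CK, and exposes the resulting alerts through a REST API: a mini SIEM engine in one package.
**Live demo:** [https://siem-pipeline.up.railway.app](https://siem-pipeline.up.railway.app) | **API docs:** [/docs](https://siem-pipeline.up.railway.app/docs)
## What It Does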
```
Raw Logs (JSON / CSV / JSONL) ← POST /ingest (live API ingestion)
│
▼
Log Parsers ← dirty-data tolerant, format auto-detected
│
▼
Normalization ← 60+ field aliases → canonical NormalizedEvent schema
│
▼
Rule Engine ← stateful, MITRE ATT&CK mapped detection rules
│
▼
SQLite Persistence ← events + alerts stored for historical queries
│
▼
REST API ← GET /alerts GET /stats (filter, paginate, aggregate)
```
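The parser stage is described as format auto-detecting and tolerant of dirty data. The sketch below shows one way that can work using only the standard library. It is not the project's `json_parser.py` or `csv_parser.py`; the function name `parse_logs` and the detection heuristics (a leading `[` means a JSON array, a leading `{` means JSONL, anything else is CSV with a header row) are assumptions.

```python
import csv
import io
import json
from typing import Any


def parse_logs(text: str) -> list[dict[str, Any]]:
    """Detect JSON array, JSON Lines, or CSV and return a list of records.

    Hypothetical sketch: malformed JSONL lines are skipped instead of
    aborting the whole file.
    """
    stripped = text.lstrip()
    if not stripped:
        return []

    # A leading '[' suggests a JSON array of objects.
    if stripped.startswith("["):
        try:
            data = json.loads(stripped)
        except json.JSONDecodeError:
            return []  # unrecoverable array: nothing to ingest
        return [r for r in data if isinstance(r, dict)]

    # A leading '{' suggests one JSON object per line (JSONL / NDJSON).
    if stripped.startswith("{"):
        records = []
        for line in stripped.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue  # dirty line: skip it, keep the rest
            if isinstance(obj, dict):
                records.append(obj)
        return records

    # Otherwise treat the input as CSV with a header row.
    reader = csv.DictReader(io.StringIO(stripped))
    return [dict(row) for row in reader]
```

Skipping bad lines rather than raising keeps one corrupt record from dropping an entire batch, which matters for the live `POST /ingest` path.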
## Detection Rules
| ID | Rule | Severity | MITRE Technique | Tactic |
|---|---|---|---|---|
| RULE-1001 | Brute Force Login | **High** | T1110 — Brute Force | Credential Access |
| RULE-1002 | Multi-Source Authentication | Medium | T1078 — Valid Accounts | Initial Access |
| RULE-1003 | Port Scan | Medium | T1046 — Network Service Discovery | Discovery |
| RULE-1004 | Privileged After-Hours Login | **High** | T1078.003 — Local Accounts | Persistence |
| RULE-1005 | Watchlist IP Traffic | **Critical** | T1071 — Application Layer Protocol | Command & Control |
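Rules such as RULE-1001 are stateful: one failed login is harmless, but many from the same source in a short span is not. A minimal sliding-window sketch of that idea follows. The class name `BruteForceTracker` and the defaults (5 failures within 60 seconds per source IP) are hypothetical and are not taken from `config/rules_config.yaml`.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class BruteForceTracker:
    """Hypothetical sketch: flag a source IP after `threshold` failed logins within `window`."""

    def __init__(self, threshold: int = 5, window: timedelta = timedelta(seconds=60)):
        self.threshold = threshold
        self.window = window
        # Per-source-IP queue of failure timestamps, oldest first.
        self._failures: dict[str, deque[datetime]] = defaultdict(deque)

    def observe(self, source_ip: str, outcome: str, ts: datetime) -> bool:
        """Record one authentication event; return True if it trips the rule."""
        if outcome != "failure":
            return False
        q = self._failures[source_ip]
        q.append(ts)
        # Evict failures that have fallen out of the sliding window.
        while q and ts - q[0] > self.window:
            q.popleft()
        if len(q) >= self.threshold:
            q.clear()  # reset so one burst produces one alert
            return True
        return False
```

Keeping a bounded deque per source IP means memory grows with the number of active attackers, not with total log volume.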
## REST API
| Method | Endpoint | Description |
|---|---|---|
| `GET` | `/` | Landing page |
| `GET` | `/health` | Health check — `{"status": "ok"}` |
| `GET` | `/rules` | List all detection rules with MITRE metadata |
| `GET` | `/events?limit=20` | List normalized events from sample data |
| `POST` | `/run?source=` | Run full pipeline on bundled sample data |
| `POST` | `/ingest` | **Ingest live log records → normalize → detect → persist** |
| `GET` | `/alerts` | Query stored alerts (filter by severity, rule\_id; paginate) |
| `GET` | `/stats` | Aggregate counts by severity and rule across all stored data |
| `GET` | `/docs` | Interactive Swagger UI |
| `GET` | `/redoc` | ReDoc documentation |
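`GET /stats` reports alert counts by severity and by rule. Assuming alerts are stored in a SQLite table named `alerts` with `rule_id` and `severity` columns (the real schema in `db.py` may differ), the aggregation reduces to two `GROUP BY` queries. The sketch uses the standard `sqlite3` module; `alert_stats` is a hypothetical helper name.

```python
import sqlite3


def alert_stats(conn: sqlite3.Connection) -> dict[str, dict[str, int]]:
    """Hypothetical sketch: count stored alerts grouped by severity and by rule_id."""
    by_severity = dict(
        conn.execute("SELECT severity, COUNT(*) FROM alerts GROUP BY severity").fetchall()
    )
    by_rule = dict(
        conn.execute("SELECT rule_id, COUNT(*) FROM alerts GROUP BY rule_id").fetchall()
    )
    return {"by_severity": by_severity, "by_rule": by_rule}
```

Letting SQLite do the counting avoids loading every alert row into Python just to tally them.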
### POST /ingest example
```
curl -X POST https://siem-pipeline.up.railway.app/ingest \
-H "Content-Type: application/json" \
-d '{
"source": "firewall",
"logs": [
{"src_ip": "192.0.2.1", "dst_port": 22, "action": "DENY"},
{"src_ip": "10.0.0.5", "event": "login_failed", "user": "admin"}
]
}'
```
### GET /alerts example
```
# All critical alerts, newest first
curl "https://siem-pipeline.up.railway.app/alerts?severity=critical&limit=20"
# Alerts from a specific rule
curl "https://siem-pipeline.up.railway.app/alerts?rule_id=RULE-1001"
```
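The same `POST /ingest` and `GET /alerts` calls from the curl examples can be made from Python with only the standard library. This sketch builds the requests without sending them. `build_ingest_request` and `build_alerts_url` are hypothetical helper names; the query parameters (`severity`, `rule_id`, `limit`) are the ones used in the curl examples.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Optional

BASE_URL = "https://siem-pipeline.up.railway.app"


def build_ingest_request(source: str, logs: list[dict[str, Any]]) -> urllib.request.Request:
    """Build (but do not send) a POST /ingest request carrying raw log records."""
    body = json.dumps({"source": source, "logs": logs}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/ingest",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_alerts_url(
    severity: Optional[str] = None, rule_id: Optional[str] = None, limit: int = 20
) -> str:
    """Build a GET /alerts URL that includes only the filters that are set."""
    params: dict[str, Any] = {"limit": limit}
    if severity:
        params["severity"] = severity
    if rule_id:
        params["rule_id"] = rule_id
    return f"{BASE_URL}/alerts?{urllib.parse.urlencode(params)}"
```

To actually send a request, pass it (or the URL) to `urllib.request.urlopen(...)` and decode the JSON response body.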
## Project Structure
```
siem-pipeline/
├── main.py # FastAPI app + landing page
├── siem_pipeline/
│ ├── cli.py # CLI entrypoint (argparse)
│ ├── pipeline.py # Orchestrator — wires all layers
│ ├── db.py # SQLite persistence (events + alerts)
│ ├── parsers/
│ │ ├── base.py # Abstract BaseParser contract
│ │ ├── json_parser.py # JSON array + NDJSON/JSON-L
│ │ └── csv_parser.py # CSV/TSV with type coercion
│ ├── normalizers/
│ │ └── normalizer.py # 60+ field aliases + validation
│ ├── rules/
│ │ ├── base.py # BaseRule + RuleMatch dataclasses
│ │ ├── builtin_rules.py # 5 production-style detection rules
│ │ └── engine.py # Rule loader + evaluation engine
│ └── utils/
│ ├── schema.py # NormalizedEvent dataclass
│ └── logger.py # Structured logging config
├── tests/ # 116 pytest unit + integration tests
├── data/
│ ├── raw/ # Sample input logs (JSON, JSONL, CSV)
│ └── sample_output/ # Example pipeline output
├── config/
│ └── rules_config.yaml # Rule tuning reference
├── requirements.txt
├── Procfile
└── nixpacks.toml
```
## Quick Start
```
git clone https://github.com/miketitus2003-cloud/siem-pipeline.git
cd siem-pipeline
python3 -m venv .venv && source .venv/bin/activate
pip install fastapi "uvicorn[standard]"
# Run the API
uvicorn main:app --reload
# Or run the CLI
pip install -e .
siem-pipeline run data/raw/ --output data/processed --source demo
siem-pipeline rules
```
## Canonical Event Schema
Every log source is normalized into a `NormalizedEvent`:
| Field | Type | Description |
|---|---|---|
| `event_id` | str | UUID, auto-generated |
| `pipeline_ts` | str | ISO-8601 timestamp when event entered pipeline |
| `timestamp` | str | Original event time (ISO-8601, UTC) |
| `source_ip` | str | Validated IPv4 source address |
| `dest_ip` | str | Validated IPv4 destination address |
| `source_port` | int | Validated port (0–65535) |
| `dest_port` | int | Validated port (0–65535) |
| `username` | str | Normalized username |
| `event_type` | str | `authentication`, `network`, `process`, etc. |
| `outcome` | str | `success` \| `failure` \| `unknown` |
| `severity` | str | `low` \| `medium` \| `high` \| `critical` |
| `raw_message` | str | Original unparsed log line (audit trail) |
| `extra` | dict | Unmapped vendor-specific fields |
| `mitre_technique` | str | e.g. `T1110` |
| `mitre_tactic` | str | e.g. `Credential Access` |
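Several fields in this table are validated rather than copied verbatim: IPv4 addresses, ports in 0–65535, and a fixed set of outcomes. The dataclass below sketches that validation with the standard `ipaddress`, `uuid`, and `dataclasses` modules. It covers only a subset of the fields, and `NormalizedEventSketch`, `valid_ipv4`, and `valid_port` are illustrative names; the project's `utils/schema.py` may behave differently (for example, it may reject invalid values instead of blanking them).

```python
import ipaddress
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


def valid_ipv4(value: Any) -> Optional[str]:
    """Return the address as a string if it is a valid IPv4 address, else None."""
    try:
        return str(ipaddress.IPv4Address(str(value).strip()))
    except ValueError:  # AddressValueError subclasses ValueError
        return None


def valid_port(value: Any) -> Optional[int]:
    """Coerce to int and accept only ports in 0-65535."""
    try:
        port = int(value)
    except (TypeError, ValueError):
        return None
    return port if 0 <= port <= 65535 else None


@dataclass
class NormalizedEventSketch:
    """Hypothetical subset of NormalizedEvent, validated on construction."""

    source_ip: Optional[str] = None
    dest_port: Optional[int] = None
    outcome: str = "unknown"
    extra: dict[str, Any] = field(default_factory=dict)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    pipeline_ts: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def __post_init__(self) -> None:
        # Invalid values are replaced with None / "unknown" instead of raising.
        self.source_ip = valid_ipv4(self.source_ip) if self.source_ip is not None else None
        self.dest_port = valid_port(self.dest_port) if self.dest_port is not None else None
        if self.outcome not in {"success", "failure", "unknown"}:
            self.outcome = "unknown"
```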
## Adding Custom Rules
```python
# my_rules.py
from siem_pipeline.rules.base import BaseRule, RuleMatch
from siem_pipeline.utils.schema import NormalizedEvent


class LargeDataExfiltration(BaseRule):
    """Flag a single event that sends more than ~100 MB outbound."""

    id = "RULE-9001"
    name = "Large Outbound Transfer"
    severity = "high"
    mitre_technique = "T1048"  # Exfiltration Over Alternative Protocol
    mitre_tactic = "Exfiltration"

    def evaluate(self, event: NormalizedEvent) -> RuleMatch | None:
        # bytes_sent is vendor-specific, so it lives in the unmapped `extra` dict.
        bytes_out = int(event.extra.get("bytes_sent", 0) or 0)
        if bytes_out > 100_000_000:
            return self._match(event, bytes_sent=bytes_out)
        return None
```
```
siem-pipeline run data/raw/ --rules-file my_rules.py
```
## Running Tests
```
pip install -r requirements-dev.txt
pytest
pytest --cov=siem_pipeline --cov-report=term-missing
# 116 tests, 82% coverage
```
## Extending the Pipeline
| Area | How |
|---|---|
| New log format | Subclass `BaseParser` → add to `_PARSERS` in `pipeline.py` |
| New field alias | Add entry to `FIELD_MAP` in `normalizers/normalizer.py` |
| New detection rule | Subclass `BaseRule` in `builtin_rules.py` or pass via `--rules-file` |
| Threat intel feed | Replace `WatchlistIPRule.WATCHLIST` with a live API/file fetch |
| Output sink | Extend `Pipeline._write_outputs()` (Elasticsearch, S3, webhook) |
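Adding a field alias means adding one entry to `FIELD_MAP`. The sketch below shows how such a map can drive normalization: known aliases are renamed to canonical fields case-insensitively, canonical names pass through unchanged, and anything unrecognized is kept in `extra`, matching that field's role in the schema. The map entries and the `apply_field_map` function are hypothetical; the real `normalizers/normalizer.py` holds 60+ aliases and may resolve conflicts differently.

```python
from typing import Any

# Hypothetical excerpt: lower-cased alias -> canonical NormalizedEvent field.
FIELD_MAP: dict[str, str] = {
    "src_ip": "source_ip",
    "srcip": "source_ip",
    "client_ip": "source_ip",
    "dst_ip": "dest_ip",
    "dst_port": "dest_port",
    "dport": "dest_port",
    "user": "username",
    "user_name": "username",
}
# Canonical names are accepted as-is, too.
CANONICAL_FIELDS = set(FIELD_MAP.values())


def apply_field_map(record: dict[str, Any]) -> dict[str, Any]:
    """Rename known aliases to canonical names; park everything else in `extra`."""
    canonical: dict[str, Any] = {}
    extra: dict[str, Any] = {}
    for key, value in record.items():
        name = str(key).strip().lower()
        target = FIELD_MAP.get(name) or (name if name in CANONICAL_FIELDS else None)
        if target is None or target in canonical:
            # Unknown field, or a second alias for an already-filled field:
            # keep it under its original key rather than overwrite.
            extra[key] = value
        else:
            canonical[target] = value
    canonical["extra"] = extra
    return canonical
```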
## Tech Stack
- **Python 3.11** — stdlib only for core pipeline; no heavy dependencies
- **FastAPI** — REST API with auto-generated OpenAPI docs
- **uvicorn** — ASGI server
- **SQLite** — zero-config persistence for events and alerts (stdlib)
- **pytest** — 116 unit and integration tests
- **Railway** — deployment platform
## License
MIT