miketitus2003-cloud/siem-pipeline

GitHub: miketitus2003-cloud/siem-pipeline

A lightweight, extensible Python SIEM pipeline providing multi-format log ingestion, normalization, MITRE ATT&CK detection rules, and a REST API.

Stars: 0 | Forks: 0

# SIEM Pipeline

[![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/03/0eca90eece203348.svg)](https://github.com/miketitus2003-cloud/siem-pipeline/actions/workflows/ci.yml) ![Python](https://img.shields.io/badge/python-3.11%20%7C%203.12-blue) ![License](https://img.shields.io/badge/license-MIT-green)

A production-grade **Security Information and Event Management pipeline** built in Python. It ingests JSON, JSONL, and CSV log files, normalizes them into a canonical event schema, evaluates stateful, MITRE ATT&CK-mapped detection rules, and exposes alerts through a REST API, like a mini SIEM engine.

**Live demo:** [https://siem-pipeline.up.railway.app](https://siem-pipeline.up.railway.app) | **API docs:** [/docs](https://siem-pipeline.up.railway.app/docs)

## Overview

```
Raw Logs (JSON / CSV / JSONL)   ← POST /ingest (live API ingestion)
        │
        ▼
Log Parsers                     ← dirty-data tolerant, format auto-detected
        │
        ▼
Normalization                   ← 60+ field aliases → canonical NormalizedEvent schema
        │
        ▼
Rule Engine                     ← stateful, MITRE ATT&CK mapped detection rules
        │
        ▼
SQLite Persistence              ← events + alerts stored for historical queries
        │
        ▼
REST API                        ← GET /alerts  GET /stats (filter, paginate, aggregate)
```

## Detection Rules

| ID | Rule | Severity | MITRE Technique | Tactic |
|---|---|---|---|---|
| RULE-1001 | Brute Force Login | **High** | T1110: Brute Force | Credential Access |
| RULE-1002 | Multi-Source Authentication | Medium | T1078: Valid Accounts | Initial Access |
| RULE-1003 | Port Scan | Medium | T1046: Network Service Discovery | Discovery |
| RULE-1004 | Privileged After-Hours Login | **High** | T1078.003: Local Accounts | Persistence |
| RULE-1005 | Watchlist IP Traffic | **Critical** | T1071: Application Layer Protocol | Command & Control |

## REST API

| Method | Endpoint | Description |
|---|---|---|
| `GET` | `/` | Landing page |
| `GET` | `/health` | Health check, returns `{"status": "ok"}` |
| `GET` | `/rules` | List all detection rules with MITRE metadata |
| `GET` | `/events?limit=20` | List normalized events from sample data |
| `POST` | `/run?source=` | Run full pipeline on bundled sample data |
| `POST` | `/ingest` | **Ingest live log records → normalize → detect → persist** |
| `GET` | `/alerts` | Query stored alerts (filter by `severity`, `rule_id`; paginate) |
| `GET` | `/stats` | Aggregate counts by severity and rule across all stored data |
| `GET` | `/docs` | Interactive Swagger UI |
| `GET` | `/redoc` | ReDoc documentation |

### POST /ingest example

```
curl -X POST https://siem-pipeline.up.railway.app/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "source": "firewall",
    "logs": [
      {"src_ip": "192.0.2.1", "dst_port": 22, "action": "DENY"},
      {"src_ip": "10.0.0.5", "event": "login_failed", "user": "admin"}
    ]
  }'
```

### GET /alerts example

```
# All critical alerts, newest first
curl "https://siem-pipeline.up.railway.app/alerts?severity=critical&limit=20"

# Alerts from a specific rule
curl "https://siem-pipeline.up.railway.app/alerts?rule_id=RULE-1001"
```

## Project Structure

```
siem-pipeline/
├── main.py                     # FastAPI app + landing page
├── siem_pipeline/
│   ├── cli.py                  # CLI entrypoint (argparse)
│   ├── pipeline.py             # Orchestrator: wires all layers
│   ├── db.py                   # SQLite persistence (events + alerts)
│   ├── parsers/
│   │   ├── base.py             # Abstract BaseParser contract
│   │   ├── json_parser.py      # JSON array + NDJSON/JSON-L
│   │   └── csv_parser.py       # CSV/TSV with type coercion
│   ├── normalizers/
│   │   └── normalizer.py       # 60+ field aliases + validation
│   ├── rules/
│   │   ├── base.py             # BaseRule + RuleMatch dataclasses
│   │   ├── builtin_rules.py    # 5 production-style detection rules
│   │   └── engine.py           # Rule loader + evaluation engine
│   └── utils/
│       ├── schema.py           # NormalizedEvent dataclass
│       └── logger.py           # Structured logging config
├── tests/                      # 116 pytest unit + integration tests
├── data/
│   ├── raw/                    # Sample input logs (JSON, JSONL, CSV)
│   └── sample_output/          # Example pipeline output
├── config/
│   └── rules_config.yaml       # Rule tuning reference
├── requirements.txt
├── Procfile
└── nixpacks.toml
```

## Quick Start

```
git clone https://github.com/miketitus2003-cloud/siem-pipeline.git
cd siem-pipeline
python3 -m venv .venv && source .venv/bin/activate
pip install fastapi "uvicorn[standard]"

# Run the API
uvicorn main:app --reload

# Or run the CLI
pip install -e .
siem-pipeline run data/raw/ --output data/processed --source demo
siem-pipeline rules
```

## Canonical Event Schema

Every log source is normalized into a `NormalizedEvent`:

| Field | Type | Description |
|---|---|---|
| `event_id` | str | UUID, auto-generated |
| `pipeline_ts` | str | ISO-8601 timestamp when event entered pipeline |
| `timestamp` | str | Original event time (ISO-8601, UTC) |
| `source_ip` | str | Validated IPv4 source address |
| `dest_ip` | str | Validated IPv4 destination address |
| `source_port` | int | Validated port (0–65535) |
| `dest_port` | int | Validated port (0–65535) |
| `username` | str | Normalized username |
| `event_type` | str | `authentication`, `network`, `process`, etc. |
| `outcome` | str | `success` \| `failure` \| `unknown` |
| `severity` | str | `low` \| `medium` \| `high` \| `critical` |
| `raw_message` | str | Original unparsed log line (audit trail) |
| `extra` | dict | Unmapped vendor-specific fields |
| `mitre_technique` | str | e.g. `T1110` |
| `mitre_tactic` | str | e.g. `Credential Access` |

## Adding Custom Rules

```
# my_rules.py
from siem_pipeline.rules.base import BaseRule, RuleMatch
from siem_pipeline.utils.schema import NormalizedEvent


class LargeDataExfiltration(BaseRule):
    id = "RULE-9001"
    name = "Large Outbound Transfer"
    severity = "high"
    mitre_technique = "T1048"
    mitre_tactic = "Exfiltration"

    def evaluate(self, event: NormalizedEvent) -> RuleMatch | None:
        # bytes_sent is vendor-specific, so it is read from the unmapped `extra` fields
        bytes_out = int(event.extra.get("bytes_sent", 0) or 0)
        if bytes_out > 100_000_000:
            return self._match(event, bytes_sent=bytes_out)
        return None
```

```
siem-pipeline run data/raw/ --rules-file my_rules.py
```

## Running Tests

```
pip install -r requirements-dev.txt
pytest                                                  # 116 tests
pytest --cov=siem_pipeline --cov-report=term-missing    # 116 tests, 82% coverage
```

## Extending the Pipeline

| Area | How |
|---|---|
| New log format | Subclass `BaseParser` → add to `_PARSERS` in `pipeline.py` |
| New field alias | Add entry to `FIELD_MAP` in `normalizers/normalizer.py` |
| New detection rule | Subclass `BaseRule` in `builtin_rules.py` or pass via `--rules-file` |
| Threat intel feed | Replace `WatchlistIPRule.WATCHLIST` with a live API/file fetch |
| Output sink | Extend `Pipeline._write_outputs()` (Elasticsearch, S3, webhook) |

## Tech Stack

- **Python 3.11**: stdlib only for core pipeline; no heavy dependencies
- **FastAPI**: REST API with auto-generated OpenAPI docs
- **uvicorn**: ASGI server
- **SQLite**: zero-config persistence for events and alerts (stdlib)
- **pytest**: 116 unit and integration tests
- **Railway**: deployment platform

## License

MIT
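
## Example: Sketch of a Stateful Rule

The detection rules are described as stateful, but the README only shows a stateless custom rule. The sketch below illustrates how a sliding-window brute-force check might work in principle. It is not the project's implementation: `Event`, `BruteForceDetector`, the threshold, and the window size are all hypothetical names and values chosen for illustration, and the code assumes events arrive in timestamp order.

```python
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Event:
    """Hypothetical stand-in for NormalizedEvent, with only the fields used here."""
    timestamp: datetime
    source_ip: str
    event_type: str
    outcome: str


class BruteForceDetector:
    """Flags a source IP once it has `threshold` failed logins within `window`.

    Assumption: events are fed in non-decreasing timestamp order.
    """

    def __init__(self, threshold: int = 5, window: timedelta = timedelta(minutes=1)):
        self.threshold = threshold
        self.window = window
        # Per-IP timestamps of recent failures: this is the rule's state.
        self._failures: dict[str, deque] = defaultdict(deque)

    def evaluate(self, event: Event) -> bool:
        # Only failed authentication events count toward the threshold.
        if event.event_type != "authentication" or event.outcome != "failure":
            return False
        recent = self._failures[event.source_ip]
        recent.append(event.timestamp)
        # Drop failures that have aged out of the window.
        while recent and event.timestamp - recent[0] > self.window:
            recent.popleft()
        return len(recent) >= self.threshold


if __name__ == "__main__":
    start = datetime(2024, 1, 1, 12, 0, 0)
    detector = BruteForceDetector(threshold=3, window=timedelta(seconds=60))
    for i in range(3):
        event = Event(start + timedelta(seconds=10 * i), "192.0.2.1", "authentication", "failure")
        print(i, detector.evaluate(event))
```

In a real rule, the boolean would presumably be replaced by whatever match object the rule engine expects, and the per-IP state would need a bound or periodic cleanup so it doesn't grow without limit.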