Aliromia21/threatstream
GitHub: Aliromia21/threatstream
这是一个演示如何构建生产级实时威胁检测后端架构的项目,通过Kafka流式传输安全事件,进行模式检测并在仪表板上可视化分析。
Stars: 3 | Forks: 0
# ThreatStream
[](https://github.com/Aliromia21/threatstream/actions/workflows/ci.yml)
## 为什么存在这个项目
安全运营中心每秒监控数千个事件——登录失败、端口扫描、可疑请求。本项目演示了如何构建支持实时威胁检测和可视化的后端基础设施,采用了与 CrowdStrike、Splunk 和 Datadog 等公司生产系统中相同的架构模式。
## 系统架构
```
┌──────────────────────────────────────────────────────┐
│ Docker Network │
│ │
│ ┌────────────┐ ┌─────────┐ ┌────────────┐ │
│ │ Event API │───▶│ Kafka │───▶│ Consumer │ │
│ │ :3001 │ │ :9092 │ │ │ │
│ └────────────┘ └─────────┘ └─────┬──────┘ │
│ ▲ │ │
│ │ ▼ │
│ ┌────────────┐ ┌──────────────┐ │
│ │ Simulator │ │ PostgreSQL │ │
│ │ (threats) │ │ :5432 │ │
│ └────────────┘ └───────┬──────┘ │
│ │ │
│ LISTEN/NOTIFY │
│ │ │
│ ┌────────────┐ ┌───────▼──────┐ │
│ │ Dashboard │◀── WebSocket ──│ Stats API │ │
│ │ :8080 │ │ :3003 │ │
│ └────────────┘ └──────────────┘ │
│ │
└──────────────────────────────────────────────────────┘
```
### 数据流
1. **Simulator** 生成逼真的攻击模式(暴力破解、端口扫描、正常流量)并通过 HTTP 发送
2. **Event API** 验证、丰富(添加 UUID、时间戳)事件并将其生产到 Kafka 主题——响应时间 <5ms
3. **Kafka** 在 3 个主题间流式传输事件:`auth-events`、`network-events`、`threat-alerts`——按源 IP 分区
4. **Consumer** 读取事件,写入 PostgreSQL(原始事件 + 预聚合计数器),并运行滑动窗口威胁检测
5. **PostgreSQL** 存储原始事件、每日统计、攻击源和威胁警报——通过 LISTEN/NOTIFY 通知 Stats API
6. **Stats API** 提供历史数据的 REST 端点,并通过 WebSocket 将实时更新推送到仪表板
7. **Dashboard** 渲染实时威胁分析,包含自动更新图表、威胁级别指示器和实时事件流
## 功能
### 事件摄入
- 高性能 HTTP 端点——验证并生产到 Kafka,响应时间 <5ms
- 使用 UUID 和服务端时间戳丰富事件
- 基于事件类型的主题路由(auth → `auth-events`,network → `network-events`)
- 分区键 = 源 IP——保证每个攻击者的事件顺序

### 威胁检测
- **暴力破解检测**——2 分钟内同一 IP 发生 10 次以上认证失败触发 HIGH 严重级别警报
- **端口扫描检测**——1 分钟内同一 IP 扫描 8 个以上不同端口触发 MEDIUM 严重级别警报
- 内存滑动窗口,自动清理过期条目
- 警报持久化到 PostgreSQL,包含严重级别、描述和相关元数据
### 实时仪表板
- 基于失败率的实时威胁级别指示器(LOW / MEDIUM / HIGH / CRITICAL)
- 事件时间线图表(过去 24 小时,按小时粒度)
- 攻击类型细分(柱状图)
- Top 攻击者表格,包含事件计数和最后出现信息
- 滚动实时事件流,带颜色编码的类型标签
- WebSocket 连接,支持自动重连和连接状态指示器
- 加载骨架和数值变化时的闪烁动画
### 数据架构
- **双写模式**——存储原始事件用于历史分析 + 预聚合计数器用于 O(1) 仪表板读取
- **UPSERT with ON CONFLICT**——原子计数器更新,无竞态条件
- **PostgreSQL LISTEN/NOTIFY**——在无需额外基础设施的情况下连接 Consumer 和 Stats API
- **1 秒 WebSocket 批处理**——收集事件然后广播摘要,防止客户端被淹没
### 基础设施
- 7 个容器由 Docker Compose 编排(单命令启动)
- 多阶段 Docker 构建——生产镜像约 150MB,而包含开发依赖的约 500MB
- 带依赖顺序的健康检查——服务仅在其依赖项就绪后启动
- 优雅关闭——Kafka 生产者和消费者在 SIGTERM/SIGINT 时干净断开
## 快速开始
```
# 克隆 repository
git clone https://github.com/Aliromia21/threatstream.git
cd threatstream
# 启动所有服务
docker compose up --build
# 使用 threat simulator 启动(生成实时攻击数据)
docker compose --profile demo up --build
```
| Service | URL |
|---------|-----|
| Dashboard | http://localhost:8080 |
| Event API | http://localhost:3001 |
| Stats API | http://localhost:3003 |
| Health Check | http://localhost:3001/health |
### 发送手动事件
```
curl -X POST http://localhost:3001/events \
-H "Content-Type: application/json" \
-d '{
"type": "auth_failure",
"sourceIp": "185.220.101.42",
"timestamp": "2026-03-20T10:00:00Z",
"metadata": { "username": "admin", "userAgent": "curl/7.88" }
}'
```
## 技术栈
| Layer | Technology | Why |
|-------|-----------|-----|
| Event Streaming | Apache Kafka | 不可变事件日志,具有独立消费者组——事件被存储,处理后不删除 |
| Database | PostgreSQL 16 | JSONB 用于灵活元数据,UPSERT 用于原子计数器,LISTEN/NOTIFY 用于实时桥接 |
| Backend | Node.js, TypeScript, Express | 非阻塞 I/O,用于高吞吐量事件处理 |
| Frontend | React 18, Vite, Tailwind CSS, Recharts | 快速构建,实用优先样式,声明式图表 |
| Real-time | WebSockets (ws library) | 服务器推送,无轮询开销——50KB 库 vs 300KB socket.io |
| DevOps | Docker, Docker Compose, GitHub Actions | 可重现环境,自动化测试,单命令部署 |
## 项目结构
```
threatstream/
├── docker-compose.yml # Orchestrates all 7 containers
├── services/
│ ├── event-api/ # HTTP → Kafka producer
│ │ ├── src/
│ │ │ ├── config/ # Fail-fast environment validation
│ │ │ ├── kafka/ # Producer singleton + topic router
│ │ │ ├── routes/ # /health, /events endpoints
│ │ │ ├── validators/ # Event schema validation
│ │ │ ├── middleware/ # Centralized error handler
│ │ │ └── __tests__/ # Unit + integration tests
│ │ └── Dockerfile
│ ├── consumer/ # Kafka → PostgreSQL + threat detection
│ │ ├── src/
│ │ │ ├── kafka/ # Consumer group subscription
│ │ │ ├── database/ # Connection pool
│ │ │ ├── processors/ # Event processor + threat detector
│ │ │ └── __tests__/ # Threat detection tests
│ │ └── Dockerfile
│ ├── stats-api/ # REST + WebSocket server
│ │ ├── src/
│ │ │ ├── routes/ # /stats/* endpoints
│ │ │ ├── websocket/ # WS server + PG LISTEN/NOTIFY bridge
│ │ │ └── database/ # Read-only connection pool
│ │ └── Dockerfile
│ ├── simulator/ # Realistic threat event generator
│ │ ├── src/
│ │ │ ├── patterns/ # Brute force, port scan, normal traffic
│ │ │ └── config/
│ │ └── Dockerfile
│ └── dashboard/ # React frontend
│ ├── src/
│ │ ├── hooks/ # useWebSocket, useStats
│ │ └── components/ # StatCard, ThreatLevel, Charts, etc.
│ └── Dockerfile
├── infrastructure/
│ └── postgres/
│ └── init.sql # Schema: 5 tables with indexes
└── shared/
└── types/ # Shared TypeScript event definitions
```
## 数据库架构
| Table | Purpose | Access Pattern |
|-------|---------|---------------|
| `events` | 每个原始事件的不可变日志(JSONB 元数据) | 写入密集,用于查询最近事件和时间线 |
| `daily_stats` | 预聚合的每日计数器 | O(1) 读取用于仪表板摘要 |
| `attack_sources` | 每日 Top 攻击者(IP + 事件计数) | 由仪表板 top-attackers 面板读取 |
| `sessions` | 活跃攻击会话跟踪 | 每个事件更新,用于查询活跃威胁 |
| `threat_alerts` | 来自异常检测的分类威胁 | 由威胁检测器写入,由警报面板读取 |
## 测试
```
# 运行所有测试
cd services/event-api && npx jest --coverage
cd services/consumer && npx jest --coverage
```
| Layer | Count | What It Tests |
|-------|-------|--------------|
| Unit | 23+ | 事件验证(类型、IP、时间戳)、主题路由、威胁检测阈值和窗口 |
| Integration | 4+ | 带模拟 Kafka 的 HTTP 端点——验证请求/响应周期 |
| Docker | 5 builds | 所有服务在 CI 中成功编译和构建 |
### CI/CD
每次推送到 `main` 都会触发 GitHub Actions 流水线:lint → test → Docker 构建验证。
## 关键设计决策
| Decision | Trade-off | Alternative |
|----------|-----------|-------------|
| **Kafka over BullMQ** | 基础设施更多,但事件是不可变日志——多个消费者可以独立读取 | BullMQ 在处理后删除作业 |
| **sourceIp as partition key** | 如果一个 IP 占主导地位,存在热分区风险 | 随机分区——负载分布更好,但无顺序保证 |
| **Pre-aggregated tables** | 双写复杂性,但仪表板读取为 O(1) | 物化视图——代码更少,但对刷新时间的控制更少 |
| **PostgreSQL LISTEN/NOTIFY** | 通过数据库耦合 Consumer 和 Stats API | 服务间 HTTP——更灵活但增加了另一个端点 |
| **ws over socket.io** | 没有内置自动重连或房间,但 50KB vs 300KB | socket.io——API 更简单但更重 |
| **In-memory sliding windows** | 重启时状态丢失,但检测 <1ms | Redis——实例间共享状态但增加了基础设施 |
| **Manual validation over Zod** | 代码更多,但完全理解验证内容 | Zod——代码更少,自动 TypeScript 推断 |
## API 参考
### Event API (端口 3001)
| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/events` | 摄入安全事件(返回 202) |
| GET | `/health` | 服务健康 + Kafka 连接状态 |
### Stats API (端口 3003)
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/stats/today` | 今日聚合威胁摘要 |
| GET | `/stats/week` | 过去 7 天的每日统计 |
| GET | `/stats/top-attackers` | 按事件计数排名前 10 的 IP |
| GET | `/stats/attack-types` | 按类型分组的事件计数 |
| GET | `/stats/timeline` | 每小时事件计数(过去 24 小时) |
| GET | `/stats/recent-events` | 最近 20 个事件 |
| GET | `/stats/alerts` | 最近的威胁警报 |
| WS | `ws://localhost:3003` | 实时统计更新(1 秒批处理) |
## 构建本项目所学到的内容
本项目推动我从作业队列架构(BullMQ)转向事件流(Kafka),从文档数据库(MongoDB)转向关系建模(PostgreSQL),并从单体模块转向具有独立 Dockerfile 和部署生命周期的真正微服务。
最有价值的见解是:**预聚合表改变了一切**。从 `daily_stats` 读取一行而不是计数数百万个事件,是 5ms 加载的仪表板与 3 秒加载的仪表板之间的区别。我研究的每个生产分析系统都使用了某种形式的这种模式。
## 作者
**Ali Romia** —— 软件工程师
- GitHub: [github.com/Aliromia21](https://github.com/Aliromia21)
- LinkedIn: [linkedin.com/in/aliromia](https://www.linkedin.com/in/aliromia/)
## 许可证
MIT License © Ali Romia 2026
标签:DevSecOps, Docker, Kafka, MITM代理, PostgreSQL, SonarQube插件, 上游代理, 事件流处理, 互联网扫描, 可视化仪表盘, 后端基础设施, 大数据架构, 威胁情报, 安全运营中心, 安全防御评估, 实时检测, 开发者工具, 攻击模式识别, 数据管道, 测试用例, 生产级架构, 红队行动, 网络安全, 网络映射, 自动化攻击, 请求拦截, 软件工程, 软件成分分析, 隐私保护