Aliromia21/threatstream

GitHub: Aliromia21/threatstream

这是一个演示如何构建生产级实时威胁检测后端架构的项目,通过Kafka流式传输安全事件,进行模式检测并在仪表板上可视化分析。

Stars: 3 | Forks: 0

# ThreatStream [![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/f728ff8c52013946.svg)](https://github.com/Aliromia21/threatstream/actions/workflows/ci.yml) ## 为什么存在这个项目 安全运营中心每秒监控数千个事件——登录失败、端口扫描、可疑请求。本项目演示了如何构建支持实时威胁检测和可视化的后端基础设施,采用了与 CrowdStrike、Splunk 和 Datadog 等公司生产系统中相同的架构模式。 ## 系统架构 ``` ┌──────────────────────────────────────────────────────┐ │ Docker Network │ │ │ │ ┌────────────┐ ┌─────────┐ ┌────────────┐ │ │ │ Event API │───▶│ Kafka │───▶│ Consumer │ │ │ │ :3001 │ │ :9092 │ │ │ │ │ └────────────┘ └─────────┘ └─────┬──────┘ │ │ ▲ │ │ │ │ ▼ │ │ ┌────────────┐ ┌──────────────┐ │ │ │ Simulator │ │ PostgreSQL │ │ │ │ (threats) │ │ :5432 │ │ │ └────────────┘ └───────┬──────┘ │ │ │ │ │ LISTEN/NOTIFY │ │ │ │ │ ┌────────────┐ ┌───────▼──────┐ │ │ │ Dashboard │◀── WebSocket ──│ Stats API │ │ │ │ :8080 │ │ :3003 │ │ │ └────────────┘ └──────────────┘ │ │ │ └──────────────────────────────────────────────────────┘ ``` ### 数据流 1. **Simulator** 生成逼真的攻击模式(暴力破解、端口扫描、正常流量)并通过 HTTP 发送 2. **Event API** 验证、丰富(添加 UUID、时间戳)事件并将其生产到 Kafka 主题——响应时间 <5ms 3. **Kafka** 在 3 个主题间流式传输事件:`auth-events`、`network-events`、`threat-alerts`——按源 IP 分区 4. **Consumer** 读取事件,写入 PostgreSQL(原始事件 + 预聚合计数器),并运行滑动窗口威胁检测 5. **PostgreSQL** 存储原始事件、每日统计、攻击源和威胁警报——通过 LISTEN/NOTIFY 通知 Stats API 6. **Stats API** 提供历史数据的 REST 端点,并通过 WebSocket 将实时更新推送到仪表板 7. **Dashboard** 渲染实时威胁分析,包含自动更新图表、威胁级别指示器和实时事件流 ## 功能 ### 事件摄入 - 高性能 HTTP 端点——验证并生产到 Kafka,响应时间 <5ms - 使用 UUID 和服务端时间戳丰富事件 - 基于事件类型的主题路由(auth → `auth-events`,network → `network-events`) - 分区键 = 源 IP——保证每个攻击者的事件顺序 ![ThreatStream Dashboard](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/266d64e388013947.png) ### 威胁检测 - **暴力破解检测**——2 分钟内同一 IP 发生 10 次以上认证失败触发 HIGH 严重级别警报 - **端口扫描检测**——1 分钟内同一 IP 扫描 8 个以上不同端口触发 MEDIUM 严重级别警报 - 内存滑动窗口,自动清理过期条目 - 警报持久化到 PostgreSQL,包含严重级别、描述和相关元数据 ### 实时仪表板 - 基于失败率的实时威胁级别指示器(LOW / MEDIUM / HIGH / CRITICAL) - 事件时间线图表(过去 24 小时,按小时粒度) - 攻击类型细分(柱状图) - Top 攻击者表格,包含事件计数和最后出现信息 - 滚动实时事件流,带颜色编码的类型标签 - WebSocket 连接,支持自动重连和连接状态指示器 - 加载骨架和数值变化时的闪烁动画 ### 数据架构 - **双写模式**——存储原始事件用于历史分析 + 预聚合计数器用于 O(1) 仪表板读取 - **UPSERT with ON CONFLICT**——原子计数器更新,无竞态条件 - **PostgreSQL LISTEN/NOTIFY**——在无需额外基础设施的情况下连接 Consumer 和 Stats API - **1 秒 WebSocket 批处理**——收集事件然后广播摘要,防止客户端被淹没 ### 基础设施 - 7 个容器由 Docker Compose 编排(单命令启动) - 多阶段 Docker 构建——生产镜像约 150MB,而包含开发依赖的约 500MB - 带依赖顺序的健康检查——服务仅在其依赖项就绪后启动 - 优雅关闭——Kafka 生产者和消费者在 SIGTERM/SIGINT 时干净断开 ## 快速开始 ``` # 克隆 repository git clone https://github.com/Aliromia21/threatstream.git cd threatstream # 启动所有服务 docker compose up --build # 使用 threat simulator 启动(生成实时攻击数据) docker compose --profile demo up --build ``` | Service | URL | |---------|-----| | Dashboard | http://localhost:8080 | | Event API | http://localhost:3001 | | Stats API | http://localhost:3003 | | Health Check | http://localhost:3001/health | ### 发送手动事件 ``` curl -X POST http://localhost:3001/events \ -H "Content-Type: application/json" \ -d '{ "type": "auth_failure", "sourceIp": "185.220.101.42", "timestamp": "2026-03-20T10:00:00Z", "metadata": { "username": "admin", "userAgent": "curl/7.88" } }' ``` ## 技术栈 | Layer | Technology | Why | |-------|-----------|-----| | Event Streaming | Apache Kafka | 不可变事件日志,具有独立消费者组——事件被存储,处理后不删除 | | Database | PostgreSQL 16 | JSONB 用于灵活元数据,UPSERT 用于原子计数器,LISTEN/NOTIFY 用于实时桥接 | | Backend | Node.js, TypeScript, Express | 非阻塞 I/O,用于高吞吐量事件处理 | | Frontend | React 18, Vite, Tailwind CSS, Recharts | 快速构建,实用优先样式,声明式图表 | | Real-time | WebSockets (ws library) | 服务器推送,无轮询开销——50KB 库 vs 300KB socket.io | | DevOps | Docker, Docker Compose, GitHub Actions | 可重现环境,自动化测试,单命令部署 | ## 项目结构 ``` threatstream/ ├── docker-compose.yml # Orchestrates all 7 containers ├── services/ │ ├── event-api/ # HTTP → Kafka producer │ │ ├── src/ │ │ │ ├── config/ # Fail-fast environment validation │ │ │ ├── kafka/ # Producer singleton + topic router │ │ │ ├── routes/ # /health, /events endpoints │ │ │ ├── validators/ # Event schema validation │ │ │ ├── middleware/ # Centralized error handler │ │ │ └── __tests__/ # Unit + integration tests │ │ └── Dockerfile │ ├── consumer/ # Kafka → PostgreSQL + threat detection │ │ ├── src/ │ │ │ ├── kafka/ # Consumer group subscription │ │ │ ├── database/ # Connection pool │ │ │ ├── processors/ # Event processor + threat detector │ │ │ └── __tests__/ # Threat detection tests │ │ └── Dockerfile │ ├── stats-api/ # REST + WebSocket server │ │ ├── src/ │ │ │ ├── routes/ # /stats/* endpoints │ │ │ ├── websocket/ # WS server + PG LISTEN/NOTIFY bridge │ │ │ └── database/ # Read-only connection pool │ │ └── Dockerfile │ ├── simulator/ # Realistic threat event generator │ │ ├── src/ │ │ │ ├── patterns/ # Brute force, port scan, normal traffic │ │ │ └── config/ │ │ └── Dockerfile │ └── dashboard/ # React frontend │ ├── src/ │ │ ├── hooks/ # useWebSocket, useStats │ │ └── components/ # StatCard, ThreatLevel, Charts, etc. │ └── Dockerfile ├── infrastructure/ │ └── postgres/ │ └── init.sql # Schema: 5 tables with indexes └── shared/ └── types/ # Shared TypeScript event definitions ``` ## 数据库架构 | Table | Purpose | Access Pattern | |-------|---------|---------------| | `events` | 每个原始事件的不可变日志(JSONB 元数据) | 写入密集,用于查询最近事件和时间线 | | `daily_stats` | 预聚合的每日计数器 | O(1) 读取用于仪表板摘要 | | `attack_sources` | 每日 Top 攻击者(IP + 事件计数) | 由仪表板 top-attackers 面板读取 | | `sessions` | 活跃攻击会话跟踪 | 每个事件更新,用于查询活跃威胁 | | `threat_alerts` | 来自异常检测的分类威胁 | 由威胁检测器写入,由警报面板读取 | ## 测试 ``` # 运行所有测试 cd services/event-api && npx jest --coverage cd services/consumer && npx jest --coverage ``` | Layer | Count | What It Tests | |-------|-------|--------------| | Unit | 23+ | 事件验证(类型、IP、时间戳)、主题路由、威胁检测阈值和窗口 | | Integration | 4+ | 带模拟 Kafka 的 HTTP 端点——验证请求/响应周期 | | Docker | 5 builds | 所有服务在 CI 中成功编译和构建 | ### CI/CD 每次推送到 `main` 都会触发 GitHub Actions 流水线:lint → test → Docker 构建验证。 ## 关键设计决策 | Decision | Trade-off | Alternative | |----------|-----------|-------------| | **Kafka over BullMQ** | 基础设施更多,但事件是不可变日志——多个消费者可以独立读取 | BullMQ 在处理后删除作业 | | **sourceIp as partition key** | 如果一个 IP 占主导地位,存在热分区风险 | 随机分区——负载分布更好,但无顺序保证 | | **Pre-aggregated tables** | 双写复杂性,但仪表板读取为 O(1) | 物化视图——代码更少,但对刷新时间的控制更少 | | **PostgreSQL LISTEN/NOTIFY** | 通过数据库耦合 Consumer 和 Stats API | 服务间 HTTP——更灵活但增加了另一个端点 | | **ws over socket.io** | 没有内置自动重连或房间,但 50KB vs 300KB | socket.io——API 更简单但更重 | | **In-memory sliding windows** | 重启时状态丢失,但检测 <1ms | Redis——实例间共享状态但增加了基础设施 | | **Manual validation over Zod** | 代码更多,但完全理解验证内容 | Zod——代码更少,自动 TypeScript 推断 | ## API 参考 ### Event API (端口 3001) | Method | Endpoint | Description | |--------|----------|-------------| | POST | `/events` | 摄入安全事件(返回 202) | | GET | `/health` | 服务健康 + Kafka 连接状态 | ### Stats API (端口 3003) | Method | Endpoint | Description | |--------|----------|-------------| | GET | `/stats/today` | 今日聚合威胁摘要 | | GET | `/stats/week` | 过去 7 天的每日统计 | | GET | `/stats/top-attackers` | 按事件计数排名前 10 的 IP | | GET | `/stats/attack-types` | 按类型分组的事件计数 | | GET | `/stats/timeline` | 每小时事件计数(过去 24 小时) | | GET | `/stats/recent-events` | 最近 20 个事件 | | GET | `/stats/alerts` | 最近的威胁警报 | | WS | `ws://localhost:3003` | 实时统计更新(1 秒批处理) | ## 构建本项目所学到的内容 本项目推动我从作业队列架构(BullMQ)转向事件流(Kafka),从文档数据库(MongoDB)转向关系建模(PostgreSQL),并从单体模块转向具有独立 Dockerfile 和部署生命周期的真正微服务。 最有价值的见解是:**预聚合表改变了一切**。从 `daily_stats` 读取一行而不是计数数百万个事件,是 5ms 加载的仪表板与 3 秒加载的仪表板之间的区别。我研究的每个生产分析系统都使用了某种形式的这种模式。 ## 作者 **Ali Romia** —— 软件工程师 - GitHub: [github.com/Aliromia21](https://github.com/Aliromia21) - LinkedIn: [linkedin.com/in/aliromia](https://www.linkedin.com/in/aliromia/) ## 许可证 MIT License © Ali Romia 2026
标签:DevSecOps, Docker, Kafka, MITM代理, PostgreSQL, SonarQube插件, 上游代理, 事件流处理, 互联网扫描, 可视化仪表盘, 后端基础设施, 大数据架构, 威胁情报, 安全运营中心, 安全防御评估, 实时检测, 开发者工具, 攻击模式识别, 数据管道, 测试用例, 生产级架构, 红队行动, 网络安全, 网络映射, 自动化攻击, 请求拦截, 软件工程, 软件成分分析, 隐私保护