Seshmanuvarthi/cyber-threat-intelligence
GitHub: Seshmanuvarthi/cyber-threat-intelligence
基于Kafka、Spark、Flink CEP和GraphX构建的实时网络威胁情报管道,集成攻击分类、复合模式检测、图计算分析与可视化仪表盘。
Stars: 0 | Forks: 0
# 网络威胁情报管道
一个实时大数据分析管道,用于接收实时网络流量、检测网络攻击、映射全球威胁流,并在实时仪表盘上显示所有内容——每 2 秒更新一次。
## 架构概述
```
CSV Datasets (London + Singapore)
│
▼
[ Apache Kafka ] ← message queue / event highway
│
┌──────┴──────┐
▼ ▼
[ Spark ] [ Flink CEP ] ← two independent consumers
Classifies Detects patterns
every event across time windows
│ │
▼ ▼
output_v4/ flink_alerts.json
threats/
│ │
└──────┬──────┘
▼
[ GraphX ] ← batch graph analysis (run manually)
PageRank · Components
│
▼
output_graph/
graph_data.json
│
▼
[ FastAPI ] ← WebSocket + REST backend
│
▼
[ React Dashboard ] ← live UI, updates every 2s
```
## 使用技术
| 层级 | 技术 | 用途 |
|---|---|---|
| 接收层 | Apache Kafka | 来自两个数据集的并发事件流 |
| 处理层 | Apache Spark Structured Streaming | 基于规则的威胁分类 |
| CEP | Flink 风格 CEP 引擎 | 多事件攻击模式检测 |
| 图计算 | GraphX 风格 PySpark | PageRank · 连通分量 · 威胁中心性 |
| 后端 | FastAPI + WebSocket | 向仪表盘实时推送数据 |
| 前端 | React + Tailwind CSS | 实时威胁情报仪表盘 |
| 地图 | React Leaflet + CartoDB | 带动画弧线的全球攻击地图 |
| 图表 | Recharts | 时间线和分布图 |
## 检测内容
### Spark — 事件分类
每个网络事件都会被实时分类:
| 攻击类型 | 触发条件 | 严重性 |
|---|---|---|
| TLS 漏洞利用 | TLS 握手 payload (`\x16\x03`) | CRITICAL |
| 区块链漏洞利用 | JSONRPC / `eth_` payload | CRITICAL |
| 暴力破解 | 认证端口 (SSH/RDP/MySQL/FTP...) | HIGH |
| HTTP 注入 | HTTP POST payload | HIGH |
| 端口扫描 | 系统端口 (< 1024) | MEDIUM |
| 侦察探测 | HTTP GET payload | MEDIUM |
| 网络探测 | 其他所有内容 | LOW |
### Flink CEP — 模式检测
使用滑动时间窗口检测跨事件序列的攻击活动:
| 模式 | 触发条件 | 窗口期 |
|---|---|---|
| 端口扫描 | 同一 IP 访问 5 个以上不同端口 | 30 秒 |
| 暴力破解 | 同一 IP 扫描 2 个以上认证服务 | 60 秒 |
| DDoS 爆发 | 4 个以上唯一 IP 泛洪同一端口 | 60 秒 |
### GraphX — 网络分析
将威胁数据视为有向图(国家 = 节点,攻击 = 边)并进行计算:
- **PageRank** — 哪个国家是网络中最核心的威胁行为者
- **入/出度** — 每个国家发送与接收的攻击数量
- **连通分量** — 哪些国家属于同一个攻击集群
- **顶级攻击者 IP** — PageRank 得分最高的最频繁源 IP
## 仪表盘面板
| 面板 | 数据源 | 显示内容 |
|---|---|---|
| 统计卡片 | Spark + Flink | 总事件数、Critical/High/Medium 级别计数、唯一 IP 数、CEP 警报数 |
| 全球攻击地图 | Spark | 世界地图上的动画弧线,按严重性着色 |
| Flink CEP 警报 | Flink CEP | 端口扫描 / 暴力破解 / DDoS 检测的实时信息流 |
| GraphX 网络 | GraphX | 圆形图 — 节点大小 = PageRank,箭头 = 攻击流向 |
| 攻击时间线 | Spark (React) | 每分钟每种攻击类型的事件数 |
| 攻击分布 | Spark (React) | 攻击类型频率的条形图 |
| 顶级攻击者 | Spark + GraphX | 带有命中计数和 PageRank 条形的排名 IP |
| 实时威胁信息流 | Spark | 每个分类事件的滚动日志,最新事件位于顶部 |
## 项目结构
```
cyber-threat-intelligence/
│
├── producer_v2.py # Kafka producer — streams both CSVs + attack simulator
├── spark_processor.py # Spark Structured Streaming — classifies threats
├── flink_cep.py # Flink-style CEP engine — detects attack patterns
├── graphx_analysis.py # GraphX-style PySpark — PageRank + graph analytics
├── main.py # FastAPI backend — WebSocket + REST endpoints
├── requirements.txt # Python dependencies
├── COMMANDS.md # Step-by-step terminal commands
│
└── dashboard/
├── src/
│ ├── App.jsx # Root — WebSocket + polling logic
│ └── components/
│ ├── Header.jsx # Pipeline status + live event counter
│ ├── StatsCards.jsx # 6 metric cards
│ ├── AttackMap.jsx # Leaflet world map with bezier arcs
│ ├── AlertFeed.jsx # Flink CEP alert feed
│ ├── NetworkGraph.jsx # SVG circular graph with PageRank
│ ├── ThreatTimeline.jsx # Recharts line chart per minute
│ ├── ThreatChart.jsx # Recharts bar chart by attack type
│ ├── TopAttackers.jsx # IP table with PageRank bars
│ └── ThreatFeed.jsx # Live scrolling threat log
├── tailwind.config.js
└── package.json
```
## 设置与安装
### 前置条件
- Python 3.9+
- Node.js 18+
- Apache Kafka + Zookeeper(在 Mac 上通过 Homebrew 安装)
- Java 11+(Spark 运行所需)
### 1. 克隆仓库
```
git clone https://github.com/Seshmanuvarthi/cyber-threat-intelligence.git
cd cyber-threat-intelligence
```
### 2. 添加数据集
将您的网络流量 CSV 文件放入 `data/` 文件夹:
```
data/
├── london.csv # London network traffic logs
└── singapore.csv # Singapore network traffic logs
```
所需的 CSV 列:`from`(IP 地址),`port`,`payload`
### 3. Python 环境
```
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
### 4. 仪表盘依赖项
```
cd dashboard
npm install
cd ..
```
## 运行管道
### 终端 1 — Kafka
```
brew services start zookeeper
brew services start kafka
```
### 终端 2 — Spark 处理器
```
source venv/bin/activate
python spark_processor.py
```
### 终端 3 — Kafka 生产者
```
source venv/bin/activate
python producer_v2.py
```
### 终端 4 — Flink CEP
```
source venv/bin/activate
python flink_cep.py
```
### 终端 5 — FastAPI 后端
```
source venv/bin/activate
uvicorn main:app --port 8000
```
### 终端 6 — React 仪表盘
```
cd dashboard
npm run dev
```
打开 **http://localhost:5173**
### 终端 7 — GraphX(在收集约 1 分钟数据后运行)
```
source venv/bin/activate
python graphx_analysis.py
```
## API 端点
| 方法 | 端点 | 描述 |
|---|---|---|
| GET | `/threats` | 所有已分类的威胁事件 |
| GET | `/alerts` | Flink CEP 模式警报 |
| GET | `/graph` | GraphX 网络图数据 |
| GET | `/stats` | 聚合统计数据 |
| POST | `/graphx/run` | 触发 GraphX 批处理分析 |
| WS | `/ws` | WebSocket — 每 2 秒推送所有数据 |
## 关键设计决策
**基于会话的数据隔离** — 每次启动 API 时,都会记录一个 `SESSION_START` 时间戳。仅提供在此时间戳之后写入的威胁文件,因此仪表盘始终从零开始。
**带轮询回退的 WebSocket** — 仪表盘通过 WebSocket 连接进行实时推送。如果 WebSocket 连接失败,它会自动回退到每 2.5 秒进行一次 HTTP 轮询,并每 5 秒重试 WebSocket 连接。
**圆形图布局** — NetworkGraph 使用固定的圆形 SVG 布局,而非力导向物理布局。力导向布局在近完全图(每个国家都攻击其他所有国家)上会失败,因为边的吸引力远大于节点的排斥力。无论边的密度如何,圆形布局都能提供清晰、可读的结果。
**攻击模拟** — 生产者每 25 秒在真实 CSV 数据旁边注入合成的暴力破解和 DDoS 爆发,因为原始数据集包含随机的高编号端口,这些端口永远不会自然产生这些模式。
## 停止运行
```
# 停止 services
brew services stop kafka
brew services stop zookeeper
# 停止所有 Python processes
Ctrl+C in each terminal
# 停止 dashboard
Ctrl+C in dashboard terminal
```
标签:AV绕过, Beacon Object File, CISA项目, CMS安全, FastAPI, Flink CEP, GraphX, HTTP/HTTPS抓包, IP 地址批量处理, JavaScript, Kafka, Leaflet, PageRank, PE 加载器, PFX证书, Python, React, Recharts, SonarQube插件, Spark, Syscalls, Tailwind CSS, WebSocket, 依赖分析, 可视化大屏, 大数据分析, 威胁分类, 威胁情报, 安全运营, 开发者工具, 异常检测, 态势感知, 扫描框架, 攻击检测, 数据工程, 数据流, 数据管道, 无后门, 流式处理, 红队行动, 网络图谱, 网络安全, 网络安全态势, 网络流量分析, 自定义脚本, 软件工程, 软件成分分析, 逆向工具, 隐私保护