Seshmanuvarthi/cyber-threat-intelligence

GitHub: Seshmanuvarthi/cyber-threat-intelligence

基于Kafka、Spark、Flink CEP和GraphX构建的实时网络威胁情报管道,集成攻击分类、复合模式检测、图计算分析与可视化仪表盘。

Stars: 0 | Forks: 0

# 网络威胁情报管道 一个实时大数据分析管道,用于接收实时网络流量、检测网络攻击、映射全球威胁流,并在实时仪表盘上显示所有内容——每 2 秒更新一次。 ## 架构概述 ``` CSV Datasets (London + Singapore) │ ▼ [ Apache Kafka ] ← message queue / event highway │ ┌──────┴──────┐ ▼ ▼ [ Spark ] [ Flink CEP ] ← two independent consumers Classifies Detects patterns every event across time windows │ │ ▼ ▼ output_v4/ flink_alerts.json threats/ │ │ └──────┬──────┘ ▼ [ GraphX ] ← batch graph analysis (run manually) PageRank · Components │ ▼ output_graph/ graph_data.json │ ▼ [ FastAPI ] ← WebSocket + REST backend │ ▼ [ React Dashboard ] ← live UI, updates every 2s ``` ## 使用技术 | 层级 | 技术 | 用途 | |---|---|---| | 接收层 | Apache Kafka | 来自两个数据集的并发事件流 | | 处理层 | Apache Spark Structured Streaming | 基于规则的威胁分类 | | CEP | Flink 风格 CEP 引擎 | 多事件攻击模式检测 | | 图计算 | GraphX 风格 PySpark | PageRank · 连通分量 · 威胁中心性 | | 后端 | FastAPI + WebSocket | 向仪表盘实时推送数据 | | 前端 | React + Tailwind CSS | 实时威胁情报仪表盘 | | 地图 | React Leaflet + CartoDB | 带动画弧线的全球攻击地图 | | 图表 | Recharts | 时间线和分布图 | ## 检测内容 ### Spark — 事件分类 每个网络事件都会被实时分类: | 攻击类型 | 触发条件 | 严重性 | |---|---|---| | TLS 漏洞利用 | TLS 握手 payload (`\x16\x03`) | CRITICAL | | 区块链漏洞利用 | JSONRPC / `eth_` payload | CRITICAL | | 暴力破解 | 认证端口 (SSH/RDP/MySQL/FTP...) | HIGH | | HTTP 注入 | HTTP POST payload | HIGH | | 端口扫描 | 系统端口 (< 1024) | MEDIUM | | 侦察探测 | HTTP GET payload | MEDIUM | | 网络探测 | 其他所有内容 | LOW | ### Flink CEP — 模式检测 使用滑动时间窗口检测跨事件序列的攻击活动: | 模式 | 触发条件 | 窗口期 | |---|---|---| | 端口扫描 | 同一 IP 访问 5 个以上不同端口 | 30 秒 | | 暴力破解 | 同一 IP 扫描 2 个以上认证服务 | 60 秒 | | DDoS 爆发 | 4 个以上唯一 IP 泛洪同一端口 | 60 秒 | ### GraphX — 网络分析 将威胁数据视为有向图(国家 = 节点,攻击 = 边)并进行计算: - **PageRank** — 哪个国家是网络中最核心的威胁行为者 - **入/出度** — 每个国家发送与接收的攻击数量 - **连通分量** — 哪些国家属于同一个攻击集群 - **顶级攻击者 IP** — PageRank 得分最高的最频繁源 IP ## 仪表盘面板 | 面板 | 数据源 | 显示内容 | |---|---|---| | 统计卡片 | Spark + Flink | 总事件数、Critical/High/Medium 级别计数、唯一 IP 数、CEP 警报数 | | 全球攻击地图 | Spark | 世界地图上的动画弧线,按严重性着色 | | Flink CEP 警报 | Flink CEP | 端口扫描 / 暴力破解 / DDoS 检测的实时信息流 | | GraphX 网络 | GraphX | 圆形图 — 节点大小 = PageRank,箭头 = 攻击流向 | | 攻击时间线 | Spark (React) | 每分钟每种攻击类型的事件数 | | 攻击分布 | Spark (React) | 攻击类型频率的条形图 | | 顶级攻击者 | Spark + GraphX | 带有命中计数和 PageRank 条形的排名 IP | | 实时威胁信息流 | Spark | 每个分类事件的滚动日志,最新事件位于顶部 | ## 项目结构 ``` cyber-threat-intelligence/ │ ├── producer_v2.py # Kafka producer — streams both CSVs + attack simulator ├── spark_processor.py # Spark Structured Streaming — classifies threats ├── flink_cep.py # Flink-style CEP engine — detects attack patterns ├── graphx_analysis.py # GraphX-style PySpark — PageRank + graph analytics ├── main.py # FastAPI backend — WebSocket + REST endpoints ├── requirements.txt # Python dependencies ├── COMMANDS.md # Step-by-step terminal commands │ └── dashboard/ ├── src/ │ ├── App.jsx # Root — WebSocket + polling logic │ └── components/ │ ├── Header.jsx # Pipeline status + live event counter │ ├── StatsCards.jsx # 6 metric cards │ ├── AttackMap.jsx # Leaflet world map with bezier arcs │ ├── AlertFeed.jsx # Flink CEP alert feed │ ├── NetworkGraph.jsx # SVG circular graph with PageRank │ ├── ThreatTimeline.jsx # Recharts line chart per minute │ ├── ThreatChart.jsx # Recharts bar chart by attack type │ ├── TopAttackers.jsx # IP table with PageRank bars │ └── ThreatFeed.jsx # Live scrolling threat log ├── tailwind.config.js └── package.json ``` ## 设置与安装 ### 前置条件 - Python 3.9+ - Node.js 18+ - Apache Kafka + Zookeeper(在 Mac 上通过 Homebrew 安装) - Java 11+(Spark 运行所需) ### 1. 克隆仓库 ``` git clone https://github.com/Seshmanuvarthi/cyber-threat-intelligence.git cd cyber-threat-intelligence ``` ### 2. 添加数据集 将您的网络流量 CSV 文件放入 `data/` 文件夹: ``` data/ ├── london.csv # London network traffic logs └── singapore.csv # Singapore network traffic logs ``` 所需的 CSV 列:`from`(IP 地址),`port`,`payload` ### 3. Python 环境 ``` python -m venv venv source venv/bin/activate pip install -r requirements.txt ``` ### 4. 仪表盘依赖项 ``` cd dashboard npm install cd .. ``` ## 运行管道 ### 终端 1 — Kafka ``` brew services start zookeeper brew services start kafka ``` ### 终端 2 — Spark 处理器 ``` source venv/bin/activate python spark_processor.py ``` ### 终端 3 — Kafka 生产者 ``` source venv/bin/activate python producer_v2.py ``` ### 终端 4 — Flink CEP ``` source venv/bin/activate python flink_cep.py ``` ### 终端 5 — FastAPI 后端 ``` source venv/bin/activate uvicorn main:app --port 8000 ``` ### 终端 6 — React 仪表盘 ``` cd dashboard npm run dev ``` 打开 **http://localhost:5173** ### 终端 7 — GraphX(在收集约 1 分钟数据后运行) ``` source venv/bin/activate python graphx_analysis.py ``` ## API 端点 | 方法 | 端点 | 描述 | |---|---|---| | GET | `/threats` | 所有已分类的威胁事件 | | GET | `/alerts` | Flink CEP 模式警报 | | GET | `/graph` | GraphX 网络图数据 | | GET | `/stats` | 聚合统计数据 | | POST | `/graphx/run` | 触发 GraphX 批处理分析 | | WS | `/ws` | WebSocket — 每 2 秒推送所有数据 | ## 关键设计决策 **基于会话的数据隔离** — 每次启动 API 时,都会记录一个 `SESSION_START` 时间戳。仅提供在此时间戳之后写入的威胁文件,因此仪表盘始终从零开始。 **带轮询回退的 WebSocket** — 仪表盘通过 WebSocket 连接进行实时推送。如果 WebSocket 连接失败,它会自动回退到每 2.5 秒进行一次 HTTP 轮询,并每 5 秒重试 WebSocket 连接。 **圆形图布局** — NetworkGraph 使用固定的圆形 SVG 布局,而非力导向物理布局。力导向布局在近完全图(每个国家都攻击其他所有国家)上会失败,因为边的吸引力远大于节点的排斥力。无论边的密度如何,圆形布局都能提供清晰、可读的结果。 **攻击模拟** — 生产者每 25 秒在真实 CSV 数据旁边注入合成的暴力破解和 DDoS 爆发,因为原始数据集包含随机的高编号端口,这些端口永远不会自然产生这些模式。 ## 停止运行 ``` # 停止 services brew services stop kafka brew services stop zookeeper # 停止所有 Python processes Ctrl+C in each terminal # 停止 dashboard Ctrl+C in dashboard terminal ```
标签:AV绕过, Beacon Object File, CISA项目, CMS安全, FastAPI, Flink CEP, GraphX, HTTP/HTTPS抓包, IP 地址批量处理, JavaScript, Kafka, Leaflet, PageRank, PE 加载器, PFX证书, Python, React, Recharts, SonarQube插件, Spark, Syscalls, Tailwind CSS, WebSocket, 依赖分析, 可视化大屏, 大数据分析, 威胁分类, 威胁情报, 安全运营, 开发者工具, 异常检测, 态势感知, 扫描框架, 攻击检测, 数据工程, 数据流, 数据管道, 无后门, 流式处理, 红队行动, 网络图谱, 网络安全, 网络安全态势, 网络流量分析, 自定义脚本, 软件工程, 软件成分分析, 逆向工具, 隐私保护