helqadiri03/Cyber-Threat-Intelligence-Platform

GitHub: helqadiri03/Cyber-Threat-Intelligence-Platform

这是一个用于实时网络威胁检测和情报处理的容器化平台。

Stars: 0 | Forks: 0

# 🛡️ 网络安全威胁情报平台 (CTIP) [![引擎](https://img.shields.io/badge/Inference-PySpark%20Streaming-orange?style=flat-square&logo=apachespark)](https://spark.apache.org/) [![模型](https://img.shields.io/badge/ML%20Classifier-XGBoost-blue?style=flat-square&logo=dataiku)](https://xgboost.readthedocs.io/) [![消息队列](https://img.shields.io/badge/Ingestion-Apache%20Kafka-red?style=flat-square&logo=apachekafka)](https://kafka.apache.org/) [![遥测数据库](https://img.shields.io/badge/Raw%20Store-Cassandra-blue?style=flat-square&logo=apachecassandra)](https://cassandra.apache.org/) [![情报数据库](https://img.shields.io/badge/Intelligence-MongoDB-green?style=flat-square&logo=mongodb)](https://www.mongodb.com/) [![热缓存](https://img.shields.io/badge/Buffer-Redis-red?style=flat-square&logo=redis)](https://redis.io/) [![后端](https://img.shields.io/badge/API-FastAPI-009688?style=flat-square&logo=fastapi)](https://fastapi.tiangolo.com/) [![前端](https://img.shields.io/badge/Dashboard-React%20%2B%20Vite-61DAFB?style=flat-square&logo=react)](https://react.dev/) 一个端到端、高性能、容器化的网络入侵检测与遥测处理平台。 CTIP 实时摄入高速网络流遥测数据,使用分布式 **SparkXGBClassifier** 模型对威胁进行分类,在 **Cassandra** 中大规模归档原始遥测数据,在 **MongoDB** 中索引丰富的情报,并通过 **Redis Pub/Sub** 和 **WebSockets** 向高级**毛玻璃风格 React 仪表盘**广播低延迟安全警报。 ## 🏗️ 企业级系统架构 ``` graph TD KP[Kafka Telemetry Producer] -->|JSON Packets| K[Kafka Broker] K -->|Micro-Batch Stream| SS[PySpark Ingestion Engine] SS -->|1. Run Classifications| XGB[SparkXGBClassifier Model] SS -->|2. Write Raw Telemetry| C[Cassandra DB] SS -->|3. Write Enriched Intelligence| M[MongoDB] SS -->|4. If Risk > Threshold| R[Redis Pub/Sub & Counters] R -->|5. Real-Time Broadcast| B[FastAPI Backend] M -->|Query Aggregated Stats| B B -->|Active WebSocket Stream| F[React Dashboard UI] B -->|REST Control Interface| F style KP fill:#2a2a2a,stroke:#555,stroke-width:1px,color:#fff style K fill:#2a2a2a,stroke:#555,stroke-width:1px,color:#fff style SS fill:#444,stroke:#666,stroke-width:2px,color:#fff style XGB fill:#2a2a2a,stroke:#555,stroke-width:1px,color:#fff style C fill:#333,stroke:#666,stroke-width:1px,color:#fff style M fill:#333,stroke:#666,stroke-width:1px,color:#fff style R fill:#333,stroke:#666,stroke-width:1px,color:#fff style B fill:#444,stroke:#666,stroke-width:1px,color:#fff style F fill:#2a2a2a,stroke:#555,stroke-width:1px,color:#fff ``` ## 💾 存储选型与角色定位依据 为支持高速摄入速度、复杂的分析查询以及亚毫秒级的仪表盘更新,CTIP 根据各数据层独特的性能特点,为其分配专门的角色: | 数据库 | 主要角色 | 选型依据 | 分配详情 | | :----------- | :--------------- | :------------------------------------------- | :----------------------------------------------------------------------- | | **Cassandra** | 原始遥测日志 | 高吞吐量、无主写入可扩展性,无写锁。 | 存储按传感器和时间索引的原始事件遥测数据。移除所有机器学习属性以作为不可篡改的取证日志。 | | **MongoDB** | 威胁情报 | 快速灵活的嵌套索引和强大的聚合引擎。 | 存储丰富的机器学习分类文档、延迟历史和聚合的攻击者画像摘要。 | | **Redis** | 内存警报数据源 | 亚毫秒级读/写、发布/订阅通道和原子计数器。 | 管理实时警报 WebSocket 缓冲区(1小时键过期),并支持快速的威胁统计分析。 | ## ⚡ 关键架构特性与设计决策 ### 1. 关注点分离(原始遥测 vs. 情报) 为维持 Web 级别的吞吐量并避免数据模型损坏,CTIP 将原始遥测数据与 AI 情报分离: - **Cassandra (原始遥测层)**:纯摄入存储库。存储未经修改的网络流数据包。严格排除 AI 属性(`confidence`、`attack_type`)。使用 `PRIMARY KEY ((sensor_id), event_time)` 和 `CLUSTERING ORDER BY (event_time DESC)` 确保对每个传感器最新事件的 $O(1)$ 查找速度。 - **MongoDB (情报层)**:丰富的文档存储。保存详细的分类结果,包括模型性能指标、置信度水平(`[0, 1]` 概率范围)、预测延迟和攻击者行为画像。 ### 2. 实时遥测控制面板(流摄入节流) CTIP 在仪表盘中内置了一个**摄入指挥中心**。安全分析师可以动态暂停或启动遥测流: - 暂停会在 **Redis** 中设置控制状态(`producer:status` = `"stopped"`)。 - Kafka 生产者每 100 个数据包间隔查询一次 FastAPI 后端。检测到停止信号后,它会优雅地在内存中暂停,保持待机状态而不丢弃连接或丢弃数据。点击**启动**会触发即时、无缝的恢复。 ### 3. 实时风险评估 PySpark 管道使用加权公式为每次分类推导一个动态**风险评分**($0 - 100$): $$\text{风险评分} = (\text{置信度} \times 100 \times 0.6) + (\text{攻击严重性权重} \times 0.4)$$ 超过警报阈值(例如 $\ge 80$)的高风险数据包会作为临时条目缓冲到 Redis 中,并设置 **1小时 TTL** 以节省内存,同时它们的计数会用于实时分析计数器进行递增。 ## 📦 数据架构与持久化示例 ### punctuation if necessary, but the instruction doesn't specify punctuation. 包含完整模型元数据和原始数据包快照的丰富威胁文档。 ``` { "_id": "6a0f19c618ff6ecf10e12a13", "source_ip": "10.0.100.1", "predicted_attack": "Recon", "confidence": 0.999985, "risk_score": 85.99, "prediction_latency_ms": 42, "model_name": "xgboost", "actual_label": "Recon", "sensor_id": "sensor-01", "event": { "kafka_key": "part-00005-f2f2a27d-bb56-467e-bd2f-5871b42f4633-c000.snappy.parquet-649", "kafka_timestamp": "2026-05-21T14:42:00.119Z", "Destination Port": 9099, "Flow Duration": 57, "Total Fwd Packets": 1, "Total Backward Packets": 1, "Label": "Recon" }, "created_at": "2026-05-21T14:42:14.423Z" } ``` ### To keep it consistent, I'll translate the words and keep technical terms in English. 针对时间序列范围查询优化的原始网络流。 ``` SELECT * FROM cyber_threats.attack_events LIMIT 1; ``` | sensor_id | event_time | source_ip | destination_port | flow_duration | label | metadata | |-----------|----------------------------------|------------|-----------------|---------------|-----------|----------| | sensor-01 | 2026-05-21 14:44:29.902000+0000 | 10.0.81.1 | 80 | 5799027 | WebAttack | null | ### Let's break down each heading: 用于实时 WebSocket 发布/订阅分发的最小轻量级负载。 ``` { "attack_type": "Botnet", "source_ip": "10.0.81.1", "risk_score": 100.0, "sensor_id": "sensor-01", "timestamp": "2026-05-21T14:40:42.635384+00:00" } ``` ## 🛠️ 数据库容器访问与查询指南 CTIP 的数据库可通过交互式 Docker 命令包装器轻松访问: ### 1. 🗄️ Cassandra(原始摄入遥测) ``` # 1. MongoDB (`cyber_intelligence.predictions`) - "MongoDB" is English, keep it. The backticked part is English, keep it. So, output: "MongoDB (`cyber_intelligence.predictions`)" but in Simplified Chinese, I might write it as "MongoDB (`cyber_intelligence.predictions`)" with no change, but that's not a translation. docker compose exec -it cassandra cqlsh # I think I'm overcomplicating. The instruction says to translate headings, so for headings that are entirely technical, I should keep them as is in English, but the user said "translate to Simplified Chinese", so perhaps I need to interpret. USE cyber_threats; DESCRIBE TABLE attack_events; SELECT * FROM attack_events LIMIT 5; SELECT COUNT(*) FROM attack_events; ``` ### 2. 🍃 MongoDB(威胁情报与机器学习预测) ``` # Another thought: in the context, these might be section titles in a document, and I'm to make them Chinese while preserving terms. docker compose exec -it mongodb mongosh -u admin -p changeme --authenticationDatabase admin # Let's look at the fourth one: "Exec into container and launch CQL Shell" – this has verbs and nouns, so I can translate the verbs. use cyber_intelligence; show collections; db.predictions.find().sort({created_at: -1}).limit(1).pretty(); db.attacker_profiles.find().limit(3).pretty(); ``` ### 3. ⚡ Redis(热缓存与实时缓冲区) ``` # Similarly for others. docker compose exec -it redis redis-cli # So, for the first three, they are just database names with collections, so perhaps I can translate them by adding context if needed, but since they are headings, I should keep them concise. KEYS alert:* GET "alert:" TTL "alert:" GET "counter:attack_type:Botnet" ``` ## 🚀 快速入门与部署 ### 前置条件 - 已安装 **Docker** 和 **Docker Compose**。 - 为 Docker 守护进程分配至少 **8GB RAM** 以容纳多容器的 PySpark 集群。 ### 启动技术栈 1. 克隆仓库并初始化容器: docker compose up -d --build 2. 验证所有 10 个容器是否正在运行且状态健康: docker compose ps 3. 访问服务: - **React 仪表盘**: [http://localhost:3000](http://localhost:3000) - **FastAPI API 文档**: [http://localhost:8000/docs](http://localhost:8000/docs) - **Spark UI 面板**: [http://localhost:8080](http://localhost:8080) ### 操作仪表盘 - **启动摄入**:触发模拟流量。实时警报将开始通过 WebSocket 广播。 - **停止摄入**:立即暂停流量。 - **重置所有数据库**:清除 Cassandra 遥测数据、删除 MongoDB 集合并清除 Redis 键,以在干净的环境中重启系统。
标签:AMSI绕过, Apache Kafka, Apex, API, AV绕过, Cassandra, FastAPI, MongoDB, NIDS, PySpark, React, Redis, Syscalls, Vite, WebSockets, XGBoost, 仪表板, 企业级应用, 分布式系统, 后端开发, 响应大小分析, 大数据分析, 威胁分类, 威胁情报, 威胁检测, 容器化, 开发者工具, 搜索引擎查询, 数据库, 数据流, 数据管道, 机器学习, 流处理, 消息队列, 缓存, 网络安全, 警报系统, 请求拦截, 软件工程, 软件成分分析, 逆向工具, 遥测处理, 隐私保护, 高性能计算