# AI-Based Network Verification System
An intelligent system that analyzes network traffic and identifies anomalies or potential security threats using AI/ML techniques. The system uses machine learning models to detect unusual patterns, predict potential attacks, and provide real-time threat intelligence.
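## Features
- **Network traffic capture and analysis**: real-time packet capture and protocol identification
- **ML anomaly detection**: an Isolation Forest model flags anomalous traffic patterns
- **Threat prediction**: a Random Forest classifier assigns traffic to one of 8 classes (Normal plus seven threat types)
- **Real-time threat detection**: continuous monitoring with WebSocket updates
- **Interactive dashboard**: live visualization and threat monitoring
- **Automated response system**: IP blocking, rate limiting, and isolation
- **Comprehensive reports**: executive summaries with recommendations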
## Project Structure
```
AI-Based Network Verification System/
├── app.py # Flask application entry point
├── config.py # Configuration settings
├── requirements.txt # Python dependencies
├── models/ # ML models
│ ├── anomaly_detector.py
│ └── threat_predictor.py
├── services/ # Business logic
│ ├── traffic_capture.py
│ ├── threat_detection.py
│ └── response_system.py
├── routes/ # Flask blueprints
│ ├── main.py
│ ├── dashboard.py
│ └── api.py
├── templates/ # HTML templates
│ ├── base.html
│ ├── index.html
│ ├── dashboard.html
│ ├── reports.html
│ └── about.html
├── static/ # CSS/JS assets
├── utils/ # Helper functions
└── data/ # Data storage
```
## Quick Start
1. **Install dependencies**
   `pip install -r requirements.txt`
2. **Run the application**
   `python app.py`
3. **Open the dashboard**
   Browse to `http://localhost:5000/dashboard`.
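## Machine Learning Models
### Anomaly Detector (Isolation Forest)
- Detects outliers in network traffic patterns
- Configurable contamination rate and threshold
- Extracts features from packet metadata
### Threat Predictor (Random Forest)
- Classifies traffic into 8 classes:
  - Normal, Port Scan, DDoS, Brute Force
  - Malware, Data Exfiltration, ARP Spoofing, DNS Tunneling
- Confidence score with a per-class probability distribution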
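To make the thresholding idea concrete, the sketch below maps a raw Isolation Forest decision value to a score between 0 and 1 and compares it with a threshold. The helper names `normalize_score` and `is_anomaly` are hypothetical; the logistic form with a factor of 2 and the default threshold of 0.7 mirror the `AnomalyDetector.predict` code shown later in the development guide.

```python
import math


def normalize_score(raw_score: float) -> float:
    """Map a decision-function value to (0, 1); lower raw values give higher scores."""
    return 1.0 / (1.0 + math.exp(raw_score * 2))


def is_anomaly(raw_score: float, threshold: float = 0.7) -> bool:
    """Flag a sample when its normalized score reaches the threshold."""
    return normalize_score(raw_score) >= threshold
```

A raw value of 0 sits exactly at 0.5, so only clearly negative decision values (roughly below -0.42 for a 0.7 threshold) are flagged.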
## API Reference
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/system/status` | System status |
| POST | `/api/traffic/start` | Start capture |
| POST | `/api/traffic/stop` | Stop capture |
| POST | `/api/detection/start` | Start detection |
| POST | `/api/detection/stop` | Stop detection |
| POST | `/api/reports/generate` | Generate a report |
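The endpoints above can be exercised from a small standard-library client. This is a minimal sketch: `BASE_URL` assumes the default host and port from the Quick Start, `build_request` and `call` are hypothetical helpers, and the assumption that every endpoint returns JSON is not confirmed by this document.

```python
import json
import urllib.request

BASE_URL = "http://localhost:5000"  # assumption: default address from the Quick Start


def build_request(method, path, payload=None):
    """Build a urllib Request for one of the documented endpoints."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def call(method, path, payload=None, timeout=5):
    """Send the request and decode the body as JSON (assumed response format)."""
    with urllib.request.urlopen(build_request(method, path, payload), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the server running, `call("GET", "/api/system/status")` would fetch the status and `call("POST", "/api/traffic/start")` would start capture.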
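## Dashboard
The dashboard provides:
- Live packet statistics
- A real-time threat detection feed
- Anomaly score trends
- A threat type distribution chart
- Severity distribution
- Automated response status
- Manual test packet injection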
## Automated Response
| Severity | Actions |
|----------|---------|
| Critical | Alert, log, block IP, isolate |
| High | Alert, log, block IP, rate limit |
| Medium | Alert, log, rate limit |
| Low | Alert, log |
| Normal | Log |
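The table can be read as a lookup from severity to an ordered action list. The sketch below restates that mapping; per the `AutomatedResponseSystem` code in the development guide, only `alert` and `log` run automatically, while the remaining actions are recorded as pending approval. The function name `plan_response` is hypothetical.

```python
SEVERITY_ACTIONS = {
    "Critical": ["alert", "log", "block_ip", "isolate"],
    "High": ["alert", "log", "block_ip", "rate_limit"],
    "Medium": ["alert", "log", "rate_limit"],
    "Low": ["alert", "log"],
    "Normal": ["log"],
}

# Actions that execute without operator approval, per the response system code.
AUTO_EXECUTE = {"alert", "log"}


def plan_response(severity):
    """Return (action, status) pairs; unknown severities fall back to logging only."""
    actions = SEVERITY_ACTIONS.get(severity, ["log"])
    return [
        (action, "executed" if action in AUTO_EXECUTE else "pending_approval")
        for action in actions
    ]
```

For example, `plan_response("High")` marks `alert` and `log` as executed and leaves `block_ip` and `rate_limit` pending.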
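# AI-Based Network Verification System - Complete Development Guide
## Project Overview
An intelligent system that uses AI/ML techniques to analyze network traffic and identify anomalies or potential security threats. Built with Flask, scikit-learn, Chart.js, and SocketIO.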
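## Development Phases
### Phase 1: Project Setup and Configuration
- Create the folder structure (models/, services/, routes/, templates/, utils/, static/)
- Write requirements.txt with all dependencies
- Create config.py for centralized configuration
- Initialize the Flask app using the application factory pattern
### Phase 2: ML Model Development
- Implement AnomalyDetector with Isolation Forest
- Implement ThreatPredictor with a Random Forest classifier
- Add model persistence (save/load with pickle)
- Create the feature extraction pipeline
### Phase 3: Service Layer
- TrafficCaptureService: synthetic packet generation and statistics
- ThreatDetectionService: coordinates the two ML models
- AutomatedResponseSystem: threat response actions
### Phase 4: Web Interface and API
- Main routes (/, /about)
- Dashboard route (/dashboard)
- API endpoints (/api/*)
- HTML templates using Bootstrap 5 + Chart.js
- SocketIO for real-time updates
### Phase 5: UI Polish and Fixes
- Change text-muted to text-light on dark backgrounds
- Add custom animated SVG icons (replacing Font Awesome)
- Add skewed clip-path buttons
- Add logo.png as the favicon and navbar brand
- Add a "no data" Chart.js plugin
- Update the copyright year to 2026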
## Complete Project Directory Tree
```
AI-Based Network Verification System/
|-- app.py # Main Flask application
|-- config.py # Configuration class
|-- requirements.txt # Python dependencies
|-- README.md # Project documentation
|-- DEVELOPMENT.md # This file
|
|-- models/ # ML Models
| |-- __init__.py
| |-- anomaly_detector.py # Isolation Forest anomaly detection
| |-- threat_predictor.py # Random Forest threat classification
|
|-- services/ # Business Logic
| |-- __init__.py
| |-- traffic_capture.py # Synthetic traffic generation
| |-- threat_detection.py # ML orchestration & analysis
| |-- response_system.py # Automated threat response
|
|-- routes/ # Flask Blueprints
| |-- __init__.py
| |-- main.py # Home, about pages
| |-- dashboard.py # Dashboard route
| |-- api.py # REST API endpoints
|
|-- templates/ # HTML Templates (Jinja2)
| |-- base.html # Base layout with navbar, footer
| |-- index.html # Landing page with feature cards
| |-- dashboard.html # Real-time monitoring dashboard
| |-- reports.html # Security reports & charts
| |-- about.html # System information page
| |-- logo/
| |-- logo.png # Website logo
|
|-- static/ # Static Assets
| |-- logo/
| |-- logo.png # Favicon & navbar logo
|
|-- utils/ # Helper Functions
| |-- __init__.py
| |-- helpers.py # format_timestamp, calculate_stats, etc.
|
|-- data/ # Data storage (created at runtime)
|-- reports/ # Generated reports (created at runtime)
|-- ml_models_saved/ # Saved ML models (created at runtime)
```
## Complete File Contents
### requirements.txt
```
Flask==2.3.3
Flask-SocketIO==5.3.6
numpy==1.24.3
pandas==2.0.3
scikit-learn==1.3.0
scapy==2.5.0
matplotlib==3.7.2
seaborn==0.12.2
plotly==5.16.1
joblib==1.3.2
Werkzeug==2.3.7
eventlet==0.33.3
requests==2.31.0
Flask-CORS==4.0.0
python-dateutil==2.8.2
threading2==0.1.2
psutil==5.9.5
```
### config.py
```
import os


class Config:
    """Central settings for the Flask app, traffic capture, detection, and reporting."""

    # The fallback key is only suitable for local development.
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'network-verification-ai-secret-2026'
    DEBUG = True

    # Filesystem locations
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    MODEL_DIR = os.path.join(BASE_DIR, 'ml_models_saved')
    DATA_DIR = os.path.join(BASE_DIR, 'data')

    # Traffic capture
    CAPTURE_INTERFACE = 'eth0'
    CAPTURE_PACKET_COUNT = 100
    CAPTURE_TIMEOUT = 30

    # Detection thresholds and retraining
    ANOMALY_THRESHOLD = 0.7
    THREAT_CONFIDENCE_THRESHOLD = 0.75
    RETRAIN_INTERVAL = 3600

    # Automated response and alerting
    AUTO_BLOCK_ENABLED = True
    ALERT_WEBHOOK = None
    EMAIL_ALERTS = False

    # Dashboard, logging, and reports
    DASHBOARD_REFRESH_INTERVAL = 5000
    MAX_LOG_ENTRIES = 1000
    LOG_FILE = os.path.join(DATA_DIR, 'network_logs.csv')
    REPORT_DIR = os.path.join(BASE_DIR, 'reports')

    @staticmethod
    def init_app(app):
        """Create the runtime directories if they do not exist yet."""
        os.makedirs(Config.MODEL_DIR, exist_ok=True)
        os.makedirs(Config.DATA_DIR, exist_ok=True)
        os.makedirs(Config.REPORT_DIR, exist_ok=True)
```
### app.py
```
import threading
import time

from flask import Flask
from flask_socketio import SocketIO

from config import Config
from services.traffic_capture import TrafficCaptureService
from services.threat_detection import ThreatDetectionService
from services.response_system import AutomatedResponseSystem
from routes.main import main_bp
from routes.dashboard import dashboard_bp
from routes.api import api_bp

socketio = SocketIO(cors_allowed_origins="*")


def create_app(config_class=Config):
    """Application factory: build the app, attach services, register blueprints."""
    app = Flask(__name__)
    app.config.from_object(config_class)
    config_class.init_app(app)

    # Services are attached to the app object so routes can reach them.
    app.traffic_service = TrafficCaptureService(max_history=5000)
    app.threat_service = ThreatDetectionService(
        traffic_service=app.traffic_service,
        model_dir=config_class.MODEL_DIR
    )
    app.response_service = AutomatedResponseSystem(enabled=True)

    app.register_blueprint(main_bp)
    app.register_blueprint(dashboard_bp)
    app.register_blueprint(api_bp)
    socketio.init_app(app)

    @socketio.on('connect')
    def handle_connect():
        socketio.emit('status', {'connected': True, 'message': 'Real-time monitoring active'})

    @socketio.on('disconnect')
    def handle_disconnect():
        pass

    @app.route('/favicon.ico')
    def favicon():
        return '', 204

    return app


def run_background_emitter(app):
    """Push threats, traffic stats, and response stats to clients every 2 seconds."""
    with app.app_context():
        while True:
            try:
                if app.traffic_service.is_capturing and app.threat_service.detection_active:
                    threats = app.threat_service.get_recent_threats(count=5)
                    if threats:
                        socketio.emit('new_threats', {'threats': threats})
                    stats = app.traffic_service.get_statistics()
                    socketio.emit('traffic_update', stats)
                    resp_stats = app.response_service.get_response_stats()
                    socketio.emit('response_update', resp_stats)
                time.sleep(2)
            except Exception:
                # Keep the emitter alive; back off before the next attempt.
                time.sleep(2)


if __name__ == '__main__':
    app = create_app()
    app.traffic_service.start_capture(interval=0.5)
    app.threat_service.start_detection(interval=1.0)

    emitter_thread = threading.Thread(target=run_background_emitter, args=(app,))
    emitter_thread.daemon = True
    emitter_thread.start()

    # Train on synthetic data when no saved model was loaded.
    if not app.threat_service.anomaly_detector.is_fitted:
        print("Training ML models with synthetic data...")
        app.threat_service.train_models()
        print("Model training complete. Ready to analyze traffic.")

    print("=" * 60)
    print("AI-Based Network Verification System")
    print("=" * 60)
    print("Access the dashboard at: http://localhost:5000/dashboard")
    print("API documentation available at: http://localhost:5000/api/system/status")
    print("=" * 60)
    socketio.run(app, host='0.0.0.0', port=5000, debug=True, use_reloader=False)
```
### models/__init__.py
```
from .anomaly_detector import AnomalyDetector
from .threat_predictor import ThreatPredictor
__all__ = ['AnomalyDetector', 'ThreatPredictor']
```
### models/anomaly_detector.py
```
import os
import pickle
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from datetime import datetime
import threading


class AnomalyDetector:
    # Feature order shared by fit() and extract_features(); both must agree,
    # otherwise the scaler rejects prediction inputs with a shape mismatch.
    FEATURE_NAMES = [
        'packet_size', 'duration', 'protocol_type', 'src_port',
        'dst_port', 'flag_count', 'byte_rate', 'packet_rate',
    ]

    def __init__(self, model_dir='ml_models_saved', threshold=0.7):
        self.model_dir = model_dir
        self.threshold = threshold
        self.model = None
        self.scaler = StandardScaler()
        self.pca = PCA(n_components=5)
        self.is_fitted = False
        self.lock = threading.Lock()
        self.model_path = os.path.join(model_dir, 'anomaly_model.pkl')
        self.scaler_path = os.path.join(model_dir, 'anomaly_scaler.pkl')
        self._load_or_initialize()

    def _load_or_initialize(self):
        """Load a saved model and scaler if present, else create an unfitted model."""
        if os.path.exists(self.model_path) and os.path.exists(self.scaler_path):
            try:
                with open(self.model_path, 'rb') as f:
                    self.model = pickle.load(f)
                with open(self.scaler_path, 'rb') as f:
                    self.scaler = pickle.load(f)
                self.is_fitted = True
                return
            except Exception:
                pass
        self.model = IsolationForest(
            n_estimators=100,
            contamination=0.1,
            random_state=42,
            n_jobs=-1
        )
        self.is_fitted = False

    def extract_features(self, packet_data):
        """Return a (1, n_features) float array from a dict or a sequence."""
        if isinstance(packet_data, dict):
            features = [packet_data.get(name, 0) for name in self.FEATURE_NAMES]
        elif isinstance(packet_data, (list, np.ndarray)):
            features = list(packet_data)
        else:
            features = [0] * len(self.FEATURE_NAMES)
        return np.array(features, dtype=float).reshape(1, -1)

    def fit(self, data):
        with self.lock:
            if isinstance(data, list):
                df = pd.DataFrame(data)
            elif isinstance(data, pd.DataFrame):
                df = data
            else:
                return False
            present = [c for c in self.FEATURE_NAMES if c in df.columns]
            if len(present) < 2:
                return False
            # Train on exactly the columns that extract_features() produces.
            X = df.reindex(columns=self.FEATURE_NAMES).fillna(0).to_numpy(dtype=float)
            if len(X) < 10:
                return False
            X_scaled = self.scaler.fit_transform(X)
            self.model.fit(X_scaled)
            self.is_fitted = True
            self._save_model()
            return True

    def predict(self, packet_data):
        with self.lock:
            if not self.is_fitted:
                return {'is_anomaly': False, 'score': 0.5, 'confidence': 0.0}
            try:
                features = self.extract_features(packet_data)
                features_scaled = self.scaler.transform(features)
                prediction = self.model.predict(features_scaled)[0]
                raw_score = self.model.decision_function(features_scaled)[0]
                # Logistic mapping: negative (more anomalous) raw scores approach 1.
                normalized_score = 1.0 / (1.0 + np.exp(raw_score * 2))
                is_anomaly = normalized_score >= self.threshold
                return {
                    'is_anomaly': bool(is_anomaly),
                    'score': round(float(normalized_score), 4),
                    'confidence': round(float(abs(raw_score)), 4),
                    'prediction': int(prediction)
                }
            except Exception as e:
                return {'is_anomaly': False, 'score': 0.5, 'confidence': 0.0, 'error': str(e)}

    def predict_batch(self, data_list):
        results = []
        for item in data_list:
            results.append(self.predict(item))
        return results

    def _save_model(self):
        os.makedirs(self.model_dir, exist_ok=True)
        with open(self.model_path, 'wb') as f:
            pickle.dump(self.model, f)
        with open(self.scaler_path, 'wb') as f:
            pickle.dump(self.scaler, f)

    def get_model_status(self):
        return {
            'is_fitted': self.is_fitted,
            'threshold': self.threshold,
            'model_type': 'IsolationForest',
            'last_updated': datetime.now().isoformat()
        }
```
### models/threat_predictor.py
```
import os
import pickle
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from datetime import datetime
import threading


class ThreatPredictor:
    THREAT_TYPES = [
        'Normal', 'Port Scan', 'DDoS', 'Brute Force',
        'Malware', 'Data Exfiltration', 'ARP Spoofing', 'DNS Tunneling'
    ]
    # Feature order shared by fit(), extract_features(), and feature_importance().
    FEATURE_NAMES = [
        'packet_size', 'duration', 'protocol_type', 'src_port', 'dst_port',
        'flag_count', 'byte_rate', 'packet_rate', 'connection_count',
        'unique_dst_ips', 'unique_dst_ports', 'syn_count', 'ack_count',
        'rst_count', 'fin_count'
    ]

    def __init__(self, model_dir='ml_models_saved', threshold=0.75):
        self.model_dir = model_dir
        self.threshold = threshold
        self.model = None
        self.scaler = StandardScaler()
        self.label_encoder = LabelEncoder()
        self.is_fitted = False
        self.lock = threading.Lock()
        self.model_path = os.path.join(model_dir, 'threat_model.pkl')
        self.scaler_path = os.path.join(model_dir, 'threat_scaler.pkl')
        self.encoder_path = os.path.join(model_dir, 'threat_encoder.pkl')
        self._load_or_initialize()

    def _load_or_initialize(self):
        """Load saved model, scaler, and encoder if present, else start unfitted."""
        if os.path.exists(self.model_path) and os.path.exists(self.scaler_path):
            try:
                with open(self.model_path, 'rb') as f:
                    self.model = pickle.load(f)
                with open(self.scaler_path, 'rb') as f:
                    self.scaler = pickle.load(f)
                with open(self.encoder_path, 'rb') as f:
                    self.label_encoder = pickle.load(f)
                self.is_fitted = True
                return
            except Exception:
                pass
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=15,
            random_state=42,
            n_jobs=-1
        )
        self.label_encoder.fit(self.THREAT_TYPES)
        self.is_fitted = False

    def extract_features(self, packet_data):
        """Return a (1, n_features) float array from a dict or a sequence."""
        if isinstance(packet_data, dict):
            features = [packet_data.get(name, 0) for name in self.FEATURE_NAMES]
        elif isinstance(packet_data, (list, np.ndarray)):
            features = list(packet_data)
        else:
            features = [0] * len(self.FEATURE_NAMES)
        return np.array(features, dtype=float).reshape(1, -1)

    def fit(self, data, labels=None):
        with self.lock:
            if isinstance(data, list):
                df = pd.DataFrame(data)
            elif isinstance(data, pd.DataFrame):
                df = data
            else:
                return False
            present = [c for c in self.FEATURE_NAMES if c in df.columns]
            if len(present) < 2:
                return False
            # Train on exactly the columns that extract_features() produces.
            X = df.reindex(columns=self.FEATURE_NAMES).fillna(0).to_numpy(dtype=float)
            if labels is not None:
                y = np.array(labels)
            elif 'label' in df.columns:
                y = df['label'].values
            else:
                # No labels available: falls back to random labels (placeholder only).
                y = np.random.choice(self.THREAT_TYPES, size=len(X))
            self.label_encoder.fit(y)
            y_encoded = self.label_encoder.transform(y)
            if len(np.unique(y_encoded)) < 2:
                return False
            X_train, X_test, y_train, y_test = train_test_split(
                X, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded
            )
            X_train_scaled = self.scaler.fit_transform(X_train)
            self.model.fit(X_train_scaled, y_train)
            X_test_scaled = self.scaler.transform(X_test)
            accuracy = self.model.score(X_test_scaled, y_test)
            self.is_fitted = True
            self._save_model()
            return {'success': True, 'accuracy': round(float(accuracy), 4)}

    def predict(self, packet_data):
        with self.lock:
            if not self.is_fitted:
                return {
                    'threat_type': 'Unknown',
                    'confidence': 0.0,
                    'is_threat': False,
                    'all_probabilities': {}
                }
            try:
                features = self.extract_features(packet_data)
                features_scaled = self.scaler.transform(features)
                probabilities = self.model.predict_proba(features_scaled)[0]
                prediction = self.model.predict(features_scaled)[0]
                threat_type = str(self.label_encoder.inverse_transform([prediction])[0])
                confidence = float(np.max(probabilities))
                all_probs = {}
                # predict_proba columns follow model.classes_ (encoded labels).
                for idx, cls in enumerate(self.label_encoder.classes_):
                    all_probs[str(cls)] = round(float(probabilities[idx]), 4)
                is_threat = bool(threat_type != 'Normal' and confidence >= self.threshold)
                return {
                    'threat_type': threat_type,
                    'confidence': round(confidence, 4),
                    'is_threat': is_threat,
                    'all_probabilities': all_probs
                }
            except Exception as e:
                return {
                    'threat_type': 'Error',
                    'confidence': 0.0,
                    'is_threat': False,
                    'error': str(e)
                }

    def predict_batch(self, data_list):
        results = []
        for item in data_list:
            results.append(self.predict(item))
        return results

    def _save_model(self):
        os.makedirs(self.model_dir, exist_ok=True)
        with open(self.model_path, 'wb') as f:
            pickle.dump(self.model, f)
        with open(self.scaler_path, 'wb') as f:
            pickle.dump(self.scaler, f)
        with open(self.encoder_path, 'wb') as f:
            pickle.dump(self.label_encoder, f)

    def get_model_status(self):
        return {
            'is_fitted': self.is_fitted,
            'threshold': self.threshold,
            'model_type': 'RandomForestClassifier',
            'threat_types': self.THREAT_TYPES,
            'last_updated': datetime.now().isoformat()
        }

    def feature_importance(self):
        if not self.is_fitted or not hasattr(self.model, 'feature_importances_'):
            return {}
        importances = self.model.feature_importances_
        return {
            name: round(float(imp), 4)
            for name, imp in zip(self.FEATURE_NAMES, importances)
        }
```
### services/__init__.py
```
from .traffic_capture import TrafficCaptureService
from .threat_detection import ThreatDetectionService
from .response_system import AutomatedResponseSystem
__all__ = ['TrafficCaptureService', 'ThreatDetectionService', 'AutomatedResponseSystem']
```
### services/traffic_capture.py
```
import time
import random
import threading
import numpy as np
from datetime import datetime
from collections import deque


class TrafficCaptureService:
    PROTOCOLS = {6: 'TCP', 17: 'UDP', 1: 'ICMP', 2: 'IGMP'}

    def __init__(self, max_history=5000):
        self.max_history = max_history
        self.traffic_history = deque(maxlen=max_history)
        self.is_capturing = False
        self.capture_thread = None
        self.packet_count = 0
        self.lock = threading.Lock()
        self.statistics = {
            'total_packets': 0,
            'total_bytes': 0,
            'protocol_counts': {},
            'src_ips': set(),
            'dst_ips': set(),
            'start_time': None
        }

    def _generate_synthetic_packet(self):
        """Build one synthetic packet record with randomized metadata."""
        protocols = [6, 17, 1, 2]
        weights = [0.7, 0.2, 0.08, 0.02]
        # Cast to a plain int: NumPy integers are not JSON serializable.
        protocol_num = int(np.random.choice(protocols, p=weights))
        protocol_name = self.PROTOCOLS.get(protocol_num, 'OTHER')
        base_size = np.random.normal(500, 300)
        packet_size = max(40, int(base_size))
        src_octets = [random.randint(1, 254) for _ in range(4)]
        dst_octets = [random.randint(1, 254) for _ in range(4)]
        if random.random() < 0.4:
            src_octets[0:2] = [192, 168]
            dst_octets[0:2] = [192, 168]
        src_ip = '.'.join(map(str, src_octets))
        dst_ip = '.'.join(map(str, dst_octets))
        src_port = random.choice([80, 443, 22, 21, 53, 1234, 8080, random.randint(1024, 65535)])
        dst_port = random.choice([80, 443, 22, 21, 53, 3306, 5432, 8080])
        duration = max(0.001, random.expovariate(1.0 / 2.0))
        flags = random.randint(0, 63)
        flag_count = bin(flags).count('1')
        return {
            'timestamp': datetime.now().timestamp(),
            'packet_size': packet_size,
            'duration': round(duration, 4),
            'protocol_type': protocol_num,
            'protocol_name': protocol_name,
            'src_ip': src_ip,
            'dst_ip': dst_ip,
            'src_port': src_port,
            'dst_port': dst_port,
            'flag_count': flag_count,
            'byte_rate': round(packet_size / max(duration, 0.001), 2),
            'packet_rate': round(random.uniform(0.1, 1000.0), 2),
            'connection_count': random.randint(1, 50),
            'unique_dst_ips': random.randint(1, 10),
            'unique_dst_ports': random.randint(1, 5),
            'syn_count': random.randint(0, 5),
            'ack_count': random.randint(0, 5),
            'rst_count': random.randint(0, 2),
            'fin_count': random.randint(0, 3),
        }

    def start_capture(self, interval=0.5):
        if self.is_capturing:
            return False
        self.is_capturing = True
        self.statistics['start_time'] = datetime.now().timestamp()
        self.capture_thread = threading.Thread(target=self._capture_loop, args=(interval,))
        self.capture_thread.daemon = True
        self.capture_thread.start()
        return True

    def _capture_loop(self, interval):
        while self.is_capturing:
            try:
                packet = self._generate_synthetic_packet()
                with self.lock:
                    self.traffic_history.append(packet)
                    self.packet_count += 1
                    self.statistics['total_packets'] += 1
                    self.statistics['total_bytes'] += packet['packet_size']
                    proto = packet['protocol_name']
                    self.statistics['protocol_counts'][proto] = \
                        self.statistics['protocol_counts'].get(proto, 0) + 1
                    self.statistics['src_ips'].add(packet['src_ip'])
                    self.statistics['dst_ips'].add(packet['dst_ip'])
                time.sleep(interval)
            except Exception:
                continue

    def stop_capture(self):
        self.is_capturing = False
        if self.capture_thread:
            self.capture_thread.join(timeout=2)
        return True

    def get_recent_packets(self, count=100):
        with self.lock:
            history_list = list(self.traffic_history)
        return history_list[-count:] if len(history_list) > count else history_list

    def get_statistics(self):
        """Return a JSON-friendly snapshot (IP sets are reported as counts)."""
        with self.lock:
            stats = dict(self.statistics)
            # Copy the nested dict so callers do not share it with the capture thread.
            stats['protocol_counts'] = dict(stats['protocol_counts'])
            stats['src_ips'] = len(stats['src_ips'])
            stats['dst_ips'] = len(stats['dst_ips'])
            stats['capture_active'] = self.is_capturing
            stats['current_queue_size'] = len(self.traffic_history)
        elapsed = time.time() - stats['start_time'] if stats['start_time'] else 0
        stats['elapsed_seconds'] = round(elapsed, 2)
        stats['packets_per_second'] = round(stats['total_packets'] / max(elapsed, 1), 2)
        return stats

    def get_traffic_summary(self, window_seconds=60):
        cutoff = datetime.now().timestamp() - window_seconds
        with self.lock:
            recent = [p for p in self.traffic_history if p['timestamp'] >= cutoff]
        if not recent:
            return {'packet_count': 0, 'total_bytes': 0, 'avg_size': 0, 'protocols': {}}
        sizes = [p['packet_size'] for p in recent]
        protocols = {}
        for p in recent:
            proto = p['protocol_name']
            protocols[proto] = protocols.get(proto, 0) + 1
        return {
            'packet_count': len(recent),
            'total_bytes': sum(sizes),
            'avg_size': round(sum(sizes) / len(sizes), 2),
            'protocols': protocols
        }

    def inject_test_packet(self, packet_type='normal'):
        """Append a synthetic packet shaped like the requested attack type."""
        packet = self._generate_synthetic_packet()
        if packet_type == 'ddos':
            packet['packet_size'] = random.randint(40, 80)
            packet['packet_rate'] = random.randint(5000, 10000)
            packet['connection_count'] = random.randint(200, 500)
            packet['unique_dst_ips'] = 1
            packet['syn_count'] = 10
        elif packet_type == 'port_scan':
            packet['packet_size'] = random.randint(40, 60)
            packet['unique_dst_ports'] = random.randint(50, 100)
            packet['connection_count'] = random.randint(100, 200)
            packet['fin_count'] = 5
        elif packet_type == 'exfiltration':
            packet['packet_size'] = random.randint(1400, 1500)
            packet['byte_rate'] = random.randint(10000, 50000)
            packet['duration'] = random.uniform(10, 30)
        with self.lock:
            self.traffic_history.append(packet)
            self.packet_count += 1
        return packet
```
### services/threat_detection.py
```
import random
import time
import threading
from datetime import datetime
from collections import deque
from models.anomaly_detector import AnomalyDetector
from models.threat_predictor import ThreatPredictor


class ThreatDetectionService:
    def __init__(self, traffic_service, model_dir='ml_models_saved'):
        self.traffic_service = traffic_service
        self.model_dir = model_dir
        self.anomaly_detector = AnomalyDetector(model_dir=model_dir)
        self.threat_predictor = ThreatPredictor(model_dir=model_dir)
        self.detected_threats = deque(maxlen=1000)
        self.detection_active = False
        self.detection_thread = None
        self.lock = threading.Lock()
        self.detection_stats = {
            'total_scanned': 0,
            'anomalies_detected': 0,
            'threats_detected': 0,
            'false_positives': 0,
            'last_detection_time': None
        }

    def start_detection(self, interval=1.0, batch_size=10):
        if self.detection_active:
            return False
        self.detection_active = True
        self.detection_thread = threading.Thread(
            target=self._detection_loop,
            args=(interval, batch_size)
        )
        self.detection_thread.daemon = True
        self.detection_thread.start()
        return True

    def _detection_loop(self, interval, batch_size):
        while self.detection_active:
            try:
                # Note: the latest `batch_size` packets are re-read every cycle,
                # so a packet can be analyzed more than once.
                packets = self.traffic_service.get_recent_packets(count=batch_size)
                if packets:
                    self._analyze_batch(packets)
                time.sleep(interval)
            except Exception:
                continue

    def _analyze_batch(self, packets):
        for packet in packets:
            result = self._analyze_single(packet)
            with self.lock:
                self.detection_stats['total_scanned'] += 1
                self.detection_stats['last_detection_time'] = datetime.now().isoformat()
                if result.get('is_anomaly'):
                    self.detection_stats['anomalies_detected'] += 1
                if result.get('is_threat'):
                    self.detection_stats['threats_detected'] += 1
                    self.detected_threats.append(result)

    def _analyze_single(self, packet):
        """Run both models on one packet and derive a severity label."""
        anomaly_result = self.anomaly_detector.predict(packet)
        threat_result = self.threat_predictor.predict(packet)
        combined_score = self._calculate_combined_score(anomaly_result, threat_result)
        analysis = {
            'timestamp': datetime.now().isoformat(),
            'packet': packet,
            'is_anomaly': anomaly_result.get('is_anomaly', False),
            'anomaly_score': anomaly_result.get('score', 0),
            'anomaly_confidence': anomaly_result.get('confidence', 0),
            'threat_type': threat_result.get('threat_type', 'Unknown'),
            'threat_confidence': threat_result.get('confidence', 0),
            'is_threat': threat_result.get('is_threat', False),
            'combined_score': round(combined_score, 4),
            'all_probabilities': threat_result.get('all_probabilities', {})
        }
        if combined_score >= 0.9:
            analysis['severity'] = 'Critical'
        elif combined_score >= 0.75:
            analysis['severity'] = 'High'
        elif combined_score >= 0.5:
            analysis['severity'] = 'Medium'
        elif combined_score >= 0.3:
            analysis['severity'] = 'Low'
        else:
            analysis['severity'] = 'Normal'
        return analysis

    def _calculate_combined_score(self, anomaly_result, threat_result):
        """Weight the classifier more when it predicts a threat, else the anomaly score."""
        anomaly_score = anomaly_result.get('score', 0)
        threat_conf = threat_result.get('confidence', 0)
        threat_type = threat_result.get('threat_type', 'Normal')
        if threat_type != 'Normal':
            return 0.4 * anomaly_score + 0.6 * threat_conf
        return 0.6 * anomaly_score + 0.4 * threat_conf

    def stop_detection(self):
        self.detection_active = False
        if self.detection_thread:
            self.detection_thread.join(timeout=2)
        return True

    def get_recent_threats(self, count=50):
        with self.lock:
            threats = list(self.detected_threats)
        return threats[-count:] if len(threats) > count else threats

    def get_detection_stats(self):
        with self.lock:
            return dict(self.detection_stats)

    def get_threat_summary(self):
        with self.lock:
            threats = list(self.detected_threats)
        if not threats:
            return {}
        summary = {}
        for t in threats:
            threat_type = t.get('threat_type', 'Unknown')
            summary[threat_type] = summary.get(threat_type, 0) + 1
        return summary

    def get_severity_distribution(self):
        with self.lock:
            threats = list(self.detected_threats)
        if not threats:
            return {}
        distribution = {}
        for t in threats:
            sev = t.get('severity', 'Normal')
            distribution[sev] = distribution.get(sev, 0) + 1
        return distribution

    def train_models(self, historical_data=None, labels=None):
        """Fit both models on supplied data, or on synthetic data when none is given."""
        results = {}
        if historical_data:
            results['anomaly_detector'] = {'success': self.anomaly_detector.fit(historical_data)}
            results['threat_predictor'] = self.threat_predictor.fit(historical_data, labels)
        else:
            synthetic_data = self._generate_training_data(1000)
            results['anomaly_detector'] = {'success': self.anomaly_detector.fit(synthetic_data)}
            results['threat_predictor'] = self.threat_predictor.fit(synthetic_data)
        return results

    def _generate_training_data(self, count=1000):
        """Generate labeled synthetic records with per-class feature ranges."""
        data = []
        threat_types = ['Normal', 'Port Scan', 'DDoS', 'Brute Force',
                        'Malware', 'Data Exfiltration', 'ARP Spoofing', 'DNS Tunneling']
        for _ in range(count):
            threat = random.choice(threat_types)
            if threat == 'Normal':
                packet_size = random.randint(40, 1500)
                packet_rate = random.uniform(0.1, 100)
                connection_count = random.randint(1, 10)
                unique_dst_ports = random.randint(1, 3)
                syn_count = random.randint(0, 2)
            elif threat == 'DDoS':
                packet_size = random.randint(40, 80)
                packet_rate = random.uniform(1000, 10000)
                connection_count = random.randint(200, 500)
                unique_dst_ports = random.randint(1, 2)
                syn_count = random.randint(5, 15)
            elif threat == 'Port Scan':
                packet_size = random.randint(40, 60)
                packet_rate = random.uniform(50, 500)
                connection_count = random.randint(50, 200)
                unique_dst_ports = random.randint(50, 100)
                syn_count = random.randint(3, 8)
            elif threat == 'Data Exfiltration':
                packet_size = random.randint(1000, 1500)
                packet_rate = random.uniform(10, 100)
                connection_count = random.randint(1, 5)
                unique_dst_ports = random.randint(1, 2)
                syn_count = random.randint(0, 1)
            else:
                # Remaining classes share one generic feature range.
                packet_size = random.randint(100, 800)
                packet_rate = random.uniform(1, 500)
                connection_count = random.randint(5, 50)
                unique_dst_ports = random.randint(1, 10)
                syn_count = random.randint(0, 5)
            data.append({
                'packet_size': packet_size,
                'duration': round(random.uniform(0.001, 10), 4),
                'protocol_type': random.choice([6, 17, 1]),
                'src_port': random.randint(1024, 65535),
                'dst_port': random.choice([80, 443, 22, 53, 3306]),
                'flag_count': random.randint(0, 6),
                'byte_rate': round(packet_size / max(random.uniform(0.001, 10), 0.001), 2),
                'packet_rate': round(packet_rate, 2),
                'connection_count': connection_count,
                'unique_dst_ips': random.randint(1, 10),
                'unique_dst_ports': unique_dst_ports,
                'syn_count': syn_count,
                'ack_count': random.randint(0, syn_count),
                'rst_count': random.randint(0, 2),
                'fin_count': random.randint(0, 3),
                'label': threat
            })
        return data
```
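As a worked example of the scoring in `_calculate_combined_score` and the severity cut-offs in `_analyze_single`, the standalone sketch below reproduces the same weights and thresholds outside the service class. The function names `combined_score` and `severity_for` are hypothetical and exist only for illustration.

```python
def combined_score(anomaly_score, threat_confidence, threat_type):
    """Weight the classifier 60/40 for predicted threats, the anomaly score 60/40 otherwise."""
    if threat_type != "Normal":
        return 0.4 * anomaly_score + 0.6 * threat_confidence
    return 0.6 * anomaly_score + 0.4 * threat_confidence


def severity_for(score):
    """Map a combined score to a severity label using the documented cut-offs."""
    for cutoff, label in ((0.9, "Critical"), (0.75, "High"), (0.5, "Medium"), (0.3, "Low")):
        if score >= cutoff:
            return label
    return "Normal"
```

With an anomaly score of 0.8 and a DDoS prediction at 0.95 confidence, the combined score is about 0.89, which maps to High. One consequence of the weighting is worth noting: a packet classified as Normal with 0.9 confidence and an anomaly score of 0.2 scores about 0.48, which still maps to Low rather than Normal.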
### services/response_system.py
```
import time
import threading
from datetime import datetime
from collections import deque
class AutomatedResponseSystem:
RESPONSE_ACTIONS = {
'alert': {'description': 'Generate alert notification', 'auto_execute': True},
'log': {'description': 'Log incident details', 'auto_execute': True},
'block_ip': {'description': 'Block source IP address', 'auto_execute': False},
'rate_limit': {'description': 'Apply rate limiting', 'auto_execute': False},
'quarantine': {'description': 'Quarantine affected host', 'auto_execute': False},
'isolate': {'description': 'Network isolation', 'auto_execute': False},
}
SEVERITY_ACTIONS = {
'Critical': ['alert', 'log', 'block_ip', 'isolate'],
'High': ['alert', 'log', 'block_ip', 'rate_limit'],
'Medium': ['alert', 'log', 'rate_limit'],
'Low': ['alert', 'log'],
'Normal': ['log']
}
def __init__(self, enabled=True):
self.enabled = enabled
self.response_history = deque(maxlen=500)
self.blocked_ips = set()
self.rate_limited_ips = {}
self.quarantined_hosts = set()
self.lock = threading.Lock()
self.response_stats = {
'total_responses': 0,
'alerts_sent': 0,
'ips_blocked': 0,
'rate_limits_applied': 0,
'hosts_quarantined': 0,
'last_response_time': None
}
def process_threat(self, threat_analysis):
if not self.enabled:
return {'status': 'disabled', 'actions': []}
severity = threat_analysis.get('severity', 'Normal')
src_ip = threat_analysis.get('packet', {}).get('src_ip', 'unknown')
threat_type = threat_analysis.get('threat_type', 'Unknown')
actions_to_take = self.SEVERITY_ACTIONS.get(severity, ['log'])
executed_actions = []
for action in actions_to_take:
config = self.RESPONSE_ACTIONS.get(action, {})
if config.get('auto_execute', False):
result = self._execute_action(action, threat_analysis)
executed_actions.append({
'action': action,
'status': 'executed',
'result': result,
'timestamp': datetime.now().isoformat()
})
else:
executed_actions.append({
'action': action,
'status': 'pending_approval',
'timestamp': datetime.now().isoformat()
})
response_record = {
'timestamp': datetime.now().isoformat(),
'threat_type': threat_type,
'severity': severity,
'src_ip': src_ip,
'actions': executed_actions,
'threat_analysis': threat_analysis
}
with self.lock:
self.response_history.append(response_record)
self.response_stats['total_responses'] += 1
self.response_stats['last_response_time'] = datetime.now().isoformat()
return {'status': 'processed', 'actions': executed_actions}

    def _execute_action(self, action, threat_analysis):
        src_ip = threat_analysis.get('packet', {}).get('src_ip', 'unknown')
        if action == 'alert':
            with self.lock:
                self.response_stats['alerts_sent'] += 1
            return {'alert_sent': True, 'message': f'Threat detected from {src_ip}'}
        elif action == 'log':
            return {'logged': True, 'log_level': 'warning'}
        elif action == 'block_ip':
            with self.lock:
                self.blocked_ips.add(src_ip)
                self.response_stats['ips_blocked'] += 1
            return {'ip_blocked': src_ip, 'duration': 'indefinite'}
        elif action == 'rate_limit':
            with self.lock:
                self.rate_limited_ips[src_ip] = {
                    'max_packets_per_second': 10,
                    'applied_at': datetime.now().isoformat()
                }
                self.response_stats['rate_limits_applied'] += 1
            return {'rate_limited': src_ip, 'limit': '10 pps'}
        elif action == 'quarantine':
            with self.lock:
                self.quarantined_hosts.add(src_ip)
                self.response_stats['hosts_quarantined'] += 1
            return {'quarantined': src_ip, 'network_segment': 'isolated'}
        elif action == 'isolate':
            return {'isolated': src_ip, 'network_access': 'denied'}
        return {'status': 'unknown_action'}
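
    def approve_pending_action(self, record_id, action_name):
        # Find the pending action under the lock, but run it after releasing
        # the lock: _execute_action acquires self.lock itself, so calling it
        # while holding a non-reentrant threading.Lock would deadlock.
        with self.lock:
            if not 0 <= record_id < len(self.response_history):
                return {'status': 'error', 'message': 'Invalid record ID'}
            record = list(self.response_history)[record_id]
            pending = None
            for action in record['actions']:
                if action['action'] == action_name and action['status'] == 'pending_approval':
                    pending = action
                    break
            if pending is None:
                return {'status': 'error', 'message': 'Action not found or already executed'}
            # Mark it executed now so a concurrent approval cannot run it twice.
            pending['status'] = 'executed'
            pending['approved_at'] = datetime.now().isoformat()
        result = self._execute_action(action_name, record['threat_analysis'])
        with self.lock:
            pending['result'] = result
        return {'status': 'approved', 'result': result}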

    def unblock_ip(self, ip_address):
        with self.lock:
            if ip_address in self.blocked_ips:
                self.blocked_ips.discard(ip_address)
                return {'status': 'unblocked', 'ip': ip_address}
            return {'status': 'not_found', 'ip': ip_address}

    def remove_rate_limit(self, ip_address):
        with self.lock:
            if ip_address in self.rate_limited_ips:
                del self.rate_limited_ips[ip_address]
                return {'status': 'rate_limit_removed', 'ip': ip_address}
            return {'status': 'not_found', 'ip': ip_address}

    def get_response_history(self, count=50):
        with self.lock:
            history = list(self.response_history)
        return history[-count:] if len(history) > count else history

    def get_blocked_ips(self):
        with self.lock:
            return list(self.blocked_ips)

    def get_rate_limited_ips(self):
        with self.lock:
            return dict(self.rate_limited_ips)

    def get_response_stats(self):
        with self.lock:
            stats = dict(self.response_stats)
            stats['currently_blocked'] = len(self.blocked_ips)
            stats['currently_rate_limited'] = len(self.rate_limited_ips)
            stats['currently_quarantined'] = len(self.quarantined_hosts)
        return stats

    def get_available_actions(self):
        return self.RESPONSE_ACTIONS

    def enable(self):
        self.enabled = True
        return {'status': 'enabled'}

    def disable(self):
        self.enabled = False
        return {'status': 'disabled'}
```
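
The severity-to-action dispatch in the response system can be looked at in isolation. The following is a minimal sketch rather than the project's implementation: the severity mapping mirrors the automated-response table in the README, while the `auto_execute` flags and the `plan_actions` helper are assumptions made purely for illustration.

```python
# Hypothetical, simplified tables. The severity mapping follows the README's
# automated-response table; the auto_execute flags are assumed values.
SEVERITY_ACTIONS = {
    'Critical': ['alert', 'log', 'block_ip', 'quarantine'],
    'High': ['alert', 'log', 'block_ip', 'rate_limit'],
    'Medium': ['alert', 'log', 'rate_limit'],
    'Low': ['alert', 'log'],
    'Normal': ['log'],
}
RESPONSE_ACTIONS = {
    'alert': {'auto_execute': True},
    'log': {'auto_execute': True},
    'rate_limit': {'auto_execute': True},
    'block_ip': {'auto_execute': True},
    'quarantine': {'auto_execute': False},  # assumed to need manual approval
}


def plan_actions(severity):
    """Split the actions for a severity into auto-run and approval-gated lists."""
    auto, pending = [], []
    # Unknown severities fall back to logging only, as in the service code.
    for action in SEVERITY_ACTIONS.get(severity, ['log']):
        if RESPONSE_ACTIONS.get(action, {}).get('auto_execute', False):
            auto.append(action)
        else:
            pending.append(action)
    return auto, pending


print(plan_actions('Critical'))
print(plan_actions('unknown'))
```

Actions in the second list correspond to the `pending_approval` entries that `approve_pending_action` later executes.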
### routes/__init__.py
```
from .main import main_bp
from .dashboard import dashboard_bp
from .api import api_bp
__all__ = ['main_bp', 'dashboard_bp', 'api_bp']
```
### routes/main.py
```
from flask import Blueprint, render_template, jsonify, request
from datetime import datetime

main_bp = Blueprint('main', __name__)


@main_bp.route('/')
def index():
    return render_template('index.html')


@main_bp.route('/about')
def about():
    return render_template('about.html')


@main_bp.route('/health')
def health_check():
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'service': 'AI-Based Network Verification System'
    })
```
### routes/dashboard.py
```
from flask import Blueprint, render_template, jsonify, current_app
from datetime import datetime

dashboard_bp = Blueprint('dashboard', __name__)


def get_services():
    return (
        current_app.traffic_service,
        current_app.threat_service,
        current_app.response_service
    )


@dashboard_bp.route('/dashboard')
def dashboard():
    return render_template('dashboard.html')


@dashboard_bp.route('/reports')
def reports():
    return render_template('reports.html')


@dashboard_bp.route('/api/dashboard/stats')
def dashboard_stats():
    traffic, threat, response = get_services()
    traffic_stats = traffic.get_statistics()
    detection_stats = threat.get_detection_stats()
    response_stats = response.get_response_stats()
    return jsonify({
        'traffic': traffic_stats,
        'detection': detection_stats,
        'response': response_stats,
        'timestamp': datetime.now().isoformat()
    })


@dashboard_bp.route('/api/dashboard/traffic_summary')
def traffic_summary():
    traffic, _, _ = get_services()
    summary_1m = traffic.get_traffic_summary(window_seconds=60)
    summary_5m = traffic.get_traffic_summary(window_seconds=300)
    summary_15m = traffic.get_traffic_summary(window_seconds=900)
    return jsonify({
        '1_minute': summary_1m,
        '5_minutes': summary_5m,
        '15_minutes': summary_15m,
        'timestamp': datetime.now().isoformat()
    })


@dashboard_bp.route('/api/dashboard/recent_packets')
def recent_packets():
    traffic, _, _ = get_services()
    packets = traffic.get_recent_packets(count=50)
    return jsonify({
        'packets': packets,
        'count': len(packets),
        'timestamp': datetime.now().isoformat()
    })


@dashboard_bp.route('/api/dashboard/recent_threats')
def recent_threats():
    _, threat, _ = get_services()
    threats = threat.get_recent_threats(count=50)
    return jsonify({
        'threats': threats,
        'count': len(threats),
        'timestamp': datetime.now().isoformat()
    })


@dashboard_bp.route('/api/dashboard/threat_summary')
def threat_summary():
    _, threat, _ = get_services()
    type_summary = threat.get_threat_summary()
    severity_dist = threat.get_severity_distribution()
    return jsonify({
        'threat_types': type_summary,
        'severity_distribution': severity_dist,
        'timestamp': datetime.now().isoformat()
    })


@dashboard_bp.route('/api/dashboard/response_summary')
def response_summary():
    _, _, response = get_services()
    history = response.get_response_history(count=20)
    blocked_ips = response.get_blocked_ips()
    rate_limited = response.get_rate_limited_ips()
    stats = response.get_response_stats()
    return jsonify({
        'recent_responses': history,
        'blocked_ips': blocked_ips,
        'rate_limited_ips': rate_limited,
        'stats': stats,
        'timestamp': datetime.now().isoformat()
    })


@dashboard_bp.route('/api/dashboard/model_status')
def model_status():
    _, threat, _ = get_services()
    anomaly_status = threat.anomaly_detector.get_model_status()
    predictor_status = threat.threat_predictor.get_model_status()
    feature_importance = threat.threat_predictor.feature_importance()
    return jsonify({
        'anomaly_detector': anomaly_status,
        'threat_predictor': predictor_status,
        'feature_importance': feature_importance,
        'timestamp': datetime.now().isoformat()
    })
```
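
The `traffic_summary` route asks the traffic service for 1-, 5-, and 15-minute windows, but `get_traffic_summary` itself lives in `services/traffic_capture.py` and is not shown in this section. As a rough illustration of what a time-windowed summary could look like, here is a sketch under stated assumptions: the `summarize_window` function and its return keys are hypothetical, and the packet fields (`timestamp`, `size`, `protocol`) follow the shape produced by `packet_to_dict` in `utils/helpers.py`.

```python
import time
from collections import Counter


def summarize_window(packets, window_seconds, now=None):
    """Summarize packets whose 'timestamp' falls within the last window_seconds.

    Hypothetical helper; the real service may compute different fields.
    """
    now = time.time() if now is None else now
    cutoff = now - window_seconds
    # Keep only packets captured at or after the cutoff.
    recent = [p for p in packets if p.get('timestamp', 0) >= cutoff]
    return {
        'packet_count': len(recent),
        'total_bytes': sum(p.get('size', 0) for p in recent),
        'packets_per_second': round(len(recent) / window_seconds, 2),
        'protocol_counts': dict(Counter(p.get('protocol', 'UNKNOWN') for p in recent)),
    }
```

Passing `now` explicitly keeps the function deterministic, which makes it easy to test against a fixed list of packets.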
### routes/api.py
```
from flask import Blueprint, jsonify, request, current_app
from datetime import datetime

api_bp = Blueprint('api', __name__)


def get_services():
    return (
        current_app.traffic_service,
        current_app.threat_service,
        current_app.response_service
    )


@api_bp.route('/api/traffic/start', methods=['POST'])
def start_traffic_capture():
    traffic, _, _ = get_services()
    # get_json(silent=True) returns None instead of raising when the request
    # carries no JSON body, so the default interval applies.
    body = request.get_json(silent=True) or {}
    interval = body.get('interval', 0.5)
    result = traffic.start_capture(interval=interval)
    return jsonify({'status': 'started' if result else 'already_running'})


@api_bp.route('/api/traffic/stop', methods=['POST'])
def stop_traffic_capture():
    traffic, _, _ = get_services()
    traffic.stop_capture()
    return jsonify({'status': 'stopped'})


@api_bp.route('/api/traffic/inject', methods=['POST'])
def inject_test_packet():
    traffic, _, _ = get_services()
    body = request.get_json(silent=True) or {}
    packet_type = body.get('type', 'normal')
    packet = traffic.inject_test_packet(packet_type=packet_type)
    return jsonify({'status': 'injected', 'packet': packet})


@api_bp.route('/api/traffic/stats')
def traffic_stats():
    traffic, _, _ = get_services()
    return jsonify(traffic.get_statistics())


@api_bp.route('/api/detection/start', methods=['POST'])
def start_detection():
    _, threat, _ = get_services()
    body = request.get_json(silent=True) or {}
    interval = body.get('interval', 1.0)
    result = threat.start_detection(interval=interval)
    return jsonify({'status': 'started' if result else 'already_running'})


@api_bp.route('/api/detection/stop', methods=['POST'])
def stop_detection():
    _, threat, _ = get_services()
    threat.stop_detection()
    return jsonify({'status': 'stopped'})


@api_bp.route('/api/detection/analyze', methods=['POST'])
def analyze_packet():
    _, threat, _ = get_services()
    data = request.get_json(silent=True) or {}
    anomaly_result = threat.anomaly_detector.predict(data)
    threat_result = threat.threat_predictor.predict(data)
    return jsonify({
        'anomaly': anomaly_result,
        'threat': threat_result,
        'timestamp': datetime.now().isoformat()
    })


@api_bp.route('/api/detection/train', methods=['POST'])
def train_models():
    _, threat, _ = get_services()
    body = request.get_json(silent=True) or {}
    data = body.get('data')
    labels = body.get('labels')
    results = threat.train_models(historical_data=data, labels=labels)
    return jsonify({'status': 'training_complete', 'results': results})


@api_bp.route('/api/response/enable', methods=['POST'])
def enable_response():
    _, _, response = get_services()
    return jsonify(response.enable())


@api_bp.route('/api/response/disable', methods=['POST'])
def disable_response():
    _, _, response = get_services()
    return jsonify(response.disable())


@api_bp.route('/api/response/block_ip', methods=['POST'])
def block_ip():
    _, _, response = get_services()
    ip = (request.get_json(silent=True) or {}).get('ip')
    if ip:
        result = response._execute_action('block_ip', {'packet': {'src_ip': ip}})
        return jsonify({'status': 'blocked', 'ip': ip, 'result': result})
    # A missing IP is a client error, so report it with HTTP 400.
    return jsonify({'status': 'error', 'message': 'IP address required'}), 400


@api_bp.route('/api/response/unblock_ip', methods=['POST'])
def unblock_ip():
    _, _, response = get_services()
    ip = (request.get_json(silent=True) or {}).get('ip')
    if ip:
        return jsonify(response.unblock_ip(ip))
    return jsonify({'status': 'error', 'message': 'IP address required'}), 400


@api_bp.route('/api/response/blocked_ips')
def get_blocked_ips():
    _, _, response = get_services()
    return jsonify({'blocked_ips': response.get_blocked_ips()})


@api_bp.route('/api/system/status')
def system_status():
    traffic, threat, response = get_services()
    return jsonify({
        'traffic_capture': {
            'active': traffic.is_capturing,
            'total_packets': traffic.packet_count
        },
        'threat_detection': {
            'active': threat.detection_active,
            'total_scanned': threat.detection_stats['total_scanned']
        },
        'automated_response': {
            'enabled': response.enabled,
            'total_responses': response.response_stats['total_responses']
        },
        'timestamp': datetime.now().isoformat()
    })


@api_bp.route('/api/reports/generate', methods=['POST'])
def generate_report():
    traffic, threat, response = get_services()
    # Fall back to a summary report when the request has no JSON body.
    report_type = (request.get_json(silent=True) or {}).get('type', 'summary')
    traffic_stats = traffic.get_statistics()
    detection_stats = threat.get_detection_stats()
    threat_summary_data = threat.get_threat_summary()
    severity_dist = threat.get_severity_distribution()
    response_stats = response.get_response_stats()
    response_history = response.get_response_history(count=100)
    report = {
        'report_type': report_type,
        'generated_at': datetime.now().isoformat(),
        'executive_summary': {
            'total_packets_analyzed': traffic_stats.get('total_packets', 0),
            'total_threats_detected': detection_stats.get('threats_detected', 0),
            'total_anomalies': detection_stats.get('anomalies_detected', 0),
            'automated_responses_triggered': response_stats.get('total_responses', 0),
            'currently_blocked_ips': len(response.get_blocked_ips())
        },
        'traffic_analysis': {
            'statistics': traffic_stats,
            'protocol_distribution': traffic_stats.get('protocol_counts', {})
        },
        'threat_analysis': {
            'detection_statistics': detection_stats,
            'threat_type_distribution': threat_summary_data,
            'severity_distribution': severity_dist
        },
        'response_summary': {
            'statistics': response_stats,
            'recent_actions': response_history[-20:] if response_history else []
        },
        'recommendations': generate_recommendations(
            threat_summary_data, severity_dist, response_stats
        )
    }
    return jsonify({'status': 'generated', 'report': report})


def generate_recommendations(threat_summary, severity_dist, response_stats):
    # Each rule appends one recommendation when its threshold is exceeded.
    recommendations = []
    if threat_summary.get('DDoS', 0) > 10:
        recommendations.append({
            'priority': 'High',
            'category': 'DDoS Protection',
            'recommendation': 'Implement rate limiting and DDoS mitigation rules at firewall level.'
        })
    if threat_summary.get('Port Scan', 0) > 20:
        recommendations.append({
            'priority': 'Medium',
            'category': 'Network Hardening',
            'recommendation': 'Review open ports and implement port knocking or hidden port strategies.'
        })
    if severity_dist.get('Critical', 0) > 5:
        recommendations.append({
            'priority': 'Critical',
            'category': 'Incident Response',
            'recommendation': 'Immediate review of critical threats required. Consider network segmentation.'
        })
    if response_stats.get('total_responses', 0) == 0 and sum(threat_summary.values()) > 0:
        recommendations.append({
            'priority': 'High',
            'category': 'Automation',
            'recommendation': 'Enable automated response system to reduce reaction time to threats.'
        })
    # Default entry so the report always contains at least one recommendation.
    if not recommendations:
        recommendations.append({
            'priority': 'Low',
            'category': 'General',
            'recommendation': 'Continue monitoring. No immediate action required.'
        })
    return recommendations
```
### utils/__init__.py
```
from .helpers import format_timestamp, calculate_stats, get_severity_color
__all__ = ['format_timestamp', 'calculate_stats', 'get_severity_color']
```
### utils/helpers.py
```
from datetime import datetime
import numpy as np


def format_timestamp(timestamp):
    if isinstance(timestamp, (int, float)):
        return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(timestamp, str):
        try:
            ts = float(timestamp)
            return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            return timestamp
    return str(timestamp)


def calculate_stats(values):
    if not values:
        return {'mean': 0, 'median': 0, 'std': 0, 'min': 0, 'max': 0}
    arr = np.array(values, dtype=float)
    return {
        'mean': round(float(np.mean(arr)), 2),
        'median': round(float(np.median(arr)), 2),
        'std': round(float(np.std(arr)), 2),
        'min': round(float(np.min(arr)), 2),
        'max': round(float(np.max(arr)), 2)
    }


def get_severity_color(score):
    if score >= 0.85:
        return 'danger'
    elif score >= 0.7:
        return 'warning'
    elif score >= 0.5:
        return 'info'
    return 'success'


def get_threat_label(score):
    if score >= 0.9:
        return 'Critical'
    elif score >= 0.75:
        return 'High'
    elif score >= 0.5:
        return 'Medium'
    elif score >= 0.3:
        return 'Low'
    return 'Normal'


def packet_to_dict(packet):
    return {
        'timestamp': datetime.now().timestamp(),
        'size': len(packet) if hasattr(packet, '__len__') else 0,
        'protocol': getattr(packet, 'name', 'UNKNOWN'),
        'src_ip': getattr(packet, 'src', '0.0.0.0'),
        'dst_ip': getattr(packet, 'dst', '0.0.0.0'),
    }
```
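
The `if`/`elif` chain in `get_threat_label` maps a score to the highest band whose lower bound it meets. One alternative way to express the same thresholds, shown here only as a sketch and not as part of the project, is a table lookup with the standard-library `bisect` module. The `threat_label` name and the two module-level tables are hypothetical; the threshold values are copied from `get_threat_label`.

```python
from bisect import bisect_right

# Lower bounds and labels copied from get_threat_label; names are illustrative.
_THRESHOLDS = [0.3, 0.5, 0.75, 0.9]
_LABELS = ['Normal', 'Low', 'Medium', 'High', 'Critical']


def threat_label(score):
    """Return the label of the highest band whose lower bound is <= score."""
    # bisect_right counts how many thresholds are <= score, which is
    # exactly the index of the matching label.
    return _LABELS[bisect_right(_THRESHOLDS, score)]
```

Keeping thresholds in a table makes it simpler to adjust the bands in one place, at the cost of being slightly less explicit than the original chain.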
## HTML Templates
### templates/base.html
```
{% block title %}AI-Based Network Verification System{% endblock %}
{% block content %}{% endblock %}
{% block extra_js %}{% endblock %}
```
### templates/index.html
```
{% extends "base.html" %}
{% block content %}
AI-Based Network Verification System
Intelligent network traffic analysis, anomaly detection, and real-time threat intelligence powered by Machine Learning.
Traffic Capture
Real-time network traffic monitoring with intelligent packet analysis and protocol identification.
ML Anomaly Detection
Isolation Forest and Random Forest models trained to detect unusual network behavior patterns.
Real-time Response
Automated threat response with IP blocking, rate limiting, and quarantine capabilities.
Network Traffic Capture & Analysis Engine
Isolation Forest Anomaly Detector
Random Forest Threat Classifier
Real-time WebSocket Updates
Automated Response System
Comprehensive Reporting Module
The system automatically starts traffic simulation and threat detection on launch.
$ python app.py
[INFO] Initializing AI models...
[INFO] Training anomaly detector...
[INFO] Training threat classifier...
[OK] Models trained successfully
[INFO] Starting traffic capture...
[INFO] Threat detection active
[OK] Server running on http://localhost:5000
Go to Dashboard
{% endblock %}
{% block extra_css %}
{% endblock %}
```
### templates/dashboard.html
```
{% extends "base.html" %}
{% block title %}Dashboard - AI Network Verification{% endblock %}
{% block content %}
Security Dashboard
Traffic Capture Active
Detection Active
Time Type Severity Score Source IP
Waiting for data...
Time Protocol Size Src IP Dst Port
Waiting for data...
Inject Normal
Inject DDoS
Inject Port Scan
Inject Exfiltration
Model Status: Ready
Packets/sec: 0
Queue Size: 0
{% endblock %}
{% block extra_js %}
{% endblock %}
```
### templates/reports.html
```
{% extends "base.html" %}
{% block title %}Reports - AI Network Verification{% endblock %}
{% block content %}
Security Reports
Generate Report
Recent Response Actions
Time Threat Type Severity Actions Taken
Click "Generate Report" to create a comprehensive security report.
{% endblock %}
{% block extra_js %}
{% endblock %}
```
### templates/about.html
```
{% extends "base.html" %}
{% block title %}About - AI Network Verification{% endblock %}
{% block content %}
AI-Based Network Verification System
This intelligent system analyzes network traffic and identifies anomalies or potential
security threats using advanced AI/ML techniques. It employs machine learning models
to detect unusual patterns, predict potential attacks, and provide real-time threat
intelligence.
Key Features
Traffic Capture
Real-time packet capture and analysis with support for TCP, UDP, ICMP, and IGMP protocols.
Generates synthetic network traffic for testing and demonstration.
Anomaly Detection
Isolation Forest algorithm identifies outliers in network behavior.
Learns normal traffic patterns and flags deviations with confidence scores.
Threat Prediction
Random Forest classifier identifies 8 threat types including DDoS, Port Scan,
Brute Force, Malware, Data Exfiltration, ARP Spoofing, and DNS Tunneling.
Automated Response
Configurable automated response system with IP blocking, rate limiting,
quarantine, and alert generation based on threat severity levels.
Technology Stack
Python 3.x + Flask Web Framework
Scikit-learn (ML Models)
Chart.js (Visualizations)
SocketIO (Real-time Updates)
Bootstrap 5 (UI)
API Endpoints
Method Endpoint Description
GET /api/system/status Overall system status
POST /api/traffic/start Start traffic capture
POST /api/traffic/stop Stop traffic capture
POST /api/traffic/inject Inject test packet
POST /api/detection/start Start threat detection
POST /api/detection/stop Stop threat detection
POST /api/detection/analyze Analyze packet data
POST /api/detection/train Train ML models
POST /api/reports/generate Generate security report
{% endblock %}
```
## How to Run
```
pip install -r requirements.txt
python app.py
```
**URLs:**
- Dashboard: http://localhost:5000/dashboard
- Reports: http://localhost:5000/reports
- API status: http://localhost:5000/api/system/status
- Health check: http://localhost:5000/health
## API Endpoint Reference
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/system/status` | GET | System health and status |
| `/api/dashboard/stats` | GET | Dashboard statistics |
| `/api/dashboard/traffic_summary` | GET | Traffic summary (1-, 5-, and 15-minute windows) |
| `/api/dashboard/recent_packets` | GET | Recent packets |
| `/api/dashboard/recent_threats` | GET | Recent threats |
| `/api/dashboard/threat_summary` | GET | Threat type and severity summary |
| `/api/dashboard/response_summary` | GET | Response history and blocked IPs |
| `/api/dashboard/model_status` | GET | ML model status |
| `/api/traffic/start` | POST | Start traffic capture |
| `/api/traffic/stop` | POST | Stop traffic capture |
| `/api/traffic/inject` | POST | Inject a test packet |
| `/api/traffic/stats` | GET | Traffic statistics |
| `/api/detection/start` | POST | Start threat detection |
| `/api/detection/stop` | POST | Stop threat detection |
| `/api/detection/analyze` | POST | Analyze packet data |
| `/api/detection/train` | POST | Train the ML models |
| `/api/response/enable` | POST | Enable automated response |
| `/api/response/disable` | POST | Disable automated response |
| `/api/response/block_ip` | POST | Block an IP address |
| `/api/response/unblock_ip` | POST | Unblock an IP address |
| `/api/response/blocked_ips` | GET | List blocked IP addresses |
| `/api/reports/generate` | POST | Generate a security report |
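
The POST endpoints read their parameters from a JSON body, for example the `type` field of `/api/traffic/inject`. A small client sketch using only the standard library might look like the following. The `build_post` and `call` helpers are hypothetical names introduced here, and the base URL assumes the default local address from the run instructions.

```python
import json
from urllib.request import Request, urlopen

BASE_URL = 'http://localhost:5000'  # assumed default local address


def build_post(path, payload=None, base_url=BASE_URL):
    """Build a JSON POST request for one of the endpoints in the table."""
    body = json.dumps(payload or {}).encode('utf-8')
    return Request(
        base_url + path,
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def call(request, timeout=5):
    """Send a prepared request and decode the JSON response."""
    with urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode('utf-8'))


# Usage (requires the app to be running locally):
#   call(build_post('/api/traffic/inject', {'type': 'normal'}))
```

Separating request construction from sending keeps the first step testable without a running server.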
## Technology Stack
- **Backend:** Flask 2.3.3, Flask-SocketIO 5.3.6
- **ML:** scikit-learn 1.3.0 (Isolation Forest, Random Forest)
- **Frontend:** Bootstrap 5, Chart.js 4.3.0, Font Awesome 6.4.0
- **Real-time:** SocketIO 4.6.0
- **Data:** NumPy 1.24.3, Pandas 2.0.3
- **Utilities:** python-dateutil, psutil, Werkzeug, eventlet