ABHAYMATHUR07/Threatzy
GitHub: ABHAYMATHUR07/Threatzy
一个以本地隐私保护为核心的移动端威胁检测与响应系统,利用后端情报聚合与前端 VpnService 实现低延迟、精准阻断。
Stars: 0 | Forks: 0
# 移动威胁防御 (MTD)
零信任、注重隐私的 Android 防火墙,配备 Python/Flask OSINT 后端。
**用户流量绝不会离开设备进行检查。**
## 仓库结构
```
mobile-threat-defender/
├── backend/ ← Python/Flask server
│ ├── app.py ← Flask entry point + APScheduler
│ ├── config.py ← All configuration (URLs, paths, intervals)
│ ├── models.py ← SQLAlchemy models (IOC, FeedRun)
│ ├── ingestion.py ← OSINT feed parsers (Spamhaus, URLhaus, ThreatFox, Feodo)
│ ├── threat_scoring.py ← Threat score engine (confidence + severity + recency decay)
│ ├── delta_generator.py ← Delta computation + JSONL.gz writer
│ ├── crypto_utils.py ← RSA-2048 keypair + PKCS#1v15 signing/verification
│ └── requirements.txt
│
└── android/
└── app/
├── build.gradle ← Dependencies + BACKEND_BASE_URL
└── src/main/
├── AndroidManifest.xml
├── kotlin/com/mtd/
│ ├── MtdApplication.kt ← App class (Timber init)
│ ├── BootReceiver.kt ← Reschedule WorkManager on boot
│ ├── PacketParser.kt ← IPv4/IPv6 + TCP/UDP + DNS + TLS SNI
│ ├── ThreatMatcher.kt ← Bloom Filter + Radix Trie + Domain Trie
│ ├── TcpRstForger.kt ← TCP RST packet synthesis
│ ├── MtdVpnService.kt ← VpnService + packet interception loop
│ ├── FeedUpdater.kt ← WorkManager: download + verify + apply
│ └── ui/MainActivity.kt ← Control panel + live stats
└── res/layout/activity_main.xml
```
## 后端设置(本地机器)
### 先决条件
- Python 3.10+
- (可选)Redis — 如果未运行,Flask 应用仍可使用 SQLite + 文件存储
### 1. 安装依赖
```
cd backend
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
```
### 2. 运行
```
```
服务器将在 `http://0.0.0.0:5000` 启动。
首次启动时将:
- 创建 `mtd.db`(SQLite)
- 在 `keys/private.pem` 与 `keys/public.pem` 中生成 RSA-2048 密钥对
- 立即运行首次摄取周期(获取实时 OSINT 源)
- 每 6 小时调度一次后续运行
### 3. 验证
```
curl http://localhost:5000/api/v1/stats
curl http://localhost:5000/api/v1/keys/public
curl "http://localhost:5000/api/v1/feeds/latest"
```
## Android 设置(Android Studio)
### 先决条件
- Android Studio Hedgehog 或更新版本
- 运行 API 26+(Android 8.0)的 Android 设备或模拟器
- 后端在本地机器上运行
### 1. 导入项目
打开 Android Studio → **Open** → 选择 `android/` 文件夹。
### 2. 设置后端 URL
编辑 `app/build.gradle`:
```
buildConfigField "String", "BACKEND_BASE_URL", "\"http://:5000\""
```
- 对于 **Android 模拟器**,使用 `10.0.2.2`(映射到本机 localhost)。
- 对于同一 Wi-Fi 下的 **物理设备**,使用本机 LAN IP(例如 `192.168.1.10`)。
### 3. 添加占位图资源
Android Studio 会标记缺失的绘图资源。请创建两个简单的矢量资源:
- `res/drawable/ic_shield.xml` — 盾牌图标(使用 Material 图标 `security`)
- `res/drawable/ic_stop.xml` — 停止图标(使用 Material 图标 `stop`)
快速解决方法:右键点击 `res` → **New → Vector Asset** → 选择图标。
### 4. 添加字符串资源
在 `res/values/strings.xml` 中:
```
Mobile Threat Defender
```
### 5. 添加 Material3 主题
在 `res/values/themes.xml` 中:
```
```
### 6. 构建并运行
点击 **Run**(▶)。首次启动时:
- 提示时授予 VPN 权限
- 点击 **Start Protection** → TUN 接口将捕获设备所有流量
- WorkManager 立即运行源同步,之后每 6 小时一次
- 主屏幕上的实时统计每秒更新一次
## API 参考
| 端点 | 方法 | 描述 |
|---|---|---|
| `/api/v1/keys/public` | GET | 用于签名验证的 RSA-2048 公钥(PEM) |
| `/api/v1/feeds/latest?since=` | GET | 最新的增量元数据 + 下载 URL 与签名 |
| `/api/v1/deltas/` | GET | 下载已签名的 `delta_add_*.jsonl.gz` 或 `delta_remove_*.jsonl.gz` |
| `/api/v1/stats` | GET | 按类型统计的活动 IOC 数量 + 最后一次运行摘要 |
## 架构说明
### 后端流程
```
APScheduler (every 6h)
→ ingestion.py: fetch Spamhaus + URLhaus + ThreatFox + Feodo
→ CIDR merge + deduplication
→ threat_scoring.py: confidence × severity × recency decay → score [0-100]
→ delta_generator.py: diff vs DB → delta_add + delta_remove JSONL.gz
→ crypto_utils.py: SHA-256 → RSA-PKCS1v15 sign → hex signature
→ FeedRun recorded in SQLite
→ Files served via Flask
```
### Android 流程
```
VpnService (TUN 0.0.0.0/0)
→ read raw IP packets from TUN FileInputStream
→ PacketParser: IPv4/IPv6 header → TCP/UDP → DNS QNAME / TLS SNI
→ ThreatMatcher:
Bloom Filter (O(1) fast-path negative)
→ false → forward packet (safe)
→ true → Radix Trie / Domain Trie (precise confirmation)
→ match → blackhole + forge TCP RST
→ no match → forward packet
WorkManager (every 6h, on network available)
→ FeedUpdater: GET /feeds/latest → download deltas
→ verifySignature: SHA-256 + NONEwithRSA + RSA public key
→ parse JSONL → ThreatMatcher.applyDelta(add, remove)
```
### 威胁评分公式
```
Score (0–100) =
Confidence (0–30) = min(distinct_feeds / 4, 1.0) × 30
+ Severity (0–40) = {high: 1.0, medium: 0.55, low: 0.20} × 40
+ Recency (0–30) = max(0, 1 − age_days / 30) × 30
```
## 安全特性
| 特性 | 实现方式 |
|---|---|
| 流量不离开设备 | 所有检查均在 VpnService 内部进行 |
| 防篡改的源更新 | 对每个增量的 SHA-256 使用 RSA-2048 PKCS#1v15 签名 |
| 快速负向查找 | 在昂贵的 trie 遍历前使用 Bloom 过滤器(误报率 < 0.1%) |
| 节能阻断 | 伪造 TCP RST 以防止无限 SYN 重试 |
| CIDR 包含匹配 | 基于比特遍历的 Radix trie |
| 域名后缀匹配 | 反向标签 trie(`evil.example.com` 匹配 `example.com`) |
## 扩展
**添加新源:**
1. 在 `ingestion.py` 中编写 `parse_() → list[RawIOC]` 函数
2. 在 `aggregate_all_feeds()` 中调用它
**按应用(包名)而非全局阻断:**
- 使用 `VpnService.Builder().addAllowedApplication(pkg)` / `addDisallowedApplication(pkg)`
**将 trie 持久化到磁盘:**
- 使用 `ObjectOutputStream` 将 `ThreatMatcher` 状态序列化为二进制文件
- 在 `MtdVpnService.onStartCommand` 中加载以跳过首次网络同步
标签:AMSI绕过, Android防火墙, APScheduler, Bloom Filter, DNS拦截, ESC4, Flask框架, IOC, JSONLines, OSINT, Python后端, Radix Trie, REST API, RSA签名, TCP RST, VpnService, WorkManager, 威胁情报聚合, 威胁检测, 安全评分, 安全项目, 实时检测, 开源安全工具, 搜索引擎查询, 数据包解析, 无数据外传, 毕业设计, 流量本地分析, 目录枚举, 移动安全, 终端安全, 网络威胁, 网络安全, 逆向工具, 逆向工程平台, 隐私保护, 零信任