NijatHub/port-scanner
GitHub: NijatHub/port-scanner
NijatHub/port-scanner 是一款基于 Django 的异步 TCP 端口扫描器,用于网络安全评估和渗透测试。
Stars: 0 | Forks: 0
# NetProbe — 异步 Django 端口扫描器
一个生产就绪、安全加固的 TCP 端口扫描器,使用 Django、Celery、Redis、Django Channels(WebSockets)和 Python asyncio 构建。
## 架构
```
Browser ──── HTTP ────► Daphne (ASGI) ──► Django views
│ │
└─── WebSocket ─────┘
│
Channel Layer (Redis)
│
Celery Worker ──► asyncio engine ──► TCP probes
│
SQLite / PostgreSQL
```
| 层 | 技术 |
|-------|-----------|
| ASGI 服务器 | Daphne(HTTP + WebSocket) |
| 背景任务 | Celery 5 + Redis 代理 |
| 实时更新 | Django Channels 4(WebSocket)+ 轮询回退 |
| 异步 I/O 扫描器 | Python `asyncio.open_connection` 与信号量限制 |
| SSRF 防护 | RFC-1918 / 回环 / 链路本地阻止列表 + DNS 验证 |
| 数据库 | SQLite(开发)/ PostgreSQL(生产)通过 `ScanJob` + `ScanResult` 模型 |
## 功能
- **异步 TCP 扫描** — 通过 `asyncio` 并发扫描多达 200 个端口
- **实时更新** — WebSocket 流与 HTMX 风格轮询回退
- **SSRF 防止** — 包括 RFC-1918、回环、链路本地、多播、文档范围在内的 14 个阻止网络范围
- **速率限制** — 可配置的最大端口范围(默认 1000)和并发限制
- **抓取标头** — 在已知端口上尝试检索服务标头
- **服务检测** — 50+ 已知端口→服务名称映射
- **Celery 任务硬限制** — 每个扫描任务的软(270 秒)和硬(300 秒)时间限制
- **批量数据库写入** — 以 50 个为一组批量创建结果以提高效率
- **管理面板** — 完整的 `django.contrib.admin` 集成
## 快速开始(Docker Compose)
```
git clone
cd portscanner
docker compose up --build
# 打开 http://localhost:8000
```
## 本地开发
### 预先条件
- Python 3.11+
- Redis(在 `localhost:6379` 上本地运行)
### 设置
```
# 1. 创建并激活虚拟环境
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
# 2. 安装依赖
pip install -r requirements.txt
# 3. 应用迁移
python manage.py migrate
# 4. 创建超级用户(可选,用于 /admin)
python manage.py createsuperuser
```
### 运行所有服务
您需要 **三个终端标签**:
```
# 标签 1 — Redis(或使用 Docker)
redis-server
# 标签 2 — Django / Daphne
daphne -b 0.0.0.0 -p 8000 portscanner.asgi:application
# 标签 3 — Celery worker
celery -A portscanner worker -l info --concurrency=4
```
然后访问 **http://localhost:8000**。
## 配置(环境变量)
| 变量 | 默认 | 描述 |
|----------|---------|-------------|
| `DJANGO_SECRET_KEY` | dev key | Django 密钥(在生产中更改!) |
| `DEBUG` | `True` | Django 调试模式 |
| `ALLOWED_HOSTS` | `localhost 127.0.0.1` | 空格分隔的允许主机 |
| `REDIS_URL` | `redis://localhost:6379/0` | Redis 连接 URL |
| `SCANNER_MAX_PORT_RANGE` | `1000` | 每个扫描作业的最大端口 |
| `SCANNER_CONNECT_TIMEOUT` | `0.5` | TCP 连接超时(秒) |
| `SCANNER_MAX_CONCURRENCY` | `200` | 最大同时 asyncio 协程数 |
## 安全设计
### SSRF 防止
所有目标都经过 **两次** 验证:一次在 Django 视图(在作业创建之前)中,一次在 Celery 工作器(在扫描开始之前)中。
阻止范围:
| 范围 | 原因 |
|-------|--------|
| `127.0.0.0/8`, `::1` | 回环 |
| `10.0.0.0/8`, `172.16.0.0/12`, `192.168.0.0/16` | RFC-1918 私有 |
| `169.254.0.0/16`, `fe80::/10` | 链路本地 / APIPA |
| `fc00::/7` | IPv6 唯一本地 |
| `224.0.0.0/4`, `ff00::/8` | 多播 |
| `192.0.2.0/24`, `198.51.100.0/24`, `203.0.113.0/24` | 文档 |
| `100.64.0.0/10` | 共享地址空间(RFC 6598) |
| `0.0.0.0/8` | 未指定 |
| `240.0.0.0/4` | IANA 保留 |
主机名通过 **dnspython** 解析(绕过操作系统占位符解析器),解析的 IP 与阻止列表进行比较,在扫描之前进行检查。
### 滥用缓解
- 端口范围硬限制(默认 1000,环境可配置)
- Celery 任务时间限制(270 秒软/300 秒硬)防止扫描失控
- asyncio 并发信号量限制
## 项目结构
```
portscanner/
├── manage.py
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
│
├── portscanner/ # Django project package
│ ├── settings.py
│ ├── urls.py
│ ├── asgi.py # Daphne / Channels ASGI config
│ └── celery.py # Celery app
│
├── scanner/ # Main Django app
│ ├── models.py # ScanJob + ScanResult
│ ├── views.py # index, start_scan, scan_detail, scan_status, scan_history
│ ├── forms.py # ScanRequestForm with validation
│ ├── validators.py # SSRF guards + port-range validation
│ ├── engine.py # asyncio TCP scanner
│ ├── tasks.py # Celery task (orchestrates scan + pushes WS events)
│ ├── consumers.py # Django Channels WebSocket consumer
│ ├── routing.py # WS URL routing
│ ├── admin.py
│ └── templatetags/
│ └── scanner_tags.py # well_known_service filter
│
└── templates/scanner/
├── base.html # Terminal/cyberpunk design system
├── index.html # Scan form + recent jobs
├── scan_detail.html # Live progress (WS + polling fallback)
└── history.html # Paginated scan history
```
## 数据库模式
```
-- ScanJob: one row per user-initiated scan
CREATE TABLE scanner_scanjob (
id UUID PRIMARY KEY,
target VARCHAR(253), -- hostname or IP as entered
resolved_ip INET, -- actual IP scanned
port_start INTEGER,
port_end INTEGER,
status VARCHAR(16), -- PENDING|RUNNING|COMPLETED|FAILED
celery_task_id VARCHAR(255),
error_message TEXT,
created_at TIMESTAMPTZ,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ
);
-- ScanResult: one row per (job, port) probe
CREATE TABLE scanner_scanresult (
id BIGSERIAL PRIMARY KEY,
job_id UUID REFERENCES scanner_scanjob(id) ON DELETE CASCADE,
ip INET,
port INTEGER,
status VARCHAR(10), -- OPEN|CLOSED|FILTERED
banner VARCHAR(256),
timestamp TIMESTAMPTZ
);
```
## 生产清单
- [ ] 将 `DEBUG=False` 和 `DJANGO_SECRET_KEY` 设置为强随机值
- [ ] 将 `DATABASES` 切换到 PostgreSQL
- [ ] 将 `ALLOWED_HOSTS` 设置为您的真实域名
- [ ] 添加身份验证(限制可以启动扫描的用户)
- [ ] 为每个用户添加速率限制(例如 `django-ratelimit`)
- [ ] 在 nginx 后面运行 Daphne 并使用 TLS
- [ ] 使用 `CELERY_TASK_ALWAYS_EAGER=False` 并使用 Flower 监控
- [ ] 审计所有扫描请求的日志
标签:ASGI, Celery, Daphne, Django, Django Channels, Docker, Python, Redis, SSRF防护, TCP, WebSockets, 任务队列, 安全防御评估, 实时更新, 异步I/O, 异步编程, 性能优化, 批量写入, 搜索引擎查询, 数据库, 数据库模型, 数据统计, 无后门, 服务检测, 检测绕过, 端口扫描, 管理员面板, 网络安全, 自定义脚本, 隐私保护