ntu168108/realtime-packet-sniff
GitHub: ntu168108/realtime-packet-sniff
一款基于 Python 的实时网络数据包捕获工具,配套完整的 Kafka→特征提取→分类→ClickHouse→Grafana 入侵检测流水线。
Stars: 0 | Forks: 0
# realtime-packet-sniff 🛰️
[](LICENSE)
[](https://www.python.org/)
[]()
[](tests/integration_tests/)
[]()
📖 [使用越南语阅读](README_VI.md)
SNIFF 是一个实时数据包捕获工具,具有交互式 TUI、后台 daemon 以及实时 NDJSON 流模式。同一个捕获引擎也是完整 IDS pipeline 的前端,它将 pcap 分段流式传输到 Kafka,使用 Argus + Zeek 提取每个 flow 的特征,通过七个基于攻击族的分类器将它们与 UNSW-NB15 攻击分类体系进行比对分类,并将结果发送到 ClickHouse 以便在 Grafana 中进行可视化。
## 架构
```
flowchart LR
NIC[("Network NIC
SPAN / mirror")] NIC -->|"libpcap"| SNIFF["sniff capture
(core.capture)"] SNIFF -->|"~60s pcap blobs"| SEG["KafkaPcapSegmenter"] SEG -->|"topic
raw_pcap_segments"| KAFKA[(Kafka)] KAFKA --> EC["ec_consumer"] EC -->|"Argus + Zeek"| EX["Extraction
(UNSW-NB15 features)"] EX --> CL["7 Family Classifiers
dos · exploits · fuzzers
generic · analysis
reconnaissance · shellcode"] CL -->|"per-family CSVs"| CH[(ClickHouse
flows_<family> + flows_all)] CH --> GRAF[Grafana
SNIFF IDS Pipeline] ``` 每个 Kafka 消息都是一个自描述的 pcap 分段(约 60 秒的流量),并带有 UUID `segment_id` 标签。重新处理同一个分段是幂等的:ClickHouse 的 `ReplacingMergeTree` 会根据 `(segment_id, srcip, dstip, sport, dport, proto, ts)` 进行去重。 ## 功能特性 **捕获工具** - 实时交互式 TUI,包含数据包列表、协议统计信息和 top flows。 - 后台 daemon,具有 PID/日志文件管理以及 `SIGTERM`/`SIGHUP`/`SIGUSR1`/`SIGUSR2` 信号处理功能。 - `--live` NDJSON 流输出到 stdout —— 可通过管道传输给 `jq`、`head` 或其他任何工具。 - 采用 Scapy `AsyncSniffer` + libpcap 后端,无锁环形缓冲区,两级解码(低开销的 L2–L4 热路径,可选的 L7 深度解码)。 - BPF 内核端过滤器 + 解码后的 Wireshark 风格显示过滤器。 - 轮转 pcap 写入器(基于时间和大小),并带有保留期清理功能。 - 协议计数器和 5-tuple 会话跟踪。 **IDS pipeline** - Kafka KRaft producer,处理约 60 秒的 pcap blobs 并带有 64 MiB 背压。 - Argus + Zeek 特征提取,生成 45 列的 UNSW-NB15 特征集。 - 七个基于攻击族的分类器(one-vs-rest 二元模型):`dos`、`exploits`、`fuzzers`、`generic`、`analysis`、`reconnaissance`、`shellcode`。 - ClickHouse sink,支持批量插入、ReplacingMergeTree 去重以及审计列(`segment_id`、`attack_family`、`attack_subtype`、`is_attack`、`interface`、`t_window`、`pcap_file`)。 - `pipeline_runs` 审计表 —— 每个被消费的分段对应一行,包含持续时间和错误信息。 - Grafana 仪表板“SNIFF IDS Pipeline”,可展示攻击时间线、主要攻击者、攻击族计数以及 pipeline 健康状况。 - 用于 `kafka`、`sniff-producer`、`ec-consumer` 的 systemd 单元。 ## 快速开始 ### 一行命令安装(推荐) ``` curl -fsSL https://raw.githubusercontent.com/ntu168108/realtime-packet-sniff/main/install.sh | sudo bash ``` 安装程序会检查 Python 3.8+、libpcap 和磁盘空间,然后安装 `scapy` 和 `sniff` 命令。传入 `--skip-systemd` 可跳过可选的 daemon 单元设置。 ### 基本用法 ``` sudo sniff # Interactive menu (lists interfaces) sudo sniff -i eth0 # Quick capture on eth0 sudo sniff -i eth0 --live | jq . # Live NDJSON stream sudo sniff -i eth0 -f "tcp port 443" # BPF filter (kernel-side) sudo sniff --help # Full CLI reference sudo sniff --list-interfaces # Show available interfaces sudo sniff --list-protocols # Show supported protocols sudo sniff --status # Daemon status sudo sniff --stop # Stop daemon gracefully ``` ### 从源码构建 ``` git clone https://github.com/ntu168108/realtime-packet-sniff.git cd realtime-packet-sniff python3 -m venv .venv && source .venv/bin/activate pip install . sudo .venv/bin/sniff -i eth0 ``` ## CLI 参考 轻量级入口点 `sniff.py` 暴露了最有用的标志(通过 `sudo sniff --help` 查看完整列表): | 标志 | 用途 | |------|---------| | `-i`, `--interface` | 要进行捕获的网络接口 | | `-f`, `--filter` | BPF 过滤器(内核端,例如 `"tcp port 80"`) | | `-s`, `--snaplen` | 每个数据包的捕获长度(默认为 `65535`) | | `-b`, `--buffer` | 缓冲区配置:`low`、`balanced`、`fast`、`max` | | `-p`, `--no-promisc` | 禁用混杂模式 | | `-o`, `--output` | 输出目录(默认为 `./sniff_data`) | | `-r`, `--retention` | 文件保留天数(默认为 `7`) | | `--rotate-interval` | 轮转间隔,后缀为 `s/m/h/d`(默认为 `3600`) | | `--rotate-size` | 文件超过指定大小时轮转(以 MB 为单位,默认为 `500`) | | `--no-rotate` | 单文件捕获(不轮转) | | `--live` | 将 NDJSON 流传输到 stdout(不启用 TUI) | | `--display-filter` | 解码后的 Wireshark 风格显示过滤器 | | `--count` | 捕获 N 个数据包后停止(`0` = 不限) | | `--exclude-port` | 排除某个端口(可多次重复使用) | | `-d`, `--daemon` | 作为后台 daemon 运行 | | `--status` | 显示 daemon 状态 | | `--stop` | 停止 daemon(先优雅停止,再使用 SIGKILL) | | `--list-interfaces` | 列出可用的接口 | | `--list-protocols` | 列出支持的 L2–L7 协议并退出 | ## 显示过滤器 `--display-filter` 接受一种小型的 Wireshark 风格微型语言,该语言在解码*之后*运行(它不替代内核端的 BPF 过滤器)。它支持: - `host`, `src host `, `dst host `
- `port `, `src port `, `dst port `
- `tcp`, `udp`, `icmp`, `icmpv6`, `arp`, `igmp`, `ipv4`, `ipv6`,
`dns`, `http`, `tls`, `quic`, `dhcp`, `ntp`
- 布尔运算符:`and`、`or`、`not` 以及括号
示例:
```
--display-filter 'port 443'
--display-filter 'tcp and not (port 22 or port 80)'
--display-filter '(src host 10.0.0.5 or dst host 10.0.0.5) and dns'
```
## 完整的 IDS pipeline(高级)
本仓库附带的 IDS pipeline 是项目作者使用的实验室参考 pipeline —— 不包含在单行命令安装中。有关完整的数据流,请参阅 [`docs/ARCHITECTURE.md`](docs/ARCHITECTURE.md);有关操作手册,请参阅 [`docs/OPERATIONS.md`](docs/OPERATIONS.md);有关开箱即用的配置,请参阅 `deploy/` 和 `sql/` 目录。
**攻击族**(UNSW-NB15 分类体系,每种对应一个二元分类器):
`dos`、`exploits`、`fuzzers`、`generic`、`analysis`、`reconnaissance`、`shellcode`
完整 pipeline **所需的服务**:
- Apache Kafka(KRaft 模式)
- ClickHouse server
- Grafana(包含已配置的仪表板)
- 用于基于 flow 的特征提取的 Zeek 和 Argus
- `requirements-integration.txt` 中列出的 Python 依赖项:
`kafka-python-ng`、`clickhouse-driver`、`pandas`、`numpy`、`pyyaml`
## 项目结构
```
.
├── sniff.py # Thin CLI entry point (argparse + dispatch)
├── install.sh # One-line installer for the capture tool
├── setup.py # pip-installable package (`sniff` command)
├── requirements.txt # Capture-only deps: scapy
├── requirements-integration.txt # Pipeline deps: kafka, clickhouse, pandas, …
├── config.yaml.example # Reference config for the capture tool
├── cli/ # TUI app, daemon, menu, live NDJSON printer
├── core/ # Capture engine, decoder, pcap writer, rotator
│ # ring buffer, constants, display filter
├── ui/ # Colours / TUI helpers
├── modules/ # Pluggable analyzer modules
├── integration/ # Kafka producer/consumer, pcap segmenter,
│ # ClickHouse sink, schema, config loader
├── Extraction-and-classification/
│ ├── MODULE_TRICHXUAT # Argus + Zeek feature extraction
│ ├── MODULE_PHANLOAI # 7 UNSW-NB15 per-family classifiers
│ └── MODULE_AUTO # Orchestrator (auto_pipeline.py)
├── deploy/ # Kafka properties, Grafana provisioning,
│ # systemd unit files
├── sql/ # ClickHouse DDL (7 flows_ + flows_all
│ # + pipeline_runs)
├── docs/ # ARCHITECTURE.md, OPERATIONS.md (runbook)
├── scripts/ # install.sh, setup.sh, uninstall.sh, sniff.service
└── tests/integration_tests/ # 36 tests covering segmenter, sink, config, …
```
## 开发说明
```
git clone https://github.com/ntu168108/realtime-packet-sniff.git
cd realtime-packet-sniff
python3 -m venv .venv && source .venv/bin/activate
# Capture tool 依赖 (scapy)
pip install -r requirements.txt
# Pipeline 依赖 (Kafka, ClickHouse, pandas, …)
pip install -r requirements-integration.txt
# 运行 test suite
pytest -q
```
集成测试涵盖了 pcap 分段往返、Kafka 分段器行为、ClickHouse sink 类型转换器、配置加载、幂等重新处理以及显示过滤器解析。
## 许可证
[MIT](LICENSE) —— 详见 `LICENSE` 文件。
## 致谢
- [UNSW-NB15 dataset](https://research.unsw.edu.au/projects/unsw-nb15-dataset)
及其特征方案,被基于攻击族的分类器所使用。
- [Argus](https://openargus.org/) 和 [Zeek](https://zeek.org/)
用于基于 flow 的特征提取。
- [Scapy](https://scapy.net/)
用于提供 libpcap 捕获后端。
- [Apache Kafka](https://kafka.apache.org/)、[ClickHouse](https://clickhouse.com/)
和 [Grafana](https://grafana.com/)
用于提供存储和可观测性技术栈。
SPAN / mirror")] NIC -->|"libpcap"| SNIFF["sniff capture
(core.capture)"] SNIFF -->|"~60s pcap blobs"| SEG["KafkaPcapSegmenter"] SEG -->|"topic
raw_pcap_segments"| KAFKA[(Kafka)] KAFKA --> EC["ec_consumer"] EC -->|"Argus + Zeek"| EX["Extraction
(UNSW-NB15 features)"] EX --> CL["7 Family Classifiers
dos · exploits · fuzzers
generic · analysis
reconnaissance · shellcode"] CL -->|"per-family CSVs"| CH[(ClickHouse
flows_<family> + flows_all)] CH --> GRAF[Grafana
SNIFF IDS Pipeline] ``` 每个 Kafka 消息都是一个自描述的 pcap 分段(约 60 秒的流量),并带有 UUID `segment_id` 标签。重新处理同一个分段是幂等的:ClickHouse 的 `ReplacingMergeTree` 会根据 `(segment_id, srcip, dstip, sport, dport, proto, ts)` 进行去重。 ## 功能特性 **捕获工具** - 实时交互式 TUI,包含数据包列表、协议统计信息和 top flows。 - 后台 daemon,具有 PID/日志文件管理以及 `SIGTERM`/`SIGHUP`/`SIGUSR1`/`SIGUSR2` 信号处理功能。 - `--live` NDJSON 流输出到 stdout —— 可通过管道传输给 `jq`、`head` 或其他任何工具。 - 采用 Scapy `AsyncSniffer` + libpcap 后端,无锁环形缓冲区,两级解码(低开销的 L2–L4 热路径,可选的 L7 深度解码)。 - BPF 内核端过滤器 + 解码后的 Wireshark 风格显示过滤器。 - 轮转 pcap 写入器(基于时间和大小),并带有保留期清理功能。 - 协议计数器和 5-tuple 会话跟踪。 **IDS pipeline** - Kafka KRaft producer,处理约 60 秒的 pcap blobs 并带有 64 MiB 背压。 - Argus + Zeek 特征提取,生成 45 列的 UNSW-NB15 特征集。 - 七个基于攻击族的分类器(one-vs-rest 二元模型):`dos`、`exploits`、`fuzzers`、`generic`、`analysis`、`reconnaissance`、`shellcode`。 - ClickHouse sink,支持批量插入、ReplacingMergeTree 去重以及审计列(`segment_id`、`attack_family`、`attack_subtype`、`is_attack`、`interface`、`t_window`、`pcap_file`)。 - `pipeline_runs` 审计表 —— 每个被消费的分段对应一行,包含持续时间和错误信息。 - Grafana 仪表板“SNIFF IDS Pipeline”,可展示攻击时间线、主要攻击者、攻击族计数以及 pipeline 健康状况。 - 用于 `kafka`、`sniff-producer`、`ec-consumer` 的 systemd 单元。 ## 快速开始 ### 一行命令安装(推荐) ``` curl -fsSL https://raw.githubusercontent.com/ntu168108/realtime-packet-sniff/main/install.sh | sudo bash ``` 安装程序会检查 Python 3.8+、libpcap 和磁盘空间,然后安装 `scapy` 和 `sniff` 命令。传入 `--skip-systemd` 可跳过可选的 daemon 单元设置。 ### 基本用法 ``` sudo sniff # Interactive menu (lists interfaces) sudo sniff -i eth0 # Quick capture on eth0 sudo sniff -i eth0 --live | jq . # Live NDJSON stream sudo sniff -i eth0 -f "tcp port 443" # BPF filter (kernel-side) sudo sniff --help # Full CLI reference sudo sniff --list-interfaces # Show available interfaces sudo sniff --list-protocols # Show supported protocols sudo sniff --status # Daemon status sudo sniff --stop # Stop daemon gracefully ``` ### 从源码构建 ``` git clone https://github.com/ntu168108/realtime-packet-sniff.git cd realtime-packet-sniff python3 -m venv .venv && source .venv/bin/activate pip install . sudo .venv/bin/sniff -i eth0 ``` ## CLI 参考 轻量级入口点 `sniff.py` 暴露了最有用的标志(通过 `sudo sniff --help` 查看完整列表): | 标志 | 用途 | |------|---------| | `-i`, `--interface` | 要进行捕获的网络接口 | | `-f`, `--filter` | BPF 过滤器(内核端,例如 `"tcp port 80"`) | | `-s`, `--snaplen` | 每个数据包的捕获长度(默认为 `65535`) | | `-b`, `--buffer` | 缓冲区配置:`low`、`balanced`、`fast`、`max` | | `-p`, `--no-promisc` | 禁用混杂模式 | | `-o`, `--output` | 输出目录(默认为 `./sniff_data`) | | `-r`, `--retention` | 文件保留天数(默认为 `7`) | | `--rotate-interval` | 轮转间隔,后缀为 `s/m/h/d`(默认为 `3600`) | | `--rotate-size` | 文件超过指定大小时轮转(以 MB 为单位,默认为 `500`) | | `--no-rotate` | 单文件捕获(不轮转) | | `--live` | 将 NDJSON 流传输到 stdout(不启用 TUI) | | `--display-filter` | 解码后的 Wireshark 风格显示过滤器 | | `--count` | 捕获 N 个数据包后停止(`0` = 不限) | | `--exclude-port` | 排除某个端口(可多次重复使用) | | `-d`, `--daemon` | 作为后台 daemon 运行 | | `--status` | 显示 daemon 状态 | | `--stop` | 停止 daemon(先优雅停止,再使用 SIGKILL) | | `--list-interfaces` | 列出可用的接口 | | `--list-protocols` | 列出支持的 L2–L7 协议并退出 | ## 显示过滤器 `--display-filter` 接受一种小型的 Wireshark 风格微型语言,该语言在解码*之后*运行(它不替代内核端的 BPF 过滤器)。它支持: - `host
标签:ClickHouse, Grafana, Kafka, Python, SonarQube插件, UNSW-NB15, 入侵检测系统, 安全数据湖, 无后门, 网络流量抓包, 软件成分分析, 逆向工具, 防御绕过