ntu168108/realtime-packet-sniff

GitHub: ntu168108/realtime-packet-sniff

一款基于 Python 的实时网络数据包捕获工具,配套完整的 Kafka→特征提取→分类→ClickHouse→Grafana 入侵检测流水线。

Stars: 0 | Forks: 0

# realtime-packet-sniff 🛰️ [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) [![Python 3.8+](https://img.shields.io/badge/python-3.8%2B-blue.svg)](https://www.python.org/) [![Platform: Linux](https://img.shields.io/badge/platform-Linux-lightgrey.svg)]() [![Tests](https://img.shields.io/badge/tests-36%20passing-brightgreen.svg)](tests/integration_tests/) [![Version](https://img.shields.io/badge/version-0.2.0-orange.svg)]() 📖 [使用越南语阅读](README_VI.md) SNIFF 是一个实时数据包捕获工具,具有交互式 TUI、后台 daemon 以及实时 NDJSON 流模式。同一个捕获引擎也是完整 IDS pipeline 的前端,它将 pcap 分段流式传输到 Kafka,使用 Argus + Zeek 提取每个 flow 的特征,通过七个基于攻击族的分类器将它们与 UNSW-NB15 攻击分类体系进行比对分类,并将结果发送到 ClickHouse 以便在 Grafana 中进行可视化。 ## 架构 ``` flowchart LR NIC[("Network NIC
SPAN / mirror")] NIC -->|"libpcap"| SNIFF["sniff capture
(core.capture)"] SNIFF -->|"~60s pcap blobs"| SEG["KafkaPcapSegmenter"] SEG -->|"topic
raw_pcap_segments"| KAFKA[(Kafka)] KAFKA --> EC["ec_consumer"] EC -->|"Argus + Zeek"| EX["Extraction
(UNSW-NB15 features)"] EX --> CL["7 Family Classifiers
dos · exploits · fuzzers
generic · analysis
reconnaissance · shellcode"] CL -->|"per-family CSVs"| CH[(ClickHouse
flows_<family> + flows_all)] CH --> GRAF[Grafana
SNIFF IDS Pipeline] ``` 每个 Kafka 消息都是一个自描述的 pcap 分段(约 60 秒的流量),并带有 UUID `segment_id` 标签。重新处理同一个分段是幂等的:ClickHouse 的 `ReplacingMergeTree` 会根据 `(segment_id, srcip, dstip, sport, dport, proto, ts)` 进行去重。 ## 功能特性 **捕获工具** - 实时交互式 TUI,包含数据包列表、协议统计信息和 top flows。 - 后台 daemon,具有 PID/日志文件管理以及 `SIGTERM`/`SIGHUP`/`SIGUSR1`/`SIGUSR2` 信号处理功能。 - `--live` NDJSON 流输出到 stdout —— 可通过管道传输给 `jq`、`head` 或其他任何工具。 - 采用 Scapy `AsyncSniffer` + libpcap 后端,无锁环形缓冲区,两级解码(低开销的 L2–L4 热路径,可选的 L7 深度解码)。 - BPF 内核端过滤器 + 解码后的 Wireshark 风格显示过滤器。 - 轮转 pcap 写入器(基于时间和大小),并带有保留期清理功能。 - 协议计数器和 5-tuple 会话跟踪。 **IDS pipeline** - Kafka KRaft producer,处理约 60 秒的 pcap blobs 并带有 64 MiB 背压。 - Argus + Zeek 特征提取,生成 45 列的 UNSW-NB15 特征集。 - 七个基于攻击族的分类器(one-vs-rest 二元模型):`dos`、`exploits`、`fuzzers`、`generic`、`analysis`、`reconnaissance`、`shellcode`。 - ClickHouse sink,支持批量插入、ReplacingMergeTree 去重以及审计列(`segment_id`、`attack_family`、`attack_subtype`、`is_attack`、`interface`、`t_window`、`pcap_file`)。 - `pipeline_runs` 审计表 —— 每个被消费的分段对应一行,包含持续时间和错误信息。 - Grafana 仪表板“SNIFF IDS Pipeline”,可展示攻击时间线、主要攻击者、攻击族计数以及 pipeline 健康状况。 - 用于 `kafka`、`sniff-producer`、`ec-consumer` 的 systemd 单元。 ## 快速开始 ### 一行命令安装(推荐) ``` curl -fsSL https://raw.githubusercontent.com/ntu168108/realtime-packet-sniff/main/install.sh | sudo bash ``` 安装程序会检查 Python 3.8+、libpcap 和磁盘空间,然后安装 `scapy` 和 `sniff` 命令。传入 `--skip-systemd` 可跳过可选的 daemon 单元设置。 ### 基本用法 ``` sudo sniff # Interactive menu (lists interfaces) sudo sniff -i eth0 # Quick capture on eth0 sudo sniff -i eth0 --live | jq . # Live NDJSON stream sudo sniff -i eth0 -f "tcp port 443" # BPF filter (kernel-side) sudo sniff --help # Full CLI reference sudo sniff --list-interfaces # Show available interfaces sudo sniff --list-protocols # Show supported protocols sudo sniff --status # Daemon status sudo sniff --stop # Stop daemon gracefully ``` ### 从源码构建 ``` git clone https://github.com/ntu168108/realtime-packet-sniff.git cd realtime-packet-sniff python3 -m venv .venv && source .venv/bin/activate pip install . sudo .venv/bin/sniff -i eth0 ``` ## CLI 参考 轻量级入口点 `sniff.py` 暴露了最有用的标志(通过 `sudo sniff --help` 查看完整列表): | 标志 | 用途 | |------|---------| | `-i`, `--interface` | 要进行捕获的网络接口 | | `-f`, `--filter` | BPF 过滤器(内核端,例如 `"tcp port 80"`) | | `-s`, `--snaplen` | 每个数据包的捕获长度(默认为 `65535`) | | `-b`, `--buffer` | 缓冲区配置:`low`、`balanced`、`fast`、`max` | | `-p`, `--no-promisc` | 禁用混杂模式 | | `-o`, `--output` | 输出目录(默认为 `./sniff_data`) | | `-r`, `--retention` | 文件保留天数(默认为 `7`) | | `--rotate-interval` | 轮转间隔,后缀为 `s/m/h/d`(默认为 `3600`) | | `--rotate-size` | 文件超过指定大小时轮转(以 MB 为单位,默认为 `500`) | | `--no-rotate` | 单文件捕获(不轮转) | | `--live` | 将 NDJSON 流传输到 stdout(不启用 TUI) | | `--display-filter` | 解码后的 Wireshark 风格显示过滤器 | | `--count` | 捕获 N 个数据包后停止(`0` = 不限) | | `--exclude-port` | 排除某个端口(可多次重复使用) | | `-d`, `--daemon` | 作为后台 daemon 运行 | | `--status` | 显示 daemon 状态 | | `--stop` | 停止 daemon(先优雅停止,再使用 SIGKILL) | | `--list-interfaces` | 列出可用的接口 | | `--list-protocols` | 列出支持的 L2–L7 协议并退出 | ## 显示过滤器 `--display-filter` 接受一种小型的 Wireshark 风格微型语言,该语言在解码*之后*运行(它不替代内核端的 BPF 过滤器)。它支持: - `host `, `src host `, `dst host ` - `port `, `src port `, `dst port ` - `tcp`, `udp`, `icmp`, `icmpv6`, `arp`, `igmp`, `ipv4`, `ipv6`, `dns`, `http`, `tls`, `quic`, `dhcp`, `ntp` - 布尔运算符:`and`、`or`、`not` 以及括号 示例: ``` --display-filter 'port 443' --display-filter 'tcp and not (port 22 or port 80)' --display-filter '(src host 10.0.0.5 or dst host 10.0.0.5) and dns' ``` ## 完整的 IDS pipeline(高级) 本仓库附带的 IDS pipeline 是项目作者使用的实验室参考 pipeline —— 不包含在单行命令安装中。有关完整的数据流,请参阅 [`docs/ARCHITECTURE.md`](docs/ARCHITECTURE.md);有关操作手册,请参阅 [`docs/OPERATIONS.md`](docs/OPERATIONS.md);有关开箱即用的配置,请参阅 `deploy/` 和 `sql/` 目录。 **攻击族**(UNSW-NB15 分类体系,每种对应一个二元分类器): `dos`、`exploits`、`fuzzers`、`generic`、`analysis`、`reconnaissance`、`shellcode` 完整 pipeline **所需的服务**: - Apache Kafka(KRaft 模式) - ClickHouse server - Grafana(包含已配置的仪表板) - 用于基于 flow 的特征提取的 Zeek 和 Argus - `requirements-integration.txt` 中列出的 Python 依赖项: `kafka-python-ng`、`clickhouse-driver`、`pandas`、`numpy`、`pyyaml` ## 项目结构 ``` . ├── sniff.py # Thin CLI entry point (argparse + dispatch) ├── install.sh # One-line installer for the capture tool ├── setup.py # pip-installable package (`sniff` command) ├── requirements.txt # Capture-only deps: scapy ├── requirements-integration.txt # Pipeline deps: kafka, clickhouse, pandas, … ├── config.yaml.example # Reference config for the capture tool ├── cli/ # TUI app, daemon, menu, live NDJSON printer ├── core/ # Capture engine, decoder, pcap writer, rotator │ # ring buffer, constants, display filter ├── ui/ # Colours / TUI helpers ├── modules/ # Pluggable analyzer modules ├── integration/ # Kafka producer/consumer, pcap segmenter, │ # ClickHouse sink, schema, config loader ├── Extraction-and-classification/ │ ├── MODULE_TRICHXUAT # Argus + Zeek feature extraction │ ├── MODULE_PHANLOAI # 7 UNSW-NB15 per-family classifiers │ └── MODULE_AUTO # Orchestrator (auto_pipeline.py) ├── deploy/ # Kafka properties, Grafana provisioning, │ # systemd unit files ├── sql/ # ClickHouse DDL (7 flows_ + flows_all │ # + pipeline_runs) ├── docs/ # ARCHITECTURE.md, OPERATIONS.md (runbook) ├── scripts/ # install.sh, setup.sh, uninstall.sh, sniff.service └── tests/integration_tests/ # 36 tests covering segmenter, sink, config, … ``` ## 开发说明 ``` git clone https://github.com/ntu168108/realtime-packet-sniff.git cd realtime-packet-sniff python3 -m venv .venv && source .venv/bin/activate # Capture tool 依赖 (scapy) pip install -r requirements.txt # Pipeline 依赖 (Kafka, ClickHouse, pandas, …) pip install -r requirements-integration.txt # 运行 test suite pytest -q ``` 集成测试涵盖了 pcap 分段往返、Kafka 分段器行为、ClickHouse sink 类型转换器、配置加载、幂等重新处理以及显示过滤器解析。 ## 许可证 [MIT](LICENSE) —— 详见 `LICENSE` 文件。 ## 致谢 - [UNSW-NB15 dataset](https://research.unsw.edu.au/projects/unsw-nb15-dataset) 及其特征方案,被基于攻击族的分类器所使用。 - [Argus](https://openargus.org/) 和 [Zeek](https://zeek.org/) 用于基于 flow 的特征提取。 - [Scapy](https://scapy.net/) 用于提供 libpcap 捕获后端。 - [Apache Kafka](https://kafka.apache.org/)、[ClickHouse](https://clickhouse.com/) 和 [Grafana](https://grafana.com/) 用于提供存储和可观测性技术栈。
标签:ClickHouse, Grafana, Kafka, Python, SonarQube插件, UNSW-NB15, 入侵检测系统, 安全数据湖, 无后门, 网络流量抓包, 软件成分分析, 逆向工具, 防御绕过