thunderstornX/osint-pipeline-demo
GitHub: thunderstornX/osint-pipeline-demo
面向高吞吐量公开情报采集的异步 Python 流水线,通过令牌桶限速与内容寻址去重实现高效且幂等的数据抓取。
Stars: 0 | Forks: 0
# osint-pipeline-demo
[](https://doi.org/10.5281/zenodo.20480442)
[](https://github.com/thunderstornX/osint-pipeline-demo/actions/workflows/tests.yml)
[](LICENSE)
一个用于高吞吐量 OSINT 数据采集的异步 Python 参考流水线
—— 结合了 `aiohttp` 连接池、基于数据源的令牌桶
限速、基于 SHA-256 内容寻址的正则字段标准化,以及
使用 `ON CONFLICT DO NOTHING` 进行批量 PostgreSQL 写入,以实现
幂等重跑。
在 [`paper/paper.pdf`](paper/paper.pdf) 中描述:*用于高吞吐量 OSINT 数据采集的异步 Python
参考流水线:设计、基准测试与幂等去重。*
## 它能为你提供什么
- 通过 `asyncio.Semaphore` 实现**异步扇出**和有界 worker 并发
- **基于数据源的令牌桶限流器**,确保遵守每个上游服务文档中记录的限制
- 通过对规范化序列化字段进行 SHA-256 处理,实现**基于内容寻址的去重**
- 通过 `UNNEST` + `ON CONFLICT (content_hash) DO NOTHING` 实现**批量 PostgreSQL 插入** —— 每个数据块仅需单次往返,并具有基于 `RETURNING` 的真实去重计数
- 针对瞬时 5xx 和连接错误实现**带抖动的指数退避**
- 面向 JS 渲染数据源的**可选 Selenium 连接池**(延迟初始化;如果没有数据源需要它,则不会加载)
- 面向流水线日志聚合的**结构化 JSON 日志记录**
- 针对本地 mock 服务器的**可复现基准测试**(测量期间不会对上游 API 造成压力)
## 实际测量吞吐量
| 策略 | Workers | 挂钟时间 (s) | 请求/秒 | 相比同步的加速比 |
|---|---|---|---|---|
| `sync_baseline` | 1 | 5.14 | 19.46 | — |
| `async_5` | 5 | 1.04 | 95.89 | **4.93×** |
| `async_10` | 10 | 0.54 | 186.17 | **9.57×** |
| `async_20` | 20 | 0.28 | 351.11 | **18.04×** |
这些数据来自于 `benchmarks/run_benchmarks.py` 针对本地
mock 服务器(50 ms 上游延迟,100 个请求)的测试。在你的机器上重新运行:
```
python -m benchmarks.run_benchmarks --requests 100 --latency-ms 50
```

## 测试套件
30 个测试,包括 4 个连接到实体 PostgreSQL 的集成测试。运行单元测试子集:
```
python -m pytest tests/
```

若要同时运行集成测试,请在端口 5433 上启动 Postgres(这样它就
不会与系统在 5432 上的 Postgres 冲突),然后重新运行:
```
docker run -d --rm --name osint_pg \
-e POSTGRES_USER=osint -e POSTGRES_PASSWORD=osint -e POSTGRES_DB=osint \
-p 5433:5432 \
-v "$(pwd)/docker/init.sql:/docker-entrypoint-initdb.d/init.sql:ro" \
postgres:16-alpine
PIPELINE_TEST_POSTGRES=1 \
PIPELINE_DATABASE_URL=postgresql://osint:osint@localhost:5433/osint \
python -m pytest tests/
```
## 针对真实数据的端到端运行
针对公共 NIST NVD CVE 数据源运行该流水线(无需身份验证):
```
PIPELINE_DATABASE_URL=postgresql://osint:osint@localhost:5433/osint \
PIPELINE_SOURCES_FILE=config/sources_demo.yaml \
python -m src.pipeline
```

首次运行会插入 20 条记录。针对同一数据源的第二次运行
插入了 0 条记录,并通过 `content_hash` UNIQUE 约束检测到 20 条重复记录
—— 端到端地证明了幂等性契约。
## 仓库布局
```
.
├── src/
│ ├── config.py # pydantic-settings tunables, sources.yaml loader
│ ├── rate_limiter.py # async token-bucket per source
│ ├── normaliser.py # regex extractors + SHA-256 content hash
│ ├── storage.py # asyncpg, batched UNNEST INSERT with ON CONFLICT
│ ├── collector.py # aiohttp fan-out, exponential backoff retry
│ ├── selenium_pool.py # optional headless Chrome pool
│ ├── pipeline.py # orchestrator + JSON logging
│ └── cli.py # `python -m src.cli` entry
├── config/
│ ├── sources.yaml # 3 public sources: NVD, GitHub events, OpenCorporates
│ └── sources_demo.yaml # NVD-only single-source demo
├── docker/
│ ├── docker-compose.yml # postgres:16-alpine + pipeline service
│ ├── Dockerfile # pipeline runtime image
│ └── init.sql # records table with content_hash UNIQUE + GIN
├── tests/ # 30 tests across 5 modules
├── benchmarks/
│ ├── run_benchmarks.py # real benchmark harness (local mock + sync vs async)
│ └── results.csv # current run output
├── paper/
│ ├── paper.tex # IEEE 4-page paper
│ ├── paper.pdf
│ └── figures/ # 3 real terminal screenshots
└── scripts/
└── render_terminal.py # ANSI-aware terminal-to-PNG (used to generate figures)
```
## 已配置的数据源
| 数据源 | URL | 限速 |
|---|---|---|
| NVD CVE 2.0 | `services.nvd.nist.gov/rest/json/cves/2.0` | 0.15 req/s (无 API key 时为 5 / 30 s) |
| GitHub Events | `api.github.com/events` | 0.015 req/s (未认证时为 60 / 小时) |
| OpenCorporates | `api.opencorporates.com/v0.4/companies/search` | 0.5 req/s |
每个数据源都有一个命名的正则表达式提取器列表,每次匹配都会生成一条记录。请参阅 [`config/sources.yaml`](config/sources.yaml) 获取实时配置。
## 道德使用
该流水线:
- 通过令牌桶步调控制**遵守已发布的速率限制**,默认采用
每个提供商文档中记录的最保守层级
- 通过指向此仓库的 `User-Agent` 标头**标识自身**
- 设计上**不存储 PII** —— 提取器对字段名称有严格要求
- **仅用于防御性研究和教育用途**,符合
[10.5281/zenodo.16924934](https://doi.org/10.5281/zenodo.16924934)
请勿将此流水线用于私人目标、需要身份验证的
端点,或任何服务条款禁止自动化
访问的数据源。
## 引用此工作
```
@software{bhutto2026osintpipeline,
author = {Bhutto, Ali Murtaza},
title = {osint-pipeline-demo},
year = {2026},
doi = {10.5281/zenodo.20480442},
url = {https://github.com/thunderstornX/osint-pipeline-demo},
orcid = {0009-0007-2787-943X}
}
```
- [OSINT 调查的法律与道德框架](https://doi.org/10.5281/zenodo.16924934)
- [OSINT 工具框架](https://doi.org/10.5281/zenodo.16921792)
## 许可证
MIT © 2026 Ali Murtaza Bhutto
标签:ESC4, OSINT, PostgreSQL, Python, 命令控制, 异步编程, 数据去重, 数据采集, 无后门, 测试用例, 计算机取证, 请求拦截, 逆向工具