thunderstornX/osint-pipeline-demo

GitHub: thunderstornX/osint-pipeline-demo

面向高吞吐量公开情报采集的异步 Python 流水线,通过令牌桶限速与内容寻址去重实现高效且幂等的数据抓取。

Stars: 0 | Forks: 0

# osint-pipeline-demo [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.20480442.svg)](https://doi.org/10.5281/zenodo.20480442) [![tests](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/416007e827001839.svg)](https://github.com/thunderstornX/osint-pipeline-demo/actions/workflows/tests.yml) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE) 一个用于高吞吐量 OSINT 数据采集的异步 Python 参考流水线 —— 结合了 `aiohttp` 连接池、基于数据源的令牌桶 限速、基于 SHA-256 内容寻址的正则字段标准化,以及 使用 `ON CONFLICT DO NOTHING` 进行批量 PostgreSQL 写入,以实现 幂等重跑。 在 [`paper/paper.pdf`](paper/paper.pdf) 中描述:*用于高吞吐量 OSINT 数据采集的异步 Python 参考流水线:设计、基准测试与幂等去重。* ## 它能为你提供什么 - 通过 `asyncio.Semaphore` 实现**异步扇出**和有界 worker 并发 - **基于数据源的令牌桶限流器**,确保遵守每个上游服务文档中记录的限制 - 通过对规范化序列化字段进行 SHA-256 处理,实现**基于内容寻址的去重** - 通过 `UNNEST` + `ON CONFLICT (content_hash) DO NOTHING` 实现**批量 PostgreSQL 插入** —— 每个数据块仅需单次往返,并具有基于 `RETURNING` 的真实去重计数 - 针对瞬时 5xx 和连接错误实现**带抖动的指数退避** - 面向 JS 渲染数据源的**可选 Selenium 连接池**(延迟初始化;如果没有数据源需要它,则不会加载) - 面向流水线日志聚合的**结构化 JSON 日志记录** - 针对本地 mock 服务器的**可复现基准测试**(测量期间不会对上游 API 造成压力) ## 实际测量吞吐量 | 策略 | Workers | 挂钟时间 (s) | 请求/秒 | 相比同步的加速比 | |---|---|---|---|---| | `sync_baseline` | 1 | 5.14 | 19.46 | — | | `async_5` | 5 | 1.04 | 95.89 | **4.93×** | | `async_10` | 10 | 0.54 | 186.17 | **9.57×** | | `async_20` | 20 | 0.28 | 351.11 | **18.04×** | 这些数据来自于 `benchmarks/run_benchmarks.py` 针对本地 mock 服务器(50 ms 上游延迟,100 个请求)的测试。在你的机器上重新运行: ``` python -m benchmarks.run_benchmarks --requests 100 --latency-ms 50 ``` ![基准测试输出](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/f854725c93001844.png) ## 测试套件 30 个测试,包括 4 个连接到实体 PostgreSQL 的集成测试。运行单元测试子集: ``` python -m pytest tests/ ``` ![pytest 输出](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/a6c6fed5c5001850.png) 若要同时运行集成测试,请在端口 5433 上启动 Postgres(这样它就 不会与系统在 5432 上的 Postgres 冲突),然后重新运行: ``` docker run -d --rm --name osint_pg \ -e POSTGRES_USER=osint -e POSTGRES_PASSWORD=osint -e POSTGRES_DB=osint \ -p 5433:5432 \ -v "$(pwd)/docker/init.sql:/docker-entrypoint-initdb.d/init.sql:ro" \ postgres:16-alpine PIPELINE_TEST_POSTGRES=1 \ PIPELINE_DATABASE_URL=postgresql://osint:osint@localhost:5433/osint \ python -m pytest tests/ ``` ## 针对真实数据的端到端运行 针对公共 NIST NVD CVE 数据源运行该流水线(无需身份验证): ``` PIPELINE_DATABASE_URL=postgresql://osint:osint@localhost:5433/osint \ PIPELINE_SOURCES_FILE=config/sources_demo.yaml \ python -m src.pipeline ``` ![流水线运行 + 去重演示](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/a65b58ae54001856.png) 首次运行会插入 20 条记录。针对同一数据源的第二次运行 插入了 0 条记录,并通过 `content_hash` UNIQUE 约束检测到 20 条重复记录 —— 端到端地证明了幂等性契约。 ## 仓库布局 ``` . ├── src/ │ ├── config.py # pydantic-settings tunables, sources.yaml loader │ ├── rate_limiter.py # async token-bucket per source │ ├── normaliser.py # regex extractors + SHA-256 content hash │ ├── storage.py # asyncpg, batched UNNEST INSERT with ON CONFLICT │ ├── collector.py # aiohttp fan-out, exponential backoff retry │ ├── selenium_pool.py # optional headless Chrome pool │ ├── pipeline.py # orchestrator + JSON logging │ └── cli.py # `python -m src.cli` entry ├── config/ │ ├── sources.yaml # 3 public sources: NVD, GitHub events, OpenCorporates │ └── sources_demo.yaml # NVD-only single-source demo ├── docker/ │ ├── docker-compose.yml # postgres:16-alpine + pipeline service │ ├── Dockerfile # pipeline runtime image │ └── init.sql # records table with content_hash UNIQUE + GIN ├── tests/ # 30 tests across 5 modules ├── benchmarks/ │ ├── run_benchmarks.py # real benchmark harness (local mock + sync vs async) │ └── results.csv # current run output ├── paper/ │ ├── paper.tex # IEEE 4-page paper │ ├── paper.pdf │ └── figures/ # 3 real terminal screenshots └── scripts/ └── render_terminal.py # ANSI-aware terminal-to-PNG (used to generate figures) ``` ## 已配置的数据源 | 数据源 | URL | 限速 | |---|---|---| | NVD CVE 2.0 | `services.nvd.nist.gov/rest/json/cves/2.0` | 0.15 req/s (无 API key 时为 5 / 30 s) | | GitHub Events | `api.github.com/events` | 0.015 req/s (未认证时为 60 / 小时) | | OpenCorporates | `api.opencorporates.com/v0.4/companies/search` | 0.5 req/s | 每个数据源都有一个命名的正则表达式提取器列表,每次匹配都会生成一条记录。请参阅 [`config/sources.yaml`](config/sources.yaml) 获取实时配置。 ## 道德使用 该流水线: - 通过令牌桶步调控制**遵守已发布的速率限制**,默认采用 每个提供商文档中记录的最保守层级 - 通过指向此仓库的 `User-Agent` 标头**标识自身** - 设计上**不存储 PII** —— 提取器对字段名称有严格要求 - **仅用于防御性研究和教育用途**,符合 [10.5281/zenodo.16924934](https://doi.org/10.5281/zenodo.16924934) 请勿将此流水线用于私人目标、需要身份验证的 端点,或任何服务条款禁止自动化 访问的数据源。 ## 引用此工作 ``` @software{bhutto2026osintpipeline, author = {Bhutto, Ali Murtaza}, title = {osint-pipeline-demo}, year = {2026}, doi = {10.5281/zenodo.20480442}, url = {https://github.com/thunderstornX/osint-pipeline-demo}, orcid = {0009-0007-2787-943X} } ``` - [OSINT 调查的法律与道德框架](https://doi.org/10.5281/zenodo.16924934) - [OSINT 工具框架](https://doi.org/10.5281/zenodo.16921792) ## 许可证 MIT © 2026 Ali Murtaza Bhutto
标签:ESC4, OSINT, PostgreSQL, Python, 命令控制, 异步编程, 数据去重, 数据采集, 无后门, 测试用例, 计算机取证, 请求拦截, 逆向工具