Bizly/cvent_scrape_poc
GitHub: Bizly/cvent_scrape_poc
一个基于 Redis 队列的模块化 Python 爬虫,用于从 Cvent.com 批量采集活动场馆详情并刷入下游 API。
Stars: 0 | Forks: 0
# Cvent Venue Scraper
一个模块化的 Python 爬虫,用于为一系列城市/国家目的地从 Cvent.com 收集场馆详细信息。其后端采用带有 DLQ 的 Redis 队列流水线,结果缓存会以 50 条为一批次刷入 Bizly API,并支持通过 Dozzle 进行实时日志追踪。
## 工作原理
**阶段 1 — 链接发现**
读取包含 `country,city_name`(国家,城市名)对的 CSV 文件,并抓取 Cvent 的分页场馆列表页面。每个发现的 URL 都会被 **RPUSH** 到 Redis 工作队列 (`cvent:queue:urls`) 中。如果某个目的地在经过 HTTP 重试后仍然失败,它将被推送到目的地 DLQ,随后运行过程继续。
**阶段 2 — 场馆抓取**
依次从 Redis 队列中 **LPOP** 出 URL,抓取每个场馆页面(名称、地址、房间数量、会议空间、评分、停车设施、机场距离、配套设施、建成/翻新年份、税率、入住率),并将验证后的结果 **RPUSH** 到 Redis 结果缓存 (`cvent:cache:results`) 中。当缓存达到 `BATCH_SIZE`(50)时,它会通过 `MULTI/EXEC` 流水线(`LRANGE` + `DEL`)进行原子性清空,并 POST 到 Bizly API。如果 API 刷新失败,数据会被**重新恢复**回缓存中,以防止数据丢失。在阶段 2 结束时会运行一次最终的无条件清空。
失败的 URL 抓取(在 `throttled_get` 耗尽其 HTTP 重试次数后)会直接进入 URL DLQ (`cvent:dlq:urls`) —— 没有作业级别的重新入队。非阻塞设计:单个目的地或 URL 的失败绝不会中断整个流水线。
### 架构
```
flowchart LR
CSV["data/bizly_prod_trending_destinations.csv"] --> P1["Phase 1 — Link Discovery"]
P1 -->|"RPUSH success"| UQ[("cvent:queue:urls (LIST)")]
P1 -->|"HTTP retries exhausted"| DLQd[("cvent:dlq:destinations")]
UQ -->|"LPOP"| P2["Phase 2 — Serial Worker"]
P2 -->|"RPUSH success"| RC[("cvent:cache:results (LIST)")]
P2 -->|"HTTP retries exhausted"| DLQu[("cvent:dlq:urls")]
RC -->|"LLEN >= 50 or final drain"| API["Bizly API (insert_venue_batch)"]
API -.->|"flush failed -> rehydrate"| RC
subgraph dockerStack ["Docker Compose"]
Redis
Scraper
Dozzle
end
Scraper -->|"structured logs (stdout)"| Dozzle
```
### 阶段内部流程
```
flowchart TD
scrapeLoop["For each venue URL (LPOP)"]
subgraph scraperClass ["CventScraper (single requests.Session)"]
httpReq["throttled_get()
min/max delay + 3-attempt retry"] parseSections["Parse page sections
name . address . rooms
meeting space . ratings
facilities . parking
airport distances . years"] end subgraph transforms ["transforms/cleaning.py"] rename["rename_property_fields()"] ensure["ensure_required_venue_schema_fields()"] sanitize["sanitize_data_types()"] schema["validate_schema()"] end scrapeLoop --> httpReq --> parseSections --> rename --> ensure --> sanitize --> schema --> cachePush["cache_push_result()"] cachePush --> flushCheck{"LLEN >= 50?"} flushCheck -->|"yes"| flush["cache_flush_if_ready() -> Bizly API"] flushCheck -->|"no"| scrapeLoop flush --> scrapeLoop ``` ### Redis 键值布局 | 键 | 类型 | 作用 | |---|---|---| | `cvent:queue:urls` | LIST | 阶段 1 RPUSH,阶段 2 LPOP(FIFO 工作队列) | | `cvent:seen:urls` | SET | 去重保护 —— 阶段 1 在每次 RPUSH 之前执行 SADD;防止重启时出现重复作业 | | `cvent:dlq:destinations` | LIST | 来自阶段 1 的目的地级别失败记录 | | `cvent:dlq:urls` | LIST | 来自阶段 2 的 URL 级别抓取失败记录 | | `cvent:cache:results` | LIST | 等待批量刷入 Bizly 的已抓取场馆数据 | ### 日志 每行日志都包含 `trace=<8-char-uuid>` 和 `dest=` 上下文(通过 `contextvars` 和日志过滤器实现),因此你可以通过 Dozzle 的搜索功能端到端地追踪单个目的地或 URL。关键的追踪事件:
`PHASE1_START` -> `DEST_START` -> `DEST_SUCCESS urls=N` | `DEST_FAIL_DLQ error=...` -> `PHASE1_END enqueued=N dlq_destinations=N` -> `PHASE2_START queue_len=N` -> `URL_START link=...` -> `URL_SUCCESS` | `URL_FAIL_DLQ error=...` -> `BATCH_FLUSH_START/SUCCESS/FAIL size=50` -> `FINAL_DRAIN size=N` -> `PHASE2_END scraped=N dlq_urls=N` -> `RUN_SUMMARY ...`
## 前置条件
- **Docker Desktop**(或任何 Docker Engine + `docker compose` v2)—— 推荐用于所有运行方式
- 或者仅用于本地运行:**Python 3.12.x** 以及一个可在本地访问的 Redis
## 快速开始(Docker Compose —— 推荐)
这会在同一个网络中启动三个服务:`redis`、`scraper` 和 `dozzle`(日志查看器)。
### 1. 创建你的 `.env`
```
cp .env.example .env
```
打开 `.env` 并粘贴你的 `BIZLY_WEBHOOK_KEY`。你**不需要**修改 `REDIS_URL` —— Docker Compose 会在 `scraper` 容器内将其覆盖为 `redis://redis:6379/0`(参见 [docker-compose.yml](docker-compose.yml))。
### 2. 添加输入 CSV 文件
将你的目的地文件放置在:
```
data/bizly_prod_trending_destinations.csv
```
必须包含的列:
```
country,city_name
United States,New York
France,Paris
```
### 3. 运行所有服务
```
docker compose up --build
```
这将会:
- 启动 Redis(已配置健康检查,并持久化到名为 `redis_data` 的命名卷中)。
- 在 [http://localhost:8080](http://localhost:8080) 启动 Dozzle,用于实时日志追踪。
- 等待 Redis 变得健康,然后运行爬虫直至完成。
打开 [http://localhost:8080](http://localhost:8080) 上的 Dozzle,实时查看 `scraper` 日志。通过 `URL_FAIL_DLQ`、`trace=` 或 `dest=San Francisco,United States` 进行过滤,可以在两个阶段中追踪单个目的地或 URL。
爬虫容器在完成后会退出。Redis 和 Dozzle 会继续运行(`restart: unless-stopped`)。使用以下命令停止所有服务:
```
docker compose down
```
### 4. 代码修改后
爬虫源代码会在构建时被打包进镜像中,因此在重新运行之前需要重新构建:
```
docker compose up --build scraper
```
### 检查 Redis 状态
```
# queue + DLQ 计数
docker compose exec redis redis-cli LLEN cvent:queue:urls
docker compose exec redis redis-cli LLEN cvent:dlq:urls
docker compose exec redis redis-cli LLEN cvent:dlq:destinations
docker compose exec redis redis-cli LLEN cvent:cache:results
# 查看第一个 DLQ 条目
docker compose exec redis redis-cli LINDEX cvent:dlq:urls 0
```
### 端口映射
| 服务 | 主机端口 | 容器端口 |
|---|---|---|
| `redis` | `6380` | `6379` |
| `dozzle` | `8080` | `8080` |
注意:Redis 暴露在主机端口 **`6380`** 上,以避免与本地 `6379` 上的 Redis 冲突。在 Docker 网络内部,容器仍然使用 `redis:6379`。
## 本地运行(不使用 Docker)
适用于快速迭代。需要 Python 3.12 和一个可从你的主机访问的 Redis。
### 1. 创建虚拟环境并安装
```
python3 -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
cp .env.example .env
```
### 2. 确保 Redis 可访问
选择一项:
- **使用容器化的 Redis**(推荐):
docker compose up -d redis
# Redis 现在在主机上的 localhost:6380 运行(参见 docker-compose.yml 端口映射)
然后在 `.env` 中设置:
REDIS_URL=redis://localhost:6380/0
- **或者在默认端口上运行本地原生 Redis:**
brew services start redis # macOS
# .env: REDIS_URL=redis://localhost:6379/0
### 3. 运行
```
python main.py
```
如果你看到 `ConnectionRefusedError: [Errno 61] ... localhost:6379`,说明 Redis 没有在你 `REDIS_URL` 所指向的端口上运行 —— 请按照第 2 步操作。
## 配置
| 变量 | 默认值 | 描述 |
|---|---|---|
| `DEBUG_MODE` | `true` | 如果为 `true`,阶段 1 会在入队 `DEBUG_LIMIT` 个 URL 后停止入队 |
| `DEBUG_LIMIT` | `5` | 调试模式下入队的最大 URL 数量 |
| `MAX_PAGES` | `5` | 每个城市抓取的最大分页页面数 |
| `MIN_DELAY` | `1.0` | HTTP 请求之间的最小秒数 |
| `MAX_DELAY` | `3.0` | HTTP 请求之间的最大秒数 |
| `REQUEST_TIMEOUT` | `20` | HTTP 请求超时时间(秒) |
| `MAX_RETRIES` | `3` | 在进入 DLQ 之前,`throttled_get` 内部的 HTTP 重试次数 |
| `BATCH_SIZE` | `50` | 触发 Bizly API 刷新的缓存大小 |
| `INPUT_CSV` | `data/bizly_prod_trending_destinations.csv` | 输入文件 |
| `OUTPUT_DIR` | `output` | 本地 CSV 输出目录(使用 `save_data_to_csv` 时) |
| `OUTPUT_FILENAME` | `cvent_venues.csv` | 本地 CSV 文件名 |
| `BIZLY_API_URL` | `https://api-dev.bizly.com/hooks/venues/scraper/batch` | 批量插入端点 |
| `BIZLY_WEBHOOK_KEY` | *(必填)* | Bizly API 的认证请求头 |
| `REDIS_URL` | `redis://localhost:6379/0` | 本地运行时使用。Docker Compose 会在 `scraper` 容器中将其覆盖为 `redis://redis:6379/0` |
## 项目结构
```
cvent-scraper/
├── main.py # Entry point — run_phase1() + run_phase2()
├── config.py # All tunable settings (env-backed)
├── services/
│ ├── http.py # throttled_get() + HEADERS (HTTP retries)
│ ├── scraper.py # CventScraper — single requests.Session, DOM extraction
│ ├── redis_client.py # Singleton redis.Redis from REDIS_URL
│ ├── queues.py # enqueue_url / pop_url / DLQ / cache helpers
│ └── logging_setup.py # contextvars-backed trace_id/dest injection
├── models/
│ └── venue.py # VenueDetailsSchema (Pydantic)
├── transforms/
│ └── cleaning.py # rename / ensure-required / sanitize / validate
├── storage/
│ ├── csv_writer.py # Local CSV utility (optional)
│ └── bizly_api/
│ └── insert_batch_venues.py # Batch POST to Bizly API
├── data/ # Input CSV (bind-mounted read-only in Docker)
├── output/ # Local CSV output (bind-mounted in Docker)
├── Dockerfile # python:3.12-slim + PYTHONUNBUFFERED=1
├── docker-compose.yml # redis + scraper + dozzle
├── .dockerignore
├── .env.example
└── requirements.txt
```
## 操作说明
- **重试**:`throttled_get`(3 次尝试,指数退避)是唯一的重试层。如果抓取仍然失败,将直接进入 URL DLQ —— 没有作业级别的重新入队。
- **并发**:阶段 2 是串行的(一次处理一个 URL)。这遵守了 Cvent 的速率限制,并保持了单会话连接池的简单性。
- **生命周期**:运行一次。阶段 1 完全完成后,阶段 2 会清空 URL 队列,然后容器退出。
- **崩溃安全**:结果缓存和队列是持久化的 Redis LIST;失败的 Bizly API 刷新会将该批次重新恢复回 `cvent:cache:results` 中,以便进行下次刷新尝试。
- **重放 DLQ**:由于 DLQ 只是普通的 Redis LIST,你可以使用 `LRANGE` 检查它们,并通过简单的脚本重新入队 —— 或者使用 `redis-cli --scan` / `LRANGE cvent:dlq:urls 0 -1 > dlq.jsonl` 将其转储。
min/max delay + 3-attempt retry"] parseSections["Parse page sections
name . address . rooms
meeting space . ratings
facilities . parking
airport distances . years"] end subgraph transforms ["transforms/cleaning.py"] rename["rename_property_fields()"] ensure["ensure_required_venue_schema_fields()"] sanitize["sanitize_data_types()"] schema["validate_schema()"] end scrapeLoop --> httpReq --> parseSections --> rename --> ensure --> sanitize --> schema --> cachePush["cache_push_result()"] cachePush --> flushCheck{"LLEN >= 50?"} flushCheck -->|"yes"| flush["cache_flush_if_ready() -> Bizly API"] flushCheck -->|"no"| scrapeLoop flush --> scrapeLoop ``` ### Redis 键值布局 | 键 | 类型 | 作用 | |---|---|---| | `cvent:queue:urls` | LIST | 阶段 1 RPUSH,阶段 2 LPOP(FIFO 工作队列) | | `cvent:seen:urls` | SET | 去重保护 —— 阶段 1 在每次 RPUSH 之前执行 SADD;防止重启时出现重复作业 | | `cvent:dlq:destinations` | LIST | 来自阶段 1 的目的地级别失败记录 | | `cvent:dlq:urls` | LIST | 来自阶段 2 的 URL 级别抓取失败记录 | | `cvent:cache:results` | LIST | 等待批量刷入 Bizly 的已抓取场馆数据 | ### 日志 每行日志都包含 `trace=<8-char-uuid>` 和 `dest=
标签:API集成, Python, Redis, URL抓取, 任务队列, 可观测性, 搜索引擎查询, 数据抓取, 无后门, 版权保护, 请求拦截, 逆向工具