Marcusrods/reddit-streaming-pipeline
GitHub: Marcusrods/reddit-streaming-pipeline
一个实时数据管道,用于抓取Reddit评论并通过Spark Streaming进行窗口化词频统计,解决社交媒体数据流的实时分析问题。
Stars: 0 | Forks: 0
# reddit-streaming-pipeline
一个实时数据管道,它能够实时抓取Reddit评论,将其打包为JSON格式,通过TCP传输,并在接收端使用Spark Streaming进行窗口化词频统计。
该项目基于BTS大数据与人工智能硕士课程中实时数据分析模块的S7和S8课程内容构建。原始实验是一组Jupyter笔记本。我将可运行的代码提取成了两个生产形态的Python脚本,并配合docker-compose以实现端到端的运行。
## 项目证明的能力
- **实时数据管道**:生产者-消费者模式,TCP传输,换行符分隔的JSON帧。
- **Reddit API集成**:通过PRAW使用OAuth认证并连接流式评论端点。
- **Apache Spark Streaming**:使用 `StreamingContext`、`socketTextStream`、窗口化转换、`flatMap`、`reduceByKey`。
- **停用词过滤**:使用NLTK,以及用于在执行器之间共享只读数据的广播变量。
- **生产规范**:基于环境变量的配置,无硬编码密钥,消费者断开时优雅重连,带日志记录的类型化Python。
- **容器化**:独立的生产者和消费者镜像,通过docker-compose一键启动。
## 管道架构
```
+---------------------+ JSON over TCP +-----------------------------+
| producer.py | newline-delimited | consumer.py |
| PRAW comment | ---------------------------> | Spark Streaming |
| stream over many | port 9999 | socketTextStream + window |
| subreddits | | word count, top 30 / win |
+---------------------+ +-----------------------------+
```
生产者通过PRAW的 `subreddit.stream.comments()` 端点订阅一个用逗号或加号分隔的subreddit列表,将每条评论及其父评论、帖子正文、作者和互动计数器序列化为一条JSON记录,并以换行符分隔的行发送到一个TCP套接字。消费者通过Spark Streaming打开该套接字,应用一个每5秒更新一次的60秒滑动窗口,将每条记录拆分成词元,移除英语停用词(从驱动器广播),进行词频统计、降序排序,并打印出前30个高频词。
## 运行方法
### 最快路径 - docker-compose
```
cp .env.example .env
# 填写您的 Reddit 凭据(在 https://www.reddit.com/prefs/apps 创建的脚本类型应用)
docker-compose up --build
```
生产者会等待消费者连接。两个容器都启动后,您将每5秒看到一次词频统计输出。
### 本地运行,两个终端
```
python3.11 -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
python -c "import nltk; nltk.download('stopwords')"
# 终端 1
export $(grep -v '^#' .env | xargs)
python src/consumer.py
# 终端 2
export $(grep -v '^#' .env | xargs)
python src/producer.py
```
首先启动消费者。生产者在流式传输之前会等待消费者连接,因此先启动消费者是正确的顺序。
## 仓库结构
```
reddit-streaming-pipeline/
├── src/
│ ├── producer.py # PRAW + TCP socket producer
│ └── consumer.py # Spark Streaming + word count consumer
├── docs/
│ └── architecture.md # pipeline diagram and notes
├── Dockerfile.producer
├── Dockerfile.consumer
├── docker-compose.yml
├── requirements.txt
├── .env.example
└── README.md
```
## Reddit API 设置
在 https://www.reddit.com/prefs/apps 注册一个Reddit脚本类型应用。复制 `client_id`(在应用名称下)和 `secret`。将两者都放入 `.env` 文件(参见 `.env.example`)。
默认的subreddit列表混合了新闻、AI和Python相关的subreddit,以保持窗口化词频统计的流数据活跃度。可以通过 `REDDIT_SUBREDDITS=python+news+worldnews` 覆盖。
## 生产环境扩展建议
- **使用Apache Kafka替代原始TCP传输。** 开发环境使用单个broker,高可用环境使用三个broker。生产者发布到一个主题,消费者订阅该主题。解耦了消费者和生产者的速率,并允许多个消费者同时读取同一个数据流。
- **从DStreams迁移到Structured Streaming。** 较新的基于DataFrame的API提供了事件时间语义,并通过检查点实现精确一次语义。
- **添加持久化存储。** 目前,词频统计结果打印到标准输出。真实的管道应将其写入可查询的存储:例如按分钟分区的parquet目录、Postgres表或实时索引。
- **作者级别信息丰富。** 将评论流与一个小的subreddit元数据或已知账户标志的查找表预先连接,并从驱动器广播该表。
- **背压机制。** 如果消费者落后,套接字缓冲区会被填满,生产者将阻塞。Kafka可以用消费者组的滞后指标替代这种机制。
## 关于原始笔记本代码的说明
S8的生产者笔记本在流循环内部接受新的套接字连接。这在消费者每条记录都重连时是可行的,但这并不是典型的流处理模式。在本仓库中,生产者仅接受一次消费者连接,保持其打开状态,并持续发送换行符分隔的记录。如果消费者断开连接,生产者会记录断开事件并等待新的连接。功能行为是一样的:消费者仍然能接收到记录。数据传输的格式正是Spark Streaming实际所期望的。
标签:Apache Spark Streaming, Docker, Docker Compose, ETW劫持, Homebrew安装, JSON, NIDS, NLTK, OAuth认证, PRAW, Python, Reddit API, TCP, 停用词过滤, 单词频率统计, 大数据, 安全防御评估, 实时分析, 实时数据处理, 实时数据流, 容器化, 容器编排, 广播变量, 数据序列化, 数据流, 数据管道, 无后门, 日志记录, 流处理, 流式计算, 环境变量配置, 生产者-消费者模式, 目录扫描, 社交媒体分析, 窗口化单词计数, 类型化Python, 请求拦截, 软件工程, 逆向工具