riverqueue/river
GitHub: riverqueue/river
一个为 Go 和 PostgreSQL 打造的高性能后台作业处理系统,通过事务性插入机制从根本上解决分布式一致性问题。
Stars: 4965 | Forks: 144
# River [](https://github.com/riverqueue/river/actions) [](https://pkg.go.dev/github.com/riverqueue/river)
River 是一个为 Go 和 Postgres 打造的健壮且高性能的作业处理系统。
参见[主页]、[文档]和 [godoc],以及 [River UI][riverui] 及其[在线演示][riveruidemo]。
由于专为 Postgres 构建,River 鼓励将同一个数据库用于应用程序数据和作业队列。通过与其他数据库更改一起在事务中插入作业,可以避免整类分布式系统问题。如果事务提交,则保证作业已插入;如果事务回滚,则作业将被移除;并且在提交_之前_作业不会被可见以供处理。有关此理念的更多背景信息,请参阅[事务性插入]。
## 作业参数与 Worker
作业在结构体对中定义,由 [`JobArgs`] 的一个实现和 [`Worker`] 的一个实现组成。
Job args 包含 `json` 注解,定义了作业如何序列化到数据库以及如何从数据库反序列化,同时还有一个 "kind"(种类),这是一个唯一标识该作业的稳定字符串。
```
type SortArgs struct {
// Strings is a slice of strings to sort.
Strings []string `json:"strings"`
}
func (SortArgs) Kind() string { return "sort" }
```
Workers 暴露了一个 `Work` 函数,用于指示作业如何运行。
```
type SortWorker struct {
// An embedded WorkerDefaults sets up default methods to fulfill the rest of
// the Worker interface:
river.WorkerDefaults[SortArgs]
}
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
}
```
## 注册 Worker
作业通过其 "kind" 字符串唯一标识。Workers 在启动时注册,以便 River 知道如何将作业分配给 workers:
```
workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &SortWorker{})
```
## 启动客户端
River 的 [`Client`] 提供了一个用于插入作业的接口,并管理作业处理和[维护服务]。客户端使用数据库连接池、[驱动]和包含 `Workers` 包及其他设置的配置结构体来创建。
下面是一个在 `Client` 中处理一个队列 (`"default"`) 且一次最多运行 100 个 worker goroutine 的示例:
```
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
if err != nil {
panic(err)
}
// Run the client inline. All executed jobs will inherit from ctx:
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
```
## 仅插入客户端
通常需要一个仅用于插入作业、但不对它们进行处理的客户端。这可以通过省略 `Queues` 配置并跳过对 `Start` 的调用来实现:
```
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Workers: workers,
})
if err != nil {
panic(err)
}
```
`Workers` 也可以省略,但最好将其包含在内,以便 River 可以检查插入的作业种类是否具有能够运行它们的 worker。
### 停止
在程序关闭时也应该停止客户端:
```
// Stop fetching new work and wait for active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
```
确保客户端干净利落且及时地停止存在一些复杂性。有关 River 停止模式的更多详细信息,请参阅[优雅停机]。
## 插入作业
[`Client.InsertTx`] 与 Job args 实例结合使用,用于在事务中插入要处理的作业:
```
_, err = riverClient.InsertTx(ctx, tx, SortArgs{
Strings: []string{
"whale", "tiger", "bear",
},
}, nil)
if err != nil {
panic(err)
}
```
完整代码请参阅 [`InsertAndWork` 示例]。
## 其他特性
- [批量插入作业],利用 Postgres 的 `COPY FROM` 高效地一次插入多个作业。
- 在工作函数内部[取消作业]。
- [错误与 panic 处理]。
- [多队列],以更好地保证作业吞吐量、worker 的可用性以及组件之间的隔离。
- [周期性与 cron 作业]。
- [计划作业],会在未来的预定时间自动运行。
- 在工作函数内部[暂停作业]。
- 队列活动和统计信息的[订阅],为日志记录和指标等遥测提供简单的钩子。
- [测试辅助工具],用于验证作业是否按预期插入。
- [事务性作业完成],以保证作业完成与其他事务中的更改一起提交。
- 基于参数、时间段、队列和状态的[唯一作业]。
- 用于检查作业和队列并与之交互的 [Web UI]。
- 用于简化 worker 实现的[工作函数]。
## 跨语言插入
River 支持使用某些非 Go 语言插入作业,然后由 Go 实现来处理它们。在对性能敏感的情况下,这可能很受欢迎,因为作业可以利用 Go 的快速运行时。
- [从 Python 插入作业](https://riverqueue.com/docs/python)。
- [从 Ruby 插入作业](https://riverqueue.com/docs/ruby)。
## 开发
请参阅[开发 River]。
## 致谢
River 在很大程度上受到了我们多年来使用其他后台作业库的经验的启发,最著名的是:
- Elixir 中的 [Oban](https://github.com/sorentwo/oban)。
- Ruby 中的 [Que](https://github.com/que-rb/que)、[Sidekiq](https://github.com/sidekiq/sidekiq)、[Delayed::Job](https://github.com/collectiveidea/delayed_job) 和 [GoodJob](https://github.com/bensheldon/good_job)。
- .NET 中的 [Hangfire](https://www.hangfire.io/)。
感谢你们推动了软件生态系统的发展。
标签:DNS解析, Go, Golang, Homebrew安装, JobQueue, Postgres, PostgreSQL, River, Ruby工具, Syscall, Web开发, 事务性, 任务队列, 作业系统, 作业调度, 分布式系统, 后台任务, 后端开发, 响应大小分析, 安全编程, 开源项目, 异步处理, 日志审计, 测试用例, 消息队列, 逆向工具, 高并发