riverqueue/river

GitHub: riverqueue/river

一个为 Go 和 PostgreSQL 打造的高性能后台作业处理系统,通过事务性插入机制从根本上解决分布式一致性问题。

Stars: 4965 | Forks: 144

# River [![构建状态](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/3d34c4c372130523.svg)](https://github.com/riverqueue/river/actions) [![Go 参考](https://pkg.go.dev/badge/github.com/riverqueue/river.svg)](https://pkg.go.dev/github.com/riverqueue/river) River 是一个为 Go 和 Postgres 打造的健壮且高性能的作业处理系统。 参见[主页]、[文档]和 [godoc],以及 [River UI][riverui] 及其[在线演示][riveruidemo]。 由于专为 Postgres 构建,River 鼓励将同一个数据库用于应用程序数据和作业队列。通过与其他数据库更改一起在事务中插入作业,可以避免整类分布式系统问题。如果事务提交,则保证作业已插入;如果事务回滚,则作业将被移除;并且在提交_之前_作业不会被可见以供处理。有关此理念的更多背景信息,请参阅[事务性插入]。 ## 作业参数与 Worker 作业在结构体对中定义,由 [`JobArgs`] 的一个实现和 [`Worker`] 的一个实现组成。 Job args 包含 `json` 注解,定义了作业如何序列化到数据库以及如何从数据库反序列化,同时还有一个 "kind"(种类),这是一个唯一标识该作业的稳定字符串。 ``` type SortArgs struct { // Strings is a slice of strings to sort. Strings []string `json:"strings"` } func (SortArgs) Kind() string { return "sort" } ``` Workers 暴露了一个 `Work` 函数,用于指示作业如何运行。 ``` type SortWorker struct { // An embedded WorkerDefaults sets up default methods to fulfill the rest of // the Worker interface: river.WorkerDefaults[SortArgs] } func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { sort.Strings(job.Args.Strings) fmt.Printf("Sorted strings: %+v\n", job.Args.Strings) return nil } ``` ## 注册 Worker 作业通过其 "kind" 字符串唯一标识。Workers 在启动时注册,以便 River 知道如何将作业分配给 workers: ``` workers := river.NewWorkers() // AddWorker panics if the worker is already registered or invalid: river.AddWorker(workers, &SortWorker{}) ``` ## 启动客户端 River 的 [`Client`] 提供了一个用于插入作业的接口,并管理作业处理和[维护服务]。客户端使用数据库连接池、[驱动]和包含 `Workers` 包及其他设置的配置结构体来创建。 下面是一个在 `Client` 中处理一个队列 (`"default"`) 且一次最多运行 100 个 worker goroutine 的示例: ``` riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Run the client inline. All executed jobs will inherit from ctx: if err := riverClient.Start(ctx); err != nil { panic(err) } ``` ## 仅插入客户端 通常需要一个仅用于插入作业、但不对它们进行处理的客户端。这可以通过省略 `Queues` 配置并跳过对 `Start` 的调用来实现: ``` riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Workers: workers, }) if err != nil { panic(err) } ``` `Workers` 也可以省略,但最好将其包含在内,以便 River 可以检查插入的作业种类是否具有能够运行它们的 worker。 ### 停止 在程序关闭时也应该停止客户端: ``` // Stop fetching new work and wait for active jobs to finish. if err := riverClient.Stop(ctx); err != nil { panic(err) } ``` 确保客户端干净利落且及时地停止存在一些复杂性。有关 River 停止模式的更多详细信息,请参阅[优雅停机]。 ## 插入作业 [`Client.InsertTx`] 与 Job args 实例结合使用,用于在事务中插入要处理的作业: ``` _, err = riverClient.InsertTx(ctx, tx, SortArgs{ Strings: []string{ "whale", "tiger", "bear", }, }, nil) if err != nil { panic(err) } ``` 完整代码请参阅 [`InsertAndWork` 示例]。 ## 其他特性 - [批量插入作业],利用 Postgres 的 `COPY FROM` 高效地一次插入多个作业。 - 在工作函数内部[取消作业]。 - [错误与 panic 处理]。 - [多队列],以更好地保证作业吞吐量、worker 的可用性以及组件之间的隔离。 - [周期性与 cron 作业]。 - [计划作业],会在未来的预定时间自动运行。 - 在工作函数内部[暂停作业]。 - 队列活动和统计信息的[订阅],为日志记录和指标等遥测提供简单的钩子。 - [测试辅助工具],用于验证作业是否按预期插入。 - [事务性作业完成],以保证作业完成与其他事务中的更改一起提交。 - 基于参数、时间段、队列和状态的[唯一作业]。 - 用于检查作业和队列并与之交互的 [Web UI]。 - 用于简化 worker 实现的[工作函数]。 ## 跨语言插入 River 支持使用某些非 Go 语言插入作业,然后由 Go 实现来处理它们。在对性能敏感的情况下,这可能很受欢迎,因为作业可以利用 Go 的快速运行时。 - [从 Python 插入作业](https://riverqueue.com/docs/python)。 - [从 Ruby 插入作业](https://riverqueue.com/docs/ruby)。 ## 开发 请参阅[开发 River]。 ## 致谢 River 在很大程度上受到了我们多年来使用其他后台作业库的经验的启发,最著名的是: - Elixir 中的 [Oban](https://github.com/sorentwo/oban)。 - Ruby 中的 [Que](https://github.com/que-rb/que)、[Sidekiq](https://github.com/sidekiq/sidekiq)、[Delayed::Job](https://github.com/collectiveidea/delayed_job) 和 [GoodJob](https://github.com/bensheldon/good_job)。 - .NET 中的 [Hangfire](https://www.hangfire.io/)。 感谢你们推动了软件生态系统的发展。
标签:DNS解析, Go, Golang, Homebrew安装, JobQueue, Postgres, PostgreSQL, River, Ruby工具, Syscall, Web开发, 事务性, 任务队列, 作业系统, 作业调度, 分布式系统, 后台任务, 后端开发, 响应大小分析, 安全编程, 开源项目, 异步处理, 日志审计, 测试用例, 消息队列, 逆向工具, 高并发