hibiken/asynq

GitHub: hibiken/asynq

Asynq 是一个简单、可靠且高效的 Go 分布式任务队列库,基于 Redis 实现异步任务处理,解决后台任务的可靠分发与并发执行问题。

Stars: 13392 | Forks: 946

Asynq logo # 简单、可靠且高效的 Go 分布式任务队列 [![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq) [![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq) ![Build Status](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/69d6406f47201035.svg) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) [![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community) Asynq 是一个 Go 库,用于将任务加入队列并由 worker 异步处理。它由 [Redis](https://redis.io/) 提供支持,旨在具备可扩展性的同时易于上手。 以下是 Asynq 工作原理的高级概述: - Client 将任务放入队列 - Server 从队列中提取任务,并为每个任务启动一个 worker goroutine - 任务由多个 worker 并发处理 任务队列被用作在多台机器间分发工作的机制。一个系统可以由多个 worker server 和 broker 组成,从而实现高可用性和水平扩展。 **示例用例** ![Task Queue Diagram](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/08372e7c5f201039.jpg) ## 功能 - 保证任务[至少执行一次](https://www.cloudcomputingpatterns.org/at_least_once_delivery/) - 任务调度 - 失败任务[重试](https://github.com/hibiken/asynq/wiki/Task-Retry) - worker 崩溃时任务自动恢复 - [加权优先级队列](https://github.com/hibiken/asynq/wiki/Queue-Priority#weighted-priority) - [严格优先级队列](https://github.com/hibiken/asynq/wiki/Queue-Priority#strict-priority) - 添加任务低延迟,因为在 Redis 中写入速度非常快 - 使用 [unique 选项](https://github.com/hibiken/asynq/wiki/Unique-Tasks)实现任务去重 - 允许[为每个任务设置 timeout 和 deadline](https://github.com/hibiken/asynq/wiki/Task-Timeout-and-Cancelation) - 允许[聚合任务组](https://github.com/hibiken/asynq/wiki/Task-aggregation)以批量处理多个连续操作 - [灵活的 handler 接口,支持 middleware](https://github.com/hibiken/asynq/wiki/Handler-Deep-Dive) - [支持暂停队列](/tools/asynq/README.md#pause)以停止处理该队列中的任务 - [周期性任务](https://github.com/hibiken/asynq/wiki/Periodic-Tasks) - [支持 Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) 以实现高可用性 - 集成 [Prometheus](https://prometheus.io/) 以收集和可视化队列指标 - [Web UI](#web-ui) 用于检查和远程控制队列与任务 - [CLI](#command-line-tool) 用于检查和远程控制队列与任务 ## 稳定性与兼容性 **状态**:该库相对稳定,目前处于**适度开发**阶段,破坏性 API 变更频率较低。 ### Redis Cluster 兼容性 此库中的某些 lua 脚本可能与 Redis Cluster 不兼容。 ## 快速开始 确保你已安装 Go([下载](https://golang.org/dl/))。支持**最近两个** Go 版本(参见 https://go.dev/dl)。 通过创建一个文件夹并在其中运行 `go mod init github.com/your/repo`([了解更多](https://blog.golang.org/using-go-modules))来初始化你的项目。然后使用 [`go get`](https://golang.org/cmd/go/#hdr-Add_dependencies_to_current_module_and_install_them) 命令安装 Asynq 库: ``` go get -u github.com/hibiken/asynq ``` 确保你在本地或 [Docker](https://hub.docker.com/_/redis) 容器中运行了 Redis 服务器。需要 `4.0` 或更高版本。 接下来,编写一个封装任务创建和任务处理的 package。 ``` package tasks import ( "context" "encoding/json" "fmt" "log" "time" "github.com/hibiken/asynq" ) // A list of task types. const ( TypeEmailDelivery = "email:deliver" TypeImageResize = "image:resize" ) type EmailDeliveryPayload struct { UserID int TemplateID string } type ImageResizePayload struct { SourceURL string } //---------------------------------------------- // Write a function NewXXXTask to create a task. // A task consists of a type and a payload. //---------------------------------------------- func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) { payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID}) if err != nil { return nil, err } return asynq.NewTask(TypeEmailDelivery, payload), nil } func NewImageResizeTask(src string) (*asynq.Task, error) { payload, err := json.Marshal(ImageResizePayload{SourceURL: src}) if err != nil { return nil, err } // task options can be passed to NewTask, which can be overridden at enqueue time. return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil } //--------------------------------------------------------------- // Write a function HandleXXXTask to handle the input task. // Note that it satisfies the asynq.HandlerFunc interface. // // Handler doesn't need to be a function. You can define a type // that satisfies asynq.Handler interface. See examples below. //--------------------------------------------------------------- func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { var p EmailDeliveryPayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID) // Email delivery code ... return nil } // ImageProcessor implements asynq.Handler interface. type ImageProcessor struct { // ... fields for struct } func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { var p ImageResizePayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } log.Printf("Resizing image: src=%s", p.SourceURL) // Image resizing code ... return nil } func NewImageProcessor() *ImageProcessor { return &ImageProcessor{} } ``` 在你的应用代码中,导入上述 package 并使用 [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) 将任务放入队列。 ``` package main import ( "log" "time" "github.com/hibiken/asynq" "your/app/package/tasks" ) const redisAddr = "127.0.0.1:6379" func main() { client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) defer client.Close() // ------------------------------------------------------ // Example 1: Enqueue task to be processed immediately. // Use (*Client).Enqueue method. // ------------------------------------------------------ task, err := tasks.NewEmailDeliveryTask(42, "some:template:id") if err != nil { log.Fatalf("could not create task: %v", err) } info, err := client.Enqueue(task) if err != nil { log.Fatalf("could not enqueue task: %v", err) } log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue) // ------------------------------------------------------------ // Example 2: Schedule task to be processed in the future. // Use ProcessIn or ProcessAt option. // ------------------------------------------------------------ info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour)) if err != nil { log.Fatalf("could not schedule task: %v", err) } log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue) // ---------------------------------------------------------------------------- // Example 3: Set other options to tune task processing behavior. // Options include MaxRetry, Queue, Timeout, Deadline, Unique etc. // ---------------------------------------------------------------------------- task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg") if err != nil { log.Fatalf("could not create task: %v", err) } info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute)) if err != nil { log.Fatalf("could not enqueue task: %v", err) } log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue) } ``` 接下来,启动一个 worker server 在后台处理这些任务。要启动后台 worker,请使用 [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) 并提供你的 [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) 来处理任务。 你也可以选择使用 [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) 来创建 handler,就像你使用 [`net/http`](https://golang.org/pkg/net/http/) Handler 那样。 ``` package main import ( "log" "github.com/hibiken/asynq" "your/app/package/tasks" ) const redisAddr = "127.0.0.1:6379" func main() { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: redisAddr}, asynq.Config{ // Specify how many concurrent workers to use Concurrency: 10, // Optionally specify multiple queues with different priority. Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, }, // See the godoc for other configuration options }, ) // mux maps a type to a handler mux := asynq.NewServeMux() mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask) mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor()) // ...register other handlers... if err := srv.Run(mux); err != nil { log.Fatalf("could not run server: %v", err) } } ``` 有关该库更详细的操作演练,请参阅我们的[入门指南](https://github.com/hibiken/asynq/wiki/Getting-Started)。 要了解更多关于 `asynq` 的功能和 API,请查看 package 的 [godoc](https://godoc.org/github.com/hibiken/asynq)。 ## Web UI [Asynqmon](https://github.com/hibiken/asynqmon) 是一个基于 Web 的工具,用于监控和管理 Asynq 队列和任务。 以下是 Web UI 的几张截图: **队列视图** ![Web UI Queues View](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/83fbce6c50201105.png) **任务视图** ![Web UI TasksView](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/dda5317b4d201108.png) **指标视图** Screen Shot 2021-12-19 at 4 37 19 PM **设置与自适应深色模式** ![Web UI Settings and adaptive dark mode](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/eefd177b2d201115.png) 有关如何使用该工具的详细信息,请参阅该工具的 [README](https://github.com/hibiken/asynqmon#readme)。 ## 命令行工具 Asynq 附带了一个命令行工具,用于检查队列和任务的状态。 要安装该 CLI 工具,请运行以下命令: ``` go install github.com/hibiken/asynq/tools/asynq@latest ``` 以下是运行 `asynq dash` 命令的示例: ![Gif](/docs/assets/dash.gif) 有关如何使用该工具的详细信息,请参阅该工具的 [README](/tools/asynq/README.md)。 ## 贡献 我们欢迎并感谢社区提供的任何贡献(GitHub issues/PR、在 [Gitter channel](https://gitter.im/go-asynq/community) 上的反馈等)。 在贡献之前,请参阅[贡献指南](/CONTRIBUTING.md)。 ## 许可证 Copyright (c) 2019-present [Ken Hibino](https://github.com/hibiken) 及[贡献者](https://github.com/hibiken/asynq/graphs/contributors)。`Asynq` 是免费的开源软件,采用 [MIT License](https://github.com/hibiken/asynq/blob/master/LICENSE) 授权。官方 logo 由 [Vic Shóstak](https://github.com/koddr) 创建,并根据 [Creative Commons](https://creativecommons.org/publicdomain/zero/1.0/) 许可证(CC0 1.0 Universal)分发。
标签:Go, Redis, Ruby工具, 分布式任务队列, 后端开发, 异步处理, 搜索引擎查询, 日志审计, 自定义请求头, 高并发