hatchet-dev/hatchet

GitHub: hatchet-dev/hatchet

基于Postgres的后台任务与持久化工作流平台,提供任务队列、DAG编排、流量控制和实时可观测性。

Stars: 6683 | Forks: 313

Hatchet Logo ### 大规模运行后台任务 [![文档](https://img.shields.io/badge/docs-docs.hatchet.run-3F16E4)](https://docs.hatchet.run) [![许可证: MIT](https://img.shields.io/badge/License-MIT-purple.svg)](https://opensource.org/licenses/MIT) [![Go 参考](https://pkg.go.dev/badge/github.com/hatchet-dev/hatchet.svg)](https://pkg.go.dev/github.com/hatchet-dev/hatchet) [![NPM 下载量](https://img.shields.io/npm/dm/%40hatchet-dev%2Ftypescript-sdk)](https://www.npmjs.com/package/@hatchet-dev/typescript-sdk) [![Discord](https://img.shields.io/discord/1088927970518909068?style=social&logo=discord)](https://hatchet.run/discord) [![Twitter](https://img.shields.io/twitter/url/https/twitter.com/hatchet-dev.svg?style=social&label=Follow%20%40hatchet-dev)](https://twitter.com/hatchet_dev) [![GitHub Repo 星标](https://img.shields.io/github/stars/hatchet-dev/hatchet?style=social)](https://github.com/hatchet-dev/hatchet)

Hatchet Cloud · 文档 · 官网 · 问题

### 什么是 Hatchet? Hatchet 是一个基于 Postgres 构建的平台,用于运行后台任务和持久化工作流。它将持久化任务队列、可观测性、告警、仪表板和 CLI 打包到一个平台中。 ### 快速入门 开始使用运行中的 Hatchet 实例最快的方法是安装 Hatchet CLI(在 MacOS、Linux 或 WSL 上)——请注意,这需要在本地安装 [Docker](https://www.docker.com/get-started) 才能工作: ``` curl -fsSL https://install.hatchet.run/install.sh | bash hatchet --version hatchet server start ``` 你也可以注册 [Hatchet Cloud](https://cloud.onhatchet.run) 进行试用!即使你计划自托管,我们也推荐这种方式,这样你可以了解完全部署的 Hatchet 平台是什么样子的。 要查看关于自托管和使用云服务的完整文档,请查看 [文档](https://docs.hatchet.run)。 ### 什么时候应该使用 Hatchet? 后台任务对于从主 Web 应用程序中卸载工作至关重要。通常,后台任务通过 FIFO(先进先出)队列发送,这有助于防止流量峰值(队列可以吸收大量负载),并确保当任务处理器出错时重试任务。大多数技术栈一开始都使用基于 Redis 或 RabbitMQ 的库支持队列(如 Celery 或 BullMQ)。但随着任务变得越来越复杂,这些队列变得难以调试、监控,并开始以意想不到的方式失败。 这就是 Hatchet 的用武之地。Hatchet 是一个功能全面的后台任务管理平台,内置支持将复杂任务链接成工作流、失败告警、增强任务持久性以及在实时 Web 仪表板中查看任务。 ### 功能
📥 队列 #### Hatchet 建立在持久化任务队列之上,该队列将你的任务入队,并以 worker 能够承受的速率将其发送给你的 worker。Hatchet 将跟踪你的任务进度,并确保工作完成(或者你收到警报),即使你的应用程序崩溃。 **这特别适用于:** - 确保你永远不会丢失用户请求 - 平抑应用程序中的巨大峰值 - 将大型复杂逻辑分解为更小的、可重用的任务 [阅读更多 ➶](https://docs.hatchet.run/home/your-first-task) -
Python # 1. 定义你的任务输入 class SimpleInput(BaseModel): message: str # 2. 使用 hatchet.task 定义你的任务 @hatchet.task(name="SimpleWorkflow", input_validator=SimpleInput) def simple(input: SimpleInput, ctx: Context) -> dict[str, str]: return { "transformed_message": input.message.lower(), } # 3. 在你的 worker 上注册你的任务 worker = hatchet.worker("test-worker", workflows=[simple]) worker.start() # 4. 从你的应用程序调用任务 simple.run(SimpleInput(message="Hello World!"))
-
Typescript // 1. 定义你的任务输入 export type SimpleInput = { Message: string; }; // 2. 使用 hatchet.task 定义你的任务 export const simple = hatchet.task({ name: "simple", fn: (input: SimpleInput) => { return { TransformedMessage: input.Message.toLowerCase(), }; }, }); // 3. 在你的 worker 上注册你的任务 const worker = await hatchet.worker("simple-worker", { workflows: [simple], }); await worker.start(); // 4. 从你的应用程序调用任务 await simple.run({ Message: "Hello World!", });
-
Go // 1. 定义你的任务输入 type SimpleInput struct { Message string `json:"message"` } // 2. 使用 factory.NewTask 定义你的任务 simple := factory.NewTask( create.StandaloneTask{ Name: "simple-task", }, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) { return &SimpleResult{ TransformedMessage: strings.ToLower(input.Message), }, nil }, hatchet, ) // 3. 在你的 worker 上注册你的任务 worker, err := hatchet.Worker(v1worker.WorkerOpts{ Name: "simple-worker", Workflows: []workflow.WorkflowBase{ simple, }, }) worker.StartBlocking() // 4. 从你的应用程序调用任务 simple.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
🎻 任务编排 #### Hatchet 允许你构建由多个任务组成的复杂工作流。例如,如果你想把一个工作负载分解成更小的任务,你可以使用 Hatchet 创建一个扇出工作流,并行生成多个任务。 Hatchet 支持以下任务编排机制: - **DAG(有向无环图)** —— 预定义你的工作形状,自动将父任务的输出路由到子任务的输入。[阅读更多 ➶](https://docs.hatchet.run/home/dags) - **持久化任务** —— 这些任务负责编排其他任务。它们存储所有生成任务的完整历史记录,允许你缓存中间结果。[阅读更多 ➶](https://docs.hatchet.run/home/durable-execution) -
Python # 1. 定义一个工作流(工作流是任务的集合) simple = hatchet.workflow(name="SimpleWorkflow") # 2. 将第一个任务附加到工作流 @simple.task() def task_1(input: EmptyModel, ctx: Context) -> dict[str, str]: print("executed task_1") return {"result": "task_1"} # 3. 将第二个任务附加到工作流,该任务在 task_1 之后执行 @simple.task(parents=[task_1]) def task_2(input: EmptyModel, ctx: Context) -> None: first_result = ctx.task_output(task_1) print(first_result) # 4. 从你的应用程序调用工作流 result = simple.run(input_data)
-
Typescript // 1. 定义一个工作流(工作流是任务的集合) const simple = hatchet.workflow({ name: "simple", }); // 2. 将第一个任务附加到工作流 const task1 = simple.task({ name: "task-1", fn: (input) => { return { result: "task-1", }; }, }); // 3. 将第二个任务附加到工作流,该任务在 task-1 之后执行 const task2 = simple.task({ name: "task-2", parents: [task1], fn: (input, ctx) => { const firstResult = ctx.getParentOutput(task1); console.log(firstResult); }, }); // 4. 从你的应用程序调用工作流 await simple.run({ Message: "Hello World" });
-
Go // 1. 定义一个工作流(工作流是任务的集合) simple := v1.WorkflowFactory[DagInput, DagOutput]( workflow.CreateOpts[DagInput]{ Name: "simple-workflow", }, hatchet, ) // 2. 将第一个任务附加到工作流 const task1 = simple.Task( task.CreateOpts[DagInput]{ Name: "task-1", Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) { return &SimpleOutput{ Result: "task-1", }, nil }, }, ); // 3. 将第二个任务附加到工作流,该任务在 task-1 之后执行 const task2 = simple.Task( task.CreateOpts[DagInput]{ Name: "task-2", Parents: []task.NamedTask{ step1, }, Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) { return &SimpleOutput{ Result: "task-2", }, nil }, }, ); // 4. 从你的应用程序调用工作流 simple.Run(ctx, DagInput{})
🚦 流量控制 #### 不要让繁忙的用户弄垮你的应用程序。使用 Hatchet,你可以基于每个用户、每个租户和每个队列限制执行速度,从而提高系统稳定性并限制繁忙用户对系统其余部分的影响。 Hatchet 支持以下流量控制原语: - **并发** —— 基于动态并发键设置并发限制(例如,每个用户在给定时间只能运行 10 个批处理作业)。[阅读更多 ➶](https://docs.hatchet.run/home/concurrency) - **速率限制** —— 创建全局和动态速率限制。[阅读更多 ➶](https://docs.hatchet.run/home/rate-limits) -
Python # limit concurrency on a per-user basis flow_control_workflow = hatchet.workflow( name="FlowControlWorkflow", concurrency=ConcurrencyExpression( expression="input.user_id", max_runs=5, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, ), input_validator=FlowControlInput, ) # rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit @flow_control_workflow.task( rate_limits=[ RateLimit( dynamic_key="input.user_id", units=1, limit=10, duration=RateLimitDuration.MINUTE, ) ] ) def rate_limit_task(input: FlowControlInput, ctx: Context) -> None: print("executed rate_limit_task")
-
Typescript // limit concurrency on a per-user basis flowControlWorkflow = hatchet.workflow({ name: "ConcurrencyLimitWorkflow", concurrency: { expression: "input.userId", maxRuns: 5, limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, }, }); // rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit flowControlWorkflow.task({ name: "rate-limit-task", rateLimits: [ { dynamicKey: "input.userId", units: 1, limit: 10, duration: RateLimitDuration.MINUTE, }, ], fn: async (input) => { return { Completed: true, }; }, });
-
Go // limit concurrency on a per-user basis flowControlWorkflow := factory.NewWorkflow[DagInput, DagResult]( create.WorkflowCreateOpts[DagInput]{ Name: "simple-dag", Concurrency: []*types.Concurrency{ { Expression: "input.userId", MaxRuns: 1, LimitStrategy: types.GroupRoundRobin, }, }, }, hatchet, ) // rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit flowControlWorkflow.Task( create.WorkflowTask[FlowControlInput, FlowControlOutput]{ Name: "rate-limit-task", RateLimits: []*types.RateLimit{ { Key: "user-rate-limit", KeyExpr: "input.userId", Units: 1, LimitValueExpr: 10, Duration: types.Minute, }, }, }, func(ctx worker.HatchetContext, input FlowControlInput) (interface{}, error) { return &SimpleOutput{ Step: 1, }, nil }, )
📅 调度 #### Hatchet 全面支持调度功能,包括 cron、一次性调度以及暂停执行一段时间。这特别适用于: - **Cron 计划** —— 按 cron 计划运行数据管道、批处理或通知系统 [阅读更多 ➶](https://docs.hatchet.run/home/cron-runs) - **一次性任务** —— 为未来的特定时间安排工作流 [阅读更多 ➶](https://docs.hatchet.run/home/scheduled-runs) - **持久化睡眠** —— 暂停任务执行特定持续时间 [阅读更多 ➶](https://docs.hatchet.run/home/durable-execution) -
Python tomorrow = datetime.today() + timedelta(days=1) # schedule a task to run tomorrow scheduled = simple.schedule( tomorrow, SimpleInput(message="Hello, World!") ) # schedule a task to run every day at midnight cron = simple.cron( "every-day", "0 0 * * *", SimpleInput(message="Hello, World!") )
-
Typescript const tomorrow = new Date(Date.now() + 1000 * 60 * 60 * 24); // schedule a task to run tomorrow const scheduled = simple.schedule(tomorrow, { Message: "Hello, World!", }); // schedule a task to run every day at midnight const cron = simple.cron("every-day", "0 0 * * *", { Message: "Hello, World!", });
-
Go const tomorrow = time.Now().Add(24 * time.Hour); // schedule a task to run tomorrow simple.Schedule(ctx, tomorrow, ScheduleInput{ Message: "Hello, World!", }) // schedule a task to run every day at midnight simple.Cron(ctx, "every-day", "0 0 * * *", CronInput{ Message: "Hello, World!", })
🚏 任务路由 #### 虽然 Hatchet 的默认行为是实现 FIFO 队列,但它也支持额外的调度机制,将你的任务路由到理想的 worker。 - **粘性分配** —— 允许生成的任务优先或要求在同一个 worker 上执行。[阅读更多 ➶](https://docs.hatchet.run/home/sticky-assignment) - **Worker 亲和性** —— 对 worker 进行排名,以发现哪个最适合处理给定任务。[阅读更多 ➶](https://docs.hatchet.run/home/worker-affinity) -
Python # create a workflow which prefers to run on the same worker, but can be # scheduled on any worker if the original worker is busy hatchet.workflow( name="StickyWorkflow", sticky=StickyStrategy.SOFT, ) # create a workflow which must run on the same worker hatchet.workflow( name="StickyWorkflow", sticky=StickyStrategy.HARD, )
-
Typescript // create a workflow which prefers to run on the same worker, but can be // scheduled on any worker if the original worker is busy hatchet.workflow({ name: "StickyWorkflow", sticky: StickyStrategy.SOFT, }); // create a workflow which must run on the same worker hatchet.workflow({ name: "StickyWorkflow", sticky: StickyStrategy.HARD, });
-
Go // create a workflow which prefers to run on the same worker, but can be // scheduled on any worker if the original worker is busy factory.NewWorkflow[StickyInput, StickyOutput]( create.WorkflowCreateOpts[StickyInput]{ Name: "sticky-dag", StickyStrategy: types.StickyStrategy_SOFT, }, hatchet, ); // create a workflow which must run on the same worker factory.NewWorkflow[StickyInput, StickyOutput]( create.WorkflowCreateOpts[StickyInput]{ Name: "sticky-dag", StickyStrategy: types.StickyStrategy_HARD, }, hatchet, );
⚡️ 事件触发器和监听器 #### Hatchet 支持基于事件的架构,任务和工作流可以在等待特定外部事件时暂停执行。它支持以下功能: - **事件监听** —— 任务可以暂停,直到触发特定事件。[阅读更多 ➶](https://docs.hatchet.run/home/durable-execution) - **事件触发** —— 事件可以触发新工作流或工作流中的步骤。[阅读更多 ➶](https://docs.hatchet.run/home/run-on-event) -
Python # Create a task which waits for an external user event or sleeps for 10 seconds @dag_with_conditions.task( parents=[first_task], wait_for=[ or_( SleepCondition(timedelta(seconds=10)), UserEventCondition(event_key="user:event"), ) ] ) def second_task(input: EmptyModel, ctx: Context) -> dict[str, str]: return {"completed": "true"}
-
Typescript // Create a task which waits for an external user event or sleeps for 10 seconds dagWithConditions.task({ name: "secondTask", parents: [firstTask], waitFor: Or({ eventKey: "user:event" }, { sleepFor: "10s" }), fn: async (_, ctx) => { return { Completed: true, }; }, });
-
Go // Create a task which waits for an external user event or sleeps for 10 seconds simple.Task( conditionOpts{ Name: "Step2", Parents: []create.NamedTask{ step1, }, WaitFor: condition.Conditions( condition.UserEventCondition("user:event", "'true'"), condition.SleepCondition(10 * time.Second), ), }, func(ctx worker.HatchetContext, input DagWithConditionsInput) (interface{}, error) { // ... }, );
🖥️ 实时 Web UI #### Hatchet 附带了许多功能来帮助你监控任务、工作流和队列。 **实时仪表板和指标** 通过实时更新监控你的任务、工作流和队列,以快速检测问题。内置告警功能,以便你可以在问题发生时立即响应。 https://github.com/user-attachments/assets/b1797540-c9da-4057-b50f-4780f52a2cb9 **日志记录** Hatchet 支持从你的任务记录日志,允许你轻松地将任务失败与系统中的日志关联起来。不再需要翻阅日志服务来弄清楚任务失败的原因。 https://github.com/user-attachments/assets/427c15cd-8842-4b54-ab2e-3b1cabc01c7b **告警** Hatchet 支持任务失败时的 Slack 和基于电子邮件的告警。警报是实时的,具有可调整的告警窗口。
### 文档 最新的文档可以在 https://docs.hatchet.run 找到。 ### 社区与支持 - [Discord](https://discord.gg/ZMeUafwH89) - 最适合与维护者取得联系并与社区交流 - [Github Issues](https://github.com/hatchet-dev/hatchet/issues) - 用于提交错误报告 - [Github Discussions](https://github.com/hatchet-dev/hatchet/discussions) - 用于开始适合异步沟通的深入技术讨论 - [电子邮件](mailto:contact@hatchet.run) - 最适合获取 Hatchet Cloud 支持以及有关计费、数据删除等方面的帮助 ### Hatchet 对比...
Hatchet vs Temporal #### Hatchet 旨在成为一个通用的任务编排平台——它可以用作队列、基于 DAG 的编排器、持久化执行引擎,或三者兼而有之。因此,Hatchet 涵盖了更广泛的用例,如多种排队策略、速率限制、DAG 功能、条件触发、流功能等等。 Temporal 专注于持久化执行,并支持更广泛的数据库后端和结果存储,如 Apache Cassandra、MySQL、PostgreSQL 和 SQLite。 **何时使用 Hatchet:** 当你希望更好地控制底层队列逻辑,运行基于 DAG 的工作流,或者希望通过仅运行 Hatchet 引擎和 Postgres 来简化自托管时。 **何时使用 Temporal:** 当你想使用非 Postgres 的结果存储,或者你唯一的工作负载最适合持久化执行时。
Hatchet vs 任务队列 (BullMQ, Celery) #### Hatchet 是一个持久化任务队列,这意味着它会保留所有执行的历史记录(直到保留期),这允许轻松监控 + 调试,并为上述许多持久化功能提供支持。这不是 Celery 和 BullMQ 的标准行为(你需要依赖功能极其有限的第三方 UI 工具,如 Celery Flower)。 **何时使用 Hatchet:** 当你希望结果持久化并可在 UI 中观察时 **何时使用像 BullMQ/Celery 这样的任务队列库:** 当你需要非常高的吞吐量(>10k/s)且不需要保留,或者当你想使用单个库(而不是像 Hatchet 这样的独立服务)与队列交互时。
Hatchet vs 基于 DAG 的平台 (Airflow, Prefect, Dagster) #### 这些工具通常是为数据工程师设计的,并非设计为高容量应用程序的一部分。它们通常具有更高的延迟和更高的成本,其主要卖点是与常见数据存储和连接器的集成。 **何时使用 Hatchet:** 当你想使用基于 DAG 的框架,编写自己的集成和函数,并且需要更高的吞吐量(>100/s)时 **何时使用其他基于 DAG 的平台:** 当你想使用开箱即用的其他数据存储和连接器时
Hatchet vs AI 框架 #### 大多数 AI 框架构建为在内存中运行,横向扩展和持久化是事后才考虑的。虽然你可以将 AI 框架与 Hatchet 结合使用,但我们的大多数用户都丢弃了他们的 AI 框架,并使用 Hatchet 的原语来构建他们的应用程序。 **何时使用 Hatchet:** 当你希望完全控制底层函数和 LLM 调用,或者你的函数需要高可用性和持久性时。 **何时使用 AI 框架:** 当你想通过简单的抽象快速入门时。
### 问题 请通过 Github issues 提交你遇到的任何错误。 ### 我想贡献 请在 [Discord](https://discord.gg/ZMeUafwH89) 的 #contributing 频道告诉我们你有兴趣做什么。这将帮助我们塑造项目的方向,并使协作变得更加容易!
标签:API集成, Docker, EVTX分析, Go语言, MIT许可, PostgreSQL, TypeScript, 任务调度, 任务队列, 分布式系统, 可观测性, 后台任务, 响应大小分析, 威胁情报, 安全插件, 安全防御评估, 工作流引擎, 延迟任务, 开发者工具, 开源, 持久化工作流, 日志审计, 横向扩展, 消息队列, 程序破解, 请求拦截, 逆向工具