hatchet-dev/hatchet
GitHub: hatchet-dev/hatchet
基于Postgres的后台任务与持久化工作流平台,提供任务队列、DAG编排、流量控制和实时可观测性。
Stars: 6683 | Forks: 313
Hatchet Cloud · 文档 · 官网 · 问题
📥 队列
#### Hatchet 建立在持久化任务队列之上,该队列将你的任务入队,并以 worker 能够承受的速率将其发送给你的 worker。Hatchet 将跟踪你的任务进度,并确保工作完成(或者你收到警报),即使你的应用程序崩溃。 **这特别适用于:** - 确保你永远不会丢失用户请求 - 平抑应用程序中的巨大峰值 - 将大型复杂逻辑分解为更小的、可重用的任务 [阅读更多 ➶](https://docs.hatchet.run/home/your-first-task) -Python
# 1. 定义你的任务输入
class SimpleInput(BaseModel):
message: str
# 2. 使用 hatchet.task 定义你的任务
@hatchet.task(name="SimpleWorkflow", input_validator=SimpleInput)
def simple(input: SimpleInput, ctx: Context) -> dict[str, str]:
return {
"transformed_message": input.message.lower(),
}
# 3. 在你的 worker 上注册你的任务
worker = hatchet.worker("test-worker", workflows=[simple])
worker.start()
# 4. 从你的应用程序调用任务
simple.run(SimpleInput(message="Hello World!"))
Typescript
// 1. 定义你的任务输入
export type SimpleInput = {
Message: string;
};
// 2. 使用 hatchet.task 定义你的任务
export const simple = hatchet.task({
name: "simple",
fn: (input: SimpleInput) => {
return {
TransformedMessage: input.Message.toLowerCase(),
};
},
});
// 3. 在你的 worker 上注册你的任务
const worker = await hatchet.worker("simple-worker", {
workflows: [simple],
});
await worker.start();
// 4. 从你的应用程序调用任务
await simple.run({
Message: "Hello World!",
});
Go
// 1. 定义你的任务输入
type SimpleInput struct {
Message string `json:"message"`
}
// 2. 使用 factory.NewTask 定义你的任务
simple := factory.NewTask(
create.StandaloneTask{
Name: "simple-task",
}, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
return &SimpleResult{
TransformedMessage: strings.ToLower(input.Message),
}, nil
},
hatchet,
)
// 3. 在你的 worker 上注册你的任务
worker, err := hatchet.Worker(v1worker.WorkerOpts{
Name: "simple-worker",
Workflows: []workflow.WorkflowBase{
simple,
},
})
worker.StartBlocking()
// 4. 从你的应用程序调用任务
simple.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
🎻 任务编排
#### Hatchet 允许你构建由多个任务组成的复杂工作流。例如,如果你想把一个工作负载分解成更小的任务,你可以使用 Hatchet 创建一个扇出工作流,并行生成多个任务。 Hatchet 支持以下任务编排机制: - **DAG(有向无环图)** —— 预定义你的工作形状,自动将父任务的输出路由到子任务的输入。[阅读更多 ➶](https://docs.hatchet.run/home/dags) - **持久化任务** —— 这些任务负责编排其他任务。它们存储所有生成任务的完整历史记录,允许你缓存中间结果。[阅读更多 ➶](https://docs.hatchet.run/home/durable-execution) -Python
# 1. 定义一个工作流(工作流是任务的集合)
simple = hatchet.workflow(name="SimpleWorkflow")
# 2. 将第一个任务附加到工作流
@simple.task()
def task_1(input: EmptyModel, ctx: Context) -> dict[str, str]:
print("executed task_1")
return {"result": "task_1"}
# 3. 将第二个任务附加到工作流,该任务在 task_1 之后执行
@simple.task(parents=[task_1])
def task_2(input: EmptyModel, ctx: Context) -> None:
first_result = ctx.task_output(task_1)
print(first_result)
# 4. 从你的应用程序调用工作流
result = simple.run(input_data)
Typescript
// 1. 定义一个工作流(工作流是任务的集合)
const simple = hatchet.workflowGo
// 1. 定义一个工作流(工作流是任务的集合)
simple := v1.WorkflowFactory[DagInput, DagOutput](
workflow.CreateOpts[DagInput]{
Name: "simple-workflow",
},
hatchet,
)
// 2. 将第一个任务附加到工作流
const task1 = simple.Task(
task.CreateOpts[DagInput]{
Name: "task-1",
Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
return &SimpleOutput{
Result: "task-1",
}, nil
},
},
);
// 3. 将第二个任务附加到工作流,该任务在 task-1 之后执行
const task2 = simple.Task(
task.CreateOpts[DagInput]{
Name: "task-2",
Parents: []task.NamedTask{
step1,
},
Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
return &SimpleOutput{
Result: "task-2",
}, nil
},
},
);
// 4. 从你的应用程序调用工作流
simple.Run(ctx, DagInput{})
🚦 流量控制
#### 不要让繁忙的用户弄垮你的应用程序。使用 Hatchet,你可以基于每个用户、每个租户和每个队列限制执行速度,从而提高系统稳定性并限制繁忙用户对系统其余部分的影响。 Hatchet 支持以下流量控制原语: - **并发** —— 基于动态并发键设置并发限制(例如,每个用户在给定时间只能运行 10 个批处理作业)。[阅读更多 ➶](https://docs.hatchet.run/home/concurrency) - **速率限制** —— 创建全局和动态速率限制。[阅读更多 ➶](https://docs.hatchet.run/home/rate-limits) -Python
# limit concurrency on a per-user basis
flow_control_workflow = hatchet.workflow(
name="FlowControlWorkflow",
concurrency=ConcurrencyExpression(
expression="input.user_id",
max_runs=5,
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
),
input_validator=FlowControlInput,
)
# rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit
@flow_control_workflow.task(
rate_limits=[
RateLimit(
dynamic_key="input.user_id",
units=1,
limit=10,
duration=RateLimitDuration.MINUTE,
)
]
)
def rate_limit_task(input: FlowControlInput, ctx: Context) -> None:
print("executed rate_limit_task")
Typescript
// limit concurrency on a per-user basis
flowControlWorkflow = hatchet.workflowGo
// limit concurrency on a per-user basis
flowControlWorkflow := factory.NewWorkflow[DagInput, DagResult](
create.WorkflowCreateOpts[DagInput]{
Name: "simple-dag",
Concurrency: []*types.Concurrency{
{
Expression: "input.userId",
MaxRuns: 1,
LimitStrategy: types.GroupRoundRobin,
},
},
},
hatchet,
)
// rate limit a task per user to 10 tasks per minute, with each task consuming 1 unit
flowControlWorkflow.Task(
create.WorkflowTask[FlowControlInput, FlowControlOutput]{
Name: "rate-limit-task",
RateLimits: []*types.RateLimit{
{
Key: "user-rate-limit",
KeyExpr: "input.userId",
Units: 1,
LimitValueExpr: 10,
Duration: types.Minute,
},
},
}, func(ctx worker.HatchetContext, input FlowControlInput) (interface{}, error) {
return &SimpleOutput{
Step: 1,
}, nil
},
)
📅 调度
#### Hatchet 全面支持调度功能,包括 cron、一次性调度以及暂停执行一段时间。这特别适用于: - **Cron 计划** —— 按 cron 计划运行数据管道、批处理或通知系统 [阅读更多 ➶](https://docs.hatchet.run/home/cron-runs) - **一次性任务** —— 为未来的特定时间安排工作流 [阅读更多 ➶](https://docs.hatchet.run/home/scheduled-runs) - **持久化睡眠** —— 暂停任务执行特定持续时间 [阅读更多 ➶](https://docs.hatchet.run/home/durable-execution) -Python
tomorrow = datetime.today() + timedelta(days=1)
# schedule a task to run tomorrow
scheduled = simple.schedule(
tomorrow,
SimpleInput(message="Hello, World!")
)
# schedule a task to run every day at midnight
cron = simple.cron(
"every-day",
"0 0 * * *",
SimpleInput(message="Hello, World!")
)
Typescript>
const tomorrow = new Date(Date.now() + 1000 * 60 * 60 * 24);
// schedule a task to run tomorrow
const scheduled = simple.schedule(tomorrow, {
Message: "Hello, World!",
});
// schedule a task to run every day at midnight
const cron = simple.cron("every-day", "0 0 * * *", {
Message: "Hello, World!",
});
Go
const tomorrow = time.Now().Add(24 * time.Hour);
// schedule a task to run tomorrow
simple.Schedule(ctx, tomorrow, ScheduleInput{
Message: "Hello, World!",
})
// schedule a task to run every day at midnight
simple.Cron(ctx, "every-day", "0 0 * * *", CronInput{
Message: "Hello, World!",
})
🚏 任务路由
#### 虽然 Hatchet 的默认行为是实现 FIFO 队列,但它也支持额外的调度机制,将你的任务路由到理想的 worker。 - **粘性分配** —— 允许生成的任务优先或要求在同一个 worker 上执行。[阅读更多 ➶](https://docs.hatchet.run/home/sticky-assignment) - **Worker 亲和性** —— 对 worker 进行排名,以发现哪个最适合处理给定任务。[阅读更多 ➶](https://docs.hatchet.run/home/worker-affinity) -Python
# create a workflow which prefers to run on the same worker, but can be
# scheduled on any worker if the original worker is busy
hatchet.workflow(
name="StickyWorkflow",
sticky=StickyStrategy.SOFT,
)
# create a workflow which must run on the same worker
hatchet.workflow(
name="StickyWorkflow",
sticky=StickyStrategy.HARD,
)
Typescript
// create a workflow which prefers to run on the same worker, but can be
// scheduled on any worker if the original worker is busy
hatchet.workflow({
name: "StickyWorkflow",
sticky: StickyStrategy.SOFT,
});
// create a workflow which must run on the same worker
hatchet.workflow({
name: "StickyWorkflow",
sticky: StickyStrategy.HARD,
});
Go
// create a workflow which prefers to run on the same worker, but can be
// scheduled on any worker if the original worker is busy
factory.NewWorkflow[StickyInput, StickyOutput](
create.WorkflowCreateOpts[StickyInput]{
Name: "sticky-dag",
StickyStrategy: types.StickyStrategy_SOFT,
},
hatchet,
);
// create a workflow which must run on the same worker
factory.NewWorkflow[StickyInput, StickyOutput](
create.WorkflowCreateOpts[StickyInput]{
Name: "sticky-dag",
StickyStrategy: types.StickyStrategy_HARD,
},
hatchet,
);
⚡️ 事件触发器和监听器
#### Hatchet 支持基于事件的架构,任务和工作流可以在等待特定外部事件时暂停执行。它支持以下功能: - **事件监听** —— 任务可以暂停,直到触发特定事件。[阅读更多 ➶](https://docs.hatchet.run/home/durable-execution) - **事件触发** —— 事件可以触发新工作流或工作流中的步骤。[阅读更多 ➶](https://docs.hatchet.run/home/run-on-event) -Python
# Create a task which waits for an external user event or sleeps for 10 seconds
@dag_with_conditions.task(
parents=[first_task],
wait_for=[
or_(
SleepCondition(timedelta(seconds=10)),
UserEventCondition(event_key="user:event"),
)
]
)
def second_task(input: EmptyModel, ctx: Context) -> dict[str, str]:
return {"completed": "true"}
Typescript
// Create a task which waits for an external user event or sleeps for 10 seconds
dagWithConditions.task({
name: "secondTask",
parents: [firstTask],
waitFor: Or({ eventKey: "user:event" }, { sleepFor: "10s" }),
fn: async (_, ctx) => {
return {
Completed: true,
};
},
});
Go
// Create a task which waits for an external user event or sleeps for 10 seconds
simple.Task(
conditionOpts{
Name: "Step2",
Parents: []create.NamedTask{
step1,
},
WaitFor: condition.Conditions(
condition.UserEventCondition("user:event", "'true'"),
condition.SleepCondition(10 * time.Second),
),
}, func(ctx worker.HatchetContext, input DagWithConditionsInput) (interface{}, error) {
// ...
},
);
🖥️ 实时 Web UI
#### Hatchet 附带了许多功能来帮助你监控任务、工作流和队列。 **实时仪表板和指标** 通过实时更新监控你的任务、工作流和队列,以快速检测问题。内置告警功能,以便你可以在问题发生时立即响应。 https://github.com/user-attachments/assets/b1797540-c9da-4057-b50f-4780f52a2cb9 **日志记录** Hatchet 支持从你的任务记录日志,允许你轻松地将任务失败与系统中的日志关联起来。不再需要翻阅日志服务来弄清楚任务失败的原因。 https://github.com/user-attachments/assets/427c15cd-8842-4b54-ab2e-3b1cabc01c7b **告警** Hatchet 支持任务失败时的 Slack 和基于电子邮件的告警。警报是实时的,具有可调整的告警窗口。Hatchet vs Temporal
#### Hatchet 旨在成为一个通用的任务编排平台——它可以用作队列、基于 DAG 的编排器、持久化执行引擎,或三者兼而有之。因此,Hatchet 涵盖了更广泛的用例,如多种排队策略、速率限制、DAG 功能、条件触发、流功能等等。 Temporal 专注于持久化执行,并支持更广泛的数据库后端和结果存储,如 Apache Cassandra、MySQL、PostgreSQL 和 SQLite。 **何时使用 Hatchet:** 当你希望更好地控制底层队列逻辑,运行基于 DAG 的工作流,或者希望通过仅运行 Hatchet 引擎和 Postgres 来简化自托管时。 **何时使用 Temporal:** 当你想使用非 Postgres 的结果存储,或者你唯一的工作负载最适合持久化执行时。Hatchet vs 任务队列 (BullMQ, Celery)
#### Hatchet 是一个持久化任务队列,这意味着它会保留所有执行的历史记录(直到保留期),这允许轻松监控 + 调试,并为上述许多持久化功能提供支持。这不是 Celery 和 BullMQ 的标准行为(你需要依赖功能极其有限的第三方 UI 工具,如 Celery Flower)。 **何时使用 Hatchet:** 当你希望结果持久化并可在 UI 中观察时 **何时使用像 BullMQ/Celery 这样的任务队列库:** 当你需要非常高的吞吐量(>10k/s)且不需要保留,或者当你想使用单个库(而不是像 Hatchet 这样的独立服务)与队列交互时。Hatchet vs 基于 DAG 的平台 (Airflow, Prefect, Dagster)
#### 这些工具通常是为数据工程师设计的,并非设计为高容量应用程序的一部分。它们通常具有更高的延迟和更高的成本,其主要卖点是与常见数据存储和连接器的集成。 **何时使用 Hatchet:** 当你想使用基于 DAG 的框架,编写自己的集成和函数,并且需要更高的吞吐量(>100/s)时 **何时使用其他基于 DAG 的平台:** 当你想使用开箱即用的其他数据存储和连接器时Hatchet vs AI 框架
#### 大多数 AI 框架构建为在内存中运行,横向扩展和持久化是事后才考虑的。虽然你可以将 AI 框架与 Hatchet 结合使用,但我们的大多数用户都丢弃了他们的 AI 框架,并使用 Hatchet 的原语来构建他们的应用程序。 **何时使用 Hatchet:** 当你希望完全控制底层函数和 LLM 调用,或者你的函数需要高可用性和持久性时。 **何时使用 AI 框架:** 当你想通过简单的抽象快速入门时。标签:API集成, Docker, EVTX分析, Go语言, MIT许可, PostgreSQL, TypeScript, 任务调度, 任务队列, 分布式系统, 可观测性, 后台任务, 响应大小分析, 威胁情报, 安全插件, 安全防御评估, 工作流引擎, 延迟任务, 开发者工具, 开源, 持久化工作流, 日志审计, 横向扩展, 消息队列, 程序破解, 请求拦截, 逆向工具