cpauldev/rate-engine
GitHub: cpauldev/rate-engine
一个基于 TypeScript 和 Redis 的客户端无关多阶段限流策略引擎,通过声明式策略流水线简化复杂限流逻辑的编排与管理。
Stars: 0 | Forks: 0

# RateEngine: TypeScript 的限流引擎
     [](./CHANGELOG.md)
RateEngine 是一个由 `@upstash/ratelimit` 驱动的、客户端无关的多阶段限流策略引擎,专为 TypeScript 设计。它可以帮助您定义基于 Redis 的限流桶,执行有序的策略流水线,选择 fail-open 或 fail-closed 行为,并以最少的路由处理样板代码返回标准的 HTTP 限流响应。
它专为通过 Upstash、ElastiCache、Redis Cloud、DragonflyDB、KeyDB 或类似基础设施提供商使用 **Redis 或 Valkey**,并希望获得更简洁、配置驱动的限流逻辑的开发者而设计。它对于在冷启动、跨区域或多个运行时实例中内存限制器无法可靠扩展的 **serverless、边缘计算和 Node.js API** 尤为有用。当您需要 **分层限制**(例如全局 IP、用户账户和特定端点的检查);在 Redis 性能降级时显式指定 **fail-open 或 fail-closed 行为**;以及用于标准限流请求头和 `429` 响应的内置辅助工具时,它将非常适用。
#### 🤖 询问您的编程助手
## 为什么使用 RateEngine?
RateEngine 底层使用 `@upstash/ratelimit` 通过滑动窗口、固定窗口或令牌桶算法来执行限流检查。它在这些检查之上添加了一个结构化的策略层,使得应用程序代码可以专注于请求处理,而不是重复的限流编排。
| 功能 | 原生 `@upstash/ratelimit` | 使用 **RateEngine** |
| :----------------------- | :--------------------------------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------- |
| **链式检查** | 需要在路由处理器中手动协调多个限流器调用。 | 🔗 **策略流水线。** 顺序评估声明的多阶段策略,例如 `全局 ➔ 用户 ➔ 端点`。 |
| **故障安全模式** | 需要路由级别的错误处理和自定义回退行为。 | ⚙️ **可配置。** 在策略级别或每次直接桶调用时定义 fail-open 或 fail-closed 行为。 |
| **Serverless 生命周期** | 当运行时需要后台工作以保持存活时,需要处理 `result.pending`。 | ⚡ **已处理。** 在提供时,将后台分析 Promise 传递给运行环境的 `waitUntil` 钩子。 |
| **HTTP 响应** | 返回诸如 `limit`、`remaining` 和 `reset` 等原始指标。 | 🌐 **内置辅助工具。** 生成限流请求头和标准的 `429` JSON 响应。 |
| **动态路由** | 需要自定义路由逻辑以在运行时切换不同的限流器策略。 | 🔄 **解析器钩子。** 使用 `resolvePolicy` 根据上下文将请求重定向到更严格或替代的策略。 |
| **客户端灵活性** | 普通用法与 `@upstash/redis` 绑定;TCP 客户端需要适配器逻辑。 | 🧩 **鸭子类型 Redis 客户端。** 接受暴露了 RateEngine 所需 Redis 命令方法的客户端。 |
| **违规跟踪** | 需要在每个被拦截的路径中添加遥测调用。 | 🛡️ **违规钩子。** 通过 `onViolation` 集中记录日志、遥测或滥用跟踪。 |
| **内存优化** | 除非手动传入共享 map,否则每个 `Ratelimit` 实例可能会使用自己的缓存。 | 🧠 **共享缓存。** 默认情况下,在所有桶限流器之间共享一个内存缓存映射。 |
## 安装说明
通过您首选的包管理器安装 RateEngine:
```
# npm
npm install rate-engine
# yarn
yarn add rate-engine
# pnpm
pnpm add rate-engine
# bun
bun add rate-engine
```
## 快速入门
### 1. 定义桶
创建一个 `buckets.ts` 文件来定义您的限流窗口和容量:
```
// buckets.ts
import { type BucketConfig } from "rate-engine";
export const APP_BUCKETS = {
"global:ip": {
requests: 500,
window: "1 m",
},
"global:user": {
requests: 300,
window: "1 m",
algorithm: "slidingWindow",
},
"auth:login": {
requests: 5,
window: "15 m",
algorithm: "fixedWindow",
},
"api:default": {
requests: 100,
window: "1 m",
},
"api:burst": {
requests: 50,
window: "10 s",
algorithm: "tokenBucket",
refillRate: 5,
},
} as const satisfies Record
;
export type AppBucketId = keyof typeof APP_BUCKETS;
```
### 2. 定义策略
创建一个 `policies.ts` 文件来定义您的多阶段检查流水线:
```
// policies.ts
import { type RateLimitPolicy } from "rate-engine";
import { type AppBucketId } from "./buckets";
export type AppContext = {
userId?: string;
ipAddress?: string;
userAgent?: string;
};
export const APP_POLICIES = {
"auth.login": {
// Critical endpoint: fail closed if Redis is degraded.
failureMode: "closed",
stages: [
{
bucketId: "global:ip",
identifier: (ctx) => ctx.ipAddress,
tier: "global",
message: "Too many requests from this IP.",
},
{
bucketId: "auth:login",
identifier: (ctx) => ctx.userId ?? ctx.ipAddress,
tier: "endpoint",
message: "Too many login attempts. Please try again later.",
},
],
},
"api.read": {
// Lower-risk endpoint: fail open to avoid unnecessary site outages.
failureMode: "open",
stages: [
{
bucketId: "api:default",
identifier: (ctx) => ctx.userId ?? ctx.ipAddress,
tier: "single",
},
],
},
} as const satisfies Record>;
export type AppPolicyId = keyof typeof APP_POLICIES;
```
### 3. 实例化 RateEngine
创建一个 `rate-engine.ts` 文件来初始化 RateEngine 实例:
```
// rate-engine.ts
import { Redis } from "@upstash/redis";
import { RateEngine } from "rate-engine";
import { APP_BUCKETS, type AppBucketId } from "./buckets";
import { APP_POLICIES, type AppContext, type AppPolicyId } from "./policies";
export const rateEngine = new RateEngine({
redis: new Redis({
url: process.env.UPSTASH_REDIS_REST_URL!,
token: process.env.UPSTASH_REDIS_REST_TOKEN!,
}),
logger: console,
buckets: APP_BUCKETS,
policies: APP_POLICIES,
// Optional dynamic policy resolution.
resolvePolicy: async (policyId, context) => {
// Example: return a stricter policy for suspicious users.
return policyId;
},
// Optional central violation hook.
onViolation: async (context, decision) => {
console.warn(
`[RateEngine] ${context.ipAddress ?? "unknown"} exceeded ${decision.bucketId}`,
{
policyId: decision.policyId,
tier: decision.tier,
degraded: decision.degraded,
},
);
},
});
```
### 4. 在 API 处理器中执行限流
在您的路由处理器内部使用 `enforce()`。对于 serverless 或边缘计算运行时,请传入 `waitUntil`,以便平台可以完成后台分析工作。
```
// Next.js Route Handler Example
import { type NextRequest, NextResponse } from "next/server";
import { getRateLimitHeaders, toRateLimitResponse } from "rate-engine";
import { rateEngine } from "@/lib/rate-engine";
export async function POST(req: NextRequest, event: { waitUntil: any }) {
const ipAddress =
req.headers.get("x-forwarded-for")?.split(",")[0]?.trim() ?? "127.0.0.1";
const decision = await rateEngine.enforce(
"auth.login",
{
ipAddress,
userId: "user_123",
userAgent: req.headers.get("user-agent") ?? undefined,
},
{
waitUntil: (promise) => event.waitUntil(promise),
},
);
if (!decision.allowed) {
return toRateLimitResponse(decision, {
message: "Too many login attempts. Please try again later.",
errorCode: "LOGIN_LIMIT_EXCEEDED",
});
}
const headers = getRateLimitHeaders(decision);
// Continue with authentication...
return NextResponse.json({ success: true }, { headers });
}
```
## 核心概念
### 桶
**桶**定义了限流的容量、窗口和算法。
```
{
requests: 100,
window: "1 m",
algorithm: "slidingWindow"
}
```
支持的算法:
- `slidingWindow`
- `fixedWindow`
- `tokenBucket`
对于令牌桶,您还可以提供 `refillRate`。
### 策略
**策略**是一个有序的阶段列表。每个阶段选择一个桶并解析要进行限流的标识符。
```
{
failureMode: "closed",
stages: [
{
bucketId: "global:ip",
identifier: (ctx) => ctx.ipAddress,
tier: "global",
},
{
bucketId: "auth:login",
identifier: (ctx) => ctx.userId ?? ctx.ipAddress,
tier: "endpoint",
},
],
}
```
策略将按顺序进行评估,并在第一个被拦截的阶段停止。
### 故障模式
当 Redis 不可用或限流操作失败时,RateEngine 支持两种回退模式:
| 模式 | 行为 | 常见用例 |
| :------- | :--------------------------------------------- | :--------------------------------------------------------------------------- |
| `open` | 在后端性能降级期间允许请求。 | 公开读取、低风险 API、可用性优先路由。 |
| `closed` | 在后端性能降级期间拦截请求。 | 登录、密码重置、结账、OTP、写入密集型或对滥用敏感的路由。 |
`enforce()` 使用策略的故障模式。除非您传入 `failureMode: "closed"`,否则直接的 `consumeBucket()` 调用默认为 fail-open。
### 有效配额报告
对于多阶段策略,RateEngine 返回针对 HTTP 标头优化的保守根级别决策。
如果所有阶段均通过:
- `remaining` 来自剩余计数最少的已评估阶段。
- `limit` 来自同一个剩余最少的阶段。
- `reset` 来自重置时间戳最晚的阶段。
- `stages` 包含每个阶段的决策。
- `effective` 标识了哪些桶贡献了根级别的 `limit`、`remaining` 和 `reset` 值。
这意味着根级别的 `limit`、`remaining` 和 `reset` 字段可以是多个阶段的组合。它们旨在用于面向客户端的保守标头,而不是作为确切每个桶状态的替代品。当您需要确切的每个阶段配额状态时,请使用 `decision.stages`。
### 顺序执行
RateEngine 按顺序评估策略阶段,并在第一次违规时短路。这避免了在早期阶段已经拦截请求时消耗下游令牌。
例如,如果请求已经被全局 IP 限制所拦截,RateEngine 将不会再消耗特定端点桶的令牌。
## Redis 客户端兼容性
RateEngine 是客户端无关的。它不需要严格的 `@upstash/redis` 实例;相反,它使用鸭子类型的 Redis 客户端接口。
RateEngine 专为与暴露了 `@upstash/ratelimit` 所需命令方法的兼容 Redis 的客户端一起使用而设计,包括:
- `eval`
- `evalsha`
- `incr`
- `expire`
- `ping`
它的设计旨在用于:
- ☁️ **云/企业托管 Redis:** 通过诸如 `ioredis` 或 `redis` 的 TCP 客户端使用 AWS ElastiCache、Redis Cloud、Google Memorystore 和 Azure Managed Redis。
- ⚡ **Serverless/边缘 Redis:** 通过 HTTP REST 使用 `@upstash/redis` 的 Upstash Redis。
- 🚀 **兼容 Redis 的引擎:** DragonflyDB、KeyDB 和 Valkey。
应在您的部署环境中验证特定于提供商的行为。
### 高级:环境感知的 Redis 代理
以下代理规范化了 `ioredis` 和 `@upstash/redis` 之间的 `eval` 和 `evalsha` 调用。
```
import { Redis as UpstashRedis } from "@upstash/redis";
import IORedis from "ioredis";
const provider = process.env.REDIS_PROVIDER ?? "upstash";
let client: UpstashRedis | IORedis | null = null;
function getRedisClient() {
if (client) return client;
if (provider === "tcp") {
client = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379");
} else {
client = new UpstashRedis({
url: process.env.UPSTASH_REDIS_REST_URL!,
token: process.env.UPSTASH_REDIS_REST_TOKEN!,
});
}
return client;
}
export const redis = new Proxy({} as any, {
get(_target, prop) {
const activeClient = getRedisClient();
if (prop === "eval" || prop === "evalsha") {
return async (scriptOrSha: string, keys: string[], args: any[] = []) => {
if (provider === "tcp") {
return await (activeClient as IORedis)[prop](
scriptOrSha,
keys.length,
...keys,
...args,
);
}
return await (activeClient as UpstashRedis)[prop](
scriptOrSha,
keys,
args,
);
};
}
const value = (activeClient as any)[prop];
return typeof value === "function" ? value.bind(activeClient) : value;
},
});
```
## API 参考
### `RateEngine` 构造函数
`RateEngine` 类使用一个选项对象进行初始化。
| 参数 | 类型 | 必需 | 默认值 | 描述 |
| :---------------------- | :------------------------------------------------------- | :------- | :----------- | :--------------------------------------------------------------------------------------------------------------------------------- |
| `redis` | `RateEngineRedisClient` | 是 | - | 一个鸭子类型的 Redis 客户端实例。 |
| `buckets` | `Record` | 是 | - | 所有可用限流桶的配置。 |
| `policies` | `Record` | 是 | - | 指定有序评估阶段的命名策略。 |
| `logger` | `RateEngineLogger` | 否 | - | 用于限流错误和后台分析失败的记录器接口。 |
| `redisTimeoutMs` | `number` | 否 | `1000` | 在触发回退行为之前的 Redis 响应超时时间。 |
| `fallbackResetMs` | `number` | 否 | `60000` | 在性能降级的回退快照中使用的重置持续时间。 |
| `analytics` | `boolean` | 否 | `true` | 启用 `@upstash/ratelimit` 分析上传。设置为 `false` 以选择退出;RateEngine 绝不会上传本地健康计数器。 |
| `bucketPrefixOverrides` | `Partial>` | 否 | - | 可选的每个桶的 Redis 键前缀覆盖。 |
| `resolvePolicy` | `(policyId, context) => Promise \| TPolicyId` | 否 | - | 用于动态将请求重定向到另一个策略的钩子。 |
| `ephemeralCache` | `Map` | 否 | 共享的 `Map` | 可选的自定义共享缓存映射。 |
| `onViolation` | `(context, decision) => Promise \| void` | 否 | - | 当突破限流或由于性能降级导致 fail-closed 策略拦截请求时触发的回调。 |
### 实例方法
#### 1. `enforce(policyId, context, options?)`
按顺序评估命名策略的各个阶段。
```
const decision = await rateEngine.enforce("auth.login", {
ipAddress: "203.0.113.10",
userId: "user_123",
});
```
- **参数**
- `policyId` (`TPolicyId`):要执行的策略 ID。
- `context` (`TContext`):阶段标识符函数使用的请求上下文。
- `options` (`EnforceOptions`):可选钩子,例如 `{ waitUntil: (promise) => void }`。
- **返回**
- `Promise`
返回的决策可能包括:
```
type RateLimitDecision = {
allowed: boolean;
bucketId: string;
identifier: string;
limit: number;
remaining: number;
used: number;
reset: number;
resetDate: Date;
degraded: boolean;
policyId?: string;
tier: "single" | "global" | "category" | "endpoint";
message?: string;
stages?: RateLimitStageDecision[];
effective?: EffectiveQuotaMeta;
};
```
`stages` 包含每个阶段的决策快照。`effective` 标识了哪个阶段提供了根级别的 `limit`、`remaining` 和 `reset` 值。
#### 2. `consumeBucket(bucketId, identifier, options?, enforceOptions?)`
在不运行完整策略流水线的情况下从一个桶消耗令牌。
```
const decision = await rateEngine.consumeBucket("api:default", "user_123", {
failureMode: "closed",
});
```
- **参数**
- `bucketId` (`TBucketId`):目标桶 ID。
- `identifier` (`string`):唯一的参与者标识符,例如 IP、用户 ID 或 API key。
- `options` (`ConsumeBucketOptions`):诸如 `{ rate, context, tier, policyId, message, failureMode }` 的选项。
- `enforceOptions` (`EnforceOptions`):可选钩子,例如 `{ waitUntil }`。
- **返回**
- `Promise`
#### 3. `readBucket(bucketId, identifier)`
在不消耗令牌的情况下读取桶的当前状态。
```
const snapshot = await rateEngine.readBucket("api:default", "user_123");
```
- **返回**
- `Promise`
#### 4. `resetBucket(bucketId, identifier)`
重置桶和标识符已消耗的令牌。
```
await rateEngine.resetBucket("auth:login", "user_123");
```
成功的重置还会将有状态的运行状况跟踪器的 Redis 连接性记录为健康。
- **返回**
- `Promise`
#### 5. `getHealth()`
Ping Redis 并返回有状态的运行状况遥测数据。
```
const health = await rateEngine.getHealth();
```
- **返回**
```
Promise<{
healthy: boolean;
usingFallback: boolean;
consecutiveFailures: number;
totalFailures: number;
lastFailure: Date | null;
lastSuccess: Date | null;
}>;
```
#### 6. `resetHealth()`
清除有状态的运行状况遥测数据。
```
rateEngine.resetHealth();
```
这将重置:
- `conFailures`
- `totalFailures`
- `lastFailure`
- `lastSuccess`
适用于测试、管理重置,或者在恢复后希望清除历史运行状况计数器的长时间运行的进程。
- **返回**
- `void`
## HTTP 适配器
RateEngine 包含了与框架无关的辅助工具,用于将限流状态返回给客户端。
### `toRateLimitResponse(decision, options?)`
创建标准的 `429 Too Many Requests` JSON 响应。
```
return toRateLimitResponse(decision, {
message: "Too many requests. Please try again later.",
errorCode: "RATE_LIMIT_EXCEEDED",
});
```
响应体:
```
{
"error": "RATE_LIMIT_EXCEEDED",
"message": "Too many requests. Please try again later.",
"retryAfter": 60,
"degraded": false
}
```
### `toOAuthSlowDownResponse(decision, options?)`
为轮询和设备流程端点创建 OAuth 风格的 `slow_down` 响应。
```
return toOAuthSlowDownResponse(decision, {
message: "Polling too frequently. Please wait before trying again.",
});
```
响应体:
```
{
"error": "slow_down",
"error_description": "Polling too frequently. Please wait before trying again.",
"retry_after": 60,
"degraded": false
}
```
### `getRateLimitHeaders(decision)`
根据决策构建限流请求头。
```
const headers = getRateLimitHeaders(decision);
```
在客户端达到限制之前需要配额元数据的成功响应上使用它:
```
import { getRateLimitHeaders, toRateLimitResponse } from "rate-engine";
const decision = await rateEngine.enforce("translate.request", {
apiKeyId: "key_123",
});
if (!decision.allowed) {
return toRateLimitResponse(decision);
}
const result = await translate(request);
return Response.json(result, {
headers: getRateLimitHeaders(decision),
});
```
返回的请求头包括:
```
RateLimit-Limit: 100
RateLimit-Remaining: 42
RateLimit-Reset: 60
```
## 开发
要构建包并生成 TypeScript 声明:
```
bun run build
```
要运行包单元测试:
```
bun run test
```
要运行包类型检查:
```
bun run typecheck
```
构建后,验证发布的运行时导出:
```
bun run test:smoke
```
## 相关包
- [`route-engine`](https://github.com/cpauldev/route-engine) 用于安全的 HTTP 路由边界。
- [`redact-log`](https://github.com/cpauldev/redact-log) 用于安全的日志记录。
- [`secret-engine`](https://github.com/cpauldev/secret-engine) 用于上下文绑定的加密和机密处理。
- [`session-engine`](https://github.com/cpauldev/session-engine) 用于浏览器会话和缓存生命周期管理。
## 许可证
MIT © [Christian Paul](https://github.com/cpauldev)标签:API网关, MITM代理, Redis, Serverless, TypeScript, 中间件, 安全插件, 搜索引擎查询, 策略引擎, 网络安全挑战, 自动化攻击, 限流