cpauldev/rate-engine

GitHub: cpauldev/rate-engine

一个基于 TypeScript 和 Redis 的客户端无关多阶段限流策略引擎,通过声明式策略流水线简化复杂限流逻辑的编排与管理。

Stars: 0 | Forks: 0

RateEngine Banner # RateEngine: TypeScript 的限流引擎 ![TypeScript](https://img.shields.io/badge/-TypeScript-3178C6?style=flat-square&logo=typescript&logoColor=white) ![Rate Limiting](https://img.shields.io/badge/-Rate_Limiting-C2410C?style=flat-square) ![Pipelines](https://img.shields.io/badge/-Policy_Pipelines-0F766E?style=flat-square) ![Redis](https://img.shields.io/badge/-Redis-DC382D?style=flat-square&logo=redis&logoColor=white) ![License](https://img.shields.io/badge/-MIT_License-blue?style=flat-square) [![Changelog](https://img.shields.io/badge/Changelog-v0.3.0-blue?style=flat-square)](./CHANGELOG.md) RateEngine 是一个由 `@upstash/ratelimit` 驱动的、客户端无关的多阶段限流策略引擎,专为 TypeScript 设计。它可以帮助您定义基于 Redis 的限流桶,执行有序的策略流水线,选择 fail-open 或 fail-closed 行为,并以最少的路由处理样板代码返回标准的 HTTP 限流响应。 它专为通过 Upstash、ElastiCache、Redis Cloud、DragonflyDB、KeyDB 或类似基础设施提供商使用 **Redis 或 Valkey**,并希望获得更简洁、配置驱动的限流逻辑的开发者而设计。它对于在冷启动、跨区域或多个运行时实例中内存限制器无法可靠扩展的 **serverless、边缘计算和 Node.js API** 尤为有用。当您需要 **分层限制**(例如全局 IP、用户账户和特定端点的检查);在 Redis 性能降级时显式指定 **fail-open 或 fail-closed 行为**;以及用于标准限流请求头和 `429` 响应的内置辅助工具时,它将非常适用。 #### 🤖 询问您的编程助手 ## 为什么使用 RateEngine? RateEngine 底层使用 `@upstash/ratelimit` 通过滑动窗口、固定窗口或令牌桶算法来执行限流检查。它在这些检查之上添加了一个结构化的策略层,使得应用程序代码可以专注于请求处理,而不是重复的限流编排。 | 功能 | 原生 `@upstash/ratelimit` | 使用 **RateEngine** | | :----------------------- | :--------------------------------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------- | | **链式检查** | 需要在路由处理器中手动协调多个限流器调用。 | 🔗 **策略流水线。** 顺序评估声明的多阶段策略,例如 `全局 ➔ 用户 ➔ 端点`。 | | **故障安全模式** | 需要路由级别的错误处理和自定义回退行为。 | ⚙️ **可配置。** 在策略级别或每次直接桶调用时定义 fail-open 或 fail-closed 行为。 | | **Serverless 生命周期** | 当运行时需要后台工作以保持存活时,需要处理 `result.pending`。 | ⚡ **已处理。** 在提供时,将后台分析 Promise 传递给运行环境的 `waitUntil` 钩子。 | | **HTTP 响应** | 返回诸如 `limit`、`remaining` 和 `reset` 等原始指标。 | 🌐 **内置辅助工具。** 生成限流请求头和标准的 `429` JSON 响应。 | | **动态路由** | 需要自定义路由逻辑以在运行时切换不同的限流器策略。 | 🔄 **解析器钩子。** 使用 `resolvePolicy` 根据上下文将请求重定向到更严格或替代的策略。 | | **客户端灵活性** | 普通用法与 `@upstash/redis` 绑定;TCP 客户端需要适配器逻辑。 | 🧩 **鸭子类型 Redis 客户端。** 接受暴露了 RateEngine 所需 Redis 命令方法的客户端。 | | **违规跟踪** | 需要在每个被拦截的路径中添加遥测调用。 | 🛡️ **违规钩子。** 通过 `onViolation` 集中记录日志、遥测或滥用跟踪。 | | **内存优化** | 除非手动传入共享 map,否则每个 `Ratelimit` 实例可能会使用自己的缓存。 | 🧠 **共享缓存。** 默认情况下,在所有桶限流器之间共享一个内存缓存映射。 | ## 安装说明 通过您首选的包管理器安装 RateEngine: ``` # npm npm install rate-engine # yarn yarn add rate-engine # pnpm pnpm add rate-engine # bun bun add rate-engine ``` ## 快速入门 ### 1. 定义桶 创建一个 `buckets.ts` 文件来定义您的限流窗口和容量: ``` // buckets.ts import { type BucketConfig } from "rate-engine"; export const APP_BUCKETS = { "global:ip": { requests: 500, window: "1 m", }, "global:user": { requests: 300, window: "1 m", algorithm: "slidingWindow", }, "auth:login": { requests: 5, window: "15 m", algorithm: "fixedWindow", }, "api:default": { requests: 100, window: "1 m", }, "api:burst": { requests: 50, window: "10 s", algorithm: "tokenBucket", refillRate: 5, }, } as const satisfies Record; export type AppBucketId = keyof typeof APP_BUCKETS; ``` ### 2. 定义策略 创建一个 `policies.ts` 文件来定义您的多阶段检查流水线: ``` // policies.ts import { type RateLimitPolicy } from "rate-engine"; import { type AppBucketId } from "./buckets"; export type AppContext = { userId?: string; ipAddress?: string; userAgent?: string; }; export const APP_POLICIES = { "auth.login": { // Critical endpoint: fail closed if Redis is degraded. failureMode: "closed", stages: [ { bucketId: "global:ip", identifier: (ctx) => ctx.ipAddress, tier: "global", message: "Too many requests from this IP.", }, { bucketId: "auth:login", identifier: (ctx) => ctx.userId ?? ctx.ipAddress, tier: "endpoint", message: "Too many login attempts. Please try again later.", }, ], }, "api.read": { // Lower-risk endpoint: fail open to avoid unnecessary site outages. failureMode: "open", stages: [ { bucketId: "api:default", identifier: (ctx) => ctx.userId ?? ctx.ipAddress, tier: "single", }, ], }, } as const satisfies Record>; export type AppPolicyId = keyof typeof APP_POLICIES; ``` ### 3. 实例化 RateEngine 创建一个 `rate-engine.ts` 文件来初始化 RateEngine 实例: ``` // rate-engine.ts import { Redis } from "@upstash/redis"; import { RateEngine } from "rate-engine"; import { APP_BUCKETS, type AppBucketId } from "./buckets"; import { APP_POLICIES, type AppContext, type AppPolicyId } from "./policies"; export const rateEngine = new RateEngine({ redis: new Redis({ url: process.env.UPSTASH_REDIS_REST_URL!, token: process.env.UPSTASH_REDIS_REST_TOKEN!, }), logger: console, buckets: APP_BUCKETS, policies: APP_POLICIES, // Optional dynamic policy resolution. resolvePolicy: async (policyId, context) => { // Example: return a stricter policy for suspicious users. return policyId; }, // Optional central violation hook. onViolation: async (context, decision) => { console.warn( `[RateEngine] ${context.ipAddress ?? "unknown"} exceeded ${decision.bucketId}`, { policyId: decision.policyId, tier: decision.tier, degraded: decision.degraded, }, ); }, }); ``` ### 4. 在 API 处理器中执行限流 在您的路由处理器内部使用 `enforce()`。对于 serverless 或边缘计算运行时,请传入 `waitUntil`,以便平台可以完成后台分析工作。 ``` // Next.js Route Handler Example import { type NextRequest, NextResponse } from "next/server"; import { getRateLimitHeaders, toRateLimitResponse } from "rate-engine"; import { rateEngine } from "@/lib/rate-engine"; export async function POST(req: NextRequest, event: { waitUntil: any }) { const ipAddress = req.headers.get("x-forwarded-for")?.split(",")[0]?.trim() ?? "127.0.0.1"; const decision = await rateEngine.enforce( "auth.login", { ipAddress, userId: "user_123", userAgent: req.headers.get("user-agent") ?? undefined, }, { waitUntil: (promise) => event.waitUntil(promise), }, ); if (!decision.allowed) { return toRateLimitResponse(decision, { message: "Too many login attempts. Please try again later.", errorCode: "LOGIN_LIMIT_EXCEEDED", }); } const headers = getRateLimitHeaders(decision); // Continue with authentication... return NextResponse.json({ success: true }, { headers }); } ``` ## 核心概念 ### 桶 **桶**定义了限流的容量、窗口和算法。 ``` { requests: 100, window: "1 m", algorithm: "slidingWindow" } ``` 支持的算法: - `slidingWindow` - `fixedWindow` - `tokenBucket` 对于令牌桶,您还可以提供 `refillRate`。 ### 策略 **策略**是一个有序的阶段列表。每个阶段选择一个桶并解析要进行限流的标识符。 ``` { failureMode: "closed", stages: [ { bucketId: "global:ip", identifier: (ctx) => ctx.ipAddress, tier: "global", }, { bucketId: "auth:login", identifier: (ctx) => ctx.userId ?? ctx.ipAddress, tier: "endpoint", }, ], } ``` 策略将按顺序进行评估,并在第一个被拦截的阶段停止。 ### 故障模式 当 Redis 不可用或限流操作失败时,RateEngine 支持两种回退模式: | 模式 | 行为 | 常见用例 | | :------- | :--------------------------------------------- | :--------------------------------------------------------------------------- | | `open` | 在后端性能降级期间允许请求。 | 公开读取、低风险 API、可用性优先路由。 | | `closed` | 在后端性能降级期间拦截请求。 | 登录、密码重置、结账、OTP、写入密集型或对滥用敏感的路由。 | `enforce()` 使用策略的故障模式。除非您传入 `failureMode: "closed"`,否则直接的 `consumeBucket()` 调用默认为 fail-open。 ### 有效配额报告 对于多阶段策略,RateEngine 返回针对 HTTP 标头优化的保守根级别决策。 如果所有阶段均通过: - `remaining` 来自剩余计数最少的已评估阶段。 - `limit` 来自同一个剩余最少的阶段。 - `reset` 来自重置时间戳最晚的阶段。 - `stages` 包含每个阶段的决策。 - `effective` 标识了哪些桶贡献了根级别的 `limit`、`remaining` 和 `reset` 值。 这意味着根级别的 `limit`、`remaining` 和 `reset` 字段可以是多个阶段的组合。它们旨在用于面向客户端的保守标头,而不是作为确切每个桶状态的替代品。当您需要确切的每个阶段配额状态时,请使用 `decision.stages`。 ### 顺序执行 RateEngine 按顺序评估策略阶段,并在第一次违规时短路。这避免了在早期阶段已经拦截请求时消耗下游令牌。 例如,如果请求已经被全局 IP 限制所拦截,RateEngine 将不会再消耗特定端点桶的令牌。 ## Redis 客户端兼容性 RateEngine 是客户端无关的。它不需要严格的 `@upstash/redis` 实例;相反,它使用鸭子类型的 Redis 客户端接口。 RateEngine 专为与暴露了 `@upstash/ratelimit` 所需命令方法的兼容 Redis 的客户端一起使用而设计,包括: - `eval` - `evalsha` - `incr` - `expire` - `ping` 它的设计旨在用于: - ☁️ **云/企业托管 Redis:** 通过诸如 `ioredis` 或 `redis` 的 TCP 客户端使用 AWS ElastiCache、Redis Cloud、Google Memorystore 和 Azure Managed Redis。 - ⚡ **Serverless/边缘 Redis:** 通过 HTTP REST 使用 `@upstash/redis` 的 Upstash Redis。 - 🚀 **兼容 Redis 的引擎:** DragonflyDB、KeyDB 和 Valkey。 应在您的部署环境中验证特定于提供商的行为。 ### 高级:环境感知的 Redis 代理 以下代理规范化了 `ioredis` 和 `@upstash/redis` 之间的 `eval` 和 `evalsha` 调用。 ``` import { Redis as UpstashRedis } from "@upstash/redis"; import IORedis from "ioredis"; const provider = process.env.REDIS_PROVIDER ?? "upstash"; let client: UpstashRedis | IORedis | null = null; function getRedisClient() { if (client) return client; if (provider === "tcp") { client = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379"); } else { client = new UpstashRedis({ url: process.env.UPSTASH_REDIS_REST_URL!, token: process.env.UPSTASH_REDIS_REST_TOKEN!, }); } return client; } export const redis = new Proxy({} as any, { get(_target, prop) { const activeClient = getRedisClient(); if (prop === "eval" || prop === "evalsha") { return async (scriptOrSha: string, keys: string[], args: any[] = []) => { if (provider === "tcp") { return await (activeClient as IORedis)[prop]( scriptOrSha, keys.length, ...keys, ...args, ); } return await (activeClient as UpstashRedis)[prop]( scriptOrSha, keys, args, ); }; } const value = (activeClient as any)[prop]; return typeof value === "function" ? value.bind(activeClient) : value; }, }); ``` ## API 参考 ### `RateEngine` 构造函数 `RateEngine` 类使用一个选项对象进行初始化。 | 参数 | 类型 | 必需 | 默认值 | 描述 | | :---------------------- | :------------------------------------------------------- | :------- | :----------- | :--------------------------------------------------------------------------------------------------------------------------------- | | `redis` | `RateEngineRedisClient` | 是 | - | 一个鸭子类型的 Redis 客户端实例。 | | `buckets` | `Record` | 是 | - | 所有可用限流桶的配置。 | | `policies` | `Record` | 是 | - | 指定有序评估阶段的命名策略。 | | `logger` | `RateEngineLogger` | 否 | - | 用于限流错误和后台分析失败的记录器接口。 | | `redisTimeoutMs` | `number` | 否 | `1000` | 在触发回退行为之前的 Redis 响应超时时间。 | | `fallbackResetMs` | `number` | 否 | `60000` | 在性能降级的回退快照中使用的重置持续时间。 | | `analytics` | `boolean` | 否 | `true` | 启用 `@upstash/ratelimit` 分析上传。设置为 `false` 以选择退出;RateEngine 绝不会上传本地健康计数器。 | | `bucketPrefixOverrides` | `Partial>` | 否 | - | 可选的每个桶的 Redis 键前缀覆盖。 | | `resolvePolicy` | `(policyId, context) => Promise \| TPolicyId` | 否 | - | 用于动态将请求重定向到另一个策略的钩子。 | | `ephemeralCache` | `Map` | 否 | 共享的 `Map` | 可选的自定义共享缓存映射。 | | `onViolation` | `(context, decision) => Promise \| void` | 否 | - | 当突破限流或由于性能降级导致 fail-closed 策略拦截请求时触发的回调。 | ### 实例方法 #### 1. `enforce(policyId, context, options?)` 按顺序评估命名策略的各个阶段。 ``` const decision = await rateEngine.enforce("auth.login", { ipAddress: "203.0.113.10", userId: "user_123", }); ``` - **参数** - `policyId` (`TPolicyId`):要执行的策略 ID。 - `context` (`TContext`):阶段标识符函数使用的请求上下文。 - `options` (`EnforceOptions`):可选钩子,例如 `{ waitUntil: (promise) => void }`。 - **返回** - `Promise` 返回的决策可能包括: ``` type RateLimitDecision = { allowed: boolean; bucketId: string; identifier: string; limit: number; remaining: number; used: number; reset: number; resetDate: Date; degraded: boolean; policyId?: string; tier: "single" | "global" | "category" | "endpoint"; message?: string; stages?: RateLimitStageDecision[]; effective?: EffectiveQuotaMeta; }; ``` `stages` 包含每个阶段的决策快照。`effective` 标识了哪个阶段提供了根级别的 `limit`、`remaining` 和 `reset` 值。 #### 2. `consumeBucket(bucketId, identifier, options?, enforceOptions?)` 在不运行完整策略流水线的情况下从一个桶消耗令牌。 ``` const decision = await rateEngine.consumeBucket("api:default", "user_123", { failureMode: "closed", }); ``` - **参数** - `bucketId` (`TBucketId`):目标桶 ID。 - `identifier` (`string`):唯一的参与者标识符,例如 IP、用户 ID 或 API key。 - `options` (`ConsumeBucketOptions`):诸如 `{ rate, context, tier, policyId, message, failureMode }` 的选项。 - `enforceOptions` (`EnforceOptions`):可选钩子,例如 `{ waitUntil }`。 - **返回** - `Promise` #### 3. `readBucket(bucketId, identifier)` 在不消耗令牌的情况下读取桶的当前状态。 ``` const snapshot = await rateEngine.readBucket("api:default", "user_123"); ``` - **返回** - `Promise` #### 4. `resetBucket(bucketId, identifier)` 重置桶和标识符已消耗的令牌。 ``` await rateEngine.resetBucket("auth:login", "user_123"); ``` 成功的重置还会将有状态的运行状况跟踪器的 Redis 连接性记录为健康。 - **返回** - `Promise` #### 5. `getHealth()` Ping Redis 并返回有状态的运行状况遥测数据。 ``` const health = await rateEngine.getHealth(); ``` - **返回** ``` Promise<{ healthy: boolean; usingFallback: boolean; consecutiveFailures: number; totalFailures: number; lastFailure: Date | null; lastSuccess: Date | null; }>; ``` #### 6. `resetHealth()` 清除有状态的运行状况遥测数据。 ``` rateEngine.resetHealth(); ``` 这将重置: - `conFailures` - `totalFailures` - `lastFailure` - `lastSuccess` 适用于测试、管理重置,或者在恢复后希望清除历史运行状况计数器的长时间运行的进程。 - **返回** - `void` ## HTTP 适配器 RateEngine 包含了与框架无关的辅助工具,用于将限流状态返回给客户端。 ### `toRateLimitResponse(decision, options?)` 创建标准的 `429 Too Many Requests` JSON 响应。 ``` return toRateLimitResponse(decision, { message: "Too many requests. Please try again later.", errorCode: "RATE_LIMIT_EXCEEDED", }); ``` 响应体: ``` { "error": "RATE_LIMIT_EXCEEDED", "message": "Too many requests. Please try again later.", "retryAfter": 60, "degraded": false } ``` ### `toOAuthSlowDownResponse(decision, options?)` 为轮询和设备流程端点创建 OAuth 风格的 `slow_down` 响应。 ``` return toOAuthSlowDownResponse(decision, { message: "Polling too frequently. Please wait before trying again.", }); ``` 响应体: ``` { "error": "slow_down", "error_description": "Polling too frequently. Please wait before trying again.", "retry_after": 60, "degraded": false } ``` ### `getRateLimitHeaders(decision)` 根据决策构建限流请求头。 ``` const headers = getRateLimitHeaders(decision); ``` 在客户端达到限制之前需要配额元数据的成功响应上使用它: ``` import { getRateLimitHeaders, toRateLimitResponse } from "rate-engine"; const decision = await rateEngine.enforce("translate.request", { apiKeyId: "key_123", }); if (!decision.allowed) { return toRateLimitResponse(decision); } const result = await translate(request); return Response.json(result, { headers: getRateLimitHeaders(decision), }); ``` 返回的请求头包括: ``` RateLimit-Limit: 100 RateLimit-Remaining: 42 RateLimit-Reset: 60 ``` ## 开发 要构建包并生成 TypeScript 声明: ``` bun run build ``` 要运行包单元测试: ``` bun run test ``` 要运行包类型检查: ``` bun run typecheck ``` 构建后,验证发布的运行时导出: ``` bun run test:smoke ``` ## 相关包 - [`route-engine`](https://github.com/cpauldev/route-engine) 用于安全的 HTTP 路由边界。 - [`redact-log`](https://github.com/cpauldev/redact-log) 用于安全的日志记录。 - [`secret-engine`](https://github.com/cpauldev/secret-engine) 用于上下文绑定的加密和机密处理。 - [`session-engine`](https://github.com/cpauldev/session-engine) 用于浏览器会话和缓存生命周期管理。 ## 许可证 MIT © [Christian Paul](https://github.com/cpauldev)
标签:API网关, MITM代理, Redis, Serverless, TypeScript, 中间件, 安全插件, 搜索引擎查询, 策略引擎, 网络安全挑战, 自动化攻击, 限流