PetrefiedThunder/agent-middleware-api

GitHub: PetrefiedThunder/agent-middleware-api

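A FastAPI middleware for agent-native systems that provides state, billing, telemetry, security, and AI inference in one place, addressing the governance and cost problems of agent toolchains.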

Stars: 0 | Forks: 0

# Agent Middleware API

[![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/ac20dedba4031514.svg)](https://github.com/PetrefiedThunder/agent-middleware-api/actions/workflows/ci.yml) [![Auto PR](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/43dadd4de3031516.svg)](https://github.com/PetrefiedThunder/agent-middleware-api/actions/workflows/ci.yml)

**The FastAPI control plane for agent-native infrastructure.**

**Agent Middleware API** is a production-ready FastAPI service that provides durable state, billing, telemetry, secure tool execution, IoT connectivity, and **AI-powered agent intelligence** for autonomous agents.

It lets agents:

* Send and receive structured messages
* Store persistent state
* Bill and track usage
* Emit telemetry and anomaly alerts
* Execute secure external actions
* **Make autonomous decisions with AI reasoning**
* **Self-heal by diagnosing and fixing issues**
* **Answer natural language queries**
* **Remember and learn from experiences**

Deploy in minutes via Docker or Railway.
## Architecture Overview

| Domain | Endpoint Prefix | Durable | Rate Limited | Auth Required |
|--------|-----------------|---------|--------------|---------------|
| Billing | `/v1/billing` | Yes | Yes | Yes |
| Stripe Webhooks | `/webhooks/stripe` | - | - | Yes |
| Telemetry | `/v1/telemetry` | Yes | Yes | Yes |
| Comms | `/v1/comms` | Yes | Yes | Yes |
| IoT Bridge | `/v1/iot` | Optional | Yes | Yes |
| Security | `/v1/security` | Partial | Yes | Yes |
| **Agent Intelligence** | `/v1/ai` | Yes | Yes | Yes |

*(Additional modules include programmatic media, content factory, agent oracle, and protocol generation)*

## Quick Start (Local)

```
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
```

Then open:

- `http://localhost:8000/docs`

## Example API Call

```
curl -X POST http://localhost:8000/v1/telemetry \
  -H "X-API-Key: test-key" \
  -H "Content-Type: application/json" \
  -d '{"event":"agent_started","agent_id":"demo"}'
```

## Billing and Payments (`/v1/billing`)

A PostgreSQL-backed wallet system with ACID transactions, Stripe integration, and spend velocity monitoring.
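To illustrate what ACID transactions buy a wallet ledger, here is a minimal sketch of an all-or-nothing credit transfer. It uses Python's built-in `sqlite3` with an in-memory database as a stand-in for PostgreSQL. The column layout of the `wallets` and `ledger` tables and the `open_ledger` and `transfer` helpers are assumptions made for illustration, not the service's actual schema or code.

```python
import sqlite3


def open_ledger() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical wallet/ledger schema."""
    conn = sqlite3.connect(":memory:")
    # The CHECK constraint makes the database itself reject overdrafts.
    conn.execute(
        "CREATE TABLE wallets ("
        "wallet_id TEXT PRIMARY KEY, "
        "balance INTEGER NOT NULL CHECK (balance >= 0))"
    )
    conn.execute(
        "CREATE TABLE ledger ("
        "from_wallet TEXT, to_wallet TEXT, amount INTEGER, memo TEXT)"
    )
    conn.commit()
    return conn


def transfer(conn: sqlite3.Connection, from_wallet: str, to_wallet: str,
             amount: int, memo: str = "") -> bool:
    """Move credits atomically; return False and change nothing on failure."""
    if amount <= 0:
        return False
    try:
        # "with conn" commits if the block succeeds and rolls back if it raises.
        with conn:
            debit = conn.execute(
                "UPDATE wallets SET balance = balance - ? WHERE wallet_id = ?",
                (amount, from_wallet),
            )
            credit = conn.execute(
                "UPDATE wallets SET balance = balance + ? WHERE wallet_id = ?",
                (amount, to_wallet),
            )
            if debit.rowcount != 1 or credit.rowcount != 1:
                raise LookupError("unknown wallet")
            conn.execute(
                "INSERT INTO ledger VALUES (?, ?, ?, ?)",
                (from_wallet, to_wallet, amount, memo),
            )
        return True
    except (sqlite3.IntegrityError, LookupError):
        return False


conn = open_ledger()
conn.executemany(
    "INSERT INTO wallets VALUES (?, ?)",
    [("agent-001", 10000), ("agent-002", 0)],
)
conn.commit()

print(transfer(conn, "agent-001", "agent-002", 1000, "Payment for data processing"))
print(transfer(conn, "agent-001", "agent-002", 50000))  # overdraft is rejected
print(dict(conn.execute("SELECT wallet_id, balance FROM wallets")))
```

The first transfer succeeds and the second is refused, leaving both balances and the ledger exactly as they were after the first call. The debit, the credit, and the ledger entry share one transaction, so a failure at any step undoes the others.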
### Wallet Operations

```
# Create a wallet
curl -X POST http://localhost:8000/v1/billing/wallets \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{"wallet_id": "agent-001", "balance": 10000}'

# Add credits to a wallet
curl -X POST http://localhost:8000/v1/billing/wallets/agent-001/deposit \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{"amount": 5000, "description": "Monthly allocation"}'

# Check the wallet balance
curl http://localhost:8000/v1/billing/wallets/agent-001 \
  -H "X-API-Key: your-key"
```

### Stripe Fiat Top-Up

```
# Prepare a Stripe payment (returns client_secret)
curl -X POST http://localhost:8000/v1/billing/top-up/prepare \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{"wallet_id": "agent-001", "amount": 5000}'

# The Stripe webhook is handled automatically at POST /webhooks/stripe
# Credits are allocated when the payment_intent.succeeded event arrives
```

### KYC Identity Verification (`/v1/kyc`)

Require identity verification for sponsor wallets before allowing fiat top-ups.

```
# Create a sponsor wallet that requires KYC
curl -X POST http://localhost:8000/v1/billing/wallets/sponsor \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "sponsor_name": "Acme Corp",
    "email": "billing@acme.com",
    "initial_credits": 10000,
    "require_kyc": true
  }'

# Check KYC status
curl http://localhost:8000/v1/kyc/status/{wallet_id} \
  -H "X-API-Key: your-key"

# Create a Stripe Identity verification session
curl -X POST http://localhost:8000/v1/kyc/sessions \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "wallet_id": "{wallet_id}",
    "return_url": "https://yourapp.com/kyc-callback",
    "document_type": "passport"
  }'
```

### Agent-to-Agent Transfers

```
# Transfer credits between wallets
curl -X POST http://localhost:8000/v1/billing/transfer \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "from_wallet_id": "agent-001",
    "to_wallet_id": "agent-002",
    "amount": 1000,
    "memo": "Payment for data processing"
  }'

# Create a child wallet with a spending limit
curl -X POST http://localhost:8000/v1/billing/wallets \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "wallet_id": "task-agent-001",
    "parent_wallet_id": "agent-001",
    "max_spend": 500,
    "task_description": "Data indexing",
    "ttl_seconds": 3600
  }'
```

### Service Marketplace

```
# Register a service
curl -X POST http://localhost:8000/v1/billing/services \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "service_id": "data-indexer",
    "name": "Data Indexing Service",
    "description": "Fast vector indexing for documents",
    "price_per_call": 50,
    "provider_wallet_id": "provider-001"
  }'

# List available services
curl http://localhost:8000/v1/billing/services \
  -H "X-API-Key: your-key"

# Invoke a service
curl -X POST http://localhost:8000/v1/billing/services/data-indexer/invoke \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{"caller_wallet_id": "agent-001", "input_data": {"documents": [...]}}'
```

### Spend Velocity Monitoring

Wallets are automatically frozen if spending exceeds the configured thresholds.

```
# Check velocity status
curl http://localhost:8000/v1/billing/wallets/agent-001/velocity \
  -H "X-API-Key: your-key"

# Unfreeze a frozen wallet
curl -X POST http://localhost:8000/v1/billing/wallets/agent-001/unfreeze \
  -H "X-API-Key: your-key"
```

### Configure Stripe and Notifications

```
# Stripe
STRIPE_SECRET_KEY=sk_test_...
STRIPE_WEBHOOK_SECRET=whsec_...
STRIPE_PUBLISHABLE_KEY=pk_test_...

# Email alerts (Resend)
RESEND_API_KEY=re_...
ALERT_FROM_EMAIL=alerts@b2a.dev

# Slack notifications
SLACK_WEBHOOK_URL=https://hooks.slack.com/...

# Velocity limits (credits)
VELOCITY_HOURLY_LIMIT=1000.0
VELOCITY_DAILY_LIMIT=10000.0
VELOCITY_FREEZE_THRESHOLD=3

# KYC verification
KYC_REQUIRED_FOR_TOPUP=false  # Set to true to enforce KYC before fiat top-ups
```

## AI Agent Intelligence (`/v1/ai`)

Powered by LLM integration (OpenAI, Azure OpenAI, Anthropic, Ollama), agents can:

### Autonomous Decision-Making

```
curl -X POST http://localhost:8000/v1/ai/decide \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "agent_id": "agent-001",
    "context": {"tasks": ["process_data", "send_report"], "load": 0.8},
    "options": ["process_data", "send_report", "wait"]
  }'
```

### Self-Healing

```
curl -X POST http://localhost:8000/v1/ai/heal \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "issue": "Service returning 500 errors",
    "context": {"error_log": "Connection refused to database"}
  }'
```

### Natural Language Queries

```
curl -X POST http://localhost:8000/v1/ai/query \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{"question": "What is the current system health?"}'
```

### Agent Memory and Learning

```
# Store a memory
curl -X POST http://localhost:8000/v1/ai/memory \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{"agent_id": "agent-001", "key": "preference", "value": "dark_mode"}'

# Learn from experience
curl -X POST http://localhost:8000/v1/ai/learn \
  -H "X-API-Key: your-key" \
  -H "Content-Type: application/json" \
  -d '{"agent_id": "agent-001", "experience": {"action": "retry", "outcome": "success"}}'
```

### Configure the LLM Provider

```
# OpenAI
LLM_PROVIDER=openai
LLM_API_KEY=sk-...
LLM_MODEL=gpt-4o

# Azure OpenAI
LLM_PROVIDER=azure
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com
AZURE_OPENAI_DEPLOYMENT=gpt-4o

# Ollama (local)
LLM_PROVIDER=ollama
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_MODEL=llama3
```

## Python SDK (`b2a-sdk`)

Agents can integrate using the `b2a-sdk` package for automatic usage tracking and billing.

```
cd b2a_sdk && pip install -e .
```

```
from b2a_sdk import B2AClient, monitored, billable, combined

client = B2AClient(
    api_url="http://localhost:8000",
    api_key="your-key",
    wallet_id="agent-001"
)

# Track function usage
@monitored(event_name="data_processing")
def process_data(data: list):
    return [x * 2 for x in data]

# Automatically deduct credits on every call
@billable(amount=10)
def call_llm(prompt: str):
    return f"Response to: {prompt}"

# Chain operations with combined billing
@combined(total_amount=100, step_amount=25)
async def complex_task():
    step1 = await client.emit_telemetry(...)
    step2 = await client.get_balance(...)
    return step1 + step2
```

See [`b2a_sdk/README.md`](./b2a_sdk/README.md) for full documentation.

## Safe Exploration (Dry-Run Sandbox)

Agents can safely test billing operations without affecting real wallet balances or triggering velocity monitoring.

### Session-Based Simulation

```
from b2a_sdk import B2AClient
from b2a_sdk.decorators import billable

b2a = B2AClient(api_key="key", wallet_id="agent-001")

@billable(b2a, wallet_id="agent-001", service_category="content_factory", units=10.0)
async def generate_video(url: str):
    return {"url": url}

# The awaits below must run inside an async function (for example via asyncio.run)

# Simulation: no real charge
async with b2a.simulate_session(wallet_id="agent-001") as sim:
    await generate_video("https://example.com/video.mp4")
    print(f"Cost: {sim.total_cost}")  # 500 credits
    print(f"Would succeed: {sim.would_succeed}")  # True

# Real execution
await generate_video("https://example.com/video.mp4")  # Real deduction
```

### One-Off Estimate

```
# Quick price check without session overhead
estimate = await b2a.get_dry_run_estimate(
    "agent-001",
    "content_factory",
    units=10.0,
)
print(f"Would cost: {estimate['credits_would_charge']} credits")
```

### API Endpoints

```
# Start a simulation session (15-minute TTL)
POST /v1/billing/dry-run/session
{"wallet_id": "agent-001"}

# Simulate a charge within the session
POST /v1/billing/dry-run/charge
{"wallet_id": "agent-001", "service": "content_factory", "units": 10.0, "dry_run_session_id": "..."}

# End the session and get a summary
DELETE /v1/billing/dry-run/session/{session_id}
```

## MCP Server (`/mcp`)

Model Context Protocol (MCP) enables agents to discover and call your billable services.
Tools are automatically exposed with their schemas and pricing.

### Discover Available Tools

```
# Fetch the MCP tool manifest
curl http://localhost:8000/mcp/tools.json

# List tools via the CLI
cd b2a_sdk && pip install -e . && python -m b2a_sdk.mcp list
```

### Call Tools via JSON-RPC

```
# List tools
curl -X POST http://localhost:8000/mcp/messages \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc": "2.0", "method": "tools/list", "id": 1}'

# Call a tool
curl -X POST http://localhost:8000/mcp/messages \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tools/call",
    "params": {
      "name": "my-service",
      "arguments": {"input": "value"},
      "mcpContext": {"wallet_id": "agent-001"}
    },
    "id": 2
  }'
```

### Create MCP Tools with `@mcp_tool`

```
from b2a_sdk.decorators import mcp_tool

@mcp_tool(
    service_id="video-generator",
    name="Video Generator",
    description="Generate videos from URLs",
    category="content_factory",
    credits_per_unit=50.0,
    unit_name="video",
)
async def generate_video(url: str, style: str = "cinematic") -> dict:
    """Your tool implementation here."""
    return {"video_url": f"{url}.mp4"}
```

### Generate a Standalone MCP Server

```
# Generate a standalone Python MCP server
cd b2a_sdk && pip install -e . && pip install mcp httpx
python -m b2a_sdk.mcp standalone --output my_server.py

# Run it
export B2A_API_KEY=your-key
export B2A_WALLET_ID=your-wallet
python my_server.py
```

## Deployment (Railway + PostgreSQL)

This repository is deployment-ready for Railway via `Dockerfile` + `railway.json`.

### 1. Create Services

1. Create a Railway project from this repository.
2. Add a PostgreSQL database:
   - Dashboard → + New → Database → PostgreSQL
   - Or CLI: `railway add -n agent-middleware-db`
3. Link the database to your app in the Railway dashboard.

### 2. Deploy via CLI

```
railway login
railway init
railway up
```

### 3. Configure Environment Variables

Set these in the Railway dashboard or via the CLI:

```
railway variables set DEBUG=false
railway variables set VALID_API_KEYS=your-key-here,your-other-key
railway variables set STATE_BACKEND=postgres
railway variables set RATE_LIMIT_PER_MINUTE=120
railway variables set CORS_ORIGINS=https://your-app.com

# Stripe (optional, for fiat top-ups)
railway variables set STRIPE_SECRET_KEY=sk_live_...
railway variables set STRIPE_WEBHOOK_SECRET=whsec_...

# Velocity monitoring (optional)
railway variables set VELOCITY_HOURLY_LIMIT=1000.0
railway variables set VELOCITY_DAILY_LIMIT=10000.0
railway variables set VELOCITY_FREEZE_THRESHOLD=3
```

### 4. Verify Deployment

- `GET /health` returns `200`
- `GET /health/dependencies` reports a healthy state backend
- `GET /docs` loads the OpenAPI UI

### PostgreSQL Connection String

Railway automatically provides `DATABASE_URL` when you add the PostgreSQL addon. To get it manually:

```
railway variables get DATABASE_URL
```

## Security Model

- API-key based authentication for protected endpoints
- Per-key rate limiting (Redis-backed)
- Durable audit logging (Postgres)
- Configurable CORS (via `CORS_ORIGINS` env var)
- Red-team endpoints (`/v1/security`, `/v1/rtaas`) for adversarial testing
- Memory fallback disabled in production by default

## Durability Model

- **PostgreSQL:** preferred durable state backend for runtime stores.
- **Redis:** distributed rate limiting and backend fallback state store.
- **Memory:** automatic fallback when no persistent backend is configured.
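The preference order in the list above can be sketched as a small selection function. This is an illustrative guess at the logic, not the service's actual code. `STATE_BACKEND` and `DATABASE_URL` appear in this README, but `REDIS_URL`, the `redis` value for `STATE_BACKEND`, the explicit `production` flag, and the `select_state_backend` helper are hypothetical.

```python
from typing import Mapping

# Backends that survive a restart; "memory" is the non-durable fallback.
PERSISTENT_BACKENDS = ("postgres", "redis")


def select_state_backend(env: Mapping[str, str], production: bool) -> str:
    """Return the state backend name implied by the environment."""
    # An explicit persistent choice wins over auto-detection.
    explicit = env.get("STATE_BACKEND", "").strip().lower()
    if explicit in PERSISTENT_BACKENDS:
        return explicit
    # Otherwise prefer PostgreSQL, then Redis, based on available URLs.
    if env.get("DATABASE_URL"):
        return "postgres"
    if env.get("REDIS_URL"):  # hypothetical variable name
        return "redis"
    # No persistent backend: memory is only acceptable outside production.
    if production:
        raise RuntimeError(
            "No persistent state backend configured; "
            "memory fallback is disabled in production"
        )
    return "memory"


print(select_state_backend({"STATE_BACKEND": "postgres"}, production=True))
print(select_state_backend({"REDIS_URL": "redis://localhost:6379/0"}, production=True))
print(select_state_backend({}, production=False))
```

Raising in production rather than silently using memory mirrors the security model's "memory fallback disabled in production by default": a misconfigured deployment fails at startup instead of losing state on restart.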
Current durable service stores:

- Billing (`wallets`, `ledger`, `alerts`, `velocity_snapshots`, `services`)
- Comms (`agent registry`, `inbox`, `outbox`)
- Telemetry (`events`, `anomalies`)

## Roadmap

- [x] PostgreSQL ledger with ACID transactions
- [x] Stripe fiat ingestion with webhooks
- [x] Agent-to-agent transfers with child wallets
- [x] Service marketplace
- [x] Spend velocity monitoring with auto-freeze
- [x] Python SDK (`b2a-sdk`)
- [x] MCP Server Generator for agent tool exposure
- [x] Stripe Identity (KYC) for sponsor verification
- [ ] Sandbox engine wired to billing
- [ ] Automated API key rotation for wallets
- [ ] Add comprehensive agent interaction examples and recipes
- [ ] Multi-tenant hardening validations
- [ ] Add SQLite backend support for simpler edge deployments

## API Discovery Endpoints

- `GET /`
- `GET /openapi.json`
- `GET /llm.txt`
- `GET /.well-known/agent.json`

## Configuration and Development

Use `.env.example` as a local template and `.env.production` as a production reference.

**Run tests:**

```
# API tests
pytest -q

# SDK tests
cd b2a_sdk && pip install -e ".[dev]" && pytest -q
```

*(CI runs on Python 3.11 and 3.12 via GitHub Actions)*

## Security and Contributing

Report vulnerabilities using `/SECURITY.md`. Please read `/CONTRIBUTING.md` before opening pull requests.

## License

MIT License. See `/LICENSE`.