GitHub: PetrefiedThunder/agent-middleware-api
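A FastAPI middleware for agent-native systems that provides state, billing, telemetry, security, and AI inference in one place, addressing the control and cost problems of agent toolchains.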
# Agent Middleware API
[CI](https://github.com/PetrefiedThunder/agent-middleware-api/actions/workflows/ci.yml)
**The FastAPI control plane for agent-native infrastructure.**
**Agent Middleware API** is a production-ready FastAPI service that provides durable state, billing, telemetry, secure tool execution, IoT connectivity, and **AI-powered agent intelligence** for autonomous agents.
It lets agents:
* Send and receive structured messages
* Store persistent state
* Bill and track usage
* Emit telemetry and anomaly alerts
* Execute secure external actions
* **Make autonomous decisions with AI reasoning**
* **Self-heal by diagnosing and fixing issues**
* **Answer natural language queries**
* **Remember and learn from experiences**
Deploy in minutes via Docker or Railway.
## Architecture Overview
| Domain | Endpoint Prefix | Durable | Rate Limited | Auth Required |
|--------------------------|-------------------|---------|--------------|---------------|
| Billing | `/v1/billing` | Yes | Yes | Yes |
| Stripe Webhooks | `/webhooks/stripe`| - | - | Yes |
| Telemetry | `/v1/telemetry` | Yes | Yes | Yes |
| Comms | `/v1/comms` | Yes | Yes | Yes |
| IoT Bridge | `/v1/iot` | Optional| Yes | Yes |
| Security | `/v1/security` | Partial | Yes | Yes |
| **Agent Intelligence** | `/v1/ai` | Yes | Yes | Yes |
*(Additional modules include programmatic media, content factory, agent oracle, and protocol generation)*
## Quick Start (Local)
```
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
```
Then open:
- `http://localhost:8000/docs`
## Example API Call
```
curl -X POST http://localhost:8000/v1/telemetry \
-H "X-API-Key: test-key" \
-H "Content-Type: application/json" \
-d '{"event":"agent_started","agent_id":"demo"}'
```
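The same call can be made from Python. The following is a minimal sketch using only the standard library; the helper names `build_telemetry_request` and `send` are illustrative and not part of this repository, and the payload fields are copied from the curl example above.

```python
import json
import urllib.request


def build_telemetry_request(base_url: str, api_key: str, event: str, agent_id: str) -> urllib.request.Request:
    """Build (but do not send) a POST request equivalent to the curl call."""
    body = json.dumps({"event": event, "agent_id": agent_id}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/telemetry",
        data=body,
        method="POST",
        headers={"X-API-Key": api_key, "Content-Type": "application/json"},
    )


def send(request: urllib.request.Request) -> tuple[int, str]:
    """Send the request and return (status code, response body)."""
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status, response.read().decode("utf-8")
```

With the local server from the quick start running, `send(build_telemetry_request("http://localhost:8000", "test-key", "agent_started", "demo"))` issues the request.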
## Billing and Payments (`/v1/billing`)
PostgreSQL-backed wallet system with ACID transactions, Stripe integration, and spend velocity monitoring.
### Wallet Operations
```
# Create a wallet
curl -X POST http://localhost:8000/v1/billing/wallets \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{"wallet_id": "agent-001", "balance": 10000}'
# Add credits to the wallet
curl -X POST http://localhost:8000/v1/billing/wallets/agent-001/deposit \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{"amount": 5000, "description": "Monthly allocation"}'
# Check the wallet balance
curl http://localhost:8000/v1/billing/wallets/agent-001 \
-H "X-API-Key: your-key"
```
### Stripe Fiat Top-Ups
```
# Prepare a Stripe payment (returns client_secret)
curl -X POST http://localhost:8000/v1/billing/top-up/prepare \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{"wallet_id": "agent-001", "amount": 5000}'
# The Stripe webhook is handled automatically at POST /webhooks/stripe
# Credits are allocated when a payment_intent.succeeded event arrives
```
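Before crediting a wallet, a webhook handler needs to confirm that the event really came from Stripe. The sketch below shows Stripe's documented signature scheme (HMAC-SHA256 over `"{timestamp}.{payload}"`, carried in the `Stripe-Signature` header as `t=...,v1=...`) using only the standard library. It is an illustration, not this service's handler; in practice the official `stripe` package's `stripe.Webhook.construct_event` performs the same check. The function name and the injectable `now` parameter are assumptions made for the example.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_stripe_signature(
    payload: bytes,
    sig_header: str,
    secret: str,
    tolerance: int = 300,
    now: Optional[float] = None,
) -> bool:
    """Return True if sig_header carries a valid, recent v1 signature for payload."""
    timestamp = None
    signatures = []
    for item in sig_header.split(","):
        key, _, value = item.strip().partition("=")
        if key == "t":
            timestamp = value
        elif key == "v1":
            signatures.append(value)
    if timestamp is None or not timestamp.isdigit() or not signatures:
        return False

    # Reject stale events to limit replay attacks.
    current = time.time() if now is None else now
    if abs(current - int(timestamp)) > tolerance:
        return False

    signed_payload = timestamp.encode("utf-8") + b"." + payload
    expected = hmac.new(secret.encode("utf-8"), signed_payload, hashlib.sha256).hexdigest()
    # Constant-time comparison against every v1 signature in the header.
    return any(hmac.compare_digest(expected, candidate) for candidate in signatures)
```

Only after this check passes should a `payment_intent.succeeded` event lead to a wallet credit.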
### KYC Identity Verification (`/v1/kyc`)
Require identity verification for sponsor wallets before allowing fiat top-ups.
```
# Create a sponsor wallet that requires KYC
curl -X POST http://localhost:8000/v1/billing/wallets/sponsor \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{
"sponsor_name": "Acme Corp",
"email": "billing@acme.com",
"initial_credits": 10000,
"require_kyc": true
}'
# Check KYC status
curl http://localhost:8000/v1/kyc/status/{wallet_id} \
-H "X-API-Key: your-key"
# Create a Stripe Identity verification session
curl -X POST http://localhost:8000/v1/kyc/sessions \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{
"wallet_id": "{wallet_id}",
"return_url": "https://yourapp.com/kyc-callback",
"document_type": "passport"
}'
```
### Agent-to-Agent Transfers
```
# Transfer credits between wallets
curl -X POST http://localhost:8000/v1/billing/transfer \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{
"from_wallet_id": "agent-001",
"to_wallet_id": "agent-002",
"amount": 1000,
"memo": "Payment for data processing"
}'
# Create a child wallet with a spend limit
curl -X POST http://localhost:8000/v1/billing/wallets \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{
"wallet_id": "task-agent-001",
"parent_wallet_id": "agent-001",
"max_spend": 500,
"task_description": "Data indexing",
"ttl_seconds": 3600
}'
```
### Service Marketplace
```
# Register a service
curl -X POST http://localhost:8000/v1/billing/services \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{
"service_id": "data-indexer",
"name": "Data Indexing Service",
"description": "Fast vector indexing for documents",
"price_per_call": 50,
"provider_wallet_id": "provider-001"
}'
# List available services
curl http://localhost:8000/v1/billing/services \
-H "X-API-Key: your-key"
# Invoke a service
curl -X POST http://localhost:8000/v1/billing/services/data-indexer/invoke \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{"caller_wallet_id": "agent-001", "input_data": {"documents": [...]}}'
```
### Spend Velocity Monitoring
Wallets are automatically frozen if spending exceeds thresholds.
```
# Check velocity status
curl http://localhost:8000/v1/billing/wallets/agent-001/velocity \
-H "X-API-Key: your-key"
# Unfreeze a frozen wallet
curl -X POST http://localhost:8000/v1/billing/wallets/agent-001/unfreeze \
-H "X-API-Key: your-key"
```
### Configuring Stripe and Notifications
```
# Stripe
STRIPE_SECRET_KEY=sk_test_...
STRIPE_WEBHOOK_SECRET=whsec_...
STRIPE_PUBLISHABLE_KEY=pk_test_...
# Email alerts (Resend)
RESEND_API_KEY=re_...
ALERT_FROM_EMAIL=alerts@b2a.dev
# Slack notifications
SLACK_WEBHOOK_URL=https://hooks.slack.com/...
# Velocity limits (credits)
VELOCITY_HOURLY_LIMIT=1000.0
VELOCITY_DAILY_LIMIT=10000.0
VELOCITY_FREEZE_THRESHOLD=3
# KYC verification
KYC_REQUIRED_FOR_TOPUP=false # Set to true to enforce KYC before fiat top-ups
```
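To make the velocity settings concrete, here is one way such a monitor could work. This is a sketch under stated assumptions, not the service's implementation: it assumes `VELOCITY_HOURLY_LIMIT` and `VELOCITY_DAILY_LIMIT` cap total credits spent in rolling one-hour and one-day windows, and that `VELOCITY_FREEZE_THRESHOLD` is the number of limit violations after which a wallet is frozen. The class and method names are hypothetical.

```python
from collections import deque
from dataclasses import dataclass, field

HOUR = 3600
DAY = 86400


@dataclass
class VelocityMonitor:
    hourly_limit: float = 1000.0   # VELOCITY_HOURLY_LIMIT
    daily_limit: float = 10000.0   # VELOCITY_DAILY_LIMIT
    freeze_threshold: int = 3      # VELOCITY_FREEZE_THRESHOLD (assumed: violation count)
    violations: int = 0
    frozen: bool = False
    _spends: deque = field(default_factory=deque)  # (timestamp, amount) pairs

    def _total_since(self, cutoff: float) -> float:
        return sum(amount for ts, amount in self._spends if ts > cutoff)

    def record_spend(self, amount: float, now: float) -> bool:
        """Record a spend and return True if the wallet is frozen afterwards."""
        if self.frozen:
            return True
        # Drop entries that have aged out of the daily window.
        while self._spends and self._spends[0][0] <= now - DAY:
            self._spends.popleft()
        self._spends.append((now, amount))
        over_hourly = self._total_since(now - HOUR) > self.hourly_limit
        over_daily = self._total_since(now - DAY) > self.daily_limit
        if over_hourly or over_daily:
            self.violations += 1
            if self.violations >= self.freeze_threshold:
                self.frozen = True
        return self.frozen

    def unfreeze(self) -> None:
        """Mirror of the unfreeze endpoint: clear the freeze and the violation count."""
        self.frozen = False
        self.violations = 0
```

With the default limits, spends of 600 credits at `now=0` and `now=10` produce the first violation, and the wallet freezes on the third.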
## AI Agent Intelligence (`/v1/ai`)
Powered by LLM integration (OpenAI, Azure OpenAI, Anthropic, Ollama), agents can:
### Autonomous Decisions
```
curl -X POST http://localhost:8000/v1/ai/decide \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{
"agent_id": "agent-001",
"context": {"tasks": ["process_data", "send_report"], "load": 0.8},
"options": ["process_data", "send_report", "wait"]
}'
```
### Self-Healing
```
curl -X POST http://localhost:8000/v1/ai/heal \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{
"issue": "Service returning 500 errors",
"context": {"error_log": "Connection refused to database"}
}'
```
### Natural Language Queries
```
curl -X POST http://localhost:8000/v1/ai/query \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{"question": "What is the current system health?"}'
```
### Agent Memory and Learning
```
# Store a memory
curl -X POST http://localhost:8000/v1/ai/memory \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{"agent_id": "agent-001", "key": "preference", "value": "dark_mode"}'
# Learn from experience
curl -X POST http://localhost:8000/v1/ai/learn \
-H "X-API-Key: your-key" \
-H "Content-Type: application/json" \
-d '{"agent_id": "agent-001", "experience": {"action": "retry", "outcome": "success"}}'
```
### Configuring the LLM Provider
```
# OpenAI
LLM_PROVIDER=openai
LLM_API_KEY=sk-...
LLM_MODEL=gpt-4o
# Azure OpenAI
LLM_PROVIDER=azure
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com
AZURE_OPENAI_DEPLOYMENT=gpt-4o
# Ollama (local)
LLM_PROVIDER=ollama
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_MODEL=llama3
```
## Python SDK (`b2a-sdk`)
Agents can integrate using the `b2a-sdk` package for automatic usage tracking and billing.
```
cd b2a_sdk && pip install -e .
```
```
from b2a_sdk import B2AClient, monitored, billable, combined

client = B2AClient(
    api_url="http://localhost:8000",
    api_key="your-key",
    wallet_id="agent-001"
)

# Track function usage
@monitored(event_name="data_processing")
def process_data(data: list):
    return [x * 2 for x in data]

# Deduct credits automatically on every call
@billable(amount=10)
def call_llm(prompt: str):
    return f"Response to: {prompt}"

# Chain operations under combined billing
@combined(total_amount=100, step_amount=25)
async def complex_task():
    step1 = await client.emit_telemetry(...)
    step2 = await client.get_balance(...)
    return step1 + step2
```
See [`b2a_sdk/README.md`](./b2a_sdk/README.md) for full documentation.
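For readers curious how a per-call billing decorator can work in principle, the following is a self-contained sketch. It is not the `b2a-sdk` implementation: `LocalWallet`, `InsufficientCredits`, and `billable_sketch` are hypothetical names, and the wallet is an in-memory stand-in for the remote billing API.

```python
import functools


class InsufficientCredits(Exception):
    """Raised when a wallet cannot cover a charge."""


class LocalWallet:
    """Hypothetical in-memory stand-in for a remote wallet."""

    def __init__(self, balance: float) -> None:
        self.balance = balance

    def charge(self, amount: float) -> None:
        if amount > self.balance:
            raise InsufficientCredits(f"need {amount}, have {self.balance}")
        self.balance -= amount


def billable_sketch(wallet: LocalWallet, amount: float):
    """Decorator factory: charge `amount` to `wallet` before each call."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Charge first so a call the wallet cannot afford never runs.
            wallet.charge(amount)
            return func(*args, **kwargs)

        return wrapper

    return decorator
```

Charging before the call is one design choice among several; a real client might instead reserve credits up front and settle only after the wrapped function succeeds.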
## Safe Exploration (Dry-Run Sandbox)
Agents can safely test billing operations without affecting real wallet balances or triggering velocity monitoring.
### Session-Based Simulation
```
import asyncio

from b2a_sdk import B2AClient
from b2a_sdk.decorators import billable

b2a = B2AClient(api_key="key", wallet_id="agent-001")

@billable(b2a, wallet_id="agent-001", service_category="content_factory", units=10.0)
async def generate_video(url: str):
    return {"url": url}

async def main():
    # Simulate: no real charge
    async with b2a.simulate_session(wallet_id="agent-001") as sim:
        await generate_video("https://example.com/video.mp4")
        print(f"Cost: {sim.total_cost}")  # 500 credits
        print(f"Would succeed: {sim.would_succeed}")  # True

    # Real execution
    await generate_video("https://example.com/video.mp4")  # Real deduction

asyncio.run(main())
```
### One-Off Estimates
```
# Quick price check without session overhead (call from inside an async function)
estimate = await b2a.get_dry_run_estimate(
    "agent-001",
    "content_factory",
    units=10.0,
)
print(f"Would cost: {estimate['credits_would_charge']} credits")
```
### API Endpoints
```
# Start a simulation session (15-minute TTL)
POST /v1/billing/dry-run/session
{"wallet_id": "agent-001"}
# Simulate a charge within the session
POST /v1/billing/dry-run/charge
{"wallet_id": "agent-001", "service": "content_factory", "units": 10.0, "dry_run_session_id": "..."}
# End the session and get a summary
DELETE /v1/billing/dry-run/session/{session_id}
```
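The session semantics described above (accumulate simulated charges, never touch the real balance, expire after 15 minutes) can be modeled locally in a few lines. This is an illustrative sketch, not the server's logic: `DryRunSession` is a hypothetical class, and the price of 50 credits per `content_factory` unit is an assumption inferred from the examples in this README (10 units costing 500 credits).

```python
from dataclasses import dataclass, field

# Assumed price table, inferred from the README examples; not read from the API.
ASSUMED_PRICES = {"content_factory": 50.0}
SESSION_TTL_SECONDS = 15 * 60  # the 15-minute session TTL stated above


@dataclass
class DryRunSession:
    wallet_id: str
    starting_balance: float  # snapshot only; the real balance is never modified
    created_at: float
    total_cost: float = 0.0
    charges: list = field(default_factory=list)

    def is_expired(self, now: float) -> bool:
        return now - self.created_at >= SESSION_TTL_SECONDS

    def charge(self, service: str, units: float, now: float) -> float:
        """Record a simulated charge and return its cost in credits."""
        if self.is_expired(now):
            raise RuntimeError("dry-run session expired")
        cost = ASSUMED_PRICES[service] * units
        self.total_cost += cost
        self.charges.append({"service": service, "units": units, "cost": cost})
        return cost

    @property
    def would_succeed(self) -> bool:
        """True if the snapshot balance could cover everything simulated so far."""
        return self.total_cost <= self.starting_balance
```

Because the session works on a balance snapshot, an agent can explore an entire plan and read `would_succeed` before committing any real credits.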
## MCP Server (`/mcp`)
Model Context Protocol (MCP) enables agents to discover and call your billable services. Tools are automatically exposed with their schemas and pricing.
### Discovering Available Tools
```
# Fetch the MCP tool manifest
curl http://localhost:8000/mcp/tools.json
# List tools via the CLI
cd b2a_sdk && pip install -e . && python -m b2a_sdk.mcp list
```
### Calling Tools via JSON-RPC
```
# List tools
curl -X POST http://localhost:8000/mcp/messages \
-H "Content-Type: application/json" \
-d '{"jsonrpc": "2.0", "method": "tools/list", "id": 1}'
# Call a tool
curl -X POST http://localhost:8000/mcp/messages \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "my-service",
"arguments": {"input": "value"},
"mcpContext": {"wallet_id": "agent-001"}
},
"id": 2
}'
```
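The JSON-RPC 2.0 envelopes shown in the curl calls above can be built and parsed in Python as follows. This is a small sketch using only the standard library; the helper names are illustrative, and the `mcpContext` field is taken from the example above rather than from any specification.

```python
import itertools
import json
from typing import Any, Optional

_request_ids = itertools.count(1)  # monotonically increasing request ids


def jsonrpc_request(method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request object."""
    message: dict = {"jsonrpc": "2.0", "method": method, "id": next(_request_ids)}
    if params is not None:
        message["params"] = params
    return message


def tools_call(name: str, arguments: dict, wallet_id: str) -> dict:
    """Build a tools/call request that carries the paying wallet in mcpContext."""
    return jsonrpc_request(
        "tools/call",
        {"name": name, "arguments": arguments, "mcpContext": {"wallet_id": wallet_id}},
    )


def parse_response(raw: str) -> Any:
    """Return the result of a JSON-RPC response, or raise if it carries an error."""
    message = json.loads(raw)
    if "error" in message:
        error = message["error"]
        raise RuntimeError(f"JSON-RPC error {error.get('code')}: {error.get('message')}")
    return message.get("result")
```

Serializing a request with `json.dumps(tools_call("my-service", {"input": "value"}, "agent-001"))` yields a body suitable for `POST /mcp/messages`.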
### Creating MCP Tools with `@mcp_tool`
```
from b2a_sdk.decorators import mcp_tool

@mcp_tool(
    service_id="video-generator",
    name="Video Generator",
    description="Generate videos from URLs",
    category="content_factory",
    credits_per_unit=50.0,
    unit_name="video",
)
async def generate_video(url: str, style: str = "cinematic") -> dict:
    """Your tool implementation here."""
    return {"video_url": f"{url}.mp4"}
```
### Generating a Standalone MCP Server
```
# Generate a standalone Python MCP server
cd b2a_sdk && pip install -e . && pip install mcp httpx
python -m b2a_sdk.mcp standalone --output my_server.py
# Run it
export B2A_API_KEY=your-key
export B2A_WALLET_ID=your-wallet
python my_server.py
```
## Deployment (Railway + PostgreSQL)
This repository is deployment-ready for Railway via `Dockerfile` + `railway.json`.
### 1. Create Services
1. Create a Railway project from this repository.
2. Add a PostgreSQL database:
   - Dashboard → + New → Database → PostgreSQL
   - Or CLI: `railway add -n agent-middleware-db`
3. Link the database to your app in the Railway dashboard.
### 2. Deploy via CLI
```
railway login
railway init
railway up
```
### 3. Configure Environment Variables
Set these in Railway dashboard or via CLI:
```
railway variables set DEBUG=false
railway variables set VALID_API_KEYS=your-key-here,your-other-key
railway variables set STATE_BACKEND=postgres
railway variables set RATE_LIMIT_PER_MINUTE=120
railway variables set CORS_ORIGINS=https://your-app.com
# Stripe (optional, for fiat top-ups)
railway variables set STRIPE_SECRET_KEY=sk_live_...
railway variables set STRIPE_WEBHOOK_SECRET=whsec_...
# Velocity monitoring (optional)
railway variables set VELOCITY_HOURLY_LIMIT=1000.0
railway variables set VELOCITY_DAILY_LIMIT=10000.0
railway variables set VELOCITY_FREEZE_THRESHOLD=3
```
### 4. Verify the Deployment
- `GET /health` returns `200`
- `GET /health/dependencies` reports healthy state backend
- `GET /docs` loads OpenAPI UI
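The three checks above can be scripted. The sketch below treats an HTTP 200 on each path as a pass, which only approximates the second check, since the body of `/health/dependencies` would also need inspecting to confirm the state backend is healthy. The function names are illustrative, and the status fetcher is injectable so the logic can be exercised without a live deployment.

```python
import urllib.error
import urllib.request
from typing import Callable

CHECK_PATHS = ["/health", "/health/dependencies", "/docs"]


def http_status(url: str) -> int:
    """Return the HTTP status for a GET on url, or 0 if the host is unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            return response.status
    except urllib.error.HTTPError as exc:
        return exc.code
    except urllib.error.URLError:
        return 0


def verify_deployment(base_url: str, fetch_status: Callable[[str], int] = http_status) -> dict:
    """Map each check path to True if it answered with HTTP 200."""
    base = base_url.rstrip("/")
    return {path: fetch_status(base + path) == 200 for path in CHECK_PATHS}
```

Calling `verify_deployment("https://your-app.example")` with your own deployment URL returns a dictionary such as `{"/health": True, ...}` that a CI step can assert on.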
### PostgreSQL Connection String
Railway automatically provides `DATABASE_URL` when you add the PostgreSQL addon. To get it manually:
```
railway variables get DATABASE_URL
```
## Security Model
- API-key based authentication for protected endpoints
- Per-key rate limiting (Redis-backed)
- Durable audit logging (Postgres)
- Configurable CORS (via `CORS_ORIGINS` env var)
- Red-team endpoints (`/v1/security`, `/v1/rtaas`) for adversarial testing
- Memory fallback disabled in production by default
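As an illustration of per-key rate limiting, here is a fixed-window limiter kept in process memory. It is a sketch of the general technique, not the service's Redis-backed implementation; with Redis, the same pattern is commonly built from `INCR` and `EXPIRE` on a key per API key and window. The class name is hypothetical, and the default of 120 mirrors the `RATE_LIMIT_PER_MINUTE` value used in the deployment example.

```python
from collections import defaultdict


class FixedWindowRateLimiter:
    """Allow at most `limit_per_minute` requests per API key in each 60-second window."""

    def __init__(self, limit_per_minute: int = 120) -> None:
        self.limit = limit_per_minute
        # (api_key, window index) -> request count. Old windows are never
        # evicted here; a shared store with key expiry would handle that.
        self._counts = defaultdict(int)

    def allow(self, api_key: str, now: float) -> bool:
        """Return True and count the request if the key is under its limit."""
        bucket = (api_key, int(now // 60))
        if self._counts[bucket] >= self.limit:
            return False
        self._counts[bucket] += 1
        return True
```

A fixed window is simple but permits short bursts across a window boundary; a sliding window or token bucket trades a little complexity for smoother enforcement.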
## Durability Model
- **PostgreSQL:** preferred durable state backend for runtime stores.
- **Redis:** distributed rate limiting and backend fallback state store.
- **Memory:** automatic fallback when no persistent backend is configured.
Current durable service stores:
- Billing (`wallets`, `ledger`, `alerts`, `velocity_snapshots`, `services`)
- Comms (`agent registry`, `inbox`, `outbox`)
- Telemetry (`events`, `anomalies`)
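The backend preference order above (PostgreSQL, then Redis, then memory) could be resolved from the environment roughly as follows. This is a sketch under assumptions: `STATE_BACKEND` and `DATABASE_URL` appear in this README, but `REDIS_URL` and the `allow_memory_fallback` flag are hypothetical names introduced for illustration, and the actual selection logic in the service may differ.

```python
def choose_state_backend(env: dict, allow_memory_fallback: bool = True) -> str:
    """Pick a state backend name from environment-style settings."""
    explicit = env.get("STATE_BACKEND", "").strip().lower()
    if explicit:
        return explicit  # an explicit setting always wins
    if env.get("DATABASE_URL"):
        return "postgres"
    if env.get("REDIS_URL"):  # hypothetical variable name
        return "redis"
    if allow_memory_fallback:
        return "memory"
    raise RuntimeError("no persistent state backend configured and memory fallback is disabled")
```

In production, passing `allow_memory_fallback=False` matches the stated default of refusing to run on volatile memory.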
## Roadmap
- [x] PostgreSQL ledger with ACID transactions
- [x] Stripe fiat ingestion with webhooks
- [x] Agent-to-agent transfers with child wallets
- [x] Service marketplace
- [x] Spend velocity monitoring with auto-freeze
- [x] Python SDK (`b2a-sdk`)
- [x] MCP Server Generator for agent tool exposure
- [x] Stripe Identity (KYC) for sponsor verification
- [ ] Sandbox engine wired to billing
- [ ] Automated API key rotation for wallets
- [ ] Add comprehensive agent interaction examples and recipes
- [ ] Multi-tenant hardening validations
- [ ] Add SQLite backend support for simpler edge deployments
## API Discovery Endpoints
- `GET /`
- `GET /openapi.json`
- `GET /llm.txt`
- `GET /.well-known/agent.json`
## Configuration and Development
Use `.env.example` as a local template and `.env.production` as a production reference.
**Run tests:**
```
# API tests
pytest -q
# SDK tests
cd b2a_sdk && pip install -e ".[dev]" && pytest -q
```
*(CI runs on Python 3.11 and 3.12 via GitHub Actions)*
## Security and Contributing
Report vulnerabilities using `/SECURITY.md`.
Please read `/CONTRIBUTING.md` before opening pull requests.
## License
MIT License. See `/LICENSE`.