zero-day-ai/sdk

GitHub: zero-day-ai/sdk

用于构建 Kubernetes 原生 AI 代理、工具和插件的 Go SDK,支持多模型 LLM 推理、分布式任务执行和基于 Neo4j 的知识图谱自动构建。

Stars: 0 | Forks: 0

# Gibson SDK 用于构建 Kubernetes 原生 AI 代理、工具和插件的官方 Go SDK。 ## 安装 ``` go get github.com/zero-day-ai/sdk@latest ``` ## 概述 Gibson 是** Kubernetes 原生 AI 代理开发框架**。使用 Helm 部署 Gibson,然后使用此 SDK 构建能够执行以下操作的自主代理: - 使用前沿的 LLM (Claude, GPT, Gemini, Ollama) 进行推理 - 通过分布式 Redis 工作队列执行工具 - 将知识存储在由 Neo4j 支持的图数据库中 - 使用标准的 Kubernetes 模式进行水平扩展 ``` # 将 Gibson 部署到您的集群 helm install gibson gibson/gibson --namespace gibson-system --create-namespace # 构建并部署您的 agent go build -o myagent ./cmd/myagent kubectl apply -f myagent-deployment.yaml ``` ## 快速示例 构建一个用于排查 Kubernetes 集群故障的代理: ``` package main import ( "context" "github.com/zero-day-ai/sdk/agent" "github.com/zero-day-ai/sdk/llm" "github.com/zero-day-ai/sdk/serve" ) type K8sTroubleshooter struct{} func (a *K8sTroubleshooter) Name() string { return "k8s-troubleshooter" } func (a *K8sTroubleshooter) Version() string { return "1.0.0" } func (a *K8sTroubleshooter) Description() string { return "Troubleshoots Kubernetes cluster issues" } func (a *K8sTroubleshooter) LLMSlots() []agent.SlotDefinition { return []agent.SlotDefinition{ agent.NewSlotDefinition("primary", "Main reasoning LLM", true). WithConstraints(agent.SlotConstraints{ MinContextWindow: 8000, RequiredFeatures: []string{agent.FeatureToolUse}, }), } } func (a *K8sTroubleshooter) Execute(ctx context.Context, task agent.Task, h agent.Harness) (agent.Result, error) { // Use LLM to reason about the issue messages := []llm.Message{ llm.NewSystemMessage("You are a Kubernetes expert. Diagnose cluster issues."), llm.NewUserMessage(task.Goal), } resp, err := h.Complete(ctx, "primary", messages) if err != nil { return agent.NewFailedResult(err), err } // Execute kubectl via tool output, err := h.ExecuteTool(ctx, "kubectl", &pb.KubectlRequest{ Command: "get pods -A --field-selector=status.phase!=Running", }) // Store findings in memory h.Memory().Mission().Set(ctx, "diagnosis", resp.Content) return agent.NewSuccessResult(map[string]any{ "diagnosis": resp.Content, "pods": output, }), nil } func main() { serve.Agent(&K8sTroubleshooter{}, serve.WithPort(50051)) } ``` 作为 Kubernetes 工作负载部署: ``` apiVersion: apps/v1 kind: Deployment metadata: name: k8s-troubleshooter spec: replicas: 1 template: spec: containers: - name: agent image: myorg/k8s-troubleshooter:latest ports: - containerPort: 50051 env: - name: REDIS_URL value: redis://gibson-redis:6379 ``` ## 核心概念 ### 组件类型 | 组件 | 用途 | 状态 | I/O | 示例 | |-----------|---------|-------|-----|---------| | **Agent** | 自主 LLM 驱动的任务执行 | 有状态 | Harness API | k8s-troubleshooter, log-analyzer | | **Tool** | 原子操作 (CLI wrappers, APIs) | 无状态 | Protocol Buffers | kubectl, curl, terraform | | **Plugin** | 外部服务集成 | 有状态 | JSON | slack, jira, pagerduty | ### Harness (控制引擎) 每个代理都会接收一个 `Harness` - 这是您访问所有 Gibson 功能的单一接口: ``` type Harness interface { // LLM Access - reason with AI Complete(ctx, slot, messages, opts...) (*CompletionResponse, error) CompleteWithTools(ctx, slot, messages, tools) (*CompletionResponse, error) CompleteStructured(ctx, slot, schema, messages) (*StructuredResult, error) // Tool Execution - call tools via Redis queues ExecuteTool(ctx, name, input proto.Message) (proto.Message, error) CallToolProto(ctx, name, request, response proto.Message) error // Agent Delegation - spawn sub-agents DelegateToAgent(ctx, name, task) (Result, error) // Memory - three-tier persistence Memory() MemoryManager // Knowledge Graph - Neo4j-backed storage QueryNodes(ctx, query) ([]*QueryResult, error) StoreNode(ctx, node) (string, error) // Observability - built-in tracing and logging Logger() *slog.Logger Tracer() trace.Tracer } ``` ### LLM 槽位 代理声明在运行时解析的抽象 LLM 需求: ``` func (a *MyAgent) LLMSlots() []agent.SlotDefinition { return []agent.SlotDefinition{ agent.NewSlotDefinition("primary", "Main reasoning LLM", true). WithConstraints(agent.SlotConstraints{ MinContextWindow: 8000, RequiredFeatures: []string{agent.FeatureToolUse, agent.FeatureJSONMode}, }), agent.NewSlotDefinition("fast", "Quick completions", false). WithConstraints(agent.SlotConstraints{ MinContextWindow: 4000, }), } } ``` **槽位特性:** | 特性 | 描述 | |---------|-------------| | `tool_use` | 函数调用支持 | | `vision` | 图像分析能力 | | `streaming` | 流式响应支持 | | `json_mode` | 结构化 JSON 输出 | ### 三层内存 ``` // Working Memory - ephemeral, task-scoped h.Memory().Working().Set(ctx, "step", "diagnosing") h.Memory().Working().Get(ctx, "step") // Mission Memory - persistent, Redis-backed with full-text search h.Memory().Mission().Set(ctx, "findings", data, metadata) h.Memory().Mission().Search(ctx, "error timeout", 10) // Long-Term Memory - vector embeddings for semantic search h.Memory().LongTerm().Store(ctx, "Kubernetes OOM kills often caused by...", metadata) h.Memory().LongTerm().Search(ctx, "pod memory issues", 5) ``` ## 构建代理 ### 1. 实现 Agent 接口 ``` type MyAgent struct { config Config } // Identity func (a *MyAgent) Name() string { return "my-agent" } func (a *MyAgent) Version() string { return "1.0.0" } func (a *MyAgent) Description() string { return "My autonomous agent" } // Capabilities func (a *MyAgent) Capabilities() []string { return []string{"troubleshooting", "analysis"} } // LLM Requirements func (a *MyAgent) LLMSlots() []agent.SlotDefinition { return []agent.SlotDefinition{ agent.NewSlotDefinition("primary", "Main LLM", true). WithConstraints(agent.SlotConstraints{ MinContextWindow: 8000, RequiredFeatures: []string{agent.FeatureToolUse}, }), } } // Lifecycle func (a *MyAgent) Initialize(ctx context.Context, cfg agent.AgentConfig) error { return nil } func (a *MyAgent) Shutdown(ctx context.Context) error { return nil } func (a *MyAgent) Health(ctx context.Context) types.HealthStatus { return types.HealthStatus{Status: types.HealthStatusHealthy} } // Core Execution func (a *MyAgent) Execute(ctx context.Context, task agent.Task, h agent.Harness) (agent.Result, error) { result := agent.NewResult(task.ID) result.Start() // Your agent logic here messages := []llm.Message{ llm.NewSystemMessage("You are an expert assistant"), llm.NewUserMessage(task.Goal), } resp, err := h.Complete(ctx, "primary", messages) if err != nil { result.Fail(err) return result, err } result.Complete(map[string]any{"response": resp.Content}) return result, nil } ``` ### 2. 提供代理服务 ``` func main() { agent := &myagent.MyAgent{} serve.Agent(agent, serve.WithPort(50051), serve.WithHealthPort(8080), // K8s probes ) } ``` ### 3. 部署到 Kubernetes ``` apiVersion: apps/v1 kind: Deployment metadata: name: my-agent spec: replicas: 2 template: spec: containers: - name: agent image: myorg/my-agent:latest ports: - containerPort: 50051 - containerPort: 8080 livenessProbe: httpGet: path: /healthz port: 8080 readinessProbe: httpGet: path: /readyz port: 8080 env: - name: REDIS_URL value: redis://gibson-redis:6379 ``` ## 构建工具 工具是实现 SDK 的 `tool.Tool` 接口的无状态进程,它们定义自己的 proto 类型,并通过 gRPC 向 gibson 注册。该平台负责调度、可观察性,以及至关重要的——根据您的工具返回的内容**自动填充知识图谱**。 ### 心智模型 您的工具作为独立的 Go 二进制文件运行。它并不存在于这个 SDK 仓库中。您的工具和 gibson 在编译时都导入此 SDK;在运行时,它们是通过 gRPC 通信的独立进程。SDK 是契约;您来实现它。 “自动填充图谱”的神奇之处在于一个规则:**您的响应消息中的 proto 字段 100 必须是 `gibson.graphrag.v1.DiscoveryResult`**。Gibson 的 `DiscoveryProcessor` 会内省每个工具响应,找到字段 100,并将实体 + 关系写入 Neo4j。您永远不需要直接写入 Neo4j。 ### 实战示例:自动填充主机/端口/服务关系的 nmap 工具 **步骤 1 — 从模板引导** ``` gh repo create my-nmap-tool --template zero-day-ai/component-skeleton --public cd my-nmap-tool # 您将获得:main.go, Dockerfile, Makefile, component.yaml, CLAUDE.md ``` 编辑 `component.yaml`:`name: my-nmap-tool`、`kind: tool`、版本、描述。 **步骤 2 — 定义 proto(字段 100 是魔法所在)** `api/proto/mytool/nmap/v1/nmap.proto`: ``` syntax = "proto3"; package mytool.nmap.v1; option go_package = "github.com/you/my-nmap-tool/api/gen/nmap/v1;nmappb"; import "gibson/graphrag/v1/graphrag.proto"; // ← from this SDK message NmapRequest { repeated string targets = 1; // "10.0.0.0/24" or "example.com" string scan_type = 2; // "syn", "connect", "udp", etc. repeated string ports = 3; // "1-1024" or "80,443,22" int32 timeout_seconds = 4; } message NmapResponse { // Tool-specific result fields surfaced to the agent. string raw_xml = 1; int32 hosts_scanned = 2; double scan_duration_seconds = 3; // ── Field 100 is reserved platform-wide for DiscoveryResult. ── // Gibson's DiscoveryProcessor extracts it from every tool response // and writes the entities + relationships into Neo4j automatically. gibson.graphrag.v1.DiscoveryResult discovery = 100; } ``` 运行 `buf generate`(骨架的 Makefile 目标)。输出:`api/gen/nmap/v1/nmap.pb.go`。 **步骤 3 — 实现工具** ``` package main import ( "context" "encoding/xml" "fmt" "os/exec" nmappb "github.com/you/my-nmap-tool/api/gen/nmap/v1" graphragpb "github.com/zero-day-ai/sdk/api/gen/gibson/graphrag/v1" "github.com/zero-day-ai/sdk/serve" "google.golang.org/protobuf/proto" ) type NmapTool struct{} func (t *NmapTool) Name() string { return "my-nmap-tool" } func (t *NmapTool) Version() string { return "v0.1.0" } func (t *NmapTool) InputMessageType() string { return "mytool.nmap.v1.NmapRequest" } func (t *NmapTool) OutputMessageType() string { return "mytool.nmap.v1.NmapResponse" } // Execute is the tool's main entrypoint. SDK serves it over gRPC. func (t *NmapTool) Execute(ctx context.Context, in proto.Message) (proto.Message, error) { req := in.(*nmappb.NmapRequest) // 1. Run nmap. args := []string{"-sV", "-oX", "-"} if req.ScanType != "" { args = append(args, "-s"+req.ScanType[:1]) } args = append(args, req.Targets...) out, err := exec.CommandContext(ctx, "nmap", args...).Output() if err != nil { return nil, err } // 2. Parse nmap's XML output (use a parser library — omitted here). var parsed nmapXML xml.Unmarshal(out, &parsed) // 3. Translate parsed XML → DiscoveryResult. // THIS is the part that lights up the knowledge graph. discovery := &graphragpb.DiscoveryResult{} for _, h := range parsed.Hosts { discovery.Hosts = append(discovery.Hosts, &graphragpb.Host{ Ip: h.Address, Hostname: ptr(h.Hostname), }) for _, p := range h.Ports { discovery.Ports = append(discovery.Ports, &graphragpb.Port{ Number: int32(p.Number), Protocol: p.Protocol, State: ptr(p.State), HostId: h.Address, // ← edge: Port BELONGS_TO Host }) if p.Service != "" { discovery.Services = append(discovery.Services, &graphragpb.Service{ Name: p.Service, PortId: fmt.Sprintf("%s:%d", h.Address, p.Number), // ← edge: Service RUNS_ON Port Version: p.Version, }) } } } // 4. Return the response with discovery in field 100. return &nmappb.NmapResponse{ RawXml: string(out), HostsScanned: int32(len(parsed.Hosts)), ScanDurationSeconds: parsed.Duration, Discovery: discovery, // ← the magic field }, nil } func ptr[T any](v T) *T { return &v } func main() { // serve.Tool handles all gRPC plumbing, FileDescriptorSet registration // with gibson, mode selection (local vs platform), health probes, OTel. if err := serve.Tool(&NmapTool{}); err != nil { panic(err) } } ``` 这就是整个工具——大约 80 行您的代码。 **步骤 4 — 部署** `make docker-build && make docker-push`,然后选择以下任一模式: - **本地模式**:部署为 Kubernetes 服务,暴露端口 `50051`,通过 `gibson-cli component install ` 安装。Gibson 将工作发送到您工具的 gRPC 端口。 - **平台模式**:设置 `PLATFORM_URL=gibson:50002` 环境变量。`serve.Tool()` 切换为出站模式——您的工具通过 `ComponentService.PollWork` 轮询 gibson 以获取工作,而不是运行自己的服务器。更适合扩展:副本可以自由增减,而无需重新配置 gibson。 ``` apiVersion: apps/v1 kind: Deployment metadata: name: my-nmap-tool spec: replicas: 5 # Scale horizontally template: spec: containers: - name: tool image: ghcr.io/you/my-nmap-tool:v0.1.0 env: - name: PLATFORM_URL value: gibson:50002 - name: BOOTSTRAP_TOKEN valueFrom: secretKeyRef: name: gibson-bootstrap key: token ``` 首次连接时,您的工具会通过 `RegisterComponent` 将其 FileDescriptorSet(即 `NmapRequest`/`NmapResponse` 的 schema)发送给 gibson。Gibson 通过 `protoresolver` 缓存它。从此时起,gibson 知道如何按名称调用您的工具以及如何解释其响应——而无需对您工具的 Go 类型有任何编译时依赖。 **步骤 5 — 代理如何使用它** 在代理(一个独立的组件)中: ``` import ( nmappb "github.com/you/my-nmap-tool/api/gen/nmap/v1" // ← agent imports the tool's Go bindings ) // Inside the agent's reasoning loop: req := &nmappb.NmapRequest{ Targets: []string{"10.0.0.0/24"}, ScanType: "syn", Ports: []string{"1-1024"}, } resp := &nmappb.NmapResponse{} if err := h.CallToolProto(ctx, "my-nmap-tool", req, resp); err != nil { return err } slog.Info("nmap done", "hosts", resp.HostsScanned, "duration", resp.ScanDurationSeconds) // The agent does NOT need to do anything for the graph — gibson already // extracted resp.Discovery and wrote Hosts, Ports, Services + their // relationships to Neo4j BEFORE returning the response to the agent. ``` ### gibson 自动执行的操作(“免费”部分) 当您工具的响应到达 gibson 的 `HarnessCallbackService.CallToolProto` 处理程序的那一刻,`DiscoveryProcessor` 会: 1. 使用 `protoresolver` 找到已缓存 `NmapResponse` 描述符的字段 100。 2. 对 `resp.Discovery` 进行反射,提取出每一个 `Host`、`Port`、`Service` 等。 3. 对于每一个实体,调用 GraphRAG `loader` 写入/更新对应的 Neo4j 节点——分类法类型化(`:Host {ip: "10.0.0.5"}`、`:Port {number: 443, protocol: "tcp"}`、`:Service {name: "https"}`)。 4. 根据 `HostId` / `PortId` 交叉引用创建关系:`(:Port)-[:BELONGS_TO]->(:Host)`、`(:Service)-[:RUNS_ON]->(:Port)`。 5. 为所有内容打上任务/代理/运行 ID 的标签,以便编排器的跨任务智能查询稍后可以说“编排器关于 10.0.0.5 的先前知识包括来自任务 XYZ 的这 12 个端口。” ### 您不需要做的 - 编写任何 Neo4j Cypher。 - 修改此 SDK 的源代码。 - 提前告诉 gibson 您的 proto 类型(在注册时通过 FileDescriptorSet 自动发现)。 - 实现任何“保存到图”的逻辑——根据字段 100 的契约,这是 gibson 的责任。 - 关心是哪个任务、目标或代理调用了您——gibson 会自动使用该上下文标记图条目。 ### 一句话总结 您实现 `Execute(req) → resp`,将结构化的发现结果放在字段 100 中,平台将处理持久化 + 跨任务学习 + 基于您工具输出的智能查询,而您无需编写一行平台代码。 ## 构建插件 插件提供有状态的服务集成: ``` type SlackPlugin struct { client *slack.Client } func (p *SlackPlugin) Name() string { return "slack" } func (p *SlackPlugin) Version() string { return "1.0.0" } func (p *SlackPlugin) Initialize(ctx context.Context, cfg plugin.PluginConfig) error { token := cfg.Settings["token"].(string) p.client = slack.New(token) return nil } func (p *SlackPlugin) Methods() []plugin.MethodDescriptor { return []plugin.MethodDescriptor{ { Name: "send_message", Description: "Send a message to a Slack channel", InputSchema: schema.JSON(`{ "type": "object", "properties": { "channel": {"type": "string"}, "message": {"type": "string"} }, "required": ["channel", "message"] }`), }, } } func (p *SlackPlugin) Query(ctx context.Context, method string, params map[string]any) (any, error) { switch method { case "send_message": channel := params["channel"].(string) message := params["message"].(string) _, _, err := p.client.PostMessage(channel, slack.MsgOptionText(message, false)) return map[string]any{"sent": true}, err default: return nil, fmt.Errorf("unknown method: %s", method) } } func main() { serve.Plugin(&SlackPlugin{}, serve.WithPort(50053)) } ``` ## GraphRAG 知识图谱 代理发现的每个实体都通过完整的关系映射持久化在 Neo4j 中: ``` // Create entities with automatic UUID assignment host := graphrag.NewHost() host.Ip = "192.168.1.100" host.Hostname = "api-server" host.Os = "Ubuntu 22.04" // Child entities auto-wire parent relationships port := graphrag.NewPort(host, 443, "tcp") service := graphrag.NewService(port, "nginx") endpoint := graphrag.NewEndpoint(service, "/api/v1/status") // Store in graph h.StoreNode(ctx, host.ToProto()) // Query knowledge results, _ := h.QueryNodes(ctx, &graphragpb.GraphQuery{ NodeType: "host", Filters: map[string]string{"os": "Ubuntu*"}, }) ``` **Schema 亮点:** ``` Host ──[HAS_PORT]──▶ Port ──[RUNS_SERVICE]──▶ Service ──[HAS_ENDPOINT]──▶ Endpoint Domain ──[HAS_SUBDOMAIN]──▶ Subdomain ──[RESOLVES_TO]──▶ Host ``` - 基于 UUID 的身份标识以实现可合并性 - 基于 CEL 的验证规则 - YAML 驱动的分类法(单一事实来源) - 跨任务智能 ## 包结构 ``` sdk/ ├── agent/ # Agent interfaces and types ├── tool/ # Tool interfaces and worker system ├── plugin/ # Plugin interfaces ├── llm/ # LLM abstractions and message types ├── memory/ # Three-tier memory APIs ├── mission/ # Mission context types ├── result/ # Execution result types ├── health/ # Health check utilities ├── serve/ # gRPC serving utilities ├── graphrag/ # Knowledge graph integration │ ├── domain/ # Generated domain types │ ├── validation/ # CEL-based validators │ └── id/ # Node ID generation ├── taxonomy/ # YAML-driven taxonomy └── examples/ # Reference implementations ``` ## gRPC 服务选项 ``` serve.Agent(agent, serve.WithPort(50051), serve.WithHealthPort(8080), // K8s probes: /healthz, /readyz serve.WithTLS(certFile, keyFile), serve.WithLogger(logger), ) serve.Tool(tool, serve.WithPort(50052)) serve.Plugin(plugin, serve.WithPort(50053)) ``` ## 文档 | 指南 | 描述 | |-------|-------------| | [代理开发](docs/AGENTS.md) | 构建自主代理的完整指南 | | [工具开发](docs/TOOLS.md) | 构建具有自动图存储的工具 | | [插件开发](docs/PLUGINS.md) | 创建有状态的服务集成 | ## 使用案例 Gibson 代理可以自动化任何领域: | 领域 | 示例代理 | |--------|----------------| | **DevOps** | K8s 故障排查器、日志分析器、事件响应器 | | **平台工程** | 漂移检测器、成本优化器、合规审计器 | | **安全** | 漏洞扫描器、渗透测试器、威胁猎手 | | **数据工程** | Pipeline 监控器、Schema 验证器、ETL 编排器 | | **自定义工作流** | 任何自主代理能增加价值的领域 | ## Authz 注册表 SDK protos 带有描述每个 RPC 授权规则的 `(gibson.auth.v1.authz)` 注解。SDK 代码生成工具 (`cmd/authz-registry-gen`) 是将这些注解渲染为运行时工件的标准来源。 渲染后的输出(`registry.go`、`registry.yaml`、`permissions.ts`)**不再从此仓库发布**。它位于私有 `zero-day-ai/gibson` 仓库的 `internal/authz/registry/` 目录中,并在每次 gibson 发布时作为私有 OCI 制品 (`ghcr.io/zero-day-ai/internal-authz-registry`) 发布。 - **平台工程师**:在 `zero-day-ai/gibson` 中运行 `make authz-registry` 以重新生成。 - **SDK 贡献者**:在此仓库中运行 `make proto-authz-registry-emit` 以在本地验证注解更改(输出到 `tmp/authz/`,不提交)。 规范:`private-authz-registry`。 ## 许可证 专有 - Zero-Day.AI
标签:AI 工具链, AI 智能体, Claude, CVE检测, DLL 劫持, EVTX分析, Gemini, Gibson SDK, Go SDK, Go 语言, GPT, Helm, K8s 运维, Kubernetes 排障, LLM 推理, LLM评估, Neo4j, NIDS, Ollama, Python工具, Redis, 云部署, 人工智能开发框架, 分布式队列, 大模型, 大语言模型, 子域名突变, 容器化, 工具调用, 插件开发, 搜索引擎查询, 数字取证, 日志审计, 漏洞管理, 用户代理, 网络调试, 自主代理, 自动化, 自动化脚本, 请求拦截